How to Implement Redis Pub/Sub #81
Summary

I have written an example using this gem to communicate between multiple servers and scale out using Redis Pub/Sub, similar to the Redis Pub/Sub adapter of Socket.IO. Since this is my first time using async-websocket and the async gems, I would appreciate a review.

Repo: https://github.com/ttanimichi/async-websocket-pubsub-example

Here are some specific concerns I have (please let me know if you notice any other points):
Details

Eight years ago, I developed a chat server for an online game using Node.js and Socket.IO. At the time, the game’s API server was implemented with Ruby on Rails, and I initially wanted to implement the chat server in Ruby as well. However, the typical way to build a WebSocket server in Ruby in those days was using EventMachine and em-websocket. Since the game was large-scale and had many users, I wanted a runtime that wouldn’t suffer from the C10K problem. The candidates for technology selection were:
I personally wanted to implement the server using Golang’s goroutines, but no engineers in the company had experience with Golang. Considering future maintenance after my potential transfer or resignation, Node.js was the most realistic choice. We successfully launched the chat server, and the high-level APIs provided by Socket.IO were very convenient.

Currently, I work at a different company and need to implement a WebSocket server again. Since I have prior experience with Socket.IO, I initially thought of using it again, possibly with Deno as the runtime. However, one of the requirements is that clients must connect using plain WebSocket, not socket.io-client. Since a plain WebSocket connection is not possible with a Socket.IO server, I cannot use Socket.IO for this project.

I considered building a plain WebSocket server with Deno, but since the company’s main service is implemented in Ruby on Rails and most of the backend engineers are familiar with Ruby, I decided that it would be ideal to implement the WebSocket server in Ruby for easier maintenance in the future.

When I implemented the chat server eight years ago, the latest Ruby version was around 2.3. Now, with Ruby 3, I wondered whether it had overcome the C10K problem, unlike the older Ruby 2 versions.

The first approach I considered was using Ractor with M:N threads. Since M:N threads are comparable to Golang’s goroutines, I thought they could handle the C10K problem. I asked @ko1 last year, on December 25th, whether Ruby 3.4’s M:N threads were production-ready. Unfortunately, according to him, they were not yet ready for production use. Perhaps in a few years, if I have another opportunity, I will try implementing it with Ractor and M:N threads.

Next, I thought of using Ruby 3’s Fiber scheduler. Since a Fiber is similar to a green thread, with low creation and context-switch costs, I thought it might be suitable for this project.
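To make the point about cheap creation and cooperative switching concrete, here is a minimal sketch using only core Ruby Fibers. It does not use the Fiber scheduler or any gem, and the client count and the `log` array are assumptions made up for illustration. Each fiber stands in for one client connection that does a little work, pauses as if waiting for I/O, and is resumed later.

```ruby
# A minimal sketch using only core Ruby Fibers (no scheduler, no gems).
# Each fiber stands in for one hypothetical client connection.

CLIENT_COUNT = 1_000 # hypothetical number of simulated clients

log = []

fibers = Array.new(CLIENT_COUNT) do |id|
  Fiber.new do
    # First slice of work, then hand control back to the caller,
    # much as a task would while waiting for I/O.
    log << [:started, id] if id < 2
    Fiber.yield
    # Second slice of work, run when the fiber is resumed again.
    log << [:finished, id] if id < 2
    :done
  end
end

# First pass: every fiber runs until its Fiber.yield.
fibers.each(&:resume)
# Second pass: every fiber runs to completion and returns :done.
results = fibers.map(&:resume)

puts "fibers created: #{fibers.size}"
puts "all finished:   #{results.all?(:done)}"
puts "interleaving:   #{log.inspect}"
```

In this sketch the switching is done by hand with `resume` and `Fiber.yield`. With a Fiber scheduler, the switch would instead happen when a fiber blocks on I/O.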
I found async-websocket, a gem based on the Fiber scheduler, so I decided to try implementing the WebSocket server with it. Although I cannot use Socket.IO for this project due to the requirements, I still found the following two features of Socket.IO impressive:
With async-websocket, I wondered if I could implement similar features on my own. Here’s a conceptual diagram of what I came up with:

[Image: conceptual diagram]

Since the project only requires one-way communication from the server to clients, all WRITEs are published by a dedicated Emitter. The Emitter publishes to Redis, and the async-websocket server processes subscribe to the respective channels. Each client only receives messages from the channels it is subscribed to.

I implemented an example based on this design, and you can find the repo below: https://github.com/ttanimichi/async-websocket-pubsub-example

Here’s a GIF showing the server in action. I started multiple processes and assigned ports 7070 and 7071 to them. You can see that each client only receives messages from the channels it subscribed to:

[Image: GIF of the server in action]

The main implementation is as follows:

#!/usr/bin/env -S falcon serve --bind http://localhost:7070 --count 1 -c
require 'async/websocket/adapters/rack'
require 'async/redis'
# key: channel name
# value: An array of connections for the channel
$connections = Hash.new { |h, k| h[k] = [] }
run lambda {|env|
  client = Sync do
    endpoint = Async::Redis.local_endpoint
    Async::Redis::Client.new(endpoint)
  end
  Async::WebSocket::Adapters::Rack.open(env, protocols: ['ws']) do |connection|
    loop do
      channel = connection.read.to_str
      puts "channel: #{channel}"
      $connections[channel] << connection
      client.subscribe(channel) do |context|
        puts "subscribed to #{channel}"
        loop do
          event = context.listen
          puts "event: #{event}"
          $connections[channel].each do |conn|
            conn.write("message: #{event[2]}")
            conn.flush
          end
        end
      end
    end
  ensure
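    # $connections is keyed by channel name, so remove this connection from every channel's array.
    $connections.each_value { |connections| connections.delete(connection) }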
  end
}

It appears to work as expected. However, since this is my first time using the async-websocket and async gems, I am concerned about potential issues with the implementation. Could you please review the code? Here are some specific concerns I have (please let me know if you notice any other points):
Replies: 1 comment 5 replies
It is fine to do that. connection.read is blocking and may fail if the remote end goes away. The same applies to context.listen.

No, it's not necessary. A client is not a connection, but just a way to make a connection of a certain kind (in this case, a subscription). I've updated the example showing how I'd do it: https://github.com/socketry/async-websocket-pubsub-example

You should avoid designing the server using global variables. The updated example shows how to do this. Falcon always instantiates a fresh set of middleware per-thread or per-process (depending on how you start it), so we don't need to worry about instance variables getting shared (by default). This is different from Puma, which creates one instance of your app and shares it across workers/threads, and may create thread safety issues.

No, not that I'm aware of at this time.

Regarding the general design, I made some minor changes in my updated example code (linked above). You may also like to consider using SSE if the direction is only server -> client. I talked a little bit about this here: https://youtu.be/9tOMD491mFY?si=6CPxnsTSUzwZZFPi&t=1353.
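As a rough illustration of the advice about global variables and failing connections, the sketch below keeps the channel-to-connections map in an instance variable instead of `$connections`, and drops a connection when writing to it fails. It is not the code from the updated example repository. `ChannelRegistry`, `PubSubApp`, and `FakeConnection` are hypothetical names invented here, and the assumption that a dead connection raises `IOError` or a `SystemCallError` may not match what async-websocket actually raises.

```ruby
require "set"

# Hypothetical helper: tracks which connections listen to which channel.
class ChannelRegistry
  def initialize
    # key: channel name, value: set of connections subscribed to it
    @channels = Hash.new { |hash, key| hash[key] = Set.new }
  end

  def subscribe(channel, connection)
    @channels[channel] << connection
  end

  # Remove a connection from every channel, e.g. from an ensure block.
  def remove(connection)
    @channels.each_value { |connections| connections.delete(connection) }
    @channels.delete_if { |_channel, connections| connections.empty? }
  end

  # Write to every subscriber and return how many writes succeeded.
  # Connections whose write fails are dropped (assumed exception classes).
  def broadcast(channel, message)
    return 0 unless @channels.key?(channel)

    delivered = 0
    @channels[channel].to_a.each do |connection|
      connection.write(message)
      connection.flush
      delivered += 1
    rescue IOError, SystemCallError
      remove(connection)
    end
    delivered
  end
end

# Hypothetical app object: the registry is an instance variable,
# so each app instance owns its own state.
class PubSubApp
  attr_reader :registry

  def initialize
    @registry = ChannelRegistry.new
  end
end

# Stand-in for a WebSocket connection, used only for this demonstration.
class FakeConnection
  attr_reader :sent

  def initialize(broken: false)
    @sent = []
    @broken = broken
  end

  def write(message)
    raise IOError, "closed stream" if @broken

    @sent << message
  end

  def flush; end
end

app = PubSubApp.new
alice = FakeConnection.new
bob = FakeConnection.new
gone = FakeConnection.new(broken: true)

app.registry.subscribe("news", alice)
app.registry.subscribe("news", gone)
app.registry.subscribe("sports", bob)

first = app.registry.broadcast("news", "hello")
second = app.registry.broadcast("news", "again")
puts "delivered: #{first}, then #{second}"
puts "alice: #{alice.sent.inspect}, bob: #{bob.sent.inspect}"
```

In a real server, a single Redis subscription per channel could call something like `broadcast` for each incoming event, while each WebSocket handler only calls `subscribe` and, in its `ensure` block, `remove`.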