Using Redis to build a Distributed Chat App in Go & React.js

In this blog, I cover my approach to building a simple but scalable distributed chat application using Golang, Redis & WebSockets.

Code for this:
https://github.com/the-arcade-01/react-golang-distributed-chat-app

You can test the app itself here: https://chat.arcade.build

Live demo

Theory

First of all, what does distributed even mean?

In system architecture, distributed means that multiple components (servers, databases, message brokers, etc.) each run on their own machine (or node) and communicate with each other, instead of everything being handled by a single machine.

In our case, the database (Redis) and the backend server (a Golang WebSocket server) run on separate machines.

App Architecture

  • Why is this good?
    Since each component serves its own purpose, managing them is much easier. For example, if our backend server can't handle the load of WebSocket connections, we can quickly spin up more instances of it.

Now, let’s talk about Websockets

WebSocket is a communication protocol that allows full-duplex communication (both client and server can send messages at any time) over a single TCP connection.

  • How does it work?

    1. First, the client sends an HTTP request to the server with the headers Upgrade: websocket and Connection: Upgrade.
    2. The server responds with HTTP status code 101 (Switching Protocols).
    3. Once the handshake is complete, both client and server can send data over the same connection.

    WS conn

    Another important part of WebSockets is the ping-pong mechanism, which tells us whether the connection between client and server is still alive. The server periodically sends a ping message to the client, and the client responds with a pong message. As long as these exchanges succeed, the connection is known to be active; if one of them fails, we can close the connection and re-establish it.
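
To make the handshake a bit more concrete: per RFC 6455, the client's upgrade request also carries a random Sec-WebSocket-Key header, and the server's 101 response carries Sec-WebSocket-Accept, which is the base64-encoded SHA-1 of that key concatenated with a fixed GUID. The sketch below uses only the standard library; the function name acceptKey is mine and not from the repo. Gorilla's Upgrade does this for you, so you never need to write it yourself.

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed value RFC 6455 defines for the handshake.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// acceptKey derives the Sec-WebSocket-Accept response header value
// from the client's Sec-WebSocket-Key request header value.
func acceptKey(clientKey string) string {
	sum := sha1.Sum([]byte(clientKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key from the example handshake in RFC 6455.
	fmt.Println(acceptKey("dGhlIHNhbXBsZSBub25jZQ=="))
}
```

The client checks this value before treating the connection as a WebSocket, which guards against servers that answer the request without understanding the upgrade.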

Redis & how we use Streams and PubSub for our chat

Redis offers two features that fit our purpose, chat communication: Streams and PubSub.

Redis PubSub

  • It acts as a simple channel: you send messages to it (Publish operation) and receive messages from it (Subscribe operation).
  • It is fire and forget: messages are not stored, and a message is delivered only to clients that were subscribed to the channel before it was sent.
  • There can be multiple publishers and multiple subscribers.
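
The fire-and-forget behaviour is easy to see in a toy model. The sketch below is not how Redis is implemented and does not talk to Redis; broker, subscribe and publish are hypothetical names for an in-memory stand-in that only mimics the delivery rule described above.

```go
package main

import "fmt"

// broker is a toy stand-in for a single Pub/Sub channel.
// It only mimics fire-and-forget delivery; it is not safe for concurrent use.
type broker struct {
	subscribers []chan string
}

// subscribe registers a new subscriber and returns its buffered inbox.
func (b *broker) subscribe() chan string {
	inbox := make(chan string, 8)
	b.subscribers = append(b.subscribers, inbox)
	return inbox
}

// publish hands msg to the subscribers that exist right now and returns
// how many received it. Nothing is kept for subscribers that join later.
func (b *broker) publish(msg string) int {
	delivered := 0
	for _, inbox := range b.subscribers {
		select {
		case inbox <- msg:
			delivered++
		default: // inbox full: the message is dropped for this subscriber
		}
	}
	return delivered
}

func main() {
	b := &broker{}
	fmt.Println("receivers:", b.publish("sent before anyone subscribed"))

	early := b.subscribe()
	fmt.Println("receivers:", b.publish("hello"))

	late := b.subscribe()
	fmt.Println("queued for early:", len(early), "queued for late:", len(late))
}
```

The late subscriber ends up with nothing queued, which is why PubSub alone cannot give a new user the chat history.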

Redis Streams

  • A stream acts as an append-only log: it stores incoming messages and lets us run different retrieval operations on them, such as range queries.
  • Since messages are stored in the stream, we can send them to new subscribers (for example, as chat history or previous messages).
  • Each entry in a stream has an ID, which by default is generated from a millisecond timestamp plus a sequence number, so entries stay ordered.
  • A stream can also have multiple publishers and multiple subscribers.
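
As a rough mental model, a capped stream behaves like the in-memory log below. This is only a sketch: stream, add and readAfter are hypothetical names, and a plain counter replaces the real timestamp-based IDs to keep it short.

```go
package main

import "fmt"

// entry is one stored message. Real Redis IDs look like "<ms>-<seq>";
// a simple counter is used here instead.
type entry struct {
	ID  int
	Msg string
}

// stream is a toy append-only log that keeps at most maxLen entries.
type stream struct {
	entries []entry
	nextID  int
	maxLen  int
}

// add appends msg, trims the oldest entries beyond maxLen,
// and returns the ID assigned to the new entry.
func (s *stream) add(msg string) int {
	s.nextID++
	s.entries = append(s.entries, entry{ID: s.nextID, Msg: msg})
	if len(s.entries) > s.maxLen {
		s.entries = s.entries[len(s.entries)-s.maxLen:]
	}
	return s.nextID
}

// readAfter returns every stored entry whose ID is greater than lastID.
func (s *stream) readAfter(lastID int) []entry {
	var out []entry
	for _, e := range s.entries {
		if e.ID > lastID {
			out = append(out, e)
		}
	}
	return out
}

func main() {
	s := &stream{maxLen: 2}
	s.add("a")
	s.add("b")
	s.add("c")
	fmt.Println(s.readAfter(0)) // a reader that has seen nothing yet
	fmt.Println(s.readAfter(2)) // a reader that has already seen ID 2
}
```

Unlike the PubSub case, a reader that shows up late still gets whatever is stored, limited only by the length cap.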

Code Overview

I won't paste all the code from the repo here. I'll just explain a few implementation details that are necessary to understand how it works.

Backend code

  • In my Golang server, I've used Gorilla's websocket library to handle the WebSocket server. It's a very popular, battle-tested library that is easy to use and has great documentation.

  • To establish the WebSocket connection, we first define the websocket upgrader, and then use it in the handler to upgrade the HTTP connection to a WebSocket connection.

    upgrader: websocket.Upgrader{
        ReadBufferSize:  1024,
        WriteBufferSize: 1024,
        // only accept connections coming from our own frontend
        CheckOrigin: func(r *http.Request) bool {
            origin := r.Header.Get("Origin")
            return origin == Envs.WEB_URL
        },
    },
    ...

    func (h *handlers) handleWS(w http.ResponseWriter, r *http.Request) {
        username := r.URL.Query().Get("username")
        if username == "" {
            http.Error(w, "Username is required", http.StatusBadRequest)
            Log.ErrorContext(r.Context(), "Username is required")
            return
        }

        conn, err := h.upgrader.Upgrade(w, r, nil)
        if err != nil {
            // Upgrade has already replied to the client with an HTTP error,
            // so we only log here instead of calling http.Error again
            Log.ErrorContext(r.Context(), "WebSocket upgrade failed", "error", err)
            return
        }

        Log.InfoContext(r.Context(), "WebSocket connection established", "username", username)

        // the pumps outlive this HTTP request, so they get their own context
        ctx := context.Background()
        go h.writePump(ctx, conn)
        go h.readPump(ctx, conn, username)
    }

  • We then start two goroutines: one listens to the user's messages over the ws conn and publishes them into the Redis message channel (readPump); the other takes messages from the Redis message channel and sends them back to the user over the ws conn (writePump).
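
The shape of this two-goroutine pattern can be sketched without any WebSocket or Redis at all. In the toy below, relay is a hypothetical function and the bus channel only stands in for the Redis channel; one goroutine plays readPump, the other plays writePump.

```go
package main

import "fmt"

// relay pushes incoming messages through a shared bus the way the two
// pumps do: one goroutine publishes, another delivers. The bus is a plain
// Go channel standing in for Redis; no network is involved.
func relay(incoming []string) []string {
	bus := make(chan string)
	done := make(chan []string)

	// "writePump": take messages off the bus and deliver them.
	go func() {
		var delivered []string
		for msg := range bus {
			delivered = append(delivered, msg)
		}
		done <- delivered
	}()

	// "readPump": read what the user sent and publish it to the bus.
	go func() {
		for _, msg := range incoming {
			bus <- msg
		}
		close(bus) // the user disconnected
	}()

	return <-done
}

func main() {
	fmt.Println(relay([]string{"hi", "how are you?"}))
}
```

In the real app the bus is shared by every server instance through Redis, which is what lets a message typed on one instance reach users connected to another.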

  • I've implemented both approaches, PubSub and Streams, separately, so let's look at each of them.

  • PubSub:

    • Redis makes working with a PubSub channel easy: there is no separate step to create it. We just call the Publish method with the channel name, and Redis delivers the message to whoever is subscribed to that channel at that moment.
    ...
    // in readPump func
    for {
        // listen for the user's input message
        _, msg, err := conn.ReadMessage()
        if err != nil {
            if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
                Log.ErrorContext(ctx, "error on ws", "error", err)
            }
            break
        }
        // publish that message into Redis PubSub
        // (the underlying Redis command is PUBLISH <channel> <message>)
        if err := h.cacheRepo.publish(ctx, Envs.CHAT_CHANNEL, string(msg)); err != nil {
            Log.ErrorContext(ctx, "error publishing message to redis", "error", err)
        }
    }
    • Since this is a simple channel, in the writePump function we just subscribe to it, listen for incoming messages, and send them back to the user.
    • Here we also take care of the ping-pong check, which helps us remove stale or inactive connections (stale meaning more than 30 seconds of inactivity).
      func (h *handlers) writePump(ctx context.Context, conn *websocket.Conn) {
          ticker := time.NewTicker(pingPeriod)
          pubsub := h.cacheRepo.redis.Subscribe(ctx, Envs.CHAT_CHANNEL)

          defer func() {
              pubsub.Close()
              ticker.Stop()
              conn.Close()
          }()

          ch := pubsub.Channel()

          for {
              select {
              case <-ctx.Done():
                  return
              case msg, ok := <-ch:
                  if !ok {
                      // the subscription was closed, so tell the client we are closing too
                      conn.WriteMessage(websocket.CloseMessage, []byte{})
                      return
                  }

                  // Explicitly check the message size, even though the CheckOrigin
                  // check is in place, so that an oversized message published through
                  // a manual ws connection from the same origin doesn't break conns
                  if len(msg.Payload) > maxMessageSize {
                      Log.ErrorContext(ctx, "message size exceeds limit", "size", len(msg.Payload))
                      continue
                  }

                  // If writing takes longer than writeWait, the client probably has
                  // a slow connection; we drop the conn and the UI will re-establish it
                  conn.SetWriteDeadline(time.Now().Add(writeWait))
                  if err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload)); err != nil {
                      Log.ErrorContext(ctx, "error sending message to websocket", "error", err)
                      return
                  }
              case <-ticker.C:
                  // Ping the client every pingPeriod to check whether the conn is
                  // still alive; this is how stale conns get removed
                  conn.SetWriteDeadline(time.Now().Add(writeWait))
                  if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
                      Log.ErrorContext(ctx, "error sending ping message", "error", err)
                      return
                  }
              }
          }
      }
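
The timing side of the ping-pong check can be sketched on its own. The values below are assumptions chosen to match the 30-second figure above (the repo's actual pingPeriod and related constants are not shown in this post), and heartbeat, onPong and isStale are hypothetical names. With Gorilla this bookkeeping is commonly done through conn.SetReadDeadline together with conn.SetPongHandler in the read loop rather than by hand.

```go
package main

import (
	"fmt"
	"time"
)

// Assumed values for illustration only.
const (
	pongWait   = 30 * time.Second    // give up if no pong arrives within this window
	pingPeriod = (pongWait * 9) / 10 // ping a little more often than pongWait
)

// heartbeat remembers when the peer last answered a ping.
type heartbeat struct {
	lastPong time.Time
}

// onPong records a pong received at time t.
func (h *heartbeat) onPong(t time.Time) {
	h.lastPong = t
}

// isStale reports whether more than pongWait has passed since the last pong.
func (h *heartbeat) isStale(now time.Time) bool {
	return now.Sub(h.lastPong) > pongWait
}

func main() {
	start := time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC)
	h := &heartbeat{}
	h.onPong(start)

	fmt.Println("ping every:", pingPeriod)
	fmt.Println("stale after 29s:", h.isStale(start.Add(29*time.Second)))
	fmt.Println("stale after 31s:", h.isStale(start.Add(31*time.Second)))
}
```

Keeping pingPeriod below pongWait means a healthy client always gets at least one chance to answer before the connection is considered stale.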
    
  • Streams:

    • The writePump and readPump functions stay the same; only the part that used PubSub is replaced with Streams code.
    • First we need to initialize the stream. Redis can create a stream on first write, just like a PubSub channel (if it is not present, create it), but here I initialize it up front so that I maintain only one stream.
        func (c *cacheRepo) initStream(ctx context.Context, streamKey string) {
          if err := c.redis.XGroupCreateMkStream(ctx, streamKey, Envs.STREAM_CONSUMER_GROUP, "$").Err(); err != nil {
            if !strings.Contains(err.Error(), "BUSYGROUP") {
              Log.ErrorContext(ctx, "error creating stream group", "error", err)
            }
          }
        }
    • Writing to the stream:
        func (c *cacheRepo) writeToStream(ctx context.Context, streamKey, msg string) error {
          return c.redis.XAdd(ctx, &redis.XAddArgs{
            Stream:     streamKey, // the stream name
            // with NoMkStream set to true, XAdd will not create a missing stream
            // and the write fails instead; set it to false to create the stream
            // on the fly, just like a pubsub channel
            NoMkStream: true,
            // max number of messages kept in the stream at any point in time;
            // older entries are trimmed
            MaxLen:     int64(Envs.MAX_CHAT_LEN),
            // "*" lets Redis generate the entry ID
            ID:         "*",
            Values: map[string]interface{}{
              "message": msg,
            },
          }).Err()
        }
    • Reading from the stream:
      ...
        // in writePump func, inside the for loop
        streams, err := h.repo.redis.XRead(ctx, &redis.XReadArgs{
          // the stream name followed by the ID of the last message already delivered
          Streams: []string{Envs.STREAM_KEY, lastMsgID},
          // 0 means block until a new message arrives
          Block:   0,
        }).Result()

        if err != nil {
          Log.ErrorContext(ctx, "error reading from stream", "error", err)
          continue
        }

        for _, stream := range streams {
          for _, msg := range stream.Messages {
            // remember how far we have read, so the next XRead continues from here
            lastMsgID = msg.ID
            // checked type assertion, so an unexpected value can't panic the pump
            payload, ok := msg.Values["message"].(string)
            if !ok {
              continue
            }

            // If writing takes longer than writeWait, the client probably has
            // a slow connection; we drop the conn and the UI will re-establish it
            conn.SetWriteDeadline(time.Now().Add(writeWait))
            if err := conn.WriteMessage(websocket.TextMessage, []byte(payload)); err != nil {
              Log.ErrorContext(ctx, "error sending message to websocket", "error", err)
              return
            }
          }
        }
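
Tracking lastMsgID works because stream IDs are ordered. An auto-generated ID has the form <milliseconds>-<sequence>, and the two parts compare as numbers, not as text. The helper names below (parseStreamID, isNewer) are hypothetical and only illustrate that ordering; Redis does the comparison itself when serving XRead.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseStreamID splits an ID of the form "<ms>-<seq>" into its two numbers.
func parseStreamID(id string) (ms, seq uint64, err error) {
	left, right, ok := strings.Cut(id, "-")
	if !ok {
		return 0, 0, fmt.Errorf("invalid stream ID %q", id)
	}
	if ms, err = strconv.ParseUint(left, 10, 64); err != nil {
		return 0, 0, err
	}
	if seq, err = strconv.ParseUint(right, 10, 64); err != nil {
		return 0, 0, err
	}
	return ms, seq, nil
}

// isNewer reports whether ID a comes after ID b in the stream.
func isNewer(a, b string) (bool, error) {
	aMs, aSeq, err := parseStreamID(a)
	if err != nil {
		return false, err
	}
	bMs, bSeq, err := parseStreamID(b)
	if err != nil {
		return false, err
	}
	if aMs != bMs {
		return aMs > bMs, nil
	}
	return aSeq > bSeq, nil
}

func main() {
	// Same millisecond, different sequence numbers: 55 comes after 6,
	// even though "55" sorts before "6" as a string.
	newer, err := isNewer("1526919030474-55", "1526919030474-6")
	fmt.Println(newer, err)
}
```

For the very first read, XREAD also accepts the special ID "$" (only entries added from now on) or "0" (everything stored), so the starting value of lastMsgID decides whether a new connection replays old messages.
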
    • Since a stream stores its messages, we can actually send the chat history to the user:
        func (c *cacheRepo) getMessagesFromStream(ctx context.Context, streamKey string) ([]string, error) {
          // "+" to "-" walks the stream from the newest entry to the oldest,
          // so the result comes back newest first
          msgs, err := c.redis.XRevRangeN(ctx, streamKey, "+", "-", int64(Envs.MAX_CHAT_LEN)).Result()
          if err != nil {
            return nil, err
          }
          var result []string
          for _, msg := range msgs {
            // skip entries that don't carry a string "message" field
            if text, ok := msg.Values["message"].(string); ok {
              result = append(result, text)
            }
          }
          return result, nil
        }
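
One detail to keep in mind: a reverse range returns the newest message first, while a chat window usually shows the oldest message at the top. Whether the repo flips the order on the server or in the UI isn't shown here, so the helper below (chronological, a hypothetical name) is just one way to do it.

```go
package main

import "fmt"

// chronological returns a copy of msgs in reverse order, turning a
// newest-first history into oldest-first for display.
// The input slice is left untouched.
func chronological(msgs []string) []string {
	out := make([]string, len(msgs))
	for i, m := range msgs {
		out[len(msgs)-1-i] = m
	}
	return out
}

func main() {
	newestFirst := []string{"see you!", "fine, thanks", "how are you?", "hi"}
	fmt.Println(chronological(newestFirst))
}
```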

Frontend code

  • In the frontend, we use the browser's native JavaScript WebSocket client to establish a connection with the backend.

useEffect(() => {
  if (username) {
    const url = `${
      import.meta.env.VITE_API_URL
    }?username=${encodeURIComponent(username)}`;
    ws.current = new WebSocket(url);

    ws.current.onopen = () => {
      console.log("Connected to WebSocket server");
    };

    ws.current.onmessage = (event) => {
      const message: Message = JSON.parse(event.data);
      console.log(message);
      setMessages((prevMessages) => [...prevMessages, message]);
    };

    ws.current.onerror = (error) => {
      console.error("WebSocket error:", error);
    };

    ws.current.onclose = () => {
      console.log("Disconnected from WebSocket server");
    };

    return () => {
      ws.current?.close();
    };
  }
}, [username]);

References

Some more learning resources you can refer to:

  1. https://blog.algomaster.io/p/websockets
  2. https://blog.algomaster.io/p/message-queues
  3. https://redis.io/docs/latest/develop/interact/pubsub/
  4. https://redis.io/docs/latest/develop/data-types/streams/
  5. https://github.com/gorilla/websocket/tree/main/examples/chat