In this blog, I cover my approach to building a simple but scalable distributed chat application using Golang, Redis and WebSockets.
Code for this project: https://github.com/the-arcade-01/react-golang-distributed-chat-app
You can test the app itself here: https://chat.arcade.build
Theory
First of all, what does "distributed" even mean?
In terms of system architecture, distributed means that multiple components (servers, databases, message brokers, etc.) communicate with each other even though each one runs on its own machine (or node), instead of everything being handled by a single machine.
In our context, the database (Redis) and the backend server (a Go WebSocket server) run separately, each on its own machine.
- Why is that good?
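Since each component serves its own purpose, managing them is much easier. For example, if our backend server can’t handle the load of WebSocket connections, we can quickly spin up more instances. Because every instance publishes to and reads from the same Redis channel (or stream), a message sent to one instance still reaches users connected to any other.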
Now, let’s talk about WebSockets.
WebSocket is a communication protocol that allows full-duplex communication (both client and server can send messages at any time) over a single TCP connection.
- How do they work?
- First, the client sends an HTTP request to the server with the header Upgrade: websocket (along with Connection: Upgrade).
- The server then responds with HTTP status code 101, which means Switching Protocols.
- Once this handshake is complete, both client and server can send data over the same connection.
Another important part of WebSockets is the ping-pong mechanism, which tells us whether the connection between client and server is still alive. The server periodically sends a ping message to the client, and the client responds with a pong message. If a ping can’t be sent, or a pong doesn’t come back in time, we know the connection is dead, so we can close it and let the client re-establish it.
Redis & how we use Streams and PubSub for our chat
Redis offers two features that serve our purpose (chat communication): Streams and PubSub.
Redis PubSub
- Acts as a simple channel: you send messages with a Publish operation and receive them with a Subscribe operation.
- Follows a fire-and-forget model: messages are not stored, so a message is delivered only to users who were subscribed to the channel before it was sent.
- There can be multiple publishers and multiple subscribers.
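To make the fire-and-forget behaviour concrete, here is a minimal in-memory sketch. It is an illustration only, not Redis or the go-redis API: broker, subscribe and publish are hypothetical names. A subscriber that joins after a message is published never sees it, and publish returns the number of receivers, similar to the reply of Redis’s PUBLISH command.

```go
package main

import "sync"

// broker is a hypothetical in-memory stand-in for a Redis PubSub channel.
type broker struct {
	mu   sync.Mutex
	subs []chan string
}

// subscribe registers a new subscriber and returns its inbox.
func (b *broker) subscribe() <-chan string {
	b.mu.Lock()
	defer b.mu.Unlock()
	ch := make(chan string, 16) // buffered so publish doesn't block in this demo
	b.subs = append(b.subs, ch)
	return ch
}

// publish delivers msg to the subscribers that exist right now and returns
// how many received it. Nothing is stored, so later subscribers never see it.
func (b *broker) publish(msg string) int {
	b.mu.Lock()
	defer b.mu.Unlock()
	delivered := 0
	for _, ch := range b.subs {
		select {
		case ch <- msg:
			delivered++
		default: // inbox full: this demo drops the message rather than block
		}
	}
	return delivered
}
```

The key property is that publish only iterates over current subscribers; there is no history to replay, which is exactly why PubSub alone can’t provide chat history.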
Redis Streams
- A stream acts as an append-only log: it stores incoming messages and supports retrieval operations on them, such as range queries.
- Since messages are stored in the stream, we can send them to new subscribers (for example, the chat history or previous messages).
- Each entry in a stream has a unique ID of the form <milliseconds>-<sequence>; for auto-generated IDs the first part is a Unix timestamp in milliseconds, and these IDs keep the entries ordered.
- We can also have multiple publishers and multiple subscribers to a stream.
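To illustrate how stream IDs keep entries ordered, here is an in-memory sketch, assuming the rule Redis uses for auto-generated (*) IDs: the current time in milliseconds plus a sequence number that increments when several entries land in the same millisecond. streamLog, add and after are hypothetical names; this is not a Redis client.

```go
package main

import (
	"fmt"
	"sync"
)

// entry mimics a stream entry: an ID made of a millisecond timestamp and a
// sequence number, plus the stored message.
type entry struct {
	ms, seq int64
	message string
}

// id renders the entry ID in the "<milliseconds>-<sequence>" form.
func (e entry) id() string { return fmt.Sprintf("%d-%d", e.ms, e.seq) }

// streamLog is a hypothetical in-memory append-only log.
type streamLog struct {
	mu      sync.Mutex
	entries []entry
}

// add appends msg using nowMs as the timestamp part of the ID. If nowMs is not
// newer than the last entry (same millisecond, or a clock that went backwards),
// it reuses the last timestamp and bumps the sequence, so IDs always increase.
func (s *streamLog) add(nowMs int64, msg string) string {
	s.mu.Lock()
	defer s.mu.Unlock()
	e := entry{ms: nowMs, message: msg}
	if n := len(s.entries); n > 0 {
		if last := s.entries[n-1]; nowMs <= last.ms {
			e.ms, e.seq = last.ms, last.seq+1
		}
	}
	s.entries = append(s.entries, e)
	return e.id()
}

// after returns, in order, every message whose ID is strictly greater than
// (ms, seq), similar to reading a stream from a remembered last ID.
func (s *streamLog) after(ms, seq int64) []string {
	s.mu.Lock()
	defer s.mu.Unlock()
	var out []string
	for _, e := range s.entries {
		if e.ms > ms || (e.ms == ms && e.seq > seq) {
			out = append(out, e.message)
		}
	}
	return out
}
```

Calling after(0, 0) replays the whole log, which is what lets a stream serve chat history, while passing the last ID a reader has seen returns only the newer entries.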
Code Overview
I won’t paste all the code from the repo here; I’ll just explain the implementation details that are essential to how it works.
Backend code
- In my Go server, I’ve used Gorilla’s websocket library (gorilla/websocket) to handle WebSocket connections. It’s a popular, battle-tested library that is easy to use and has great documentation.
- To establish the WebSocket connection, we first define a websocket.Upgrader and then, in the handler, use it to upgrade the HTTP connection to a WebSocket connection.
upgrader: websocket.Upgrader{
	ReadBufferSize:  1024,
	WriteBufferSize: 1024,
	// only accept connections coming from our frontend's origin
	CheckOrigin: func(r *http.Request) bool {
		origin := r.Header.Get("Origin")
		return origin == Envs.WEB_URL
	},
},

...

func (h *handlers) handleWS(w http.ResponseWriter, r *http.Request) {
	username := r.URL.Query().Get("username")
	if username == "" {
		http.Error(w, "Username is required", http.StatusBadRequest)
		Log.ErrorContext(r.Context(), "Username is required")
		return
	}

	conn, err := h.upgrader.Upgrade(w, r, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error,
		// so we only log here instead of writing a second response
		Log.ErrorContext(r.Context(), "WebSocket upgrade failed", "error", err)
		return
	}
	Log.InfoContext(r.Context(), "WebSocket connection established", "username", username)

	// r.Context() is cancelled once this handler returns, so the pumps get their own context
	ctx := context.Background()
	go h.writePump(ctx, conn)
	go h.readPump(ctx, conn, username)
}
- We then start two goroutines: readPump, which listens for the user’s messages on the WebSocket and publishes them to the Redis message channel, and writePump, which takes messages from the Redis channel and sends them back to the user over the WebSocket connection.
- I’ve implemented both approaches, PubSub and Streams, separately, so let’s look at each of them.
- PubSub: Redis makes PubSub channels easy to use because there is nothing to create up front. We just call Publish with the channel name. A channel isn’t stored as a key; the message is delivered to whoever is subscribed at that moment, and if nobody is, it is simply dropped.
...
// in readPump func
for {
	// listen for the user's next message
	_, msg, err := conn.ReadMessage()
	if err != nil {
		// only log unexpected close errors; a normal close just ends the loop
		if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
			Log.ErrorContext(ctx, "error on ws", "error", err)
		}
		break
	}
	// publish the message to the Redis PubSub channel (PUBLISH <channel> <message>)
	if err := h.cacheRepo.publish(ctx, Envs.CHAT_CHANNEL, string(msg)); err != nil {
		Log.ErrorContext(ctx, "error publishing message to redis", "error", err)
	}
}
- Since this is a simple channel, in the writePump function we just subscribe to it, listen for incoming messages and send them back to the user.
- Here we also take care of the ping-pong check, which helps us remove stale or inactive connections (connections with more than 30 seconds of inactivity).
func (h *handlers) writePump(ctx context.Context, conn *websocket.Conn) {
	ticker := time.NewTicker(pingPeriod)
	pubsub := h.cacheRepo.redis.Subscribe(ctx, Envs.CHAT_CHANNEL)
	defer func() {
		pubsub.Close()
		ticker.Stop()
		conn.Close()
	}()

	ch := pubsub.Channel()
	for {
		select {
		case <-ctx.Done():
			return
		case msg, ok := <-ch:
			if !ok {
				// the subscription channel was closed: tell the client we're closing
				conn.WriteMessage(websocket.CloseMessage, []byte{})
				return
			}
			// Explicitly check the message size even though CheckOrigin is set:
			// a non-browser client can still connect with the same Origin header and
			// publish oversized messages, so we skip those instead of breaking the conn
			if len(msg.Payload) > maxMessageSize {
				Log.ErrorContext(ctx, "message size exceeds limit", "size", len(msg.Payload))
				continue
			}
			// If the write takes longer than writeWait (e.g. the client is on a slow
			// network), it fails, we drop the conn and the UI re-establishes it
			conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := conn.WriteMessage(websocket.TextMessage, []byte(msg.Payload)); err != nil {
				Log.ErrorContext(ctx, "error sending message to websocket", "error", err)
				return
			}
		case <-ticker.C:
			// Ping the client every pingPeriod; if the ping can't be written,
			// the connection is stale and we close it
			conn.SetWriteDeadline(time.Now().Add(writeWait))
			if err := conn.WriteMessage(websocket.PingMessage, nil); err != nil {
				Log.ErrorContext(ctx, "error sending ping message", "error", err)
				return
			}
		}
	}
}
- Streams: the writePump and readPump functions stay the same; only the parts that use PubSub are replaced with Streams code.
- First, we need to initialize the stream. Redis can create a stream implicitly on the first XADD, but here I’ve created it up front so that I maintain only one stream.
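func (c *cacheRepo) initStream(ctx context.Context, streamKey string) {
	// XGROUP CREATE <stream> <group> $ MKSTREAM: creates the stream if it doesn't
	// exist yet, plus a consumer group that starts from new entries ("$")
	if err := c.redis.XGroupCreateMkStream(ctx, streamKey, Envs.STREAM_CONSUMER_GROUP, "$").Err(); err != nil {
		// BUSYGROUP means the group already exists (e.g. created by another
		// server instance), which is fine to ignore
		if !strings.Contains(err.Error(), "BUSYGROUP") {
			Log.ErrorContext(ctx, "error creating stream group", "error", err)
		}
	}
}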
- Writing to the stream
func (c *cacheRepo) writeToStream(ctx context.Context, streamKey, msg string) error {
	return c.redis.XAdd(ctx, &redis.XAddArgs{
		// the stream to append to
		Stream: streamKey,
		// if the stream is missing, XADD won't create it and the call returns an error;
		// set this to false to create the stream on demand instead
		NoMkStream: true,
		// cap the stream at MAX_CHAT_LEN entries; the oldest entries are trimmed on each XADD
		MaxLen: int64(Envs.MAX_CHAT_LEN),
		// "*" lets Redis generate the entry ID
		ID: "*",
		Values: map[string]interface{}{
			"message": msg,
		},
	}).Err()
}
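MaxLen turns the stream into a capped log: every XADD appends the new entry and then trims the oldest ones so that at most MaxLen remain. A plain-slice sketch of that behaviour, using a hypothetical helper appendCapped (this is not Redis code):

```go
package main

// appendCapped appends msg and then drops the oldest entries so that at most
// maxLen remain, mirroring an exact "XADD ... MAXLEN maxLen". A maxLen of 0
// or less means "no cap" in this sketch.
func appendCapped(log []string, msg string, maxLen int) []string {
	log = append(log, msg)
	if maxLen > 0 && len(log) > maxLen {
		// trim from the front: the oldest messages go first
		log = log[len(log)-maxLen:]
	}
	return log
}
```

go-redis’s XAddArgs also has an Approx field, which sends MAXLEN ~ instead; Redis may then keep a few extra entries in exchange for cheaper trimming.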
- Reading from the stream
...
// writePump func
streams, err := h.repo.redis.XRead(ctx, &redis.XReadArgs{
	// read entries newer than lastMsgID from our stream
	Streams: []string{Envs.STREAM_KEY, lastMsgID},
	// Block: 0 waits until a new entry arrives
	Block: 0,
}).Result()
if err != nil {
	Log.ErrorContext(ctx, "error reading from stream", "error", err)
	continue
}
for _, stream := range streams {
	for _, msg := range stream.Messages {
		// remember the last ID so the next XRead only returns newer entries
		lastMsgID = msg.ID
		// comma-ok form so a malformed entry can't panic the server
		payload, ok := msg.Values["message"].(string)
		if !ok {
			continue
		}
		// If the write takes longer than writeWait (e.g. the client is on a slow
		// network), it fails, we drop the conn and the UI re-establishes it
		conn.SetWriteDeadline(time.Now().Add(writeWait))
		if err := conn.WriteMessage(websocket.TextMessage, []byte(payload)); err != nil {
			Log.ErrorContext(ctx, "error sending message to websocket", "error", err)
			return
		}
	}
}
- Since a stream stores its messages, we can also send the chat history to the user.
func (c *cacheRepo) getMessagesFromStream(ctx context.Context, streamKey string) ([]string, error) {
	// XREVRANGE <stream> + - COUNT n: the latest n entries, newest first
	msgs, err := c.redis.XRevRangeN(ctx, streamKey, "+", "-", int64(Envs.MAX_CHAT_LEN)).Result()
	if err != nil {
		return nil, err
	}
	var result []string
	for _, msg := range msgs {
		// comma-ok form so a malformed entry can't panic the server
		if text, ok := msg.Values["message"].(string); ok {
			result = append(result, text)
		}
	}
	return result, nil
}
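XRevRangeN returns the latest entries newest first. Whether the repo flips this order elsewhere (on the server or in the UI) isn’t shown here; if the client simply appends messages in the order it receives them, the history would appear newest first. A minimal sketch of reversing it into oldest-first order, using a hypothetical helper chronological:

```go
package main

// chronological returns a copy of newestFirst in oldest-first order, which is
// the natural order for rendering a chat history.
func chronological(newestFirst []string) []string {
	out := make([]string, len(newestFirst))
	for i, m := range newestFirst {
		out[len(newestFirst)-1-i] = m
	}
	return out
}
```

On Go 1.21 and later, slices.Reverse from the standard library does the same thing in place.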
Frontend code
- On the frontend, we use the browser’s native WebSocket client to connect to the backend.
useEffect(() => {
  if (username) {
    const url = `${import.meta.env.VITE_API_URL}?username=${encodeURIComponent(username)}`;
    ws.current = new WebSocket(url);

    ws.current.onopen = () => {
      console.log("Connected to WebSocket server");
    };

    ws.current.onmessage = (event) => {
      const message: Message = JSON.parse(event.data);
      console.log(message);
      setMessages((prevMessages) => [...prevMessages, message]);
    };

    ws.current.onerror = (error) => {
      console.error("WebSocket error:", error);
    };

    ws.current.onclose = () => {
      console.log("Disconnected from WebSocket server");
    };

    // close the socket when the component unmounts or the username changes
    return () => {
      ws.current?.close();
    };
  }
}, [username]);
References
Some more learning resources you can refer to:
- https://blog.algomaster.io/p/websockets
- https://blog.algomaster.io/p/message-queues
- https://redis.io/docs/latest/develop/interact/pubsub/
- https://redis.io/docs/latest/develop/data-types/streams/
- https://github.com/gorilla/websocket/tree/main/examples/chat