
[API-292] Adds back comms websocket support#349

Merged
schottra merged 10 commits into main from comms-websocket on Sep 5, 2025

Conversation

@schottra
Contributor

The original comms server would push a websocket message out as part of processing the rpc call. Since we will have multiple read replica servers and only one of them will be writing, that approach doesn't work.
So new plan!

  • We still write rpc_log messages to the database as part of the transaction that applies the chat operation
  • When that transaction commits, a pg_notify fires, sending the signature of the new rpc log to listeners
  • Each server instance listens for the notify, fetches the rpc log, and distributes it to its websocket clients if the message applies to them.

See previous DN implementation of websocket connections here: https://github.com/AudiusProject/audius-protocol/blob/49d271631bcc6b3528a48d6d377cbfb6e53c6459/comms/discovery/rpcz/websocket.go#L29

Some notes:

  • All instances will handle all rpc log messages, since we push recent messages to users on connection/re-connection of the websocket. So it's not viable to skip processing pg notifications just because they don't apply to the userIds currently connected.
  • The websocket library we were using in DN comms land does not directly support Fiber contexts/requests, and adapting it proved to be a little hacky. So I replaced it with a Fiber-native library and completely rewrote the websocket implementation, with some help ;-). See below for an extended description of the differences.
  • Added back the functionality to store pubkeys whenever we decode them in signed get/post requests for comms. This is necessary at minimum for local dev to work correctly, but it also keeps the behavior consistent. I'll follow up later with a PR to store pubkeys during indexing when users are created.
  • Updated all spots in comms code where we dump a date string argument into a SQL query to use explicit .UTC() calls. This is functionally equivalent on production servers, where the machine timezone matches the DB, but it is necessary for running locally without a lot of weird bugs and test failures. The long-term fix is to convert the columns to timestamp with time zone so that they are parsed into the correct "instant" regardless of the current machine timezone.

TODO:
Needs a lot of testing to make sure websockets are stable and all message types propagate correctly.
I also just noticed that I'm not pushing blasts and need to handle that correctly.

About the websocket implementation:

gobwas/ws used a pattern of "hijacking" the connection and returning from the handler, then later calling Close(). All of the Fiber libraries instead use a pattern where the handler you provide for upgrading the connection is supposed to block in a read/write loop until you're ready to close the connection; otherwise clients connect and then immediately disconnect. You also have to handle ping/pong messages, update write timeouts, etc. So I started from a generated implementation and tweaked it from there. The new flow is:

  1. Allocate a client and add it to the map for the given userId
  2. Queue any recently applicable messages
  3. Start read and write pump goroutines
  4. The write pump waits for messages on the client's channel and pushes them out. The read pump waits for incoming websocket messages; we really only care about closes, so any payload we receive is discarded.
  5. If a particular connection lags too far in processing messages, sends a close request, doesn't respond to pings, etc., we close the connection and attempt to send a close message

The differences/improvements are:

  • Detection of dead or overwhelmed clients
  • Proper close messages in both directions. The previous version would leave the connection around until a write to it failed because the client had disconnected.
  • Writes that go to multiple connections will get pushed to channels for each of them instead of serially calling WriteMessage()

Member

@raymondjacobson left a comment


this is awesome!

the ws impl could maybe use tests but not blocking for sure

Comment thread api/auth_middleware.go (Outdated)
return nil, "", 0, err
}

app.commsRpcProcessor.SetPubkeyForUser(int32(userId), pubkey)
Member


nice

Comment thread api/comms/websocket.go
// Discard payload if any; we only care about keeping the socket healthy.
switch mt {
case websocket.TextMessage, websocket.BinaryMessage:
// Efficiently drain without allocating:
Member


wow

Contributor Author


Not my idea lol. But I like it.

Comment thread api/comms/websocket.go
}{
rpcJson,
Metadata{
Timestamp: timestamp.Format(time.RFC3339Nano),
Member


what's this format?

Contributor Author


Taken from existing implementation: https://github.com/AudiusProject/audius-protocol/blob/49d271631bcc6b3528a48d6d377cbfb6e53c6459/comms/discovery/rpcz/websocket.go#L77

It maps to an ISO timestamp with nanosecond accuracy (🙄 ). I am not sure whether we would break clients by changing it.

Comment thread api/comms/websocket.go
default:
// If we get here, the client buffer is full (too slow in processing)
// and we will drop them for now. They can re-connect if needed.
m.logger.Info("ws buffer full; dropping client",
Member


the client also keeps trying to reconnect right?

Contributor Author


Yes, assuming it has good network and is actually running (app wasn't closed/backgrounded, browser tab wasn't closed, etc) it will try to re-connect if we drop it.
Does bring up an interesting possibility though:

  • We push 128 messages into a queue and aren't able to deliver
  • Client gets disconnected
  • Client re-connects, and we push it the most recent messages we have for it, which might not include all 128 that it didn't receive.

That would be a strange experience where you are missing some chat messages. Technically I think that wouldn't happen in the existing comms implementation assuming the client eventually processed all of the 128 messages sitting in the queue. But I also think the sort of one-at-a-time nature of the existing implementation might degrade well before that point.

@schottra schottra merged commit 349640e into main Sep 5, 2025
5 checks passed
@schottra schottra deleted the comms-websocket branch September 5, 2025 16:37
@schottra schottra restored the comms-websocket branch September 5, 2025 18:38
@schottra schottra deleted the comms-websocket branch September 5, 2025 18:39

3 participants