[API-292] Adds back comms websocket support#349
Conversation
raymondjacobson
left a comment
There was a problem hiding this comment.
this is awesome!
the ws impl could maybe use tests but not blocking for sure
| return nil, "", 0, err | ||
| } | ||
|
|
||
| app.commsRpcProcessor.SetPubkeyForUser(int32(userId), pubkey) |
| // Discard payload if any; we only care about keeping the socket healthy. | ||
| switch mt { | ||
| case websocket.TextMessage, websocket.BinaryMessage: | ||
| // Efficiently drain without allocating: |
There was a problem hiding this comment.
Not my idea lol. But I like it.
| }{ | ||
| rpcJson, | ||
| Metadata{ | ||
| Timestamp: timestamp.Format(time.RFC3339Nano), |
There was a problem hiding this comment.
Taken from existing implementation: https://github.com/AudiusProject/audius-protocol/blob/49d271631bcc6b3528a48d6d377cbfb6e53c6459/comms/discovery/rpcz/websocket.go#L77
It maps to an ISO timestamp with nanosecond accuracy (🙄 ). I am not sure if we break clients by changing it.
| default: | ||
| // If we get here, the client buffer is full (too slow in processing) | ||
| // and we will drop them for now. They can re-connect if needed. | ||
| m.logger.Info("ws buffer full; dropping client", |
There was a problem hiding this comment.
the client also keeps trying to reconnect right?
There was a problem hiding this comment.
Yes, assuming it has good network and is actually running (app wasn't closed/backgrounded, browser tab wasn't closed, etc) it will try to re-connect if we drop it.
Does bring up an interesting possibility though:
- We push 128 messages into a queue and aren't able to deliver
- Client gets disconnected
- Client re-connects and we push it the most recent messages we have for it, which might not include all of the 128 that it didn't receive.
That would be a strange experience where you are missing some chat messages. Technically I think that wouldn't happen in the existing comms implementation assuming the client eventually processed all of the 128 messages sitting in the queue. But I also think the sort of one-at-a-time nature of the existing implementation might degrade well before that point.
The original comms server would push a websocket message out as part of processing the rpc call. Since we will have multiple read replica servers and only one of them will be writing, that approach doesn't work.
So new plan!
See previous DN implementation of websocket connections here: https://github.com/AudiusProject/audius-protocol/blob/49d271631bcc6b3528a48d6d377cbfb6e53c6459/comms/discovery/rpcz/websocket.go#L29
Some notes:
.UTC()calls. While this is functionally equivalent on production servers where the machine timezone matches the DB, it is necessary for running locally without a lot of weird bugs and test failures. Long term fix is to convert the columns towith timezoneso that they are parsed into the correct "instant" no matter what the current machine timezone is.TODO:
Needs a lot of testing to make sure websockets are stable and all message types propagate correctly.
I also just noticed that I'm not pushing blasts and need to handle that correctly.
About the websocket implementation:
gobwas/wsused a pattern of "hijacking" the connection and returning from the handler, then later calling Close(). All of the fiber libraries use a pattern where the handler you provide for upgrading the connection is supposed to block in a read/write loop until you're ready to close the connection. Otherwise clients connect and then immediately disconnect. You also have to handle ping/pong messages and update write timeouts etc. So I started from scratch / generated implementation and tweaked it from there. The new flow is:The differences/improvements are: