diff --git a/api/auth_middleware.go b/api/auth_middleware.go index b72947ba..58d8d523 100644 --- a/api/auth_middleware.go +++ b/api/auth_middleware.go @@ -3,8 +3,12 @@ package api import ( "context" "fmt" + "net/url" + "strconv" "strings" + "time" + comms "bridgerton.audius.co/api/comms" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" "github.com/gofiber/fiber/v2" @@ -172,3 +176,90 @@ func (app *ApiServer) getUserIDFromWallet(ctx context.Context, wallet string) (i app.resolveWalletCache.Set(key, userId) return userId, nil } + +/* +* Parses query string for a signed comms GET request and returns the userId +associated with the signing wallet +*/ +func (app *ApiServer) userIdForSignedCommsRequest(c *fiber.Ctx) (int, error) { + if c.Method() != "GET" { + return 0, fiber.NewError(fiber.StatusBadRequest, "readSignedGet: bad method: "+c.Method()) + } + + sigBase64 := c.Get(comms.SigHeader) + + // for websocket request, read from query param instead of header + if querySig := c.Query("signature"); sigBase64 == "" && querySig != "" { + sigBase64 = querySig + } + + // Check that timestamp is not too old + timestamp, err := strconv.ParseInt(c.Query("timestamp"), 0, 64) + if err != nil { + return 0, fiber.NewError(fiber.StatusBadRequest, "failed to parse timestamp: "+err.Error()) + } + + tsAge := time.Now().UnixMilli() - timestamp + if tsAge < 0 { + tsAge *= -1 + } + if tsAge > comms.SignatureTimeToLiveMs { + return 0, fiber.NewError(fiber.StatusBadRequest, "timestamp not current") + } + + // Strip out app_name,api_key,signature to get the parameters that are actually used to generate the signature + uri := c.Request().URI() + path := string(uri.Path()) + query := string(uri.QueryString()) + + queryParams, err := url.ParseQuery(query) + if err != nil { + return 0, fiber.NewError(fiber.StatusBadRequest, "failed to parse query parameters: "+err.Error()) + } + + queryParams.Del("app_name") + queryParams.Del("api_key") + queryParams.Del("signature") + + // Build the final URL string + urlStr := path + if len(queryParams) > 0 { + urlStr += "?" + queryParams.Encode() + } + + payload := []byte(urlStr) + + wallet, pubkey, err := comms.RecoverSigningWallet(sigBase64, payload) + if err != nil { + return 0, fiber.NewError(fiber.StatusBadRequest, "failed to recoverSigningWallet: "+err.Error()) + } + userId, err := app.getUserIDFromWallet(c.Context(), wallet) + if err != nil { + return 0, err + } + + app.commsRpcProcessor.SetPubkeyForUser(int32(userId), pubkey) + + return userId, nil +} + +func (app *ApiServer) readSignedCommsPostRequest(c *fiber.Ctx) ([]byte, string, int, error) { + if c.Method() != "POST" { + return nil, "", 0, fiber.NewError(fiber.StatusBadRequest, "readSignedPost bad method: "+c.Method()) + } + + payload := c.Body() + + sigHex := c.Get(comms.SigHeader) + wallet, pubkey, err := comms.RecoverSigningWallet(sigHex, payload) + if err != nil { + return nil, "", 0, err + } + userId, err := app.getUserIDFromWallet(c.Context(), wallet) + if err != nil { + return nil, "", 0, err + } + + app.commsRpcProcessor.SetPubkeyForUser(int32(userId), pubkey) + return payload, wallet, userId, nil +} diff --git a/api/comms/chat.go b/api/comms/chat.go index 3595842e..389d073f 100644 --- a/api/comms/chat.go +++ b/api/comms/chat.go @@ -42,7 +42,7 @@ func chatCreate(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, p ($1, $2, $2) on conflict (chat_id) do update set created_at = $2, last_message_at = $2 where chat.created_at > $2 - `, params.ChatID, ts) + `, params.ChatID, ts.UTC()) if err != nil { return err } @@ -63,7 +63,7 @@ func chatCreate(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, p ($1, $2, $3, $4, $5) on conflict (chat_id, user_id) do update set invited_by_user_id=$2, invite_code=$3, created_at=$5 where chat_member.created_at > $5`, - params.ChatID, userId, invite.InviteCode, invitedUserId, ts) + params.ChatID, userId, invite.InviteCode, invitedUserId, ts.UTC()) if err != nil { return err } @@ -94,7 +94,7 @@ func chatCreate(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, p } func chatDelete(db dbv1.DBTX, ctx context.Context, userId int32, chatId string, messageTimestamp time.Time) error { - _, err := db.Exec(ctx, "update chat_member set cleared_history_at = $1, last_active_at = $1, unread_count = 0, is_hidden = true where chat_id = $2 and user_id = $3", messageTimestamp, chatId, userId) + _, err := db.Exec(ctx, "update chat_member set cleared_history_at = $1, last_active_at = $1, unread_count = 0, is_hidden = true where chat_id = $2 and user_id = $3", messageTimestamp.UTC(), chatId, userId) return err } @@ -168,7 +168,7 @@ func chatSendMessage(db dbv1.DBTX, ctx context.Context, userId int32, chatId str var err error _, err = db.Exec(ctx, "insert into chat_message (message_id, chat_id, user_id, created_at, ciphertext) values ($1, $2, $3, $4, $5)", - messageId, chatId, userId, messageTimestamp, ciphertext) + messageId, chatId, userId, messageTimestamp.UTC(), ciphertext) if err != nil { return err } @@ -198,9 +198,9 @@ func chatReactMessage(db dbv1.DBTX, ctx context.Context, userId int32, chatId st ($1, $2, $3, $4, $4) on conflict (user_id, message_id) do update set reaction = $3, updated_at = $4 where chat_message_reactions.updated_at < $4`, - userId, messageId, *reaction, messageTimestamp) + userId, messageId, *reaction, messageTimestamp.UTC()) } else { - _, err = db.Exec(ctx, "delete from chat_message_reactions where user_id = $1 and message_id = $2 and updated_at < $3", userId, messageId, messageTimestamp) + _, err = db.Exec(ctx, "delete from chat_message_reactions where user_id = $1 and message_id = $2 and updated_at < $3", userId, messageId, messageTimestamp.UTC()) } if err != nil { return err @@ -213,7 +213,7 @@ func chatReactMessage(db dbv1.DBTX, ctx context.Context, userId int32, chatId st func chatReadMessages(db dbv1.DBTX, ctx context.Context, userId int32, chatId string, readTimestamp time.Time) error { _, err := db.Exec(ctx, "update chat_member set unread_count = 0, last_active_at = $1 where chat_id = $2 and user_id = $3", - readTimestamp, chatId, userId) + readTimestamp.UTC(), chatId, userId) return err } @@ -241,7 +241,7 @@ func updatePermissions(db dbv1.DBTX, ctx context.Context, userId int32, permit C values ($1, $2, $3, $4) on conflict (user_id, permits) do update set allowed = $3 where chat_permissions.updated_at < $4 - `, userId, permit, permitAllowed, messageTimestamp) + `, userId, permit, permitAllowed, messageTimestamp.UTC()) return err } @@ -251,7 +251,7 @@ func chatSetPermissions(db dbv1.DBTX, ctx context.Context, userId int32, permits if allow == nil || permits == ChatPermissionAll || permits == ChatPermissionNone || isInPermitList(ChatPermissionAll, permitList) || isInPermitList(ChatPermissionNone, permitList) { _, err := db.Exec(ctx, ` delete from chat_permissions where user_id = $1 and updated_at < $2 - `, userId, messageTimestamp) + `, userId, messageTimestamp.UTC()) if err != nil { return err } @@ -263,7 +263,7 @@ func chatSetPermissions(db dbv1.DBTX, ctx context.Context, userId int32, permits _, err := db.Exec(ctx, ` insert into chat_permissions (user_id, permits, updated_at) values ($1, $2, $3) - on conflict do nothing`, userId, permits, messageTimestamp) + on conflict do nothing`, userId, permits, messageTimestamp.UTC()) return err } @@ -288,12 +288,12 @@ func chatSetPermissions(db dbv1.DBTX, ctx context.Context, userId int32, permits } func chatBlock(db dbv1.DBTX, ctx context.Context, userId int32, blockeeUserId int32, messageTimestamp time.Time) error { - _, err := db.Exec(ctx, "insert into chat_blocked_users (blocker_user_id, blockee_user_id, created_at) values ($1, $2, $3) on conflict do nothing", userId, blockeeUserId, messageTimestamp) + _, err := db.Exec(ctx, "insert into chat_blocked_users (blocker_user_id, blockee_user_id, created_at) values ($1, $2, $3) on conflict do nothing", userId, blockeeUserId, messageTimestamp.UTC()) return err } func chatUnblock(db dbv1.DBTX, ctx context.Context, userId int32, unblockedUserId int32, messageTimestamp time.Time) error { - _, err := db.Exec(ctx, "delete from chat_blocked_users where blocker_user_id = $1 and blockee_user_id = $2 and created_at < $3", userId, unblockedUserId, messageTimestamp) + _, err := db.Exec(ctx, "delete from chat_blocked_users where blocker_user_id = $1 and blockee_user_id = $2 and created_at < $3", userId, unblockedUserId, messageTimestamp.UTC()) return err } diff --git a/api/comms/chat_blast.go b/api/comms/chat_blast.go index 3aea22b4..f8c00e07 100644 --- a/api/comms/chat_blast.go +++ b/api/comms/chat_blast.go @@ -33,7 +33,7 @@ func chatBlast(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, pa ($1, $2, $3, $4, $5, $6, $7) on conflict (blast_id) do nothing - `, params.BlastID, userId, params.Audience, params.AudienceContentType, audienceContentID, params.Message, ts) + `, params.BlastID, userId, params.Audience, params.AudienceContentType, audienceContentID, params.Message, ts.UTC()) if err != nil { return nil, err } @@ -72,7 +72,7 @@ func chatBlast(db dbv1.DBTX, ctx context.Context, userId int32, ts time.Time, pa SELECT chat_id FROM targ; ` - rows, err := db.Query(ctx, fanOutSql, params.BlastID, ts) + rows, err := db.Query(ctx, fanOutSql, params.BlastID, ts.UTC()) if err != nil { return nil, err } diff --git a/api/comms/chat_block_test.go b/api/comms/chat_block_test.go index 5c3e5b4b..d43d8a90 100644 --- a/api/comms/chat_block_test.go +++ b/api/comms/chat_block_test.go @@ -27,7 +27,7 @@ func TestChatBlocking(t *testing.T) { user2Id := seededRand.Int31() assertBlocked := func(blockerUserId int32, blockeeUserId int32, timestamp time.Time, expected int) { - row := pool.QueryRow(ctx, "select count(*) from chat_blocked_users where blocker_user_id = $1 and blockee_user_id = $2 and created_at = $3", blockerUserId, blockeeUserId, timestamp) + row := pool.QueryRow(ctx, "select count(*) from chat_blocked_users where blocker_user_id = $1 and blockee_user_id = $2 and created_at = $3", blockerUserId, blockeeUserId, timestamp.UTC()) var count int err := row.Scan(&count) assert.NoError(t, err) diff --git a/api/comms/rpc_processor.go b/api/comms/rpc_processor.go index c24cc8b5..ed973da1 100644 --- a/api/comms/rpc_processor.go +++ b/api/comms/rpc_processor.go @@ -2,6 +2,8 @@ package comms import ( "context" + "crypto/ecdsa" + "encoding/base64" "encoding/json" "strings" "sync" @@ -12,17 +14,49 @@ import ( "bridgerton.audius.co/trashid" "go.uber.org/zap" + "github.com/ethereum/go-ethereum/crypto" + "github.com/gofiber/contrib/websocket" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgtype" "github.com/jackc/pgx/v5/pgxpool" + "github.com/jackc/pgxlisten" "github.com/tidwall/gjson" ) +var ( + chatMessageInsertedChannel = "chat_message_inserted" + chatBlastInsertedChannel = "chat_blast_inserted" + chatMessageReactionChanged = "chat_message_reaction_changed" +) + +type chatMessageInsertedNotification struct { + MessageID string `json:"message_id"` +} + +type chatBlastInsertedNotification struct { + BlastID string `json:"blast_id"` +} + +type chatMessageReactionInsertedNotification struct { + MessageID string `json:"message_id"` + UserID int32 `json:"user_id"` + Reaction *string `json:"reaction"` +} + type RPCProcessor struct { sync.Mutex - pool *dbv1.DBPools - writePool *pgxpool.Pool - validator *Validator - logger *zap.Logger + pool *dbv1.DBPools + writePool *pgxpool.Pool + validator *Validator + logger *zap.Logger + websocketManager *CommsWebsocketManager + + // PostgreSQL LISTEN/NOTIFY fields + listener *pgxlisten.Listener + listenCtx context.Context + listenCancel context.CancelFunc + listenWg sync.WaitGroup } func NewProcessor(pool *dbv1.DBPools, writePool *pgxpool.Pool, config *config.Config, logger *zap.Logger) (*RPCProcessor, error) { @@ -30,16 +64,50 @@ func NewProcessor(pool *dbv1.DBPools, writePool *pgxpool.Pool, config *config.Co // set up validator validator := NewValidator(pool, DefaultRateLimitConfig, config, logger) + var websocketManager *CommsWebsocketManager + var ctx context.Context + var cancel context.CancelFunc + if config.CommsMessagePush { + ctx, cancel = context.WithCancel(context.Background()) + websocketManager = NewCommsWebsocketManager(logger) + } + proc := &RPCProcessor{ - validator: validator, - pool: pool, - writePool: writePool, - logger: logger, + validator: validator, + pool: pool, + writePool: writePool, + logger: logger, + websocketManager: websocketManager, + + listenCtx: ctx, + listenCancel: cancel, + } + + if config.CommsMessagePush { + proc.startPgNotifyListeners() } return proc, nil } +func (proc *RPCProcessor) Shutdown() { + // If no listener, nothing to do + if proc.listenCancel == nil { + return + } + + proc.listenCancel() + + if proc.listener != nil { + // The listener will be stopped when the context is cancelled + proc.listener = nil + } + + // Wait for the listener goroutine to finish + proc.listenWg.Wait() + proc.logger.Info("Stopped listening for comms chat_message_inserted, chat_blast_inserted, and chat_message_reaction_inserted notifications") +} + func (proc *RPCProcessor) Validate(ctx context.Context, userId int32, rawRpc RawRPC) error { return proc.validator.Validate(ctx, userId, rawRpc) } @@ -66,7 +134,7 @@ func (proc *RPCProcessor) Apply(ctx context.Context, rpcLog *RpcLog) error { } // validate signing wallet - wallet, err := recoverSigningWallet(rpcLog.Sig, rpcLog.Rpc) + wallet, _, err := RecoverSigningWallet(rpcLog.Sig, rpcLog.Rpc) if err != nil { logger.Warn("unable to recover wallet, skipping") return nil @@ -93,7 +161,7 @@ func (proc *RPCProcessor) Apply(ctx context.Context, rpcLog *RpcLog) error { } // get ts - messageTs := rpcLog.RelayedAt + messageTs := rpcLog.RelayedAt.UTC() userId, err := proc.GetRPCCurrentUserID(ctx, rpcLog, &rawRpc) if err != nil { @@ -248,20 +316,10 @@ select last_active_at from chat_member where chat_id = $1 and user_id = $2` return err } - outgoingMessages, err := chatBlast(tx, ctx, userId, messageTs, params) + _, err = chatBlast(tx, ctx, userId, messageTs, params) if err != nil { return err } - // Send chat message websocket event to all recipients who have existing chats - for _, outgoingMessage := range outgoingMessages { - _, err := json.Marshal(outgoingMessage.ChatMessageRPC) - if err != nil { - logger.Error("err: invalid json", zap.Error(err)) - } else { - // TODO - // websocketNotify(json.RawMessage(j), userId, messageTs.Round(time.Microsecond)) - } - } default: logger.Warn("no handler for ", zap.String("method", rawRpc.Method)) } @@ -273,12 +331,6 @@ select last_active_at from chat_member where chat_id = $1 and user_id = $2` return err } logger.Debug("commited", zap.Duration("took", takeSplit())) - - // TODO - // send out websocket events fire + forget style - // websocketNotify(rpcLog.Rpc, userId, messageTs.Round(time.Microsecond)) - logger.Debug("websocket push done", zap.Duration("took", takeSplit())) - return nil } @@ -342,31 +394,265 @@ func insertRpcLogRow(db dbv1.DBTX, ctx context.Context, rpcLog *RpcLog) (int64, VALUES ($1, $2, $3, $4, $5, $6) ON CONFLICT DO NOTHING ` - result, err := db.Exec(ctx, query, rpcLog.RelayedBy, rpcLog.RelayedAt, time.Now(), rpcLog.FromWallet, rpcLog.Rpc, rpcLog.Sig) + result, err := db.Exec(ctx, query, rpcLog.RelayedBy, rpcLog.RelayedAt.UTC(), time.Now().UTC(), rpcLog.FromWallet, rpcLog.Rpc, rpcLog.Sig) if err != nil { return 0, err } return result.RowsAffected(), nil } -// func websocketNotify(rpcJson json.RawMessage, userId int32, timestamp time.Time) { -// if chatId := gjson.GetBytes(rpcJson, "params.chat_id").String(); chatId != "" { - -// var userIds []int32 -// err := db.Conn.Select(&userIds, `select user_id from chat_member where chat_id = $1 and is_hidden = false`, chatId) -// if err != nil { -// logger.Warn("failed to load chat members for websocket push " + err.Error()) -// return -// } - -// for _, receiverUserId := range userIds { -// websocketPush(userId, receiverUserId, rpcJson, timestamp) -// } -// } else if gjson.GetBytes(rpcJson, "method").String() == "chat.blast" { -// go func() { -// // Add delay before broadcasting blast messages - see PAY-3573 -// time.Sleep(30 * time.Second) -// websocketPushAll(userId, rpcJson, timestamp) -// }() -// } -// } +/** Watch for pg_notify() on new chat messages and blast messages so we can send websocket events to the appropriate users */ +func (proc *RPCProcessor) startPgNotifyListeners() error { + if proc.listener != nil { + return nil // Already listening + } + + proc.listener = &pgxlisten.Listener{ + Connect: func(ctx context.Context) (*pgx.Conn, error) { + // Use the write pool for listening to ensure we get notifications from the primary database + conn, err := proc.writePool.Acquire(ctx) + if err != nil { + return nil, err + } + return conn.Conn(), nil + }, + LogError: func(ctx context.Context, err error) { + proc.logger.Error("Comms RPC pg_notify listener error", zap.Error(err)) + }, + ReconnectDelay: 10 * time.Second, + } + + proc.listener.Handle(chatMessageInsertedChannel, pgxlisten.HandlerFunc(proc.handleChatMessageInserted)) + proc.listener.Handle(chatBlastInsertedChannel, pgxlisten.HandlerFunc(proc.handleChatBlastInserted)) + proc.listener.Handle(chatMessageReactionChanged, pgxlisten.HandlerFunc(proc.handleChatMessageReactionChanged)) + + // Start listening in a goroutine + proc.listenWg.Add(1) + go func() { + defer proc.listenWg.Done() + if err := proc.listener.Listen(proc.listenCtx); err != nil { + proc.logger.Error("Comms RPC pg_notify listener failed", zap.Error(err)) + } + }() + + proc.logger.Info("Started listening for comms chat_message_inserted, chat_blast_inserted, and chat_message_reaction_inserted notifications") + return nil +} + +func (proc *RPCProcessor) handleChatMessageInserted(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { + proc.logger.Debug("Received PostgreSQL notification for chat message", + zap.String("channel", notification.Channel), + zap.String("payload", notification.Payload)) + + var payload chatMessageInsertedNotification + if err := json.Unmarshal([]byte(notification.Payload), &payload); err != nil { + proc.logger.Error("Failed to parse chat message notification payload", zap.Error(err)) + return err + } + + type InsertedChatMessage struct { + MessageID string `db:"message_id"` + ChatID string `db:"chat_id"` + UserID int32 `db:"user_id"` + CreatedAt time.Time `db:"created_at"` + Ciphertext pgtype.Text `db:"ciphertext"` + IsPlaintext bool `db:"is_plaintext"` + } + // Joins on blasts to get message text if the origin was a blast + row, err := proc.writePool.Query(ctx, ` + SELECT + chat_message.message_id, + chat_message.chat_id, + chat_message.user_id, + chat_message.created_at, + COALESCE(chat_message.ciphertext, chat_blast.plaintext) AS ciphertext, + chat_blast.plaintext IS NOT NULL as is_plaintext + FROM chat_message + JOIN chat_member ON chat_message.chat_id = chat_member.chat_id + LEFT JOIN chat_blast USING (blast_id) + WHERE message_id = $1`, payload.MessageID) + if err != nil { + proc.logger.Error("Failed to query chat message", zap.Error(err)) + return err + } + chatMessage, err := pgx.CollectOneRow(row, pgx.RowToStructByName[InsertedChatMessage]) + if err != nil { + proc.logger.Error("Failed to collect chat message", zap.Error(err)) + return err + } + + // Get chat members to notify + userRows, err := proc.writePool.Query(ctx, `select user_id from chat_member where chat_id = $1 and is_hidden = false`, chatMessage.ChatID) + if err != nil { + proc.logger.Error("failed to load chat members for websocket push " + err.Error()) + return err + } + userIds, err := pgx.CollectRows(userRows, pgx.RowTo[int32]) + if err != nil { + proc.logger.Error("failed to collect user ids for websocket push " + err.Error()) + return err + } + + messageData := ChatMessageRPC{ + Method: MethodChatMessage, + Params: ChatMessageRPCParams{ + ChatID: chatMessage.ChatID, + MessageID: chatMessage.MessageID, + IsPlaintext: &chatMessage.IsPlaintext, + Message: chatMessage.Ciphertext.String, + }, + } + + messageJson, err := json.Marshal(messageData) + if err != nil { + proc.logger.Error("Failed to marshal message data", zap.Error(err)) + return err + } + + messageTs := chatMessage.CreatedAt.UTC().Round(time.Microsecond) + + // Send to all chat members except the sender + for _, receiverUserId := range userIds { + if receiverUserId != chatMessage.UserID { + proc.websocketManager.WebsocketPush(chatMessage.UserID, receiverUserId, messageJson, messageTs) + } + } + + return nil +} + +func (proc *RPCProcessor) handleChatBlastInserted(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { + proc.logger.Debug("Received PostgreSQL notification for chat blast", + zap.String("channel", notification.Channel), + zap.String("payload", notification.Payload)) + + var payload chatBlastInsertedNotification + if err := json.Unmarshal([]byte(notification.Payload), &payload); err != nil { + proc.logger.Error("Failed to parse chat blast notification payload", zap.Error(err)) + return err + } + + row, err := proc.writePool.Query(ctx, ` + SELECT blast_id, from_user_id, audience, audience_content_id, plaintext, created_at, audience_content_type + FROM chat_blast + WHERE blast_id = $1`, payload.BlastID) + if err != nil { + proc.logger.Error("Failed to query chat blast", zap.Error(err)) + return err + } + chatBlast, err := pgx.CollectOneRow(row, pgx.RowToStructByName[dbv1.ChatBlast]) + if err != nil { + proc.logger.Error("Failed to collect chat blast", zap.Error(err)) + return err + } + + blastData := ChatBlastRPC{ + Method: MethodChatBlast, + Params: ChatBlastRPCParams{ + BlastID: chatBlast.BlastID, + Audience: ChatBlastAudience(chatBlast.Audience), + Message: chatBlast.Plaintext, + }, + } + + if chatBlast.AudienceContentID.Valid { + audienceContentID, err := trashid.EncodeHashId(int(chatBlast.AudienceContentID.Int32)) + if err != nil { + proc.logger.Error("Failed to encode audience content id", zap.Error(err)) + return err + } + blastData.Params.AudienceContentID = &audienceContentID + } + if chatBlast.AudienceContentType.Valid { + audienceContentType := AudienceContentType(chatBlast.AudienceContentType.String) + blastData.Params.AudienceContentType = &audienceContentType + } + + blastJson, err := json.Marshal(blastData) + if err != nil { + proc.logger.Error("Failed to marshal blast data", zap.Error(err)) + return err + } + + blastTs := chatBlast.CreatedAt.Time.Round(time.Microsecond) + + // Send to all connected clients (blast messages go to everyone) + go func() { + proc.websocketManager.WebsocketPushAll(chatBlast.FromUserID, blastJson, blastTs) + }() + + return nil +} + +func (proc *RPCProcessor) handleChatMessageReactionChanged(ctx context.Context, notification *pgconn.Notification, conn *pgx.Conn) error { + proc.logger.Debug("Received PostgreSQL notification for chat message reaction", + zap.String("channel", notification.Channel), + zap.String("payload", notification.Payload)) + + // Parse the notification payload + var payload chatMessageReactionInsertedNotification + if err := json.Unmarshal([]byte(notification.Payload), &payload); err != nil { + proc.logger.Error("Failed to parse chat message reaction notification payload", zap.Error(err)) + return err + } + + // Get the chat_id for this message to find members to notify + var chatID string + err := proc.writePool.QueryRow(ctx, `SELECT chat_id FROM chat_message WHERE message_id = $1`, payload.MessageID).Scan(&chatID) + if err != nil { + proc.logger.Error("Failed to get chat_id for message", zap.Error(err)) + return err + } + + // Get chat members to notify + userRows, err := proc.writePool.Query(ctx, `select user_id from chat_member where chat_id = $1 and is_hidden = false`, chatID) + if err != nil { + proc.logger.Error("failed to load chat members for websocket push " + err.Error()) + return err + } + userIds, err := pgx.CollectRows(userRows, pgx.RowTo[int32]) + if err != nil { + proc.logger.Error("failed to collect user ids for websocket push " + err.Error()) + return err + } + + reactionData := ChatReactRPC{ + Method: MethodChatReact, + Params: ChatReactRPCParams{ + ChatID: chatID, + MessageID: payload.MessageID, + Reaction: payload.Reaction, + }, + } + + reactionJson, err := json.Marshal(reactionData) + if err != nil { + proc.logger.Error("Failed to marshal reaction data", zap.Error(err)) + return err + } + + // Use current time since we don't have the timestamp in the notification + reactionTs := time.Now().UTC().Round(time.Microsecond) + + // Send to all chat members except the sender + for _, receiverUserId := range userIds { + if receiverUserId != payload.UserID { + proc.websocketManager.WebsocketPush(payload.UserID, receiverUserId, reactionJson, reactionTs) + } + } + + return nil +} + +func (proc *RPCProcessor) RegisterWebsocket(userId int32, conn *websocket.Conn) { + proc.websocketManager.RegisterWebsocket(userId, conn) +} + +func (proc *RPCProcessor) SetPubkeyForUser(userId int32, pubkey *ecdsa.PublicKey) { + pubkeyBytes := crypto.FromECDSAPub(pubkey) + pubkeyBase64 := base64.StdEncoding.EncodeToString(pubkeyBytes) + _, err := proc.writePool.Exec(context.Background(), `insert into user_pubkeys values ($1, $2) on conflict do nothing`, userId, pubkeyBase64) + if err != nil { + proc.logger.Warn("failed to set pubkey for user", zap.Error(err)) + } +} diff --git a/api/comms/signed_request.go b/api/comms/signed_request.go index f6f3be8c..8a7a90bc 100644 --- a/api/comms/signed_request.go +++ b/api/comms/signed_request.go @@ -1,40 +1,28 @@ package comms import ( + "crypto/ecdsa" "encoding/base64" "errors" "github.com/ethereum/go-ethereum/crypto" - "github.com/gofiber/fiber/v2" ) -func ReadSignedPost(c *fiber.Ctx) ([]byte, string, error) { - if c.Method() != "POST" { - return nil, "", errors.New("readSignedPost bad method: " + c.Method()) - } - - payload := c.Body() - - sigHex := c.Get(SigHeader) - wallet, err := recoverSigningWallet(sigHex, payload) - return payload, wallet, err -} - -func recoverSigningWallet(signatureHex string, signedData []byte) (string, error) { +func RecoverSigningWallet(signatureHex string, signedData []byte) (string, *ecdsa.PublicKey, error) { sig, err := base64.StdEncoding.DecodeString(signatureHex) if err != nil { err = errors.New("bad sig header: " + err.Error()) - return "", err + return "", nil, err } // recover hash := crypto.Keccak256Hash(signedData) pubkey, err := crypto.SigToPub(hash[:], sig) if err != nil { - return "", err + return "", nil, err } wallet := crypto.PubkeyToAddress(*pubkey).Hex() - return wallet, nil + return wallet, pubkey, nil } diff --git a/api/comms/websocket.go b/api/comms/websocket.go new file mode 100644 index 00000000..10b55d99 --- /dev/null +++ b/api/comms/websocket.go @@ -0,0 +1,268 @@ +package comms + +import ( + "encoding/json" + "sync" + "time" + + "bridgerton.audius.co/trashid" + "github.com/gofiber/contrib/websocket" + "go.uber.org/zap" +) + +const ( + sendQueueSize = 128 // per-connection limit + pingInterval = 30 * time.Second + readIdleTimeout = 60 * time.Second + writeDeadline = 10 * time.Second // Timeout for pushing a message to a client + recentTTL = 10 * time.Second + maxIncomingMsgSize = 1 << 20 // 1MB limit to incoming messages +) + +type CommsWebsocketManager struct { + mu sync.RWMutex + clients map[int32]map[*Client]struct{} // userId -> set of clients (could be connected from multiple devices) + recentMessages []*recentMessage + logger *zap.Logger +} + +type Client struct { + userId int32 + conn *websocket.Conn + send chan []byte + quit chan struct{} + + manager *CommsWebsocketManager +} + +type recentMessage struct { + userId int32 + sentAt time.Time + payload []byte +} + +func NewCommsWebsocketManager(logger *zap.Logger) *CommsWebsocketManager { + return &CommsWebsocketManager{ + clients: make(map[int32]map[*Client]struct{}), + recentMessages: []*recentMessage{}, + logger: logger, + } +} + +// RegisterWebsocket wires up a long-lived read/write loop. +// Do NOT write directly to conn here; only the write pump writes. +func (m *CommsWebsocketManager) RegisterWebsocket(userId int32, conn *websocket.Conn) { + cl := &Client{ + userId: userId, + conn: conn, + send: make(chan []byte, sendQueueSize), + quit: make(chan struct{}), + manager: m, + } + + // Add to manager + m.mu.Lock() + if m.clients[userId] == nil { + m.clients[userId] = make(map[*Client]struct{}) + } + m.clients[userId][cl] = struct{}{} + m.mu.Unlock() + + // Replay very recent messages for this user by enqueuing them + now := time.Now() + m.mu.RLock() + for _, r := range m.recentMessages { + if r.userId == userId && now.Sub(r.sentAt) < recentTTL { + select { + case cl.send <- r.payload: + default: + // If they connect with a full buffer immediately, just drop replay. + m.logger.Info("ws replay dropped due to full buffer", zap.Int32("userId", userId)) + } + } + } + m.mu.RUnlock() + + // Start pumps and block so the connection is not closed + done := make(chan struct{}) + go func() { + cl.readPump() + close(done) + }() + go cl.writePump() + <-done +} + +func (m *CommsWebsocketManager) removeClient(cl *Client) { + m.mu.Lock() + defer m.mu.Unlock() + set := m.clients[cl.userId] + if set != nil { + if _, ok := set[cl]; ok { + delete(set, cl) + if len(set) == 0 { + delete(m.clients, cl.userId) + } + } + } + // Close underlying connection and channels + _ = cl.conn.Close() + select { + case <-cl.quit: + // already closed + default: + close(cl.quit) + } +} + +func (cl *Client) readPump() { + // Keep the connection alive by consuming control/data frames and handling pongs. + cl.conn.SetReadLimit(maxIncomingMsgSize) + _ = cl.conn.SetReadDeadline(time.Now().Add(readIdleTimeout)) + cl.conn.SetPongHandler(func(string) error { + return cl.conn.SetReadDeadline(time.Now().Add(readIdleTimeout)) + }) + + // We don't expect app-level inbound messages, but we still need to read to + // receive pings/closes and keep deadlines fresh. + for { + mt, r, err := cl.conn.NextReader() + if err != nil { + cl.manager.logger.Debug("ws read closed", + zap.Int32("userId", cl.userId), + zap.Error(err)) + cl.manager.removeClient(cl) + return + } + // Discard payload if any; we only care about keeping the socket healthy. + switch mt { + case websocket.TextMessage, websocket.BinaryMessage: + // Efficiently drain without allocating: + buf := make([]byte, 1024) + for { + _, derr := r.Read(buf) + if derr != nil { + break + } + } + } + } +} + +func (cl *Client) writePump() { + ticker := time.NewTicker(pingInterval) + defer ticker.Stop() + + for { + select { + case msg, ok := <-cl.send: + if !ok { + // Manager closed channel; send a close frame if possible. + _ = cl.conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + _ = cl.conn.WriteMessage(websocket.CloseMessage, nil) + cl.manager.removeClient(cl) + return + } + _ = cl.conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err := cl.conn.WriteMessage(websocket.TextMessage, msg); err != nil { + cl.manager.logger.Info("ws write error", + zap.Int32("userId", cl.userId), + zap.Error(err)) + cl.manager.removeClient(cl) + return + } + + case <-ticker.C: + // Keep-alive ping + _ = cl.conn.SetWriteDeadline(time.Now().Add(writeDeadline)) + if err := cl.conn.WriteMessage(websocket.PingMessage, nil); err != nil { + cl.manager.logger.Debug("ws ping failed", + zap.Int32("userId", cl.userId), + zap.Error(err)) + cl.manager.removeClient(cl) + return + } + + case <-cl.quit: + // Manager requested shutdown + return + } + } +} + +// Push to a single receiver (all connected clients) +func (m *CommsWebsocketManager) WebsocketPush(senderUserId int32, receiverUserId int32, rpcJson json.RawMessage, timestamp time.Time) { + encodedSenderUserId, _ := trashid.EncodeHashId(int(senderUserId)) + encodedReceiverUserId, _ := trashid.EncodeHashId(int(receiverUserId)) + + data := struct { + RPC json.RawMessage `json:"rpc"` + Metadata Metadata `json:"metadata"` + }{ + rpcJson, + Metadata{ + Timestamp: timestamp.Format(time.RFC3339Nano), + SenderUserID: encodedSenderUserId, + ReceiverUserID: encodedReceiverUserId, + UserID: encodedSenderUserId, + }, + } + + payload, err := json.Marshal(data) + if err != nil { + m.logger.Warn("invalid websocket json " + err.Error()) + return + } + + // Fanout + m.mu.RLock() + targets := m.clients[receiverUserId] + m.mu.RUnlock() + + for cl := range targets { + select { + case cl.send <- payload: + // ok + default: + // If we get here, the client buffer is full (too slow in processing) + // and we will drop them for now. They can re-connect if needed. + m.logger.Info("ws buffer full; dropping client", + zap.Int32("userId", receiverUserId)) + m.removeClient(cl) + } + } + + // Clear expired messages out of the recentMessages buffer + m.mu.Lock() + now := time.Now() + kept := m.recentMessages[:0] + for _, r := range m.recentMessages { + if now.Sub(r.sentAt) < recentTTL { + kept = append(kept, r) + } + } + m.recentMessages = append(kept, &recentMessage{ + userId: receiverUserId, + sentAt: now, + payload: payload, + }) + m.mu.Unlock() + + m.logger.Debug("websocket push", + zap.Int32("userId", receiverUserId), + zap.Int("numClients", len(targets))) +} + +// Push to all connected clients +func (m *CommsWebsocketManager) WebsocketPushAll(senderUserId int32, rpcJson json.RawMessage, timestamp time.Time) { + m.mu.RLock() + userIds := make([]int32, 0, len(m.clients)) + for uid := range m.clients { + userIds = append(userIds, uid) + } + m.mu.RUnlock() + + for _, uid := range userIds { + m.WebsocketPush(senderUserId, uid, rpcJson, timestamp) + } +} diff --git a/api/comms_mutate.go b/api/comms_mutate.go index e5afa369..42415b1a 100644 --- a/api/comms_mutate.go +++ b/api/comms_mutate.go @@ -12,7 +12,7 @@ import ( ) func (app *ApiServer) mutateChat(c *fiber.Ctx) error { - payload, wallet, err := comms.ReadSignedPost(c) + payload, wallet, userId, err := app.readSignedCommsPostRequest(c) if err != nil { return fiber.NewError(fiber.StatusBadRequest, "bad request: "+err.Error()) } @@ -26,17 +26,12 @@ func (app *ApiServer) mutateChat(c *fiber.Ctx) error { rpcLog := &comms.RpcLog{ RelayedBy: "bridge", - RelayedAt: time.Now(), + RelayedAt: time.Now().UTC(), FromWallet: wallet, Rpc: payload, Sig: c.Get(comms.SigHeader), } - userId, err := app.getUserIDFromWallet(c.Context(), wallet) - if err != nil { - return err - } - err = app.commsRpcProcessor.Validate(c.Context(), int32(userId), rawRpc) if err != nil { if errors.Is(err, comms.ErrAttestationFailed) { diff --git a/api/comms_websocket.go b/api/comms_websocket.go new file mode 100644 index 00000000..290800b6 --- /dev/null +++ b/api/comms_websocket.go @@ -0,0 +1,11 @@ +package api + +import ( + "github.com/gofiber/contrib/websocket" +) + +func (app *ApiServer) getChatWebsocket(conn *websocket.Conn) { + userId := int32(conn.Locals("websocketUserId").(int)) + + app.commsRpcProcessor.RegisterWebsocket(userId, conn) +} diff --git a/api/resolve_middleware.go b/api/resolve_middleware.go index e2d1228e..ce1b7a2b 100644 --- a/api/resolve_middleware.go +++ b/api/resolve_middleware.go @@ -4,6 +4,7 @@ import ( "strings" "bridgerton.audius.co/trashid" + "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/utils" ) @@ -91,3 +92,15 @@ func (app *ApiServer) requirePlaylistIdMiddleware(c *fiber.Ctx) error { c.Locals("playlistId", playlistId) return c.Next() } + +func (app *ApiServer) validateWebsocketMiddleware(c *fiber.Ctx) error { + if !websocket.IsWebSocketUpgrade(c) { + return fiber.ErrUpgradeRequired + } + userId, err := app.userIdForSignedCommsRequest(c) + if err != nil { + return err + } + c.Locals("websocketUserId", userId) + return c.Next() +} diff --git a/api/server.go b/api/server.go index 09c999b7..b236a802 100644 --- a/api/server.go +++ b/api/server.go @@ -30,6 +30,7 @@ import ( "github.com/gagliardetto/solana-go/rpc" "github.com/gofiber/contrib/fiberzap/v2" "github.com/gofiber/contrib/swagger" + "github.com/gofiber/contrib/websocket" "github.com/gofiber/fiber/v2" "github.com/gofiber/fiber/v2/middleware/adaptor" "github.com/gofiber/fiber/v2/middleware/cors" @@ -297,7 +298,6 @@ func NewApiServer(config config.Config) *ApiServer { app.Use("/v1/full/tracks/best_new_releases", BalancerForward(config.PythonUpstreams)) app.Use("/v1/full/tracks/most_loved", BalancerForward(config.PythonUpstreams)) app.Use("/v1/full/tracks/remixables", BalancerForward(config.PythonUpstreams)) - app.Use("/comms/chats/ws", BalancerForward(config.PythonUpstreams)) } v1 := app.Group("/v1") @@ -503,6 +503,7 @@ func NewApiServer(config config.Config) *ApiServer { comms.Get("/chats/permissions", app.getChatPermissions) comms.Get("/chats/blockers", app.getChatBlockers) comms.Get("/chats/blockees", app.getChatBlockees) + comms.Get("/chats/ws", app.validateWebsocketMiddleware, websocket.New(app.getChatWebsocket)) comms.Get("/chats/:chatId/messages", app.getChatMessages) comms.Get("/chats/:chatId", app.getChat) @@ -669,6 +670,7 @@ func (as *ApiServer) Serve() { signal.Notify(c, os.Interrupt) go func() { <-c + as.commsRpcProcessor.Shutdown() flushTicker.Stop() // Shutdown metrics collector if it exists diff --git a/api/server_test.go b/api/server_test.go index 2ab05e6e..06d201f6 100644 --- a/api/server_test.go +++ b/api/server_test.go @@ -38,9 +38,13 @@ func emptyTestApp(t *testing.T) *ApiServer { // Dummy key DelegatePrivateKey: "0633fddb74e32b3cbc64382e405146319c11a1a52dc96598e557c5dbe2f31468", SolanaConfig: config.SolanaConfig{RpcProviders: []string{""}}, + // Disable message push by default. Tests for it can create + // an RPC processor directly. + CommsMessagePush: false, }) t.Cleanup(func() { + app.commsRpcProcessor.Shutdown() app.pool.Close() if app.writePool != nil { app.writePool.Close() diff --git a/config/config.go b/config/config.go index b6933e16..27a62224 100644 --- a/config/config.go +++ b/config/config.go @@ -38,6 +38,7 @@ type Config struct { BirdeyeToken string SolanaIndexerWorkers int SolanaIndexerRetryInterval time.Duration + CommsMessagePush bool } var Cfg = Config{ @@ -57,6 +58,7 @@ var Cfg = Config{ BirdeyeToken: os.Getenv("birdeyeToken"), SolanaIndexerWorkers: 50, SolanaIndexerRetryInterval: 5 * time.Minute, + CommsMessagePush: true, } func init() { @@ -124,6 +126,14 @@ func init() { log.Fatalf("Unknown environment: %s", env) } + if os.Getenv("commsMessagePush") != "" { + commsMessagePushEnabled, err := strconv.ParseBool(os.Getenv("commsMessagePush")) + if err != nil { + log.Fatalf("Invalid commsMessagePush: %s", err) + } + Cfg.CommsMessagePush = commsMessagePushEnabled + } + // Solana indexer config retryInterval := os.Getenv("solanaIndexerRetryInterval") if retryInterval != "" { diff --git a/ddl/functions/handle_chat_blast.sql b/ddl/functions/handle_chat_blast.sql new file mode 100644 index 00000000..b9471c7a --- /dev/null +++ b/ddl/functions/handle_chat_blast.sql @@ -0,0 +1,22 @@ +create or replace function handle_chat_blast() returns trigger as $$ +declare +begin + PERFORM pg_notify('chat_blast_inserted', json_build_object('blast_id', new.blast_id)::text); + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + raise; + +end; +$$ language plpgsql; + + +do $$ begin + create trigger on_chat_blast + after insert on chat_blast + for each row execute procedure handle_chat_blast(); +exception + when others then null; +end $$; diff --git a/ddl/functions/handle_chat_message.sql b/ddl/functions/handle_chat_message.sql new file mode 100644 index 00000000..d8b3c618 --- /dev/null +++ b/ddl/functions/handle_chat_message.sql @@ -0,0 +1,22 @@ +create or replace function handle_chat_message() returns trigger as $$ +declare +begin + PERFORM pg_notify('chat_message_inserted', json_build_object('message_id', new.message_id)::text); + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + raise; + +end; +$$ language plpgsql; + + +do $$ begin + create trigger on_chat_message + after insert on chat_message + for each row execute procedure handle_chat_message(); +exception + when others then null; +end $$; diff --git a/ddl/functions/handle_chat_message_reaction.sql b/ddl/functions/handle_chat_message_reaction.sql new file mode 100644 index 00000000..93302c0b --- /dev/null +++ b/ddl/functions/handle_chat_message_reaction.sql @@ -0,0 +1,40 @@ +create or replace function handle_chat_message_reaction_changed() returns trigger as $$ +declare + message_id text; + user_id bigint; + reaction text; +begin + -- Get the values from either NEW or OLD record + if tg_op = 'DELETE' then + message_id := old.message_id; + user_id := old.user_id; + reaction := null; -- Set reaction to null for deletions + else + message_id := new.message_id; + user_id := new.user_id; + reaction := new.reaction; + end if; + + PERFORM pg_notify('chat_message_reaction_changed', json_build_object( + 'message_id', message_id, + 'user_id', user_id, + 'reaction', reaction + )::text); + return coalesce(new, old); + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + raise; + +end; +$$ language plpgsql; + + +do $$ begin + create trigger on_chat_message_reaction_changed + after insert or update or delete on chat_message_reactions + for each row execute procedure handle_chat_message_reaction_changed(); +exception + when others then null; +end $$; diff --git a/ddl/functions/handle_comms_rpc_log.sql b/ddl/functions/handle_comms_rpc_log.sql new file mode 100644 index 00000000..f5e7e557 --- /dev/null +++ b/ddl/functions/handle_comms_rpc_log.sql @@ -0,0 +1,23 @@ +create or replace function handle_comms_rpc_log() returns trigger as $$ +declare +begin + -- Send notification with the signature (primary key) of the new rpc_log record + PERFORM pg_notify('rpc_log_inserted', json_build_object('sig', new.sig)::text); + return null; + +exception + when others then + raise warning 'An error occurred in %: %', tg_name, sqlerrm; + raise; + +end; +$$ language plpgsql; + + +do $$ begin + create trigger on_rpc_log + after insert on rpc_log + for each row execute procedure handle_comms_rpc_log(); +exception + when others then null; +end $$; diff --git a/go.mod b/go.mod index 9eeb0ef4..b97a3a7a 100644 --- a/go.mod +++ b/go.mod @@ -19,7 +19,8 @@ require ( github.com/go-playground/validator/v10 v10.26.0 github.com/gofiber/contrib/fiberzap/v2 v2.1.5 github.com/gofiber/contrib/swagger v1.3.0 - github.com/gofiber/fiber/v2 v2.52.6 + github.com/gofiber/contrib/websocket v1.3.4 + github.com/gofiber/fiber/v2 v2.52.9 github.com/jackc/pgx/v5 v5.7.4 github.com/jackc/pgxlisten v0.0.0-20241106001234-1d6f6656415c github.com/joho/godotenv v1.5.1 @@ -36,9 +37,9 @@ require ( github.com/tidwall/gjson v1.18.0 github.com/tidwall/pretty v1.2.1 github.com/tidwall/sjson v1.2.5 - github.com/valyala/fasthttp v1.59.0 + github.com/valyala/fasthttp v1.65.0 go.uber.org/zap v1.27.0 - golang.org/x/sync v0.13.0 + golang.org/x/sync v0.16.0 google.golang.org/grpc v1.71.1 google.golang.org/protobuf v1.36.6 ) @@ -48,7 +49,7 @@ require ( filippo.io/edwards25519 v1.0.0-rc.1 // indirect github.com/DataDog/zstd v1.5.2 // indirect github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 // indirect - github.com/andybalholm/brotli v1.1.1 // indirect + github.com/andybalholm/brotli v1.2.0 // indirect github.com/artyom/autoflags v1.1.1 // indirect github.com/artyom/httpflags v1.2.0 // indirect github.com/artyom/oembed v1.0.1 // indirect @@ -85,6 +86,7 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.6.0 // indirect github.com/ethereum/c-kzg-4844 v1.0.0 // indirect github.com/ethereum/go-verkle v0.2.2 // indirect + github.com/fasthttp/websocket v1.5.12 // indirect github.com/fatih/color v1.18.0 // indirect github.com/fatih/structs v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -161,6 +163,7 @@ require ( github.com/rivo/uniseg v0.4.7 // indirect github.com/rogpeppe/go-internal v1.14.1 // indirect github.com/sasha-s/go-deadlock v0.3.5 // indirect + github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287 // indirect github.com/segmentio/asm v1.2.0 // indirect github.com/streamingfast/logging v0.0.0-20230608130331-f22c91403091 // indirect github.com/stretchr/objx v0.5.2 // indirect @@ -180,12 +183,12 @@ require ( go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/ratelimit v0.2.0 // indirect - golang.org/x/crypto v0.37.0 // indirect + golang.org/x/crypto v0.41.0 // indirect golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 // indirect - golang.org/x/net v0.39.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/term v0.31.0 // indirect - golang.org/x/text v0.24.0 // indirect + golang.org/x/net v0.43.0 // indirect + golang.org/x/sys v0.35.0 // indirect + golang.org/x/term v0.34.0 // indirect + golang.org/x/text v0.28.0 // indirect golang.org/x/time v0.9.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20250324211829-b45e905df463 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20250324211829-b45e905df463 // indirect diff --git a/go.sum b/go.sum index 8fb09250..7f46c4db 100644 --- a/go.sum +++ b/go.sum @@ -18,8 +18,8 @@ github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjC github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129 h1:MzBOUgng9orim59UnfUTLRjMpd09C5uEVQ6RPGeCaVI= github.com/andres-erbsen/clock v0.0.0-20160526145045-9e14626cd129/go.mod h1:rFgpPQZYZ8vdbc+48xibu8ALc3yeyd64IhHS+PU6Yyg= -github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= -github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= +github.com/andybalholm/brotli v1.2.0 h1:ukwgCxwYrmACq68yiUqwIWnGY0cTPox/M94sVwToPjQ= +github.com/andybalholm/brotli v1.2.0/go.mod h1:rzTDkvFWvIrjDXZHkuS16NPggd91W3kUSvPlQ1pLaKY= github.com/aquasecurity/esquery v0.2.0 h1:9WWXve95TE8hbm3736WB7nS6Owl8UGDeu+0jiyE9ttA= github.com/aquasecurity/esquery v0.2.0/go.mod h1:VU+CIFR6C+H142HHZf9RUkp4Eedpo9UrEKeCQHWf9ao= github.com/artyom/autoflags v1.1.1 h1:8flRmpb7xpjLHFVcM+HN+cEEKLw+H5a2hABDbRvfG9A= @@ -124,6 +124,8 @@ github.com/ethereum/go-ethereum v1.15.8 h1:H6NilvRXFVoHiXZ3zkuTqKW5XcxjLZniV5Ujx github.com/ethereum/go-ethereum v1.15.8/go.mod h1:+S9k+jFzlyVTNcYGvqFhzN/SFhI6vA+aOY4T5tLSPL0= github.com/ethereum/go-verkle v0.2.2 h1:I2W0WjnrFUIzzVPwm8ykY+7pL2d4VhlsePn4j7cnFk8= github.com/ethereum/go-verkle v0.2.2/go.mod h1:M3b90YRnzqKyyzBEWJGqj8Qff4IDeXnzFw0P9bFw3uk= +github.com/fasthttp/websocket v1.5.12 h1:e4RGPpWW2HTbL3zV0Y/t7g0ub294LkiuXXUuTOUInlE= +github.com/fasthttp/websocket v1.5.12/go.mod h1:I+liyL7/4moHojiOgUOIKEWm9EIxHqxZChS+aMFltyg= github.com/fatih/color v1.18.0 h1:S8gINlzdQ840/4pfAwic/ZE0djQEH3wM94VfqLTZcOM= github.com/fatih/color v1.18.0/go.mod h1:4FelSpRwEGDpQ12mAdzqdOukCy4u8WUtOY6lkT/6HfU= github.com/fatih/structs v1.1.0 h1:Q7juDM0QtcnhCpeyLGQKyg4TOIghuNXrkL32pHAUMxo= @@ -204,8 +206,10 @@ github.com/gofiber/contrib/fiberzap/v2 v2.1.5 h1:mJnuTFd1gQw/DFUz1mkJlMmiwPGJ5cE github.com/gofiber/contrib/fiberzap/v2 v2.1.5/go.mod h1:PtrHZhZvHC8deg3jRfjzlv1tk3Mtn0cmat7db+eqA6I= github.com/gofiber/contrib/swagger v1.3.0 h1:J1InCTPUW/DzDlG+QwWcD5QZ4W9HlyCRHLZjKKVZd+g= github.com/gofiber/contrib/swagger v1.3.0/go.mod h1:zlZljpjIz1VhKR25+Inxl7WaOkgyM10nITUFXn6sV5A= -github.com/gofiber/fiber/v2 v2.52.6 h1:Rfp+ILPiYSvvVuIPvxrBns+HJp8qGLDnLJawAu27XVI= -github.com/gofiber/fiber/v2 v2.52.6/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= +github.com/gofiber/contrib/websocket v1.3.4 h1:tWeBdbJ8q0WFQXariLN4dBIbGH9KBU75s0s7YXplOSg= +github.com/gofiber/contrib/websocket v1.3.4/go.mod h1:kTFBPC6YENCnKfKx0BoOFjgXxdz7E85/STdkmZPEmPs= +github.com/gofiber/fiber/v2 v2.52.9 h1:YjKl5DOiyP3j0mO61u3NTmK7or8GzzWzCFzkboyP5cw= +github.com/gofiber/fiber/v2 v2.52.9/go.mod h1:YEcBbO/FB+5M1IZNBP9FO3J9281zgPAreiI1oqg8nDw= github.com/gofrs/flock v0.12.1 h1:MTLVXXHf8ekldpJk3AKicLij9MdwOWkZ+a/jHHZby9E= github.com/gofrs/flock v0.12.1/go.mod h1:9zxTsyu5xtJ9DK+1tFZyibEV7y3uwDxPPfbxeeHCoD0= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= @@ -417,6 +421,8 @@ github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/sasha-s/go-deadlock v0.3.5 h1:tNCOEEDG6tBqrNDOX35j/7hL5FcFViG6awUGROb2NsU= github.com/sasha-s/go-deadlock v0.3.5/go.mod h1:bugP6EGbdGYObIlx7pUZtWqlvo8k9H6vCBBsiChJQ5U= +github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287 h1:qIQ0tWF9vxGtkJa24bR+2i53WBCz1nW/Pc47oVYauC4= +github.com/savsgio/gotils v0.0.0-20250408102913-196191ec6287/go.mod h1:sM7Mt7uEoCeFSCBM+qBrqvEo+/9vdmj19wzp3yzUhmg= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/segmentio/encoding v0.4.1 h1:KLGaLSW0jrmhB58Nn4+98spfvPvmo4Ci1P/WIQ9wn7w= @@ -465,8 +471,8 @@ github.com/tklauser/numcpus v0.6.1 h1:ng9scYS7az0Bk4OZLvrNXNSAO2Pxr1XXRAPyjhIx+F github.com/tklauser/numcpus v0.6.1/go.mod h1:1XfjsgE2zo8GVw7POkMbHENHzVg3GzmoZ9fESEdAacY= github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.59.0 h1:Qu0qYHfXvPk1mSLNqcFtEk6DpxgA26hy6bmydotDpRI= -github.com/valyala/fasthttp v1.59.0/go.mod h1:GTxNb9Bc6r2a9D0TWNSPwDz78UxnTGBViY3xZNEqyYU= +github.com/valyala/fasthttp v1.65.0 h1:j/u3uzFEGFfRxw79iYzJN+TteTJwbYkru9uDp3d0Yf8= +github.com/valyala/fasthttp v1.65.0/go.mod h1:P/93/YkKPMsKSnATEeELUCkG8a7Y+k99uxNHVbKINr4= github.com/valyala/fasttemplate v1.2.2 h1:lxLXG0uE3Qnshl9QyaK6XJxMXlQZELvChBOCmQD0Loo= github.com/valyala/fasttemplate v1.2.2/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= @@ -529,8 +535,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE= -golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc= +golang.org/x/crypto v0.41.0 h1:WKYxWedPGCTVVl5+WHSSrOBT0O8lx32+zxmHxijgXp4= +golang.org/x/crypto v0.41.0/go.mod h1:pO5AFd7FA68rFak7rOAGVuygIISepHftHnr8dr6+sUc= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0 h1:e66Fs6Z+fZTbFBAxKfP3PALWBtpfqks2bwGcexMxgtk= golang.org/x/exp v0.0.0-20240909161429-701f63a606c0/go.mod h1:2TbTHSBQa924w8M6Xs1QcRcFwyucIwBGpK1p2f1YFFY= @@ -556,8 +562,8 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY golang.org/x/net v0.0.0-20201110031124-69a78807bb2b/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY= -golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E= +golang.org/x/net v0.43.0 h1:lat02VYK2j4aLzMzecihNvTlJNQUq316m2Mr9rnM6YE= +golang.org/x/net v0.43.0/go.mod h1:vhO1fvI4dGsIjh73sWfUVjj3N7CA9WkKJNQm2svM6Jg= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -565,8 +571,8 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610= -golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= +golang.org/x/sync v0.16.0 h1:ycBJEhp9p4vXvUZNszeOq0kGTPghopOL8q0fq3vstxw= +golang.org/x/sync v0.16.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -584,18 +590,18 @@ golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20= -golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= +golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI= +golang.org/x/sys v0.35.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o= -golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw= +golang.org/x/term v0.34.0 h1:O/2T7POpk0ZZ7MAzMeWFSg6S5IpWd/RXDlM9hgM3DR4= +golang.org/x/term v0.34.0/go.mod h1:5jC53AEywhIVebHgPVeg0mj8OD3VO9OzclacVrqpaAw= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0= -golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU= +golang.org/x/text v0.28.0 h1:rhazDwis8INMIwQ4tpjLDzUhx6RlXqZNPEM0huQojng= +golang.org/x/text v0.28.0/go.mod h1:U8nCwOR8jO/marOQ0QbDiOngZVEBB7MAiitBuMjXiNU= golang.org/x/time v0.9.0 h1:EsRrnYcQiGH+5FfbgvV4AP7qEZstoyrHB0DzarOQ4ZY= golang.org/x/time v0.9.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -607,8 +613,8 @@ golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= -golang.org/x/tools v0.32.0 h1:Q7N1vhpkQv7ybVzLFtTjvQya2ewbwNDZzUgfXGqtMWU= -golang.org/x/tools v0.32.0/go.mod h1:ZxrU41P/wAbZD8EDa6dDCa6XfpkhJ7HFMjHJXfBDu8s= +golang.org/x/tools v0.35.0 h1:mBffYraMEf7aa0sB+NuKnuCy8qI/9Bughn8dC2Gu5r0= +golang.org/x/tools v0.35.0/go.mod h1:NKdj5HkL/73byiZSJjqJgKn3ep7KjFkBOkR/Hps3VPw= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=