diff --git a/src/transports/websocket.ts b/src/transports/websocket.ts index f38736dc..46bec02d 100644 --- a/src/transports/websocket.ts +++ b/src/transports/websocket.ts @@ -417,8 +417,16 @@ export class WebSocketTransport< logger.info( `The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`, ) - // Filter out query params from the URL to avoid leaking sensitive data - // and prevent metric cardinality explosion + const filteredUrl = this.currentUrl.split('?')[0] + metrics + .get('wsConnectionFailoverCount') + .labels({ transport_name: this.name, url: filteredUrl }) + .set(this.streamHandlerInvocationsWithNoConnection) + } else if (connectionClosed && this.wsConnection) { + this.streamHandlerInvocationsWithNoConnection += 1 + logger.info( + `Connection was closed externally (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`, + ) const filteredUrl = this.currentUrl.split('?')[0] metrics .get('wsConnectionFailoverCount') diff --git a/test/transports/websocket.test.ts b/test/transports/websocket.test.ts index 529d8c5d..9151d211 100644 --- a/test/transports/websocket.test.ts +++ b/test/transports/websocket.test.ts @@ -1312,3 +1312,362 @@ test.serial('does not heartbeat when handler throws an error', async (t) => { mockWsServer.close() await t.context.clock.runToLastAsync() }) + +// --------------------------------------------------------------------------- +// Tests demonstrating the connectionOpenedAt reset bug (1005 reconnect loop) +// --------------------------------------------------------------------------- + +// Separate URL to avoid mock-socket "already listening" conflicts with +// preceding tests that may not fully close their mock servers. +const ENDPOINT_URL_1005 = 'wss://test-ws-1005.com/asd' + +test.serial( + 'failover counter increments when connections are externally closed (1005)', + async (t) => { + const base = 'ETH' + const quote = 'DOGE' + + // TTL is higher than BACKGROUND_EXECUTE_MS_WS so the connectionUnresponsive + // path alone would NOT trigger. The fix detects externally-closed connections + // and increments the counter via a separate code path. + const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10_000 + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL_1005, { mock: false }) + let connectionCounter = 0 + const failoverCounterValues: number[] = [] + + mockWsServer.on('connection', (socket) => { + connectionCounter++ + // Simulate provider dropping connection with 1005 shortly after open + setTimeout(() => { + socket.close({ code: 1005, reason: '', wasClean: false }) + }, 100) + }) + + const transport = new WebSocketTransport({ + url: (_context, _desiredSubs, urlConfigFunctionParameters) => { + failoverCounterValues.push( + urlConfigFunctionParameters.streamHandlerInvocationsWithNoConnection, + ) + return ENDPOINT_URL_1005 + }, + handlers: { + message(message) { + if (!message.pair) return [] + const [b, q] = message.pair.split('/') + return [ + { + params: { base: b, quote: q }, + response: { + data: { result: message.value }, + result: message.value, + timestamps: { providerIndicatedTimeUnixMs: Date.now() }, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => `S:${params.base}/${params.quote}`, + unsubscribeMessage: (params) => ({ + request: 'unsubscribe', + pair: `${params.base}/${params.quote}`, + }), + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_TTL: 60_000, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + const error = await testAdapter.request({ base, quote }) + t.is(error.statusCode, 504) + + // Advance clock through multiple reconnect cycles + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 6) + + // Multiple reconnections should have occurred + t.true(connectionCounter >= 3, `Expected at least 3 reconnects but got ${connectionCounter}`) + + // The failover counter should have incremented because externally-closed + // connections are now detected and counted, even though + // connectionUnresponsive never becomes true. + t.true( + transport.streamHandlerInvocationsWithNoConnection > 0, + `Expected failover counter > 0, got ${transport.streamHandlerInvocationsWithNoConnection}`, + ) + t.true( + failoverCounterValues.some((v) => v > 0), + `Expected at least one url() call with counter > 0, got: [${failoverCounterValues}]`, + ) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + }, +) + +test.serial( + 'failover counter increments for unresponsive-but-open connections (control test)', + async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10_000 + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL_1005, { mock: false }) + let connectionCounter = 0 + const failoverCounterValues: number[] = [] + + // Server accepts connections but never sends data -- connection stays open + mockWsServer.on('connection', (socket) => { + connectionCounter++ + socket.on('message', () => { + // Accept subscribe messages but don't send any data back + }) + }) + + const transport = new WebSocketTransport({ + url: (_context, _desiredSubs, urlConfigFunctionParameters) => { + failoverCounterValues.push( + urlConfigFunctionParameters.streamHandlerInvocationsWithNoConnection, + ) + return ENDPOINT_URL_1005 + }, + handlers: { + message(message) { + if (!message.pair) return [] + const [b, q] = message.pair.split('/') + return [ + { + params: { base: b, quote: q }, + response: { + data: { result: message.value }, + result: message.value, + timestamps: { providerIndicatedTimeUnixMs: Date.now() }, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => `S:${params.base}/${params.quote}`, + unsubscribeMessage: (params) => ({ + request: 'unsubscribe', + pair: `${params.base}/${params.quote}`, + }), + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_TTL: 60_000, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + const error = await testAdapter.request({ base, quote }) + t.is(error.statusCode, 504) + + // Advance clock past WS_SUBSCRIPTION_UNRESPONSIVE_TTL so the open-but- + // silent connection is detected as unresponsive. Need 3+ cycles (15s+) + // because the check is strictly-greater-than (>) the TTL (10s). + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 4 + 500) + + // Connection was reopened after unresponsive detection + t.true(connectionCounter >= 2, `Expected at least 2 connections but got ${connectionCounter}`) + + // The failover counter should have incremented (unlike the rapid-close bug) + t.true( + transport.streamHandlerInvocationsWithNoConnection > 0, + `Expected failover counter > 0, got ${transport.streamHandlerInvocationsWithNoConnection}`, + ) + t.true( + failoverCounterValues.some((v) => v > 0), + `Expected at least one url() call with counter > 0, got: [${failoverCounterValues}]`, + ) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + }, +) + +test.serial( + 'EA recovers from rapid reconnect loop after server stabilizes', + async (t) => { + const base = 'ETH' + const quote = 'DOGE' + const recoveredPrice = 99999 + const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10_000 + const DROP_COUNT = 3 + + mockWebSocketProvider(WebSocketClassProvider) + const mockWsServer = new Server(ENDPOINT_URL_1005, { mock: false }) + let connectionCounter = 0 + let dropConnections = false + + mockWsServer.on('connection', (socket) => { + connectionCounter++ + if (dropConnections && connectionCounter <= DROP_COUNT + 1) { + setTimeout(() => { + socket.close({ code: 1005, reason: '', wasClean: false }) + }, 100) + return + } + + const currentPrice = dropConnections ? recoveredPrice : price + socket.on('message', () => { + socket.send( + JSON.stringify({ + pair: `${base}/${quote}`, + value: currentPrice, + }), + ) + }) + }) + + const transport = new WebSocketTransport({ + url: () => ENDPOINT_URL_1005, + handlers: { + message(message) { + if (!message.pair) return [] + const [b, q] = message.pair.split('/') + return [ + { + params: { base: b, quote: q }, + response: { + data: { result: message.value }, + result: message.value, + timestamps: { providerIndicatedTimeUnixMs: Date.now() }, + }, + }, + ] + }, + }, + builders: { + subscribeMessage: (params) => `S:${params.base}/${params.quote}`, + unsubscribeMessage: (params) => ({ + request: 'unsubscribe', + pair: `${params.base}/${params.quote}`, + }), + }, + }) + + const webSocketEndpoint = new AdapterEndpoint({ + name: 'TEST', + transport, + inputParameters, + }) + + const config = new AdapterConfig( + {}, + { + envDefaultOverrides: { + BACKGROUND_EXECUTE_MS_WS, + WS_SUBSCRIPTION_TTL: 120_000, + WS_SUBSCRIPTION_UNRESPONSIVE_TTL, + CACHE_MAX_AGE: 120_000, + }, + }, + ) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + config, + endpoints: [webSocketEndpoint], + }) + + const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context) + + // Phase 1: normal operation -- verify prices flow + await testAdapter.startBackgroundExecuteThenGetResponse(t, { + requestData: { base, quote }, + expectedResponse: { + data: { result: price }, + result: price, + statusCode: 200, + }, + }) + t.is(connectionCounter, 1) + + // Phase 2: switch to dropping connections with 1005 + dropConnections = true + mockWsServer.clients().forEach((client) => { + client.close({ code: 1005, reason: '', wasClean: false }) + }) + + // Advance clock through enough reconnect cycles for drops to finish + // and the server to stabilize (DROP_COUNT=3 drops, then stable connection). + // Each cycle is ~5s, so 6 cycles (30s) is enough for 3 drops + recovery. + await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 6) + + // The failover counter should have incremented during the drop phase + t.true( + transport.streamHandlerInvocationsWithNoConnection > 0, + `Expected failover counter > 0 after drops, got ${transport.streamHandlerInvocationsWithNoConnection}`, + ) + + // After server stabilizes, the subscribe message triggers a price response. + // With CACHE_MAX_AGE=120s, the recovered price should still be in cache. + const recoveredResponse = await testAdapter.request({ base, quote }) + t.is( + recoveredResponse.statusCode, + 200, + 'EA should return 200 after server stabilizes and prices flow again', + ) + const recoveredBody = JSON.parse(recoveredResponse.body as string) + t.is( + recoveredBody.result, + recoveredPrice, + `EA should serve the recovered price (${recoveredPrice})`, + ) + + testAdapter.api.close() + mockWsServer.close() + await t.context.clock.runToLastAsync() + }, +)