Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/transports/websocket.ts
Original file line number Diff line number Diff line change
Expand Up @@ -417,8 +417,16 @@ export class WebSocketTransport<
logger.info(
`The connection is unresponsive (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`,
)
// Filter out query params from the URL to avoid leaking sensitive data
// and prevent metric cardinality explosion
const filteredUrl = this.currentUrl.split('?')[0]
metrics
.get('wsConnectionFailoverCount')
.labels({ transport_name: this.name, url: filteredUrl })
.set(this.streamHandlerInvocationsWithNoConnection)
} else if (connectionClosed && this.wsConnection) {
this.streamHandlerInvocationsWithNoConnection += 1
logger.info(
`Connection was closed externally (last message ${timeSinceLastMessage}ms ago), incremented failover counter to ${this.streamHandlerInvocationsWithNoConnection}`,
)
const filteredUrl = this.currentUrl.split('?')[0]
metrics
.get('wsConnectionFailoverCount')
Expand Down
359 changes: 359 additions & 0 deletions test/transports/websocket.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1312,3 +1312,362 @@
mockWsServer.close()
await t.context.clock.runToLastAsync()
})

// ---------------------------------------------------------------------------
// Tests demonstrating the connectionOpenedAt reset bug (1005 reconnect loop)
// ---------------------------------------------------------------------------

// Separate URL to avoid mock-socket "already listening" conflicts with
// preceding tests that may not fully close their mock servers.
const ENDPOINT_URL_1005 = 'wss://test-ws-1005.com/asd'

test.serial(
'failover counter increments when connections are externally closed (1005)',
async (t) => {
const base = 'ETH'
const quote = 'DOGE'

// TTL is higher than BACKGROUND_EXECUTE_MS_WS so the connectionUnresponsive
// path alone would NOT trigger. The fix detects externally-closed connections
// and increments the counter via a separate code path.
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10_000

mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL_1005, { mock: false })
let connectionCounter = 0
const failoverCounterValues: number[] = []

mockWsServer.on('connection', (socket) => {
connectionCounter++
// Simulate provider dropping connection with 1005 shortly after open
setTimeout(() => {
socket.close({ code: 1005, reason: '', wasClean: false })
}, 100)
})

const transport = new WebSocketTransport<WebSocketTypes>({
url: (_context, _desiredSubs, urlConfigFunctionParameters) => {
failoverCounterValues.push(
urlConfigFunctionParameters.streamHandlerInvocationsWithNoConnection,
)
return ENDPOINT_URL_1005
},
handlers: {
message(message) {
if (!message.pair) return []

Check failure on line 1357 in test/transports/websocket.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
const [b, q] = message.pair.split('/')
return [
{
params: { base: b, quote: q },
response: {
data: { result: message.value },
result: message.value,
timestamps: { providerIndicatedTimeUnixMs: Date.now() },
},
},
]
},
},
builders: {
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
unsubscribeMessage: (params) => ({
request: 'unsubscribe',
pair: `${params.base}/${params.quote}`,
}),
},
})

const webSocketEndpoint = new AdapterEndpoint({
name: 'TEST',
transport,
inputParameters,
})

const config = new AdapterConfig(
{},
{
envDefaultOverrides: {
BACKGROUND_EXECUTE_MS_WS,
WS_SUBSCRIPTION_TTL: 60_000,
WS_SUBSCRIPTION_UNRESPONSIVE_TTL,
},
},
)

const adapter = new Adapter({
name: 'TEST',
defaultEndpoint: 'test',
config,
endpoints: [webSocketEndpoint],
})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

const error = await testAdapter.request({ base, quote })
t.is(error.statusCode, 504)

// Advance clock through multiple reconnect cycles
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 6)

// Multiple reconnections should have occurred
t.true(connectionCounter >= 3, `Expected at least 3 reconnects but got ${connectionCounter}`)

// The failover counter should have incremented because externally-closed
// connections are now detected and counted, even though
// connectionUnresponsive never becomes true.
t.true(
transport.streamHandlerInvocationsWithNoConnection > 0,
`Expected failover counter > 0, got ${transport.streamHandlerInvocationsWithNoConnection}`,
)
t.true(
failoverCounterValues.some((v) => v > 0),
`Expected at least one url() call with counter > 0, got: [${failoverCounterValues}]`,
)

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
},
)

test.serial(
'failover counter increments for unresponsive-but-open connections (control test)',
async (t) => {
const base = 'ETH'
const quote = 'DOGE'
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10_000

mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL_1005, { mock: false })
let connectionCounter = 0
const failoverCounterValues: number[] = []

// Server accepts connections but never sends data -- connection stays open
mockWsServer.on('connection', (socket) => {
connectionCounter++
socket.on('message', () => {
// Accept subscribe messages but don't send any data back
})
})

const transport = new WebSocketTransport<WebSocketTypes>({
url: (_context, _desiredSubs, urlConfigFunctionParameters) => {
failoverCounterValues.push(
urlConfigFunctionParameters.streamHandlerInvocationsWithNoConnection,
)
return ENDPOINT_URL_1005
},
handlers: {
message(message) {
if (!message.pair) return []

Check failure on line 1462 in test/transports/websocket.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
const [b, q] = message.pair.split('/')
return [
{
params: { base: b, quote: q },
response: {
data: { result: message.value },
result: message.value,
timestamps: { providerIndicatedTimeUnixMs: Date.now() },
},
},
]
},
},
builders: {
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
unsubscribeMessage: (params) => ({
request: 'unsubscribe',
pair: `${params.base}/${params.quote}`,
}),
},
})

const webSocketEndpoint = new AdapterEndpoint({
name: 'TEST',
transport,
inputParameters,
})

const config = new AdapterConfig(
{},
{
envDefaultOverrides: {
BACKGROUND_EXECUTE_MS_WS,
WS_SUBSCRIPTION_TTL: 60_000,
WS_SUBSCRIPTION_UNRESPONSIVE_TTL,
},
},
)

const adapter = new Adapter({
name: 'TEST',
defaultEndpoint: 'test',
config,
endpoints: [webSocketEndpoint],
})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

const error = await testAdapter.request({ base, quote })
t.is(error.statusCode, 504)

// Advance clock past WS_SUBSCRIPTION_UNRESPONSIVE_TTL so the open-but-
// silent connection is detected as unresponsive. Need 3+ cycles (15s+)
// because the check is strictly-greater-than (>) the TTL (10s).
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 4 + 500)

// Connection was reopened after unresponsive detection
t.true(connectionCounter >= 2, `Expected at least 2 connections but got ${connectionCounter}`)

// The failover counter should have incremented (unlike the rapid-close bug)
t.true(
transport.streamHandlerInvocationsWithNoConnection > 0,
`Expected failover counter > 0, got ${transport.streamHandlerInvocationsWithNoConnection}`,
)
t.true(
failoverCounterValues.some((v) => v > 0),
`Expected at least one url() call with counter > 0, got: [${failoverCounterValues}]`,
)

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
},
)

test.serial(
'EA recovers from rapid reconnect loop after server stabilizes',
async (t) => {
const base = 'ETH'
const quote = 'DOGE'
const recoveredPrice = 99999
const WS_SUBSCRIPTION_UNRESPONSIVE_TTL = 10_000
const DROP_COUNT = 3

mockWebSocketProvider(WebSocketClassProvider)
const mockWsServer = new Server(ENDPOINT_URL_1005, { mock: false })
let connectionCounter = 0
let dropConnections = false

mockWsServer.on('connection', (socket) => {
connectionCounter++
if (dropConnections && connectionCounter <= DROP_COUNT + 1) {
setTimeout(() => {
socket.close({ code: 1005, reason: '', wasClean: false })
}, 100)
return
}

const currentPrice = dropConnections ? recoveredPrice : price
socket.on('message', () => {
socket.send(
JSON.stringify({
pair: `${base}/${quote}`,
value: currentPrice,
}),
)
})
})

const transport = new WebSocketTransport<WebSocketTypes>({
url: () => ENDPOINT_URL_1005,
handlers: {
message(message) {
if (!message.pair) return []

Check failure on line 1576 in test/transports/websocket.test.ts

View workflow job for this annotation

GitHub Actions / lint

Expected { after 'if' condition
const [b, q] = message.pair.split('/')
return [
{
params: { base: b, quote: q },
response: {
data: { result: message.value },
result: message.value,
timestamps: { providerIndicatedTimeUnixMs: Date.now() },
},
},
]
},
},
builders: {
subscribeMessage: (params) => `S:${params.base}/${params.quote}`,
unsubscribeMessage: (params) => ({
request: 'unsubscribe',
pair: `${params.base}/${params.quote}`,
}),
},
})

const webSocketEndpoint = new AdapterEndpoint({
name: 'TEST',
transport,
inputParameters,
})

const config = new AdapterConfig(
{},
{
envDefaultOverrides: {
BACKGROUND_EXECUTE_MS_WS,
WS_SUBSCRIPTION_TTL: 120_000,
WS_SUBSCRIPTION_UNRESPONSIVE_TTL,
CACHE_MAX_AGE: 120_000,
},
},
)

const adapter = new Adapter({
name: 'TEST',
defaultEndpoint: 'test',
config,
endpoints: [webSocketEndpoint],
})

const testAdapter = await TestAdapter.startWithMockedCache(adapter, t.context)

// Phase 1: normal operation -- verify prices flow
await testAdapter.startBackgroundExecuteThenGetResponse(t, {
requestData: { base, quote },
expectedResponse: {
data: { result: price },
result: price,
statusCode: 200,
},
})
t.is(connectionCounter, 1)

// Phase 2: switch to dropping connections with 1005
dropConnections = true
mockWsServer.clients().forEach((client) => {
client.close({ code: 1005, reason: '', wasClean: false })
})

// Advance clock through enough reconnect cycles for drops to finish
// and the server to stabilize (DROP_COUNT=3 drops, then stable connection).
// Each cycle is ~5s, so 6 cycles (30s) is enough for 3 drops + recovery.
await runAllUntilTime(t.context.clock, BACKGROUND_EXECUTE_MS_WS * 6)

// The failover counter should have incremented during the drop phase
t.true(
transport.streamHandlerInvocationsWithNoConnection > 0,
`Expected failover counter > 0 after drops, got ${transport.streamHandlerInvocationsWithNoConnection}`,
)

// After server stabilizes, the subscribe message triggers a price response.
// With CACHE_MAX_AGE=120s, the recovered price should still be in cache.
const recoveredResponse = await testAdapter.request({ base, quote })
t.is(
recoveredResponse.statusCode,
200,
'EA should return 200 after server stabilizes and prices flow again',
)
const recoveredBody = JSON.parse(recoveredResponse.body as string)
t.is(
recoveredBody.result,
recoveredPrice,
`EA should serve the recovered price (${recoveredPrice})`,
)

testAdapter.api.close()
mockWsServer.close()
await t.context.clock.runToLastAsync()
},
)
Loading