diff --git a/README.md b/README.md index 7e32839c..57d122a8 100644 --- a/README.md +++ b/README.md @@ -40,6 +40,7 @@ yarn # Install yarn dependencies - [Subscription](./docs/components/transport-types/subscription-transport.md) - [Streaming](./docs/components/transport-types/streaming-transport.md) - [Custom](./docs/components/transport-types/custom-transport.md) + - [Composite](./docs/components/transport-types/composite-transport.md) - Guides - [Porting a v2 EA to v3](./docs/guides/porting-a-v2-ea-to-v3.md) - [Creating a new v3 EA](./docs/guides/creating-a-new-v3-ea.md) diff --git a/docs/components/transport-types/composite-transport.md b/docs/components/transport-types/composite-transport.md new file mode 100644 index 00000000..55790a7e --- /dev/null +++ b/docs/components/transport-types/composite-transport.md @@ -0,0 +1,55 @@ +# Composite transport + +`CompositeTransport` runs several child transports in parallel for the same endpoint and merges their writes into a single response cache. You choose when a newer value from any child should replace what is already cached by implementing `shouldUpdate`. + +Typical uses: + +- Combine a low-latency channel (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops. +- Prefer one provider’s quote over another’s when both are active, using freshness, spread, or custom rules in `shouldUpdate`. + +## How it works + +1. **Initialization** — Each child transport is initialized with the same adapter dependencies, except `responseCache` is replaced by a `[CompareResponseCache](../../../src/cache/response-cache/compare.ts)` wrapper. That wrapper forwards reads to the real endpoint cache but filters writes: a write is applied only when `shouldUpdate(next, current)` is true for the pending value versus the last locally seen value for that cache key, and again versus the value already in the shared cache (so concurrent children do not blindly overwrite each other). +2. **Subscriptions** — `registerRequest` is invoked on every child in parallel, so each transport can register the request in its own subscription set or equivalent. +3. **Background execution** — `backgroundExecute` is invoked on every child in parallel. All children share the same merged cache policy via `shouldUpdate`. + +Child transport names come from the keys of the `transports` object you pass in (for example `ws` and `rest`). Those names are passed to each child’s `initialize` as its `transportName`. + +## Configuration + +`CompositeTransport` is constructed with a `CompositeTransportConfig`: + +| Field | Description | +| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- | +| `transports` | Record of named child `Transport` instances. All children must use the same `TransportGenerics` as the composite. | +| `shouldUpdate` | `(next, current?) => boolean`. Return `true` if `next` should replace `current` in the cache. `current` is `undefined` when there is no prior value for that key. | + +The composite implements `Transport` but does not define `foregroundExecute`; behavior depends entirely on the children. + +## Example + +Two HTTP-style transports (here standing in for WS vs REST) both poll the same symbols. The cache keeps whichever result has the higher `result` field: + +```typescript +import { CompositeTransport, HttpTransport } from '@chainlink/external-adapter-framework/transports' + +const ws = new HttpTransport({ + /* ... */ +}) +const rest = new HttpTransport({ + /* ... */ +}) + +const transport = new CompositeTransport({ + transports: { ws, rest }, + shouldUpdate: (next, current) => (next?.result ?? 0) > (current?.result ?? 0), +}) +``` + +Use the composite as the endpoint’s single `transport` in `AdapterEndpoint` (see `[test/transports/composite.test.ts](../../../test/transports/composite.test.ts)` for a full adapter-level example). + +## Notes + +- **Ordering** — Children run concurrently; which response arrives first is not guaranteed. `shouldUpdate` should encode your merge policy (for example “newer timestamp wins” or “always prefer stream unless stale”). +- **TTL** — TTL writes are forwarded to the underlying cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you rely on per-transport TTL behavior. +- **Errors** — Child transports still own parsing and error handling; the composite only decides whether successful cache entries from a child replace existing ones. diff --git a/docs/components/transports.md b/docs/components/transports.md index d236d8a2..d8599068 100644 --- a/docs/components/transports.md +++ b/docs/components/transports.md @@ -13,6 +13,7 @@ The v3 framework provides transports to fetch data from a Provider using the com - [HTTP Transport](./transport-types/http-transport.md) - [Websocket Transport](./transport-types/websocket-transport.md) - [SSE Transport](./transport-types/sse-transport.md) +- [Composite Transport](./transport-types/composite-transport.md) - [Custom Transport](./transport-types/custom-transport.md) ### Abstract Transports diff --git a/src/adapter/endpoint.ts b/src/adapter/endpoint.ts index e64ca34a..282201b5 100644 --- a/src/adapter/endpoint.ts +++ b/src/adapter/endpoint.ts @@ -1,4 +1,4 @@ -import { ResponseCache } from '../cache/response' +import { SimpleResponseCache } from '../cache/response' import { AdapterSettings } from '../config' import { TransportRoutes } from '../transports' import { @@ -83,7 +83,7 @@ export class AdapterEndpoint implements AdapterEndpo adapterSettings: T['Settings'], ): Promise { this.adapterName = adapterName - const responseCache = new ResponseCache({ + const responseCache = new SimpleResponseCache({ dependencies, adapterSettings: adapterSettings as AdapterSettings, adapterName, diff --git a/src/cache/response-cache/base.ts b/src/cache/response-cache/base.ts new file mode 100644 index 00000000..68d903aa --- /dev/null +++ b/src/cache/response-cache/base.ts @@ -0,0 +1,170 @@ +import { AdapterDependencies } from '../../adapter' +import { AdapterSettings } from '../../config' +import { + AdapterResponse, + makeLogger, + ResponseGenerics, + TimestampedAdapterResponse, + TimestampedProviderResult, + censor, + censorLogs, + TimestampedProviderErrorResponse, +} from '../../util' +import { + InputParameters, + InputParametersDefinition, + TypeFromDefinition, +} from '../../validation/input-params' +import { Cache, calculateAdapterName, calculateCacheKey, calculateFeedId } from '../' +import CensorList from '../../util/censor/censor-list' +import { validator } from '../../validation/utils' + +const logger = makeLogger('ResponseCache') + +export abstract class ResponseCache< + T extends { Parameters: InputParametersDefinition; Response: ResponseGenerics }, +> { + cache: Cache> + inputParameters: InputParameters + adapterName: string + endpointName: string + adapterSettings: AdapterSettings + dependencies: AdapterDependencies + + constructor({ + inputParameters, + adapterName, + endpointName, + adapterSettings, + dependencies, + }: { + dependencies: AdapterDependencies + adapterSettings: AdapterSettings + adapterName: string + endpointName: string + inputParameters: InputParameters + }) { + this.dependencies = dependencies + this.cache = dependencies.cache as Cache> + this.inputParameters = inputParameters + this.adapterName = adapterName + this.endpointName = endpointName + this.adapterSettings = adapterSettings + } + + /** + * Sets responses in the adapter cache (adding necessary metadata and defaults) + * + * @param transportName - transport name + * @param results - the entries to write to the cache + */ + abstract write(transportName: string, results: TimestampedProviderResult[]): Promise + + /** + * Sets responses with metadata in the adapter cache + * + * @param entries - the entries to write to the cache + */ + abstract writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ): Promise + + /** + * Sets a new TTL value for already cached responses in the adapter cache + * + * @param transportName - transport name + * @param params - set of parameters that uniquely relate to the response + * @param ttl - a new time in milliseconds until the response expires + */ + async writeTTL( + transportName: string, + params: TypeFromDefinition[], + ttl: number, + ): Promise { + for (const param of params) { + const key = this.getCacheKey(transportName, param) + this.cache.setTTL(key, ttl) + } + } + + async get(key: string) { + return this.cache.get(key) + } + + protected generateCacheEntry( + transportNameForMeta: string, + transportNameForCache: string, + r: TimestampedProviderResult, + ) { + const censorList = CensorList.getAll() + const { data, result, errorMessage } = r.response + if (!errorMessage && data === undefined) { + logger.warn('The "data" property of the response is undefined.') + } else if (!errorMessage && result === undefined) { + logger.warn('The "result" property of the response is undefined.') + } + let censoredResponse + if (!censorList.length) { + censoredResponse = r.response + } else { + try { + censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse< + T['Response'] + > + } catch (error) { + censorLogs(() => logger.error(`Error censoring response: ${error}`)) + censoredResponse = { + statusCode: 502, + errorMessage: 'Response could not be censored due to an error', + timestamps: r.response.timestamps, + } + } + } + + const response: AdapterResponse = { + ...censoredResponse, + statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200, + } + + if (this.adapterSettings.METRICS_ENABLED && this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED) { + response.meta = { + adapterName: calculateAdapterName(this.adapterName, r.params), + transportName: transportNameForMeta, + metrics: { + feedId: calculateFeedId( + { + adapterSettings: this.adapterSettings, + }, + r.params, + ), + }, + } + } + + if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) { + const timestampValidator = validator.responseTimestamp() + const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs) + if (error) { + censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`)) + } + } + + return { + key: this.getCacheKey(transportNameForCache, r.params), + value: response, + } as const + } + + getCacheKey(transportName: string, params: TypeFromDefinition) { + return calculateCacheKey({ + transportName, + data: params, + adapterName: this.adapterName, + endpointName: this.endpointName, + adapterSettings: this.adapterSettings, + }) + } +} diff --git a/src/cache/response-cache/compare.ts b/src/cache/response-cache/compare.ts new file mode 100644 index 00000000..84b618c8 --- /dev/null +++ b/src/cache/response-cache/compare.ts @@ -0,0 +1,88 @@ +import { ResponseCache } from './base' +import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util' +import { InputParametersDefinition, TypeFromDefinition } from '../../validation/input-params' + +/** + * Compares with existing cache entries before deciding to write or not + */ +export class CompareResponseCache< + T extends { + Parameters: InputParametersDefinition + Response: ResponseGenerics + }, +> extends ResponseCache { + readonly transportName: string + // The actual cache where responses are written to + responseCache: ResponseCache + // A local map to keep track of the most recent entries written to the responseCache + // We compare with this first before comparing with value in cache + // so that we can reduce cache reads + localCache: Map> + // True if next should replace current in cache + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean + + constructor( + transportName: string, + responseCache: ResponseCache, + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean, + ) { + super({ + inputParameters: responseCache.inputParameters, + adapterName: responseCache.adapterName, + endpointName: responseCache.endpointName, + adapterSettings: responseCache.adapterSettings, + dependencies: responseCache.dependencies, + }) + this.transportName = transportName + this.responseCache = responseCache + this.localCache = new Map() + this.shouldUpdate = shouldUpdate + } + + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + const entries: { + key: string + value: AdapterResponse + }[] = [] + + for (const result of results) { + const { key, value } = this.generateCacheEntry(transportName, this.transportName, result) + if (!this.shouldUpdate(value, this.localCache.get(key))) { + continue + } + const entryInCache = await this.get(key) + if (!this.shouldUpdate(value, entryInCache)) { + continue + } + entries.push({ key, value }) + } + + await this.responseCache.writeEntries(entries) + + entries.forEach(({ key, value }) => { + this.localCache.set(key, value) + }) + } + + async writeEntries() { + throw new Error('Use write instead for CompareResponseCache') + } + + override async writeTTL( + _: string, + params: TypeFromDefinition[], + ttl: number, + ): Promise { + await this.responseCache.writeTTL(this.transportName, params, ttl) + } + + override async get(key: string) { + return this.responseCache.get(key) + } +} diff --git a/src/cache/response-cache/simple.ts b/src/cache/response-cache/simple.ts new file mode 100644 index 00000000..64e3249e --- /dev/null +++ b/src/cache/response-cache/simple.ts @@ -0,0 +1,46 @@ +import { ResponseCache } from './base' +import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util' +import { InputParametersDefinition } from '../../validation/input-params' +import * as cacheMetrics from '../metrics' + +/** + * Special type of cache to store responses for this adapter. + */ +export class SimpleResponseCache< + T extends { + Parameters: InputParametersDefinition + Response: ResponseGenerics + }, +> extends ResponseCache { + async writeEntries( + entries: { + key: string + value: AdapterResponse + }[], + ): Promise { + const ttl = this.adapterSettings.CACHE_MAX_AGE + await this.cache.setMany(entries, ttl) + + const now = Date.now() + for (const { key, value } of entries) { + // Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record + const response = value as unknown as AdapterResponse + const feedId = response.meta?.metrics?.feedId + if (feedId) { + const providerTime = response.timestamps?.providerIndicatedTimeUnixMs + const timeDelta = providerTime ? now - providerTime : undefined + + // Record cache set count, max age, and staleness (set to 0 for cache set) + const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type) + cacheMetrics.cacheSet(label, ttl, timeDelta) + } + } + + return + } + + async write(transportName: string, results: TimestampedProviderResult[]): Promise { + const entries = results.map((r) => this.generateCacheEntry(transportName, transportName, r)) + await this.writeEntries(entries) + } +} diff --git a/src/cache/response.ts b/src/cache/response.ts index 2fe961e4..e7294569 100644 --- a/src/cache/response.ts +++ b/src/cache/response.ts @@ -1,179 +1,3 @@ -import { AdapterDependencies } from '../adapter' -import { AdapterSettings } from '../config' -import { - AdapterResponse, - ResponseGenerics, - TimestampedAdapterResponse, - TimestampedProviderErrorResponse, - TimestampedProviderResult, - censor, - censorLogs, - makeLogger, -} from '../util' -import CensorList from '../util/censor/censor-list' -import { - InputParameters, - InputParametersDefinition, - TypeFromDefinition, -} from '../validation/input-params' -import { validator } from '../validation/utils' -import { Cache, calculateCacheKey, calculateFeedId, calculateAdapterName } from './' -import * as cacheMetrics from './metrics' - -const logger = makeLogger('ResponseCache') - -/** - * Special type of cache to store responses for this adapter. - */ -export class ResponseCache< - T extends { - Parameters: InputParametersDefinition - Response: ResponseGenerics - }, -> { - cache: Cache> - inputParameters: InputParameters - adapterName: string - endpointName: string - adapterSettings: AdapterSettings - - constructor({ - inputParameters, - adapterName, - endpointName, - adapterSettings, - dependencies, - }: { - dependencies: AdapterDependencies - adapterSettings: AdapterSettings - adapterName: string - endpointName: string - inputParameters: InputParameters - }) { - this.cache = dependencies.cache as Cache> - this.inputParameters = inputParameters - this.adapterName = adapterName - this.endpointName = endpointName - this.adapterSettings = adapterSettings - } - - /** - * Sets responses in the adapter cache (adding necessary metadata and defaults) - * - * @param results - the entries to write to the cache - */ - async write(transportName: string, results: TimestampedProviderResult[]): Promise { - const censorList = CensorList.getAll() - const entries = results.map((r) => { - const { data, result, errorMessage } = r.response - if (!errorMessage && data === undefined) { - logger.warn('The "data" property of the response is undefined.') - } else if (!errorMessage && result === undefined) { - logger.warn('The "result" property of the response is undefined.') - } - let censoredResponse - if (!censorList.length) { - censoredResponse = r.response - } else { - try { - censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse< - T['Response'] - > - } catch (error) { - censorLogs(() => logger.error(`Error censoring response: ${error}`)) - censoredResponse = { - statusCode: 502, - errorMessage: 'Response could not be censored due to an error', - timestamps: r.response.timestamps, - } - } - } - - const response: AdapterResponse = { - ...censoredResponse, - statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200, - } - - if ( - this.adapterSettings.METRICS_ENABLED && - this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED - ) { - response.meta = { - adapterName: calculateAdapterName(this.adapterName, r.params), - metrics: { - feedId: calculateFeedId( - { - adapterSettings: this.adapterSettings, - }, - r.params, - ), - }, - } - } - - if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) { - const timestampValidator = validator.responseTimestamp() - const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs) - if (error) { - censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`)) - } - } - - return { - key: calculateCacheKey({ - transportName, - data: r.params, - adapterName: this.adapterName, - endpointName: this.endpointName, - adapterSettings: this.adapterSettings, - }), - value: response, - } as const - }) - - const ttl = this.adapterSettings.CACHE_MAX_AGE - await this.cache.setMany(entries, ttl) - - const now = Date.now() - for (const { key, value } of entries) { - // Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record - const response = value as unknown as AdapterResponse - const feedId = response.meta?.metrics?.feedId - if (feedId) { - const providerTime = response.timestamps?.providerIndicatedTimeUnixMs - const timeDelta = providerTime ? now - providerTime : undefined - - // Record cache set count, max age, and staleness (set to 0 for cache set) - const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type) - cacheMetrics.cacheSet(label, ttl, timeDelta) - } - } - - return - } - - /** - * Sets a new TTL value for already cached responses in the adapter cache - * - * @param transportName - transport name - * @param params - set of parameters that uniquely relate to the response - * @param ttl - a new time in milliseconds until the response expires - */ - async writeTTL( - transportName: string, - params: TypeFromDefinition[], - ttl: number, - ): Promise { - for (const param of params) { - const key = calculateCacheKey({ - transportName: transportName, - data: param, - adapterName: this.adapterName, - endpointName: this.endpointName, - adapterSettings: this.adapterSettings, - }) - - this.cache.setTTL(key, ttl) - } - } -} +export { ResponseCache } from './response-cache/base' +export { SimpleResponseCache } from './response-cache/simple' +export { CompareResponseCache } from './response-cache/compare' diff --git a/src/transports/composite.ts b/src/transports/composite.ts new file mode 100644 index 00000000..d5228847 --- /dev/null +++ b/src/transports/composite.ts @@ -0,0 +1,87 @@ +import { EndpointContext } from '../adapter' +import { CompareResponseCache } from '../cache/response-cache/compare' +import { ResponseCache } from '../cache/response' +import { makeLogger } from '../util' +import { AdapterRequest, AdapterResponse } from '../util/types' +import { TypeFromDefinition } from '../validation/input-params' +import type { Transport, TransportDependencies, TransportGenerics } from '.' + +const logger = makeLogger('CompositeTransport') + +export type CompositeTransportConfig = { + transports: Record> + + /** + * @param next - the next response to be written to the cache + * @param current - the current response in the cache + * @returns true if next should replace current in cache + */ + shouldUpdate: ( + next: AdapterResponse, + current?: AdapterResponse, + ) => boolean +} + +// Send requests to multiple transports and merge responses into a single cache according to shouldUpdate +export class CompositeTransport implements Transport { + name!: string + responseCache!: ResponseCache + private transports: Transport[] = [] + + constructor(private readonly config: CompositeTransportConfig) {} + + async initialize( + dependencies: TransportDependencies, + adapterSettings: T['Settings'], + endpointName: string, + transportName: string, + ): Promise { + this.name = transportName + this.responseCache = dependencies.responseCache + + const compareCache = new CompareResponseCache( + transportName, + this.responseCache, + this.config.shouldUpdate, + ) + + await Promise.all( + Object.entries(this.config.transports).map(([name, transport]) => + transport.initialize( + { ...dependencies, responseCache: compareCache }, + adapterSettings, + endpointName, + name, + ), + ), + ) + + this.transports = Object.values(this.config.transports) + } + + async registerRequest( + req: AdapterRequest>, + adapterSettings: T['Settings'], + ): Promise { + const results = await Promise.allSettled( + this.transports.map((t) => t.registerRequest?.(req, adapterSettings)), + ) + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + logger.error(`Transport registerRequest failed: ${r.reason}`) + }) + } + + async backgroundExecute(context: EndpointContext): Promise { + const results = await Promise.allSettled( + this.transports.map((t) => t.backgroundExecute?.(context)), + ) + + results + .filter((r) => r.status === 'rejected') + .forEach((r) => { + logger.error(`Transport backgroundExecute failed: ${r.reason}`) + }) + } +} diff --git a/src/transports/index.ts b/src/transports/index.ts index ffb8a6e4..4ef80e99 100644 --- a/src/transports/index.ts +++ b/src/transports/index.ts @@ -7,6 +7,7 @@ import { InputParametersDefinition, TypeFromDefinition } from '../validation/inp export * from './http' export * from './sse' export * from './websocket' +export * from './composite' /** * Helper struct type that will be used to pass types to the generic parameters of a Transport. diff --git a/src/util/types.ts b/src/util/types.ts index d2d64b63..88bb48f8 100644 --- a/src/util/types.ts +++ b/src/util/types.ts @@ -86,6 +86,8 @@ export interface AdapterRequestMeta { export interface AdapterResponseMeta extends AdapterRequestMeta { /** Name of the adapter */ adapterName: string + /** Name of the transport */ + transportName: string } /** diff --git a/test/cache/response-cache/compare.test.ts b/test/cache/response-cache/compare.test.ts new file mode 100644 index 00000000..161ab252 --- /dev/null +++ b/test/cache/response-cache/compare.test.ts @@ -0,0 +1,104 @@ +import test from 'ava' +import { LocalCache } from '../../../src/cache/local' +import { CompareResponseCache } from '../../../src/cache/response-cache/compare' +import { SimpleResponseCache } from '../../../src/cache/response-cache/simple' +import { AdapterConfig } from '../../../src/config' +import { LoggerFactoryProvider } from '../../../src/util/logger' +import { InputParameters } from '../../../src/validation' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../helper' +import { AdapterDependencies } from '../../../src/adapter' +import { metrics } from '../../../src/metrics' + +test.before(() => { + LoggerFactoryProvider.set() + metrics.initialize() +}) + +const buildSimpleCache = () => { + const config = new AdapterConfig({}) + config.initialize() + config.settings.METRICS_ENABLED = true + config.settings.EXPERIMENTAL_METRICS_ENABLED = true + config.validate() + + return new SimpleResponseCache({ + dependencies: { cache: new LocalCache(100) } as unknown as AdapterDependencies, + adapterSettings: config.settings, + adapterName: 'TEST', + endpointName: 'test', + inputParameters: new InputParameters(cacheTestInputParameters.definition), + }) +} + +const providerResult = (params: { base: string; factor: number }, result: number) => ({ + params, + response: { + data: null, + result, + timestamps: { + providerDataRequestedUnixMs: 0, + providerDataReceivedUnixMs: 0, + providerIndicatedTimeUnixMs: undefined, + }, + }, +}) + +test('writes under CompareResponseCache transportName', async (t) => { + const compareCache = new CompareResponseCache('merged', buildSimpleCache(), () => true) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('ws', [providerResult(params, 42)]) + + t.is(await compareCache.get(compareCache.getCacheKey('ws', params)), undefined) + + const entry = await compareCache.get(compareCache.getCacheKey('merged', params)) + t.is(entry?.result, 42) + t.is(entry?.meta?.transportName, 'ws') +}) + +test('second write override first write', async (t) => { + const compareCache = new CompareResponseCache('merged', buildSimpleCache(), () => true) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('ws', [providerResult(params, 1), providerResult(params, 2)]) + + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 2) +}) + +test('shouldUpdate can block write when new value is not fresher than cache', async (t) => { + const compareCache = new CompareResponseCache( + 'merged', + buildSimpleCache(), + (next, current) => (next?.result || 0) > (current?.result || 0), + ) + + const params = { base: 'ETH', factor: 1 } + + await compareCache.write('merged', [providerResult(params, 50)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 50) + + await compareCache.write('merged', [providerResult(params, 25)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 50) + t.is(compareCache.localCache.size, 1) +}) + +test('shouldUpdate can block write without old value in localCache', async (t) => { + const simpleCache = buildSimpleCache() + + const compareCache = new CompareResponseCache( + 'merged', + simpleCache, + (next, current) => (next?.result || 0) > (current?.result || 0), + ) + + const params = { base: 'ETH', factor: 1 } + + await simpleCache.write('merged', [providerResult(params, 100)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 100) + + await compareCache.write('merged', [providerResult(params, 25)]) + t.is((await compareCache.get(compareCache.getCacheKey('merged', params)))?.result, 100) + t.is(compareCache.localCache.size, 0) +}) diff --git a/test/cache/response-cache.test.ts b/test/cache/response-cache/simple.test.ts similarity index 95% rename from test/cache/response-cache.test.ts rename to test/cache/response-cache/simple.test.ts index b01ae581..9fda037e 100644 --- a/test/cache/response-cache.test.ts +++ b/test/cache/response-cache/simple.test.ts @@ -1,18 +1,18 @@ import { Clock as InstalledClock } from '@sinonjs/fake-timers' -import { installTimers } from '../helper' +import { installTimers } from '../../helper' import untypedTest, { TestFn } from 'ava' import { FastifyInstance } from 'fastify' -import { Adapter, AdapterEndpoint } from '../../src/adapter' -import { AdapterConfig, SettingsDefinitionFromConfig } from '../../src/config' -import { AdapterRequest } from '../../src/util' -import { TypeFromDefinition } from '../../src/validation/input-params' +import { Adapter, AdapterEndpoint } from '../../../src/adapter' +import { AdapterConfig, SettingsDefinitionFromConfig } from '../../../src/config' +import { AdapterRequest } from '../../../src/util' +import { TypeFromDefinition } from '../../../src/validation/input-params' import { NopTransport, TestAdapter, assertEqualResponses, runAllUntilTime, -} from '../../src/util/testing-utils' -import { cacheTestInputParameters, CacheTestTransportTypes } from './helper' +} from '../../../src/util/testing-utils' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../helper' const test = untypedTest as TestFn<{ clock: InstalledClock diff --git a/test/metrics/metrics.test.ts b/test/metrics/metrics.test.ts index ef3d7399..6b9730ca 100644 --- a/test/metrics/metrics.test.ts +++ b/test/metrics/metrics.test.ts @@ -332,6 +332,7 @@ test.serial('validate response.meta has the correct properties', async (t) => { t.deepEqual(response.meta, { adapterName: 'TEST', metrics: { feedId: '{"from":"eth","to":"usd"}' }, + transportName: 'default_single_transport', }) }) diff --git a/test/transports/composite.test.ts b/test/transports/composite.test.ts new file mode 100644 index 00000000..47681605 --- /dev/null +++ b/test/transports/composite.test.ts @@ -0,0 +1,222 @@ +import { installTimers } from '../helper' +import untypedTest, { TestFn } from 'ava' +import axios, { AxiosResponse } from 'axios' +import MockAdapter from 'axios-mock-adapter' +import { FastifyInstance } from 'fastify' +import { Adapter, AdapterEndpoint, EndpointContext } from '../../src/adapter' +import { CompositeTransport } from '../../src/transports/composite' +import { + HttpTransport, + Transport, + TransportDependencies, + TransportGenerics, +} from '../../src/transports' +import { ResponseCache } from '../../src/cache/response' +import { AdapterRequest } from '../../src/util/types' +import { TestAdapter } from '../../src/util/testing-utils' +import { TypeFromDefinition } from '../../src/validation/input-params' +import { cacheTestInputParameters, CacheTestTransportTypes } from '../cache/helper' + +const test = untypedTest as TestFn<{ + clock: ReturnType + testAdapter: TestAdapter + api: FastifyInstance | undefined + ws: CountingCacheHttpTransport + rest: CountingCacheHttpTransport +}> + +process.env['CACHE_POLLING_MAX_RETRIES'] = '20' +process.env['CACHE_POLLING_SLEEP_MS'] = '10' +process.env['RETRY'] = '0' +process.env['BACKGROUND_EXECUTE_MS_HTTP'] = '1' +process.env['API_TIMEOUT'] = '0' + +const WS_PROVIDER = 'http://ea-composite-ws.test' +const REST_PROVIDER = 'http://ea-composite-rest.test' + +const axiosMock = new MockAdapter(axios) + +type CacheTestHttpTypes = CacheTestTransportTypes & { + Provider: { + RequestBody: unknown + ResponseBody: { result: number } + } +} + +class CountingCacheHttpTransport extends HttpTransport { + registerRequestCalls = 0 + + constructor(logicalName: string, baseURL: string) { + super({ + prepareRequests: (params) => + params.map((p) => ({ + params: [p], + request: { + baseURL, + url: '/price', + method: 'GET', + params: { base: p.base, factor: p.factor }, + }, + })), + parseResponse: (params, res: AxiosResponse<{ result: number }>) => + params.map((p) => ({ + params: p, + response: { + data: null, + result: res.data.result, + }, + })), + }) + this.name = logicalName + } + + override async registerRequest( + req: AdapterRequest>, + settings: CacheTestHttpTypes['Settings'], + ): Promise { + this.registerRequestCalls++ + return super.registerRequest(req, settings) + } +} + +test.before(async (t) => { + t.context.clock = installTimers() + + const ws = new CountingCacheHttpTransport('ws', WS_PROVIDER) + const rest = new CountingCacheHttpTransport('rest', REST_PROVIDER) + t.context.ws = ws + t.context.rest = rest + + const composite = new CompositeTransport({ + transports: { ws, rest }, + shouldUpdate: (next, current) => !current || (next?.result ?? 0) > (current?.result ?? 0), + }) + + const adapter = new Adapter({ + name: 'TEST', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + transport: composite, + }), + ], + }) + + await TestAdapter.start(adapter, t.context) +}) + +test.after(async (t) => { + await t.context.testAdapter?.api.close() +}) + +test.afterEach((t) => { + t.context.ws.registerRequestCalls = 0 + t.context.rest.registerRequestCalls = 0 +}) + +test.serial( + 'composite transport returns value from working transport when one transport fails to produce a value', + async (t) => { + axiosMock.onGet(`${WS_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }).reply(500) + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'ETH', factor: 5 } }) + .reply(200, { result: 42 }) + + const res = await t.context.testAdapter.request({ base: 'ETH', factor: 5 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 42) + t.is(t.context.ws.registerRequestCalls, 1) + t.is(t.context.rest.registerRequestCalls, 1) + }, +) + +test.serial( + 'composite transport merges child writes using shouldUpdate when run under an adapter', + async (t) => { + axiosMock + .onGet(`${WS_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) + .reply(200, { result: 10 }) + axiosMock + .onGet(`${REST_PROVIDER}/price`, { params: { base: 'BTC', factor: 3 } }) + .reply(200, { result: 100 }) + + t.is(t.context.ws.name, 'ws') + t.is(t.context.rest.name, 'rest') + + const res = await t.context.testAdapter.request({ base: 'BTC', factor: 3 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 100) + t.is(t.context.ws.registerRequestCalls, 1) + t.is(t.context.rest.registerRequestCalls, 1) + }, +) + +class ThrowingTransport implements Transport { + name!: string + responseCache!: ResponseCache + + async initialize( + dependencies: TransportDependencies, + _adapterSettings: T['Settings'], + _endpointName: string, + transportName: string, + ): Promise { + this.name = transportName + this.responseCache = dependencies.responseCache + } + + async registerRequest( + _req: AdapterRequest>, + _adapterSettings: T['Settings'], + ): Promise { + throw new Error('ThrowingTransport.registerRequest intentional error') + } + + async backgroundExecute(_context: EndpointContext): Promise { + throw new Error('ThrowingTransport.backgroundExecute intentional error') + } +} + +test.serial( + 'composite transport returns value from working transport when the other transport throws in registerRequest and backgroundExecute', + async (t) => { + const workingTransport = new CountingCacheHttpTransport('working', WS_PROVIDER) + const throwingTransport = new ThrowingTransport() + + const composite = new CompositeTransport({ + transports: { working: workingTransport, throwing: throwingTransport }, + shouldUpdate: (next, current) => !current || (next?.result ?? 0) > (current?.result ?? 0), + }) + + const adapter = new Adapter({ + name: 'TEST_THROWING', + defaultEndpoint: 'test', + endpoints: [ + new AdapterEndpoint({ + name: 'test', + inputParameters: cacheTestInputParameters, + transport: composite, + }), + ], + }) + + const localContext = { clock: t.context.clock } as typeof t.context + const localAdapter = await TestAdapter.start(adapter, localContext) + + axiosMock + .onGet(`${WS_PROVIDER}/price`, { params: { base: 'LINK', factor: 2 } }) + .reply(200, { result: 77 }) + + const res = await localAdapter.request({ base: 'LINK', factor: 2 }) + + t.is(res.statusCode, 200) + t.is(res.json().result, 77) + t.is(workingTransport.registerRequestCalls, 1) + + await localAdapter.api.close() + }, +)