Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ yarn # Install yarn dependencies
- [Subscription](./docs/components/transport-types/subscription-transport.md)
- [Streaming](./docs/components/transport-types/streaming-transport.md)
- [Custom](./docs/components/transport-types/custom-transport.md)
- [Composite](./docs/components/transport-types/composite-transport.md)
- Guides
- [Porting a v2 EA to v3](./docs/guides/porting-a-v2-ea-to-v3.md)
- [Creating a new v3 EA](./docs/guides/creating-a-new-v3-ea.md)
Expand Down
55 changes: 55 additions & 0 deletions docs/components/transport-types/composite-transport.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
# Composite transport

`CompositeTransport` runs several child transports in parallel for the same endpoint and merges their writes into a single response cache. You choose when a newer value from any child should replace what is already cached by implementing `shouldUpdate`.

Typical uses:

- Combine a low-latency channel (for example WebSocket) with a REST fallback so the cache still updates if the stream lags or drops.
- Prefer one provider’s quote over another’s when both are active, using freshness, spread, or custom rules in `shouldUpdate`.

## How it works

1. **Initialization** — Each child transport is initialized with the same adapter dependencies, except `responseCache` is replaced by a `[CompareResponseCache](../../../src/cache/response-cache/compare.ts)` wrapper. That wrapper forwards reads to the real endpoint cache but filters writes: a write is applied only when `shouldUpdate(next, current)` is true for the pending value versus the last locally seen value for that cache key, and again versus the value already in the shared cache (so concurrent children do not blindly overwrite each other).
2. **Subscriptions** — `registerRequest` is invoked on every child in parallel, so each transport can register the request in its own subscription set or equivalent.
3. **Background execution** — `backgroundExecute` is invoked on every child in parallel. All children share the same merged cache policy via `shouldUpdate`.

Child transport names come from the keys of the `transports` object you pass in (for example `ws` and `rest`). Those names are passed to each child’s `initialize` as its `transportName`.

## Configuration

`CompositeTransport` is constructed with a `CompositeTransportConfig`:

| Field | Description |
| -------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `transports` | Record of named child `Transport` instances. All children must use the same `TransportGenerics` as the composite. |
| `shouldUpdate` | `(next, current?) => boolean`. Return `true` if `next` should replace `current` in the cache. `current` is `undefined` when there is no prior value for that key. |

The composite implements `Transport` but does not define `foregroundExecute`; behavior depends entirely on the children.

## Example

Two HTTP-style transports (here standing in for WS vs REST) both poll the same symbols. The cache keeps whichever result has the higher `result` field:

```typescript
import { CompositeTransport, HttpTransport } from '@chainlink/external-adapter-framework/transports'

const ws = new HttpTransport<EndpointTypes>({
/* ... */
})
const rest = new HttpTransport<EndpointTypes>({
/* ... */
})

const transport = new CompositeTransport<EndpointTypes>({
transports: { ws, rest },
shouldUpdate: (next, current) => (next?.result ?? 0) > (current?.result ?? 0),
})
```

Use the composite as the endpoint’s single `transport` in `AdapterEndpoint` (see `[test/transports/composite.test.ts](../../../test/transports/composite.test.ts)` for a full adapter-level example).

## Notes

- **Ordering** — Children run concurrently; which response arrives first is not guaranteed. `shouldUpdate` should encode your merge policy (for example “newer timestamp wins” or “always prefer stream unless stale”).
- **TTL** — TTL writes are forwarded to the underlying cache with the composite’s transport name; see `CompareResponseCache.writeTTL` if you rely on per-transport TTL behavior.
- **Errors** — Child transports still own parsing and error handling; the composite only decides whether successful cache entries from a child replace existing ones.
1 change: 1 addition & 0 deletions docs/components/transports.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ The v3 framework provides transports to fetch data from a Provider using the com
- [HTTP Transport](./transport-types/http-transport.md)
- [Websocket Transport](./transport-types/websocket-transport.md)
- [SSE Transport](./transport-types/sse-transport.md)
- [Composite Transport](./transport-types/composite-transport.md)
- [Custom Transport](./transport-types/custom-transport.md)

### Abstract Transports
Expand Down
4 changes: 2 additions & 2 deletions src/adapter/endpoint.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { ResponseCache } from '../cache/response'
import { SimpleResponseCache } from '../cache/response'
import { AdapterSettings } from '../config'
import { TransportRoutes } from '../transports'
import {
Expand Down Expand Up @@ -83,7 +83,7 @@ export class AdapterEndpoint<T extends EndpointGenerics> implements AdapterEndpo
adapterSettings: T['Settings'],
): Promise<void> {
this.adapterName = adapterName
const responseCache = new ResponseCache({
const responseCache = new SimpleResponseCache({
dependencies,
adapterSettings: adapterSettings as AdapterSettings,
adapterName,
Expand Down
170 changes: 170 additions & 0 deletions src/cache/response-cache/base.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
import { AdapterDependencies } from '../../adapter'
import { AdapterSettings } from '../../config'
import {
AdapterResponse,
makeLogger,
ResponseGenerics,
TimestampedAdapterResponse,
TimestampedProviderResult,
censor,
censorLogs,
TimestampedProviderErrorResponse,
} from '../../util'
import {
InputParameters,
InputParametersDefinition,
TypeFromDefinition,
} from '../../validation/input-params'
import { Cache, calculateAdapterName, calculateCacheKey, calculateFeedId } from '../'
import CensorList from '../../util/censor/censor-list'
import { validator } from '../../validation/utils'

const logger = makeLogger('ResponseCache')

export abstract class ResponseCache<
T extends { Parameters: InputParametersDefinition; Response: ResponseGenerics },
> {
cache: Cache<AdapterResponse<T['Response']>>
inputParameters: InputParameters<T['Parameters']>
adapterName: string
endpointName: string
adapterSettings: AdapterSettings
dependencies: AdapterDependencies

constructor({
inputParameters,
adapterName,
endpointName,
adapterSettings,
dependencies,
}: {
dependencies: AdapterDependencies
adapterSettings: AdapterSettings
adapterName: string
endpointName: string
inputParameters: InputParameters<T['Parameters']>
}) {
this.dependencies = dependencies
this.cache = dependencies.cache as Cache<AdapterResponse<T['Response']>>
this.inputParameters = inputParameters
this.adapterName = adapterName
this.endpointName = endpointName
this.adapterSettings = adapterSettings
}

/**
* Sets responses in the adapter cache (adding necessary metadata and defaults)
*
* @param transportName - transport name
* @param results - the entries to write to the cache
*/
abstract write(transportName: string, results: TimestampedProviderResult<T>[]): Promise<void>

/**
* Sets responses with metadata in the adapter cache
*
* @param entries - the entries to write to the cache
*/
abstract writeEntries(
entries: {
key: string
value: AdapterResponse<T['Response']>
}[],
): Promise<void>

/**
* Sets a new TTL value for already cached responses in the adapter cache
*
* @param transportName - transport name
* @param params - set of parameters that uniquely relate to the response
* @param ttl - a new time in milliseconds until the response expires
*/
async writeTTL(
transportName: string,
params: TypeFromDefinition<T['Parameters']>[],
ttl: number,
): Promise<void> {
for (const param of params) {
const key = this.getCacheKey(transportName, param)
this.cache.setTTL(key, ttl)
}
}

async get(key: string) {
return this.cache.get(key)
}

protected generateCacheEntry(
transportNameForMeta: string,
transportNameForCache: string,
r: TimestampedProviderResult<T>,
) {
const censorList = CensorList.getAll()
const { data, result, errorMessage } = r.response
if (!errorMessage && data === undefined) {
logger.warn('The "data" property of the response is undefined.')
} else if (!errorMessage && result === undefined) {
logger.warn('The "result" property of the response is undefined.')
}
let censoredResponse
if (!censorList.length) {
censoredResponse = r.response
} else {
try {
censoredResponse = censor(r.response, censorList, true) as TimestampedAdapterResponse<
T['Response']
>
} catch (error) {
censorLogs(() => logger.error(`Error censoring response: ${error}`))
censoredResponse = {
statusCode: 502,
errorMessage: 'Response could not be censored due to an error',
timestamps: r.response.timestamps,
}
}
}

const response: AdapterResponse<T['Response']> = {
...censoredResponse,
statusCode: (censoredResponse as TimestampedProviderErrorResponse).statusCode || 200,
}

if (this.adapterSettings.METRICS_ENABLED && this.adapterSettings.EXPERIMENTAL_METRICS_ENABLED) {
response.meta = {
adapterName: calculateAdapterName(this.adapterName, r.params),
transportName: transportNameForMeta,
metrics: {
feedId: calculateFeedId(
{
adapterSettings: this.adapterSettings,
},
r.params,
),
},
}
}

if (response.timestamps?.providerIndicatedTimeUnixMs !== undefined) {
const timestampValidator = validator.responseTimestamp()
const error = timestampValidator.fn(response.timestamps?.providerIndicatedTimeUnixMs)
if (error) {
censorLogs(() => logger.warn(`Provider indicated time is invalid: ${error}`))
}
}

return {
key: this.getCacheKey(transportNameForCache, r.params),
value: response,
} as const
}

getCacheKey(transportName: string, params: TypeFromDefinition<T['Parameters']>) {
return calculateCacheKey({
transportName,
data: params,
adapterName: this.adapterName,
endpointName: this.endpointName,
adapterSettings: this.adapterSettings,
})
}
}
88 changes: 88 additions & 0 deletions src/cache/response-cache/compare.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
import { ResponseCache } from './base'
import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util'
import { InputParametersDefinition, TypeFromDefinition } from '../../validation/input-params'

/**
* Compares with existing cache entries before deciding to write or not
*/
export class CompareResponseCache<
T extends {
Parameters: InputParametersDefinition
Response: ResponseGenerics
},
> extends ResponseCache<T> {
readonly transportName: string
// The actual cache where responses are written to
responseCache: ResponseCache<T>
// A local map to keep track of the most recent entries written to the responseCache
// We compare with this first before comparing with value in cache
// so that we can reduce cache reads
localCache: Map<string, AdapterResponse<T['Response']>>
// True if next should replace current in cache
shouldUpdate: (
next: AdapterResponse<T['Response']>,
current?: AdapterResponse<T['Response']>,
) => boolean

constructor(
transportName: string,
responseCache: ResponseCache<T>,
shouldUpdate: (
next: AdapterResponse<T['Response']>,
current?: AdapterResponse<T['Response']>,
) => boolean,
) {
super({
inputParameters: responseCache.inputParameters,
adapterName: responseCache.adapterName,
endpointName: responseCache.endpointName,
adapterSettings: responseCache.adapterSettings,
dependencies: responseCache.dependencies,
})
this.transportName = transportName
this.responseCache = responseCache
this.localCache = new Map()
this.shouldUpdate = shouldUpdate
}

async write(transportName: string, results: TimestampedProviderResult<T>[]): Promise<void> {
const entries: {
key: string
value: AdapterResponse<T['Response']>
}[] = []

for (const result of results) {
const { key, value } = this.generateCacheEntry(transportName, this.transportName, result)
if (!this.shouldUpdate(value, this.localCache.get(key))) {
continue
}
const entryInCache = await this.get(key)
if (!this.shouldUpdate(value, entryInCache)) {
continue
}
entries.push({ key, value })
}

await this.responseCache.writeEntries(entries)

entries.forEach(({ key, value }) => {
this.localCache.set(key, value)
})
}

async writeEntries() {
throw new Error('Use write instead for CompareResponseCache')
}

override async writeTTL(
_: string,
params: TypeFromDefinition<T['Parameters']>[],
ttl: number,
): Promise<void> {
await this.responseCache.writeTTL(this.transportName, params, ttl)
}

override async get(key: string) {
return this.responseCache.get(key)
}
}
46 changes: 46 additions & 0 deletions src/cache/response-cache/simple.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
import { ResponseCache } from './base'
import { AdapterResponse, ResponseGenerics, TimestampedProviderResult } from '../../util'
import { InputParametersDefinition } from '../../validation/input-params'
import * as cacheMetrics from '../metrics'

/**
* Special type of cache to store responses for this adapter.
*/
export class SimpleResponseCache<
T extends {
Parameters: InputParametersDefinition
Response: ResponseGenerics
},
> extends ResponseCache<T> {
async writeEntries(
entries: {
key: string
value: AdapterResponse<T['Response']>
}[],
): Promise<void> {
const ttl = this.adapterSettings.CACHE_MAX_AGE
await this.cache.setMany(entries, ttl)

const now = Date.now()
for (const { key, value } of entries) {
// Only record metrics if feed Id is present, otherwise assuming value is not adapter response to record
const response = value as unknown as AdapterResponse
const feedId = response.meta?.metrics?.feedId
if (feedId) {
const providerTime = response.timestamps?.providerIndicatedTimeUnixMs
const timeDelta = providerTime ? now - providerTime : undefined

// Record cache set count, max age, and staleness (set to 0 for cache set)
const label = cacheMetrics.cacheMetricsLabel(key, feedId, this.cache.type)
cacheMetrics.cacheSet(label, ttl, timeDelta)
}
}

return
}

async write(transportName: string, results: TimestampedProviderResult<T>[]): Promise<void> {
const entries = results.map((r) => this.generateCacheEntry(transportName, transportName, r))
await this.writeEntries(entries)
}
}
Loading
Loading