diff --git a/.changeset/beige-timers-design.md b/.changeset/beige-timers-design.md new file mode 100644 index 0000000000..8b31cfadbc --- /dev/null +++ b/.changeset/beige-timers-design.md @@ -0,0 +1,5 @@ +--- +'@chainlink/coinpaprika-state-adapter': major +--- + +Add coinpaprika-state adapter for real-time state price streaming diff --git a/.pnp.cjs b/.pnp.cjs index c150a358ae..ca2c6da4f9 100644 --- a/.pnp.cjs +++ b/.pnp.cjs @@ -370,6 +370,10 @@ const RAW_RUNTIME_STATE = "name": "@chainlink/coinpaprika-adapter",\ "reference": "workspace:packages/sources/coinpaprika"\ },\ + {\ + "name": "@chainlink/coinpaprika-state-adapter",\ + "reference": "workspace:packages/sources/coinpaprika-state"\ + },\ {\ "name": "@chainlink/coinranking-adapter",\ "reference": "workspace:packages/sources/coinranking"\ @@ -1014,6 +1018,7 @@ const RAW_RUNTIME_STATE = ["@chainlink/coinmetrics-adapter", ["workspace:packages/sources/coinmetrics"]],\ ["@chainlink/coinmetrics-lwba-adapter", ["workspace:packages/sources/coinmetrics-lwba"]],\ ["@chainlink/coinpaprika-adapter", ["workspace:packages/sources/coinpaprika"]],\ + ["@chainlink/coinpaprika-state-adapter", ["workspace:packages/sources/coinpaprika-state"]],\ ["@chainlink/coinranking-adapter", ["workspace:packages/sources/coinranking"]],\ ["@chainlink/covid-tracker-adapter", ["workspace:packages/sources/covid-tracker"]],\ ["@chainlink/cryptex-adapter", ["workspace:packages/sources/cryptex"]],\ @@ -6067,6 +6072,21 @@ const RAW_RUNTIME_STATE = "linkType": "SOFT"\ }]\ ]],\ + ["@chainlink/coinpaprika-state-adapter", [\ + ["workspace:packages/sources/coinpaprika-state", {\ + "packageLocation": "./packages/sources/coinpaprika-state/",\ + "packageDependencies": [\ + ["@chainlink/coinpaprika-state-adapter", "workspace:packages/sources/coinpaprika-state"],\ + ["@chainlink/external-adapter-framework", "npm:2.8.0"],\ + ["@types/jest", "npm:29.5.14"],\ + ["@types/node", "npm:22.14.1"],\ + ["nock", "npm:13.5.6"],\ + ["tslib", "npm:2.4.1"],\ + ["typescript", "patch:typescript@npm%3A5.8.3#optional!builtin::version=5.8.3&hash=5786d5"]\ + ],\ + "linkType": "SOFT"\ + }]\ + ]],\ ["@chainlink/coinranking-adapter", [\ ["workspace:packages/sources/coinranking", {\ "packageLocation": "./packages/sources/coinranking/",\ diff --git a/packages/sources/coinpaprika-state/CHANGELOG.md b/packages/sources/coinpaprika-state/CHANGELOG.md new file mode 100644 index 0000000000..e69de29bb2 diff --git a/packages/sources/coinpaprika-state/README.md b/packages/sources/coinpaprika-state/README.md new file mode 100644 index 0000000000..207cab4488 --- /dev/null +++ b/packages/sources/coinpaprika-state/README.md @@ -0,0 +1,82 @@ +# COINPAPRIKA_STATE + +![2.7.0](https://img.shields.io/github/package-json/v/smartcontractkit/external-adapters-js?filename=packages/sources/coinpaprika-state/package.json) ![v3](https://img.shields.io/badge/framework%20version-v3-blueviolet) + +This document was generated automatically. Please see [Input Parameters](#Input-Parameters) for a list of environments variables and [Schemas](#Schemas) for additional examples. + +## Environment Variables + +| Required? | Name | Description | Type | Options | Default | +| :-------: | :---------------------: | :---------------------------------------------------------------------------------------: | :----: | :-----: | :------------------------------------------: | +| ✅ | `API_KEY` | An API key for Coinpaprika | string | | | +| | `API_ENDPOINT` | An API endpoint for Coinpaprika | string | | `https://chainlink-streaming.dexpaprika.com` | +| | `BACKGROUND_EXECUTE_MS` | The amount of time the background execute should sleep before performing the next request | number | | `3000` | +| | `REQUEST_TIMEOUT_MS` | Timeout for HTTP requests to the provider in milliseconds | number | | `60000` | +| | `RECONNECT_DELAY_MS` | Base delay for reconnection attempts in milliseconds | number | | `5000` | + +--- + +## Input Parameters + +Every EA supports base input parameters from [this list](https://github.com/smartcontractkit/ea-framework-js/blob/main/src/config/index.ts) + +| Required? | Name | Description | Type | Options | Default | +| :-------: | :------: | :-----------------: | :----: | :-----: | :-----: | +| | endpoint | The endpoint to use | string | | | + +## Coinpaprika-state Endpoint + +`coinpaprika-state` is the only supported name for this endpoint. + +### Input Params + +| Required? | Name | Aliases | Description | Type | Options | Default | Depends On | Not Valid With | +| :-------: | :-----: | :------------: | :--------------------------------------: | :----: | :-----: | :-----: | :--------: | :------------: | +| ✅ | `base` | `coin`, `from` | The symbol of the currency to query | string | | | | | +| ✅ | `quote` | `market`, `to` | The symbol of the currency to convert to | string | | | | | + +### Example + +Request: + +```json +{ + "id": "1", + "data": { + "base": "LUSD", + "quote": "USD" + }, + "debug": { + "cacheKey": "YlEjKJJLVmjXzFKQjFjVtKmQWlM=" + } +} +``` + +Response: + +```json +{ + "jobRunID": "1", + "data": { + "result": 1.000979, + "timestamp": 1758888503 + }, + "result": 1.000979, + "statusCode": 200, + "timestamps": { + "providerDataRequestedUnixMs": 1758888508939, + "providerDataReceivedUnixMs": 1758888508939, + "providerIndicatedTimeUnixMs": 1758888503000 + } +} +``` + +--- + +## Known Issues + +See [known-issues.md](./known-issues.md) for detailed information about streaming data, connection management, and error handling. + +--- + +MIT License diff --git a/packages/sources/coinpaprika-state/package.json b/packages/sources/coinpaprika-state/package.json new file mode 100644 index 0000000000..b1c9d077b2 --- /dev/null +++ b/packages/sources/coinpaprika-state/package.json @@ -0,0 +1,40 @@ +{ + "name": "@chainlink/coinpaprika-state-adapter", + "version": "0.0.0", + "description": "Chainlink coinpaprika-state adapter.", + "keywords": [ + "Chainlink", + "LINK", + "blockchain", + "oracle", + "coinpaprika-state" + ], + "main": "dist/index.js", + "types": "dist/index.d.ts", + "files": [ + "dist" + ], + "repository": { + "url": "https://github.com/smartcontractkit/external-adapters-js", + "type": "git" + }, + "license": "MIT", + "scripts": { + "clean": "rm -rf dist && rm -f tsconfig.tsbuildinfo", + "prepack": "yarn build", + "build": "tsc -b", + "server": "node -e 'require(\"./index.js\").server()'", + "server:dist": "node -e 'require(\"./dist/index.js\").server()'", + "start": "yarn server:dist" + }, + "devDependencies": { + "@types/jest": "^29.5.14", + "@types/node": "22.14.1", + "nock": "13.5.6", + "typescript": "5.8.3" + }, + "dependencies": { + "@chainlink/external-adapter-framework": "2.8.0", + "tslib": "2.4.1" + } +} diff --git a/packages/sources/coinpaprika-state/src/config/index.ts b/packages/sources/coinpaprika-state/src/config/index.ts new file mode 100644 index 0000000000..617d64b8ee --- /dev/null +++ b/packages/sources/coinpaprika-state/src/config/index.ts @@ -0,0 +1,26 @@ +import { AdapterConfig } from '@chainlink/external-adapter-framework/config' + +export const config = new AdapterConfig({ + API_KEY: { + description: 'An API key for Coinpaprika', + type: 'string', + required: true, + sensitive: true, + }, + API_ENDPOINT: { + description: 'An API endpoint for Coinpaprika', + type: 'string', + default: 'https://chainlink-streaming.dexpaprika.com', + }, + BACKGROUND_EXECUTE_MS: { + description: + 'The amount of time the background execute should sleep before performing the next request', + type: 'number', + default: 3_000, + }, + REQUEST_TIMEOUT_MS: { + description: 'Timeout for HTTP requests to the provider in milliseconds', + type: 'number', + default: 60_000, + }, +}) diff --git a/packages/sources/coinpaprika-state/src/endpoint/coinpaprika-state.ts b/packages/sources/coinpaprika-state/src/endpoint/coinpaprika-state.ts new file mode 100644 index 0000000000..6eb459b579 --- /dev/null +++ b/packages/sources/coinpaprika-state/src/endpoint/coinpaprika-state.ts @@ -0,0 +1,35 @@ +import { + PriceEndpoint, + priceEndpointInputParametersDefinition, +} from '@chainlink/external-adapter-framework/adapter' +import { SingleNumberResultResponse } from '@chainlink/external-adapter-framework/util' +import { InputParameters } from '@chainlink/external-adapter-framework/validation' +import { config } from '../config' +import { coinpaprikaSubscriptionTransport } from '../transport/coinpaprika-state' + +export const inputParameters = new InputParameters(priceEndpointInputParametersDefinition, [ + { + base: 'LUSD', + quote: 'USD', + }, +]) + +export type BaseEndpointTypes = { + Parameters: typeof inputParameters.definition + Response: SingleNumberResultResponse + Settings: typeof config.settings +} + +export const endpoint = new PriceEndpoint({ + name: 'price', + aliases: ['coinpaprika-state', 'state'], + transport: coinpaprikaSubscriptionTransport, + inputParameters, + requestTransforms: [ + (req) => { + req.requestContext.data.base = req.requestContext.data.base.toUpperCase() + req.requestContext.data.quote = req.requestContext.data.quote.toUpperCase() + return req + }, + ], +}) diff --git a/packages/sources/coinpaprika-state/src/endpoint/index.ts b/packages/sources/coinpaprika-state/src/endpoint/index.ts new file mode 100644 index 0000000000..8bfc4cab68 --- /dev/null +++ b/packages/sources/coinpaprika-state/src/endpoint/index.ts @@ -0,0 +1 @@ +export { endpoint as coinpaprikaState } from './coinpaprika-state' diff --git a/packages/sources/coinpaprika-state/src/index.ts b/packages/sources/coinpaprika-state/src/index.ts new file mode 100644 index 0000000000..2b9186148a --- /dev/null +++ b/packages/sources/coinpaprika-state/src/index.ts @@ -0,0 +1,13 @@ +import { expose, ServerInstance } from '@chainlink/external-adapter-framework' +import { PriceAdapter } from '@chainlink/external-adapter-framework/adapter' +import { config } from './config' +import { coinpaprikaState } from './endpoint' + +export const adapter = new PriceAdapter({ + defaultEndpoint: coinpaprikaState.name, + name: 'COINPAPRIKA_STATE', + config, + endpoints: [coinpaprikaState], +}) + +export const server = (): Promise => expose(adapter) diff --git a/packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts b/packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts new file mode 100644 index 0000000000..e7ee5d25f0 --- /dev/null +++ b/packages/sources/coinpaprika-state/src/transport/coinpaprika-state.ts @@ -0,0 +1,190 @@ +import { EndpointContext } from '@chainlink/external-adapter-framework/adapter' +import { ResponseCache } from '@chainlink/external-adapter-framework/cache/response' +import { TransportDependencies } from '@chainlink/external-adapter-framework/transports' +import { + StreamingTransport, + SubscriptionDeltas, +} from '@chainlink/external-adapter-framework/transports/abstract/streaming' +import { makeLogger, sleep } from '@chainlink/external-adapter-framework/util' +import { Requester } from '@chainlink/external-adapter-framework/util/requester' +import { TypeFromDefinition } from '@chainlink/external-adapter-framework/validation/input-params' +import { BaseEndpointTypes, inputParameters } from '../endpoint/coinpaprika-state' +import { + SSEConnectionCallbacks, + SSEConnectionConfig, + SSEConnectionManager, +} from './sse-connection-manager' + +const logger = makeLogger('CoinpaprikaStateTransport') + +export const COINPAPRIKA_STATE_EVENT_TYPE = 't_s' + +type RequestParams = typeof inputParameters.validated + +// Normal stream data (numbers may come as strings from API) +interface CoinpaprikaStreamData { + block_time: string | number + base_token_symbol: string + quote_symbol: string + volume_7d_usd: string | number + market_depth_plus_1_usd: string | number + market_depth_minus_1_usd: string | number + state_price: string | number +} + +export type TransportTypes = BaseEndpointTypes & { + Provider: { + RequestBody: Array<{ base: string; quote: string }> + ResponseBody: CoinpaprikaStreamData + } +} + +/** + * SSE transport that batches all pairs into one POST connection + */ +export class CoinpaprikaStateTransport extends StreamingTransport { + name!: string + responseCache!: ResponseCache + requester!: Requester + + private connectionManager!: SSEConnectionManager + + async initialize( + dependencies: TransportDependencies, + adapterSettings: TransportTypes['Settings'], + endpointName: string, + transportName: string, + ): Promise { + await super.initialize(dependencies, adapterSettings, endpointName, transportName) + this.requester = dependencies.requester + this.connectionManager = new SSEConnectionManager(this.requester, COINPAPRIKA_STATE_EVENT_TYPE) + } + + async streamHandler( + context: EndpointContext, + subscriptions: SubscriptionDeltas>, + ): Promise { + if ( + subscriptions.new.length || + subscriptions.stale.length || + !this.connectionManager.connected + ) { + logger.debug(`Updating stream: ${subscriptions.desired.length} pairs`) + await this.updateConnection(context, subscriptions.desired) + } + await sleep(context.adapterSettings.BACKGROUND_EXECUTE_MS) + } + + private async updateConnection( + context: EndpointContext, + pairs: RequestParams[], + ): Promise { + logger.debug(`updateConnection called with ${pairs.length} pairs`) + + if (pairs.length === 0 && this.connectionManager.connected) { + logger.debug('No pairs, disconnecting') + // TODO: This could potentially block if the disconnect hangs + await this.connectionManager.disconnect() + return + } + + const config: SSEConnectionConfig = { + apiEndpoint: context.adapterSettings.API_ENDPOINT, + apiKey: context.adapterSettings.API_KEY, + requestTimeoutMs: context.adapterSettings.REQUEST_TIMEOUT_MS, + } + + const callbacks: SSEConnectionCallbacks = { + onData: async (eventType: string, data: string) => { + await this.handleSSEEvent(eventType, data) + }, + onError: (error: Error) => { + logger.error(`SSE stream error: ${error.message}`) + }, + onConnectionError: async (status: number) => { + logger.error(`SSE connection error with status: ${status}`) + }, + onReconnectNeeded: () => { + logger.info('SSE ended; will reconnect on next cycle') + }, + } + + await this.connectionManager.connect(pairs, config, callbacks) + } + + getSubscriptionTtlFromConfig(adapterSettings: TransportTypes['Settings']): number { + return adapterSettings.WARMUP_SUBSCRIPTION_TTL + } + + private async handleSSEEvent(eventType: string, rawData: string): Promise { + // Handle explicit error events (stream-level errors) + if (eventType === 'error') { + // Parse base and quote from raw data + const data = JSON.parse(rawData) + const match = data.message.match(/unsupported (\w+)-(\w+) asset/) + if (!match) { + logger.warn(`Got unexpected error: ${rawData}`) + return + } + const [base, quote] = match.slice(1) + logger.warn(`Received error for ${base}/${quote}: ${data.message}`) + await this.responseCache.write(this.name, [ + { + params: { base, quote }, + response: { + statusCode: 400, + errorMessage: data.message, + timestamps: { + providerDataRequestedUnixMs: Date.now(), + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: undefined, + }, + }, + }, + ]) + return + } + + if (eventType !== COINPAPRIKA_STATE_EVENT_TYPE) { + logger.debug(`Skipping event type: ${eventType}`) + return + } + + // Parse incoming event + const streamData: CoinpaprikaStreamData = JSON.parse(rawData) + const param = { + base: streamData.base_token_symbol.toUpperCase(), + quote: streamData.quote_symbol.toUpperCase(), + } + const statePrice = Number(streamData.state_price) + const blockTime = Number(streamData.block_time) + + // guard against bad payloads + if (!Number.isFinite(statePrice) || !Number.isFinite(blockTime) || blockTime <= 0) { + logger.warn( + `Bad numeric fields for ${param.base}/${param.quote}: ${JSON.stringify(streamData)}`, + ) + return + } + + logger.debug(`tick ${param.base}/${param.quote}=${statePrice} t=${blockTime}`) + await this.responseCache.write(this.name, [ + { + params: param, + response: { + result: statePrice, + data: { + result: statePrice, + }, + timestamps: { + providerDataRequestedUnixMs: Date.now(), + providerDataReceivedUnixMs: Date.now(), + providerIndicatedTimeUnixMs: blockTime * 1000, + }, + }, + }, + ]) + } +} + +export const coinpaprikaSubscriptionTransport = new CoinpaprikaStateTransport() diff --git a/packages/sources/coinpaprika-state/src/transport/sse-connection-manager.ts b/packages/sources/coinpaprika-state/src/transport/sse-connection-manager.ts new file mode 100644 index 0000000000..2d6875d4e5 --- /dev/null +++ b/packages/sources/coinpaprika-state/src/transport/sse-connection-manager.ts @@ -0,0 +1,178 @@ +import { makeLogger } from '@chainlink/external-adapter-framework/util' +import { Requester } from '@chainlink/external-adapter-framework/util/requester' +import { Readable } from 'node:stream' +import { SSEParser } from './sse' + +const logger = makeLogger('SSEConnectionManager') + +export interface SSEConnectionCallbacks { + onData: (eventType: string, data: string) => Promise + onError: (error: Error) => void + onConnectionError: (status: number) => Promise + onReconnectNeeded: () => void +} + +export interface SSEConnectionConfig { + apiEndpoint: string + apiKey: string + requestTimeoutMs: number +} + +export interface PairRequest { + base: string + quote: string +} + +/** + * SSE connection manager for Coinpaprika + */ +export class SSEConnectionManager { + private isConnected = false + private currentAbortController: AbortController | null = null + private sseParser!: SSEParser + private requester: Requester + private defaultEventType: string + + constructor(requester: Requester, defaultEventType: string) { + this.requester = requester + this.defaultEventType = defaultEventType + } + + get connected(): boolean { + return this.isConnected + } + + async connect( + pairs: PairRequest[], + config: SSEConnectionConfig, + callbacks: SSEConnectionCallbacks, + ): Promise { + if (this.isConnected) { + logger.debug('Already connected, disconnecting first') + await this.disconnect() + } + + if (pairs.length === 0) { + logger.debug('No pairs to stream') + return + } + + logger.debug( + `Opening SSE connection for ${pairs.length} pairs: ${pairs + .map((p) => `${p.base}/${p.quote}`) + .join(', ')}`, + ) + + this.currentAbortController = new AbortController() + + try { + const request = this.buildRequest(pairs, config, this.currentAbortController.signal) + const key = `coinpaprika-state/stream:${pairs.map((p) => `${p.base}/${p.quote}`).join(',')}` + const { response } = await this.requester.request(key, request) + + if (response.status !== 200) { + await callbacks.onConnectionError(response.status) + this.disconnect() + return + } + + if (!response.data) { + logger.error('No data in HTTP 200 response') + await callbacks.onConnectionError(response.status) + return + } + + await this.setupStream(response.data as Readable, callbacks) + this.isConnected = true + logger.debug('SSE connection established') + } catch (error) { + this.cleanup() + logger.error(`Failed to create SSE connection: ${error}`) + callbacks.onError(error as Error) + } + } + + async disconnect(): Promise { + if (this.currentAbortController) { + logger.debug('Closing SSE connection') + } + this.cleanup() + } + + private cleanup(): void { + this.currentAbortController?.abort() + this.currentAbortController = null + this.isConnected = false + this.sseParser?.reset() + } + + private buildRequest( + pairs: PairRequest[], + config: SSEConnectionConfig, + signal: AbortSignal, + ): { + url: string + method: string + headers: Record + data: PairRequest[] + responseType: 'stream' + signal: AbortSignal + timeout: number + validateStatus: () => boolean + } { + return { + url: `${config.apiEndpoint}/stream`, + method: 'POST', + headers: { + Authorization: config.apiKey, + 'Content-Type': 'application/json', + Accept: 'text/event-stream', + }, + data: pairs, + responseType: 'stream', + signal, + timeout: config.requestTimeoutMs, + validateStatus: () => true, + } + } + + private async setupStream(stream: Readable, callbacks: SSEConnectionCallbacks): Promise { + let aborted = false + this.sseParser = new SSEParser(this.defaultEventType, (eventType, data) => { + callbacks.onData(eventType, data).catch((err) => { + logger.error(`Error in SSE data callback: ${err}`) + callbacks.onError(err) + }) + }) + + const onData = (chunk: Buffer) => { + const raw = chunk.toString('utf8') + logger.debug(`Raw SSE message received:\n${raw}`) + this.sseParser.push(raw) + } + + const onError = (err: Error) => { + if (err.name === 'CanceledError' || err.name === 'AbortError') { + aborted = true + } else { + logger.error(`Stream error: ${err}`) + callbacks.onError(err) + } + } + + const onEnd = () => { + stream.off('data', onData) + stream.off('error', onError) + this.cleanup() + + if (!aborted) { + logger.info('SSE ended unexpectedly') + callbacks.onReconnectNeeded() + } + } + + stream.on('data', onData) + stream.once('error', onError) + stream.once('end', onEnd) + } +} diff --git a/packages/sources/coinpaprika-state/src/transport/sse.ts b/packages/sources/coinpaprika-state/src/transport/sse.ts new file mode 100644 index 0000000000..b783cf641d --- /dev/null +++ b/packages/sources/coinpaprika-state/src/transport/sse.ts @@ -0,0 +1,54 @@ +/** + * SSE parser for chunked lines and event emission + */ +export class SSEParser { + private buffer = '' + private dataLines: string[] = [] + private currentEvent: string | null = null + private readonly defaultEvent: string + private readonly onEvent: (eventType: string, data: string) => void + + constructor(defaultEvent: string, onEvent: (eventType: string, data: string) => void) { + this.defaultEvent = defaultEvent + this.onEvent = onEvent + } + + /** + * Push a chunk and emit parsed events. + */ + push(chunk: string): void { + this.buffer += chunk + const lines = this.buffer.split('\n') + this.buffer = lines.pop() || '' + + for (const rawLine of lines) { + const line = rawLine.replace(/\r$/, '') + + if (line.startsWith(':')) continue + + if (line.startsWith('event:')) { + this.currentEvent = line.slice(6).trim() + continue + } + if (line.startsWith('data:')) { + this.dataLines.push(line.slice(5).trim()) + continue + } + + // end of an event + if (line.trim() === '' && this.dataLines.length > 0) { + const rawData = this.dataLines.join('\n') + this.dataLines = [] + const evt = this.currentEvent ?? this.defaultEvent + this.currentEvent = null + this.onEvent(evt, rawData) + } + } + } + + reset(): void { + this.buffer = '' + this.dataLines = [] + this.currentEvent = null + } +} diff --git a/packages/sources/coinpaprika-state/test-payload.json b/packages/sources/coinpaprika-state/test-payload.json new file mode 100644 index 0000000000..e0615a2670 --- /dev/null +++ b/packages/sources/coinpaprika-state/test-payload.json @@ -0,0 +1,16 @@ +{ + "requests": [ + { + "base": "LUSD", + "quote": "USD" + }, + { + "base": "EURA", + "quote": "USD" + }, + { + "base": "ALETH", + "quote": "USD" + } + ] +} \ No newline at end of file diff --git a/packages/sources/coinpaprika-state/test/integration/adapter.test.ts b/packages/sources/coinpaprika-state/test/integration/adapter.test.ts new file mode 100644 index 0000000000..2a7d6f9650 --- /dev/null +++ b/packages/sources/coinpaprika-state/test/integration/adapter.test.ts @@ -0,0 +1,323 @@ +import { + TestAdapter, + setEnvVariables, +} from '@chainlink/external-adapter-framework/util/testing-utils' +import nock from 'nock' +import { + endSSEStream, + mockStreamPost, + mockStreamPostAnyBody, + mockStreamPostRawAnyBody, + mockStreamPostRawMatchingBody, + sseEventChunk, + waitFor, +} from './fixtures' + +jest.setTimeout(10000) + +describe('coinpaprika-state adapter', () => { + let testAdapter: TestAdapter + let oldEnv: NodeJS.ProcessEnv + let outSpy: jest.SpyInstance, errSpy: jest.SpyInstance + + beforeAll(async () => { + outSpy = jest.spyOn(process.stdout, 'write').mockImplementation(() => true) + errSpy = jest.spyOn(process.stderr, 'write').mockImplementation(() => true) + + process.env.LOG_LEVEL = 'silent' + process.env.EA_LOG_LEVEL = 'silent' + process.env.PINO_LOG_LEVEL = 'silent' + process.env.METRICS_ENABLED = 'false' + process.env.RETRY = '0' + + oldEnv = JSON.parse(JSON.stringify(process.env)) + process.env.API_KEY = 'TEST-KEY' + process.env.API_ENDPOINT = 'http://localhost:1234' + process.env.BACKGROUND_EXECUTE_MS = '100' + + const adapter = (await import('../../src')).adapter + adapter.rateLimiting = undefined + testAdapter = await TestAdapter.startWithMockedCache(adapter, { + testAdapter: {} as TestAdapter, + }) + }) + + afterAll(async () => { + await testAdapter.api.close() + setEnvVariables(oldEnv) + outSpy.mockRestore() + errSpy.mockRestore() + }) + + afterEach(() => { + // Clean up nock after each test to prevent interference + nock.cleanAll() + }) + + it('happy path: streams ticks and serves latest state_price for LUSD/USD', async () => { + const { scope, stream } = mockStreamPost({ + apiBase: 'http://localhost:1234', + pairs: [{ base: 'LUSD', quote: 'USD' }], + events: [ + { + block_time: 1756224311, + base_token_symbol: 'LUSD', + quote_symbol: 'USD', + state_price: 1.0005, + volume_7d_usd: 1234.56, + market_depth_plus_1_usd: 0, + market_depth_minus_1_usd: 0, + }, + { + block_time: 1756224313, + base_token_symbol: 'LUSD', + quote_symbol: 'USD', + state_price: 1.0007, + volume_7d_usd: 2222, + market_depth_plus_1_usd: 0, + market_depth_minus_1_usd: 0, + }, + ], + }) + + await waitFor(async () => { + const r = await testAdapter.request({ base: 'LUSD', quote: 'USD' }) + expect(r.statusCode).toBe(200) + expect(r.json().result).toBeCloseTo(1.0007, 3) + expect(r.json().timestamps.providerIndicatedTimeUnixMs).toBe(1756224313000) + }) + + endSSEStream(stream) + scope.done() + }) + + it('returns 504 when no data available', async () => { + const scope = nock('http://localhost:1234') + .post('/stream') + .matchHeader('authorization', 'TEST-KEY') + .reply(200, () => ':heartbeat\n\n', { 'Content-Type': 'text/event-stream' }) + .persist() + + await new Promise((resolve) => setTimeout(resolve, 300)) + + const response = await testAdapter.request({ + base: 'UNKNOWN_X', + quote: 'USD', + }) + + expect(response.statusCode).toBe(504) + scope.persist(false) + nock.cleanAll() + }) + + it('401 conection error response causes 504 return', async () => { + const scope = nock('http://localhost:1234') + .post('/stream') + .matchHeader('authorization', 'INVALID-KEY') + .reply(401, { error: 'api key verification has failed' }) + .persist() + + await new Promise((resolve) => setTimeout(resolve, 300)) + + const response = await testAdapter.request({ + base: 'ETH401', + quote: 'USD', + }) + + expect(response.statusCode).toBe(504) + scope.persist(false) + nock.cleanAll() + }) + + it('400 connection error response causes 504 return', async () => { + const scope = nock('http://localhost:1234') + .post('/stream') + .reply(400, { error: 'bad' }) + .persist() + await new Promise((r) => setTimeout(r, 200)) + const res = await testAdapter.request({ base: 'ERR400', quote: 'USD' }) + expect(res.statusCode).toBe(504) + scope.persist(false) + nock.cleanAll() + }) + + it('multi-pair batching: caches all pairs independently', async () => { + const { scope, stream } = mockStreamPostAnyBody({ + apiBase: 'http://localhost:1234', + events: [ + { block_time: 10, base_token_symbol: 'LUSD', quote_symbol: 'USD', state_price: '1.01' }, + { block_time: 12, base_token_symbol: 'EURA', quote_symbol: 'USD', state_price: '1.02' }, + { block_time: 14, base_token_symbol: 'LUSD', quote_symbol: 'USD', state_price: '1.03' }, + ], + }) + + void testAdapter.request({ base: 'LUSD', quote: 'USD' }) + void testAdapter.request({ base: 'EURA', quote: 'USD' }) + + await waitFor(async () => { + const r1 = await testAdapter.request({ base: 'LUSD', quote: 'USD' }) + const r2 = await testAdapter.request({ base: 'EURA', quote: 'USD' }) + expect(r1.statusCode).toBe(200) + expect(r2.statusCode).toBe(200) + expect(r1.json().result).toBeCloseTo(1.03) + expect(r2.json().result).toBeCloseTo(1.02) + }) + + endSSEStream(stream) + scope.done() + }) + + it('pair-set change triggers reconnect immediately (closes existing stream)', async () => { + const BASE1 = 'LUSD_A' + const BASE2 = 'EURA_A' + + const { scope: s1, stream: st1 } = mockStreamPostRawAnyBody({ + apiBase: 'http://localhost:1234', + chunks: [ + sseEventChunk({ + block_time: 20, + base_token_symbol: BASE1, + quote_symbol: 'USD', + state_price: 1.0, + }), + ], + }) + + void testAdapter.request({ base: BASE1, quote: 'USD' }) + await waitFor(async () => { + const r = await testAdapter.request({ base: BASE1, quote: 'USD' }) + expect(r.statusCode).toBe(200) + expect(r.json().result).toBeCloseTo(1.0, 3) + }) + + const { scope: s2, stream: st2 } = mockStreamPostAnyBody({ + apiBase: 'http://localhost:1234', + events: [ + { + block_time: 22, + base_token_symbol: BASE2, + quote_symbol: 'USD', + state_price: 1.11, + }, + ], + }) + + void testAdapter.request({ base: BASE2, quote: 'USD' }) + await waitFor(async () => { + const r = await testAdapter.request({ base: BASE2, quote: 'USD' }) + expect(r.statusCode).toBe(200) + expect(r.json().result).toBeCloseTo(1.11, 3) + }) + + endSSEStream(st2) + endSSEStream(st1) + s1.done() + s2.done() + }) + + it('ignores events for unknown pairs and non-t_s event types', async () => { + const { scope, stream } = mockStreamPostRawAnyBody({ + apiBase: 'http://localhost:1234', + chunks: [ + sseEventChunk({ + block_time: 30, + base_token_symbol: 'BTC', + quote_symbol: 'JPY', + state_price: 9000000, + }), + sseEventChunk({ ping: true }, 'heartbeat'), + ], + }) + + const r = await testAdapter.request({ base: 'XYZ', quote: 'USD' }) + expect(r.statusCode).toBe(504) + + endSSEStream(stream) + scope.done() + }) + + it('malformed JSON is skipped, keeping last good value only', async () => { + const BASE = 'LUSD2' + const QUOTE = 'USD' + + const { scope, stream } = mockStreamPostRawMatchingBody({ + apiBase: 'http://localhost:1234', + chunks: [ + sseEventChunk('{not-json}'), + sseEventChunk({ + block_time: 40, + base_token_symbol: BASE, + quote_symbol: QUOTE, + state_price: '1.2', + }), + ], + matchBody: (body) => + Array.isArray(body) && + body.some((p: { base?: string; quote?: string }) => p?.base === BASE && p?.quote === QUOTE), + }) + + void testAdapter.request({ base: BASE, quote: QUOTE }) + + await waitFor(async () => { + expect(scope.isDone()).toBe(true) + }) + + await waitFor(async () => { + const r = await testAdapter.request({ base: BASE, quote: QUOTE }) + expect(r.statusCode).toBe(200) + expect(r.json().result).toBeCloseTo(1.2) + }) + + endSSEStream(stream) + }) + + it('string to number coercion works for state_price and block_time', async () => { + const BASE = 'COERCE_B' + const { scope, stream } = mockStreamPostAnyBody({ + apiBase: 'http://localhost:1234', + events: [ + { + block_time: '12345', + base_token_symbol: BASE, + quote_symbol: 'USD', + state_price: '1.2345', + }, + ], + }) + scope.persist() + + void testAdapter.request({ base: BASE, quote: 'USD' }) + await new Promise((r) => setTimeout(r, 200)) + + await waitFor(async () => { + const r = await testAdapter.request({ base: BASE, quote: 'USD' }) + expect(r.statusCode).toBe(200) + expect(r.json().result).toBeCloseTo(1.2345, 4) + expect(r.json().timestamps.providerIndicatedTimeUnixMs).toBe(12345000) + }, 10_000) + + await waitFor(async () => { + expect(scope.isDone()).toBe(true) + }) + + endSSEStream(stream) + scope.persist(false) + }) + + it('error stream message causes request to return 400 for unsupported asset', async () => { + const { scope, stream } = mockStreamPostRawAnyBody({ + apiBase: 'http://localhost:1234', + chunks: ['data: {"message":"unsupported CBBTC-USD asset"}\nevent: error\n\n'], + }) + scope.persist() + + void testAdapter.request({ base: 'CBBTC', quote: 'USD' }) + await new Promise((r) => setTimeout(r, 300)) + + const response = await testAdapter.request({ base: 'CBBTC', quote: 'USD' }) + expect(response.statusCode).toBe(400) + + endSSEStream(stream) + scope.persist(false) + }) +}) diff --git a/packages/sources/coinpaprika-state/test/integration/fixtures.ts b/packages/sources/coinpaprika-state/test/integration/fixtures.ts new file mode 100644 index 0000000000..5aaf0d27a0 --- /dev/null +++ b/packages/sources/coinpaprika-state/test/integration/fixtures.ts @@ -0,0 +1,184 @@ +import nock from 'nock' +import { PassThrough } from 'stream' + +export const sseEventChunk = (payload: object | string, event = 't_s') => { + const dataStr = typeof payload === 'string' ? payload : JSON.stringify(payload) + return `event: ${event}\n` + `data: ${dataStr}\n\n` +} + +const makeRawSSEStream = (chunks: string[]) => { + const stream = new PassThrough() + for (const ch of chunks) stream.write(ch) + return stream +} + +const makeSSEStream = (events: object[]) => { + return makeRawSSEStream(events.map((ev) => sseEventChunk(ev))) +} + +export const endSSEStream = (stream?: PassThrough) => { + if (!stream) return + try { + stream.end() + } catch { + // Ignore errors when ending stream + } + try { + stream.destroy() + } catch { + // Ignore errors when destroying stream + } +} + +export const mockStreamPost = ({ + apiBase, + pairs, + events, + authHeader = 'TEST-KEY', +}: { + apiBase: string + pairs: Array<{ base: string; quote: string }> + events: object[] + authHeader?: string +}) => { + const scope = nock(apiBase, { + reqheaders: { + authorization: (v) => v === authHeader, + accept: (v) => (v || '').toLowerCase().includes('text/event-stream'), + 'content-type': (v) => (v || '').toLowerCase().includes('application/json'), + }, + }) + + const stream = makeSSEStream(events) + + scope + .post('/stream', (body) => Array.isArray(body) && body.length === pairs.length) + .reply(200, () => stream, { + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Cache-Control': 'no-cache', + 'Transfer-Encoding': 'chunked', + }) + + return { scope, stream } +} + +export const mockStreamPostAnyBody = ({ + apiBase, + events, + authHeader = 'TEST-KEY', +}: { + apiBase: string + events: object[] + authHeader?: string +}) => { + const scope = nock(apiBase, { + reqheaders: { + authorization: (v) => v === authHeader, + accept: (v) => (v || '').toLowerCase().includes('text/event-stream'), + 'content-type': (v) => (v || '').toLowerCase().includes('application/json'), + }, + }) + + const stream = makeSSEStream(events) + + scope.post('/stream').reply(200, () => stream, { + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Cache-Control': 'no-cache', + 'Transfer-Encoding': 'chunked', + }) + + return { scope, stream } +} + +export const mockStreamPostRawAnyBody = ({ + apiBase, + chunks, + authHeader = 'TEST-KEY', +}: { + apiBase: string + chunks: string[] + authHeader?: string +}) => { + const scope = nock(apiBase, { + reqheaders: { + authorization: (v) => v === authHeader, + accept: (v) => (v || '').toLowerCase().includes('text/event-stream'), + 'content-type': (v) => (v || '').toLowerCase().includes('application/json'), + }, + }) + + const stream = makeRawSSEStream(chunks) + + scope.post('/stream').reply(200, () => stream, { + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Cache-Control': 'no-cache', + 'Transfer-Encoding': 'chunked', + }) + + return { scope, stream } +} + +export const waitFor = async ( + fn: () => Promise, + timeoutMs = 5000, + stepMs = 150, +): Promise => { + const start = Date.now() + let lastErr: unknown + // eslint-disable-next-line no-constant-condition + while (true) { + try { + return await fn() + } catch (e) { + lastErr = e + if (Date.now() - start > timeoutMs) throw lastErr + await new Promise((r) => setTimeout(r, stepMs)) + } + } +} + +export const mockStreamPostRawMatchingBody = ({ + apiBase, + chunks, + matchBody, + authHeader = 'TEST-KEY', + firstDelayMs = 10, +}: { + apiBase: string + chunks: string[] + matchBody: (body: unknown) => boolean + authHeader?: string + firstDelayMs?: number +}) => { + const scope = nock(apiBase, { + reqheaders: { + authorization: (v) => v === authHeader, + accept: (v) => (v || '').toLowerCase().includes('text/event-stream'), + 'content-type': (v) => (v || '').toLowerCase().includes('application/json'), + }, + }) + const stream = new PassThrough() + setTimeout(() => { + for (const ch of chunks) stream.write(ch) + }, firstDelayMs) + + scope + .post('/stream', (body) => { + try { + return matchBody(body) + } catch { + return false + } + }) + .reply(200, () => stream, { + 'Content-Type': 'text/event-stream', + Connection: 'keep-alive', + 'Cache-Control': 'no-cache', + 'Transfer-Encoding': 'chunked', + }) + + return { scope, stream } +} diff --git a/packages/sources/coinpaprika-state/test/unit/sseparser-simple.test.ts b/packages/sources/coinpaprika-state/test/unit/sseparser-simple.test.ts new file mode 100644 index 0000000000..8272fe4e7f --- /dev/null +++ b/packages/sources/coinpaprika-state/test/unit/sseparser-simple.test.ts @@ -0,0 +1,42 @@ +import { SSEParser } from '../../src/transport/sse' + +describe('SSEParser - Coinpaprika State Messages', () => { + let parser: SSEParser + let mockOnEvent: jest.Mock + + beforeEach(() => { + mockOnEvent = jest.fn() + parser = new SSEParser('t_s', mockOnEvent) + }) + + it('should parse standard coinpaprika-state message', () => { + const chunk = + 'data: {"block_time":1761192245,"send_timestamp":1761192248,"base_token_symbol":"LUSD","quote_symbol":"USD","volume_7d_usd":152457.672726,"market_depth_plus_1_usd":46067.488432,"market_depth_minus_1_usd":57854.860019,"state_price":1.001045}\nevent: t_s\n\n' + + parser.push(chunk) + + expect(mockOnEvent).toHaveBeenCalledTimes(1) + expect(mockOnEvent).toHaveBeenCalledWith( + 't_s', + '{"block_time":1761192245,"send_timestamp":1761192248,"base_token_symbol":"LUSD","quote_symbol":"USD","volume_7d_usd":152457.672726,"market_depth_plus_1_usd":46067.488432,"market_depth_minus_1_usd":57854.860019,"state_price":1.001045}', + ) + }) + + it('should parse error message without breaking', () => { + const chunk = 'data: {"message":"unsupported CBBTC-USD asset"}\nevent: error\n\n' + + parser.push(chunk) + + expect(mockOnEvent).toHaveBeenCalledTimes(1) + expect(mockOnEvent).toHaveBeenCalledWith('error', '{"message":"unsupported CBBTC-USD asset"}') + }) + + it('should handle non standard, non-error message without breaking', () => { + const chunk = 'data: {"nodata": 0}\nevent: different-type\n\n' + + parser.push(chunk) + + expect(mockOnEvent).toHaveBeenCalledTimes(1) + expect(mockOnEvent).toHaveBeenCalledWith('different-type', '{"nodata": 0}') + }) +}) diff --git a/packages/sources/coinpaprika-state/tsconfig.json b/packages/sources/coinpaprika-state/tsconfig.json new file mode 100644 index 0000000000..f59363fd76 --- /dev/null +++ b/packages/sources/coinpaprika-state/tsconfig.json @@ -0,0 +1,9 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src/**/*", "src/**/*.json"], + "exclude": ["dist", "**/*.spec.ts", "**/*.test.ts"] +} diff --git a/packages/sources/coinpaprika-state/tsconfig.test.json b/packages/sources/coinpaprika-state/tsconfig.test.json new file mode 100755 index 0000000000..e3de28cb5c --- /dev/null +++ b/packages/sources/coinpaprika-state/tsconfig.test.json @@ -0,0 +1,7 @@ +{ + "extends": "../../tsconfig.base.json", + "include": ["src/**/*", "**/test", "src/**/*.json"], + "compilerOptions": { + "noEmit": true + } +} diff --git a/packages/tsconfig.json b/packages/tsconfig.json index 1c5d196fe3..56bf828a62 100644 --- a/packages/tsconfig.json +++ b/packages/tsconfig.json @@ -257,6 +257,9 @@ { "path": "./sources/coinpaprika" }, + { + "path": "./sources/coinpaprika-state" + }, { "path": "./sources/coinranking" }, diff --git a/packages/tsconfig.test.json b/packages/tsconfig.test.json index 97e61aeba4..a8e393fad6 100644 --- a/packages/tsconfig.test.json +++ b/packages/tsconfig.test.json @@ -257,6 +257,9 @@ { "path": "./sources/coinpaprika/tsconfig.test.json" }, + { + "path": "./sources/coinpaprika-state/tsconfig.test.json" + }, { "path": "./sources/coinranking/tsconfig.test.json" }, diff --git a/yarn.lock b/yarn.lock index ab71796f30..173ec11681 100644 --- a/yarn.lock +++ b/yarn.lock @@ -3253,6 +3253,19 @@ __metadata: languageName: unknown linkType: soft +"@chainlink/coinpaprika-state-adapter@workspace:packages/sources/coinpaprika-state": + version: 0.0.0-use.local + resolution: "@chainlink/coinpaprika-state-adapter@workspace:packages/sources/coinpaprika-state" + dependencies: + "@chainlink/external-adapter-framework": "npm:2.8.0" + "@types/jest": "npm:^29.5.14" + "@types/node": "npm:22.14.1" + nock: "npm:13.5.6" + tslib: "npm:2.4.1" + typescript: "npm:5.8.3" + languageName: unknown + linkType: soft + "@chainlink/coinranking-adapter@workspace:*, @chainlink/coinranking-adapter@workspace:packages/sources/coinranking": version: 0.0.0-use.local resolution: "@chainlink/coinranking-adapter@workspace:packages/sources/coinranking"