Skip to content

Process on-chain-received DApi batches #65

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Nov 7, 2023
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/signed-api-fetch/data-fetcher.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@ describe('data fetcher', () => {
localDataStore.clear();
setState(
produce(getState(), (draft) => {
draft.signedApiUrlStore = [
'http://127.0.0.1:8090/0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
'https://pool.nodary.io/0xc52EeA00154B4fF1EbbF8Ba39FDe37F1AC3B9Fd4',
];
draft.signedApiUrlStore = {
hardhat: [
'http://127.0.0.1:8090/0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
'https://pool.nodary.io/0xc52EeA00154B4fF1EbbF8Ba39FDe37F1AC3B9Fd4',
],
};
})
);
});
Expand Down
5 changes: 4 additions & 1 deletion src/signed-api-fetch/data-fetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { clearInterval } from 'node:timers';

import { go } from '@api3/promise-utils';
import axios from 'axios';
import { uniq } from 'lodash';

import { HTTP_SIGNED_DATA_API_ATTEMPT_TIMEOUT, HTTP_SIGNED_DATA_API_HEADROOM } from '../constants';
import * as localDataStore from '../signed-data-store';
Expand Down Expand Up @@ -63,8 +64,10 @@ export const runDataFetcher = async () => {
});
}

const urls = uniq(Object.values(signedApiUrlStore).flat());

return Promise.allSettled(
signedApiUrlStore.map(async (url) =>
urls.map(async (url) =>
go(
async () => {
const payload = await callSignedDataApi(url);
Expand Down
10 changes: 6 additions & 4 deletions src/state/state.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@ const stateMock = {
timestamp: 'something-silly',
},
},
signedApiUrlStore: [
'http://127.0.0.1:8090/0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
'https://pool.nodary.io/0xc52EeA00154B4fF1EbbF8Ba39FDe37F1AC3B9Fd4',
],
signedApiUrlStore: {
hardhat: [
'http://127.0.0.1:8090/0xbF3137b0a7574563a23a8fC8badC6537F98197CC',
'https://pool.nodary.io/0xc52EeA00154B4fF1EbbF8Ba39FDe37F1AC3B9Fd4',
],
},
dapis: {},
};

Expand Down
12 changes: 9 additions & 3 deletions src/state/state.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import type { BigNumber } from 'ethers';
import { produce, type Draft } from 'immer';

import type { Config } from '../config/schema';
import type { ChainId, DApiName, DecodedDataFeed, DataFeedId, SignedData } from '../types';
import type { ChainId, DApiName, DecodedDataFeed, DataFeedId, SignedData, Provider } from '../types';

interface GasState {
gasPrices: { price: BigNumber; timestampMs: number }[];
Expand Down Expand Up @@ -31,15 +31,21 @@ export interface State {
dataFetcherInterval?: NodeJS.Timeout;
gasPriceStore: Record<string, Record<string, GasState>>;
signedApiStore: Record<DataFeedId, SignedData>;
signedApiUrlStore: string[];
signedApiUrlStore: Record<Provider, string[]>;
dapis: Record<DApiName, DapiState>;
}

type StateUpdater = (draft: Draft<State>) => void;

let state: State | undefined;

export const getState = (): State => state!;
export const getState = (): State => {
if (!state) {
throw new Error('State is undefined.');
}

return state;
};

export const setState = (newState: State) => {
state = newState;
Expand Down
1 change: 1 addition & 0 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ export type TemplateId = EvmId;
export type DataFeedId = EvmId;
export type ChainId = string;
export type DApiName = string;
export type Provider = string;

// Taken from https://github.com/api3dao/signed-api/blob/main/packages/api/src/schema.ts
export const signedDataSchema = z.object({
Expand Down
28 changes: 14 additions & 14 deletions src/update-feeds/update-feeds.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import type { DapiDataRegistry } from '../../typechain-types';
import type { Chain } from '../config/schema';
import { logger } from '../logger';
import * as stateModule from '../state';
import type { State } from '../state';
import * as utilsModule from '../utils';

import * as dapiDataRegistryModule from './dapi-data-registry';
Expand Down Expand Up @@ -172,22 +171,23 @@ describe(runUpdateFeed.name, () => {
jest.spyOn(logger, 'error');

const testConfig = generateTestConfig();
jest.spyOn(stateModule, 'getState').mockReturnValue({
config: testConfig,
dapis: {},
signedApiStore: {},
signedApiUrlStore: ['url-one'],
gasPriceStore: {
'123': {
'some-test-provider': {
gasPrices: [],
sponsorLastUpdateTimestampMs: {
'0xdatafeedId': 100,
jest.spyOn(stateModule, 'getState').mockReturnValue(
allowPartial<stateModule.State>({
config: testConfig,
signedApiUrlStore: { 'some-test-provider': ['url-one'] },
signedApiStore: {},
gasPriceStore: {
'123': {
'some-test-provider': {
gasPrices: [],
sponsorLastUpdateTimestampMs: {
'0xdatafeedId': 100,
},
},
},
},
},
} as State);
})
);

await runUpdateFeed(
'provider-name',
Expand Down
16 changes: 7 additions & 9 deletions src/update-feeds/update-feeds.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type { Chain } from '../config/schema';
import { logger } from '../logger';
import { getStoreDataPoint } from '../signed-data-store';
import { getState, updateState } from '../state';
import type { ChainId, Provider } from '../types';
import { isFulfilled, sleep } from '../utils';

import {
Expand Down Expand Up @@ -43,7 +44,7 @@ export const startUpdateFeedLoops = async () => {
);
};

export const runUpdateFeed = async (providerName: string, chain: Chain, chainId: string) => {
export const runUpdateFeed = async (providerName: Provider, chain: Chain, chainId: ChainId) => {
await logger.runWithContext({ chainId, providerName, coordinatorTimestampMs: Date.now().toString() }, async () => {
const { dataFeedBatchSize, dataFeedUpdateInterval, providers, contracts } = chain;

Expand Down Expand Up @@ -77,7 +78,7 @@ export const runUpdateFeed = async (providerName: string, chain: Chain, chainId:
return;
}
const { firstBatch, dapisCount } = goFirstBatch.data;
const processFirstBatchPromise = processBatch(firstBatch, chainId);
const processFirstBatchPromise = processBatch(firstBatch, providerName, chainId);

// Calculate the stagger time between the rest of the batches.
const batchesCount = Math.ceil(dapisCount / dataFeedBatchSize);
Expand Down Expand Up @@ -105,10 +106,7 @@ export const runUpdateFeed = async (providerName: string, chain: Chain, chainId:
await dapiDataRegistry.callStatic.tryMulticall(readDapiWithIndexCalls)
);

const decodedBatch = returndata.map((returndata) => ({
...decodeReadDapiWithIndexResponse(dapiDataRegistry, returndata),
}));
return decodedBatch;
return returndata.map((returndata) => decodeReadDapiWithIndexResponse(dapiDataRegistry, returndata));
})
);
for (const batch of otherBatches.filter((batch) => !isFulfilled(batch))) {
Expand All @@ -117,7 +115,7 @@ export const runUpdateFeed = async (providerName: string, chain: Chain, chainId:
const processOtherBatchesPromises = otherBatches
.filter((result) => isFulfilled(result))
.map(async (result) =>
processBatch((result as PromiseFulfilledResult<ReadDapiWithIndexResponse[]>).value, chainId)
processBatch((result as PromiseFulfilledResult<ReadDapiWithIndexResponse[]>).value, providerName, chainId)
);

// Wait for all the batches to be processed.
Expand Down Expand Up @@ -162,7 +160,7 @@ export const updateFeeds = async (_batch: ReturnType<typeof getFeedsToUpdate>, _
// batch, execute
};

export const processBatch = async (batch: ReadDapiWithIndexResponse[], chainId: string) => {
export const processBatch = async (batch: ReadDapiWithIndexResponse[], providerName: Provider, chainId: string) => {
logger.debug('Processing batch of active dAPIs', { batch });

updateState((draft) => {
Expand All @@ -171,7 +169,7 @@ export const processBatch = async (batch: ReadDapiWithIndexResponse[], chainId:
dapi.decodedDataFeed.beacons.flatMap((dataFeed) => `${url}/${dataFeed.airnodeAddress}`)
);

draft.signedApiUrlStore = receivedUrls.flat();
draft.signedApiUrlStore[providerName] = receivedUrls.flat();

const cachedDapiResponse = draft.dapis[dapi.dapiName];

Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/mock-config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ export const init = (stateOverride?: Partial<State>) => {
config,
gasPriceStore: {},
signedApiStore: {},
signedApiUrlStore: [],
signedApiUrlStore: {},
dapis: {},
...stateOverride,
});
Expand Down