Skip to content

Commit 4615f05

Browse files
committed
Compute stagger time correctly, process batches in parallel
1 parent 919ee4a commit 4615f05

File tree

3 files changed

+105
-47
lines changed

3 files changed

+105
-47
lines changed

src/update-feeds/update-feeds.test.ts

+57-12
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,6 @@ const provider = new ethers.providers.StaticJsonRpcProvider(rpcUrl, {
2222
name: chainId,
2323
});
2424

25-
jest.mock('../state');
26-
2725
describe(updateFeedsModule.startUpdateFeedsLoops.name, () => {
2826
it('starts staggered update loops for a chain', async () => {
2927
jest.spyOn(stateModule, 'getState').mockReturnValue(
@@ -41,6 +39,7 @@ describe(updateFeedsModule.startUpdateFeedsLoops.name, () => {
4139
},
4240
})
4341
);
42+
jest.spyOn(stateModule, 'updateState').mockImplementation();
4443
jest.spyOn(updateFeedsModule, 'runUpdateFeeds').mockImplementation();
4544
const intervalCalls = [] as number[];
4645
jest.spyOn(global, 'setInterval').mockImplementation((() => {
@@ -100,6 +99,7 @@ describe(updateFeedsModule.startUpdateFeedsLoops.name, () => {
10099
},
101100
})
102101
);
102+
jest.spyOn(stateModule, 'updateState').mockImplementation();
103103
jest.spyOn(updateFeedsModule, 'runUpdateFeeds').mockImplementation();
104104
const intervalCalls = [] as number[];
105105
jest.spyOn(global, 'setInterval').mockImplementation((() => {
@@ -171,7 +171,7 @@ describe(updateFeedsModule.runUpdateFeeds.name, () => {
171171
expect(logger.error).toHaveBeenCalledWith('Failed to get first active dAPIs batch', new Error('provider-error'));
172172
});
173173

174-
it('fetches other batches in a staggered way and logs errors', async () => {
174+
it('fetches and processes other batches in a staggered way and logs errors', async () => {
175175
// Prepare the mocked contract so it returns three batches (of size 1) of dAPIs and the second batch fails to load.
176176
const firstDapi = generateReadDapiWithIndexResponse();
177177
const thirdDapi = generateReadDapiWithIndexResponse();
@@ -206,13 +206,21 @@ describe(updateFeedsModule.runUpdateFeeds.name, () => {
206206
gasPriceStore: {},
207207
})
208208
);
209+
jest.spyOn(stateModule, 'updateState').mockImplementation();
209210
jest
210211
.spyOn(checkFeedsModule, 'getUpdatableFeeds')
211-
.mockResolvedValue([
212-
allowPartial<updateTransactionModule.UpdatableDapi>({ dapiInfo: firstDapi }),
213-
allowPartial<updateTransactionModule.UpdatableDapi>({ dapiInfo: thirdDapi }),
214-
]);
212+
.mockResolvedValueOnce([allowPartial<updateTransactionModule.UpdatableDapi>({ dapiInfo: firstDapi })])
213+
.mockResolvedValueOnce([allowPartial<updateTransactionModule.UpdatableDapi>({ dapiInfo: thirdDapi })]);
215214
jest.spyOn(updateTransactionModule, 'updateFeeds').mockResolvedValue([null, null]);
215+
const processBatchCalls = [] as number[];
216+
// eslint-disable-next-line @typescript-eslint/require-await
217+
const originalProcessBatch = updateFeedsModule.processBatch;
218+
jest
219+
.spyOn(updateFeedsModule, 'processBatch')
220+
.mockImplementation(async (...args: Parameters<typeof originalProcessBatch>) => {
221+
processBatchCalls.push(Date.now());
222+
return originalProcessBatch(...args);
223+
});
216224

217225
await updateFeedsModule.runUpdateFeeds(
218226
'provider-name',
@@ -231,7 +239,11 @@ describe(updateFeedsModule.runUpdateFeeds.name, () => {
231239
expect(utilsModule.sleep).toHaveBeenCalledTimes(3);
232240
expect(sleepCalls[0]).toBeGreaterThan(0); // The first stagger time is computed dynamically (the execution time is subtracted from the interval time) which is slow on CI, so we just check it's non-zero.
233241
expect(sleepCalls[1]).toBe(0);
234-
expect(sleepCalls[2]).toBe(49.999_999_999_999_99); // Stagger time is actually 150 / 3 = 50, but there is a rounding error.
242+
expect(sleepCalls[2]).toBe(50);
243+
244+
// Expect the call times of processBatch to be staggered as well.
245+
expect(updateFeedsModule.processBatch).toHaveBeenCalledTimes(2);
246+
expect(processBatchCalls[1]! - processBatchCalls[0]!).toBeGreaterThanOrEqual(100); // The stagger time is 50ms, but second batch fails to load which means the third second processBatch call needs to happen after we wait for 2 stagger times.
235247

236248
// Expect the logs to be called with the correct context.
237249
expect(logger.error).toHaveBeenCalledTimes(1);
@@ -244,7 +256,7 @@ describe(updateFeedsModule.runUpdateFeeds.name, () => {
244256
expect(logger.debug).toHaveBeenNthCalledWith(2, 'Processing batch of active dAPIs', expect.anything());
245257
expect(logger.debug).toHaveBeenNthCalledWith(3, 'Fetching batches of active dAPIs', {
246258
batchesCount: 3,
247-
staggerTime: 49.999_999_999_999_99,
259+
staggerTime: 50,
248260
});
249261
expect(logger.debug).toHaveBeenNthCalledWith(4, 'Fetching batch of active dAPIs', {
250262
batchIndex: 1,
@@ -254,9 +266,9 @@ describe(updateFeedsModule.runUpdateFeeds.name, () => {
254266
});
255267
expect(logger.debug).toHaveBeenNthCalledWith(6, 'Processing batch of active dAPIs', expect.anything());
256268
expect(logger.debug).toHaveBeenNthCalledWith(7, 'Finished processing batches of active dAPIs', {
257-
batchesCount: 3,
258-
errorCount: 4,
259-
successCount: 0,
269+
dapiUpdateFailures: 2,
270+
dapiUpdates: 0,
271+
skippedBatchesCount: 1,
260272
});
261273
});
262274

@@ -280,6 +292,7 @@ describe(updateFeedsModule.runUpdateFeeds.name, () => {
280292
gasPriceStore: {},
281293
})
282294
);
295+
jest.spyOn(stateModule, 'updateState').mockImplementation();
283296
jest.spyOn(logger, 'error');
284297
jest.spyOn(checkFeedsModule, 'getUpdatableFeeds').mockRejectedValueOnce(new Error('unexpected-unhandled-error'));
285298

@@ -364,3 +377,35 @@ describe(updateFeedsModule.processBatch.name, () => {
364377
await expect(feeds).resolves.toStrictEqual([]);
365378
});
366379
});
380+
381+
describe(updateFeedsModule.calculateStaggerTime.name, () => {
382+
it('calculates zero stagger time for specific edge cases', () => {
383+
expect(updateFeedsModule.calculateStaggerTime(1, 10_000, 60_000)).toBe(0); // When there is only a single batch.
384+
expect(updateFeedsModule.calculateStaggerTime(2, 25_000, 30_000)).toBe(0); // When there are just two batches and fetching the first batch takes too long.
385+
});
386+
387+
it('uses remaining time to calculate stagger time when fetching batch takes too long', () => {
388+
expect(updateFeedsModule.calculateStaggerTime(3, 15_000, 30_000)).toBe(7500);
389+
expect(updateFeedsModule.calculateStaggerTime(10, 10_000, 50_000)).toBe(4444);
390+
expect(updateFeedsModule.calculateStaggerTime(10, 20_000, 20_000)).toBe(0);
391+
});
392+
393+
it('staggers the batches evenly', () => {
394+
const firstBatchDuration = 10_000;
395+
const batchCount = 11;
396+
const staggerTime = updateFeedsModule.calculateStaggerTime(batchCount, firstBatchDuration, 50_000);
397+
398+
const fetchTimes = [0, firstBatchDuration];
399+
for (let i = 1; i < batchCount - 1; i++) {
400+
fetchTimes.push(fetchTimes[1]! + staggerTime * i);
401+
}
402+
403+
expect(fetchTimes).toStrictEqual([
404+
0, 10_000, 14_000, 18_000, 22_000, 26_000, 30_000, 34_000, 38_000, 42_000, 46_000,
405+
]);
406+
});
407+
408+
it('returns zero if first batch takes more than the full update interval', () => {
409+
expect(updateFeedsModule.calculateStaggerTime(3, 60_000, 30_000)).toBe(0);
410+
});
411+
});

src/update-feeds/update-feeds.ts

+48-31
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ import { clearSponsorLastUpdateTimestampMs, initializeGasStore, hasPendingTransa
88
import { logger } from '../logger';
99
import { getState, updateState } from '../state';
1010
import type { ChainId, ProviderName } from '../types';
11-
import { isFulfilled, sleep, deriveSponsorWallet } from '../utils';
11+
import { sleep, deriveSponsorWallet } from '../utils';
1212

1313
import { getApi3ServerV1 } from './api3-server-v1';
1414
import { getUpdatableFeeds } from './check-feeds';
@@ -54,6 +54,26 @@ export const startUpdateFeedsLoops = async () => {
5454
);
5555
};
5656

57+
export const calculateStaggerTime = (
58+
batchesCount: number,
59+
firstBatchDuration: number,
60+
dataFeedUpdateIntervalMs: number
61+
) => {
62+
// First batch duration should not be longer than the update interval because we have a timeout in place. However, it
63+
// may happen if the fetching resolves very close to the end of timeout and the duration ends up slightly larger. It's
64+
// arguably a bit better to let the function return 0 instead of throwing an error.
65+
if (batchesCount <= 1 || firstBatchDuration >= dataFeedUpdateIntervalMs) return 0;
66+
67+
// Calculate the optimal stagger time between the the batches.
68+
const optimalStaggerTime = Math.round(dataFeedUpdateIntervalMs / batchesCount);
69+
// If the first batch took longer than the optimal stagger time, we use the remaining time to stagger the rest of
70+
// the batches.
71+
if (firstBatchDuration > optimalStaggerTime) {
72+
return batchesCount === 2 ? 0 : Math.round((dataFeedUpdateIntervalMs - firstBatchDuration) / (batchesCount - 1));
73+
}
74+
return optimalStaggerTime;
75+
};
76+
5777
export const runUpdateFeeds = async (providerName: ProviderName, chain: Chain, chainId: ChainId) => {
5878
await logger.runWithContext({ chainId, providerName, updateFeedsCoordinatorId: Date.now().toString() }, async () => {
5979
// We do not expect this function to throw, but its possible that some execution path is incorrectly handled and we
@@ -100,10 +120,6 @@ export const runUpdateFeeds = async (providerName: ProviderName, chain: Chain, c
100120
logger.error(`Failed to get first active dAPIs batch`, goFirstBatch.error);
101121
return;
102122
}
103-
if (Date.now() >= firstBatchStartTime + dataFeedUpdateIntervalMs) {
104-
logger.warn(`Fetching the first batch took the whole interval. Skipping updates.`);
105-
return;
106-
}
107123

108124
const { firstBatch, dapisCount } = goFirstBatch.data;
109125
if (dapisCount === 0) {
@@ -120,22 +136,22 @@ export const runUpdateFeeds = async (providerName: ProviderName, chain: Chain, c
120136
chainId
121137
).catch((error) => error);
122138

123-
// Calculate the stagger time between the rest of the batches.
139+
// Calculate the stagger time.
124140
const batchesCount = Math.ceil(dapisCount / dataFeedBatchSize);
125-
const staggerTime = batchesCount <= 1 ? 0 : (dataFeedUpdateInterval / batchesCount) * 1000;
141+
const firstBatchDuration = Date.now() - firstBatchStartTime;
142+
const staggerTime = calculateStaggerTime(batchesCount, firstBatchDuration, dataFeedUpdateIntervalMs);
126143

127144
// Wait the remaining stagger time required after fetching the first batch.
128-
const firstBatchDuration = Date.now() - firstBatchStartTime;
129145
await sleep(Math.max(0, staggerTime - firstBatchDuration));
130146

131-
// Fetch the rest of the batches in parallel in a staggered way.
147+
// Fetch the rest of the batches in parallel in a staggered way and process them.
132148
if (batchesCount > 1) {
133149
logger.debug('Fetching batches of active dAPIs', { batchesCount, staggerTime });
134150
}
135-
const otherBatches = await Promise.allSettled(
136-
range(1, batchesCount).map(async (batchIndex) => {
137-
await sleep((batchIndex - 1) * staggerTime);
151+
const processOtherBatchesPromises = range(1, batchesCount).map(async (batchIndex) => {
152+
await sleep((batchIndex - 1) * staggerTime);
138153

154+
const goBatch = await go(async () => {
139155
logger.debug(`Fetching batch of active dAPIs`, { batchIndex });
140156
const dapiBatchIndexStart = batchIndex * dataFeedBatchSize;
141157
const dapiBatchIndexEnd = Math.min(dapisCount, dapiBatchIndexStart + dataFeedBatchSize);
@@ -147,24 +163,18 @@ export const runUpdateFeeds = async (providerName: ProviderName, chain: Chain, c
147163
);
148164

149165
return returndata.map((returndata) => decodeReadDapiWithIndexResponse(dapiDataRegistry, returndata));
150-
})
151-
);
152-
for (const batch of otherBatches.filter((batch) => !isFulfilled(batch))) {
153-
logger.error(`Failed to get active dAPIs batch`, (batch as PromiseRejectedResult).reason);
154-
}
155-
const processOtherBatchesPromises = otherBatches
156-
.filter((result) => isFulfilled(result))
157-
.map(async (result) =>
158-
processBatch(
159-
(result as PromiseFulfilledResult<DecodedReadDapiWithIndexResponse[]>).value,
160-
providerName,
161-
provider,
162-
chainId
163-
)
164-
);
166+
});
167+
if (!goBatch.success) {
168+
logger.error(`Failed to get active dAPIs batch`, goBatch.error);
169+
return;
170+
}
171+
const batch = goBatch.data;
172+
173+
return processBatch(batch, providerName, provider, chainId);
174+
});
165175

166176
// Wait for all the batches to be processed and print stats from this run.
167-
const processingResult = await Promise.all([
177+
const processedBatches = await Promise.all([
168178
// eslint-disable-next-line @typescript-eslint/promise-function-async
169179
new Promise<Awaited<ReturnType<typeof processBatch>>>((resolve, reject) => {
170180
return processFirstBatchPromise.then((result) => {
@@ -175,9 +185,16 @@ export const runUpdateFeeds = async (providerName: ProviderName, chain: Chain, c
175185
}),
176186
...processOtherBatchesPromises,
177187
]);
178-
const successCount = processingResult.reduce((acc, { successCount }) => acc + successCount, 0);
179-
const errorCount = processingResult.reduce((acc, { errorCount }) => acc + errorCount, 0);
180-
logger.debug(`Finished processing batches of active dAPIs`, { batchesCount, successCount, errorCount });
188+
189+
// Print stats from this run.
190+
const skippedBatchesCount = processedBatches.filter((batch) => !batch).length;
191+
const dapiUpdates = processedBatches.reduce((acc, batch) => acc + (batch ? batch.successCount : 0), 0);
192+
const dapiUpdateFailures = processedBatches.reduce((acc, batch) => acc + (batch ? batch.errorCount : 0), 0);
193+
logger.debug(`Finished processing batches of active dAPIs`, {
194+
skippedBatchesCount,
195+
dapiUpdates,
196+
dapiUpdateFailures,
197+
});
181198
});
182199

183200
if (!goRunUpdateFeeds.success) {

src/utils.ts

-4
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,6 @@ import { AIRSEEKER_PROTOCOL_ID } from './constants';
55

66
export const sleep = async (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
77

8-
export function isFulfilled<T>(item: PromiseSettledResult<T>): item is PromiseFulfilledResult<T> {
9-
return item.status === 'fulfilled';
10-
}
11-
128
export function deriveBeaconId(airnodeAddress: string, templateId: string) {
139
return goSync(() => ethers.utils.solidityKeccak256(['address', 'bytes32'], [airnodeAddress, templateId])).data;
1410
}

0 commit comments

Comments
 (0)