Skip to content

Commit 5c9e321

Browse files
authored
feat: bulk fetch metadata & prices for events batch (#57)
# 🤖 Linear Closes GIT-241 ## Description Currently, we are fetching price/metadata information when processing each event. This is sub-optimal, since we are having the external request delay for each event. As a better solution, we're fetching all the token/metadata data for a given batch of events prior to start processing that batch. This way, we can query all this data in parallel and have it cached when required during event processing. > [!NOTE] > This PR has the assumption that we will have really high rate limit API keys for Metadata and Pricing providers to make the most of this feature ## Checklist before requesting a review - [x] I have conducted a self-review of my code. - [x] I have conducted a QA. - [ ] If it is a core feature, I have included comprehensive tests. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Added support for bulk token price retrieval across multiple timestamps. - Enhanced event processing with improved metadata and token price fetching. - Implemented round-robin gateway selection for IPFS metadata retrieval. - Introduced `getTokenPrices` method across multiple pricing providers. - Added a new constant for minimum granularity in pricing retrieval. - **Improvements** - Updated caching mechanism for token prices. - Refined error handling in pricing and metadata providers. - Introduced new constants and interfaces to support expanded functionality. - **Technical Enhancements** - Expanded retry mechanism to return operation results. - Added new methods to pricing and metadata providers. - Improved event orchestration and processing workflows. - Enhanced test coverage for new pricing methods. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 5bb2026 commit 5c9e321

File tree

23 files changed

+693
-29
lines changed

23 files changed

+693
-29
lines changed

packages/data-flow/src/orchestrator.ts

Lines changed: 153 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
import { isNativeError } from "util/types";
22

33
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
4+
import { TokenPrice } from "@grants-stack-indexer/pricing";
45
import {
56
existsHandler,
67
UnsupportedEventException,
@@ -13,6 +14,7 @@ import {
1314
AnyIndexerFetchedEvent,
1415
ChainId,
1516
ContractName,
17+
getToken,
1618
Hex,
1719
ILogger,
1820
isAlloEvent,
@@ -23,6 +25,7 @@ import {
2325
RetryStrategy,
2426
StrategyEvent,
2527
stringify,
28+
Token,
2629
} from "@grants-stack-indexer/shared";
2730

2831
import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js";
@@ -31,6 +34,11 @@ import { EventsProcessor } from "./eventsProcessor.js";
3134
import { InvalidEvent } from "./exceptions/index.js";
3235
import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from "./internal.js";
3336

37+
type TokenWithTimestamps = {
38+
token: Token;
39+
timestamps: number[];
40+
};
41+
3442
/**
3543
* The Orchestrator is the central coordinator of the data flow system, managing the interaction between
3644
* three main components:
@@ -42,6 +50,7 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
4250
* The Orchestrator implements a continuous processing loop that:
4351
*
4452
* 1. Fetches batches of events from the indexer and stores them in an internal queue
53+
* 1.5 Bulk fetches metadata and prices for the batch, improving performance by reducing the number of requests and parallelizing them
4554
* 2. Processes each event from the queue:
4655
* - For strategy events and PoolCreated from Allo contract, enhances them with strategyId
4756
* - Forwards the event to the Events Processor which is in charge of delagating the processing of the event to the correct handler
@@ -116,7 +125,11 @@ export class Orchestrator {
116125
while (!signal.aborted) {
117126
let event: ProcessorEvent<ContractName, AnyEvent> | undefined;
118127
try {
119-
if (this.eventsQueue.isEmpty()) await this.enqueueEvents();
128+
if (this.eventsQueue.isEmpty()) {
129+
const events = await this.getNextEventsBatch();
130+
await this.bulkFetchMetadataAndPricesForBatch(events);
131+
await this.enqueueEvents(events);
132+
}
120133

121134
event = this.eventsQueue.pop();
122135

@@ -197,10 +210,66 @@ export class Orchestrator {
197210
});
198211
}
199212

213+
/**
214+
* Extracts unique metadata ids from the events batch.
215+
* @param events - Array of indexer fetched events to process
216+
* @returns Array of unique metadata ids found in the events
217+
*/
218+
private getMetadataFromEvents(events: AnyIndexerFetchedEvent[]): string[] {
219+
const ids = new Set<string>();
220+
221+
for (const event of events) {
222+
if ("metadata" in event.params) {
223+
ids.add(event.params.metadata[1]);
224+
}
225+
}
226+
227+
return Array.from(ids);
228+
}
229+
230+
/**
231+
* Extracts unique tokens from the events batch. Leaves out tokens with zero amount and sorts the timestamps.
232+
* @param events - Array of indexer fetched events to process
233+
* @returns Array of unique tokens with timestamps found in the events
234+
*/
235+
private getTokensFromEvents(events: AnyIndexerFetchedEvent[]): TokenWithTimestamps[] {
236+
const tokenMap = new Map<string, TokenWithTimestamps>();
237+
238+
for (const event of events) {
239+
if (
240+
"token" in event.params &&
241+
"amount" in event.params &&
242+
BigInt(event.params.amount) > 0n
243+
) {
244+
const token = getToken(this.chainId, event.params.token);
245+
if (!token) continue;
246+
247+
const existing = tokenMap.get(token.address);
248+
if (existing) {
249+
existing.timestamps.push(event.blockTimestamp);
250+
} else {
251+
tokenMap.set(token.address, {
252+
token,
253+
timestamps: [event.blockTimestamp],
254+
});
255+
}
256+
}
257+
}
258+
259+
// Convert timestamps to unique sorted arrays
260+
return Array.from(tokenMap.values()).map(({ token, timestamps }) => ({
261+
token,
262+
timestamps: [...new Set(timestamps)].sort((a, b) => a - b),
263+
}));
264+
}
265+
200266
/**
201267
* Sometimes the TimestampsUpdated event is part of the _initialize() function of a strategy.
202268
* In this case, the event is emitted before the PoolCreated event. We can safely ignore the error
203269
* if the PoolCreated event is present in the same block.
270+
* @param error - The error
271+
* @param event - The event
272+
* @returns True if the error should be ignored, false otherwise
204273
*/
205274
private shouldIgnoreTimestampsUpdatedError(
206275
error: Error,
@@ -225,9 +294,10 @@ export class Orchestrator {
225294
}
226295

227296
/**
228-
* Enqueue new events from the events fetcher using the last processed event as a starting point
297+
* Fetches the next events batch from the indexer
298+
* @returns The next events batch
229299
*/
230-
private async enqueueEvents(): Promise<void> {
300+
private async getNextEventsBatch(): Promise<AnyIndexerFetchedEvent[]> {
231301
const lastProcessedEvent = await this.eventsRegistry.getLastProcessedEvent(this.chainId);
232302
const blockNumber = lastProcessedEvent?.blockNumber ?? 0;
233303
const logIndex = lastProcessedEvent?.logIndex ?? 0;
@@ -240,6 +310,34 @@ export class Orchestrator {
240310
allowPartialLastBlock: false,
241311
});
242312

313+
return events;
314+
}
315+
316+
/**
317+
* Clear pricing and metadata caches and bulk fetch metadata and prices for the batch
318+
* @param events - The events batch
319+
*/
320+
private async bulkFetchMetadataAndPricesForBatch(
321+
events: AnyIndexerFetchedEvent[],
322+
): Promise<void> {
323+
// Clear caches if the provider supports it
324+
await this.dependencies.metadataProvider.clearCache?.();
325+
await this.dependencies.pricingProvider.clearCache?.();
326+
327+
const metadataIds = this.getMetadataFromEvents(events);
328+
const tokens = this.getTokensFromEvents(events);
329+
330+
await Promise.allSettled([
331+
this.bulkFetchMetadata(metadataIds),
332+
this.bulkFetchTokens(tokens),
333+
]);
334+
}
335+
336+
/**
337+
* Enqueue events and updates new context of events by block number for the batch
338+
* @param events - The events batch
339+
*/
340+
private async enqueueEvents(events: AnyIndexerFetchedEvent[]): Promise<void> {
243341
// Clear previous context
244342
this.eventsByBlockContext.clear();
245343
for (const event of events) {
@@ -252,6 +350,58 @@ export class Orchestrator {
252350
this.eventsQueue.push(...events);
253351
}
254352

353+
/**
354+
* Fetch all possible metadata for the batch.
355+
* @param metadataIds - The metadata ids
356+
* @returns The metadata
357+
*/
358+
private async bulkFetchMetadata(metadataIds: string[]): Promise<unknown[]> {
359+
const results = await Promise.allSettled(
360+
metadataIds.map((id) =>
361+
this.retryHandler.execute(() =>
362+
this.dependencies.metadataProvider.getMetadata<unknown>(id),
363+
),
364+
),
365+
);
366+
367+
const metadata: unknown[] = [];
368+
for (const result of results) {
369+
if (result.status === "fulfilled" && result.value) {
370+
metadata.push(result.value);
371+
}
372+
}
373+
374+
return metadata;
375+
}
376+
377+
/**
378+
* Fetch all tokens prices
379+
* @param tokens - The tokens with timestamps
380+
* @returns The token prices
381+
*/
382+
private async bulkFetchTokens(tokens: TokenWithTimestamps[]): Promise<TokenPrice[]> {
383+
const results = await Promise.allSettled(
384+
tokens.map(({ token, timestamps }) =>
385+
this.retryHandler.execute(async () => {
386+
const prices = await this.dependencies.pricingProvider.getTokenPrices(
387+
token.priceSourceCode,
388+
timestamps,
389+
);
390+
return prices;
391+
}),
392+
),
393+
);
394+
395+
const tokenPrices: TokenPrice[] = [];
396+
for (const result of results) {
397+
if (result.status === "fulfilled" && result.value) {
398+
tokenPrices.push(...result.value);
399+
}
400+
}
401+
402+
return tokenPrices;
403+
}
404+
255405
private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
256406
event = await this.enhanceStrategyId(event);
257407
if (this.isPoolCreated(event)) {

0 commit comments

Comments
 (0)