Skip to content

Commit 36f8638

Browse files
authored
feat: error retry handling (#49)
# 🤖 Linear Closes GIT-223 GIT-224 GIT-110 ## Description We implement a RetryHandler with ExponentialBackoff for handling error retries (centralized on the Orchestrator) and define two base classes of errors: - Retriable - NonRetriable In the next PR, we will adjust the different errors of the system and packages to extend from these classes accordingly. ## Checklist before requesting a review - [x] I have conducted a self-review of my code. - [x] I have conducted a QA. - [x] If it is a core feature, I have included comprehensive tests.
1 parent 0a645ba commit 36f8638

23 files changed

+625
-83
lines changed

apps/processing/.env.example

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,4 +13,9 @@ IPFS_GATEWAYS_URL=["https://ipfs.io","https://gateway.pinata.cloud","https://dwe
1313
PRICING_SOURCE= # 'coingecko' or 'dummy'
1414

1515
COINGECKO_API_KEY={{YOUR_KEY}}
16-
COINGECKO_API_TYPE=demo
16+
COINGECKO_API_TYPE=demo
17+
18+
RETRY_MAX_ATTEMPTS=3
19+
RETRY_BASE_DELAY_MS=3000
20+
RETRY_FACTOR=2
21+
RETRY_MAX_DELAY_MS=300000

apps/processing/README.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ Available options:
3636
| `DUMMY_PRICE` | Dummy price | 1 | No | Only if PRICING_SOURCE is dummy |
3737
| `COINGECKO_API_KEY` | API key for CoinGecko service | N/A | Yes | |
3838
| `COINGECKO_API_TYPE` | CoinGecko API tier (demo or pro) | pro | No | |
39+
| `RETRY_MAX_ATTEMPTS` | Maximum number of retry attempts | 3 | No | |
40+
| `RETRY_BASE_DELAY_MS` | Base delay in milliseconds for retry attempts | 3000 | No | |
41+
| `RETRY_FACTOR` | Backoff factor (delay multiplier) for retry attempts | 2 | No | |
42+
| `RETRY_MAX_DELAY_MS` | Maximum delay in milliseconds for retry attempts | 300000 | No | |
3943

4044
## Available Scripts
4145

apps/processing/src/config/env.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,10 @@ const baseSchema = z.object({
3636
IPFS_GATEWAYS_URL: stringToJSONSchema
3737
.pipe(z.array(z.string().url()))
3838
.default('["https://ipfs.io"]'),
39+
RETRY_MAX_ATTEMPTS: z.coerce.number().int().min(1).default(3),
40+
RETRY_BASE_DELAY_MS: z.coerce.number().int().min(1).default(3000), // 3 seconds
41+
RETRY_FACTOR: z.coerce.number().int().min(1).default(2),
42+
RETRY_MAX_DELAY_MS: z.coerce.number().int().min(1).optional(), // optional: no default is set in this schema (the README documents 300000 ms, i.e. 5 minutes)
3943
});
4044

4145
const dummyPricingSchema = baseSchema.extend({

apps/processing/src/services/processing.service.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,14 @@ export class ProcessingService {
4545
static async initialize(env: Environment): Promise<ProcessingService> {
4646
const sharedDependencies = await SharedDependenciesService.initialize(env);
4747
const { CHAINS: chains } = env;
48-
const { core, registriesRepositories, indexerClient, kyselyDatabase, logger } =
49-
sharedDependencies;
48+
const {
49+
core,
50+
registriesRepositories,
51+
indexerClient,
52+
kyselyDatabase,
53+
logger,
54+
retryStrategy,
55+
} = sharedDependencies;
5056
const {
5157
eventRegistryRepository,
5258
strategyRegistryRepository,
@@ -83,6 +89,7 @@ export class ProcessingService {
8389
},
8490
chain.fetchLimit,
8591
chain.fetchDelayMs,
92+
retryStrategy,
8693
logger,
8794
);
8895
const retroactiveProcessor = new RetroactiveProcessor(
@@ -95,6 +102,7 @@ export class ProcessingService {
95102
checkpointRepository: strategyProcessingCheckpointRepository,
96103
},
97104
chain.fetchLimit,
105+
retryStrategy,
98106
logger,
99107
);
100108

apps/processing/src/services/sharedDependencies.service.ts

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import {
1717
KyselyStrategyRegistryRepository,
1818
KyselyTransactionManager,
1919
} from "@grants-stack-indexer/repository";
20-
import { ILogger, Logger } from "@grants-stack-indexer/shared";
20+
import { ExponentialBackoff, ILogger, Logger, RetryStrategy } from "@grants-stack-indexer/shared";
2121

2222
import { Environment } from "../config/index.js";
2323

@@ -30,6 +30,7 @@ export type SharedDependencies = {
3030
};
3131
indexerClient: EnvioIndexerClient;
3232
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
33+
retryStrategy: RetryStrategy;
3334
logger: ILogger;
3435
};
3536

@@ -91,6 +92,13 @@ export class SharedDependenciesService {
9192
env.INDEXER_ADMIN_SECRET,
9293
);
9394

95+
const retryStrategy = new ExponentialBackoff({
96+
maxAttempts: env.RETRY_MAX_ATTEMPTS,
97+
baseDelay: env.RETRY_BASE_DELAY_MS,
98+
maxDelay: env.RETRY_MAX_DELAY_MS,
99+
factor: env.RETRY_FACTOR,
100+
});
101+
94102
return {
95103
core: {
96104
projectRepository,
@@ -109,6 +117,7 @@ export class SharedDependenciesService {
109117
},
110118
indexerClient,
111119
kyselyDatabase,
120+
retryStrategy,
112121
logger,
113122
};
114123
}

apps/processing/test/unit/sharedDependencies.service.spec.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@ const mocks = vi.hoisted(() => {
1919
};
2020
});
2121

22-
vi.mock("@grants-stack-indexer/shared", () => {
22+
vi.mock("@grants-stack-indexer/shared", async (importActual) => {
23+
const actual = await importActual<typeof import("@grants-stack-indexer/shared")>();
2324
return {
25+
...actual,
2426
Logger: {
2527
getInstance: vi.fn().mockReturnValue(mocks.logger),
2628
},

packages/data-flow/src/orchestrator.ts

Lines changed: 60 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@ import {
1616
isAlloEvent,
1717
isStrategyEvent,
1818
ProcessorEvent,
19+
RetriableError,
20+
RetryHandler,
21+
RetryStrategy,
1922
StrategyEvent,
2023
stringify,
2124
} from "@grants-stack-indexer/shared";
@@ -46,10 +49,11 @@ import { CoreDependencies, DataLoader, delay, IQueue, iStrategyAbi, Queue } from
4649
* The Orchestrator provides fault tolerance and performance optimization through:
4750
* - Configurable batch sizes for event fetching
4851
* - Delayed processing to prevent overwhelming the system
49-
* - Error handling and logging for various failure scenarios
52+
* - Retry handling with exponential backoff for transient failures
53+
* - Comprehensive error handling and logging for various failure scenarios
5054
* - Registry tracking of supported/unsupported strategies and events
5155
*
52-
* TODO: Enhance the error handling/retries, logging and observability
56+
* TODO: Enhance logging and observability
5357
*/
5458
export class Orchestrator {
5559
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
@@ -58,13 +62,15 @@ export class Orchestrator {
5862
private readonly eventsRegistry: IEventsRegistry;
5963
private readonly strategyRegistry: IStrategyRegistry;
6064
private readonly dataLoader: DataLoader;
65+
private readonly retryHandler: RetryHandler;
6166

6267
/**
6368
* @param chainId - The chain id
6469
* @param dependencies - The core dependencies
6570
* @param indexerClient - The indexer client
6671
* @param registries - The registries
6772
* @param fetchLimit - The fetch limit
73+
* @param retryStrategy - The retry strategy
6874
* @param fetchDelayInMs - The fetch delay in milliseconds
6975
*/
7076
constructor(
@@ -77,6 +83,7 @@ export class Orchestrator {
7783
},
7884
private fetchLimit: number = 1000,
7985
private fetchDelayInMs: number = 10000,
86+
private retryStrategy: RetryStrategy,
8087
private logger: ILogger,
8188
) {
8289
this.eventsFetcher = new EventsFetcher(this.indexerClient);
@@ -98,6 +105,7 @@ export class Orchestrator {
98105
this.logger,
99106
);
100107
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
108+
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
101109
}
102110

103111
async run(signal: AbortSignal): Promise<void> {
@@ -119,46 +127,42 @@ export class Orchestrator {
119127
await delay(this.fetchDelayInMs);
120128
continue;
121129
}
130+
122131
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
123132
...event,
124133
rawEvent: event,
125134
});
126135

127-
event = await this.enhanceStrategyId(event);
128-
if (this.isPoolCreated(event)) {
129-
const handleable = existsHandler(event.strategyId);
130-
await this.strategyRegistry.saveStrategyId(
131-
this.chainId,
132-
event.params.strategy,
133-
event.strategyId,
134-
handleable,
135-
);
136-
} else if (event.contractName === "Strategy" && "strategyId" in event) {
137-
if (!existsHandler(event.strategyId)) {
138-
this.logger.debug("Skipping event", {
139-
event,
140-
className: Orchestrator.name,
141-
chainId: this.chainId,
142-
});
143-
// we skip the event if the strategy id is not handled yet
144-
continue;
145-
}
146-
}
147-
148-
const changesets = await this.eventsProcessor.processEvent(event);
149-
await this.dataLoader.applyChanges(changesets);
136+
await this.retryHandler.execute(
137+
async () => {
138+
await this.handleEvent(event!);
139+
},
140+
{ abortSignal: signal },
141+
);
150142
} catch (error: unknown) {
151-
// TODO: improve error handling, retries and notify
143+
// TODO: notify
152144
if (
153145
error instanceof UnsupportedStrategy ||
154146
error instanceof InvalidEvent ||
155147
error instanceof UnsupportedEventException
156148
) {
157-
// this.logger.error(
158-
// `Current event cannot be handled. ${error.name}: ${error.message}. Event: ${stringify(event)}`,
159-
// );
149+
this.logger.debug(
150+
`Current event cannot be handled. ${error.name}: ${error.message}.`,
151+
{
152+
className: Orchestrator.name,
153+
chainId: this.chainId,
154+
event,
155+
},
156+
);
160157
} else {
161-
if (error instanceof Error || isNativeError(error)) {
158+
if (error instanceof RetriableError) {
159+
error.message = `Error processing event after retries. ${error.message}`;
160+
this.logger.error(error, {
161+
event,
162+
className: Orchestrator.name,
163+
chainId: this.chainId,
164+
});
165+
} else if (error instanceof Error || isNativeError(error)) {
162166
this.logger.error(error, {
163167
event,
164168
className: Orchestrator.name,
@@ -201,6 +205,32 @@ export class Orchestrator {
201205
this.eventsQueue.push(...events);
202206
}
203207

208+
/**
 * Handles a single event: resolves its strategy id, records or checks strategy
 * support, then processes the event and applies the resulting changesets.
 *
 * - Pool-created events (per `isPoolCreated`): the strategy id is saved in the
 *   strategy registry together with whether a handler exists for it; the event
 *   is then still processed.
 * - Events from the "Strategy" contract that carry a `strategyId` with no
 *   registered handler: logged at debug level and skipped.
 * - All remaining events: passed to the events processor, and the returned
 *   changesets are applied through the data loader.
 *
 * NOTE(review): `run()` invokes this method through `retryHandler.execute`, so
 * presumably the whole body (including `saveStrategyId`) is re-run on each
 * retry — confirm against the RetryHandler implementation.
 *
 * @param event - The event to handle
 * @returns Resolves once the event has been processed and applied, or skipped
 */
private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
209+
// the enhanced event replaces the incoming one for the rest of the method
event = await this.enhanceStrategyId(event);
210+
if (this.isPoolCreated(event)) {
211+
// record the strategy id along with whether we currently have a handler for it
const handleable = existsHandler(event.strategyId);
212+
await this.strategyRegistry.saveStrategyId(
213+
this.chainId,
214+
event.params.strategy,
215+
event.strategyId,
216+
handleable,
217+
);
218+
} else if (event.contractName === "Strategy" && "strategyId" in event) {
219+
if (!existsHandler(event.strategyId)) {
220+
this.logger.debug("Skipping event", {
221+
event,
222+
className: Orchestrator.name,
223+
chainId: this.chainId,
224+
});
225+
// we skip the event if the strategy id is not handled yet
226+
return;
227+
}
228+
}
229+
230+
// process the event and persist the changesets it produces
const changesets = await this.eventsProcessor.processEvent(event);
231+
await this.dataLoader.applyChanges(changesets);
232+
}
233+
204234
/**
205235
* Enhance the event with the strategy id when required
206236
* @param event - The event

packages/data-flow/src/retroactiveProcessor.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,8 @@ import {
99
Hex,
1010
ILogger,
1111
ProcessorEvent,
12+
RetryHandler,
13+
RetryStrategy,
1214
stringify,
1315
} from "@grants-stack-indexer/shared";
1416

@@ -62,6 +64,7 @@ export class RetroactiveProcessor {
6264
private readonly strategyRegistry: IStrategyRegistry;
6365
private readonly dataLoader: DataLoader;
6466
private readonly checkpointRepository: IStrategyProcessingCheckpointRepository;
67+
private readonly retryHandler: RetryHandler;
6568

6669
/**
6770
* Creates a new instance of RetroactiveProcessor
@@ -70,6 +73,7 @@ export class RetroactiveProcessor {
7073
* @param indexerClient - Client for fetching blockchain events
7174
* @param registries - Event and strategy registries for tracking processing state
7275
* @param fetchLimit - Maximum number of events to fetch in a single batch (default: 1000)
76+
* @param retryStrategy - The retry strategy
7377
* @param logger - Logger instance for debugging and monitoring
7478
*/
7579
constructor(
@@ -82,6 +86,7 @@ export class RetroactiveProcessor {
8286
checkpointRepository: IStrategyProcessingCheckpointRepository;
8387
},
8488
private fetchLimit: number = 1000,
89+
private retryStrategy: RetryStrategy,
8590
private logger: ILogger,
8691
) {
8792
this.eventsFetcher = new EventsFetcher(this.indexerClient);
@@ -103,6 +108,7 @@ export class RetroactiveProcessor {
103108
this.dependencies.transactionManager,
104109
this.logger,
105110
);
111+
this.retryHandler = new RetryHandler(retryStrategy, this.logger);
106112
}
107113

108114
/**
@@ -208,8 +214,11 @@ export class RetroactiveProcessor {
208214
if (this.hasReachedLastEvent(currentPointer, lastEventPointer)) break;
209215

210216
event.strategyId = strategyId;
211-
const changesets = await this.eventsProcessor.processEvent(event);
212-
await this.dataLoader.applyChanges(changesets);
217+
218+
await this.retryHandler.execute(async () => {
219+
const changesets = await this.eventsProcessor.processEvent(event!);
220+
await this.dataLoader.applyChanges(changesets);
221+
});
213222
} catch (error) {
214223
if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) {
215224
// Expected errors that we can safely ignore

0 commit comments

Comments
 (0)