Skip to content

Commit 6ead1ef

Browse files
authored
feat: make saveLastProcessedEvent part of transaction (#60)
# 🤖 Linear Closes GIT-232 ## Description We want to make last processed event part of the transaction on the happy path, so we increase reliability and error handling of the system. For this we are: - deprecating the InMemoryCache of EventRegistry - work directly with IEventRegistryRepository implementations - add optional txConnection to `saveLastProcessedEvent` method ## Checklist before requesting a review - [x] I have conducted a self-review of my code. - [x] I have conducted a QA. - [ ] If it is a core feature, I have included comprehensive tests. <!-- This is an auto-generated comment: release notes by coderabbit.ai --> ## Summary by CodeRabbit ## Release Notes - **New Features** - Enhanced event registry handling with improved transaction support. - Added new processed event changeset type for more flexible event management. - Introduced processed event handlers for managing application-related operations. - **Improvements** - Simplified event processing logic in orchestrators. - Updated data loader to support more comprehensive event handling. - Improved error handling for event processing workflows. - **Technical Updates** - Refactored event registry interfaces and implementations. - Updated type definitions for better type safety and flexibility. - Expanded test coverage for processed event handlers and orchestrator functionalities. <!-- end of auto-generated comment: release notes by coderabbit.ai -->
1 parent 2494cb6 commit 6ead1ef

File tree

14 files changed

+313
-59
lines changed

14 files changed

+313
-59
lines changed

apps/processing/src/services/processing.service.ts

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { EvmProvider } from "@grants-stack-indexer/chain-providers";
44
import {
55
DatabaseEventRegistry,
66
DatabaseStrategyRegistry,
7-
InMemoryCachedEventRegistry,
87
InMemoryCachedStrategyRegistry,
98
Orchestrator,
109
RetroactiveProcessor,
@@ -67,12 +66,6 @@ export class ProcessingService {
6766
// Initialize EVM provider
6867
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, logger);
6968

70-
// Initialize events registry for the chain
71-
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
72-
logger,
73-
eventsRegistry,
74-
[chain.id as ChainId],
75-
);
7669
const cachedStrategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
7770
logger,
7871
strategyRegistry,
@@ -84,7 +77,7 @@ export class ProcessingService {
8477
{ ...core, evmProvider },
8578
indexerClient,
8679
{
87-
eventsRegistry: cachedEventsRegistry,
80+
eventsRegistry,
8881
strategyRegistry: cachedStrategyRegistry,
8982
},
9083
chain.fetchLimit,
@@ -97,7 +90,7 @@ export class ProcessingService {
9790
{ ...core, evmProvider },
9891
indexerClient,
9992
{
100-
eventsRegistry: cachedEventsRegistry,
93+
eventsRegistry,
10194
strategyRegistry: cachedStrategyRegistry,
10295
checkpointRepository: strategyProcessingCheckpointRepository,
10396
},

apps/processing/test/unit/processing.service.spec.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import { EvmProvider } from "@grants-stack-indexer/chain-providers";
44
import {
55
DatabaseEventRegistry,
66
DatabaseStrategyRegistry,
7-
InMemoryCachedEventRegistry,
87
InMemoryCachedStrategyRegistry,
98
Orchestrator,
109
RetroactiveProcessor,
@@ -118,7 +117,6 @@ describe("ProcessingService", () => {
118117
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
119118
expect(EvmProvider).toHaveBeenCalledTimes(2);
120119
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(2);
121-
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);
122120

123121
// Verify orchestrators were created with correct parameters
124122
expect(processingService["orchestrators"].size).toBe(2);

packages/data-flow/src/data-loader/dataLoader.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import {
33
IApplicationPayoutRepository,
44
IApplicationRepository,
55
IDonationRepository,
6+
IEventRegistryRepository,
67
IProjectRepository,
78
IRoundRepository,
89
ITransactionManager,
@@ -17,6 +18,7 @@ import {
1718
createProjectHandlers,
1819
createRoundHandlers,
1920
} from "./handlers/index.js";
21+
import { createProcessedEventHandlers } from "./handlers/processedEvent.handlers.js";
2022
import { ChangesetHandlers } from "./types/index.js";
2123

2224
/**
@@ -42,6 +44,7 @@ export class DataLoader implements IDataLoader {
4244
application: IApplicationRepository;
4345
donation: IDonationRepository;
4446
applicationPayout: IApplicationPayoutRepository;
47+
eventRegistry: IEventRegistryRepository;
4548
},
4649
private readonly transactionManager: ITransactionManager,
4750
private readonly logger: ILogger,
@@ -52,6 +55,7 @@ export class DataLoader implements IDataLoader {
5255
...createApplicationHandlers(repositories.application),
5356
...createDonationHandlers(repositories.donation),
5457
...createApplicationPayoutHandlers(repositories.applicationPayout),
58+
...createProcessedEventHandlers(repositories.eventRegistry),
5559
};
5660
}
5761

packages/data-flow/src/data-loader/handlers/index.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,3 +3,4 @@ export * from "./project.handlers.js";
33
export * from "./round.handlers.js";
44
export * from "./donation.handlers.js";
55
export * from "./applicationPayout.handlers.js";
6+
export * from "./processedEvent.handlers.js";
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
import {
2+
IEventRegistryRepository,
3+
ProcessedEventChangeset,
4+
} from "@grants-stack-indexer/repository";
5+
6+
import { ChangesetHandler } from "../types/index.js";
7+
8+
/**
9+
* Collection of handlers for application-related operations.
10+
* Each handler corresponds to a specific Application changeset type.
11+
*/
12+
export type ProcessedEventHandlers = {
13+
[K in ProcessedEventChangeset["type"]]: ChangesetHandler<K>;
14+
};
15+
16+
/**
17+
* Creates handlers for managing application-related operations.
18+
*
19+
* @param repository - The application repository instance used for database operations
20+
* @returns An object containing all application-related handlers
21+
*/
22+
export const createProcessedEventHandlers = (
23+
repository: IEventRegistryRepository,
24+
): ProcessedEventHandlers => ({
25+
InsertProcessedEvent: (async (changeset, txConnection): Promise<void> => {
26+
await repository.saveLastProcessedEvent(
27+
changeset.args.chainId,
28+
changeset.args.processedEvent,
29+
txConnection,
30+
);
31+
}) satisfies ChangesetHandler<"InsertProcessedEvent">,
32+
});

packages/data-flow/src/orchestrator.ts

Lines changed: 37 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,12 @@ import {
77
UnsupportedEventException,
88
UnsupportedStrategy,
99
} from "@grants-stack-indexer/processors";
10-
import { RoundNotFound, RoundNotFoundForId } from "@grants-stack-indexer/repository";
10+
import {
11+
Changeset,
12+
IEventRegistryRepository,
13+
RoundNotFound,
14+
RoundNotFoundForId,
15+
} from "@grants-stack-indexer/repository";
1116
import {
1217
Address,
1318
AnyEvent,
@@ -29,7 +34,7 @@ import {
2934
Token,
3035
} from "@grants-stack-indexer/shared";
3136

32-
import type { IEventsFetcher, IEventsRegistry, IStrategyRegistry } from "./interfaces/index.js";
37+
import type { IEventsFetcher, IStrategyRegistry } from "./interfaces/index.js";
3338
import { EventsFetcher } from "./eventsFetcher.js";
3439
import { EventsProcessor } from "./eventsProcessor.js";
3540
import { InvalidEvent } from "./exceptions/index.js";
@@ -72,7 +77,7 @@ export class Orchestrator {
7277
private readonly eventsByBlockContext: Map<number, AnyIndexerFetchedEvent[]>;
7378
private readonly eventsFetcher: IEventsFetcher;
7479
private readonly eventsProcessor: EventsProcessor;
75-
private readonly eventsRegistry: IEventsRegistry;
80+
private readonly eventsRegistry: IEventRegistryRepository;
7681
private readonly strategyRegistry: IStrategyRegistry;
7782
private readonly dataLoader: DataLoader;
7883
private readonly retryHandler: RetryHandler;
@@ -91,7 +96,7 @@ export class Orchestrator {
9196
private dependencies: Readonly<CoreDependencies>,
9297
private indexerClient: IIndexerClient,
9398
private registries: {
94-
eventsRegistry: IEventsRegistry;
99+
eventsRegistry: IEventRegistryRepository;
95100
strategyRegistry: IStrategyRegistry;
96101
},
97102
private fetchLimit: number = 1000,
@@ -113,6 +118,7 @@ export class Orchestrator {
113118
application: this.dependencies.applicationRepository,
114119
donation: this.dependencies.donationRepository,
115120
applicationPayout: this.dependencies.applicationPayoutRepository,
121+
eventRegistry: this.eventsRegistry,
116122
},
117123
this.dependencies.transactionManager,
118124
this.logger,
@@ -146,18 +152,34 @@ export class Orchestrator {
146152
continue;
147153
}
148154

149-
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
150-
...event,
151-
rawEvent: event,
152-
});
153-
154155
await this.retryHandler.execute(
155156
async () => {
156-
await this.handleEvent(event!);
157+
const changesets = await this.handleEvent(event!);
158+
if (changesets) {
159+
await this.dataLoader.applyChanges([
160+
...changesets,
161+
{
162+
type: "InsertProcessedEvent",
163+
args: {
164+
chainId: this.chainId,
165+
processedEvent: {
166+
...event!,
167+
rawEvent: event,
168+
},
169+
},
170+
},
171+
]);
172+
}
157173
},
158174
{ abortSignal: signal },
159175
);
160176
} catch (error: unknown) {
177+
if (event) {
178+
await this.eventsRegistry.saveLastProcessedEvent(this.chainId, {
179+
...event,
180+
rawEvent: event,
181+
});
182+
}
161183
// TODO: notify
162184
if (
163185
error instanceof UnsupportedStrategy ||
@@ -403,7 +425,9 @@ export class Orchestrator {
403425
return tokenPrices;
404426
}
405427

406-
private async handleEvent(event: ProcessorEvent<ContractName, AnyEvent>): Promise<void> {
428+
private async handleEvent(
429+
event: ProcessorEvent<ContractName, AnyEvent>,
430+
): Promise<Changeset[] | undefined> {
407431
event = await this.enhanceStrategyId(event);
408432
if (this.isPoolCreated(event)) {
409433
const handleable = existsHandler(event.strategyId);
@@ -421,12 +445,11 @@ export class Orchestrator {
421445
chainId: this.chainId,
422446
});
423447
// we skip the event if the strategy id is not handled yet
424-
return;
448+
return undefined;
425449
}
426450
}
427451

428-
const changesets = await this.eventsProcessor.processEvent(event);
429-
await this.dataLoader.applyChanges(changesets);
452+
return this.eventsProcessor.processEvent(event);
430453
}
431454

432455
/**

packages/data-flow/src/retroactiveProcessor.ts

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
22
import { existsHandler, UnsupportedEventException } from "@grants-stack-indexer/processors";
3-
import { IStrategyProcessingCheckpointRepository } from "@grants-stack-indexer/repository";
3+
import {
4+
IEventRegistryRepository,
5+
IStrategyProcessingCheckpointRepository,
6+
} from "@grants-stack-indexer/repository";
47
import {
58
Address,
69
AnyEvent,
@@ -20,7 +23,6 @@ import {
2023
EventsFetcher,
2124
EventsProcessor,
2225
IEventsFetcher,
23-
IEventsRegistry,
2426
InvalidEvent,
2527
IStrategyRegistry,
2628
Queue,
@@ -60,7 +62,7 @@ type EventPointer = {
6062
export class RetroactiveProcessor {
6163
private readonly eventsFetcher: IEventsFetcher;
6264
private readonly eventsProcessor: EventsProcessor;
63-
private readonly eventsRegistry: IEventsRegistry;
65+
private readonly eventsRegistry: IEventRegistryRepository;
6466
private readonly strategyRegistry: IStrategyRegistry;
6567
private readonly dataLoader: DataLoader;
6668
private readonly checkpointRepository: IStrategyProcessingCheckpointRepository;
@@ -81,7 +83,7 @@ export class RetroactiveProcessor {
8183
private dependencies: Readonly<CoreDependencies>,
8284
private indexerClient: IIndexerClient,
8385
private registries: {
84-
eventsRegistry: IEventsRegistry;
86+
eventsRegistry: IEventRegistryRepository;
8587
strategyRegistry: IStrategyRegistry;
8688
checkpointRepository: IStrategyProcessingCheckpointRepository;
8789
},
@@ -104,6 +106,7 @@ export class RetroactiveProcessor {
104106
application: this.dependencies.applicationRepository,
105107
donation: this.dependencies.donationRepository,
106108
applicationPayout: this.dependencies.applicationPayoutRepository,
109+
eventRegistry: this.eventsRegistry,
107110
},
108111
this.dependencies.transactionManager,
109112
this.logger,

packages/data-flow/test/data-loader/dataLoader.spec.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {
55
IApplicationPayoutRepository,
66
IApplicationRepository,
77
IDonationRepository,
8+
IEventRegistryRepository,
89
IProjectRepository,
910
IRoundRepository,
1011
ITransactionManager,
@@ -41,6 +42,10 @@ describe("DataLoader", () => {
4142
insertApplicationPayout: vi.fn(),
4243
} as IApplicationPayoutRepository;
4344

45+
const mockEventRegistryRepository = {
46+
saveLastProcessedEvent: vi.fn(),
47+
} as unknown as IEventRegistryRepository;
48+
4449
const logger: ILogger = {
4550
debug: vi.fn(),
4651
error: vi.fn(),
@@ -61,6 +66,7 @@ describe("DataLoader", () => {
6166
application: mockApplicationRepository,
6267
donation: mockDonationRepository,
6368
applicationPayout: mockApplicationPayoutRepository,
69+
eventRegistry: mockEventRegistryRepository,
6470
},
6571
mockTransactionManager,
6672
logger,
Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
import { describe, expect, it, vi } from "vitest";
2+
3+
import { IEventRegistryRepository, TransactionConnection } from "@grants-stack-indexer/repository";
4+
import { ChainId } from "@grants-stack-indexer/shared";
5+
6+
import { createProcessedEventHandlers } from "../../../src/data-loader/handlers/processedEvent.handlers.js";
7+
8+
describe("ProcessedEvent Handlers", () => {
9+
const chainId = 1 as ChainId;
10+
const mockEvent = {
11+
blockNumber: 1,
12+
blockTimestamp: 1234567890,
13+
logIndex: 0,
14+
rawEvent: {},
15+
};
16+
17+
describe("InsertProcessedEvent", () => {
18+
it("saves event to repository within transaction", async () => {
19+
const saveLastProcessedEvent = vi.fn();
20+
const mockRepository = {
21+
saveLastProcessedEvent,
22+
getLastProcessedEvent: vi.fn(),
23+
} as unknown as IEventRegistryRepository<TransactionConnection>;
24+
25+
const handlers = createProcessedEventHandlers(mockRepository);
26+
27+
const mockTx = {} as TransactionConnection;
28+
29+
await handlers.InsertProcessedEvent(
30+
{
31+
type: "InsertProcessedEvent",
32+
args: {
33+
chainId,
34+
processedEvent: mockEvent,
35+
},
36+
},
37+
mockTx,
38+
);
39+
40+
expect(saveLastProcessedEvent).toHaveBeenCalledWith(chainId, mockEvent, mockTx);
41+
});
42+
43+
it("propagates repository errors", async () => {
44+
const error = new Error("Database error");
45+
const saveLastProcessedEvent = vi.fn().mockRejectedValue(error);
46+
const mockRepository = {
47+
saveLastProcessedEvent,
48+
getLastProcessedEvent: vi.fn(),
49+
} as unknown as IEventRegistryRepository<TransactionConnection>;
50+
51+
const handlers = createProcessedEventHandlers(mockRepository);
52+
53+
const mockTx = {} as TransactionConnection;
54+
55+
await expect(
56+
handlers.InsertProcessedEvent(
57+
{
58+
type: "InsertProcessedEvent",
59+
args: {
60+
chainId,
61+
processedEvent: mockEvent,
62+
},
63+
},
64+
mockTx,
65+
),
66+
).rejects.toThrow(error);
67+
});
68+
});
69+
});

0 commit comments

Comments
 (0)