Skip to content

Commit ab99914

Browse files
committed
Merge branch 'dev' into feat/fetch-old-events-for-unhandled-strategies
2 parents dd8e25e + 4b4c594 commit ab99914

30 files changed

+555
-228
lines changed

apps/processing/src/services/processing.service.ts

Lines changed: 47 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import { optimism } from "viem/chains";
22

33
import { EvmProvider } from "@grants-stack-indexer/chain-providers";
4-
import { Orchestrator } from "@grants-stack-indexer/data-flow";
4+
import {
5+
DatabaseEventRegistry,
6+
DatabaseStrategyRegistry,
7+
InMemoryCachedEventRegistry,
8+
InMemoryCachedStrategyRegistry,
9+
Orchestrator,
10+
} from "@grants-stack-indexer/data-flow";
511
import { ChainId, Logger } from "@grants-stack-indexer/shared";
612

713
import { Environment } from "../config/env.js";
@@ -10,8 +16,10 @@ import { SharedDependencies, SharedDependenciesService } from "./index.js";
1016
/**
1117
* Processor service application
1218
* - Initializes core dependencies (repositories, providers) via SharedDependenciesService
19+
* - Initializes a StrategyRegistry and loads it with strategies from the database
1320
* For each chain:
1421
* - Sets up EVM provider with configured RPC endpoints
22+
* - Instantiates an EventsRegistry and loads it with the last processed event for the chain
1523
* - Creates an Orchestrator instance to coordinate an specific chain:
1624
* - Fetching on-chain events via indexer client
1725
* - Processing events through registered handlers
@@ -23,34 +31,63 @@ export class ProcessingService {
2331
private readonly logger = new Logger({ className: "ProcessingService" });
2432
private readonly kyselyDatabase: SharedDependencies["kyselyDatabase"];
2533

26-
private constructor(env: Environment, sharedDependencies: SharedDependencies) {
27-
const { CHAINS: chains } = env;
28-
const { core, registries, indexerClient, kyselyDatabase } = sharedDependencies;
34+
private constructor(
35+
orchestrators: Map<ChainId, Orchestrator>,
36+
kyselyDatabase: SharedDependencies["kyselyDatabase"],
37+
) {
38+
this.orchestrators = orchestrators;
2939
this.kyselyDatabase = kyselyDatabase;
40+
}
41+
42+
static async initialize(env: Environment): Promise<ProcessingService> {
43+
const sharedDependencies = await SharedDependenciesService.initialize(env);
44+
const { CHAINS: chains } = env;
45+
const { core, registriesRepositories, indexerClient, kyselyDatabase } = sharedDependencies;
46+
const { eventRegistryRepository, strategyRegistryRepository } = registriesRepositories;
47+
const orchestrators: Map<ChainId, Orchestrator> = new Map();
48+
49+
const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
50+
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
51+
new DatabaseStrategyRegistry(
52+
new Logger({ className: "DatabaseStrategyRegistry" }),
53+
strategyRegistryRepository,
54+
),
55+
);
56+
const eventsRegistry = new DatabaseEventRegistry(
57+
new Logger({ className: "DatabaseEventRegistry" }),
58+
eventRegistryRepository,
59+
);
3060

3161
for (const chain of chains) {
3262
const chainLogger = new Logger({ chainId: chain.id as ChainId });
3363
// Initialize EVM provider
3464
const evmProvider = new EvmProvider(chain.rpcUrls, optimism, chainLogger);
3565

36-
this.orchestrators.set(
66+
// Initialize events registry for the chain
67+
const cachedEventsRegistry = await InMemoryCachedEventRegistry.initialize(
68+
new Logger({ className: "InMemoryCachedEventRegistry" }),
69+
eventsRegistry,
70+
[chain.id as ChainId],
71+
);
72+
73+
orchestrators.set(
3774
chain.id as ChainId,
3875
new Orchestrator(
3976
chain.id as ChainId,
4077
{ ...core, evmProvider },
4178
indexerClient,
42-
registries,
79+
{
80+
eventsRegistry: cachedEventsRegistry,
81+
strategyRegistry,
82+
},
4383
chain.fetchLimit,
4484
chain.fetchDelayMs,
4585
chainLogger,
4686
),
4787
);
4888
}
49-
}
5089

51-
static async initialize(env: Environment): Promise<ProcessingService> {
52-
const sharedDependencies = await SharedDependenciesService.initialize(env);
53-
return new ProcessingService(env, sharedDependencies);
90+
return new ProcessingService(orchestrators, kyselyDatabase);
5491
}
5592

5693
/**

apps/processing/src/services/sharedDependencies.service.ts

Lines changed: 15 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,15 @@
1-
import {
2-
CoreDependencies,
3-
DatabaseStrategyRegistry,
4-
IEventsRegistry,
5-
InMemoryCachedStrategyRegistry,
6-
InMemoryEventsRegistry,
7-
IStrategyRegistry,
8-
} from "@grants-stack-indexer/data-flow";
1+
import { CoreDependencies } from "@grants-stack-indexer/data-flow";
92
import { EnvioIndexerClient } from "@grants-stack-indexer/indexer-client";
103
import { IpfsProvider } from "@grants-stack-indexer/metadata";
114
import { PricingProviderFactory } from "@grants-stack-indexer/pricing";
125
import {
136
createKyselyDatabase,
7+
IEventRegistryRepository,
8+
IStrategyRegistryRepository,
149
KyselyApplicationPayoutRepository,
1510
KyselyApplicationRepository,
1611
KyselyDonationRepository,
12+
KyselyEventRegistryRepository,
1713
KyselyProjectRepository,
1814
KyselyRoundRepository,
1915
KyselyStrategyRegistryRepository,
@@ -24,9 +20,9 @@ import { Environment } from "../config/index.js";
2420

2521
export type SharedDependencies = {
2622
core: Omit<CoreDependencies, "evmProvider">;
27-
registries: {
28-
eventsRegistry: IEventsRegistry;
29-
strategyRegistry: IStrategyRegistry;
23+
registriesRepositories: {
24+
eventRegistryRepository: IEventRegistryRepository;
25+
strategyRegistryRepository: IStrategyRegistryRepository;
3026
};
3127
indexerClient: EnvioIndexerClient;
3228
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
@@ -35,7 +31,7 @@ export type SharedDependencies = {
3531
/**
3632
* Shared dependencies service
3733
* - Initializes core dependencies (repositories, providers)
38-
* - Initializes registries
34+
* - Initializes registries repositories
3935
* - Initializes indexer client
4036
*/
4137
export class SharedDependenciesService {
@@ -68,20 +64,13 @@ export class SharedDependenciesService {
6864
new Logger({ className: "IpfsProvider" }),
6965
);
7066

71-
// Initialize registries
72-
const eventsRegistry = new InMemoryEventsRegistry(
73-
new Logger({ className: "InMemoryEventsRegistry" }),
74-
);
75-
const strategyRepository = new KyselyStrategyRegistryRepository(
67+
const eventRegistryRepository = new KyselyEventRegistryRepository(
7668
kyselyDatabase,
7769
env.DATABASE_SCHEMA,
7870
);
79-
const strategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
80-
new Logger({ className: "InMemoryCachedStrategyRegistry" }),
81-
new DatabaseStrategyRegistry(
82-
new Logger({ className: "DatabaseStrategyRegistry" }),
83-
strategyRepository,
84-
),
71+
const strategyRegistryRepository = new KyselyStrategyRegistryRepository(
72+
kyselyDatabase,
73+
env.DATABASE_SCHEMA,
8574
);
8675

8776
// Initialize indexer client
@@ -100,9 +89,9 @@ export class SharedDependenciesService {
10089
metadataProvider,
10190
applicationPayoutRepository,
10291
},
103-
registries: {
104-
eventsRegistry,
105-
strategyRegistry,
92+
registriesRepositories: {
93+
eventRegistryRepository,
94+
strategyRegistryRepository,
10695
},
10796
indexerClient,
10897
kyselyDatabase,

apps/processing/test/unit/processing.service.spec.ts

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,13 @@
11
import { afterEach, beforeEach, describe, expect, it, vi } from "vitest";
22

33
import { EvmProvider } from "@grants-stack-indexer/chain-providers";
4-
import { Orchestrator } from "@grants-stack-indexer/data-flow";
4+
import {
5+
DatabaseEventRegistry,
6+
DatabaseStrategyRegistry,
7+
InMemoryCachedEventRegistry,
8+
InMemoryCachedStrategyRegistry,
9+
Orchestrator,
10+
} from "@grants-stack-indexer/data-flow";
511

612
import type { Environment } from "../../src/config/env.js";
713
import { ProcessingService } from "../../src/services/processing.service.js";
@@ -10,7 +16,7 @@ vi.mock("../../src/services/sharedDependencies.service.js", () => ({
1016
SharedDependenciesService: {
1117
initialize: vi.fn(() => ({
1218
core: {},
13-
registries: {},
19+
registriesRepositories: {},
1420
indexerClient: {},
1521
kyselyDatabase: {
1622
destroy: vi.fn(),
@@ -23,6 +29,39 @@ vi.mock("@grants-stack-indexer/chain-providers", () => ({
2329
EvmProvider: vi.fn(),
2430
}));
2531

32+
vi.mock("@grants-stack-indexer/data-flow", async (importOriginal) => {
33+
const actual = await importOriginal<typeof import("@grants-stack-indexer/data-flow")>();
34+
const mockStrategyRegistry = {
35+
getStrategies: vi.fn(),
36+
getStrategyId: vi.fn(),
37+
saveStrategyId: vi.fn(),
38+
};
39+
40+
const mockEventRegistry = {
41+
getLastProcessedEvent: vi.fn(),
42+
saveLastProcessedEvent: vi.fn(),
43+
};
44+
45+
return {
46+
...actual,
47+
InMemoryCachedStrategyRegistry: {
48+
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
49+
},
50+
DatabaseStrategyRegistry: vi.fn().mockImplementation(() => ({
51+
getStrategies: vi.fn(),
52+
getStrategyId: vi.fn(),
53+
saveStrategyId: vi.fn(),
54+
})),
55+
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
56+
getLastProcessedEvent: vi.fn(),
57+
saveLastProcessedEvent: vi.fn(),
58+
})),
59+
InMemoryCachedEventRegistry: {
60+
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
61+
},
62+
};
63+
});
64+
2665
vi.spyOn(Orchestrator.prototype, "run").mockImplementation(async function (signal: AbortSignal) {
2766
while (!signal.aborted) {
2867
await new Promise((resolve) => setTimeout(resolve, 100));
@@ -62,7 +101,12 @@ describe("ProcessingService", () => {
62101
});
63102

64103
it("initializes multiple orchestrators correctly", () => {
104+
expect(InMemoryCachedStrategyRegistry.initialize).toHaveBeenCalledTimes(1);
105+
expect(DatabaseStrategyRegistry).toHaveBeenCalledTimes(1);
106+
expect(DatabaseEventRegistry).toHaveBeenCalledTimes(1);
65107
expect(EvmProvider).toHaveBeenCalledTimes(2);
108+
expect(InMemoryCachedEventRegistry.initialize).toHaveBeenCalledTimes(2);
109+
66110
// Verify orchestrators were created with correct parameters
67111
expect(processingService["orchestrators"].size).toBe(2);
68112

apps/processing/test/unit/sharedDependencies.service.spec.ts

Lines changed: 16 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
2222
getStrategyId: vi.fn(),
2323
saveStrategyId: vi.fn(),
2424
})),
25+
KyselyEventRegistryRepository: vi.fn(),
2526
}));
2627

2728
vi.mock("@grants-stack-indexer/pricing", () => ({
@@ -45,8 +46,12 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
4546
saveStrategyId: vi.fn(),
4647
};
4748

49+
const mockEventRegistry = {
50+
getLastProcessedEvent: vi.fn(),
51+
saveLastProcessedEvent: vi.fn(),
52+
};
53+
4854
return {
49-
InMemoryEventsRegistry: vi.fn(),
5055
InMemoryCachedStrategyRegistry: {
5156
initialize: vi.fn().mockResolvedValue(mockStrategyRegistry),
5257
},
@@ -55,6 +60,13 @@ vi.mock("@grants-stack-indexer/data-flow", () => {
5560
getStrategyId: vi.fn(),
5661
saveStrategyId: vi.fn(),
5762
})),
63+
DatabaseEventRegistry: vi.fn().mockImplementation(() => ({
64+
getLastProcessedEvent: vi.fn(),
65+
saveLastProcessedEvent: vi.fn(),
66+
})),
67+
InMemoryCachedEventRegistry: {
68+
initialize: vi.fn().mockResolvedValue(mockEventRegistry),
69+
},
5870
};
5971
});
6072

@@ -98,7 +110,7 @@ describe("SharedDependenciesService", () => {
98110

99111
// Verify structure of returned dependencies
100112
expect(dependencies).toHaveProperty("core");
101-
expect(dependencies).toHaveProperty("registries");
113+
expect(dependencies).toHaveProperty("registriesRepositories");
102114
expect(dependencies).toHaveProperty("indexerClient");
103115
expect(dependencies).toHaveProperty("kyselyDatabase");
104116

@@ -112,10 +124,7 @@ describe("SharedDependenciesService", () => {
112124
expect(dependencies.core).toHaveProperty("applicationPayoutRepository");
113125

114126
// Verify registries
115-
expect(dependencies.registries).toHaveProperty("eventsRegistry");
116-
expect(dependencies.registries).toHaveProperty("strategyRegistry");
117-
118-
// Verify InMemoryCachedStrategyRegistry initialization
119-
expect(dependencies.registries.strategyRegistry).toBeDefined();
127+
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");
128+
expect(dependencies.registriesRepositories).toHaveProperty("strategyRegistryRepository");
120129
});
121130
});

packages/data-flow/src/eventsRegistry.ts

Lines changed: 0 additions & 40 deletions
This file was deleted.

packages/data-flow/src/external.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
export {
22
DataLoader,
3-
InMemoryEventsRegistry,
43
InMemoryCachedStrategyRegistry,
4+
InMemoryCachedEventRegistry,
5+
DatabaseEventRegistry,
56
DatabaseStrategyRegistry,
67
Orchestrator,
78
} from "./internal.js";
Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
import { AnyEvent, ChainId, ContractName, ProcessorEvent } from "@grants-stack-indexer/shared";
1+
import { NewProcessedEvent, ProcessedEvent } from "@grants-stack-indexer/repository";
2+
import { ChainId } from "@grants-stack-indexer/shared";
23

34
/**
45
* The events registry saves as a checkpoint to the last processed event by the system.
@@ -10,16 +11,11 @@ export interface IEventsRegistry {
1011
* @param chainId - The chain id
1112
* @returns The last processed event or undefined if no event has been processed yet.
1213
*/
13-
getLastProcessedEvent(
14-
chainId: ChainId,
15-
): Promise<ProcessorEvent<ContractName, AnyEvent> | undefined>;
14+
getLastProcessedEvent(chainId: ChainId): Promise<ProcessedEvent | undefined>;
1615
/**
1716
* Save the last processed event by the system
1817
* @param chainId - The chain id
1918
* @param event - The event to save.
2019
*/
21-
saveLastProcessedEvent(
22-
chainId: ChainId,
23-
event: ProcessorEvent<ContractName, AnyEvent>,
24-
): Promise<void>;
20+
saveLastProcessedEvent(chainId: ChainId, event: NewProcessedEvent): Promise<void>;
2521
}

0 commit comments

Comments
 (0)