Skip to content

Commit 93c6b91

Browse files
committed
feat: use transaction manager in data loader
1 parent ea2c124 commit 93c6b91

21 files changed

+337
-256
lines changed

apps/processing/src/services/sharedDependencies.service.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
KyselyRoundRepository,
1616
KyselyStrategyProcessingCheckpointRepository,
1717
KyselyStrategyRegistryRepository,
18+
KyselyTransactionManager,
1819
} from "@grants-stack-indexer/repository";
1920
import { ILogger, Logger } from "@grants-stack-indexer/shared";
2021

@@ -50,6 +51,8 @@ export class SharedDependenciesService {
5051
logger,
5152
);
5253

54+
const transactionManager = new KyselyTransactionManager(kyselyDatabase);
55+
5356
const projectRepository = new KyselyProjectRepository(kyselyDatabase, env.DATABASE_SCHEMA);
5457
const roundRepository = new KyselyRoundRepository(kyselyDatabase, env.DATABASE_SCHEMA);
5558
const applicationRepository = new KyselyApplicationRepository(
@@ -97,6 +100,7 @@ export class SharedDependenciesService {
97100
donationRepository,
98101
metadataProvider,
99102
applicationPayoutRepository,
103+
transactionManager,
100104
},
101105
registriesRepositories: {
102106
eventRegistryRepository,

apps/processing/test/unit/sharedDependencies.service.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ vi.mock("@grants-stack-indexer/repository", () => ({
4242
})),
4343
KyselyEventRegistryRepository: vi.fn(),
4444
KyselyStrategyProcessingCheckpointRepository: vi.fn(),
45+
KyselyTransactionManager: vi.fn(),
4546
}));
4647

4748
vi.mock("@grants-stack-indexer/pricing", () => ({
@@ -145,6 +146,7 @@ describe("SharedDependenciesService", () => {
145146
expect(dependencies.core).toHaveProperty("donationRepository");
146147
expect(dependencies.core).toHaveProperty("metadataProvider");
147148
expect(dependencies.core).toHaveProperty("applicationPayoutRepository");
149+
expect(dependencies.core).toHaveProperty("transactionManager");
148150

149151
// Verify registries
150152
expect(dependencies.registriesRepositories).toHaveProperty("eventRegistryRepository");

packages/data-flow/src/data-loader/dataLoader.ts

Lines changed: 19 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,11 @@ import {
55
IDonationRepository,
66
IProjectRepository,
77
IRoundRepository,
8+
ITransactionManager,
89
} from "@grants-stack-indexer/repository";
9-
import { ILogger, stringify } from "@grants-stack-indexer/shared";
10+
import { ILogger } from "@grants-stack-indexer/shared";
1011

11-
import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js";
12+
import { IDataLoader, InvalidChangeset } from "../internal.js";
1213
import {
1314
createApplicationHandlers,
1415
createApplicationPayoutHandlers,
@@ -42,6 +43,7 @@ export class DataLoader implements IDataLoader {
4243
donation: IDonationRepository;
4344
applicationPayout: IApplicationPayoutRepository;
4445
},
46+
private readonly transactionManager: ITransactionManager,
4547
private readonly logger: ILogger,
4648
) {
4749
this.handlers = {
@@ -54,40 +56,26 @@ export class DataLoader implements IDataLoader {
5456
}
5557

5658
/** @inheritdoc */
57-
public async applyChanges(changesets: Changeset[]): Promise<ExecutionResult> {
58-
const result: ExecutionResult = {
59-
changesets: [],
60-
numExecuted: 0,
61-
numSuccessful: 0,
62-
numFailed: 0,
63-
errors: [],
64-
};
65-
59+
public async applyChanges(changesets: Changeset[]): Promise<void> {
6660
const invalidTypes = changesets.filter((changeset) => !this.handlers[changeset.type]);
6761
if (invalidTypes.length > 0) {
6862
throw new InvalidChangeset(invalidTypes.map((changeset) => changeset.type));
6963
}
7064

71-
//TODO: research how to manage transactions so we can rollback on error
72-
for (const changeset of changesets) {
73-
result.numExecuted++;
74-
try {
75-
//TODO: inside each handler, we should add zod validation that the args match the expected type
76-
await this.handlers[changeset.type](changeset as never);
77-
result.changesets.push(changeset.type);
78-
result.numSuccessful++;
79-
} catch (error) {
80-
result.numFailed++;
81-
result.errors.push(
82-
`Failed to apply changeset ${changeset.type}: ${
83-
error instanceof Error ? error.message : String(error)
84-
}`,
85-
);
86-
this.logger.error(`${stringify(error, Object.getOwnPropertyNames(error))}`);
87-
break;
88-
}
89-
}
65+
await this.transactionManager.runInTransaction(async (tx) => {
66+
this.logger.debug("Starting transaction...");
67+
for (const changeset of changesets) {
68+
try {
69+
//TODO: inside each handler, we should add zod validation that the args match the expected type
70+
await this.handlers[changeset.type](changeset as never, tx);
71+
} catch (error) {
72+
this.logger.debug(
73+
`Error applying changeset ${changeset.type}. Rolling back transaction with ${changesets.length} changesets`,
74+
);
9075

91-
return result;
76+
throw error;
77+
}
78+
}
79+
});
9280
}
9381
}

packages/data-flow/src/data-loader/handlers/application.handlers.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,16 @@ export type ApplicationHandlers = {
1919
export const createApplicationHandlers = (
2020
repository: IApplicationRepository,
2121
): ApplicationHandlers => ({
22-
InsertApplication: (async (changeset): Promise<void> => {
23-
await repository.insertApplication(changeset.args);
22+
InsertApplication: (async (changeset, txConnection): Promise<void> => {
23+
await repository.insertApplication(changeset.args, txConnection);
2424
}) satisfies ChangesetHandler<"InsertApplication">,
2525

26-
UpdateApplication: (async (changeset): Promise<void> => {
26+
UpdateApplication: (async (changeset, txConnection): Promise<void> => {
2727
const { chainId, roundId, applicationId, application } = changeset.args;
28-
await repository.updateApplication({ chainId, roundId, id: applicationId }, application);
28+
await repository.updateApplication(
29+
{ chainId, roundId, id: applicationId },
30+
application,
31+
txConnection,
32+
);
2933
}) satisfies ChangesetHandler<"UpdateApplication">,
3034
});

packages/data-flow/src/data-loader/handlers/applicationPayout.handlers.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ export type ApplicationPayoutHandlers = {
2222
export const createApplicationPayoutHandlers = (
2323
repository: IApplicationPayoutRepository,
2424
): ApplicationPayoutHandlers => ({
25-
InsertApplicationPayout: (async (changeset): Promise<void> => {
26-
await repository.insertApplicationPayout(changeset.args.applicationPayout);
25+
InsertApplicationPayout: (async (changeset, txConnection): Promise<void> => {
26+
await repository.insertApplicationPayout(changeset.args.applicationPayout, txConnection);
2727
}) satisfies ChangesetHandler<"InsertApplicationPayout">,
2828
});

packages/data-flow/src/data-loader/handlers/donation.handlers.ts

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,11 @@ export type DonationHandlers = {
1717
* @returns An object containing all application-related handlers
1818
*/
1919
export const createDonationHandlers = (repository: IDonationRepository): DonationHandlers => ({
20-
InsertDonation: (async (changeset): Promise<void> => {
21-
await repository.insertDonation(changeset.args.donation);
20+
InsertDonation: (async (changeset, txConnection): Promise<void> => {
21+
await repository.insertDonation(changeset.args.donation, txConnection);
2222
}) satisfies ChangesetHandler<"InsertDonation">,
2323

24-
InsertManyDonations: (async (changeset): Promise<void> => {
25-
await repository.insertManyDonations(changeset.args.donations);
24+
InsertManyDonations: (async (changeset, txConnection): Promise<void> => {
25+
await repository.insertManyDonations(changeset.args.donations, txConnection);
2626
}) satisfies ChangesetHandler<"InsertManyDonations">,
2727
});

packages/data-flow/src/data-loader/handlers/project.handlers.ts

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -17,38 +17,38 @@ export type ProjectHandlers = {
1717
* @returns An object containing all project-related handlers
1818
*/
1919
export const createProjectHandlers = (repository: IProjectRepository): ProjectHandlers => ({
20-
InsertProject: (async (changeset): Promise<void> => {
20+
InsertProject: (async (changeset, txConnection): Promise<void> => {
2121
const { project } = changeset.args;
22-
await repository.insertProject(project);
22+
await repository.insertProject(project, txConnection);
2323
}) satisfies ChangesetHandler<"InsertProject">,
2424

25-
UpdateProject: (async (changeset): Promise<void> => {
25+
UpdateProject: (async (changeset, txConnection): Promise<void> => {
2626
const { chainId, projectId, project } = changeset.args;
27-
await repository.updateProject({ id: projectId, chainId }, project);
27+
await repository.updateProject({ id: projectId, chainId }, project, txConnection);
2828
}) satisfies ChangesetHandler<"UpdateProject">,
2929

30-
InsertPendingProjectRole: (async (changeset): Promise<void> => {
30+
InsertPendingProjectRole: (async (changeset, txConnection): Promise<void> => {
3131
const { pendingProjectRole } = changeset.args;
32-
await repository.insertPendingProjectRole(pendingProjectRole);
32+
await repository.insertPendingProjectRole(pendingProjectRole, txConnection);
3333
}) satisfies ChangesetHandler<"InsertPendingProjectRole">,
3434

35-
DeletePendingProjectRoles: (async (changeset): Promise<void> => {
35+
DeletePendingProjectRoles: (async (changeset, txConnection): Promise<void> => {
3636
const { ids } = changeset.args;
37-
await repository.deleteManyPendingProjectRoles(ids);
37+
await repository.deleteManyPendingProjectRoles(ids, txConnection);
3838
}) satisfies ChangesetHandler<"DeletePendingProjectRoles">,
3939

40-
InsertProjectRole: (async (changeset): Promise<void> => {
40+
InsertProjectRole: (async (changeset, txConnection): Promise<void> => {
4141
const { projectRole } = changeset.args;
42-
await repository.insertProjectRole(projectRole);
42+
await repository.insertProjectRole(projectRole, txConnection);
4343
}) satisfies ChangesetHandler<"InsertProjectRole">,
4444

45-
DeleteAllProjectRolesByRole: (async (changeset): Promise<void> => {
45+
DeleteAllProjectRolesByRole: (async (changeset, txConnection): Promise<void> => {
4646
const { chainId, projectId, role } = changeset.args.projectRole;
47-
await repository.deleteManyProjectRoles(chainId, projectId, role);
47+
await repository.deleteManyProjectRoles(chainId, projectId, role, undefined, txConnection);
4848
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRole">,
4949

50-
DeleteAllProjectRolesByRoleAndAddress: (async (changeset): Promise<void> => {
50+
DeleteAllProjectRolesByRoleAndAddress: (async (changeset, txConnection): Promise<void> => {
5151
const { chainId, projectId, role, address } = changeset.args.projectRole;
52-
await repository.deleteManyProjectRoles(chainId, projectId, role, address);
52+
await repository.deleteManyProjectRoles(chainId, projectId, role, address, txConnection);
5353
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">,
5454
});

packages/data-flow/src/data-loader/handlers/round.handlers.ts

Lines changed: 28 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -17,24 +17,28 @@ export type RoundHandlers = {
1717
* @returns An object containing all round-related handlers
1818
*/
1919
export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers => ({
20-
InsertRound: (async (changeset): Promise<void> => {
20+
InsertRound: (async (changeset, txConnection): Promise<void> => {
2121
const { round } = changeset.args;
22-
await repository.insertRound(round);
22+
await repository.insertRound(round, txConnection);
2323
}) satisfies ChangesetHandler<"InsertRound">,
2424

25-
UpdateRound: (async (changeset): Promise<void> => {
25+
UpdateRound: (async (changeset, txConnection): Promise<void> => {
2626
const { chainId, roundId, round } = changeset.args;
27-
await repository.updateRound({ id: roundId, chainId }, round);
27+
await repository.updateRound({ id: roundId, chainId }, round, txConnection);
2828
}) satisfies ChangesetHandler<"UpdateRound">,
2929

30-
UpdateRoundByStrategyAddress: (async (changeset): Promise<void> => {
30+
UpdateRoundByStrategyAddress: (async (changeset, txConnection): Promise<void> => {
3131
const { chainId, strategyAddress, round } = changeset.args;
3232
if (round) {
33-
await repository.updateRound({ strategyAddress, chainId: chainId }, round);
33+
await repository.updateRound(
34+
{ strategyAddress, chainId: chainId },
35+
round,
36+
txConnection,
37+
);
3438
}
3539
}) satisfies ChangesetHandler<"UpdateRoundByStrategyAddress">,
3640

37-
IncrementRoundFundedAmount: (async (changeset): Promise<void> => {
41+
IncrementRoundFundedAmount: (async (changeset, txConnection): Promise<void> => {
3842
const { chainId, roundId, fundedAmount, fundedAmountInUsd } = changeset.args;
3943
await repository.incrementRoundFunds(
4044
{
@@ -43,37 +47,45 @@ export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers
4347
},
4448
fundedAmount,
4549
fundedAmountInUsd,
50+
txConnection,
4651
);
4752
}) satisfies ChangesetHandler<"IncrementRoundFundedAmount">,
4853

49-
IncrementRoundTotalDistributed: (async (changeset): Promise<void> => {
54+
IncrementRoundTotalDistributed: (async (changeset, txConnection): Promise<void> => {
5055
const { chainId, roundId, amount } = changeset.args;
5156
await repository.incrementRoundTotalDistributed(
5257
{
5358
chainId,
5459
roundId,
5560
},
5661
amount,
62+
txConnection,
5763
);
5864
}) satisfies ChangesetHandler<"IncrementRoundTotalDistributed">,
5965

60-
InsertPendingRoundRole: (async (changeset): Promise<void> => {
66+
InsertPendingRoundRole: (async (changeset, txConnection): Promise<void> => {
6167
const { pendingRoundRole } = changeset.args;
62-
await repository.insertPendingRoundRole(pendingRoundRole);
68+
await repository.insertPendingRoundRole(pendingRoundRole, txConnection);
6369
}) satisfies ChangesetHandler<"InsertPendingRoundRole">,
6470

65-
DeletePendingRoundRoles: (async (changeset): Promise<void> => {
71+
DeletePendingRoundRoles: (async (changeset, txConnection): Promise<void> => {
6672
const { ids } = changeset.args;
67-
await repository.deleteManyPendingRoundRoles(ids);
73+
await repository.deleteManyPendingRoundRoles(ids, txConnection);
6874
}) satisfies ChangesetHandler<"DeletePendingRoundRoles">,
6975

70-
InsertRoundRole: (async (changeset): Promise<void> => {
76+
InsertRoundRole: (async (changeset, txConnection): Promise<void> => {
7177
const { roundRole } = changeset.args;
72-
await repository.insertRoundRole(roundRole);
78+
await repository.insertRoundRole(roundRole, txConnection);
7379
}) satisfies ChangesetHandler<"InsertRoundRole">,
7480

75-
DeleteAllRoundRolesByRoleAndAddress: (async (changeset): Promise<void> => {
81+
DeleteAllRoundRolesByRoleAndAddress: (async (changeset, txConnection): Promise<void> => {
7682
const { chainId, roundId, role, address } = changeset.args.roundRole;
77-
await repository.deleteManyRoundRolesByRoleAndAddress(chainId, roundId, role, address);
83+
await repository.deleteManyRoundRolesByRoleAndAddress(
84+
chainId,
85+
roundId,
86+
role,
87+
address,
88+
txConnection,
89+
);
7890
}) satisfies ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">,
7991
});

packages/data-flow/src/data-loader/types/index.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
1-
import { Changeset } from "@grants-stack-indexer/repository";
1+
import { Changeset, TransactionConnection } from "@grants-stack-indexer/repository";
22

33
export type ChangesetHandler<T extends Changeset["type"]> = (
44
changeset: Extract<Changeset, { type: T }>,
5+
txConnection?: TransactionConnection,
56
) => Promise<void>;
67

78
export type ChangesetHandlers = {
Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,11 @@
11
import type { Changeset } from "@grants-stack-indexer/repository";
22

3-
import type { ExecutionResult } from "../internal.js";
4-
53
export interface IDataLoader {
64
/**
75
* Applies the changesets to the database.
86
* @param changesets - The changesets to apply.
97
* @returns The execution result.
108
* @throws {InvalidChangeset} if there are changesets with invalid types.
119
*/
12-
applyChanges(changesets: Changeset[]): Promise<ExecutionResult>;
10+
applyChanges(changesets: Changeset[]): Promise<void>;
1311
}

packages/data-flow/src/orchestrator.ts

Lines changed: 2 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,7 @@ export class Orchestrator {
9494
donation: this.dependencies.donationRepository,
9595
applicationPayout: this.dependencies.applicationPayoutRepository,
9696
},
97+
this.dependencies.transactionManager,
9798
this.logger,
9899
);
99100
this.eventsQueue = new Queue<ProcessorEvent<ContractName, AnyEvent>>(fetchLimit);
@@ -145,20 +146,7 @@ export class Orchestrator {
145146
}
146147

147148
const changesets = await this.eventsProcessor.processEvent(event);
148-
const executionResult = await this.dataLoader.applyChanges(changesets);
149-
150-
if (executionResult.numFailed > 0) {
151-
//TODO: should we retry the failed changesets?
152-
this.logger.error(
153-
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(
154-
event,
155-
)}`,
156-
{
157-
className: Orchestrator.name,
158-
chainId: this.chainId,
159-
},
160-
);
161-
}
149+
await this.dataLoader.applyChanges(changesets);
162150
} catch (error: unknown) {
163151
// TODO: improve error handling, retries and notify
164152
if (

packages/data-flow/src/retroactiveProcessor.ts

Lines changed: 2 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ export class RetroactiveProcessor {
100100
donation: this.dependencies.donationRepository,
101101
applicationPayout: this.dependencies.applicationPayoutRepository,
102102
},
103+
this.dependencies.transactionManager,
103104
this.logger,
104105
);
105106
}
@@ -208,17 +209,7 @@ export class RetroactiveProcessor {
208209

209210
event.strategyId = strategyId;
210211
const changesets = await this.eventsProcessor.processEvent(event);
211-
const executionResult = await this.dataLoader.applyChanges(changesets);
212-
213-
if (executionResult.numFailed > 0) {
214-
this.logger.error(
215-
`Failed to apply changesets. ${executionResult.errors.join("\n")} Event: ${stringify(event)}`,
216-
{
217-
className: RetroactiveProcessor.name,
218-
chainId: this.chainId,
219-
},
220-
);
221-
}
212+
await this.dataLoader.applyChanges(changesets);
222213
} catch (error) {
223214
if (error instanceof InvalidEvent || error instanceof UnsupportedEventException) {
224215
// Expected errors that we can safely ignore

0 commit comments

Comments
 (0)