Skip to content

Commit 2147d4e

Browse files
committed
feat: dataLoader class
1 parent 140c122 commit 2147d4e

21 files changed

+839
-20
lines changed

packages/data-flow/package.json

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
},
3030
"dependencies": {
3131
"@grants-stack-indexer/indexer-client": "workspace:*",
32+
"@grants-stack-indexer/repository": "workspace:*",
3233
"@grants-stack-indexer/shared": "workspace:*",
3334
"viem": "2.21.19"
3435
}
Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import {
2+
Changeset,
3+
IApplicationRepository,
4+
IProjectRepository,
5+
IRoundRepository,
6+
} from "@grants-stack-indexer/repository";
7+
8+
import { ExecutionResult, IDataLoader, InvalidChangeset } from "../internal.js";
9+
import {
10+
createApplicationHandlers,
11+
createProjectHandlers,
12+
createRoundHandlers,
13+
} from "./handlers/index.js";
14+
import { ChangesetHandlers } from "./types/index.js";
15+
16+
/**
17+
* DataLoader is responsible for applying changesets to the database.
18+
* It works by:
19+
* 1. Taking an array of changesets representing data modifications
20+
* 2. Validating that handlers exist for all changeset types
21+
* 3. Sequentially executing each changeset using the appropriate handler
22+
* 4. Tracking execution results including successes and failures
23+
* 5. Breaking execution if any changeset fails
24+
*
25+
* The handlers are initialized for different entity types (projects, rounds, applications)
26+
* and stored in a map for lookup during execution.
27+
*/
28+
29+
export class DataLoader implements IDataLoader {
30+
private readonly handlers: ChangesetHandlers;
31+
32+
constructor(
33+
private readonly repositories: {
34+
project: IProjectRepository;
35+
round: IRoundRepository;
36+
application: IApplicationRepository;
37+
},
38+
) {
39+
this.handlers = {
40+
...createProjectHandlers(repositories.project),
41+
...createRoundHandlers(repositories.round),
42+
...createApplicationHandlers(repositories.application),
43+
};
44+
}
45+
46+
/** @inheritdoc */
47+
public async applyChanges(changesets: Changeset[]): Promise<ExecutionResult> {
48+
const result: ExecutionResult = {
49+
changesets: [],
50+
numExecuted: 0,
51+
numSuccessful: 0,
52+
numFailed: 0,
53+
errors: [],
54+
};
55+
56+
const invalidTypes = changesets.filter((changeset) => !this.handlers[changeset.type]);
57+
if (invalidTypes.length > 0) {
58+
throw new InvalidChangeset(invalidTypes.map((changeset) => changeset.type));
59+
}
60+
61+
//TODO: research how to manage transactions so we can rollback on error
62+
for (const changeset of changesets) {
63+
result.numExecuted++;
64+
try {
65+
await this.handlers[changeset.type](changeset as never);
66+
result.changesets.push(changeset.type);
67+
result.numSuccessful++;
68+
} catch (error) {
69+
result.numFailed++;
70+
result.errors.push(
71+
`Failed to apply changeset ${changeset.type}: ${
72+
error instanceof Error ? error.message : String(error)
73+
}`,
74+
);
75+
console.error(error);
76+
break;
77+
}
78+
}
79+
80+
return result;
81+
}
82+
}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
import { IApplicationRepository } from "@grants-stack-indexer/repository";
2+
3+
import { ChangesetHandler } from "../types/index.js";
4+
5+
/**
6+
* Collection of handlers for application-related operations.
7+
* Each handler corresponds to a specific Application changeset type.
8+
*/
9+
export type ApplicationHandlers = {
10+
InsertApplication: ChangesetHandler<"InsertApplication">;
11+
UpdateApplication: ChangesetHandler<"UpdateApplication">;
12+
};
13+
14+
/**
15+
* Creates handlers for managing application-related operations.
16+
*
17+
* @param repository - The application repository instance used for database operations
18+
* @returns An object containing all application-related handlers
19+
*/
20+
export const createApplicationHandlers = (
21+
repository: IApplicationRepository,
22+
): ApplicationHandlers => ({
23+
InsertApplication: (async (changeset): Promise<void> => {
24+
await repository.insertApplication(changeset.args);
25+
}) satisfies ChangesetHandler<"InsertApplication">,
26+
27+
UpdateApplication: (async (changeset): Promise<void> => {
28+
const { chainId, roundId, applicationId, application } = changeset.args;
29+
await repository.updateApplication({ chainId, roundId, id: applicationId }, application);
30+
}) satisfies ChangesetHandler<"UpdateApplication">,
31+
});
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export * from "./application.handlers.js";
2+
export * from "./project.handlers.js";
3+
export * from "./round.handlers.js";
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
import { IProjectRepository } from "@grants-stack-indexer/repository";
2+
3+
import { ChangesetHandler } from "../types/index.js";
4+
5+
/**
6+
* Collection of handlers for project-related operations.
7+
* Each handler corresponds to a specific Project changeset type.
8+
*/
9+
export type ProjectHandlers = {
10+
InsertProject: ChangesetHandler<"InsertProject">;
11+
UpdateProject: ChangesetHandler<"UpdateProject">;
12+
InsertPendingProjectRole: ChangesetHandler<"InsertPendingProjectRole">;
13+
DeletePendingProjectRoles: ChangesetHandler<"DeletePendingProjectRoles">;
14+
InsertProjectRole: ChangesetHandler<"InsertProjectRole">;
15+
DeleteAllProjectRolesByRole: ChangesetHandler<"DeleteAllProjectRolesByRole">;
16+
DeleteAllProjectRolesByRoleAndAddress: ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">;
17+
};
18+
19+
/**
20+
* Creates handlers for managing project-related operations.
21+
*
22+
* @param repository - The project repository instance used for database operations
23+
* @returns An object containing all project-related handlers
24+
*/
25+
export const createProjectHandlers = (repository: IProjectRepository): ProjectHandlers => ({
26+
InsertProject: (async (changeset): Promise<void> => {
27+
const { project } = changeset.args;
28+
await repository.insertProject(project);
29+
}) satisfies ChangesetHandler<"InsertProject">,
30+
31+
UpdateProject: (async (changeset): Promise<void> => {
32+
const { chainId, projectId, project } = changeset.args;
33+
await repository.updateProject({ id: projectId, chainId }, project);
34+
}) satisfies ChangesetHandler<"UpdateProject">,
35+
36+
InsertPendingProjectRole: (async (changeset): Promise<void> => {
37+
const { pendingProjectRole } = changeset.args;
38+
await repository.insertPendingProjectRole(pendingProjectRole);
39+
}) satisfies ChangesetHandler<"InsertPendingProjectRole">,
40+
41+
DeletePendingProjectRoles: (async (changeset): Promise<void> => {
42+
const { ids } = changeset.args;
43+
await repository.deleteManyPendingProjectRoles(ids);
44+
}) satisfies ChangesetHandler<"DeletePendingProjectRoles">,
45+
46+
InsertProjectRole: (async (changeset): Promise<void> => {
47+
const { projectRole } = changeset.args;
48+
await repository.insertProjectRole(projectRole);
49+
}) satisfies ChangesetHandler<"InsertProjectRole">,
50+
51+
DeleteAllProjectRolesByRole: (async (changeset): Promise<void> => {
52+
const { chainId, projectId, role } = changeset.args.projectRole;
53+
await repository.deleteManyProjectRoles(chainId, projectId, role);
54+
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRole">,
55+
56+
DeleteAllProjectRolesByRoleAndAddress: (async (changeset): Promise<void> => {
57+
const { chainId, projectId, role, address } = changeset.args.projectRole;
58+
await repository.deleteManyProjectRoles(chainId, projectId, role, address);
59+
}) satisfies ChangesetHandler<"DeleteAllProjectRolesByRoleAndAddress">,
60+
});
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
import { IRoundRepository } from "@grants-stack-indexer/repository";
2+
3+
import { ChangesetHandler } from "../types/index.js";
4+
5+
/**
6+
* Collection of handlers for round-related operations.
7+
* Each handler corresponds to a specific Round changeset type.
8+
*/
9+
export type RoundHandlers = {
10+
InsertRound: ChangesetHandler<"InsertRound">;
11+
UpdateRound: ChangesetHandler<"UpdateRound">;
12+
UpdateRoundByStrategyAddress: ChangesetHandler<"UpdateRoundByStrategyAddress">;
13+
IncrementRoundFundedAmount: ChangesetHandler<"IncrementRoundFundedAmount">;
14+
IncrementRoundTotalDistributed: ChangesetHandler<"IncrementRoundTotalDistributed">;
15+
InsertPendingRoundRole: ChangesetHandler<"InsertPendingRoundRole">;
16+
DeletePendingRoundRoles: ChangesetHandler<"DeletePendingRoundRoles">;
17+
InsertRoundRole: ChangesetHandler<"InsertRoundRole">;
18+
DeleteAllRoundRolesByRoleAndAddress: ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">;
19+
};
20+
21+
/**
22+
* Creates handlers for managing round-related operations.
23+
*
24+
* @param repository - The round repository instance used for database operations
25+
* @returns An object containing all round-related handlers
26+
*/
27+
export const createRoundHandlers = (repository: IRoundRepository): RoundHandlers => ({
28+
InsertRound: (async (changeset): Promise<void> => {
29+
const { round } = changeset.args;
30+
await repository.insertRound(round);
31+
}) satisfies ChangesetHandler<"InsertRound">,
32+
33+
UpdateRound: (async (changeset): Promise<void> => {
34+
const { chainId, roundId, round } = changeset.args;
35+
await repository.updateRound({ id: roundId, chainId }, round);
36+
}) satisfies ChangesetHandler<"UpdateRound">,
37+
38+
UpdateRoundByStrategyAddress: (async (changeset): Promise<void> => {
39+
const { chainId, strategyAddress, round } = changeset.args;
40+
if (round) {
41+
await repository.updateRound({ strategyAddress, chainId: chainId }, round);
42+
}
43+
}) satisfies ChangesetHandler<"UpdateRoundByStrategyAddress">,
44+
45+
IncrementRoundFundedAmount: (async (changeset): Promise<void> => {
46+
const { chainId, roundId, fundedAmount, fundedAmountInUsd } = changeset.args;
47+
await repository.incrementRoundFunds(
48+
{
49+
chainId,
50+
roundId,
51+
},
52+
fundedAmount,
53+
fundedAmountInUsd,
54+
);
55+
}) satisfies ChangesetHandler<"IncrementRoundFundedAmount">,
56+
57+
IncrementRoundTotalDistributed: (async (changeset): Promise<void> => {
58+
const { chainId, roundId, amount } = changeset.args;
59+
await repository.incrementRoundTotalDistributed(
60+
{
61+
chainId,
62+
roundId,
63+
},
64+
amount,
65+
);
66+
}) satisfies ChangesetHandler<"IncrementRoundTotalDistributed">,
67+
68+
InsertPendingRoundRole: (async (changeset): Promise<void> => {
69+
const { pendingRoundRole } = changeset.args;
70+
await repository.insertPendingRoundRole(pendingRoundRole);
71+
}) satisfies ChangesetHandler<"InsertPendingRoundRole">,
72+
73+
DeletePendingRoundRoles: (async (changeset): Promise<void> => {
74+
const { ids } = changeset.args;
75+
await repository.deleteManyPendingRoundRoles(ids);
76+
}) satisfies ChangesetHandler<"DeletePendingRoundRoles">,
77+
78+
InsertRoundRole: (async (changeset): Promise<void> => {
79+
const { roundRole } = changeset.args;
80+
await repository.insertRoundRole(roundRole);
81+
}) satisfies ChangesetHandler<"InsertRoundRole">,
82+
83+
DeleteAllRoundRolesByRoleAndAddress: (async (changeset): Promise<void> => {
84+
const { chainId, roundId, role, address } = changeset.args.roundRole;
85+
await repository.deleteManyRoundRolesByRoleAndAddress(chainId, roundId, role, address);
86+
}) satisfies ChangesetHandler<"DeleteAllRoundRolesByRoleAndAddress">,
87+
});
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./dataLoader.js";
Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { Changeset } from "@grants-stack-indexer/repository";
2+
3+
export type ChangesetHandler<T extends Changeset["type"]> = (
4+
changeset: Extract<Changeset, { type: T }>,
5+
) => Promise<void>;
6+
7+
export type ChangesetHandlers = {
8+
[K in Changeset["type"]]: ChangesetHandler<K>;
9+
};
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
export * from "./invalidChangeset.exception.js";
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
export class InvalidChangeset extends Error {
2+
constructor(invalidTypes: string[]) {
3+
super(`Invalid changeset types: ${invalidTypes.join(", ")}`);
4+
}
5+
}

packages/data-flow/src/external.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
11
export { EventsFetcher } from "./internal.js";
2+
3+
export { DataLoader } from "./internal.js";
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
import type { Changeset } from "@grants-stack-indexer/repository";
2+
3+
import type { ExecutionResult } from "../internal.js";
4+
5+
export interface IDataLoader {
6+
/**
7+
* Applies the changesets to the database.
8+
* @param changesets - The changesets to apply.
9+
* @returns The execution result.
10+
* @throws {InvalidChangeset} if there are changesets with invalid types.
11+
*/
12+
applyChanges(changesets: Changeset[]): Promise<ExecutionResult>;
13+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import { AnyProtocolEvent } from "@grants-stack-indexer/shared";
2+
3+
/**
4+
* Interface for the events fetcher
5+
*/
6+
export interface IEventsFetcher {
7+
/**
8+
* Fetch the events by block number and log index for a chain
9+
* @param chainId id of the chain
10+
* @param blockNumber block number to fetch events from
11+
* @param logIndex log index in the block to fetch events from
12+
* @param limit limit of events to fetch
13+
*/
14+
fetchEventsByBlockNumberAndLogIndex(
15+
chainId: bigint,
16+
blockNumber: bigint,
17+
logIndex: number,
18+
limit?: number,
19+
): Promise<AnyProtocolEvent[]>;
20+
}
Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,2 @@
1-
import { AnyProtocolEvent } from "@grants-stack-indexer/shared";
2-
3-
/**
4-
* Interface for the events fetcher
5-
*/
6-
export interface IEventsFetcher {
7-
/**
8-
* Fetch the events by block number and log index for a chain
9-
* @param chainId id of the chain
10-
* @param blockNumber block number to fetch events from
11-
* @param logIndex log index in the block to fetch events from
12-
* @param limit limit of events to fetch
13-
*/
14-
fetchEventsByBlockNumberAndLogIndex(
15-
chainId: bigint,
16-
blockNumber: bigint,
17-
logIndex: number,
18-
limit?: number,
19-
): Promise<AnyProtocolEvent[]>;
20-
}
1+
export * from "./eventsFetcher.interface.js";
2+
export * from "./dataLoader.interface.js";

packages/data-flow/src/internal.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,5 @@
1+
export * from "./types/index.js";
2+
export * from "./interfaces/index.js";
3+
export * from "./exceptions/index.js";
4+
export * from "./data-loader/index.js";
15
export * from "./eventsFetcher.js";

packages/data-flow/src/types/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
import { Changeset } from "@grants-stack-indexer/repository";
2+
3+
export type ExecutionResult = {
4+
changesets: Changeset["type"][];
5+
numExecuted: number;
6+
numSuccessful: number;
7+
numFailed: number;
8+
errors: string[];
9+
};

0 commit comments

Comments
 (0)