Skip to content

Commit 8ab4b39

Browse files
committed
Merge branch 'feat/optimize-metadata-bootstrap' into fix/pending-round-roles
2 parents 4343cfa + 7a1a855 commit 8ab4b39

37 files changed

+2081
-91
lines changed

.env.example

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,10 +69,10 @@ INDEXER_ADMIN_SECRET=testing
6969
METADATA_SOURCE=public-gateway
7070
PUBLIC_GATEWAY_URLS=["https://ipfs.io","https://dweb.link","https://cloudflare-ipfs.com","https://gateway.pinata.cloud","https://ipfs.infura.io","https://ipfs.fleek.co","https://ipfs.eth.aragon.network","https://ipfs.jes.xxx","https://ipfs.lol","https://ipfs.mle.party"]
7171

72-
RETRY_MAX_ATTEMPTS=100000
73-
RETRY_BASE_DELAY_MS=200
74-
RETRY_MAX_DELAY_MS=1000
75-
RETRY_FACTOR=1.5
72+
RETRY_MAX_ATTEMPTS = 10
73+
RETRY_BASE_DELAY_MS = 200
74+
RETRY_MAX_DELAY_MS = 1000
75+
RETRY_FACTOR = 1.5
7676

7777
PRICING_SOURCE=coingecko
7878
COINGECKO_API_KEY={{YOUR_PRO_KEY}}

apps/processing/.env.example

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,7 @@ COINGECKO_API_TYPE=demo
1818
RETRY_MAX_ATTEMPTS=3
1919
RETRY_BASE_DELAY_MS=3000
2020
RETRY_FACTOR=2
21-
RETRY_MAX_DELAY_MS=300000
21+
RETRY_MAX_DELAY_MS=300000
22+
23+
NOTIFIER_PROVIDER=# 'slack' or 'null'
24+
SLACK_WEBHOOK_URL=

apps/processing/README.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,8 @@ Available options:
4040
| `RETRY_BASE_DELAY_MS` | Base delay for retry attempts | 3000 | No | |
4141
| `RETRY_FACTOR` | Delay factor for retry attempts | 2 | No | |
4242
| `RETRY_MAX_DELAY_MS` | Maximum delay for retry attempts | 300000 | No | |
43+
| `NOTIFIER_PROVIDER` | Notifier provider (slack or null) | null | No | |
44+
| `SLACK_WEBHOOK_URL` | Slack webhook URL | N/A | Only if NOTIFIER_PROVIDER is slack | |
4345

4446
## Available Scripts
4547

apps/processing/src/config/env.ts

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,9 @@ const baseSchema = z.object({
3939
RETRY_BASE_DELAY_MS: z.coerce.number().int().min(1).default(3000), // 3 seconds
4040
RETRY_FACTOR: z.coerce.number().min(1).default(2),
4141
RETRY_MAX_DELAY_MS: z.coerce.number().int().min(1).optional(), // 5 minute
42+
NOTIFIER_PROVIDER: z.enum(["slack", "null"]).default("null"),
43+
SLACK_WEBHOOK_URL: z.string().url().optional(),
44+
LOG_LEVEL: z.enum(["error", "warn", "info", "debug"]).default("info"),
4245
});
4346

4447
const dummyPricingSchema = baseSchema.extend({
@@ -75,6 +78,15 @@ const validationSchema = z
7578
...val,
7679
};
7780
})
81+
.superRefine((data, ctx) => {
82+
if (data.NOTIFIER_PROVIDER === "slack" && !data.SLACK_WEBHOOK_URL) {
83+
ctx.addIssue({
84+
code: "custom",
85+
message: "SLACK_WEBHOOK_URL is required when NOTIFIER_PROVIDER is slack",
86+
path: ["SLACK_WEBHOOK_URL"],
87+
});
88+
}
89+
})
7890
.and(
7991
z
8092
.discriminatedUnion("METADATA_SOURCE", [

apps/processing/src/services/processing.service.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@ export class ProcessingService {
5151
kyselyDatabase,
5252
logger,
5353
retryStrategy,
54+
notifier,
5455
} = sharedDependencies;
5556
const {
5657
eventRegistryRepository,
@@ -84,6 +85,7 @@ export class ProcessingService {
8485
chain.fetchDelayMs,
8586
retryStrategy,
8687
logger,
88+
notifier,
8789
);
8890
const retroactiveProcessor = new RetroactiveProcessor(
8991
chain.id as ChainId,

apps/processing/src/services/sharedDependencies.service.ts

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,16 @@ import {
2121
KyselyStrategyRegistryRepository,
2222
KyselyTransactionManager,
2323
} from "@grants-stack-indexer/repository";
24-
import { ExponentialBackoff, ILogger, Logger, RetryStrategy } from "@grants-stack-indexer/shared";
24+
import {
25+
ExponentialBackoff,
26+
ILogger,
27+
INotifier,
28+
Logger,
29+
NotifierConfig,
30+
NotifierFactory,
31+
NotifierProvider,
32+
RetryStrategy,
33+
} from "@grants-stack-indexer/shared";
2534

2635
import { Environment } from "../config/index.js";
2736

@@ -36,6 +45,7 @@ export type SharedDependencies = {
3645
kyselyDatabase: ReturnType<typeof createKyselyDatabase>;
3746
retryStrategy: RetryStrategy;
3847
logger: ILogger;
48+
notifier: INotifier;
3949
};
4050

4151
/**
@@ -48,6 +58,8 @@ export class SharedDependenciesService {
4858
static async initialize(env: Environment): Promise<SharedDependencies> {
4959
const logger = Logger.getInstance();
5060

61+
const notifier = NotifierFactory.create(this.getNotifierOptions(env), logger);
62+
5163
// Initialize repositories
5264
const kyselyDatabase = createKyselyDatabase(
5365
{
@@ -151,6 +163,26 @@ export class SharedDependenciesService {
151163
kyselyDatabase,
152164
retryStrategy,
153165
logger,
166+
notifier,
167+
};
168+
}
169+
170+
private static getNotifierOptions(env: Environment): NotifierConfig<NotifierProvider> {
171+
if (env.NOTIFIER_PROVIDER === "slack") {
172+
if (!env.SLACK_WEBHOOK_URL) {
173+
throw new Error("SLACK_WEBHOOK_URL is required when NOTIFIER_PROVIDER is 'slack'");
174+
}
175+
176+
return {
177+
notifierProvider: env.NOTIFIER_PROVIDER,
178+
opts: {
179+
webhookUrl: env.SLACK_WEBHOOK_URL,
180+
},
181+
};
182+
}
183+
184+
return {
185+
notifierProvider: "null",
154186
};
155187
}
156188
}

packages/data-flow/src/constants.ts

Lines changed: 0 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,3 @@
1-
/**
2-
* Maximum number of retries for bulk fetching metadata.
3-
*/
4-
export const MAX_BULK_FETCH_METADATA_RETRIES = 10;
5-
6-
/**
7-
* Base delay in milliseconds for bulk fetching metadata retries.
8-
*/
9-
export const METADATA_BULK_FETCH_BASE_DELAY_MS = 1000;
10-
11-
/**
12-
* Backoff factor for bulk fetching metadata retries.
13-
*/
14-
export const METADATA_BULK_FETCH_BACKOFF_FACTOR = 1.5;
15-
161
/**
172
* Maximum concurrency for bulk fetching metadata.
183
*/

packages/data-flow/src/orchestrator.ts

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
11
import { isNativeError } from "util/types";
22
import pMap from "p-map";
3-
import { retryAsyncUntilDefined } from "ts-retry";
4-
import { DelayParameters } from "ts-retry/lib/cjs/retry/options.js";
53

64
import { IIndexerClient } from "@grants-stack-indexer/indexer-client";
75
import { TokenPrice } from "@grants-stack-indexer/pricing";
@@ -24,6 +22,7 @@ import {
2422
ContractName,
2523
Hex,
2624
ILogger,
25+
INotifier,
2726
isAlloEvent,
2827
isStrategyEvent,
2928
ProcessorEvent,
@@ -38,12 +37,7 @@ import {
3837
} from "@grants-stack-indexer/shared";
3938

4039
import type { IEventsFetcher, IStrategyRegistry } from "./interfaces/index.js";
41-
import {
42-
MAX_BULK_FETCH_METADATA_CONCURRENCY,
43-
MAX_BULK_FETCH_METADATA_RETRIES,
44-
METADATA_BULK_FETCH_BACKOFF_FACTOR,
45-
METADATA_BULK_FETCH_BASE_DELAY_MS,
46-
} from "./constants.js";
40+
import { MAX_BULK_FETCH_METADATA_CONCURRENCY } from "./constants.js";
4741
import { EventsFetcher } from "./eventsFetcher.js";
4842
import { EventsProcessor } from "./eventsProcessor.js";
4943
import { InvalidEvent } from "./exceptions/index.js";
@@ -79,8 +73,6 @@ type TokenWithTimestamps = {
7973
* - Retry handling with exponential backoff for transient failures
8074
* - Comprehensive error handling and logging for various failure scenarios
8175
* - Registry tracking of supported/unsupported strategies and events
82-
*
83-
* TODO: Enhance logging and observability
8476
*/
8577
export class Orchestrator {
8678
private readonly eventsQueue: IQueue<ProcessorEvent<ContractName, AnyEvent>>;
@@ -113,6 +105,7 @@ export class Orchestrator {
113105
private fetchDelayInMs: number = 10000,
114106
private retryStrategy: RetryStrategy,
115107
private logger: ILogger,
108+
private notifier: INotifier,
116109
private environment: "development" | "staging" | "production" = "development",
117110
) {
118111
this.eventsFetcher = new EventsFetcher(this.indexerClient);
@@ -199,7 +192,6 @@ export class Orchestrator {
199192
rawEvent: event,
200193
});
201194
}
202-
// TODO: notify
203195
if (
204196
error instanceof UnsupportedStrategy ||
205197
error instanceof InvalidEvent ||
@@ -221,6 +213,11 @@ export class Orchestrator {
221213
className: Orchestrator.name,
222214
chainId: this.chainId,
223215
});
216+
void this.notifier.send(error.message, {
217+
chainId: this.chainId,
218+
event: event!,
219+
stack: error.getFullStack(),
220+
});
224221
} else if (error instanceof Error || isNativeError(error)) {
225222
const shouldIgnoreError = this.shouldIgnoreTimestampsUpdatedError(
226223
error,
@@ -232,6 +229,11 @@ export class Orchestrator {
232229
className: Orchestrator.name,
233230
chainId: this.chainId,
234231
});
232+
void this.notifier.send(error.message, {
233+
chainId: this.chainId,
234+
event: event!,
235+
stack: error.stack,
236+
});
235237
}
236238
} else {
237239
this.logger.error(
@@ -241,6 +243,13 @@ export class Orchestrator {
241243
chainId: this.chainId,
242244
},
243245
);
246+
void this.notifier.send(
247+
`Error processing event: ${stringify(event)} ${error}`,
248+
{
249+
chainId: this.chainId,
250+
event: event!,
251+
},
252+
);
244253
}
245254
}
246255
}
@@ -355,18 +364,8 @@ export class Orchestrator {
355364
metadataIds,
356365
async (id) => {
357366
try {
358-
const result = await retryAsyncUntilDefined(
359-
() => this.dependencies.metadataProvider.getMetadata<unknown>(id),
360-
{
361-
maxTry: MAX_BULK_FETCH_METADATA_RETRIES,
362-
delay: (params: DelayParameters<unknown>) => {
363-
return (
364-
METADATA_BULK_FETCH_BASE_DELAY_MS *
365-
METADATA_BULK_FETCH_BACKOFF_FACTOR ** params.currentTry
366-
);
367-
},
368-
},
369-
);
367+
const result =
368+
await this.dependencies.metadataProvider.getMetadata<unknown>(id);
370369
return { status: "fulfilled", value: result };
371370
} catch (error) {
372371
return { status: "rejected", reason: error };

0 commit comments

Comments
 (0)