Skip to content

add logging #113

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 3 commits into
base: dev
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
406 changes: 203 additions & 203 deletions apps/indexer/config.yaml

Large diffs are not rendered by default.

34 changes: 31 additions & 3 deletions apps/processing/src/services/processing.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ export class ProcessingService {

static async initialize(env: Environment): Promise<ProcessingService> {
const sharedDependencies = await SharedDependenciesService.initialize(env);
const { logger } = sharedDependencies;
logger.debug("Shared dependencies initialized");

const { CHAINS: chains } = env;
const {
core,
registriesRepositories,
indexerClient,
kyselyDatabase,
logger,
retryStrategy,
notifier,
} = sharedDependencies;
Expand All @@ -60,29 +62,37 @@ export class ProcessingService {
strategyRegistryRepository,
strategyProcessingCheckpointRepository,
} = registriesRepositories;

const orchestrators: Map<ChainId, [Orchestrator, RetroactiveProcessor]> = new Map();
logger.debug("Starting chain initialization", { chainCount: chains.length });

const strategyRegistry = new DatabaseStrategyRegistry(logger, strategyRegistryRepository);
const eventsRegistry = new DatabaseEventRegistry(logger, eventRegistryRepository);
logger.debug("Created base registries");

const viemChainsArray = Object.values(viemChains) as Chain[];

for (const chain of chains) {
// Initialize EVM provider
logger.debug("Processing chain configuration", { chainId: chain.id });

const viemChain = extractChain({
chains: viemChainsArray,
id: chain.id,
});
if (!viemChain) {
logger.error("Invalid chain configuration", { chainId: chain.id });
throw new InvalidChainId(chain.id);
}

const evmProvider = new EvmProvider(chain.rpcUrls, viemChain, logger);
logger.debug("EVM provider created", { chainId: chain.id });

const cachedStrategyRegistry = await InMemoryCachedStrategyRegistry.initialize(
logger,
strategyRegistry,
chain.id as ChainId,
);
logger.debug("Cached strategy registry initialized", { chainId: chain.id });

const orchestrator = new Orchestrator(
chain.id as ChainId,
Expand All @@ -98,6 +108,8 @@ export class ProcessingService {
logger,
notifier,
);
logger.debug("Orchestrator created", { chainId: chain.id });

const retroactiveProcessor = new RetroactiveProcessor(
chain.id as ChainId,
{ ...core, evmProvider },
Expand All @@ -111,10 +123,13 @@ export class ProcessingService {
retryStrategy,
logger,
);
logger.debug("Retroactive processor created", { chainId: chain.id });

orchestrators.set(chain.id as ChainId, [orchestrator, retroactiveProcessor]);
logger.debug("Chain setup completed", { chainId: chain.id });
}

logger.debug("All chains initialized", { chainCount: orchestrators.size });
return new ProcessingService(orchestrators, kyselyDatabase, logger);
}

Expand All @@ -127,10 +142,10 @@ export class ProcessingService {
this.logger.info("Starting processor service...");

const abortController = new AbortController();
this.logger.debug("Created abort controller");

const orchestratorProcesses: Promise<void>[] = [];

// Handle graceful shutdown
process.on("SIGINT", () => {
this.logger.info("Received SIGINT signal. Shutting down...");
abortController.abort();
Expand All @@ -140,14 +155,20 @@ export class ProcessingService {
this.logger.info("Received SIGTERM signal. Shutting down...");
abortController.abort();
});
this.logger.debug("Signal handlers registered");

try {
for (const [orchestrator, _] of this.orchestrators.values()) {
this.logger.info(`Starting orchestrator for chain ${orchestrator.chainId}...`);
orchestratorProcesses.push(orchestrator.run(abortController.signal));
this.logger.debug("Orchestrator process queued", { chainId: orchestrator.chainId });
}

this.logger.debug("Waiting for all orchestrator processes", {
processCount: orchestratorProcesses.length,
});
await Promise.allSettled(orchestratorProcesses);
this.logger.debug("All orchestrator processes completed");
} catch (error) {
this.logger.error(`Processor service failed: ${error}`);
throw error;
Expand All @@ -161,7 +182,13 @@ export class ProcessingService {
async processRetroactiveEvents(): Promise<void> {
this.logger.info("Processing retroactive events...");
for (const [_, retroactiveProcessor] of this.orchestrators.values()) {
this.logger.debug("Starting retroactive processing", {
chainId: retroactiveProcessor.chainId,
});
await retroactiveProcessor.processRetroactiveStrategies();
this.logger.debug("Completed retroactive processing", {
chainId: retroactiveProcessor.chainId,
});
}
}

Expand All @@ -173,6 +200,7 @@ export class ProcessingService {
try {
this.logger.info("Releasing resources...");
await this.kyselyDatabase.destroy();
this.logger.debug("Database resources released");
} catch (error) {
this.logger.error(`Error releasing resources: ${error}`);
}
Expand Down
2 changes: 1 addition & 1 deletion packages/data-flow/src/eventsFetcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ export class EventsFetcher implements IEventsFetcher {
chainId,
blockNumber,
logIndex,
limit = 100,
limit = 500,
allowPartialLastBlock = true,
}: GetEventsAfterBlockNumberAndLogIndexParams): Promise<AnyIndexerFetchedEvent[]> {
return await this.indexerClient.getEventsAfterBlockNumberAndLogIndex({
Expand Down
Loading
Loading