feat: Refactor event processing code #2152

Open · wants to merge 1 commit into main
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 1 addition & 2 deletions crates/walrus-e2e-tests/tests/test_event_blobs.rs
@@ -62,8 +62,7 @@ async fn test_event_blobs() -> anyhow::Result<()> {
         .read_blob::<Primary>(&prev_event_blob)
         .await
         .context("should be able to read blob we just stored")?;
-    let event_blob =
-        walrus_service::node::events::event_blob::EventBlob::new(&read_blob_primary)?;
+    let event_blob = walrus_service::event::event_blob::EventBlob::new(&read_blob_primary)?;
     prev_event_blob = event_blob.prev_blob_id();
     for i in event_blob {
         tracing::debug!("element: {:?}", i);
15 changes: 6 additions & 9 deletions crates/walrus-service/bin/node.rs
@@ -37,13 +37,13 @@ use walrus_service::{
     DbCheckpointManager,
     SyncNodeConfigError,
     common::config::SuiConfig,
+    event::event_processor::runtime::EventProcessorRuntime,
     node::{
         ConfigLoader,
         StorageNode,
         StorageNodeConfigLoader,
         config::{self, StorageNodeConfig, defaults::REST_API_PORT},
         dbtool::DbToolCommands,
-        events::event_processor_runtime::EventProcessorRuntime,
         server::{RestApiConfig, RestApiServer},
         system_events::EventManager,
     },
@@ -540,14 +540,11 @@ mod commands {
         keys::{SupportedKeyPair, TaggedKeyPair},
     };
     use walrus_service::{
-        node::{
-            DatabaseConfig,
-            config::TlsConfig,
-            events::{
-                EventProcessorConfig,
-                event_processor::{EventProcessor, EventProcessorRuntimeConfig, SystemConfig},
-            },
+        event::event_processor::{
+            config::{EventProcessorConfig, EventProcessorRuntimeConfig, SystemConfig},
+            processor::EventProcessor,
         },
+        node::{DatabaseConfig, config::TlsConfig},
         utils,
     };
     use walrus_sui::{
@@ -1039,7 +1036,7 @@ mod commands {

         let system_config = SystemConfig::new(system_pkg_id, system_object_id, staking_object_id);
         let event_processor = EventProcessor::new(
-            &event_processor_config,
+            event_processor_config.clone(),
             runtime_config,
             system_config,
             &Registry::default(),
5 changes: 4 additions & 1 deletion crates/walrus-service/src/backup/config.rs
@@ -8,7 +8,10 @@ use std::{net::SocketAddr, path::PathBuf, time::Duration};
 use serde::{Deserialize, Serialize};
 use serde_with::{DurationMilliSeconds, DurationSeconds, serde_as};
 
-use crate::{common::config::SuiReaderConfig, node::events::EventProcessorConfig};
+use crate::{
+    common::config::SuiReaderConfig,
+    event::event_processor::config::EventProcessorConfig,
+};
 
 /// The subdirectory in which to store the backup blobs when running without remote storage.
 pub const BACKUP_BLOB_ARCHIVE_SUBDIR: &str = "archive";
2 changes: 1 addition & 1 deletion crates/walrus-service/src/backup/models.rs
@@ -12,7 +12,7 @@ use diesel::{
 };
 use diesel_async::{AsyncPgConnection, RunQueryDsl};
 
-use crate::node::events::{CheckpointEventPosition, EventStreamCursor, EventStreamElement};
+use crate::event::events::{CheckpointEventPosition, EventStreamCursor, EventStreamElement};
 
 #[derive(Debug, Queryable, Selectable, Insertable)]
 #[diesel(table_name = crate::backup::schema::stream_event)]
15 changes: 4 additions & 11 deletions crates/walrus-service/src/backup/service.rs
@@ -44,18 +44,11 @@ use super::{
 use crate::{
     backup::metrics::{BackupDbMetricSet, BackupFetcherMetricSet, BackupOrchestratorMetricSet},
     common::utils::{self, MetricsAndLoggingRuntime, version},
-    node::{
-        DatabaseConfig,
-        events::{
-            CheckpointEventPosition,
-            EventStreamElement,
-            PositionedStreamEvent,
-            event_processor::EventProcessor,
-            event_processor_runtime::EventProcessorRuntime,
-        },
-        metrics::TelemetryLabel as _,
-        system_events::SystemEventProvider as _,
+    event::{
+        event_processor::{processor::EventProcessor, runtime::EventProcessorRuntime},
+        events::{CheckpointEventPosition, EventStreamElement, PositionedStreamEvent},
     },
+    node::{DatabaseConfig, metrics::TelemetryLabel as _, system_events::SystemEventProvider as _},
 };
 
 /// The version of the Walrus backup service.
3 changes: 0 additions & 3 deletions crates/walrus-service/src/common.rs
@@ -7,6 +7,3 @@ pub(crate) mod api;
 pub mod config;
 pub(crate) mod telemetry;
 pub mod utils;
-
-#[cfg(feature = "client")]
-pub mod event_blob_downloader;
57 changes: 3 additions & 54 deletions crates/walrus-service/src/common/utils.rs
@@ -48,15 +48,12 @@ use tracing_subscriber::{
 };
 use typed_store::DBMetrics;
 use uuid::Uuid;
-use walrus_core::{BlobId, PublicKey, ShardIndex};
+use walrus_core::{PublicKey, ShardIndex};
 use walrus_sdk::active_committees::ActiveCommittees;
-use walrus_sui::{
-    client::{SuiReadClient, retry_client::RetriableSuiClient},
-    utils::SuiNetwork,
-};
+use walrus_sui::{client::retry_client::RetriableSuiClient, utils::SuiNetwork};
 use walrus_utils::metrics::{Registry, monitored_scope};
 
-use crate::node::{config::MetricsPushConfig, events::event_processor::EventProcessorMetrics};
+use crate::node::config::MetricsPushConfig;
 
 /// The maximum length of the storage node name. Keep in sync with `MAX_NODE_NAME_LENGTH` in
 /// `contracts/walrus/sources/staking/staking_pool.move`.
@@ -90,8 +87,6 @@ macro_rules! version {
 }
 pub use version;
 
-use crate::common::event_blob_downloader::EventBlobDownloader;
-
 /// Helper functions applied to futures.
 pub(crate) trait FutureHelpers: Future {
     /// Reports metrics for the future.
@@ -705,52 +700,6 @@ pub fn init_scoped_tracing_subscriber() -> Result<DefaultGuard> {
     Ok(guard)
 }
 
-/// Downloads event blobs for catchup purposes.
-///
-/// This function creates a client to download event blobs up to a specified
-/// checkpoint. The blobs are stored in the provided recovery path.
-#[cfg(feature = "client")]
-pub async fn collect_event_blobs_for_catchup(
-    sui_client: RetriableSuiClient,
-    staking_object_id: ObjectID,
-    system_object_id: ObjectID,
-    upto_checkpoint: Option<u64>,
-    recovery_path: &Path,
-    metrics: Option<&EventProcessorMetrics>,
-) -> Result<Vec<BlobId>> {
-    use walrus_sui::client::contract_config::ContractConfig;
-
-    let contract_config = ContractConfig::new(system_object_id, staking_object_id);
-    let sui_read_client = SuiReadClient::new(sui_client, &contract_config).await?;
-    let config = crate::client::ClientConfig::new_from_contract_config(contract_config);
-
-    let walrus_client =
-        walrus_sdk::client::Client::new_read_client_with_refresher(config, sui_read_client.clone())
-            .await?;
-
-    let blob_downloader = EventBlobDownloader::new(walrus_client, sui_read_client);
-    let blob_ids = blob_downloader
-        .download(upto_checkpoint, None, recovery_path, metrics)
-        .await?;
-
-    tracing::info!("successfully downloaded {} event blobs", blob_ids.len());
-    Ok(blob_ids)
-}
-
-/// Placeholder function for when the client feature is not enabled.
-#[cfg(not(feature = "client"))]
-pub async fn collect_event_blobs_for_catchup(
-    sui_client: RetriableSuiClient,
-    staking_object_id: ObjectID,
-    system_object_id: ObjectID,
-    package_id: Option<ObjectID>,
-    upto_checkpoint: Option<u64>,
-    recovery_path: &Path,
-    _metrics: Option<&EventProcessorMetrics>,
-) -> Result<Vec<BlobId>> {
-    Ok(vec![])
-}
-
 /// Returns whether a node cursor should be repositioned.
 ///
 /// The node cursor should be repositioned if it is behind the actual event index and it is at
9 changes: 9 additions & 0 deletions crates/walrus-service/src/event.rs
@@ -0,0 +1,9 @@
+// Copyright (c) Walrus Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+//! Event related modules.
+
+pub mod event_blob;
+pub mod event_blob_downloader;
+pub mod event_processor;
+pub mod events;
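
For orientation, the import paths below sketch how call sites change with the new top-level module. The individual paths are taken from the hunks in this PR, but the grouped `use` block itself is illustrative rather than code from the diff.

```rust
// Before this PR: event types and the processor lived under the node module, e.g.
// use crate::node::events::{CheckpointEventPosition, EventStreamCursor, EventStreamElement};
// use crate::node::events::event_processor::EventProcessor;

// After this PR: event handling is a top-level module of walrus-service.
use crate::event::{
    event_blob::EventBlob,
    event_processor::{config::EventProcessorConfig, processor::EventProcessor},
    events::{CheckpointEventPosition, EventStreamCursor, EventStreamElement},
};
```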
crates/walrus-service/src/event/event_blob.rs
@@ -20,7 +20,7 @@ use sui_types::{
 };
 use walrus_core::{BlobId, Epoch};
 
-use crate::node::events::IndexedStreamEvent;
+use crate::event::events::IndexedStreamEvent;
 
 /// The encoding of an entry in the blob file.
 #[derive(Copy, Clone, Debug, Eq, PartialEq)]
crates/walrus-service/src/event/event_blob_downloader.rs
@@ -11,10 +11,7 @@ use walrus_sdk::{client::Client as WalrusClient, error::ClientErrorKind};
 use walrus_storage_node_client::api::BlobStatus;
 use walrus_sui::client::{ReadClient, SuiReadClient};
 
-use crate::node::events::{
-    event_blob::EventBlob as LocalEventBlob,
-    event_processor::EventProcessorMetrics,
-};
+use crate::event::event_blob::EventBlob as LocalEventBlob;
 
 /// Responsible for downloading and managing event blobs
 #[derive(Debug)]
@@ -41,7 +38,6 @@ impl EventBlobDownloader {
         upto_checkpoint: Option<u64>,
         from_blob: Option<BlobId>,
         path: &Path,
-        metrics: Option<&EventProcessorMetrics>,
     ) -> Result<Vec<BlobId>> {
         let mut blobs = Vec::new();
         let mut prev_event_blob = match from_blob {
@@ -79,8 +75,8 @@
            }
 
            let blob_path = path.join(prev_event_blob.to_string());
-           let (blob, blob_source) = if blob_path.exists() {
-               (std::fs::read(blob_path.as_path())?, "local")
+           let blob = if blob_path.exists() {
+               std::fs::read(blob_path.as_path())?
            } else {
                let result = self
                    .walrus_client
@@ -91,22 +87,11 @@
                    .await;
                let Ok(blob) = result else {
                    let err = result.err().unwrap();
-                   metrics.inspect(|&m| {
-                       m.event_processor_event_blob_fetched
-                           .with_label_values(&["network"])
-                           .inc()
-                   });
                    return Err(err.into());
                };
-               (blob, "network")
+               blob
            };
 
-           metrics.inspect(|&m| {
-               m.event_processor_event_blob_fetched
-                   .with_label_values(&[blob_source])
-                   .inc()
-           });
-
            tracing::info!(blob_id = %prev_event_blob, "finished reading event blob");
 
            let mut event_blob = LocalEventBlob::new(&blob)?;
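
Since `download` no longer takes an `EventProcessorMetrics` handle, callers simply drop the final argument. A minimal sketch of a post-refactor caller; the helper function and its name are hypothetical and not part of this diff:

```rust
use std::path::Path;

use anyhow::Result;
use walrus_core::BlobId;

use crate::event::event_blob_downloader::EventBlobDownloader;

/// Hypothetical helper: fetch event blobs up to `upto_checkpoint` into `recovery_path`.
async fn fetch_event_blobs(
    downloader: EventBlobDownloader,
    upto_checkpoint: Option<u64>,
    recovery_path: &Path,
) -> Result<Vec<BlobId>> {
    // `None` for `from_blob`, as in the removed `collect_event_blobs_for_catchup` helper.
    let blob_ids = downloader
        .download(upto_checkpoint, None, recovery_path)
        .await?;
    tracing::info!("downloaded {} event blobs", blob_ids.len());
    Ok(blob_ids)
}
```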
15 changes: 15 additions & 0 deletions crates/walrus-service/src/event/event_processor.rs
@@ -0,0 +1,15 @@
+// Copyright (c) Walrus Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+//! Event processor for processing events from the Sui network.
+
+pub mod bootstrap;
+pub mod catchup;
+pub mod checkpoint;
+pub mod client;
+pub mod config;
+pub mod db;
+pub mod metrics;
+pub mod package_store;
+pub mod processor;
+pub mod runtime;
67 changes: 67 additions & 0 deletions crates/walrus-service/src/event/event_processor/bootstrap.rs
@@ -0,0 +1,67 @@
+// Copyright (c) Walrus Foundation
+// SPDX-License-Identifier: Apache-2.0
+
+//! Bootstrap module for getting the initial committee and checkpoint information.
+use anyhow::{Result, anyhow};
+use sui_sdk::rpc_types::{SuiObjectDataOptions, SuiTransactionBlockResponseOptions};
+use sui_types::{
+    base_types::ObjectID,
+    committee::Committee,
+    messages_checkpoint::VerifiedCheckpoint,
+    sui_serde::BigInt,
+};
+use walrus_sui::client::retry_client::{RetriableRpcClient, RetriableSuiClient};
+
+/// Gets the initial committee and checkpoint information by:
+/// 1. Fetching the system package object
+/// 2. Getting its previous transaction
+/// 3. Using that transaction's checkpoint to get the committee and checkpoint data
+///
+/// Returns a tuple containing:
+/// - The committee for the current or next epoch
+/// - The verified checkpoint containing the system package deployment
+pub async fn get_bootstrap_committee_and_checkpoint(
+    sui_client: RetriableSuiClient,
+    rpc_client: RetriableRpcClient,
+    system_pkg_id: ObjectID,
+) -> Result<(Committee, VerifiedCheckpoint)> {
+    let object_options = SuiObjectDataOptions::new()
+        .with_bcs()
+        .with_type()
+        .with_previous_transaction();
+    let object = sui_client
+        .get_object_with_options(system_pkg_id, object_options)
+        .await?;
+    let txn_options = SuiTransactionBlockResponseOptions::new();
+    let txn_digest = object
+        .data
+        .ok_or(anyhow!("No object data"))?
+        .previous_transaction
+        .ok_or(anyhow!("No transaction data"))?;
+    let txn = sui_client
+        .get_transaction_with_options(txn_digest, txn_options)
+        .await?;
+    let checkpoint_data = rpc_client
+        .get_full_checkpoint(txn.checkpoint.ok_or(anyhow!("No checkpoint data"))?)
+        .await?;
+    let epoch = checkpoint_data.checkpoint_summary.epoch;
+    let checkpoint_summary = checkpoint_data.checkpoint_summary.clone();
+    let committee = if let Some(end_of_epoch_data) = &checkpoint_summary.end_of_epoch_data {
+        let next_committee = end_of_epoch_data
+            .next_epoch_committee
+            .iter()
+            .cloned()
+            .collect();
+        Committee::new(epoch + 1, next_committee)
+    } else {
+        let committee_info = sui_client
+            .get_committee_info(Some(BigInt::from(epoch)))
+            .await?;
+        Committee::new(
+            committee_info.epoch,
+            committee_info.validators.into_iter().collect(),
+        )
+    };
+    let verified_checkpoint = VerifiedCheckpoint::new_unchecked(checkpoint_summary);
+    Ok((committee, verified_checkpoint))
+}
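
A hypothetical caller of the new bootstrap entry point, to show how its result might be consumed; the wrapper function below is illustrative and not part of this diff:

```rust
use anyhow::Result;
use sui_types::base_types::ObjectID;
use walrus_sui::client::retry_client::{RetriableRpcClient, RetriableSuiClient};

use crate::event::event_processor::bootstrap::get_bootstrap_committee_and_checkpoint;

/// Hypothetical wrapper: resolve the starting committee and checkpoint for event processing.
async fn bootstrap_event_processing(
    sui_client: RetriableSuiClient,
    rpc_client: RetriableRpcClient,
    system_pkg_id: ObjectID,
) -> Result<()> {
    let (committee, verified_checkpoint) =
        get_bootstrap_committee_and_checkpoint(sui_client, rpc_client, system_pkg_id).await?;
    tracing::info!(epoch = committee.epoch, "bootstrapped committee");
    // The committee and verified checkpoint would then be handed to the checkpoint
    // verification and processing machinery in the other event_processor submodules.
    let _ = verified_checkpoint;
    Ok(())
}
```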