Skip to content
This repository was archived by the owner on May 20, 2025. It is now read-only.

Small inx-client and mongodb config and usage refactor #85

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[mongodb]
location = "mongodb://localhost:27017"
connect_url = "mongodb://localhost:27017"
username = "root"
password = "root"

[inx]
address = "http://localhost:9029"
connect_url = "http://localhost:9029"
connection_retry_interval = "5s"
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod routes;
use async_trait::async_trait;
use axum::Server;
use chronicle::{
db::MongoDatabase,
db::MongoDb,
runtime::actor::{context::ActorContext, Actor},
};
pub use error::ApiError;
Expand All @@ -32,13 +32,13 @@ pub type ApiResult<T> = Result<T, ApiError>;
/// The Chronicle API actor
#[derive(Debug)]
pub struct ApiWorker {
db: MongoDatabase,
db: MongoDb,
server_handle: Option<(JoinHandle<hyper::Result<()>>, oneshot::Sender<()>)>,
}

impl ApiWorker {
/// Create a new Chronicle API actor from a mongo connection.
pub fn new(db: MongoDatabase) -> Self {
pub fn new(db: MongoDb) -> Self {
Self {
db,
server_handle: None,
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use axum::{handler::Handler, routing::get, Extension, Router};
use chronicle::db::{
model::sync::{SyncData, SyncRecord},
MongoDatabase,
MongoDb,
};
use futures::TryStreamExt;
use hyper::Method;
Expand All @@ -17,7 +17,7 @@ use tower_http::{

use super::{error::ApiError, responses::*, ApiResult};

pub fn routes(db: MongoDatabase) -> Router {
pub fn routes(db: MongoDb) -> Router {
#[allow(unused_mut)]
let mut router = Router::new().route("/info", get(info)).route("/sync", get(sync));

Expand Down Expand Up @@ -51,7 +51,7 @@ async fn info() -> InfoResponse {
}
}

async fn sync(database: Extension<MongoDatabase>) -> ApiResult<SyncDataResponse> {
async fn sync(database: Extension<MongoDb>) -> ApiResult<SyncDataResponse> {
let mut res = database
.collection::<SyncRecord>()
.find(
Expand Down
4 changes: 2 additions & 2 deletions bin/inx-chronicle/src/api/stardust/analytics/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chronicle::{
bson::DocExt,
db::{
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDatabase,
MongoDb,
},
stardust::payload::TransactionPayload,
};
Expand All @@ -25,7 +25,7 @@ pub fn routes() -> Router {
}

async fn address_analytics(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
TimeRange {
start_timestamp,
end_timestamp,
Expand Down
4 changes: 2 additions & 2 deletions bin/inx-chronicle/src/api/stardust/explorer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use chronicle::{
bson::{BsonExt, DocExt},
db::{
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDatabase,
MongoDb,
},
};
use futures::TryStreamExt;
Expand All @@ -28,7 +28,7 @@ pub fn routes() -> Router {
}

async fn transaction_history(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(address): Path<String>,
Pagination { page_size, page }: Pagination,
TimeRange {
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/stardust/indexer/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use chronicle::{
bson::{BsonExt, DocExt},
db::{
model::{inclusion_state::LedgerInclusionState, stardust::message::MessageRecord},
MongoDatabase,
MongoDb,
},
stardust::{output::OutputId, payload::transaction::TransactionId},
};
Expand All @@ -33,7 +33,7 @@ pub fn routes() -> Router {
}

async fn messages_query(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
query: MessagesQuery,
Pagination { page_size, page }: Pagination,
Expanded { expanded }: Expanded,
Expand Down Expand Up @@ -91,7 +91,7 @@ async fn messages_query(
}

async fn outputs_query(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
query: OutputsQuery,
Pagination { page_size, page }: Pagination,
Expanded { expanded }: Expanded,
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/api/stardust/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
use axum::Router;
use chronicle::{
bson::DocExt,
db::{model::stardust::milestone::MilestoneRecord, MongoDatabase},
db::{model::stardust::milestone::MilestoneRecord, MongoDb},
};
use mongodb::{
bson::{doc, DateTime},
Expand Down Expand Up @@ -51,7 +51,7 @@ pub fn routes() -> Router {
router
}

pub(crate) async fn start_milestone(database: &MongoDatabase, start_timestamp: OffsetDateTime) -> ApiResult<u32> {
pub(crate) async fn start_milestone(database: &MongoDb, start_timestamp: OffsetDateTime) -> ApiResult<u32> {
database
.doc_collection::<MilestoneRecord>()
.find(
Expand All @@ -69,7 +69,7 @@ pub(crate) async fn start_milestone(database: &MongoDatabase, start_timestamp: O
.ok_or(ApiError::NotFound)
}

pub(crate) async fn end_milestone(database: &MongoDatabase, end_timestamp: OffsetDateTime) -> ApiResult<u32> {
pub(crate) async fn end_milestone(database: &MongoDb, end_timestamp: OffsetDateTime) -> ApiResult<u32> {
database
.doc_collection::<MilestoneRecord>()
.find(
Expand Down
20 changes: 10 additions & 10 deletions bin/inx-chronicle/src/api/stardust/v2/routes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use chronicle::{
inclusion_state::LedgerInclusionState,
stardust::{message::MessageRecord, milestone::MilestoneRecord},
},
MongoDatabase,
MongoDb,
},
stardust::output::OutputId,
};
Expand Down Expand Up @@ -56,7 +56,7 @@ pub fn routes() -> Router {
.route("/milestones/:index", get(milestone))
}

async fn message(database: Extension<MongoDatabase>, Path(message_id): Path<String>) -> ApiResult<MessageResponse> {
async fn message(database: Extension<MongoDb>, Path(message_id): Path<String>) -> ApiResult<MessageResponse> {
let mut rec = database
.doc_collection::<MessageRecord>()
.find_one(doc! {"message_id": &message_id}, None)
Expand All @@ -75,7 +75,7 @@ async fn message(database: Extension<MongoDatabase>, Path(message_id): Path<Stri
})
}

async fn message_raw(database: Extension<MongoDatabase>, Path(message_id): Path<String>) -> ApiResult<Vec<u8>> {
async fn message_raw(database: Extension<MongoDb>, Path(message_id): Path<String>) -> ApiResult<Vec<u8>> {
let mut rec = database
.doc_collection::<MessageRecord>()
.find_one(doc! {"message_id": &message_id}, None)
Expand All @@ -86,7 +86,7 @@ async fn message_raw(database: Extension<MongoDatabase>, Path(message_id): Path<
}

async fn message_metadata(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(message_id): Path<String>,
) -> ApiResult<MessageMetadataResponse> {
let mut rec = database
Expand Down Expand Up @@ -138,7 +138,7 @@ async fn message_metadata(
}

async fn message_children(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(message_id): Path<String>,
Pagination { page_size, page }: Pagination,
Expanded { expanded }: Expanded,
Expand Down Expand Up @@ -182,7 +182,7 @@ async fn message_children(
})
}

async fn output(database: Extension<MongoDatabase>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
async fn output(database: Extension<MongoDb>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
output_by_transaction_id(
database,
Expand All @@ -192,7 +192,7 @@ async fn output(database: Extension<MongoDatabase>, Path(output_id): Path<String
}

async fn output_by_transaction_id(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path((transaction_id, idx)): Path<(String, u16)>,
) -> ApiResult<OutputResponse> {
let mut output = database
Expand Down Expand Up @@ -234,7 +234,7 @@ async fn output_by_transaction_id(
}

async fn transaction_for_message(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(message_id): Path<String>,
) -> ApiResult<TransactionResponse> {
let mut rec = database
Expand All @@ -253,7 +253,7 @@ async fn transaction_for_message(
}

async fn transaction_included_message(
database: Extension<MongoDatabase>,
database: Extension<MongoDb>,
Path(transaction_id): Path<String>,
) -> ApiResult<MessageResponse> {
let mut rec = database
Expand Down Expand Up @@ -281,7 +281,7 @@ async fn transaction_included_message(
})
}

async fn milestone(database: Extension<MongoDatabase>, Path(index): Path<u32>) -> ApiResult<MilestoneResponse> {
async fn milestone(database: Extension<MongoDb>, Path(index): Path<u32>) -> ApiResult<MilestoneResponse> {
database
.doc_collection::<MilestoneRecord>()
.find_one(doc! {"milestone_index": &index}, None)
Expand Down
6 changes: 3 additions & 3 deletions bin/inx-chronicle/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use async_trait::async_trait;
#[cfg(feature = "stardust")]
use chronicle::db::model::stardust;
use chronicle::{
db::{MongoDatabase, MongoDbError},
db::{MongoDb, MongoDbError},
runtime::{
actor::{context::ActorContext, event::HandleEvent, Actor},
error::RuntimeError,
Expand All @@ -23,11 +23,11 @@ pub enum BrokerError {

#[derive(Debug)]
pub struct Broker {
db: MongoDatabase,
db: MongoDb,
}

impl Broker {
pub fn new(db: MongoDatabase) -> Self {
pub fn new(db: MongoDb) -> Self {
Self { db }
}
}
Expand Down
21 changes: 10 additions & 11 deletions bin/inx-chronicle/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

use std::{fs, path::Path};

use chronicle::db::MongoConfig;
use chronicle::db::MongoDbConfig;
use serde::{Deserialize, Serialize};
use thiserror::Error;

Expand All @@ -20,28 +20,27 @@ pub enum ConfigError {

/// Configuration of Chronicle.
#[derive(Default, Debug, PartialEq, Eq, Serialize, Deserialize)]
pub struct Config {
pub mongodb: MongoConfig,
#[cfg(feature = "stardust")]
pub struct ChronicleConfig {
pub mongodb: MongoDbConfig,
#[cfg(feature = "inx")]
pub inx: InxConfig,
}

impl Config {
/// Reads a configuration file in `.toml` format.
impl ChronicleConfig {
pub fn from_file(path: impl AsRef<Path>) -> Result<Self, ConfigError> {
fs::read_to_string(&path)
.map_err(ConfigError::FileRead)
.and_then(|contents| toml::from_str::<Self>(&contents).map_err(ConfigError::TomlDeserialization))
}

/// Applies the appropriate command line arguments to the [`Config`].
/// Applies the appropriate command line arguments to the [`ChronicleConfig`].
pub fn apply_cli_args(&mut self, args: super::CliArgs) {
#[cfg(feature = "stardust")]
if let Some(inx) = args.inx {
self.inx = InxConfig::new(inx);
}
if let Some(db) = args.db {
self.mongodb = MongoConfig::new(db);
if let Some(connect_url) = args.db {
self.mongodb = MongoDbConfig::new().with_connect_url(connect_url);
}
}
}
Expand All @@ -52,9 +51,9 @@ mod test {

#[test]
fn config_file_conformity() -> Result<(), ConfigError> {
let _ = Config::from_file(concat!(
let _ = ChronicleConfig::from_file(concat!(
env!("CARGO_MANIFEST_DIR"),
"/bin/inx-chronicle/config.example.toml"
"/bin/inx-chronicle/config.template.toml"
))?;

Ok(())
Expand Down
20 changes: 3 additions & 17 deletions bin/inx-chronicle/src/inx/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@

use std::time::Duration;

use inx::{client::InxClient, tonic::Channel};
use serde::{Deserialize, Serialize};

pub use super::InxWorkerError;
Expand All @@ -13,7 +12,7 @@ pub use super::InxWorkerError;
#[derive(Clone, PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct InxConfig {
/// The bind address of node's INX interface.
pub address: String,
pub connect_url: String,
/// The time that has to pass until a new connection attempt is made.
#[serde(with = "humantime_serde")]
pub connection_retry_interval: Duration,
Expand All @@ -22,7 +21,7 @@ pub struct InxConfig {
impl Default for InxConfig {
fn default() -> Self {
Self {
address: "http://localhost:9029".into(),
connect_url: "http://localhost:9029".into(),
connection_retry_interval: Duration::from_secs(5),
}
}
Expand All @@ -32,21 +31,8 @@ impl InxConfig {
/// Creates a new [`InxConfig`]. The `address` is the address of the node's INX interface.
pub fn new(address: impl Into<String>) -> Self {
Self {
address: address.into(),
connect_url: address.into(),
..Default::default()
}
}

/// Constructs an [`InxClient`] by consuming the [`InxConfig`].
pub async fn build(&self) -> Result<InxClient<Channel>, InxWorkerError> {
let url = url::Url::parse(&self.address)?;

if url.scheme() != "http" {
return Err(InxWorkerError::InvalidAddress(self.address.clone()));
}

InxClient::connect(self.address.clone())
.await
.map_err(InxWorkerError::ConnectionError)
}
}
1 change: 1 addition & 0 deletions bin/inx-chronicle/src/inx/listener.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ impl Actor for InxListener {

async fn init(&mut self, cx: &mut ActorContext<Self>) -> Result<Self::State, Self::Error> {
let message_stream = self.inx.listen_to_messages(MessageFilter {}).await?.into_inner();

cx.spawn_actor_supervised::<MessageStream, _>(
InxStreamListener::new(self.broker_addr.clone())?.with_stream(message_stream),
)
Expand Down
Loading