Skip to content
This repository was archived by the owner on May 20, 2025. It is now read-only.

Commit 5d5499d

Browse files
grtlrAlexandcoats
and
Alexandcoats
authored
feat(db)!: separate database collections into individual types (#626) (#650)
* feat(db): separate database collections into individual types (#626) * Separate database collections into individual types. * Create ext traits to simplify usage. * Use custom thread safe iterator for chunking. * Write chunks tests and cleanup code. * refactor(db): move collection helpers and add docs * Rename trait * Minor improvements * Fmt Co-authored-by: Alexandcoats <[email protected]>
1 parent e1e4dc8 commit 5d5499d

File tree

22 files changed

+1635
-1079
lines changed

22 files changed

+1635
-1079
lines changed

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ console-subscriber = { version = "0.1", default-features = false, optional = tru
8888

8989
[dev-dependencies]
9090
packable = { version = "=0.6.1", default-features = false }
91+
rand = { version = "0.8", default-features = false, features = [ "std" ] }
9192

9293
[features]
9394
default = [

src/bin/inx-chronicle/api/routes.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@ use axum::{
88
routing::{get, post},
99
Extension, Json, Router,
1010
};
11-
use chronicle::{db::MongoDb, types::stardust::milestone::MilestoneTimestamp};
11+
use chronicle::{
12+
db::{collections::MilestoneCollection, MongoDb},
13+
types::stardust::milestone::MilestoneTimestamp,
14+
};
1215
use hyper::StatusCode;
1316
use serde::Deserialize;
1417
use time::{Duration, OffsetDateTime};
@@ -70,7 +73,11 @@ fn is_new_enough(timestamp: MilestoneTimestamp) -> bool {
7073
pub async fn is_healthy(database: &MongoDb) -> Result<bool, ApiError> {
7174
#[cfg(feature = "stardust")]
7275
{
73-
let newest = match database.get_newest_milestone().await? {
76+
let newest = match database
77+
.collection::<MilestoneCollection>()
78+
.get_newest_milestone()
79+
.await?
80+
{
7481
Some(last) => last,
7582
None => return Ok(false),
7683
};

src/bin/inx-chronicle/api/stardust/analytics/routes.rs

Lines changed: 26 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use axum::{routing::get, Extension, Router};
55
use bee_api_types_stardust::responses::RentStructureResponse;
66
use chronicle::{
77
db::{
8-
collections::{OutputKind, PayloadKind},
8+
collections::{BlockCollection, OutputCollection, OutputKind, PayloadKind, ProtocolUpdateCollection},
99
MongoDb,
1010
},
1111
types::stardust::block::{
@@ -69,7 +69,10 @@ async fn address_activity_analytics(
6969
database: Extension<MongoDb>,
7070
MilestoneRange { start_index, end_index }: MilestoneRange,
7171
) -> ApiResult<AddressAnalyticsResponse> {
72-
let res = database.get_address_analytics(start_index, end_index).await?;
72+
let res = database
73+
.collection::<OutputCollection>()
74+
.get_address_analytics(start_index, end_index)
75+
.await?;
7376

7477
Ok(AddressAnalyticsResponse {
7578
total_active_addresses: res.total_active_addresses.to_string(),
@@ -82,7 +85,10 @@ async fn block_activity_analytics<B: PayloadKind>(
8285
database: Extension<MongoDb>,
8386
MilestoneRange { start_index, end_index }: MilestoneRange,
8487
) -> ApiResult<BlockAnalyticsResponse> {
85-
let res = database.get_block_analytics::<B>(start_index, end_index).await?;
88+
let res = database
89+
.collection::<BlockCollection>()
90+
.get_block_analytics::<B>(start_index, end_index)
91+
.await?;
8692

8793
Ok(BlockAnalyticsResponse {
8894
count: res.count.to_string(),
@@ -93,7 +99,10 @@ async fn output_activity_analytics<O: OutputKind>(
9399
database: Extension<MongoDb>,
94100
MilestoneRange { start_index, end_index }: MilestoneRange,
95101
) -> ApiResult<OutputAnalyticsResponse> {
96-
let res = database.get_output_analytics::<O>(start_index, end_index).await?;
102+
let res = database
103+
.collection::<OutputCollection>()
104+
.get_output_analytics::<O>(start_index, end_index)
105+
.await?;
97106

98107
Ok(OutputAnalyticsResponse {
99108
count: res.count.to_string(),
@@ -106,6 +115,7 @@ async fn unspent_output_ledger_analytics<O: OutputKind>(
106115
LedgerIndex { ledger_index }: LedgerIndex,
107116
) -> ApiResult<OutputAnalyticsResponse> {
108117
let res = database
118+
.collection::<OutputCollection>()
109119
.get_unspent_output_analytics::<O>(ledger_index)
110120
.await?
111121
.ok_or(ApiError::NoResults)?;
@@ -121,6 +131,7 @@ async fn storage_deposit_ledger_analytics(
121131
LedgerIndex { ledger_index }: LedgerIndex,
122132
) -> ApiResult<StorageDepositAnalyticsResponse> {
123133
let res = database
134+
.collection::<OutputCollection>()
124135
.get_storage_deposit_analytics(ledger_index)
125136
.await?
126137
.ok_or(ApiError::NoResults)?;
@@ -145,7 +156,10 @@ async fn nft_activity_analytics(
145156
database: Extension<MongoDb>,
146157
MilestoneRange { start_index, end_index }: MilestoneRange,
147158
) -> ApiResult<OutputDiffAnalyticsResponse> {
148-
let res = database.get_nft_output_analytics(start_index, end_index).await?;
159+
let res = database
160+
.collection::<OutputCollection>()
161+
.get_nft_output_analytics(start_index, end_index)
162+
.await?;
149163

150164
Ok(OutputDiffAnalyticsResponse {
151165
created_count: res.created_count.to_string(),
@@ -158,7 +172,10 @@ async fn native_token_activity_analytics(
158172
database: Extension<MongoDb>,
159173
MilestoneRange { start_index, end_index }: MilestoneRange,
160174
) -> ApiResult<OutputDiffAnalyticsResponse> {
161-
let res = database.get_foundry_output_analytics(start_index, end_index).await?;
175+
let res = database
176+
.collection::<OutputCollection>()
177+
.get_foundry_output_analytics(start_index, end_index)
178+
.await?;
162179

163180
Ok(OutputDiffAnalyticsResponse {
164181
created_count: res.created_count.to_string(),
@@ -172,11 +189,13 @@ async fn richest_addresses_ledger_analytics(
172189
RichestAddressesQuery { top, ledger_index }: RichestAddressesQuery,
173190
) -> ApiResult<RichestAddressesResponse> {
174191
let res = database
192+
.collection::<OutputCollection>()
175193
.get_richest_addresses(ledger_index, top)
176194
.await?
177195
.ok_or(ApiError::NoResults)?;
178196

179197
let hrp = database
198+
.collection::<ProtocolUpdateCollection>()
180199
.get_protocol_parameters_for_ledger_index(res.ledger_index)
181200
.await?
182201
.ok_or(InternalApiError::CorruptState("no protocol parameters"))?
@@ -201,6 +220,7 @@ async fn token_distribution_ledger_analytics(
201220
LedgerIndex { ledger_index }: LedgerIndex,
202221
) -> ApiResult<TokenDistributionResponse> {
203222
let res = database
223+
.collection::<OutputCollection>()
204224
.get_token_distribution(ledger_index)
205225
.await?
206226
.ok_or(ApiError::NoResults)?;

src/bin/inx-chronicle/api/stardust/core/routes.rs

Lines changed: 47 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,10 @@ use bee_block_stardust::{
2525
};
2626
use chronicle::{
2727
db::{
28-
collections::{OutputMetadataResult, OutputWithMetadataResult, UtxoChangesResult},
28+
collections::{
29+
BlockCollection, MilestoneCollection, OutputCollection, OutputMetadataResult, OutputWithMetadataResult,
30+
ProtocolUpdateCollection, TreasuryCollection, UtxoChangesResult,
31+
},
2932
MongoDb,
3033
},
3134
types::{
@@ -101,6 +104,7 @@ pub fn routes() -> Router {
101104

102105
pub async fn info(database: Extension<MongoDb>) -> ApiResult<InfoResponse> {
103106
let protocol = database
107+
.collection::<ProtocolUpdateCollection>()
104108
.get_latest_protocol_parameters()
105109
.await?
106110
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
@@ -113,26 +117,27 @@ pub async fn info(database: Extension<MongoDb>) -> ApiResult<InfoResponse> {
113117
false
114118
});
115119

116-
let newest_milestone =
117-
database
118-
.get_newest_milestone()
119-
.await?
120-
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
121-
"no milestone in the database",
122-
)))?;
123-
let oldest_milestone =
124-
database
125-
.get_oldest_milestone()
126-
.await?
127-
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
128-
"no milestone in the database",
129-
)))?;
120+
let newest_milestone = database
121+
.collection::<MilestoneCollection>()
122+
.get_newest_milestone()
123+
.await?
124+
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
125+
"no milestone in the database",
126+
)))?;
127+
let oldest_milestone = database
128+
.collection::<MilestoneCollection>()
129+
.get_oldest_milestone()
130+
.await?
131+
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
132+
"no milestone in the database",
133+
)))?;
130134

131135
let latest_milestone = LatestMilestoneResponse {
132136
index: newest_milestone.milestone_index.0,
133137
timestamp: newest_milestone.milestone_timestamp.0,
134138
milestone_id: bee_block_stardust::payload::milestone::MilestoneId::from(
135139
database
140+
.collection::<MilestoneCollection>()
136141
.get_milestone_id(newest_milestone.milestone_index)
137142
.await?
138143
.ok_or(ApiError::Internal(InternalApiError::CorruptState(
@@ -183,12 +188,20 @@ async fn block(
183188
if let Some(value) = headers.get(axum::http::header::ACCEPT) {
184189
if value.eq(&*BYTE_CONTENT_HEADER) {
185190
return Ok(BlockResponse::Raw(
186-
database.get_block_raw(&block_id).await?.ok_or(ApiError::NoResults)?,
191+
database
192+
.collection::<BlockCollection>()
193+
.get_block_raw(&block_id)
194+
.await?
195+
.ok_or(ApiError::NoResults)?,
187196
));
188197
}
189198
}
190199

191-
let block = database.get_block(&block_id).await?.ok_or(ApiError::NoResults)?;
200+
let block = database
201+
.collection::<BlockCollection>()
202+
.get_block(&block_id)
203+
.await?
204+
.ok_or(ApiError::NoResults)?;
192205
Ok(BlockResponse::Json(BlockDto::try_from(block)?))
193206
}
194207

@@ -198,6 +211,7 @@ async fn block_metadata(
198211
) -> ApiResult<BlockMetadataResponse> {
199212
let block_id = BlockId::from_str(&block_id_str).map_err(ApiError::bad_parse)?;
200213
let metadata = database
214+
.collection::<BlockCollection>()
201215
.get_block_metadata(&block_id)
202216
.await?
203217
.ok_or(ApiError::NoResults)?;
@@ -243,6 +257,7 @@ fn create_output_metadata_response(metadata: OutputMetadataResult) -> OutputMeta
243257
async fn output(database: Extension<MongoDb>, Path(output_id): Path<String>) -> ApiResult<OutputResponse> {
244258
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
245259
let OutputWithMetadataResult { output, metadata } = database
260+
.collection::<OutputCollection>()
246261
.get_output_with_metadata(&output_id)
247262
.await?
248263
.ok_or(ApiError::NoResults)?;
@@ -261,6 +276,7 @@ async fn output_metadata(
261276
) -> ApiResult<OutputMetadataResponse> {
262277
let output_id = OutputId::from_str(&output_id).map_err(ApiError::bad_parse)?;
263278
let metadata = database
279+
.collection::<OutputCollection>()
264280
.get_output_metadata(&output_id)
265281
.await?
266282
.ok_or(ApiError::NoResults)?;
@@ -274,6 +290,7 @@ async fn transaction_included_block(
274290
) -> ApiResult<BlockResponse> {
275291
let transaction_id = TransactionId::from_str(&transaction_id).map_err(ApiError::bad_parse)?;
276292
let block = database
293+
.collection::<BlockCollection>()
277294
.get_block_for_transaction(&transaction_id)
278295
.await?
279296
.ok_or(ApiError::NoResults)?;
@@ -282,7 +299,10 @@ async fn transaction_included_block(
282299
}
283300

284301
async fn receipts(database: Extension<MongoDb>) -> ApiResult<ReceiptsResponse> {
285-
let mut receipts_at = database.stream_all_receipts().await?;
302+
let mut receipts_at = database
303+
.collection::<MilestoneCollection>()
304+
.stream_all_receipts()
305+
.await?;
286306
let mut receipts = Vec::new();
287307
while let Some((receipt, at)) = receipts_at.try_next().await? {
288308
let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?;
@@ -301,7 +321,10 @@ async fn receipts(database: Extension<MongoDb>) -> ApiResult<ReceiptsResponse> {
301321
}
302322

303323
async fn receipts_migrated_at(database: Extension<MongoDb>, Path(index): Path<u32>) -> ApiResult<ReceiptsResponse> {
304-
let mut receipts_at = database.stream_receipts_migrated_at(index.into()).await?;
324+
let mut receipts_at = database
325+
.collection::<MilestoneCollection>()
326+
.stream_receipts_migrated_at(index.into())
327+
.await?;
305328
let mut receipts = Vec::new();
306329
while let Some((receipt, at)) = receipts_at.try_next().await? {
307330
let receipt: &bee_block_stardust::payload::milestone::MilestoneOption = &receipt.try_into()?;
@@ -321,6 +344,7 @@ async fn receipts_migrated_at(database: Extension<MongoDb>, Path(index): Path<u3
321344

322345
async fn treasury(database: Extension<MongoDb>) -> ApiResult<TreasuryResponse> {
323346
database
347+
.collection::<TreasuryCollection>()
324348
.get_latest_treasury()
325349
.await?
326350
.ok_or(ApiError::NoResults)
@@ -337,6 +361,7 @@ async fn milestone(
337361
) -> ApiResult<MilestoneResponse> {
338362
let milestone_id = MilestoneId::from_str(&milestone_id).map_err(ApiError::bad_parse)?;
339363
let milestone_payload = database
364+
.collection::<MilestoneCollection>()
340365
.get_milestone_payload_by_id(&milestone_id)
341366
.await?
342367
.ok_or(ApiError::NoResults)?;
@@ -359,6 +384,7 @@ async fn milestone_by_index(
359384
headers: HeaderMap,
360385
) -> ApiResult<MilestoneResponse> {
361386
let milestone_payload = database
387+
.collection::<MilestoneCollection>()
362388
.get_milestone_payload(index)
363389
.await?
364390
.ok_or(ApiError::NoResults)?;
@@ -381,6 +407,7 @@ async fn utxo_changes(
381407
) -> ApiResult<UtxoChangesResponse> {
382408
let milestone_id = MilestoneId::from_str(&milestone_id).map_err(ApiError::bad_parse)?;
383409
let milestone_index = database
410+
.collection::<MilestoneCollection>()
384411
.get_milestone_payload_by_id(&milestone_id)
385412
.await?
386413
.ok_or(ApiError::NoResults)?
@@ -401,6 +428,7 @@ async fn collect_utxo_changes(database: &MongoDb, milestone_index: MilestoneInde
401428
created_outputs,
402429
consumed_outputs,
403430
} = database
431+
.collection::<OutputCollection>()
404432
.get_utxo_changes(milestone_index)
405433
.await?
406434
.ok_or(ApiError::NoResults)?;

src/bin/inx-chronicle/api/stardust/explorer/routes.rs

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@ use std::str::FromStr;
55

66
use axum::{extract::Path, routing::get, Extension, Router};
77
use chronicle::{
8-
db::MongoDb,
8+
db::{
9+
collections::{BlockCollection, LedgerUpdateCollection, MilestoneCollection, OutputCollection},
10+
MongoDb,
11+
},
912
types::stardust::block::{payload::milestone::MilestoneId, Address, BlockId},
1013
};
1114
use futures::{StreamExt, TryStreamExt};
@@ -45,6 +48,7 @@ async fn ledger_updates_by_address(
4548
let address_dto = Address::from_str(&address).map_err(ApiError::bad_parse)?;
4649

4750
let mut record_stream = database
51+
.collection::<LedgerUpdateCollection>()
4852
.stream_ledger_updates_by_address(
4953
&address_dto,
5054
// Get one extra record so that we can create the cursor.
@@ -84,13 +88,15 @@ async fn ledger_updates_by_milestone(
8488
let milestone_id = MilestoneId::from_str(&milestone_id).map_err(ApiError::bad_parse)?;
8589

8690
let milestone_index = database
91+
.collection::<MilestoneCollection>()
8792
.get_milestone_payload_by_id(&milestone_id)
8893
.await?
8994
.ok_or(ApiError::NotFound)?
9095
.essence
9196
.index;
9297

9398
let mut record_stream = database
99+
.collection::<LedgerUpdateCollection>()
94100
.stream_ledger_updates_by_milestone(milestone_index, page_size + 1, cursor)
95101
.await?;
96102

@@ -122,6 +128,7 @@ async fn ledger_updates_by_milestone(
122128
async fn balance(database: Extension<MongoDb>, Path(address): Path<String>) -> ApiResult<BalanceResponse> {
123129
let address = Address::from_str(&address).map_err(ApiError::bad_parse)?;
124130
let res = database
131+
.collection::<OutputCollection>()
125132
.get_address_balance(address)
126133
.await?
127134
.ok_or(ApiError::NoResults)?;
@@ -140,6 +147,7 @@ async fn block_children(
140147
) -> ApiResult<BlockChildrenResponse> {
141148
let block_id = BlockId::from_str(&block_id).map_err(ApiError::bad_parse)?;
142149
let mut block_children = database
150+
.collection::<BlockCollection>()
143151
.get_block_children(&block_id, page_size, page)
144152
.await
145153
.map_err(|_| ApiError::NoResults)?;

src/bin/inx-chronicle/api/stardust/indexer/routes.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,9 @@ use std::str::FromStr;
66
use axum::{extract::Path, routing::get, Extension, Router};
77
use chronicle::{
88
db::{
9-
collections::{AliasOutputsQuery, BasicOutputsQuery, FoundryOutputsQuery, IndexedId, NftOutputsQuery},
9+
collections::{
10+
AliasOutputsQuery, BasicOutputsQuery, FoundryOutputsQuery, IndexedId, NftOutputsQuery, OutputCollection,
11+
},
1012
MongoDb,
1113
},
1214
types::stardust::block::output::{AliasId, FoundryId, NftId},
@@ -52,6 +54,7 @@ where
5254
{
5355
let id = ID::from_str(&id).map_err(ApiError::bad_parse)?;
5456
let res = database
57+
.collection::<OutputCollection>()
5558
.get_indexed_output_by_id(id)
5659
.await?
5760
.ok_or(ApiError::NoResults)?;
@@ -76,6 +79,7 @@ where
7679
bson::Document: From<Q>,
7780
{
7881
let res = database
82+
.collection::<OutputCollection>()
7983
.get_indexed_outputs(
8084
query,
8185
// Get one extra record so that we can create the cursor.

0 commit comments

Comments
 (0)