Skip to content

Commit beed16a

Browse files
committed
fix(sink): properly handle multiple reset templates separately in SqlDb sink
1 parent 9aaff9d commit beed16a

File tree

1 file changed

+16
-11
lines changed

1 file changed

+16
-11
lines changed

src/sinks/sql_db.rs

+16-11
Original file line numberDiff line numberDiff line change
@@ -43,34 +43,39 @@ impl gasket::framework::Worker<Stage> for Worker {
4343
async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> {
4444
let point = unit.point().clone();
4545

46-
let template = match unit {
46+
let templates = match unit {
4747
ChainEvent::Apply(p, r) => {
4848
let data = hbs_data(p.clone(), Some(r.clone()));
49-
match r {
49+
let template = match r {
5050
Record::CborBlock(_) => stage.templates.render("apply_cbor_block", &data),
5151
Record::CborTx(_) => stage.templates.render("apply_cbor_tx", &data),
5252
_ => stage.templates.render("apply", &data),
53-
}
53+
};
54+
vec![template]
5455
}
5556
ChainEvent::Undo(p, r) => {
5657
let data = hbs_data(p.clone(), Some(r.clone()));
57-
match r {
58+
let template = match r {
5859
Record::CborBlock(_) => stage.templates.render("undo_cbor_block", &data),
5960
Record::CborTx(_) => stage.templates.render("undo_cbor_tx", &data),
6061
_ => stage.templates.render("undo", &data),
61-
}
62+
};
63+
vec![template]
6264
}
6365
ChainEvent::Reset(p) => {
6466
let data = hbs_data(p.clone(), None);
65-
stage.templates.render("reset_cbor_block", &data).ok();
66-
stage.templates.render("reset_cbor_tx", &data)
67+
vec![
68+
stage.templates.render("reset_cbor_block", &data),
69+
stage.templates.render("reset_cbor_tx", &data),
70+
]
6771
}
6872
};
6973

70-
let statement = template.or_panic()?;
71-
72-
let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
73-
debug!(rows = result.rows_affected(), "sql statement executed");
74+
for template in templates {
75+
let statement = template.or_panic()?;
76+
let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
77+
debug!(rows = result.rows_affected(), "sql statement executed");
78+
}
7479

7580
stage.ops_count.inc(1);
7681
stage.latest_block.set(point.slot_or_default() as i64);

0 commit comments

Comments
 (0)