Skip to content

Commit 9aaff9d

Browse files
committed
feat(sink): update SqlDb sink to support cbor blocks and transactions
1 parent 4946791 commit 9aaff9d

File tree

4 files changed

+104
-17
lines changed

4 files changed

+104
-17
lines changed

examples/postgresql/cursor

+42
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
[
2+
[
3+
60485013,
4+
"043018656b71c25c2a0338fe7000a53dedc704c5688e0c90a6093f08178a8b36"
5+
],
6+
[
7+
60485009,
8+
"80669929ff82d23e5dd097d1554541bc678d53322df6c7bfec13c62645c68737"
9+
],
10+
[
11+
60485003,
12+
"0f2f548b4360e3553923cc0fd96b9e10ec5142bc43f93e0a0a5509be7fd4aa20"
13+
],
14+
[
15+
60484990,
16+
"4fc80c10ec326b1d1229f3284c8b5d73c442524a95da91727f1f5684b8e58484"
17+
],
18+
[
19+
60484982,
20+
"f6c8a7b2b69b47da827ff255cf92e282493c70e64587ab233426b5b604939ca5"
21+
],
22+
[
23+
60484942,
24+
"15989755ba2c52d302b197771c076491ea15422a83f08f66c90e25ed1a73bb78"
25+
],
26+
[
27+
60484936,
28+
"d6e485277c441e6868fd742c6a3e57a4ca1de30b4e8c619624c054352f0b83a2"
29+
],
30+
[
31+
60484917,
32+
"40844b925abd2004c70694eebd6da02a6ec4c3eca51aba32314364d52679a75b"
33+
],
34+
[
35+
60484876,
36+
"43ae8400494cff3fdde99a7ca780d5b4a216116b44f4b39cc4f24f735eb70570"
37+
],
38+
[
39+
60484869,
40+
"4595260047133e4da97d2939c4edf6087621e6b72380c2179b35d6df35520ab4"
41+
]
42+
]

examples/postgresql/daemon.toml

+17-7
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,27 @@
11
[source]
22
type = "N2N"
3-
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]
3+
peers = ["preprod-node.play.dev.cardano.org:3001"]
4+
5+
[chain]
6+
type = "preprod"
47

58
[intersect]
69
type = "Point"
7-
value = [114220807, "01822b10afde3d09bd5e72758857b669ddadcdaad6776a3dc8ee902c3ace1d7e"]
10+
value = [60484568, "874858059dc6c871105b80b38238cf84a91e617b2afde53904407cb2a3299e9d"]
811

912
[[filters]]
10-
type = "SplitBlock"
13+
type = "EmitCbor"
1114

1215
[sink]
1316
type = "SqlDb"
14-
connection = "postgres://postgres:example@localhost:5432/postgres"
15-
apply_template = "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', decode('{{record.hex}}', 'hex'));"
16-
undo_template = "DELETE FROM txs WHERE slot = {{point.slot}}"
17-
reset_template = "DELETE FROM txs WHERE slot > {{point.slot}}"
17+
connection = "postgres://postgres:test1234@localhost:15432/oura_test"
18+
apply_cbor_block_template = "INSERT INTO blocks (slot, cbor) VALUES ('{{point.slot}}', decode('{{record.hex}}', 'hex'));"
19+
undo_cbor_block_template = "DELETE FROM blocks WHERE slot = {{point.slot}}"
20+
apply_cbor_tx_template = "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', decode('{{record.hex}}', 'hex'));"
21+
undo_cbor_tx_template = "DELETE FROM txs WHERE slot = {{point.slot}}"
22+
reset_cbor_block_template = "DELETE FROM blocks WHERE slot > {{point.slot}};"
23+
reset_cbor_tx_template = "DELETE FROM txs WHERE slot > {{point.slot}};"
24+
25+
[cursor]
26+
type = "File"
27+
path = "cursor"

examples/postgresql/init.sql

+12-1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,17 @@
1+
-- Table for storing CBOR blocks
2+
CREATE TABLE blocks (
3+
slot INTEGER NOT NULL,
4+
cbor BYTEA
5+
);
6+
7+
-- Index for the blocks table
8+
CREATE INDEX idx_blocks_slot ON blocks(slot);
9+
10+
-- Table for storing CBOR transactions
111
CREATE TABLE txs (
212
slot INTEGER NOT NULL,
313
cbor BYTEA
414
);
515

6-
CREATE INDEX idx_txs_slot ON txs(slot);
16+
-- Index for the txs table
17+
CREATE INDEX idx_txs_slot ON txs(slot);

src/sinks/sql_db.rs

+33-9
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,24 @@ impl gasket::framework::Worker<Stage> for Worker {
4646
let template = match unit {
4747
ChainEvent::Apply(p, r) => {
4848
let data = hbs_data(p.clone(), Some(r.clone()));
49-
stage.templates.render("apply", &data)
49+
match r {
50+
Record::CborBlock(_) => stage.templates.render("apply_cbor_block", &data),
51+
Record::CborTx(_) => stage.templates.render("apply_cbor_tx", &data),
52+
_ => stage.templates.render("apply", &data),
53+
}
5054
}
5155
ChainEvent::Undo(p, r) => {
5256
let data = hbs_data(p.clone(), Some(r.clone()));
53-
stage.templates.render("undo", &data)
57+
match r {
58+
Record::CborBlock(_) => stage.templates.render("undo_cbor_block", &data),
59+
Record::CborTx(_) => stage.templates.render("undo_cbor_tx", &data),
60+
_ => stage.templates.render("undo", &data),
61+
}
5462
}
5563
ChainEvent::Reset(p) => {
5664
let data = hbs_data(p.clone(), None);
57-
stage.templates.render("reset", &data)
65+
stage.templates.render("reset_cbor_block", &data).ok();
66+
stage.templates.render("reset_cbor_tx", &data)
5867
}
5968
};
6069

@@ -91,9 +100,12 @@ pub struct Stage {
91100
pub struct Config {
92101
/// eg: sqlite::memory:
93102
pub connection: String,
94-
pub apply_template: String,
95-
pub undo_template: String,
96-
pub reset_template: String,
103+
pub apply_cbor_block_template: String,
104+
pub undo_cbor_block_template: String,
105+
pub apply_cbor_tx_template: String,
106+
pub undo_cbor_tx_template: String,
107+
pub reset_cbor_block_template: String,
108+
pub reset_cbor_tx_template: String,
97109
}
98110

99111
impl Config {
@@ -103,15 +115,27 @@ impl Config {
103115
let mut templates = handlebars::Handlebars::new();
104116

105117
templates
106-
.register_template_string("apply", &self.apply_template)
118+
.register_template_string("apply_cbor_block", &self.apply_cbor_block_template)
107119
.map_err(Error::config)?;
108120

109121
templates
110-
.register_template_string("undo", &self.undo_template)
122+
.register_template_string("undo_cbor_block", &self.undo_cbor_block_template)
111123
.map_err(Error::config)?;
112124

113125
templates
114-
.register_template_string("reset", &self.reset_template)
126+
.register_template_string("apply_cbor_tx", &self.apply_cbor_tx_template)
127+
.map_err(Error::config)?;
128+
129+
templates
130+
.register_template_string("undo_cbor_tx", &self.undo_cbor_tx_template)
131+
.map_err(Error::config)?;
132+
133+
templates
134+
.register_template_string("reset_cbor_block", &self.reset_cbor_block_template)
135+
.map_err(Error::config)?;
136+
137+
templates
138+
.register_template_string("reset_cbor_tx", &self.reset_cbor_tx_template)
115139
.map_err(Error::config)?;
116140

117141
let stage = Stage {

0 commit comments

Comments
 (0)