Skip to content

Commit 7314253

Browse files
committed
Postgres date-time types
Signed-off-by: itowlson <[email protected]>
1 parent 3eaba5f commit 7314253

File tree

11 files changed

+686
-77
lines changed

11 files changed

+686
-77
lines changed

Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/factor-outbound-pg/Cargo.toml

+2-1
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ edition = { workspace = true }
66

77
[dependencies]
88
anyhow = { workspace = true }
9+
chrono = "0.4"
910
native-tls = "0.2"
1011
postgres-native-tls = "0.5"
1112
spin-core = { path = "../core" }
@@ -14,7 +15,7 @@ spin-factors = { path = "../factors" }
1415
spin-resource-table = { path = "../table" }
1516
spin-world = { path = "../world" }
1617
tokio = { workspace = true, features = ["rt-multi-thread"] }
17-
tokio-postgres = "0.7"
18+
tokio-postgres = { version = "0.7", features = ["with-chrono-0_4"] }
1819
tracing = { workspace = true }
1920

2021
[dev-dependencies]

crates/factor-outbound-pg/src/client.rs

+123-32
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,9 @@ use anyhow::{anyhow, Result};
22
use native_tls::TlsConnector;
33
use postgres_native_tls::MakeTlsConnector;
44
use spin_world::async_trait;
5-
use spin_world::v2::postgres::{self as v2};
6-
use spin_world::v2::rdbms_types::{Column, DbDataType, DbValue, ParameterValue, RowSet};
5+
use spin_world::spin::postgres::postgres::{
6+
self as v3, Column, DbDataType, DbValue, ParameterValue, RowSet,
7+
};
78
use tokio_postgres::types::Type;
89
use tokio_postgres::{config::SslMode, types::ToSql, Row};
910
use tokio_postgres::{Client as TokioClient, NoTls, Socket};
@@ -18,13 +19,13 @@ pub trait Client {
1819
&self,
1920
statement: String,
2021
params: Vec<ParameterValue>,
21-
) -> Result<u64, v2::Error>;
22+
) -> Result<u64, v3::Error>;
2223

2324
async fn query(
2425
&self,
2526
statement: String,
2627
params: Vec<ParameterValue>,
27-
) -> Result<RowSet, v2::Error>;
28+
) -> Result<RowSet, v3::Error>;
2829
}
2930

3031
#[async_trait]
@@ -54,33 +55,43 @@ impl Client for TokioClient {
5455
&self,
5556
statement: String,
5657
params: Vec<ParameterValue>,
57-
) -> Result<u64, v2::Error> {
58-
let params: Vec<&(dyn ToSql + Sync)> = params
58+
) -> Result<u64, v3::Error> {
59+
let params = params
5960
.iter()
6061
.map(to_sql_parameter)
6162
.collect::<Result<Vec<_>>>()
62-
.map_err(|e| v2::Error::ValueConversionFailed(format!("{:?}", e)))?;
63+
.map_err(|e| v3::Error::ValueConversionFailed(format!("{:?}", e)))?;
6364

64-
self.execute(&statement, params.as_slice())
65+
let params_refs: Vec<&(dyn ToSql + Sync)> = params
66+
.iter()
67+
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
68+
.collect();
69+
70+
self.execute(&statement, params_refs.as_slice())
6571
.await
66-
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))
72+
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))
6773
}
6874

6975
async fn query(
7076
&self,
7177
statement: String,
7278
params: Vec<ParameterValue>,
73-
) -> Result<RowSet, v2::Error> {
74-
let params: Vec<&(dyn ToSql + Sync)> = params
79+
) -> Result<RowSet, v3::Error> {
80+
let params = params
7581
.iter()
7682
.map(to_sql_parameter)
7783
.collect::<Result<Vec<_>>>()
78-
.map_err(|e| v2::Error::BadParameter(format!("{:?}", e)))?;
84+
.map_err(|e| v3::Error::BadParameter(format!("{:?}", e)))?;
85+
86+
let params_refs: Vec<&(dyn ToSql + Sync)> = params
87+
.iter()
88+
.map(|b| b.as_ref() as &(dyn ToSql + Sync))
89+
.collect();
7990

8091
let results = self
81-
.query(&statement, params.as_slice())
92+
.query(&statement, params_refs.as_slice())
8293
.await
83-
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
94+
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?;
8495

8596
if results.is_empty() {
8697
return Ok(RowSet {
@@ -94,7 +105,7 @@ impl Client for TokioClient {
94105
.iter()
95106
.map(convert_row)
96107
.collect::<Result<Vec<_>, _>>()
97-
.map_err(|e| v2::Error::QueryFailed(format!("{:?}", e)))?;
108+
.map_err(|e| v3::Error::QueryFailed(format!("{:?}", e)))?;
98109

99110
Ok(RowSet { columns, rows })
100111
}
@@ -111,22 +122,43 @@ where
111122
});
112123
}
113124

114-
fn to_sql_parameter(value: &ParameterValue) -> Result<&(dyn ToSql + Sync)> {
125+
fn to_sql_parameter(value: &ParameterValue) -> Result<Box<dyn ToSql + Send + Sync>> {
115126
match value {
116-
ParameterValue::Boolean(v) => Ok(v),
117-
ParameterValue::Int32(v) => Ok(v),
118-
ParameterValue::Int64(v) => Ok(v),
119-
ParameterValue::Int8(v) => Ok(v),
120-
ParameterValue::Int16(v) => Ok(v),
121-
ParameterValue::Floating32(v) => Ok(v),
122-
ParameterValue::Floating64(v) => Ok(v),
123-
ParameterValue::Uint8(_)
124-
| ParameterValue::Uint16(_)
125-
| ParameterValue::Uint32(_)
126-
| ParameterValue::Uint64(_) => Err(anyhow!("Postgres does not support unsigned integers")),
127-
ParameterValue::Str(v) => Ok(v),
128-
ParameterValue::Binary(v) => Ok(v),
129-
ParameterValue::DbNull => Ok(&PgNull),
127+
ParameterValue::Boolean(v) => Ok(Box::new(*v)),
128+
ParameterValue::Int32(v) => Ok(Box::new(*v)),
129+
ParameterValue::Int64(v) => Ok(Box::new(*v)),
130+
ParameterValue::Int8(v) => Ok(Box::new(*v)),
131+
ParameterValue::Int16(v) => Ok(Box::new(*v)),
132+
ParameterValue::Floating32(v) => Ok(Box::new(*v)),
133+
ParameterValue::Floating64(v) => Ok(Box::new(*v)),
134+
ParameterValue::Str(v) => Ok(Box::new(v.clone())),
135+
ParameterValue::Binary(v) => Ok(Box::new(v.clone())),
136+
ParameterValue::Date((y, mon, d)) => {
137+
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
138+
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
139+
Ok(Box::new(naive_date))
140+
}
141+
ParameterValue::Time((h, min, s, ns)) => {
142+
let naive_time =
143+
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
144+
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
145+
Ok(Box::new(naive_time))
146+
}
147+
ParameterValue::Datetime((y, mon, d, h, min, s, ns)) => {
148+
let naive_date = chrono::NaiveDate::from_ymd_opt(*y, (*mon).into(), (*d).into())
149+
.ok_or_else(|| anyhow!("invalid date y={y}, m={mon}, d={d}"))?;
150+
let naive_time =
151+
chrono::NaiveTime::from_hms_nano_opt((*h).into(), (*min).into(), (*s).into(), *ns)
152+
.ok_or_else(|| anyhow!("invalid time {h}:{min}:{s}:{ns}"))?;
153+
let dt = chrono::NaiveDateTime::new(naive_date, naive_time);
154+
Ok(Box::new(dt))
155+
}
156+
ParameterValue::Timestamp(v) => {
157+
let ts = chrono::DateTime::<chrono::Utc>::from_timestamp(*v, 0)
158+
.ok_or_else(|| anyhow!("invalid epoch timestamp {v}"))?;
159+
Ok(Box::new(ts))
160+
}
161+
ParameterValue::DbNull => Ok(Box::new(PgNull)),
130162
}
131163
}
132164

@@ -155,22 +187,25 @@ fn convert_data_type(pg_type: &Type) -> DbDataType {
155187
Type::INT4 => DbDataType::Int32,
156188
Type::INT8 => DbDataType::Int64,
157189
Type::TEXT | Type::VARCHAR | Type::BPCHAR => DbDataType::Str,
190+
Type::TIMESTAMP | Type::TIMESTAMPTZ => DbDataType::Timestamp,
191+
Type::DATE => DbDataType::Date,
192+
Type::TIME => DbDataType::Time,
158193
_ => {
159194
tracing::debug!("Couldn't convert Postgres type {} to WIT", pg_type.name(),);
160195
DbDataType::Other
161196
}
162197
}
163198
}
164199

165-
fn convert_row(row: &Row) -> Result<Vec<DbValue>, tokio_postgres::Error> {
200+
fn convert_row(row: &Row) -> anyhow::Result<Vec<DbValue>> {
166201
let mut result = Vec::with_capacity(row.len());
167202
for index in 0..row.len() {
168203
result.push(convert_entry(row, index)?);
169204
}
170205
Ok(result)
171206
}
172207

173-
fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Error> {
208+
fn convert_entry(row: &Row, index: usize) -> anyhow::Result<DbValue> {
174209
let column = &row.columns()[index];
175210
let value = match column.type_() {
176211
&Type::BOOL => {
@@ -229,6 +264,27 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
229264
None => DbValue::DbNull,
230265
}
231266
}
267+
&Type::TIMESTAMP | &Type::TIMESTAMPTZ => {
268+
let value: Option<chrono::NaiveDateTime> = row.try_get(index)?;
269+
match value {
270+
Some(v) => DbValue::Datetime(tuplify_date_time(v)?),
271+
None => DbValue::DbNull,
272+
}
273+
}
274+
&Type::DATE => {
275+
let value: Option<chrono::NaiveDate> = row.try_get(index)?;
276+
match value {
277+
Some(v) => DbValue::Date(tuplify_date(v)?),
278+
None => DbValue::DbNull,
279+
}
280+
}
281+
&Type::TIME => {
282+
let value: Option<chrono::NaiveTime> = row.try_get(index)?;
283+
match value {
284+
Some(v) => DbValue::Time(tuplify_time(v)?),
285+
None => DbValue::DbNull,
286+
}
287+
}
232288
t => {
233289
tracing::debug!(
234290
"Couldn't convert Postgres type {} in column {}",
@@ -241,6 +297,41 @@ fn convert_entry(row: &Row, index: usize) -> Result<DbValue, tokio_postgres::Err
241297
Ok(value)
242298
}
243299

300+
// Functions to convert from the chrono types to the WIT interface tuples
301+
fn tuplify_date_time(
302+
value: chrono::NaiveDateTime,
303+
) -> anyhow::Result<(i32, u8, u8, u8, u8, u8, u32)> {
304+
use chrono::{Datelike, Timelike};
305+
Ok((
306+
value.year(),
307+
value.month().try_into()?,
308+
value.day().try_into()?,
309+
value.hour().try_into()?,
310+
value.minute().try_into()?,
311+
value.second().try_into()?,
312+
value.nanosecond(),
313+
))
314+
}
315+
316+
fn tuplify_date(value: chrono::NaiveDate) -> anyhow::Result<(i32, u8, u8)> {
317+
use chrono::Datelike;
318+
Ok((
319+
value.year(),
320+
value.month().try_into()?,
321+
value.day().try_into()?,
322+
))
323+
}
324+
325+
fn tuplify_time(value: chrono::NaiveTime) -> anyhow::Result<(u8, u8, u8, u32)> {
326+
use chrono::Timelike;
327+
Ok((
328+
value.hour().try_into()?,
329+
value.minute().try_into()?,
330+
value.second().try_into()?,
331+
value.nanosecond(),
332+
))
333+
}
334+
244335
/// Although the Postgres crate converts Rust Option::None to Postgres NULL,
245336
/// it enforces the type of the Option as it does so. (For example, trying to
246337
/// pass an Option::<i32>::None to a VARCHAR column fails conversion.) As we

0 commit comments

Comments
 (0)