Skip to content

Commit

Permalink
feat(clickhouse): add nullable test case
Browse files Browse the repository at this point in the history
  • Loading branch information
burmecia committed Sep 11, 2024
1 parent fa8d789 commit 578b69f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 21 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion wrappers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ bigquery_fdw = [
"yup-oauth2",
"thiserror",
]
clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror"]
clickhouse_fdw = ["clickhouse-rs", "chrono", "chrono-tz", "regex", "thiserror", "either"]
stripe_fdw = [
"http",
"reqwest",
Expand Down Expand Up @@ -175,6 +175,7 @@ clickhouse-rs = { git = "https://github.com/suharev7/clickhouse-rs", rev = "abfe
], optional = true }
chrono = { version = "0.4", optional = true }
chrono-tz = { version = "0.6", optional = true }
either = { version = "1.12.0", optional = true }


# for bigquery_fdw, firebase_fdw, airtable_fdw and etc.
Expand Down
85 changes: 68 additions & 17 deletions wrappers/src/fdw/clickhouse_fdw/clickhouse_fdw.rs
Original file line number Diff line number Diff line change
Expand Up @@ -364,33 +364,84 @@ impl ForeignDataWrapper<ClickHouseFdwError> for ClickHouseFdw {

fn insert(&mut self, src: &Row) -> ClickHouseFdwResult<()> {
if let Some(ref mut client) = self.client {
// use a dummy query to probe column types
let sql = format!("select * from {} where false", self.table);
let probe = self.rt.block_on(client.query(&sql).fetch_all())?;

// add row to block
let mut row = Vec::new();
for (col_name, cell) in src.iter() {
let col_name = col_name.to_owned();
if let Some(cell) = cell {
match cell {
Cell::Bool(v) => row.push((col_name, types::Value::from(*v))),
Cell::F64(v) => row.push((col_name, types::Value::from(*v))),
Cell::I64(v) => row.push((col_name, types::Value::from(*v))),
Cell::String(v) => row.push((col_name, types::Value::from(v.as_str()))),
let tgt_col = probe.get_column(col_name.as_ref())?;
let is_nullable = matches!(tgt_col.sql_type(), SqlType::Nullable(_));

let value = cell
.as_ref()
.map(|c| match c {
Cell::Bool(v) => {
let val = if is_nullable {
types::Value::from(Some(*v))
} else {
types::Value::from(*v)
};
Ok(val)
}
Cell::F64(v) => {
let val = if is_nullable {
types::Value::from(Some(*v))
} else {
types::Value::from(*v)
};
Ok(val)
}
Cell::I64(v) => {
let val = if is_nullable {
types::Value::from(Some(*v))
} else {
types::Value::from(*v)
};
Ok(val)
}
Cell::String(v) => {
let s = v.as_str();
let val = if is_nullable {
types::Value::from(Some(s))
} else {
types::Value::from(s)
};
Ok(val)
}
Cell::Date(_) => {
let s = cell.to_string().replace('\'', "");
let s = c.to_string().replace('\'', "");
let tm = NaiveDate::parse_from_str(&s, "%Y-%m-%d")?;
let epoch = NaiveDate::from_ymd_opt(1970, 1, 1).unwrap();
let duration = tm - epoch;
let dt = types::Value::Date(duration.num_days() as u16);
row.push((col_name, dt));
let dt = duration.num_days() as u16;
let val = if is_nullable {
types::Value::from(Some(dt))
} else {
types::Value::Date(dt)
};
Ok(val)
}
Cell::Timestamp(_) => {
let s = cell.to_string().replace('\'', "");
let tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S")?;
let tm: DateTime<Utc> = DateTime::from_naive_utc_and_offset(tm, Utc);
row.push((col_name, types::Value::from(tm)));
}
_ => {
return Err(ClickHouseFdwError::UnsupportedColumnType(cell.to_string()))
let s = c.to_string().replace('\'', "");
let naive_tm = NaiveDateTime::parse_from_str(&s, "%Y-%m-%d %H:%M:%S")?;
let tm: DateTime<Utc> =
DateTime::from_naive_utc_and_offset(naive_tm, Utc);
let val = if is_nullable {
types::Value::Nullable(either::Either::Right(Box::new(tm.into())))
} else {
types::Value::from(tm)
};
Ok(val)
}
}
_ => Err(ClickHouseFdwError::UnsupportedColumnType(c.to_string())),
})
.transpose()?;

if let Some(v) = value {
row.push((col_name, v));
}
}
let mut block = Block::new();
Expand Down
20 changes: 17 additions & 3 deletions wrappers/src/fdw/clickhouse_fdw/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ mod tests {
rt.block_on(async {
handle.execute("DROP TABLE IF EXISTS test_table").await?;
handle
.execute("CREATE TABLE test_table (id INT, name TEXT) engine = Memory")
.execute(
"CREATE TABLE test_table (id Int64, name Nullable(TEXT)) engine = Memory",
)
.await
})
.expect("test_table in ClickHouse");
Expand Down Expand Up @@ -133,6 +135,18 @@ mod tests {
)]),
)
.unwrap();
c.update(
"INSERT INTO test_table (id, name) VALUES ($1, $2)",
None,
Some(vec![
(PgOid::BuiltIn(PgBuiltInOids::INT4OID), 42.into_datum()),
(
PgOid::BuiltIn(PgBuiltInOids::TEXTOID),
None::<String>.into_datum(),
),
]),
)
.unwrap();
assert_eq!(
c.select("SELECT name FROM test_table ORDER BY name", None, None)
.unwrap()
Expand Down Expand Up @@ -230,7 +244,7 @@ mod tests {
"test3"
);

let remote_value: String = rt
let remote_value: Option<String> = rt
.block_on(async {
handle
.query("SELECT name FROM test_table ORDER BY name LIMIT 1")
Expand All @@ -242,7 +256,7 @@ mod tests {
.get("name")
})
.expect("value");
assert_eq!(remote_value, "test");
assert_eq!(remote_value, Some("test".to_string()));
});
}
}

0 comments on commit 578b69f

Please sign in to comment.