From b64b50fb7679b47dadf099288ff4aadbb62f7ddc Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 15 Sep 2025 21:32:05 -0400 Subject: [PATCH 1/5] feat: Add `OR REPLACE` to creating external tables --- datafusion/catalog/src/listing_schema.rs | 1 + .../src/datasource/listing_table_factory.rs | 7 + datafusion/core/src/execution/context/mod.rs | 23 ++- datafusion/expr/src/logical_plan/ddl.rs | 2 + datafusion/proto/src/generated/prost.rs | 2 + datafusion/proto/src/logical_plan/mod.rs | 3 + datafusion/sql/src/parser.rs | 148 +++++++++++++++++- datafusion/sql/src/statement.rs | 3 +- .../test_files/create_external_table.slt | 6 + datafusion/sqllogictest/test_files/ddl.slt | 15 +- 10 files changed, 197 insertions(+), 13 deletions(-) diff --git a/datafusion/catalog/src/listing_schema.rs b/datafusion/catalog/src/listing_schema.rs index 2e4eac964b18..7e19c1ecaab0 100644 --- a/datafusion/catalog/src/listing_schema.rs +++ b/datafusion/catalog/src/listing_schema.rs @@ -136,6 +136,7 @@ impl ListingSchemaProvider { file_type: self.format.clone(), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], diff --git a/datafusion/core/src/datasource/listing_table_factory.rs b/datafusion/core/src/datasource/listing_table_factory.rs index 218a1fedbb37..45d0fefded57 100644 --- a/datafusion/core/src/datasource/listing_table_factory.rs +++ b/datafusion/core/src/datasource/listing_table_factory.rs @@ -238,6 +238,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], @@ -278,6 +279,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], @@ -322,6 +324,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], @@ -373,6 +376,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], @@ -416,6 +420,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], @@ -455,6 +460,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], @@ -495,6 +501,7 @@ mod tests { schema: Arc::new(DFSchema::empty()), table_partition_cols: vec![], if_not_exists: false, + or_replace: false, temporary: false, definition: None, order_exprs: vec![], diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index de1d40dda349..886fdf94f125 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -790,9 +790,26 @@ impl SessionContext { } if exist { - match cmd.if_not_exists { - true => return self.return_empty_dataframe(), - false => { + match (cmd.if_not_exists, cmd.or_replace) { + (true, false) => return self.return_empty_dataframe(), + (false, true) => { + let result = self + .find_and_deregister(cmd.name.clone(), TableType::Base) + .await; + match result { + Ok(true) => { + let table_provider: Arc = + self.create_custom_table(cmd).await?; + self.register_table(cmd.name.clone(), table_provider)?; + return self.return_empty_dataframe(); + } + _ => return exec_err!("View '{}' doesn't exist.", cmd.name), + } + } + (true, true) => { + return exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") + } + (false, false) => { return exec_err!("Table '{}' already exists", cmd.name); } } diff --git a/datafusion/expr/src/logical_plan/ddl.rs b/datafusion/expr/src/logical_plan/ddl.rs index d477ca1ad0d5..5075fc28e852 100644 --- a/datafusion/expr/src/logical_plan/ddl.rs +++ b/datafusion/expr/src/logical_plan/ddl.rs @@ -213,6 +213,8 @@ pub struct CreateExternalTable { pub table_partition_cols: Vec, /// Option to not error if table already exists pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, /// Whether the table is a temporary table pub temporary: bool, /// SQL used to create the table, if available diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ffb73086650f..803a79536daf 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -251,6 +251,8 @@ pub struct CreateExternalTableNode { pub table_partition_cols: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(bool, tag = "6")] pub if_not_exists: bool, + #[prost(bool, tag = "15")] + pub or_replace: bool, #[prost(bool, tag = "14")] pub temporary: bool, #[prost(string, tag = "7")] diff --git a/datafusion/proto/src/logical_plan/mod.rs b/datafusion/proto/src/logical_plan/mod.rs index cc3e805ed1df..6687cc31a3b0 100644 --- a/datafusion/proto/src/logical_plan/mod.rs +++ b/datafusion/proto/src/logical_plan/mod.rs @@ -632,6 +632,7 @@ impl AsLogicalPlan for LogicalPlanNode { .clone(), order_exprs, if_not_exists: create_extern_table.if_not_exists, + or_replace: create_extern_table.or_replace, temporary: create_extern_table.temporary, definition, unbounded: create_extern_table.unbounded, @@ -1469,6 +1470,7 @@ impl AsLogicalPlan for LogicalPlanNode { schema: df_schema, table_partition_cols, if_not_exists, + or_replace, definition, order_exprs, unbounded, @@ -1502,6 +1504,7 @@ impl AsLogicalPlan for LogicalPlanNode { schema: Some(df_schema.try_into()?), table_partition_cols: table_partition_cols.clone(), if_not_exists: *if_not_exists, + or_replace: *or_replace, temporary: *temporary, order_exprs: converted_order_exprs, definition: definition.clone().unwrap_or_default(), diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index 2c673162ec9c..6ffd497c2201 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -188,7 +188,9 @@ pub(crate) type LexOrdering = Vec; /// Syntax: /// /// ```text -/// CREATE EXTERNAL TABLE +/// CREATE +/// [ OR REPLACE ] +/// EXTERNAL TABLE /// [ IF NOT EXISTS ] /// [ () ] /// STORED AS @@ -221,6 +223,8 @@ pub struct CreateExternalTable { pub order_exprs: Vec, /// Option to not error if table already exists pub if_not_exists: bool, + /// Option to replace table content if table already exists + pub or_replace: bool, /// Whether the table is a temporary table pub temporary: bool, /// Infinite streams? @@ -724,13 +728,29 @@ impl<'a> DFParser<'a> { /// Parse a SQL `CREATE` statement handling `CREATE EXTERNAL TABLE` pub fn parse_create(&mut self) -> Result { - if self.parser.parse_keyword(Keyword::EXTERNAL) { - self.parse_create_external_table(false) - } else if self.parser.parse_keyword(Keyword::UNBOUNDED) { - self.parser.expect_keyword(Keyword::EXTERNAL)?; - self.parse_create_external_table(true) + // TODO: Change sql parser to take in `or_replace: bool` inside parse_create() + if self + .parser + .parse_keywords(&[Keyword::OR, Keyword::REPLACE, Keyword::EXTERNAL]) + { + self.parse_create_external_table(false, true) + } else if self.parser.parse_keywords(&[ + Keyword::OR, + Keyword::REPLACE, + Keyword::UNBOUNDED, + Keyword::EXTERNAL, + ]) { + self.parse_create_external_table(true, true) + } else if self.parser.parse_keyword(Keyword::EXTERNAL) { + self.parse_create_external_table(false, false) + }else if self + .parser + .parse_keywords(&[Keyword::UNBOUNDED, Keyword::EXTERNAL]) + { + self.parse_create_external_table(true, false) } else { - Ok(Statement::Statement(Box::from(self.parser.parse_create()?))) + + Ok(Statement::Statement(Box::from(self.parser.parse_create()?))) } } @@ -876,15 +896,22 @@ impl<'a> DFParser<'a> { fn parse_create_external_table( &mut self, unbounded: bool, + or_replace: bool, ) -> Result { let temporary = self .parser .parse_one_of_keywords(&[Keyword::TEMP, Keyword::TEMPORARY]) .is_some(); + self.parser.expect_keyword(Keyword::TABLE)?; let if_not_exists = self.parser .parse_keywords(&[Keyword::IF, Keyword::NOT, Keyword::EXISTS]); + + if if_not_exists && or_replace { + return parser_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'"); + } + let table_name = self.parser.parse_object_name(true)?; let (mut columns, constraints) = self.parse_columns()?; @@ -1000,6 +1027,7 @@ impl<'a> DFParser<'a> { table_partition_cols: builder.table_partition_cols.unwrap_or(vec![]), order_exprs: builder.order_exprs, if_not_exists, + or_replace, temporary, unbounded, options: builder.options.unwrap_or(Vec::new()), @@ -1108,6 +1136,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1125,6 +1154,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1143,6 +1173,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1161,6 +1192,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![( @@ -1182,6 +1214,7 @@ mod tests { table_partition_cols: vec!["p1".to_string(), "p2".to_string()], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1210,6 +1243,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![( @@ -1231,6 +1265,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1248,6 +1283,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1265,6 +1301,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1283,6 +1320,26 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: true, + or_replace: false, + temporary: false, + unbounded: false, + options: vec![], + constraints: vec![], + }); + expect_parse_ok(sql, expected)?; + + // positive case: or replace + let sql = + "CREATE OR REPLACE EXTERNAL TABLE t STORED AS PARQUET LOCATION 'foo.parquet'"; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: name.clone(), + columns: vec![], + file_type: "PARQUET".to_string(), + location: "foo.parquet".into(), + table_partition_cols: vec![], + order_exprs: vec![], + if_not_exists: false, + or_replace: true, temporary: false, unbounded: false, options: vec![], @@ -1304,6 +1361,7 @@ mod tests { table_partition_cols: vec!["p1".to_string()], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1335,6 +1393,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![("k1".into(), Value::SingleQuotedString("v1".into()))], @@ -1353,6 +1412,7 @@ mod tests { table_partition_cols: vec![], order_exprs: vec![], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![ @@ -1401,6 +1461,7 @@ mod tests { with_fill: None, }]], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1448,6 +1509,7 @@ mod tests { }, ]], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1488,6 +1550,7 @@ mod tests { with_fill: None, }]], if_not_exists: false, + or_replace: false, temporary: false, unbounded: false, options: vec![], @@ -1495,7 +1558,7 @@ mod tests { }); expect_parse_ok(sql, expected)?; - // Most complete CREATE EXTERNAL TABLE statement possible + // Most complete CREATE EXTERNAL TABLE statement possible (using IF NOT EXISTS) let sql = " CREATE UNBOUNDED EXTERNAL TABLE IF NOT EXISTS t (c1 int, c2 float) STORED AS PARQUET @@ -1537,6 +1600,75 @@ mod tests { with_fill: None, }]], if_not_exists: true, + or_replace: false, + temporary: false, + unbounded: true, + options: vec![ + ( + "format.compression".into(), + Value::SingleQuotedString("zstd".into()), + ), + ( + "format.delimiter".into(), + Value::SingleQuotedString("*".into()), + ), + ( + "ROW_GROUP_SIZE".into(), + Value::SingleQuotedString("1024".into()), + ), + ("TRUNCATE".into(), Value::SingleQuotedString("NO".into())), + ( + "format.has_header".into(), + Value::SingleQuotedString("true".into()), + ), + ], + constraints: vec![], + }); + expect_parse_ok(sql, expected)?; + + // Most complete CREATE EXTERNAL TABLE statement possible (using OR REPLACE) + let sql = " + CREATE OR REPLACE UNBOUNDED EXTERNAL TABLE t (c1 int, c2 float) + STORED AS PARQUET + WITH ORDER (c1 - c2 ASC) + PARTITIONED BY (c1) + LOCATION 'foo.parquet' + OPTIONS ('format.compression' 'zstd', + 'format.delimiter' '*', + 'ROW_GROUP_SIZE' '1024', + 'TRUNCATE' 'NO', + 'format.has_header' 'true')"; + let expected = Statement::CreateExternalTable(CreateExternalTable { + name: name.clone(), + columns: vec![ + make_column_def("c1", DataType::Int(None)), + make_column_def("c2", DataType::Float(None)), + ], + file_type: "PARQUET".to_string(), + location: "foo.parquet".into(), + table_partition_cols: vec!["c1".into()], + order_exprs: vec![vec![OrderByExpr { + expr: Expr::BinaryOp { + left: Box::new(Identifier(Ident { + value: "c1".to_owned(), + quote_style: None, + span: Span::empty(), + })), + op: BinaryOperator::Minus, + right: Box::new(Identifier(Ident { + value: "c2".to_owned(), + quote_style: None, + span: Span::empty(), + })), + }, + options: OrderByOptions { + asc: Some(true), + nulls_first: None, + }, + with_fill: None, + }]], + if_not_exists: false, + or_replace: true, temporary: false, unbounded: true, options: vec![ diff --git a/datafusion/sql/src/statement.rs b/datafusion/sql/src/statement.rs index 7f94fce7bd9f..44e924614208 100644 --- a/datafusion/sql/src/statement.rs +++ b/datafusion/sql/src/statement.rs @@ -519,7 +519,6 @@ impl SqlToRel<'_, S> { } } } - Statement::CreateView { or_replace, materialized, @@ -1504,6 +1503,7 @@ impl SqlToRel<'_, S> { unbounded, options, constraints, + or_replace, } = statement; // Merge inline constraints and existing constraints @@ -1552,6 +1552,7 @@ impl SqlToRel<'_, S> { file_type, table_partition_cols, if_not_exists, + or_replace, temporary, definition, order_exprs: ordered_exprs, diff --git a/datafusion/sqllogictest/test_files/create_external_table.slt b/datafusion/sqllogictest/test_files/create_external_table.slt index 03cb5edb5fcc..4a803c981a92 100644 --- a/datafusion/sqllogictest/test_files/create_external_table.slt +++ b/datafusion/sqllogictest/test_files/create_external_table.slt @@ -297,3 +297,9 @@ CREATE EXTERNAL TABLE staging.foo STORED AS parquet LOCATION '../../parquet-test # Create external table with qualified name, but no schema should error statement error DataFusion error: Error during planning: failed to resolve schema: release CREATE EXTERNAL TABLE release.bar STORED AS parquet LOCATION '../../parquet-testing/data/alltypes_plain.parquet'; + +# Cannot create external table alongside `if_not_exists` and `or_replace` +statement error DataFusion error: SQL error: ParserError\("'IF NOT EXISTS' cannot coexist with 'REPLACE'"\) +CREATE OR REPLACE EXTERNAL TABLE IF NOT EXISTS t_conflict(c1 int) +STORED AS CSV +LOCATION 'foo.csv'; \ No newline at end of file diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 81f2955eff49..1a8fd27119d3 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -272,7 +272,7 @@ drop table my_table # select_into statement ok -SELECT* INTO my_table FROM (SELECT * FROM aggregate_simple) +SELECT * INTO my_table FROM (SELECT * FROM aggregate_simple) query RRB rowsort SELECT * FROM my_table order by c1 LIMIT 1 @@ -607,6 +607,19 @@ CREATE TABLE table_without_values(field1 BIGINT, field2 BIGINT); statement error Execution error: 'IF NOT EXISTS' cannot coexist with 'REPLACE' CREATE OR REPLACE TABLE IF NOT EXISTS table_without_values(field1 BIGINT, field2 BIGINT); +# CREATE OR REPLACE +statement ok +CREATE OR REPLACE EXTERNAL TABLE aggregate_simple_repl +STORED AS CSV +LOCATION '../core/tests/data/aggregate_simple.csv' +OPTIONS ('format.has_header' 'true'); + +statement ok +CREATE OR REPLACE EXTERNAL TABLE aggregate_simple_repl +STORED AS CSV +LOCATION '../core/tests/data/aggregate_simple.csv' +OPTIONS ('format.has_header' 'true'); + # Should insert into an empty table statement ok insert into table_without_values values (1, 2), (2, 3), (2, 4); From 3720ee977088b5e56675e9e3914e11e44e91648d Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 15 Sep 2025 21:37:16 -0400 Subject: [PATCH 2/5] regen --- datafusion/proto/src/generated/prost.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index 803a79536daf..ffb73086650f 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -251,8 +251,6 @@ pub struct CreateExternalTableNode { pub table_partition_cols: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(bool, tag = "6")] pub if_not_exists: bool, - #[prost(bool, tag = "15")] - pub or_replace: bool, #[prost(bool, tag = "14")] pub temporary: bool, #[prost(string, tag = "7")] From 408dd6d86e7307d79fb08cbf72a1489ebc93c0d8 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Mon, 15 Sep 2025 22:03:21 -0400 Subject: [PATCH 3/5] fmt --- datafusion/proto/proto/datafusion.proto | 1 + datafusion/proto/src/generated/pbjson.rs | 18 ++++++++++++++++++ datafusion/proto/src/generated/prost.rs | 2 ++ datafusion/sql/src/parser.rs | 7 +++---- 4 files changed, 24 insertions(+), 4 deletions(-) diff --git a/datafusion/proto/proto/datafusion.proto b/datafusion/proto/proto/datafusion.proto index 4f411a4a9332..ee9ac0e7902d 100644 --- a/datafusion/proto/proto/datafusion.proto +++ b/datafusion/proto/proto/datafusion.proto @@ -167,6 +167,7 @@ message CreateExternalTableNode { datafusion_common.DfSchema schema = 4; repeated string table_partition_cols = 5; bool if_not_exists = 6; + bool or_replace = 15; bool temporary = 14; string definition = 7; repeated SortExprNodeCollection order_exprs = 10; diff --git a/datafusion/proto/src/generated/pbjson.rs b/datafusion/proto/src/generated/pbjson.rs index ff7519aa5df2..29967d812000 100644 --- a/datafusion/proto/src/generated/pbjson.rs +++ b/datafusion/proto/src/generated/pbjson.rs @@ -3094,6 +3094,9 @@ impl serde::Serialize for CreateExternalTableNode { if self.if_not_exists { len += 1; } + if self.or_replace { + len += 1; + } if self.temporary { len += 1; } @@ -3134,6 +3137,9 @@ impl serde::Serialize for CreateExternalTableNode { if self.if_not_exists { struct_ser.serialize_field("ifNotExists", &self.if_not_exists)?; } + if self.or_replace { + struct_ser.serialize_field("orReplace", &self.or_replace)?; + } if self.temporary { struct_ser.serialize_field("temporary", &self.temporary)?; } @@ -3174,6 +3180,8 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { "tablePartitionCols", "if_not_exists", "ifNotExists", + "or_replace", + "orReplace", "temporary", "definition", "order_exprs", @@ -3193,6 +3201,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { Schema, TablePartitionCols, IfNotExists, + OrReplace, Temporary, Definition, OrderExprs, @@ -3227,6 +3236,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { "schema" => Ok(GeneratedField::Schema), "tablePartitionCols" | "table_partition_cols" => Ok(GeneratedField::TablePartitionCols), "ifNotExists" | "if_not_exists" => Ok(GeneratedField::IfNotExists), + "orReplace" | "or_replace" => Ok(GeneratedField::OrReplace), "temporary" => Ok(GeneratedField::Temporary), "definition" => Ok(GeneratedField::Definition), "orderExprs" | "order_exprs" => Ok(GeneratedField::OrderExprs), @@ -3259,6 +3269,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { let mut schema__ = None; let mut table_partition_cols__ = None; let mut if_not_exists__ = None; + let mut or_replace__ = None; let mut temporary__ = None; let mut definition__ = None; let mut order_exprs__ = None; @@ -3304,6 +3315,12 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { } if_not_exists__ = Some(map_.next_value()?); } + GeneratedField::OrReplace => { + if or_replace__.is_some() { + return Err(serde::de::Error::duplicate_field("orReplace")); + } + or_replace__ = Some(map_.next_value()?); + } GeneratedField::Temporary => { if temporary__.is_some() { return Err(serde::de::Error::duplicate_field("temporary")); @@ -3359,6 +3376,7 @@ impl<'de> serde::Deserialize<'de> for CreateExternalTableNode { schema: schema__, table_partition_cols: table_partition_cols__.unwrap_or_default(), if_not_exists: if_not_exists__.unwrap_or_default(), + or_replace: or_replace__.unwrap_or_default(), temporary: temporary__.unwrap_or_default(), definition: definition__.unwrap_or_default(), order_exprs: order_exprs__.unwrap_or_default(), diff --git a/datafusion/proto/src/generated/prost.rs b/datafusion/proto/src/generated/prost.rs index ffb73086650f..803a79536daf 100644 --- a/datafusion/proto/src/generated/prost.rs +++ b/datafusion/proto/src/generated/prost.rs @@ -251,6 +251,8 @@ pub struct CreateExternalTableNode { pub table_partition_cols: ::prost::alloc::vec::Vec<::prost::alloc::string::String>, #[prost(bool, tag = "6")] pub if_not_exists: bool, + #[prost(bool, tag = "15")] + pub or_replace: bool, #[prost(bool, tag = "14")] pub temporary: bool, #[prost(string, tag = "7")] diff --git a/datafusion/sql/src/parser.rs b/datafusion/sql/src/parser.rs index 6ffd497c2201..06418fa4ec96 100644 --- a/datafusion/sql/src/parser.rs +++ b/datafusion/sql/src/parser.rs @@ -741,16 +741,15 @@ impl<'a> DFParser<'a> { Keyword::EXTERNAL, ]) { self.parse_create_external_table(true, true) - } else if self.parser.parse_keyword(Keyword::EXTERNAL) { + } else if self.parser.parse_keyword(Keyword::EXTERNAL) { self.parse_create_external_table(false, false) - }else if self + } else if self .parser .parse_keywords(&[Keyword::UNBOUNDED, Keyword::EXTERNAL]) { self.parse_create_external_table(true, false) } else { - - Ok(Statement::Statement(Box::from(self.parser.parse_create()?))) + Ok(Statement::Statement(Box::from(self.parser.parse_create()?))) } } From 617afc647c831de554708d61a86bb84c3cac1075 Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 16 Sep 2025 13:20:27 -0400 Subject: [PATCH 4/5] make more explicit + add tests --- datafusion/core/src/execution/context/mod.rs | 60 +++++++++++--------- datafusion/sqllogictest/test_files/ddl.slt | 38 ++++++++++++- 2 files changed, 71 insertions(+), 27 deletions(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 886fdf94f125..6756ef894fd8 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -789,36 +789,44 @@ impl SessionContext { return not_impl_err!("Temporary tables not supported"); } - if exist { - match (cmd.if_not_exists, cmd.or_replace) { - (true, false) => return self.return_empty_dataframe(), - (false, true) => { - let result = self - .find_and_deregister(cmd.name.clone(), TableType::Base) - .await; - match result { - Ok(true) => { - let table_provider: Arc = - self.create_custom_table(cmd).await?; - self.register_table(cmd.name.clone(), table_provider)?; - return self.return_empty_dataframe(); - } - _ => return exec_err!("View '{}' doesn't exist.", cmd.name), + match (cmd.if_not_exists, cmd.or_replace, exist) { + (true, false, true) => return self.return_empty_dataframe(), + (false, true, true) => { + let result = self + .find_and_deregister(cmd.name.clone(), TableType::Base) + .await; + + match result { + Ok(true) => { + let table_provider: Arc = + self.create_custom_table(cmd).await?; + self.register_table(cmd.name.clone(), table_provider)?; + self.return_empty_dataframe() + } + Ok(false) => { + let table_provider: Arc = + self.create_custom_table(cmd).await?; + self.register_table(cmd.name.clone(), table_provider)?; + self.return_empty_dataframe() + } + Err(e) => { + exec_err!("Errored while deregistering external table: {}", e) } } - (true, true) => { - return exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") - } - (false, false) => { - return exec_err!("Table '{}' already exists", cmd.name); - } + } + (true, true, true) => { + exec_err!("'IF NOT EXISTS' cannot coexist with 'REPLACE'") + } + (_, _, false) => { + let table_provider: Arc = + self.create_custom_table(cmd).await?; + self.register_table(cmd.name.clone(), table_provider)?; + self.return_empty_dataframe() + } + (false, false, true) => { + exec_err!("External table '{}' already exists", cmd.name) } } - - let table_provider: Arc = - self.create_custom_table(cmd).await?; - self.register_table(cmd.name.clone(), table_provider)?; - self.return_empty_dataframe() } async fn create_memory_table(&self, cmd: CreateMemoryTable) -> Result { diff --git a/datafusion/sqllogictest/test_files/ddl.slt b/datafusion/sqllogictest/test_files/ddl.slt index 1a8fd27119d3..f755ab3f356c 100644 --- a/datafusion/sqllogictest/test_files/ddl.slt +++ b/datafusion/sqllogictest/test_files/ddl.slt @@ -587,7 +587,7 @@ statement ok CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV LOCATION '../core/tests/data/aggregate_simple.csv' OPTIONS ('format.has_header' 'true'); # Should not recreate the same EXTERNAL table -statement error Execution error: Table 'aggregate_simple' already exists +statement error Execution error: External table 'aggregate_simple' already exists CREATE EXTERNAL TABLE aggregate_simple STORED AS CSV LOCATION '../core/tests/data/aggregate_simple.csv' OPTIONS ('format.has_header' 'true'); statement ok @@ -620,6 +620,42 @@ STORED AS CSV LOCATION '../core/tests/data/aggregate_simple.csv' OPTIONS ('format.has_header' 'true'); +# Create replacement table for table that doesn't already exist +statement ok +DROP TABLE IF EXISTS aggregate_table; + +statement ok +CREATE OR REPLACE EXTERNAL TABLE aggregate_table +STORED AS CSV +LOCATION '../core/tests/data/aggregate_simple.csv' +OPTIONS ('format.has_header' 'true'); + +query TTT +DESCRIBE aggregate_table; +---- +c1 Float64 YES +c2 Float64 YES +c3 Boolean YES + +# Create replacement table with different format for table that doesn't already exist +query I +COPY (SELECT * FROM (VALUES (1),(2),(3)) AS t(id)) +TO 'test_files/scratch/ddl/test_table' +STORED AS PARQUET; +---- +3 + +statement ok +CREATE OR REPLACE EXTERNAL TABLE aggregate_table +STORED AS PARQUET +LOCATION 'test_files/scratch/ddl/test_table'; + + +query TTT +DESCRIBE aggregate_table; +---- +id Int64 YES + # Should insert into an empty table statement ok insert into table_without_values values (1, 2), (2, 3), (2, 4); From 62fa332a62706a066529ebe7dd70306a2988952b Mon Sep 17 00:00:00 2001 From: Jonathan Date: Tue, 16 Sep 2025 16:52:31 -0400 Subject: [PATCH 5/5] clipy fix --- datafusion/core/src/execution/context/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/datafusion/core/src/execution/context/mod.rs b/datafusion/core/src/execution/context/mod.rs index 6756ef894fd8..d95155a5626b 100644 --- a/datafusion/core/src/execution/context/mod.rs +++ b/datafusion/core/src/execution/context/mod.rs @@ -790,7 +790,7 @@ impl SessionContext { } match (cmd.if_not_exists, cmd.or_replace, exist) { - (true, false, true) => return self.return_empty_dataframe(), + (true, false, true) => self.return_empty_dataframe(), (false, true, true) => { let result = self .find_and_deregister(cmd.name.clone(), TableType::Base)