Skip to content

Commit c73c0bb

Browse files
committed
added truncate table support
1 parent 70daf88 commit c73c0bb

10 files changed

Lines changed: 304 additions & 1 deletion

File tree

datafusion/catalog/src/table.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -353,6 +353,14 @@ pub trait TableProvider: Debug + Sync + Send {
353353
) -> Result<Arc<dyn ExecutionPlan>> {
354354
not_impl_err!("UPDATE not supported for {} table", self.table_type())
355355
}
356+
357+
/// Truncate rows
358+
async fn truncate(
359+
&self,
360+
_state: &dyn Session,
361+
) -> Result<Arc<dyn ExecutionPlan>> {
362+
not_impl_err!("TRUNCATE not supported for {}", self.table_type())
363+
}
356364
}
357365

358366
/// Arguments for scanning a table with [`TableProvider::scan_with_args`].

datafusion/core/src/physical_planner.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -655,6 +655,31 @@ impl DefaultPhysicalPlanner {
655655
);
656656
}
657657
}
658+
LogicalPlan::Dml(DmlStatement {
659+
table_name,
660+
target,
661+
op: WriteOp::Truncate,
662+
..
663+
}) => {
664+
if let Some(provider) =
665+
target.as_any().downcast_ref::<DefaultTableSource>()
666+
{
667+
provider
668+
.table_provider
669+
.truncate(session_state)
670+
.await
671+
.map_err(|e| {
672+
e.context(format!(
673+
"TRUNCATE operation on table '{}'",
674+
table_name
675+
))
676+
})?
677+
} else {
678+
return exec_err!(
679+
"Table source can't be downcasted to DefaultTableSource"
680+
);
681+
}
682+
}
658683
LogicalPlan::Window(Window { window_expr, .. }) => {
659684
assert_or_internal_err!(
660685
!window_expr.is_empty(),

datafusion/core/tests/custom_sources_cases/dml_planning.rs

Lines changed: 99 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18-
//! Tests for DELETE and UPDATE planning to verify filter and assignment extraction.
18+
//! Tests for DELETE, UPDATE, and TRUNCATE planning to verify filter and assignment extraction.
1919
2020
use std::any::Any;
2121
use std::sync::{Arc, Mutex};
@@ -165,6 +165,69 @@ impl TableProvider for CaptureUpdateProvider {
165165
}
166166
}
167167

168+
/// A TableProvider that captures whether truncate() was called.
169+
struct CaptureTruncateProvider {
170+
schema: SchemaRef,
171+
truncate_called: Arc<Mutex<bool>>,
172+
}
173+
174+
impl CaptureTruncateProvider {
175+
fn new(schema: SchemaRef) -> Self {
176+
Self {
177+
schema,
178+
truncate_called: Arc::new(Mutex::new(false)),
179+
}
180+
}
181+
182+
fn was_truncated(&self) -> bool {
183+
*self.truncate_called.lock().unwrap()
184+
}
185+
}
186+
187+
impl std::fmt::Debug for CaptureTruncateProvider {
188+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189+
f.debug_struct("CaptureTruncateProvider")
190+
.field("schema", &self.schema)
191+
.finish()
192+
}
193+
}
194+
195+
#[async_trait]
196+
impl TableProvider for CaptureTruncateProvider {
197+
fn as_any(&self) -> &dyn Any {
198+
self
199+
}
200+
201+
fn schema(&self) -> SchemaRef {
202+
Arc::clone(&self.schema)
203+
}
204+
205+
fn table_type(&self) -> TableType {
206+
TableType::Base
207+
}
208+
209+
async fn scan(
210+
&self,
211+
_state: &dyn Session,
212+
_projection: Option<&Vec<usize>>,
213+
_filters: &[Expr],
214+
_limit: Option<usize>,
215+
) -> Result<Arc<dyn ExecutionPlan>> {
216+
Ok(Arc::new(EmptyExec::new(Arc::clone(&self.schema))))
217+
}
218+
219+
async fn truncate(
220+
&self,
221+
_state: &dyn Session,
222+
) -> Result<Arc<dyn ExecutionPlan>> {
223+
*self.truncate_called.lock().unwrap() = true;
224+
225+
Ok(Arc::new(EmptyExec::new(Arc::new(Schema::new(vec![
226+
Field::new("count", DataType::UInt64, false),
227+
])))))
228+
}
229+
}
230+
168231
fn test_schema() -> SchemaRef {
169232
Arc::new(Schema::new(vec![
170233
Field::new("id", DataType::Int32, false),
@@ -269,6 +332,26 @@ async fn test_update_assignments() -> Result<()> {
269332
Ok(())
270333
}
271334

335+
#[tokio::test]
336+
async fn test_truncate_calls_provider() -> Result<()> {
337+
let provider = Arc::new(CaptureTruncateProvider::new(test_schema()));
338+
let ctx = SessionContext::new();
339+
340+
ctx.register_table("t", Arc::clone(&provider) as Arc<dyn TableProvider>)?;
341+
342+
ctx.sql("TRUNCATE TABLE t")
343+
.await?
344+
.collect()
345+
.await?;
346+
347+
assert!(
348+
provider.was_truncated(),
349+
"truncate() should be called on the TableProvider"
350+
);
351+
352+
Ok(())
353+
}
354+
272355
#[tokio::test]
273356
async fn test_unsupported_table_delete() -> Result<()> {
274357
let schema = test_schema();
@@ -295,3 +378,18 @@ async fn test_unsupported_table_update() -> Result<()> {
295378
assert!(result.is_err() || result.unwrap().collect().await.is_err());
296379
Ok(())
297380
}
381+
382+
#[tokio::test]
383+
async fn test_unsupported_table_truncate() -> Result<()> {
384+
let schema = test_schema();
385+
let ctx = SessionContext::new();
386+
387+
let empty_table = datafusion::datasource::empty::EmptyTable::new(schema);
388+
ctx.register_table("empty_t", Arc::new(empty_table))?;
389+
390+
let result = ctx.sql("TRUNCATE TABLE empty_t").await;
391+
392+
assert!(result.is_err() || result.unwrap().collect().await.is_err());
393+
394+
Ok(())
395+
}

datafusion/expr/src/logical_plan/dml.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ pub enum WriteOp {
237237
Update,
238238
/// `CREATE TABLE AS SELECT` operation
239239
Ctas,
240+
/// `TRUNCATE` operation
241+
Truncate,
240242
}
241243

242244
impl WriteOp {
@@ -247,6 +249,7 @@ impl WriteOp {
247249
WriteOp::Delete => "Delete",
248250
WriteOp::Update => "Update",
249251
WriteOp::Ctas => "Ctas",
252+
WriteOp::Truncate => "Truncate",
250253
}
251254
}
252255
}

datafusion/proto/src/generated/prost.rs

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

datafusion/proto/src/logical_plan/from_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -239,6 +239,7 @@ impl From<protobuf::dml_node::Type> for WriteOp {
239239
}
240240
protobuf::dml_node::Type::InsertReplace => WriteOp::Insert(InsertOp::Replace),
241241
protobuf::dml_node::Type::Ctas => WriteOp::Ctas,
242+
protobuf::dml_node::Type::Truncate => WriteOp::Truncate,
242243
}
243244
}
244245
}

datafusion/proto/src/logical_plan/to_proto.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -728,6 +728,7 @@ impl From<&WriteOp> for protobuf::dml_node::Type {
728728
WriteOp::Delete => protobuf::dml_node::Type::Delete,
729729
WriteOp::Update => protobuf::dml_node::Type::Update,
730730
WriteOp::Ctas => protobuf::dml_node::Type::Ctas,
731+
WriteOp::Truncate => protobuf::dml_node::Type::Truncate,
731732
}
732733
}
733734
}

datafusion/sql/src/statement.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1362,6 +1362,28 @@ impl<S: ContextProvider> SqlToRel<'_, S> {
13621362
exec_err!("Function name not provided")
13631363
}
13641364
}
1365+
Statement::Truncate { table_names, .. } => {
1366+
if table_names.len() != 1 {
1367+
return not_impl_err!("TRUNCATE with multiple tables is not supported");
1368+
}
1369+
1370+
let target = &table_names[0]; // TruncateTableTarget
1371+
let table = self.object_name_to_table_reference(target.name.clone())?;
1372+
let source = self.context_provider.get_table_source(table.clone())?;
1373+
1374+
Ok(LogicalPlan::Dml(DmlStatement {
1375+
table_name: table.clone(),
1376+
target: source,
1377+
op: WriteOp::Truncate,
1378+
input: Arc::new(LogicalPlan::EmptyRelation(
1379+
EmptyRelation {
1380+
produce_one_row: false,
1381+
schema: DFSchemaRef::new(DFSchema::empty()),
1382+
},
1383+
)),
1384+
output_schema: DFSchemaRef::new(DFSchema::empty()),
1385+
}))
1386+
}
13651387
Statement::CreateIndex(CreateIndex {
13661388
name,
13671389
table_name,
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
19+
##########
20+
## Truncate tests for MemTable
21+
##########
22+
23+
# Test that TRUNCATE preserves table structure but removes data
24+
statement ok
25+
truncate table t1;
26+
27+
query I
28+
select count(*) from t1;
29+
----
30+
0
31+
32+
query TT
33+
describe t1;
34+
----
35+
a:Int32:None
36+
b:Utf8:None
37+
c:Float64:None
38+
d:Int32:None
39+
40+
# Test TRUNCATE then INSERT works correctly
41+
statement ok
42+
truncate table t1;
43+
44+
statement ok
45+
insert into t1 values (10, 'new', 1.23, 100);
46+
47+
query I
48+
select count(*) from t1;
49+
----
50+
1
51+
52+
query I
53+
select a from t1;
54+
----
55+
10
56+
57+
# Test multiple TRUNCATE operations
58+
statement ok
59+
insert into t1 values (20, 'another', 4.56, 200);
60+
61+
query I
62+
select count(*) from t1;
63+
----
64+
2
65+
66+
statement ok
67+
truncate table t1;
68+
69+
query I
70+
select count(*) from t1;
71+
----
72+
0
73+
74+
# Truncate already empty table
75+
statement ok
76+
truncate table t1;
77+
78+
query I
79+
select count(*) from t1;
80+
----
81+
0
82+
83+
# Clean up
84+
statement ok
85+
drop table t1;
86+
87+
statement ok
88+
drop table test_schema.t5;
89+
90+
statement ok
91+
drop schema test_schema;
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
# Licensed to the Apache Software Foundation (ASF) under one
2+
# or more contributor license agreements. See the NOTICE file
3+
# distributed with this work for additional information
4+
# regarding copyright ownership. The ASF licenses this file
5+
# to you under the Apache License, Version 2.0 (the
6+
# "License"); you may not use this file except in compliance
7+
# with the License. You may obtain a copy of the License at
8+
9+
# http://www.apache.org/licenses/LICENSE-2.0
10+
11+
# Unless required by applicable law or agreed to in writing,
12+
# software distributed under the License is distributed on an
13+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
# KIND, either express or implied. See the License for the
15+
# specific language governing permissions and limitations
16+
# under the License.
17+
18+
##########
19+
## Truncate Tests
20+
##########
21+
22+
statement ok
23+
create table t1(a int, b varchar, c double, d int);
24+
25+
statement ok
26+
insert into t1 values (1, 'abc', 3.14, 4), (2, 'def', 2.71, 5);
27+
28+
# Truncate all rows from table
29+
query TT
30+
explain truncate table t1;
31+
----
32+
logical_plan
33+
01)Dml: op=[Truncate] table=[t1]
34+
physical_plan
35+
01)CooperativeExec
36+
02)--DmlResultExec: rows_affected=0
37+
38+
# Test TRUNCATE with fully qualified table name
39+
statement ok
40+
create schema test_schema;
41+
42+
statement ok
43+
create table test_schema.t5(a int);
44+
45+
query TT
46+
explain truncate table test_schema.t5;
47+
----
48+
logical_plan
49+
01)Dml: op=[Truncate] table=[test_schema.t5]
50+
physical_plan
51+
01)CooperativeExec
52+
02)--DmlResultExec: rows_affected=0

0 commit comments

Comments
 (0)