From 2d93b4e2775af587cccdf754f3c1f20d7e83b900 Mon Sep 17 00:00:00 2001
From: zhyass
Date: Thu, 23 May 2024 22:41:33 +0800
Subject: [PATCH 1/3] compact and recluster add table lock

---
 src/query/catalog/src/lock.rs                 |  2 +
 src/query/catalog/src/table.rs                |  4 +-
 .../tests/it/inverted_index/index_refresh.rs  |  3 +-
 .../ee/tests/it/inverted_index/pruning.rs     |  3 +-
 .../src/interpreters/hook/compact_hook.rs     | 13 ++--
 .../service/src/interpreters/hook/hook.rs     | 17 ++--
 .../src/interpreters/hook/refresh_hook.rs     | 17 ++--
 .../interpreter_copy_into_table.rs            |  3 +-
 .../src/interpreters/interpreter_delete.rs    |  3 +-
 .../src/interpreters/interpreter_insert.rs    |  5 +-
 .../interpreters/interpreter_merge_into.rs    | 25 +++---
 .../src/interpreters/interpreter_replace.rs   |  3 +-
 .../interpreter_table_index_refresh.rs        | 16 ++--
 .../interpreter_table_optimize.rs             | 23 +++---
 .../src/interpreters/interpreter_update.rs    |  3 +-
 src/query/service/src/locks/lock_manager.rs   | 77 +++++++++++++++++--
 src/query/service/src/locks/table_lock/mod.rs |  9 +++
 .../mutation/block_compact_mutator.rs         |  1 -
 .../mutation/segments_compact_mutator.rs      |  3 -
 src/query/sql/src/planner/binder/ddl/index.rs |  3 +-
 src/query/sql/src/planner/binder/ddl/table.rs |  3 +-
 src/query/sql/src/planner/plans/ddl/index.rs  |  3 +-
 src/query/sql/src/planner/plans/ddl/table.rs  |  9 ++-
 src/query/storages/fuse/src/fuse_table.rs     |  4 +-
 .../storages/fuse/src/operations/compact.rs   |  3 -
 .../mutator/segment_compact_mutator.rs        |  6 --
 .../storages/fuse/src/operations/util.rs      |  2 +-
 27 files changed, 173 insertions(+), 90 deletions(-)

diff --git a/src/query/catalog/src/lock.rs b/src/query/catalog/src/lock.rs
index 87aa9cd98910a..9b70de893494b 100644
--- a/src/query/catalog/src/lock.rs
+++ b/src/query/catalog/src/lock.rs
@@ -31,4 +31,6 @@ pub trait Lock: Sync + Send {
     fn tenant_name(&self) -> &str;
 
     async fn try_lock(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>>;
+
+    async fn try_lock_no_retry(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>>;
 }
diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs
index b05fc92f7f19a..f685be052d657 100644
--- a/src/query/catalog/src/table.rs
+++ b/src/query/catalog/src/table.rs
@@ -41,7 +41,6 @@ use databend_storages_common_table_meta::meta::SnapshotId;
 use databend_storages_common_table_meta::meta::TableSnapshot;
 use databend_storages_common_table_meta::table::ChangeType;
 
-use crate::lock::Lock;
 use crate::plan::DataSourceInfo;
 use crate::plan::DataSourcePlan;
 use crate::plan::PartStatistics;
@@ -339,10 +338,9 @@ pub trait Table: Sync + Send {
     async fn compact_segments(
         &self,
         ctx: Arc<dyn TableContext>,
-        lock: Arc<dyn Lock>,
         limit: Option<usize>,
     ) -> Result<()> {
-        let (_, _, _) = (ctx, lock, limit);
+        let (_, _) = (ctx, limit);
 
         Err(ErrorCode::Unimplemented(format!(
             "The operation 'compact_segments' is not supported for the table '{}', which is using the '{}' engine.",
diff --git a/src/query/ee/tests/it/inverted_index/index_refresh.rs b/src/query/ee/tests/it/inverted_index/index_refresh.rs
index 02255c8ae4e16..4e95a5e27aea2 100644
--- a/src/query/ee/tests/it/inverted_index/index_refresh.rs
+++ b/src/query/ee/tests/it/inverted_index/index_refresh.rs
@@ -20,6 +20,7 @@ use databend_common_catalog::table::TableExt;
 use databend_common_exception::Result;
 use databend_common_meta_app::schema::CreateOption;
 use databend_common_meta_app::schema::CreateTableIndexReq;
+use databend_common_sql::plans::LockTableOption;
 use databend_common_sql::plans::RefreshTableIndexPlan;
 use databend_common_storages_fuse::io::read::InvertedIndexReader;
 use databend_common_storages_fuse::io::MetaReaders;
@@ -86,7
+87,7 @@ async fn test_fuse_do_refresh_inverted_index() -> Result<()> { table: fixture.default_table_name(), index_name: index_name.clone(), segment_locs: None, - need_lock: true, + lock_opt: LockTableOption::LockWithRetry, }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?; let _ = interpreter.execute(ctx.clone()).await?; diff --git a/src/query/ee/tests/it/inverted_index/pruning.rs b/src/query/ee/tests/it/inverted_index/pruning.rs index 0c2e5c4beab6f..f18ebfe2f7209 100644 --- a/src/query/ee/tests/it/inverted_index/pruning.rs +++ b/src/query/ee/tests/it/inverted_index/pruning.rs @@ -36,6 +36,7 @@ use databend_common_expression::TableSchemaRefExt; use databend_common_meta_app::schema::CreateOption; use databend_common_meta_app::schema::CreateTableIndexReq; use databend_common_sql::plans::CreateTablePlan; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::RefreshTableIndexPlan; use databend_common_sql::BloomIndexColumns; use databend_common_storages_fuse::pruning::create_segment_location_vector; @@ -525,7 +526,7 @@ async fn test_block_pruner() -> Result<()> { table: test_tbl_name.to_string(), index_name: index_name.clone(), segment_locs: None, - need_lock: true, + lock_opt: LockTableOption::LockWithRetry, }; let interpreter = RefreshTableIndexInterpreter::try_create(ctx.clone(), refresh_index_plan)?; let _ = interpreter.execute(ctx.clone()).await?; diff --git a/src/query/service/src/interpreters/hook/compact_hook.rs b/src/query/service/src/interpreters/hook/compact_hook.rs index 1389708b23197..68e871770d6e1 100644 --- a/src/query/service/src/interpreters/hook/compact_hook.rs +++ b/src/query/service/src/interpreters/hook/compact_hook.rs @@ -21,6 +21,7 @@ use databend_common_catalog::table_context::TableContext; use databend_common_exception::Result; use databend_common_pipeline_core::Pipeline; use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::OptimizeTableAction; use databend_common_sql::plans::OptimizeTablePlan; use log::info; @@ -52,10 +53,10 @@ pub async fn hook_compact( pipeline: &mut Pipeline, compact_target: CompactTargetTableDescription, trace_ctx: CompactHookTraceCtx, - need_lock: bool, + lock_opt: LockTableOption, ) { let op_name = trace_ctx.operation_name.clone(); - if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, need_lock).await { + if let Err(e) = do_hook_compact(ctx, pipeline, compact_target, trace_ctx, lock_opt).await { info!("compact hook ({}) with error (ignored): {}", op_name, e); } } @@ -66,7 +67,7 @@ async fn do_hook_compact( pipeline: &mut Pipeline, compact_target: CompactTargetTableDescription, trace_ctx: CompactHookTraceCtx, - need_lock: bool, + lock_opt: LockTableOption, ) -> Result<()> { if pipeline.is_empty() { return Ok(()); @@ -103,7 +104,7 @@ async fn do_hook_compact( if err.is_ok() { info!("execute {op_name} finished successfully. running table optimization job."); match GlobalIORuntime::instance().block_on({ - compact_table(ctx, compact_target, compaction_limits, need_lock) + compact_table(ctx, compact_target, compaction_limits, lock_opt) }) { Ok(_) => { info!("execute {op_name} finished successfully. 
table optimization job finished."); @@ -126,7 +127,7 @@ async fn compact_table( ctx: Arc, compact_target: CompactTargetTableDescription, compaction_limits: CompactionLimits, - need_lock: bool, + lock_opt: LockTableOption, ) -> Result<()> { // evict the table from cache ctx.evict_table_from_cache( @@ -143,7 +144,7 @@ async fn compact_table( table: compact_target.table, action: OptimizeTableAction::CompactBlocks(compaction_limits.block_limit), limit: compaction_limits.segment_limit, - need_lock, + lock_opt, })?; let mut build_res = optimize_interpreter.execute2().await?; diff --git a/src/query/service/src/interpreters/hook/hook.rs b/src/query/service/src/interpreters/hook/hook.rs index 83889ceeab08a..2ace53412a391 100644 --- a/src/query/service/src/interpreters/hook/hook.rs +++ b/src/query/service/src/interpreters/hook/hook.rs @@ -18,6 +18,7 @@ use std::time::Instant; use databend_common_catalog::table_context::TableContext; use databend_common_pipeline_core::Pipeline; use databend_common_sql::executor::physical_plans::MutationKind; +use databend_common_sql::plans::LockTableOption; use log::info; use log::warn; @@ -35,7 +36,7 @@ pub struct HookOperator { database: String, table: String, mutation_kind: MutationKind, - need_lock: bool, + lock_opt: LockTableOption, } impl HookOperator { @@ -45,7 +46,7 @@ impl HookOperator { database: String, table: String, mutation_kind: MutationKind, - need_lock: bool, + lock_opt: LockTableOption, ) -> Self { Self { ctx, @@ -53,7 +54,7 @@ impl HookOperator { database, table, mutation_kind, - need_lock, + lock_opt, } } @@ -105,7 +106,7 @@ impl HookOperator { pipeline, compact_target, trace_ctx, - self.need_lock, + self.lock_opt.clone(), ) .await; } @@ -122,6 +123,12 @@ impl HookOperator { table: self.table.to_owned(), }; - hook_refresh(self.ctx.clone(), pipeline, refresh_desc, self.need_lock).await; + hook_refresh( + self.ctx.clone(), + pipeline, + refresh_desc, + self.lock_opt.clone(), + ) + .await; } } diff --git a/src/query/service/src/interpreters/hook/refresh_hook.rs b/src/query/service/src/interpreters/hook/refresh_hook.rs index 5277db6599e02..fea1876cbd0f9 100644 --- a/src/query/service/src/interpreters/hook/refresh_hook.rs +++ b/src/query/service/src/interpreters/hook/refresh_hook.rs @@ -25,6 +25,7 @@ use databend_common_meta_app::schema::ListIndexesByIdReq; use databend_common_meta_app::schema::ListVirtualColumnsReq; use databend_common_meta_types::MetaId; use databend_common_pipeline_core::Pipeline; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::Plan; use databend_common_sql::plans::RefreshIndexPlan; use databend_common_sql::plans::RefreshTableIndexPlan; @@ -57,7 +58,7 @@ pub async fn hook_refresh( ctx: Arc, pipeline: &mut Pipeline, desc: RefreshDesc, - need_lock: bool, + lock_opt: LockTableOption, ) { if pipeline.is_empty() { return; @@ -66,7 +67,7 @@ pub async fn hook_refresh( pipeline.set_on_finished(move |(_profiles, err)| { if err.is_ok() { info!("execute pipeline finished successfully, starting run refresh job."); - match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, need_lock)) { + match GlobalIORuntime::instance().block_on(do_refresh(ctx, desc, lock_opt)) { Ok(_) => { info!("execute refresh job successfully."); } @@ -79,7 +80,11 @@ pub async fn hook_refresh( }); } -async fn do_refresh(ctx: Arc, desc: RefreshDesc, need_lock: bool) -> Result<()> { +async fn do_refresh( + ctx: Arc, + desc: RefreshDesc, + lock_opt: LockTableOption, +) -> Result<()> { let table = ctx .get_table(&desc.catalog, 
&desc.database, &desc.table) .await?; @@ -99,7 +104,7 @@ async fn do_refresh(ctx: Arc, desc: RefreshDesc, need_lock: bool) // Generate sync inverted indexes. let inverted_index_plans = - generate_refresh_inverted_index_plan(ctx.clone(), &desc, table.clone(), need_lock).await?; + generate_refresh_inverted_index_plan(ctx.clone(), &desc, table.clone(), lock_opt).await?; plans.extend_from_slice(&inverted_index_plans); // Generate virtual columns. @@ -263,7 +268,7 @@ async fn generate_refresh_inverted_index_plan( ctx: Arc, desc: &RefreshDesc, table: Arc, - need_lock: bool, + lock_opt: LockTableOption, ) -> Result> { let segment_locs = ctx.get_segment_locations()?; let mut plans = vec![]; @@ -279,7 +284,7 @@ async fn generate_refresh_inverted_index_plan( table: desc.table.clone(), index_name: index.name.clone(), segment_locs: Some(segment_locs.clone()), - need_lock, + lock_opt: lock_opt.clone(), }; plans.push(Plan::RefreshTableIndex(Box::new(plan))); } diff --git a/src/query/service/src/interpreters/interpreter_copy_into_table.rs b/src/query/service/src/interpreters/interpreter_copy_into_table.rs index 33ce659c91b87..c0d1af8ab69f1 100644 --- a/src/query/service/src/interpreters/interpreter_copy_into_table.rs +++ b/src/query/service/src/interpreters/interpreter_copy_into_table.rs @@ -31,6 +31,7 @@ use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::physical_plans::TableScan; use databend_common_sql::executor::table_read_plan::ToReadDataSourcePlan; use databend_common_sql::executor::PhysicalPlan; +use databend_common_sql::plans::LockTableOption; use databend_common_storage::StageFileInfo; use databend_common_storages_stage::StageTable; use log::debug; @@ -392,7 +393,7 @@ impl Interpreter for CopyIntoTableInterpreter { self.plan.database_name.to_string(), self.plan.table_name.to_string(), MutationKind::Insert, - true, + LockTableOption::LockNoRetry, ); hook_operator.execute(&mut build_res.main_pipeline).await; } diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 98cfccf51a585..32eab48b99425 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -48,6 +48,7 @@ use databend_common_sql::plans::BoundColumnRef; use databend_common_sql::plans::ConstantExpr; use databend_common_sql::plans::EvalScalar; use databend_common_sql::plans::FunctionCall; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::RelOperator; use databend_common_sql::plans::ScalarItem; use databend_common_sql::plans::SubqueryDesc; @@ -301,7 +302,7 @@ impl Interpreter for DeleteInterpreter { tbl_name.to_string(), MutationKind::Delete, // table lock has been added, no need to check. 
- false, + LockTableOption::NoLock, ); hook_operator .execute_refresh(&mut build_res.main_pipeline) diff --git a/src/query/service/src/interpreters/interpreter_insert.rs b/src/query/service/src/interpreters/interpreter_insert.rs index c62cea6ac8c48..a7a3780e15e0d 100644 --- a/src/query/service/src/interpreters/interpreter_insert.rs +++ b/src/query/service/src/interpreters/interpreter_insert.rs @@ -29,6 +29,7 @@ use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::plans::insert::InsertValue; use databend_common_sql::plans::Insert; use databend_common_sql::plans::InsertInputSource; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::Plan; use databend_common_sql::NameResolutionContext; @@ -276,7 +277,7 @@ impl Interpreter for InsertInterpreter { self.plan.database.clone(), self.plan.table.clone(), MutationKind::Insert, - true, + LockTableOption::LockNoRetry, ); hook_operator.execute(&mut build_res.main_pipeline).await; } @@ -311,7 +312,7 @@ impl Interpreter for InsertInterpreter { self.plan.database.clone(), self.plan.table.clone(), MutationKind::Insert, - true, + LockTableOption::LockNoRetry, ); hook_operator.execute(&mut build_res.main_pipeline).await; } diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 5548cdb75a7c4..ee409461a2989 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -33,7 +33,7 @@ use databend_common_expression::SendableDataBlockStream; use databend_common_expression::ROW_ID_COL_NAME; use databend_common_expression::ROW_NUMBER_COL_NAME; use databend_common_functions::BUILTIN_FUNCTIONS; -use databend_common_meta_app::schema::TableInfo; +use databend_common_pipeline_core::LockGuard; use databend_common_sql::binder::MergeIntoType; use databend_common_sql::executor::physical_plans::CommitSink; use databend_common_sql::executor::physical_plans::Exchange; @@ -44,6 +44,7 @@ use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::PhysicalPlan; use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::plans; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::MergeInto as MergePlan; use databend_common_sql::IndexType; use databend_common_sql::ScalarExpr; @@ -59,6 +60,7 @@ use itertools::Itertools; use crate::interpreters::common::dml_build_update_stream_req; use crate::interpreters::HookOperator; use crate::interpreters::Interpreter; +use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -89,16 +91,11 @@ impl Interpreter for MergeIntoInterpreter { #[async_backtrace::framed] async fn execute2(&self) -> Result { - let (physical_plan, _) = self.build_physical_plan().await?; + let (physical_plan, lock_guard) = self.build_physical_plan().await?; let mut build_res = build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; - - // Add table lock before execution. - // todo!(@zhyass) :But for now the lock maybe exist problem, let's open this after fix it. 
- // let table_lock = LockManager::create_table_lock(table_info)?; - // let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; - // build_res.main_pipeline.add_lock_guard(lock_guard); + build_res.main_pipeline.add_lock_guard(lock_guard); // Execute hook. { @@ -108,7 +105,7 @@ impl Interpreter for MergeIntoInterpreter { self.plan.database.clone(), self.plan.table.clone(), MutationKind::MergeInto, - true, + LockTableOption::NoLock, ); hook_operator.execute(&mut build_res.main_pipeline).await; } @@ -123,7 +120,7 @@ impl Interpreter for MergeIntoInterpreter { } impl MergeIntoInterpreter { - pub async fn build_physical_plan(&self) -> Result<(PhysicalPlan, TableInfo)> { + pub async fn build_physical_plan(&self) -> Result<(PhysicalPlan, Option)> { let MergePlan { bind_context, input, @@ -157,6 +154,11 @@ impl MergeIntoInterpreter { )) })?; + let table_info = fuse_table.get_table_info(); + // Add table lock before execution. + let table_lock = LockManager::create_table_lock(table_info.clone())?; + let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + // attentation!! for now we have some strategies: // 1. target_build_optimization, this is enabled in standalone mode and in this case we don't need rowid column anymore. // but we just support for `merge into xx using source on xxx when matched then update xxx when not matched then insert xxx`. @@ -274,7 +276,6 @@ impl MergeIntoInterpreter { )); } - let table_info = fuse_table.get_table_info().clone(); let catalog_ = self.ctx.get_catalog(catalog).await?; // transform unmatched for insert @@ -489,7 +490,7 @@ impl MergeIntoInterpreter { plan_id: u32::MAX, })); physical_plan.adjust_plan_id(&mut 0); - Ok((physical_plan, table_info)) + Ok((physical_plan, lock_guard)) } fn transform_scalar_expr2expr( diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 9d5121e45d8f0..9cb90a64d5fd4 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -35,6 +35,7 @@ use databend_common_sql::executor::physical_plans::ReplaceSelectCtx; use databend_common_sql::executor::PhysicalPlan; use databend_common_sql::plans::insert::InsertValue; use databend_common_sql::plans::InsertInputSource; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::Plan; use databend_common_sql::plans::Replace; use databend_common_sql::BindContext; @@ -112,7 +113,7 @@ impl Interpreter for ReplaceInterpreter { self.plan.database.clone(), self.plan.table.clone(), MutationKind::Replace, - true, + LockTableOption::LockNoRetry, ); hook_operator.execute(&mut pipeline.main_pipeline).await; } diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs index d2bfc6c5e8d12..daa39cb4c5cee 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs @@ -20,6 +20,7 @@ use databend_common_exception::Result; use databend_common_expression::TableSchemaRefExt; use databend_common_license::license::Feature; use databend_common_license::license_manager::get_license_manager; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::RefreshTableIndexPlan; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; @@ -90,12 +91,11 @@ impl Interpreter for 
RefreshTableIndexInterpreter { let index_schema = TableSchemaRefExt::create(index_fields); // Add table lock if need. - let lock_guard = if self.plan.need_lock { - let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; - Some(lock_guard) - } else { - None + let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?; + let lock_guard = match self.plan.lock_opt { + LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?, + LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?, + LockTableOption::NoLock => None, }; // refresh table. @@ -104,9 +104,7 @@ impl Interpreter for RefreshTableIndexInterpreter { table.check_mutable()?; let mut build_res = PipelineBuildResult::create(); - if let Some(lock_guard) = lock_guard { - build_res.main_pipeline.add_lock_guard(lock_guard); - } + build_res.main_pipeline.add_lock_guard(lock_guard); let fuse_table = FuseTable::try_from_table(table.as_ref())?; fuse_table diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 58fca664fe5ba..26fd7944a86a6 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -23,7 +23,6 @@ use databend_common_catalog::table::CompactTarget; use databend_common_catalog::table::CompactionLimits; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_meta_app::schema::CatalogInfo; use databend_common_meta_app::schema::TableInfo; @@ -34,6 +33,7 @@ use databend_common_sql::executor::physical_plans::Exchange; use databend_common_sql::executor::physical_plans::FragmentKind; use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::PhysicalPlan; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::OptimizeTableAction; use databend_common_sql::plans::OptimizeTablePlan; use databend_common_storages_factory::NavigationPoint; @@ -43,7 +43,6 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; -use crate::locks::LockExt; use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; @@ -114,7 +113,6 @@ impl OptimizeTableInterpreter { snapshot: Arc, catalog_info: CatalogInfo, is_distributed: bool, - need_lock: bool, ) -> Result { let merge_meta = parts.partitions_type() == PartInfoType::LazyLevel; let mut root = PhysicalPlan::CompactSource(Box::new(CompactSource { @@ -144,7 +142,7 @@ impl OptimizeTableInterpreter { mutation_kind: MutationKind::Compact, update_stream_meta: vec![], merge_meta, - need_lock, + need_lock: false, deduplicated_label: None, plan_id: u32::MAX, }))) @@ -162,17 +160,16 @@ impl OptimizeTableInterpreter { // check if the table is locked. let table_lock = LockManager::create_table_lock(table_info.clone())?; - if self.plan.need_lock && !table_lock.wait_lock_expired(catalog.clone()).await? 
{ - return Err(ErrorCode::TableAlreadyLocked(format!( - "table '{}' is locked, please retry compaction later", - self.plan.table - ))); - } + let lock_guard = match self.plan.lock_opt { + LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?, + LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?, + LockTableOption::NoLock => None, + }; let compaction_limits = match target { CompactTarget::Segments => { table - .compact_segments(self.ctx.clone(), table_lock, self.plan.limit) + .compact_segments(self.ctx.clone(), self.plan.limit) .await?; return Ok(PipelineBuildResult::create()); } @@ -198,7 +195,6 @@ impl OptimizeTableInterpreter { snapshot, catalog_info, compact_is_distributed, - self.plan.need_lock, )?; let build_res = @@ -247,7 +243,7 @@ impl OptimizeTableInterpreter { mutator.remained_blocks, mutator.removed_segment_indexes, mutator.removed_segment_summary, - self.plan.need_lock, + false, )?; build_res = @@ -291,6 +287,7 @@ impl OptimizeTableInterpreter { } } + build_res.main_pipeline.add_lock_guard(lock_guard); Ok(build_res) } } diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs index 5366272ad3e43..8339a25d35bd5 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -41,6 +41,7 @@ use databend_common_sql::executor::physical_plans::FragmentKind; use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::physical_plans::UpdateSource; use databend_common_sql::executor::PhysicalPlan; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::Visibility; use databend_common_storages_factory::Table; use databend_common_storages_fuse::FuseTable; @@ -122,7 +123,7 @@ impl Interpreter for UpdateInterpreter { tbl_name.to_string(), MutationKind::Update, // table lock has been added, no need to check. - false, + LockTableOption::NoLock, ); hook_operator .execute_refresh(&mut build_res.main_pipeline) diff --git a/src/query/service/src/locks/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs index cf8e912f5e92b..3da25f08ee14b 100644 --- a/src/query/service/src/locks/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -105,10 +105,10 @@ impl LockManager { let tenant_name = lock.tenant_name(); let tenant = Tenant::new_or_err(tenant_name, func_name!())?; - + let table_id = lock.get_table_id(); let lock_key = LockKey::Table { tenant: tenant.clone(), - table_id: lock.get_table_id(), + table_id, }; let req = CreateLockRevReq::new(lock_key.clone(), user, node, query_id, expire_secs); @@ -117,7 +117,7 @@ impl LockManager { let res = catalog.create_lock_revision(req).await?; let revision = res.revision; // metrics. - record_created_lock_nums(lock.lock_type().to_string(), lock.get_table_id(), 1); + record_created_lock_nums(lock.lock_type().to_string(), table_id, 1); let lock_holder = Arc::new(LockHolder::default()); lock_holder @@ -147,18 +147,16 @@ impl LockManager { if position == 0 { // The lock is acquired by current session. - let extend_table_lock_req = ExtendLockRevReq::new(lock_key.clone(), revision, expire_secs, true); catalog.extend_lock_revision(extend_table_lock_req).await?; // metrics. 
-                record_acquired_lock_nums(lock.lock_type().to_string(), lock.get_table_id(), 1);
+                record_acquired_lock_nums(lock.lock_type().to_string(), table_id, 1);
                 break;
             }
 
-            let watch_delete_ident =
-                TableLockIdent::new(&tenant, lock.get_table_id(), reply[position - 1].0);
+            let watch_delete_ident = TableLockIdent::new(&tenant, table_id, reply[position - 1].0);
 
             // Get the previous revision, watch the delete event.
             let req = WatchRequest {
@@ -194,6 +192,71 @@ impl LockManager {
         Ok(Some(guard))
     }
 
+    /// Try to acquire the lock without retrying.
+    #[async_backtrace::framed]
+    pub async fn try_lock_no_retry<T: Lock + ?Sized>(
+        self: &Arc<Self>,
+        ctx: Arc<dyn TableContext>,
+        lock: &T,
+    ) -> Result<Option<LockGuard>> {
+        let table_id = lock.get_table_id();
+        let lock_key = LockKey::Table {
+            tenant: Tenant::new_or_err(lock.tenant_name(), func_name!())?,
+            table_id,
+        };
+
+        let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?;
+        let req = CreateLockRevReq::new(
+            lock_key.clone(),
+            ctx.get_current_user()?.name,
+            ctx.get_cluster().local_id.clone(),
+            ctx.get_current_session_id(),
+            expire_secs,
+        );
+
+        // Get a new table lock revision.
+        let catalog = ctx.get_catalog(lock.get_catalog()).await?;
+        let res = catalog.create_lock_revision(req).await?;
+        let revision = res.revision;
+        // metrics.
+        record_created_lock_nums(lock.lock_type().to_string(), table_id, 1);
+
+        let lock_holder = Arc::new(LockHolder::default());
+        lock_holder
+            .start(ctx.get_id(), catalog.clone(), lock, revision, expire_secs)
+            .await?;
+
+        self.insert_lock(revision, lock_holder);
+        let guard = LockGuard::new(self.clone(), revision);
+
+        // List all revisions and check if the current one is the minimum.
+        let reply = catalog
+            .list_lock_revisions(ListLockRevReq::new(lock_key.clone()))
+            .await?;
+        let position = reply.iter().position(|(x, _)| *x == revision).ok_or_else(||
+            // If the current revision is not in the list, the acquired lock has already expired.
+            ErrorCode::TableLockExpired("the acquired table lock has expired".to_string()),
+        )?;
+
+        if position == 0 {
+            // The lock is acquired by the current session.
+            catalog
+                .extend_lock_revision(ExtendLockRevReq::new(lock_key, revision, expire_secs, true))
+                .await?;
+            // metrics.
+            record_acquired_lock_nums(lock.lock_type().to_string(), table_id, 1);
+        } else {
+            catalog
+                .delete_lock_revision(DeleteLockRevReq::new(lock_key, revision))
+                .await?;
+            return Err(ErrorCode::TableAlreadyLocked(
+                "table is locked by another session, please retry later".to_string(),
+            ));
+        }
+
+        Ok(Some(guard))
+    }
+
     fn insert_lock(&self, revision: u64, lock_holder: Arc<LockHolder>) {
         let mut active_locks = self.active_locks.write();
         let prev = active_locks.insert(revision, lock_holder);
diff --git a/src/query/service/src/locks/table_lock/mod.rs b/src/query/service/src/locks/table_lock/mod.rs
index df775514a3077..9d38234341f61 100644
--- a/src/query/service/src/locks/table_lock/mod.rs
+++ b/src/query/service/src/locks/table_lock/mod.rs
@@ -63,4 +63,13 @@ impl Lock for TableLock {
             Ok(None)
         }
     }
+
+    async fn try_lock_no_retry(&self, ctx: Arc<dyn TableContext>) -> Result<Option<LockGuard>> {
+        let enabled_table_lock = ctx.get_settings().get_enable_table_lock().unwrap_or(false);
+        if enabled_table_lock {
+            self.lock_mgr.try_lock_no_retry(ctx, self).await
+        } else {
+            Ok(None)
+        }
+    }
 }
diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
index ca06c33db0de1..08da7bdc134a9 100644
--- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
+++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs
@@ -122,7 +122,6 @@ async fn do_compact(ctx: Arc, table: Arc) -> Result>,
-    pub need_lock: bool,
+    pub lock_opt: LockTableOption,
 }
diff --git a/src/query/sql/src/planner/plans/ddl/table.rs b/src/query/sql/src/planner/plans/ddl/table.rs
index 98e087666495e..9f0b3cb46171b 100644
--- a/src/query/sql/src/planner/plans/ddl/table.rs
+++ b/src/query/sql/src/planner/plans/ddl/table.rs
@@ -205,7 +205,7 @@ pub struct OptimizeTablePlan {
     pub table: String,
     pub action: OptimizeTableAction,
     pub limit: Option<usize>,
-    pub need_lock: bool,
+    pub lock_opt: LockTableOption,
 }
 
 impl OptimizeTablePlan {
@@ -214,6 +214,13 @@ impl OptimizeTablePlan {
     }
 }
 
+#[derive(Clone, Debug, PartialEq, Eq)]
+pub enum LockTableOption {
+    NoLock,
+    LockWithRetry,
+    LockNoRetry,
+}
+
 #[derive(Clone, Debug, PartialEq, Eq)]
 pub enum OptimizeTableAction {
     All,
diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs
index fddc5d93a0e6e..7bf52aadcccea 100644
--- a/src/query/storages/fuse/src/fuse_table.rs
+++ b/src/query/storages/fuse/src/fuse_table.rs
@@ -19,7 +19,6 @@ use std::str::FromStr;
 use std::sync::Arc;
 
 use databend_common_catalog::catalog::StorageDescription;
-use databend_common_catalog::lock::Lock;
 use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::PartStatistics;
 use databend_common_catalog::plan::Partitions;
@@ -882,10 +881,9 @@ impl Table for FuseTable {
     async fn compact_segments(
         &self,
         ctx: Arc<dyn TableContext>,
-        lock: Arc<dyn Lock>,
         limit: Option<usize>,
     ) -> Result<()> {
-        self.do_compact_segments(ctx, lock, limit).await
+        self.do_compact_segments(ctx, limit).await
     }
 
     #[async_backtrace::framed]
diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs
index 676c8f55f50f8..612e69f12e2dc 100644
--- a/src/query/storages/fuse/src/operations/compact.rs
+++ b/src/query/storages/fuse/src/operations/compact.rs
@@ -16,7 +16,6 @@ use std::collections::HashSet;
 use std::sync::Arc;
 
 use databend_common_base::runtime::Runtime;
-use databend_common_catalog::lock::Lock;
 use
databend_common_catalog::plan::PartInfoType; use databend_common_catalog::plan::Partitions; use databend_common_catalog::plan::PartitionsShuffleKind; @@ -58,7 +57,6 @@ impl FuseTable { pub(crate) async fn do_compact_segments( &self, ctx: Arc, - lock: Arc, num_segment_limit: Option, ) -> Result<()> { let compact_options = if let Some(v) = self @@ -72,7 +70,6 @@ impl FuseTable { let mut segment_mutator = SegmentCompactMutator::try_create( ctx.clone(), - lock, compact_options, self.meta_location_generator().clone(), self.operator.clone(), diff --git a/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs index 827dc48a414b9..08ff8fd353700 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutator/segment_compact_mutator.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use std::time::Instant; -use databend_common_catalog::lock::Lock; use databend_common_catalog::table::Table; use databend_common_exception::Result; use databend_common_metrics::storage::metrics_set_compact_segments_select_duration_second; @@ -46,7 +45,6 @@ pub struct SegmentCompactionState { pub struct SegmentCompactMutator { ctx: Arc, - lock: Arc, compact_params: CompactOptions, data_accessor: Operator, location_generator: TableMetaLocationGenerator, @@ -57,7 +55,6 @@ pub struct SegmentCompactMutator { impl SegmentCompactMutator { pub fn try_create( ctx: Arc, - lock: Arc, compact_params: CompactOptions, location_generator: TableMetaLocationGenerator, operator: Operator, @@ -65,7 +62,6 @@ impl SegmentCompactMutator { ) -> Result { Ok(Self { ctx, - lock, compact_params, data_accessor: operator, location_generator, @@ -137,8 +133,6 @@ impl SegmentCompactMutator { let statistics = self.compact_params.base_snapshot.summary.clone(); let fuse_table = FuseTable::try_from_table(table.as_ref())?; - let _guard = self.lock.try_lock(self.ctx.clone()).await?; - fuse_table .commit_mutation( &self.ctx, diff --git a/src/query/storages/fuse/src/operations/util.rs b/src/query/storages/fuse/src/operations/util.rs index 248d07aeca340..b4305ea08c94f 100644 --- a/src/query/storages/fuse/src/operations/util.rs +++ b/src/query/storages/fuse/src/operations/util.rs @@ -48,7 +48,7 @@ pub fn set_backoff( // The initial retry delay in millisecond. By default, it is 5 ms. let init_delay = init_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_INIT_DELAY_MS); - // The maximum back off delay in millisecond, once the retry interval reaches this value, it stops increasing. + // The maximum back off delay in millisecond, once the retry interval reaches this value, it stops increasing. // By default, it is 20 seconds. 
let max_delay = max_retry_delay.unwrap_or(OCC_DEFAULT_BACKOFF_MAX_DELAY_MS); From aee258484d6db874d81a070336359991470d3951 Mon Sep 17 00:00:00 2001 From: zhyass Date: Fri, 24 May 2024 01:29:39 +0800 Subject: [PATCH 2/3] fix test --- .../src/interpreters/interpreter_delete.rs | 22 ++++--- .../interpreters/interpreter_merge_into.rs | 19 +++--- .../src/interpreters/interpreter_replace.rs | 1 - .../interpreter_table_index_refresh.rs | 29 +++++---- .../interpreter_table_modify_column.rs | 60 ++++++++----------- .../interpreter_table_optimize.rs | 45 +++++++------- .../interpreter_table_recluster.rs | 40 ++++--------- .../interpreter_table_truncate.rs | 19 +++--- .../src/interpreters/interpreter_update.rs | 16 ++--- src/query/service/src/locks/table_lock/mod.rs | 14 +---- .../src/pipelines/builders/builder_commit.rs | 7 --- .../pipelines/builders/builder_recluster.rs | 9 --- src/query/service/src/sessions/query_ctx.rs | 56 +++++++++++++++++ .../physical_plans/physical_commit_sink.rs | 1 - .../physical_plans/physical_recluster_sink.rs | 1 - .../storages/fuse/src/operations/commit.rs | 1 - .../common/processors/sink_commit.rs | 25 +------- .../fuse/src/operations/inverted_index.rs | 7 ++- .../storages/fuse/src/operations/truncate.rs | 1 - tests/cloud_control_server/simple_server.py | 6 +- 20 files changed, 173 insertions(+), 206 deletions(-) diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 32eab48b99425..7e1ae829d5b18 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -68,7 +68,6 @@ use crate::interpreters::common::create_push_down_filters; use crate::interpreters::HookOperator; use crate::interpreters::Interpreter; use crate::interpreters::SelectInterpreter; -use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelinePullingExecutor; use crate::pipelines::PipelineBuildResult; @@ -111,21 +110,21 @@ impl Interpreter for DeleteInterpreter { let is_distributed = !self.ctx.get_cluster().is_empty(); let catalog_name = self.plan.catalog_name.as_str(); - let catalog = self.ctx.get_catalog(catalog_name).await?; - let catalog_info = catalog.info(); - let db_name = self.plan.database_name.as_str(); let tbl_name = self.plan.table_name.as_str(); - let tbl = catalog - .get_table(&self.ctx.get_tenant(), db_name, tbl_name) - .await?; // Add table lock. - let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + let lock_guard = self + .ctx + .clone() + .acquire_table_lock(catalog_name, db_name, tbl_name) + .await?; - // refresh table. 
- let tbl = tbl.refresh(self.ctx.as_ref()).await?; + let catalog = self.ctx.get_catalog(catalog_name).await?; + let catalog_info = catalog.info(); + let tbl = catalog + .get_table(&self.ctx.get_tenant(), db_name, tbl_name) + .await?; // check mutability tbl.check_mutable()?; @@ -355,7 +354,6 @@ impl DeleteInterpreter { mutation_kind: MutationKind::Delete, update_stream_meta: vec![], merge_meta, - need_lock: false, deduplicated_label: None, plan_id: u32::MAX, })); diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index ee409461a2989..f58f2766c5948 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -60,7 +60,6 @@ use itertools::Itertools; use crate::interpreters::common::dml_build_update_stream_req; use crate::interpreters::HookOperator; use crate::interpreters::Interpreter; -use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -145,7 +144,17 @@ impl MergeIntoInterpreter { } = &self.plan; let enable_right_broadcast = *enable_right_broadcast; let mut columns_set = columns_set.clone(); + + // Add table lock before execution. + let lock_guard = self + .ctx + .clone() + .acquire_table_lock(catalog, database, table_name) + .await?; + let table = self.ctx.get_table(catalog, database, table_name).await?; + // check mutability + table.check_mutable()?; let fuse_table = table.as_any().downcast_ref::().ok_or_else(|| { ErrorCode::Unimplemented(format!( "table {}, engine type {}, does not support MERGE INTO", @@ -155,9 +164,6 @@ impl MergeIntoInterpreter { })?; let table_info = fuse_table.get_table_info(); - // Add table lock before execution. - let table_lock = LockManager::create_table_lock(table_info.clone())?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; // attentation!! for now we have some strategies: // 1. target_build_optimization, this is enabled in standalone mode and in this case we don't need rowid column anymore. @@ -204,10 +210,6 @@ impl MergeIntoInterpreter { } } - // check mutability - let check_table = self.ctx.get_table(catalog, database, table_name).await?; - check_table.check_mutable()?; - let update_stream_meta = dml_build_update_stream_req(self.ctx.clone(), meta_data).await?; let table_name = table_name.clone(); @@ -485,7 +487,6 @@ impl MergeIntoInterpreter { mutation_kind: MutationKind::Update, update_stream_meta: update_stream_meta.clone(), merge_meta: false, - need_lock: false, deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? }, plan_id: u32::MAX, })); diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index 9cb90a64d5fd4..a2f040de0845e 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -335,7 +335,6 @@ impl ReplaceInterpreter { mutation_kind: MutationKind::Replace, update_stream_meta: update_stream_meta.clone(), merge_meta: false, - need_lock: false, deduplicated_label: unsafe { self.ctx.get_settings().get_deduplicate_label()? 
}, plan_id: u32::MAX, }))); diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs index daa39cb4c5cee..a45453dc64228 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs @@ -20,13 +20,11 @@ use databend_common_exception::Result; use databend_common_expression::TableSchemaRefExt; use databend_common_license::license::Feature; use databend_common_license::license_manager::get_license_manager; -use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::RefreshTableIndexPlan; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; use crate::interpreters::Interpreter; -use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::sessions::QueryContext; @@ -58,10 +56,24 @@ impl Interpreter for RefreshTableIndexInterpreter { .manager .check_enterprise_enabled(self.ctx.get_license_key(), Feature::InvertedIndex)?; + // Add table lock. + let lock_guard = self + .ctx + .clone() + .acquire_table_lock_with_opt( + &self.plan.catalog, + &self.plan.database, + &self.plan.table, + &self.plan.lock_opt, + ) + .await?; + let table = self .ctx .get_table(&self.plan.catalog, &self.plan.database, &self.plan.table) .await?; + // check mutability + table.check_mutable()?; let index_name = self.plan.index_name.clone(); let segment_locs = self.plan.segment_locs.clone(); @@ -90,19 +102,6 @@ impl Interpreter for RefreshTableIndexInterpreter { let index_version = index.version.clone(); let index_schema = TableSchemaRefExt::create(index_fields); - // Add table lock if need. - let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?; - let lock_guard = match self.plan.lock_opt { - LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?, - LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?, - LockTableOption::NoLock => None, - }; - - // refresh table. - let table = table.refresh(self.ctx.as_ref()).await?; - // check mutability - table.check_mutable()?; - let mut build_res = PipelineBuildResult::create(); build_res.main_pipeline.add_lock_guard(lock_guard); diff --git a/src/query/service/src/interpreters/interpreter_table_modify_column.rs b/src/query/service/src/interpreters/interpreter_table_modify_column.rs index 4372419f0ef8e..d444a5ea1c528 100644 --- a/src/query/service/src/interpreters/interpreter_table_modify_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_modify_column.rs @@ -51,9 +51,8 @@ use databend_enterprise_data_mask_feature::get_datamask_handler; use databend_storages_common_index::BloomIndex; use databend_storages_common_table_meta::table::OPT_KEY_BLOOM_INDEX_COLUMNS; -use super::common::check_referenced_computed_columns; +use crate::interpreters::common::check_referenced_computed_columns; use crate::interpreters::Interpreter; -use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -73,7 +72,7 @@ impl ModifyTableColumnInterpreter { async fn do_set_data_mask_policy( &self, catalog: Arc, - table: &Arc, + table: Arc, column: String, mask_name: String, ) -> Result { @@ -140,15 +139,9 @@ impl ModifyTableColumnInterpreter { // Set data column type. 
async fn do_set_data_type( &self, - table: &Arc, + table: Arc, field_and_comments: &[(TableField, String)], ) -> Result { - // Add table lock. - let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; - // refresh table. - let table = table.refresh(self.ctx.as_ref()).await?; - let schema = table.schema().as_ref().clone(); let table_info = table.get_table_info(); let mut new_schema = schema.clone(); @@ -420,7 +413,6 @@ impl ModifyTableColumnInterpreter { None, )?; - build_res.main_pipeline.add_lock_guard(lock_guard); Ok(build_res) } @@ -428,7 +420,7 @@ impl ModifyTableColumnInterpreter { async fn do_unset_data_mask_policy( &self, catalog: Arc, - table: &Arc, + table: Arc, column: String, ) -> Result { let license_manager = get_license_manager(); @@ -474,7 +466,7 @@ impl ModifyTableColumnInterpreter { async fn do_convert_stored_computed_column( &self, catalog: Arc, - table: &Arc, + table: Arc, table_meta: TableMeta, column: String, ) -> Result { @@ -553,53 +545,50 @@ impl Interpreter for ModifyTableColumnInterpreter { let db_name = self.plan.database.as_str(); let tbl_name = self.plan.table.as_str(); - let tbl = self + // try add lock table. + let lock_guard = self .ctx - .get_catalog(catalog_name) - .await? + .clone() + .acquire_table_lock(catalog_name, db_name, tbl_name) + .await?; + + let catalog = self.ctx.get_catalog(catalog_name).await?; + let table = catalog .get_table(&self.ctx.get_tenant(), db_name, tbl_name) - .await - .ok(); + .await?; - let table = if let Some(table) = &tbl { - // check mutability - table.check_mutable()?; - table - } else { - return Ok(PipelineBuildResult::create()); - }; + table.check_mutable()?; let table_info = table.get_table_info(); let engine = table.engine(); if matches!(engine, VIEW_ENGINE | STREAM_ENGINE) { return Err(ErrorCode::TableEngineNotSupported(format!( "{}.{} engine is {} that doesn't support alter", - &self.plan.database, &self.plan.table, engine + db_name, tbl_name, engine ))); } if table_info.db_type != DatabaseType::NormalDB { return Err(ErrorCode::TableEngineNotSupported(format!( "{}.{} doesn't support alter", - &self.plan.database, &self.plan.table + db_name, tbl_name ))); } - let catalog = self.ctx.get_catalog(catalog_name).await?; let table_meta = table.get_table_info().meta.clone(); // NOTICE: if we support modify column data type, // need to check whether this column is referenced by other computed columns. - match &self.plan.action { + let mut build_res = match &self.plan.action { ModifyColumnAction::SetMaskingPolicy(column, mask_name) => { self.do_set_data_mask_policy(catalog, table, column.to_string(), mask_name.clone()) - .await + .await? } ModifyColumnAction::UnsetMaskingPolicy(column) => { self.do_unset_data_mask_policy(catalog, table, column.to_string()) - .await + .await? } ModifyColumnAction::SetDataType(field_and_comment) => { - self.do_set_data_type(table, field_and_comment).await + self.do_set_data_type(table, field_and_comment).await? } ModifyColumnAction::ConvertStoredComputedColumn(column) => { self.do_convert_stored_computed_column( @@ -608,8 +597,11 @@ impl Interpreter for ModifyTableColumnInterpreter { table_meta, column.to_string(), ) - .await + .await? 
} - } + }; + + build_res.main_pipeline.add_lock_guard(lock_guard); + Ok(build_res) } } diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 26fd7944a86a6..8b71de7dc1cd5 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -21,7 +21,6 @@ use databend_common_catalog::plan::PartInfoType; use databend_common_catalog::plan::Partitions; use databend_common_catalog::table::CompactTarget; use databend_common_catalog::table::CompactionLimits; -use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_exception::Result; use databend_common_meta_app::schema::CatalogInfo; @@ -33,7 +32,6 @@ use databend_common_sql::executor::physical_plans::Exchange; use databend_common_sql::executor::physical_plans::FragmentKind; use databend_common_sql::executor::physical_plans::MutationKind; use databend_common_sql::executor::PhysicalPlan; -use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::OptimizeTableAction; use databend_common_sql::plans::OptimizeTablePlan; use databend_common_storages_factory::NavigationPoint; @@ -43,7 +41,6 @@ use databend_storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::interpreter_table_recluster::build_recluster_physical_plan; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; -use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; @@ -78,20 +75,14 @@ impl Interpreter for OptimizeTableInterpreter { let plan = self.plan.clone(); let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; - let tenant = self.ctx.get_tenant(); - let table = catalog - .get_table(&tenant, &self.plan.database, &self.plan.table) - .await?; - // check mutability - table.check_mutable()?; match self.plan.action.clone() { OptimizeTableAction::CompactBlocks(limit) => { - self.build_pipeline(catalog, table, CompactTarget::Blocks(limit), false) + self.build_pipeline(catalog, CompactTarget::Blocks(limit), false) .await } OptimizeTableAction::CompactSegments => { - self.build_pipeline(catalog, table, CompactTarget::Segments, false) + self.build_pipeline(catalog, CompactTarget::Segments, false) .await } OptimizeTableAction::Purge(point) => { @@ -99,7 +90,7 @@ impl Interpreter for OptimizeTableInterpreter { Ok(PipelineBuildResult::create()) } OptimizeTableAction::All => { - self.build_pipeline(catalog, table, CompactTarget::Blocks(None), true) + self.build_pipeline(catalog, CompactTarget::Blocks(None), true) .await } } @@ -142,7 +133,6 @@ impl OptimizeTableInterpreter { mutation_kind: MutationKind::Compact, update_stream_meta: vec![], merge_meta, - need_lock: false, deduplicated_label: None, plan_id: u32::MAX, }))) @@ -151,20 +141,26 @@ impl OptimizeTableInterpreter { async fn build_pipeline( &self, catalog: Arc, - mut table: Arc, target: CompactTarget, need_purge: bool, ) -> Result { let tenant = self.ctx.get_tenant(); - let table_info = table.get_table_info().clone(); + let lock_guard = self + .ctx + .clone() + .acquire_table_lock_with_opt( + &self.plan.catalog, + &self.plan.database, + &self.plan.table, + &self.plan.lock_opt, + ) + .await?; - // check if the table is locked. 
- let table_lock = LockManager::create_table_lock(table_info.clone())?; - let lock_guard = match self.plan.lock_opt { - LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self.ctx.clone()).await?, - LockTableOption::LockWithRetry => table_lock.try_lock(self.ctx.clone()).await?, - LockTableOption::NoLock => None, - }; + let mut table = catalog + .get_table(&tenant, &self.plan.database, &self.plan.table) + .await?; + // check mutability + table.check_mutable()?; let compaction_limits = match target { CompactTarget::Segments => { @@ -191,7 +187,7 @@ impl OptimizeTableInterpreter { let mut compact_pipeline = if let Some((parts, snapshot)) = res { let physical_plan = Self::build_physical_plan( parts, - table_info, + table.get_table_info().clone(), snapshot, catalog_info, compact_is_distributed, @@ -243,7 +239,6 @@ impl OptimizeTableInterpreter { mutator.remained_blocks, mutator.removed_segment_indexes, mutator.removed_segment_summary, - false, )?; build_res = @@ -303,6 +298,8 @@ async fn purge( let table = catalog .get_table(&ctx.get_tenant(), &plan.database, &plan.table) .await?; + // check mutability + table.check_mutable()?; let keep_latest = true; let res = table diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 316081ecc79cc..51fc511ab062b 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -19,7 +19,6 @@ use std::time::SystemTime; use databend_common_catalog::plan::Filters; use databend_common_catalog::plan::PushDownInfo; use databend_common_catalog::table::TableExt; -use databend_common_exception::ErrorCode; use databend_common_exception::Result; use databend_common_expression::type_check::check_function; use databend_common_functions::BUILTIN_FUNCTIONS; @@ -40,8 +39,6 @@ use log::warn; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; -use crate::locks::LockExt; -use crate::locks::LockManager; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; @@ -107,12 +104,6 @@ impl Interpreter for ReclusterTableInterpreter { let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let tenant = self.ctx.get_tenant(); - let mut table = catalog - .get_table(&tenant, &self.plan.database, &self.plan.table) - .await?; - - // check mutability - table.check_mutable()?; let mut times = 0; let mut block_count = 0; @@ -127,16 +118,18 @@ impl Interpreter for ReclusterTableInterpreter { return Err(err); } - let table_info = table.get_table_info().clone(); + // try add lock table. + let lock_guard = self + .ctx + .clone() + .acquire_table_lock(&self.plan.catalog, &self.plan.database, &self.plan.table) + .await?; - // check if the table is locked. - let table_lock = LockManager::create_table_lock(table_info.clone())?; - if !table_lock.wait_lock_expired(catalog.clone()).await? 
{ - return Err(ErrorCode::TableAlreadyLocked(format!( - "table '{}' is locked, please retry recluster later", - self.plan.table - ))); - } + let table = catalog + .get_table(&tenant, &self.plan.database, &self.plan.table) + .await?; + // check mutability + table.check_mutable()?; let fuse_table = FuseTable::try_from_table(table.as_ref())?; let mutator = fuse_table @@ -153,17 +146,17 @@ impl Interpreter for ReclusterTableInterpreter { block_count += mutator.recluster_blocks_count; let physical_plan = build_recluster_physical_plan( mutator.tasks, - table_info, + table.get_table_info().clone(), catalog.info(), mutator.snapshot, mutator.remained_blocks, mutator.removed_segment_indexes, mutator.removed_segment_summary, - true, )?; let mut build_res = build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan).await?; + build_res.main_pipeline.add_lock_guard(lock_guard); assert!(build_res.main_pipeline.is_complete_pipeline()?); build_res.set_max_threads(max_threads); @@ -201,11 +194,6 @@ impl Interpreter for ReclusterTableInterpreter { ); break; } - - // refresh table. - table = catalog - .get_table(&tenant, &self.plan.database, &self.plan.table) - .await?; } if block_count != 0 { @@ -231,7 +219,6 @@ pub fn build_recluster_physical_plan( remained_blocks: Vec>, removed_segment_indexes: Vec, removed_segment_summary: Statistics, - need_lock: bool, ) -> Result { let is_distributed = tasks.len() > 1; let mut root = PhysicalPlan::ReclusterSource(Box::new(ReclusterSource { @@ -260,7 +247,6 @@ pub fn build_recluster_physical_plan( removed_segment_indexes, removed_segment_summary, plan_id: u32::MAX, - need_lock, })); plan.adjust_plan_id(&mut 0); Ok(plan) diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 6c2ac941be574..56d41b271cc84 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -18,10 +18,8 @@ use databend_common_catalog::table::TableExt; use databend_common_config::GlobalConfig; use databend_common_exception::Result; use databend_common_sql::plans::TruncateTablePlan; -use databend_common_storages_fuse::FuseTable; use crate::interpreters::Interpreter; -use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::servers::flight::v1::packets::Packet; use crate::servers::flight::v1::packets::TruncateTablePacket; @@ -72,23 +70,20 @@ impl Interpreter for TruncateTableInterpreter { #[async_backtrace::framed] #[minitrace::trace] async fn execute2(&self) -> Result { + // try add lock table. + let lock_guard = self + .ctx + .clone() + .acquire_table_lock(&self.catalog_name, &self.database_name, &self.table_name) + .await?; + let table = self .ctx .get_table(&self.catalog_name, &self.database_name, &self.table_name) .await?; - // check mutability table.check_mutable()?; - // Add table lock. - let maybe_fuse_table = FuseTable::try_from_table(table.as_ref()).is_ok(); - let lock_guard = if maybe_fuse_table { - let table_lock = LockManager::create_table_lock(table.get_table_info().clone())?; - table_lock.try_lock(self.ctx.clone()).await? 
- } else { - None - }; - if self.proxy_to_cluster && table.broadcast_truncate_to_cluster() { let settings = self.ctx.get_settings(); let timeout = settings.get_flight_client_timeout()?; diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs index 8339a25d35bd5..7e17b4d38c722 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -54,7 +54,6 @@ use crate::interpreters::interpreter_delete::replace_subquery; use crate::interpreters::interpreter_delete::subquery_filter; use crate::interpreters::HookOperator; use crate::interpreters::Interpreter; -use crate::locks::LockManager; use crate::pipelines::PipelineBuildResult; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; @@ -95,17 +94,15 @@ impl Interpreter for UpdateInterpreter { } let catalog_name = self.plan.catalog.as_str(); - let catalog = self.ctx.get_catalog(catalog_name).await?; - let db_name = self.plan.database.as_str(); let tbl_name = self.plan.table.as_str(); - let tbl = catalog - .get_table(&self.ctx.get_tenant(), db_name, tbl_name) - .await?; // Add table lock. - let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; - let lock_guard = table_lock.try_lock(self.ctx.clone()).await?; + let lock_guard = self + .ctx + .clone() + .acquire_table_lock(catalog_name, db_name, tbl_name) + .await?; // build physical plan. let physical_plan = self.get_physical_plan().await?; @@ -147,8 +144,6 @@ impl UpdateInterpreter { let tbl = catalog .get_table(&self.ctx.get_tenant(), db_name, tbl_name) .await?; - // refresh table. - let tbl = tbl.refresh(self.ctx.as_ref()).await?; // check mutability tbl.check_mutable()?; @@ -328,7 +323,6 @@ impl UpdateInterpreter { mutation_kind: MutationKind::Update, update_stream_meta: vec![], merge_meta, - need_lock: false, deduplicated_label: unsafe { ctx.get_settings().get_deduplicate_label()? 
}, plan_id: u32::MAX, })); diff --git a/src/query/service/src/locks/table_lock/mod.rs b/src/query/service/src/locks/table_lock/mod.rs index 9d38234341f61..d8e5069c933d4 100644 --- a/src/query/service/src/locks/table_lock/mod.rs +++ b/src/query/service/src/locks/table_lock/mod.rs @@ -56,20 +56,10 @@ impl Lock for TableLock { } async fn try_lock(&self, ctx: Arc) -> Result> { - let enabled_table_lock = ctx.get_settings().get_enable_table_lock().unwrap_or(false); - if enabled_table_lock { - self.lock_mgr.try_lock(ctx, self).await - } else { - Ok(None) - } + self.lock_mgr.try_lock(ctx, self).await } async fn try_lock_no_retry(&self, ctx: Arc) -> Result> { - let enabled_table_lock = ctx.get_settings().get_enable_table_lock().unwrap_or(false); - if enabled_table_lock { - self.lock_mgr.try_lock_no_retry(ctx, self).await - } else { - Ok(None) - } + self.lock_mgr.try_lock_no_retry(ctx, self).await } } diff --git a/src/query/service/src/pipelines/builders/builder_commit.rs b/src/query/service/src/pipelines/builders/builder_commit.rs index 82ad4a74054b2..6359727b47b98 100644 --- a/src/query/service/src/pipelines/builders/builder_commit.rs +++ b/src/query/service/src/pipelines/builders/builder_commit.rs @@ -24,7 +24,6 @@ use databend_common_storages_fuse::operations::TableMutationAggregator; use databend_common_storages_fuse::operations::TransformMergeCommitMeta; use databend_common_storages_fuse::FuseTable; -use crate::locks::LockManager; use crate::pipelines::PipelineBuilder; impl PipelineBuilder { @@ -66,11 +65,6 @@ impl PipelineBuilder { } let snapshot_gen = MutationGenerator::new(plan.snapshot.clone(), plan.mutation_kind); - let lock = if plan.need_lock { - Some(LockManager::create_table_lock(plan.table_info.clone())?) - } else { - None - }; self.main_pipeline.add_sink(|input| { CommitSink::try_create( table, @@ -80,7 +74,6 @@ impl PipelineBuilder { snapshot_gen.clone(), input, None, - lock.clone(), None, plan.deduplicated_label.clone(), ) diff --git a/src/query/service/src/pipelines/builders/builder_recluster.rs b/src/query/service/src/pipelines/builders/builder_recluster.rs index 44a156fd18563..43ac9f5f46170 100644 --- a/src/query/service/src/pipelines/builders/builder_recluster.rs +++ b/src/query/service/src/pipelines/builders/builder_recluster.rs @@ -39,7 +39,6 @@ use databend_common_storages_fuse::operations::TransformSerializeBlock; use databend_common_storages_fuse::FuseTable; use databend_common_storages_fuse::TableContext; -use crate::locks::LockManager; use crate::pipelines::builders::SortPipelineBuilder; use crate::pipelines::processors::TransformAddStreamColumns; use crate::pipelines::PipelineBuilder; @@ -229,13 +228,6 @@ impl PipelineBuilder { let snapshot_gen = MutationGenerator::new(recluster_sink.snapshot.clone(), MutationKind::Recluster); - let lock = if recluster_sink.need_lock { - Some(LockManager::create_table_lock( - recluster_sink.table_info.clone(), - )?) 
- } else { - None - }; self.main_pipeline.add_sink(|input| { CommitSink::try_create( table, @@ -245,7 +237,6 @@ impl PipelineBuilder { snapshot_gen.clone(), input, None, - lock.clone(), None, None, ) diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index ab66f2483d53b..a7277ee64db55 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -75,7 +75,9 @@ use databend_common_meta_app::tenant::Tenant; use databend_common_metrics::storage::*; use databend_common_pipeline_core::processors::PlanProfile; use databend_common_pipeline_core::InputError; +use databend_common_pipeline_core::LockGuard; use databend_common_settings::Settings; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::IndexType; use databend_common_storage::CopyStatus; use databend_common_storage::DataOperator; @@ -102,6 +104,7 @@ use xorf::BinaryFuse16; use crate::catalogs::Catalog; use crate::clusters::Cluster; +use crate::locks::LockManager; use crate::pipelines::executor::PipelineExecutor; use crate::servers::flight::v1::exchange::DataExchangeManager; use crate::sessions::query_affect::QueryAffect; @@ -315,6 +318,59 @@ impl QueryContext { pub fn clear_tables_cache(&self) { self.shared.clear_tables_cache() } + + pub async fn acquire_table_lock( + self: Arc, + catalog_name: &str, + db_name: &str, + tbl_name: &str, + ) -> Result> { + let enabled_table_lock = self.get_settings().get_enable_table_lock().unwrap_or(false); + if !enabled_table_lock { + return Ok(None); + } + + let catalog = self.get_catalog(catalog_name).await?; + let tbl = catalog + .get_table(&self.get_tenant(), db_name, tbl_name) + .await?; + if tbl.engine() != "FUSE" { + return Ok(None); + } + + // Add table lock. + let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; + table_lock.try_lock(self).await + } + + pub async fn acquire_table_lock_with_opt( + self: Arc, + catalog_name: &str, + db_name: &str, + tbl_name: &str, + lock_opt: &LockTableOption, + ) -> Result> { + let enabled_table_lock = self.get_settings().get_enable_table_lock().unwrap_or(false); + if !enabled_table_lock { + return Ok(None); + } + + let catalog = self.get_catalog(catalog_name).await?; + let tbl = catalog + .get_table(&self.get_tenant(), db_name, tbl_name) + .await?; + if tbl.engine() != "FUSE" { + return Ok(None); + } + + // Add table lock. 
+ let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; + match lock_opt { + LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self).await, + LockTableOption::LockWithRetry => table_lock.try_lock(self).await, + LockTableOption::NoLock => Ok(None), + } + } } #[async_trait::async_trait] diff --git a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs index 86ea6dd5783da..c345865e0d75e 100644 --- a/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs +++ b/src/query/sql/src/executor/physical_plans/physical_commit_sink.rs @@ -34,6 +34,5 @@ pub struct CommitSink { pub mutation_kind: MutationKind, pub update_stream_meta: Vec, pub merge_meta: bool, - pub need_lock: bool, pub deduplicated_label: Option, } diff --git a/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs b/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs index b58f763eca1d2..f9fd7c1742859 100644 --- a/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs +++ b/src/query/sql/src/executor/physical_plans/physical_recluster_sink.rs @@ -34,5 +34,4 @@ pub struct ReclusterSink { pub remained_blocks: Vec>, pub removed_segment_indexes: Vec, pub removed_segment_summary: Statistics, - pub need_lock: bool, } diff --git a/src/query/storages/fuse/src/operations/commit.rs b/src/query/storages/fuse/src/operations/commit.rs index b9d617c7479ef..ab9948736c404 100644 --- a/src/query/storages/fuse/src/operations/commit.rs +++ b/src/query/storages/fuse/src/operations/commit.rs @@ -104,7 +104,6 @@ impl FuseTable { snapshot_gen.clone(), input, None, - None, prev_snapshot_id, deduplicated_label.clone(), ) diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index ad26a0aef5e0a..4def9bbf6ff38 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -19,7 +19,6 @@ use std::time::Instant; use backoff::backoff::Backoff; use backoff::ExponentialBackoff; -use databend_common_catalog::lock::Lock; use databend_common_catalog::table::Table; use databend_common_catalog::table::TableExt; use databend_common_catalog::table_context::TableContext; @@ -35,7 +34,6 @@ use databend_common_pipeline_core::processors::Event; use databend_common_pipeline_core::processors::InputPort; use databend_common_pipeline_core::processors::Processor; use databend_common_pipeline_core::processors::ProcessorPtr; -use databend_common_pipeline_core::LockGuard; use databend_storages_common_table_meta::meta::ClusterKey; use databend_storages_common_table_meta::meta::Location; use databend_storages_common_table_meta::meta::SnapshotId; @@ -59,7 +57,6 @@ use crate::FuseTable; enum State { None, FillDefault, - TryLock, RefreshTable, GenerateSnapshot { previous: Option>, @@ -94,8 +91,6 @@ pub struct CommitSink { backoff: ExponentialBackoff, new_segment_locs: Vec, - lock_guard: Option, - lock: Option>, start_time: Instant, prev_snapshot_id: Option, @@ -116,7 +111,6 @@ where F: SnapshotGenerator + Send + 'static snapshot_gen: F, input: Arc, max_retry_elapsed: Option, - lock: Option>, prev_snapshot_id: Option, deduplicated_label: Option, ) -> Result { @@ -129,14 +123,12 @@ where F: SnapshotGenerator + Send + 'static table: Arc::new(table.clone()), copied_files, snapshot_gen, - lock_guard: None, purge, backoff: 
ExponentialBackoff::default(), retries: 0, max_retry_elapsed, input, new_segment_locs: vec![], - lock, start_time: Instant::now(), prev_snapshot_id, change_tracking: table.change_tracking_enabled(), @@ -179,11 +171,7 @@ where F: SnapshotGenerator + Send + 'static self.snapshot_gen .set_conflict_resolve_context(meta.conflict_resolve_context); - if self.lock.is_some() { - self.state = State::TryLock; - } else { - self.state = State::FillDefault; - } + self.state = State::FillDefault; Ok(Event::Async) } @@ -242,8 +230,6 @@ where F: SnapshotGenerator + Send + 'static } if matches!(self.state, State::Finish) { - // release the lock manually. - std::mem::take(&mut self.lock_guard); return Ok(Event::Finished); } @@ -364,15 +350,6 @@ where F: SnapshotGenerator + Send + 'static }; } } - State::TryLock => match self.lock.as_ref().unwrap().try_lock(self.ctx.clone()).await { - Ok(guard) => { - self.lock_guard = guard; - self.state = State::FillDefault; - } - Err(e) => { - self.state = State::Abort(e); - } - }, State::TryCommit { data, snapshot, diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs index ac9611850c216..72f018d1a087f 100644 --- a/src/query/storages/fuse/src/operations/inverted_index.rs +++ b/src/query/storages/fuse/src/operations/inverted_index.rs @@ -107,8 +107,11 @@ impl FuseTable { MetaReaders::segment_info_reader(self.get_operator(), table_schema.clone()); // If no segment locations are specified, iterates through all segments - let segment_locs = if let Some(segment_locs) = &segment_locs { - segment_locs.clone() + let segment_locs = if let Some(segment_locs) = segment_locs { + segment_locs + .into_iter() + .filter(|s| snapshot.segments.contains(s)) + .collect() } else { snapshot.segments.clone() }; diff --git a/src/query/storages/fuse/src/operations/truncate.rs b/src/query/storages/fuse/src/operations/truncate.rs index b7657f04c003d..f3397cc6e2571 100644 --- a/src/query/storages/fuse/src/operations/truncate.rs +++ b/src/query/storages/fuse/src/operations/truncate.rs @@ -67,7 +67,6 @@ impl FuseTable { snapshot_gen.clone(), input, None, - None, prev_snapshot_id, None, ) diff --git a/tests/cloud_control_server/simple_server.py b/tests/cloud_control_server/simple_server.py index 0ef3de850ee90..f2371ae0c46fc 100644 --- a/tests/cloud_control_server/simple_server.py +++ b/tests/cloud_control_server/simple_server.py @@ -44,9 +44,9 @@ def load_data_from_json(): notification_history_data = json.load(f) notification_history = notification_pb2.NotificationHistory() json_format.ParseDict(notification_history_data, notification_history) - NOTIFICATION_HISTORY_DB[ - notification_history.name - ] = notification_history + NOTIFICATION_HISTORY_DB[notification_history.name] = ( + notification_history + ) def create_task_request_to_task(id, create_task_request): From 3f7cf11d448b5c175e94cf22ab89747f3e86c7ca Mon Sep 17 00:00:00 2001 From: zhyass Date: Tue, 28 May 2024 13:52:27 +0800 Subject: [PATCH 3/3] fix review comments --- src/query/catalog/src/lock.rs | 8 +- .../src/interpreters/interpreter_delete.rs | 7 +- .../interpreters/interpreter_merge_into.rs | 7 +- .../interpreter_table_index_refresh.rs | 2 +- .../interpreter_table_modify_column.rs | 8 +- .../interpreter_table_optimize.rs | 2 +- .../interpreter_table_recluster.rs | 8 +- .../interpreter_table_truncate.rs | 8 +- .../src/interpreters/interpreter_update.rs | 7 +- src/query/service/src/locks/lock_manager.rs | 76 +++----------
src/query/service/src/locks/table_lock/mod.rs | 12 +-- src/query/service/src/sessions/query_ctx.rs | 28 +------ 12 files changed, 65 insertions(+), 108 deletions(-) diff --git a/src/query/catalog/src/lock.rs b/src/query/catalog/src/lock.rs index 9b70de893494b..5f8c261bd9429 100644 --- a/src/query/catalog/src/lock.rs +++ b/src/query/catalog/src/lock.rs @@ -30,7 +30,9 @@ pub trait Lock: Sync + Send { fn tenant_name(&self) -> &str; - async fn try_lock(&self, ctx: Arc) -> Result>; - - async fn try_lock_no_retry(&self, ctx: Arc) -> Result>; + async fn try_lock( + &self, + ctx: Arc, + should_retry: bool, + ) -> Result>; } diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 7e1ae829d5b18..644b74614a4f4 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -117,7 +117,12 @@ impl Interpreter for DeleteInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock(catalog_name, db_name, tbl_name) + .acquire_table_lock( + catalog_name, + db_name, + tbl_name, + &LockTableOption::LockWithRetry, + ) .await?; let catalog = self.ctx.get_catalog(catalog_name).await?; diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index f58f2766c5948..ed80e637f5c27 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -149,7 +149,12 @@ impl MergeIntoInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock(catalog, database, table_name) + .acquire_table_lock( + catalog, + database, + table_name, + &LockTableOption::LockWithRetry, + ) .await?; let table = self.ctx.get_table(catalog, database, table_name).await?; diff --git a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs index a45453dc64228..61704238cdf10 100644 --- a/src/query/service/src/interpreters/interpreter_table_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_table_index_refresh.rs @@ -60,7 +60,7 @@ impl Interpreter for RefreshTableIndexInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock_with_opt( + .acquire_table_lock( &self.plan.catalog, &self.plan.database, &self.plan.table, diff --git a/src/query/service/src/interpreters/interpreter_table_modify_column.rs b/src/query/service/src/interpreters/interpreter_table_modify_column.rs index d444a5ea1c528..f26940cc52977 100644 --- a/src/query/service/src/interpreters/interpreter_table_modify_column.rs +++ b/src/query/service/src/interpreters/interpreter_table_modify_column.rs @@ -37,6 +37,7 @@ use databend_common_sql::executor::physical_plans::DistributedInsertSelect; use databend_common_sql::executor::PhysicalPlan; use databend_common_sql::executor::PhysicalPlanBuilder; use databend_common_sql::field_default_value; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::ModifyColumnAction; use databend_common_sql::plans::ModifyTableColumnPlan; use databend_common_sql::plans::Plan; @@ -549,7 +550,12 @@ impl Interpreter for ModifyTableColumnInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock(catalog_name, db_name, tbl_name) + .acquire_table_lock( + catalog_name, + db_name, + tbl_name, + &LockTableOption::LockWithRetry, + ) .await?; let catalog = 
self.ctx.get_catalog(catalog_name).await?; diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 8b71de7dc1cd5..5143d8778704f 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -148,7 +148,7 @@ impl OptimizeTableInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock_with_opt( + .acquire_table_lock( &self.plan.catalog, &self.plan.database, &self.plan.table, diff --git a/src/query/service/src/interpreters/interpreter_table_recluster.rs b/src/query/service/src/interpreters/interpreter_table_recluster.rs index 51fc511ab062b..cdfb744d610ff 100644 --- a/src/query/service/src/interpreters/interpreter_table_recluster.rs +++ b/src/query/service/src/interpreters/interpreter_table_recluster.rs @@ -30,6 +30,7 @@ use databend_common_sql::executor::physical_plans::ReclusterSink; use databend_common_sql::executor::physical_plans::ReclusterSource; use databend_common_sql::executor::physical_plans::ReclusterTask; use databend_common_sql::executor::PhysicalPlan; +use databend_common_sql::plans::LockTableOption; use databend_common_storages_fuse::FuseTable; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::Statistics; @@ -122,7 +123,12 @@ impl Interpreter for ReclusterTableInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock(&self.plan.catalog, &self.plan.database, &self.plan.table) + .acquire_table_lock( + &self.plan.catalog, + &self.plan.database, + &self.plan.table, + &LockTableOption::LockWithRetry, + ) .await?; let table = catalog diff --git a/src/query/service/src/interpreters/interpreter_table_truncate.rs b/src/query/service/src/interpreters/interpreter_table_truncate.rs index 56d41b271cc84..7bef98eac64e8 100644 --- a/src/query/service/src/interpreters/interpreter_table_truncate.rs +++ b/src/query/service/src/interpreters/interpreter_table_truncate.rs @@ -17,6 +17,7 @@ use std::sync::Arc; use databend_common_catalog::table::TableExt; use databend_common_config::GlobalConfig; use databend_common_exception::Result; +use databend_common_sql::plans::LockTableOption; use databend_common_sql::plans::TruncateTablePlan; use crate::interpreters::Interpreter; @@ -74,7 +75,12 @@ impl Interpreter for TruncateTableInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock(&self.catalog_name, &self.database_name, &self.table_name) + .acquire_table_lock( + &self.catalog_name, + &self.database_name, + &self.table_name, + &LockTableOption::LockWithRetry, + ) .await?; let table = self diff --git a/src/query/service/src/interpreters/interpreter_update.rs b/src/query/service/src/interpreters/interpreter_update.rs index 7e17b4d38c722..daa231df68c5b 100644 --- a/src/query/service/src/interpreters/interpreter_update.rs +++ b/src/query/service/src/interpreters/interpreter_update.rs @@ -101,7 +101,12 @@ impl Interpreter for UpdateInterpreter { let lock_guard = self .ctx .clone() - .acquire_table_lock(catalog_name, db_name, tbl_name) + .acquire_table_lock( + catalog_name, + db_name, + tbl_name, + &LockTableOption::LockWithRetry, + ) .await?; // build physical plan. 
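The interpreter changes above all converge on one calling pattern: acquire the table lock first, then resolve the table and check mutability, build the pipeline, and finally attach the guard to the pipeline so the lock is released when execution finishes. The sketch below distills that shape; `execute_with_table_lock` is a hypothetical helper written for illustration (no interpreter in this series defines it), while `acquire_table_lock`, `check_mutable`, and `add_lock_guard` are the APIs introduced or used in the hunks above.

use std::sync::Arc;

use databend_common_catalog::table::TableExt;
use databend_common_exception::Result;
use databend_common_sql::plans::LockTableOption;

use crate::pipelines::PipelineBuildResult;
use crate::schedulers::build_query_pipeline_without_render_result_set;
use crate::sessions::QueryContext;

// Hypothetical helper mirroring the common shape of the DML interpreters
// after this patch; a sketch, not code from the series.
async fn execute_with_table_lock(
    ctx: Arc<QueryContext>,
    catalog: &str,
    database: &str,
    table: &str,
) -> Result<PipelineBuildResult> {
    // Acquire the lock before reading the table, so the snapshot the plan
    // is built on cannot be replaced by a concurrent writer.
    let lock_guard = ctx
        .clone()
        .acquire_table_lock(catalog, database, table, &LockTableOption::LockWithRetry)
        .await?;

    // Resolve the table only after the lock is held, then check mutability.
    let tbl = ctx.get_table(catalog, database, table).await?;
    tbl.check_mutable()?;

    // The todo!() stands in for the statement-specific plan building that
    // each interpreter does. Handing the guard to the pipeline means the
    // lock is released when the pipeline is dropped.
    let physical_plan = todo!("build the statement-specific physical plan");
    let mut build_res =
        build_query_pipeline_without_render_result_set(&ctx, &physical_plan).await?;
    build_res.main_pipeline.add_lock_guard(lock_guard);
    Ok(build_res)
}

Tying the guard's lifetime to the pipeline, rather than having CommitSink own a Lock, is what allows the need_lock plumbing and the State::TryLock branch to be deleted in the hunks above.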
diff --git a/src/query/service/src/locks/lock_manager.rs b/src/query/service/src/locks/lock_manager.rs index 3da25f08ee14b..6df103b317d27 100644 --- a/src/query/service/src/locks/lock_manager.rs +++ b/src/query/service/src/locks/lock_manager.rs @@ -95,6 +95,7 @@ impl LockManager { self: &Arc, ctx: Arc, lock: &T, + should_retry: bool, ) -> Result> { let user = ctx.get_current_user()?.name; let node = ctx.get_cluster().local_id.clone(); @@ -156,6 +157,16 @@ impl LockManager { break; } + // if no need retry, return error directly. + if !should_retry { + catalog + .delete_lock_revision(delete_table_lock_req.clone()) + .await?; + return Err(ErrorCode::TableAlreadyLocked( + "table is locked by other session, please retry later".to_string(), + )); + } + let watch_delete_ident = TableLockIdent::new(&tenant, table_id, reply[position - 1].0); // Get the previous revision, watch the delete event. @@ -192,71 +203,6 @@ impl LockManager { Ok(Some(guard)) } - /// Try request lock without retry. - #[async_backtrace::framed] - pub async fn try_lock_no_retry( - self: &Arc, - ctx: Arc, - lock: &T, - ) -> Result> { - let table_id = lock.get_table_id(); - let lock_key = LockKey::Table { - tenant: Tenant::new_or_err(lock.tenant_name(), func_name!())?, - table_id, - }; - - let expire_secs = ctx.get_settings().get_table_lock_expire_secs()?; - let req = CreateLockRevReq::new( - lock_key.clone(), - ctx.get_current_user()?.name, - ctx.get_cluster().local_id.clone(), - ctx.get_current_session_id(), - expire_secs, - ); - - // get a new table lock revision. - let catalog = ctx.get_catalog(lock.get_catalog()).await?; - let res = catalog.create_lock_revision(req).await?; - let revision = res.revision; - // metrics. - record_created_lock_nums(lock.lock_type().to_string(), table_id, 1); - - let lock_holder = Arc::new(LockHolder::default()); - lock_holder - .start(ctx.get_id(), catalog.clone(), lock, revision, expire_secs) - .await?; - - self.insert_lock(revision, lock_holder); - let guard = LockGuard::new(self.clone(), revision); - - // List all revisions and check if the current is the minimum. - let reply = catalog - .list_lock_revisions(ListLockRevReq::new(lock_key.clone())) - .await?; - let position = reply.iter().position(|(x, _)| *x == revision).ok_or_else(|| - // If the current is not found in list, it means that the current has been expired. - ErrorCode::TableLockExpired("the acquired table lock has been expired".to_string()), - )?; - - if position == 0 { - // The lock is acquired by current session. - catalog - .extend_lock_revision(ExtendLockRevReq::new(lock_key, revision, expire_secs, true)) - .await?; - // metrics. 
- record_acquired_lock_nums(lock.lock_type().to_string(), table_id, 1); - } else { - catalog - .delete_lock_revision(DeleteLockRevReq::new(lock_key, revision)) - .await?; - return Err(ErrorCode::TableAlreadyLocked( - "table is locked by other session, please retry later".to_string(), - )); - } - - Ok(Some(guard)) - } - fn insert_lock(&self, revision: u64, lock_holder: Arc) { let mut active_locks = self.active_locks.write(); let prev = active_locks.insert(revision, lock_holder); diff --git a/src/query/service/src/locks/table_lock/mod.rs b/src/query/service/src/locks/table_lock/mod.rs index d8e5069c933d4..3ea2680e71b8b 100644 --- a/src/query/service/src/locks/table_lock/mod.rs +++ b/src/query/service/src/locks/table_lock/mod.rs @@ -55,11 +55,11 @@ impl Lock for TableLock { &self.table_info.tenant } - async fn try_lock(&self, ctx: Arc) -> Result> { - self.lock_mgr.try_lock(ctx, self).await - } - - async fn try_lock_no_retry(&self, ctx: Arc) -> Result> { - self.lock_mgr.try_lock_no_retry(ctx, self).await + async fn try_lock( + &self, + ctx: Arc, + should_retry: bool, + ) -> Result> { + self.lock_mgr.try_lock(ctx, self, should_retry).await } } diff --git a/src/query/service/src/sessions/query_ctx.rs b/src/query/service/src/sessions/query_ctx.rs index a7277ee64db55..f855fe5de1710 100644 --- a/src/query/service/src/sessions/query_ctx.rs +++ b/src/query/service/src/sessions/query_ctx.rs @@ -324,30 +324,6 @@ impl QueryContext { catalog_name: &str, db_name: &str, tbl_name: &str, - ) -> Result> { - let enabled_table_lock = self.get_settings().get_enable_table_lock().unwrap_or(false); - if !enabled_table_lock { - return Ok(None); - } - - let catalog = self.get_catalog(catalog_name).await?; - let tbl = catalog - .get_table(&self.get_tenant(), db_name, tbl_name) - .await?; - if tbl.engine() != "FUSE" { - return Ok(None); - } - - // Add table lock. - let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; - table_lock.try_lock(self).await - } - - pub async fn acquire_table_lock_with_opt( - self: Arc, - catalog_name: &str, - db_name: &str, - tbl_name: &str, lock_opt: &LockTableOption, ) -> Result> { let enabled_table_lock = self.get_settings().get_enable_table_lock().unwrap_or(false); @@ -366,8 +342,8 @@ impl QueryContext { // Add table lock. let table_lock = LockManager::create_table_lock(tbl.get_table_info().clone())?; match lock_opt { - LockTableOption::LockNoRetry => table_lock.try_lock_no_retry(self).await, - LockTableOption::LockWithRetry => table_lock.try_lock(self).await, + LockTableOption::LockNoRetry => table_lock.try_lock(self, false).await, + LockTableOption::LockWithRetry => table_lock.try_lock(self, true).await, LockTableOption::NoLock => Ok(None), } }
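The final hunks reduce the lock API to a single try_lock(ctx, should_retry) and route all three LockTableOption variants through it. The queue protocol behind should_retry is: each session creates a lock revision in the meta service; the smallest live revision holds the lock; every other session either watches the revision just ahead of it (LockWithRetry) or deletes its own revision and fails fast (LockNoRetry). The self-contained model below captures one iteration of that decision logic only; every type and name in it is a stand-in for illustration, not databend's actual API.

// Stand-in model of the revision-queue decision implemented in
// lock_manager.rs above; check_queue mirrors one pass of the try_lock loop.
#[derive(Debug, PartialEq)]
enum LockError {
    TableAlreadyLocked,
    TableLockExpired,
}

/// `queue` models the lock revisions listed from the meta service, sorted
/// ascending; `mine` is the revision this session created. Returns Ok(true)
/// if the lock is acquired, Ok(false) if the caller should watch
/// `queue[position - 1]` for deletion and then re-check.
fn check_queue(queue: &[u64], mine: u64, should_retry: bool) -> Result<bool, LockError> {
    let position = queue
        .iter()
        .position(|r| *r == mine)
        // Our revision is gone from the list: it has expired.
        .ok_or(LockError::TableLockExpired)?;

    if position == 0 {
        // Smallest revision in the queue: this session owns the lock.
        return Ok(true);
    }
    if !should_retry {
        // LockNoRetry: delete our revision (elided here) and fail fast.
        return Err(LockError::TableAlreadyLocked);
    }
    // LockWithRetry: wait for the previous revision's delete event,
    // then list the revisions again.
    Ok(false)
}

fn main() {
    let queue = [7, 9, 12];
    assert_eq!(check_queue(&queue, 7, false), Ok(true)); // head acquires
    assert_eq!(check_queue(&queue, 12, true), Ok(false)); // tail waits
    assert_eq!(check_queue(&queue, 12, false), Err(LockError::TableAlreadyLocked));
    assert_eq!(check_queue(&queue, 5, true), Err(LockError::TableLockExpired));
}

Under this scheme, LockWithRetry callers block until the holder releases its revision or their own revision expires, while LockNoRetry maps to the new fail-fast branch added to LockManager::try_lock; each call site selects the behavior through LockTableOption, as the query_ctx.rs hunk above shows.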