Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] add command to show the dropped meta information that can be recovered #51007

Open
wants to merge 9 commits into
base: main
Choose a base branch
from
1 change: 1 addition & 0 deletions be/src/exec/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ set(EXEC_FILES
schema_scanner/sys_fe_locks.cpp
schema_scanner/sys_fe_memory_usage.cpp
schema_scanner/schema_temp_tables_scanner.cpp
schema_scanner/schema_recyclebin_catalogs.cpp
jdbc_scanner.cpp
sorting/compare_column.cpp
sorting/merge_column.cpp
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
#include "exec/schema_scanner/schema_partitions_meta_scanner.h"
#include "exec/schema_scanner/schema_pipe_files.h"
#include "exec/schema_scanner/schema_pipes.h"
#include "exec/schema_scanner/schema_recyclebin_catalogs.h"
#include "exec/schema_scanner/schema_routine_load_jobs_scanner.h"
#include "exec/schema_scanner/schema_schema_privileges_scanner.h"
#include "exec/schema_scanner/schema_schemata_scanner.h"
Expand Down Expand Up @@ -213,6 +214,8 @@ std::unique_ptr<SchemaScanner> SchemaScanner::create(TSchemaTableType::type type
return std::make_unique<SysFeMemoryUsage>();
case TSchemaTableType::SCH_TEMP_TABLES:
return std::make_unique<SchemaTempTablesScanner>();
case TSchemaTableType::SCH_RECYCLEBIN_CATALOGS:
return std::make_unique<SchemaRecycleBinCatalogs>();
default:
return std::make_unique<SchemaDummyScanner>();
}
Expand Down
6 changes: 6 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,12 @@ Status SchemaHelper::get_partitions_meta(const SchemaScannerState& state, const
});
}

Status SchemaHelper::listRecycleBinCatalogs(const SchemaScannerState& state, const TListRecycleBinCatalogsParams& req,
TListRecycleBinCatalogsResult* res) {
return _call_rpc(state,
[&req, &res](FrontendServiceConnection& client) { client->listRecycleBinCatalogs(*res, req); });
}

void fill_data_column_with_null(Column* data_column) {
auto* nullable_column = down_cast<NullableColumn*>(data_column);
nullable_column->append_nulls(1);
Expand Down
3 changes: 3 additions & 0 deletions be/src/exec/schema_scanner/schema_helper.h
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ class SchemaHelper {
static Status get_partitions_meta(const SchemaScannerState& state, const TGetPartitionsMetaRequest& var_params,
TGetPartitionsMetaResponse* var_result);

static Status listRecycleBinCatalogs(const SchemaScannerState& state, const TListRecycleBinCatalogsParams& req,
TListRecycleBinCatalogsResult* res);

private:
static Status _call_rpc(const SchemaScannerState& state,
std::function<void(ClientConnection<FrontendServiceClient>&)> callback);
Expand Down
84 changes: 84 additions & 0 deletions be/src/exec/schema_scanner/schema_recyclebin_catalogs.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "exec/schema_scanner/schema_recyclebin_catalogs.h"

#include "exec/schema_scanner.h"
#include "exec/schema_scanner/schema_helper.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"
#include "runtime/string_value.h"
#include "types/logical_type.h"

namespace starrocks {

SchemaScanner::ColumnDesc SchemaRecycleBinCatalogs::_s_columns[] = {
{"TYPE", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
{"NAME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
{"DBID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"TABLEID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"PARTID", TypeDescriptor::from_logical_type(TYPE_BIGINT), sizeof(int64_t), false},
{"DROPTIME", TypeDescriptor::create_varchar_type(sizeof(StringValue)), sizeof(StringValue), false},
};

SchemaRecycleBinCatalogs::SchemaRecycleBinCatalogs()
: SchemaScanner(_s_columns, sizeof(_s_columns) / sizeof(SchemaScanner::ColumnDesc)) {}

Status SchemaRecycleBinCatalogs::start(RuntimeState* state) {
RETURN_IF_ERROR(SchemaScanner::init_schema_scanner_state(state));
return SchemaScanner::start(state);
}

Status SchemaRecycleBinCatalogs::_listRecycleBinCatalogs() {
RETURN_IF(_param->ip == nullptr || _param->port == 0, Status::InternalError("unknown frontend address"));

TListRecycleBinCatalogsParams params;
if (_param->current_user_ident) {
params.__set_user_ident(*_param->current_user_ident);
}
return SchemaHelper::listRecycleBinCatalogs(_ss_state, params, &_recyclebin_catalogs_result);
}

Status SchemaRecycleBinCatalogs::get_next(ChunkPtr* chunk, bool* eos) {
while (_cur_row >= _recyclebin_catalogs_result.recyclebin_catalogs.size()) {
if (!_fetched) {
_fetched = true;
RETURN_IF_ERROR(_listRecycleBinCatalogs());
} else {
*eos = true;
return Status::OK();
}
}
*eos = false;
return _fill_chunk(chunk);
}

DatumArray SchemaRecycleBinCatalogs::_build_row() {
auto& info = _recyclebin_catalogs_result.recyclebin_catalogs.at(_cur_row++);
return {
Slice(info.type), Slice(info.name), info.dbid, info.tableid, info.partitionid, Slice(info.droptime),
};
}

Status SchemaRecycleBinCatalogs::_fill_chunk(ChunkPtr* chunk) {
auto& slot_id_map = (*chunk)->get_slot_id_to_index_map();
auto datum_array = _build_row();
for (const auto& [slot_id, index] : slot_id_map) {
Column* column = (*chunk)->get_column_by_slot_id(slot_id).get();
column->append_datum(datum_array[slot_id - 1]);
}
return {};
}

} // namespace starrocks
44 changes: 44 additions & 0 deletions be/src/exec/schema_scanner/schema_recyclebin_catalogs.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include "column/datum.h"
#include "exec/schema_scanner.h"
#include "gen_cpp/FrontendService_types.h"
#include "runtime/runtime_state.h"

namespace starrocks {

class SchemaRecycleBinCatalogs : public SchemaScanner {
public:
SchemaRecycleBinCatalogs();

~SchemaRecycleBinCatalogs() override = default;

Status start(RuntimeState* state) override;
Status get_next(ChunkPtr* chunk, bool* eos) override;

private:
Status _fill_chunk(ChunkPtr* chunk);
DatumArray _build_row();
Status _listRecycleBinCatalogs();

size_t _cur_row = 0;
bool _fetched = false;
TListRecycleBinCatalogsResult _recyclebin_catalogs_result;
static SchemaScanner::ColumnDesc _s_columns[];
};

}; // namespace starrocks
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@
import com.starrocks.common.io.Text;
import com.starrocks.common.io.Writable;
import com.starrocks.common.util.FrontendDaemon;
import com.starrocks.common.util.TimeUtils;
import com.starrocks.persist.ImageWriter;
import com.starrocks.persist.RecoverInfo;
import com.starrocks.persist.metablock.SRMetaBlockEOFException;
Expand All @@ -68,6 +69,7 @@

import java.io.DataOutput;
import java.io.IOException;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
Expand All @@ -76,6 +78,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import javax.validation.constraints.NotNull;

Expand Down Expand Up @@ -969,6 +972,82 @@ protected void runAfterCatalogReady() {
}
}

public synchronized List<List<String>> getCatalogRecycleBinInfo() {
Map<Long, Long> dbToDataSize = Maps.newHashMap();
List<List<String>> tableInfos = Lists.newArrayList();
for (Map<Long, RecycleTableInfo> tableEntry : idToTableInfo.rowMap().values()) {
for (Map.Entry<Long, RecycleTableInfo> entry : tableEntry.entrySet()) {
List<String> info = Lists.newArrayList();
info.add("Table");
RecycleTableInfo tableInfo = entry.getValue();
Table table = tableInfo.getTable();
info.add(table.getName());
info.add(String.valueOf(tableInfo.getDbId()));
info.add(String.valueOf(entry.getKey()));
info.add("");
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
tableInfos.add(info);
}
}
// sort by Name, DropTime
tableInfos.sort((x, y) -> {
int nameRet = x.get(1).compareTo(y.get(1));
if (nameRet == 0) {
return x.get(5).compareTo(y.get(5));
} else {
return nameRet;
}
});

List<List<String>> partitionInfos = Lists.newArrayList();
for (Map.Entry<Long, RecyclePartitionInfo> entry : idToPartition.entrySet()) {
List<String> info = Lists.newArrayList();
info.add("Partition");
RecyclePartitionInfo partitionInfo = entry.getValue();
Partition partition = partitionInfo.getPartition();
info.add(partition.getName());
info.add(String.valueOf(partitionInfo.getDbId()));
info.add(String.valueOf(partitionInfo.getTableId()));
info.add(String.valueOf(entry.getKey()));
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
partitionInfos.add(info);
}
// sort by Name, DropTime
partitionInfos.sort((x, y) -> {
int nameRet = x.get(1).compareTo(y.get(1));
if (nameRet == 0) {
return x.get(5).compareTo(y.get(5));
} else {
return nameRet;
}
});

List<List<String>> dbInfos = Lists.newArrayList();
for (Map.Entry<Long, RecycleDatabaseInfo> entry : idToDatabase.entrySet()) {
List<String> info = Lists.newArrayList();
info.add("Database");
RecycleDatabaseInfo dbInfo = entry.getValue();
Database db = dbInfo.getDb();
info.add(db.getFullName());
info.add(String.valueOf(entry.getKey()));
info.add("");
info.add("");
info.add(TimeUtils.longToTimeString(idToRecycleTime.get(entry.getKey())));
dbInfos.add(info);
}
// sort by Name, DropTime
dbInfos.sort((x, y) -> {
int nameRet = x.get(1).compareTo(y.get(1));
if (nameRet == 0) {
return x.get(5).compareTo(y.get(5));
} else {
return nameRet;
}
});

return Stream.of(dbInfos, tableInfos, partitionInfos).flatMap(Collection::stream).collect(Collectors.toList());
}

@VisibleForTesting
synchronized boolean isContainedInidToRecycleTime(long id) {
return idToRecycleTime.get(id) != null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ public class SystemId {
public static final long MEMORY_USAGE_ID = 106L;
public static final long PIPE_FILES_ID = 120L;
public static final long PIPES_ID = 121L;
public static final long RECYCLEBIN_CATALOGS = 122L;
public static final long BE_DATACACHE_METRICS = 130L;
// Remain for other datacache manage table

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public InfoSchemaDb(String catalogName) {
super.registerTableUnlocked(BeDataCacheMetricsTable.create());
super.registerTableUnlocked(PartitionsMetaSystemTable.create());
super.registerTableUnlocked(TemporaryTablesTable.create());
super.registerTableUnlocked(RecycleBinCatalogsTable.create());
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package com.starrocks.catalog.system.information;

import com.starrocks.catalog.ScalarType;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.system.SystemId;
import com.starrocks.catalog.system.SystemTable;
import com.starrocks.thrift.TSchemaTableType;

public class RecycleBinCatalogsTable {
public static final String NAME = "recyclebin_catalogs";

public static SystemTable create() {
return new SystemTable(SystemId.RECYCLEBIN_CATALOGS, NAME, Table.TableType.SCHEMA,
SystemTable.builder()
.column("TYPE", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("NAME", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.column("DBID", ScalarType.BIGINT)
.column("TABLEID", ScalarType.BIGINT)
.column("PARTID", ScalarType.BIGINT)
.column("DROPTIME", ScalarType.createVarcharType(SystemTable.NAME_CHAR_LEN))
.build(),
TSchemaTableType.SCH_RECYCLEBIN_CATALOGS);
}
}
7 changes: 7 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/qe/ShowExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@
import com.starrocks.sql.ast.ShowBackupStmt;
import com.starrocks.sql.ast.ShowBasicStatsMetaStmt;
import com.starrocks.sql.ast.ShowBrokerStmt;
import com.starrocks.sql.ast.ShowCatalogRecycleBinStmt;
import com.starrocks.sql.ast.ShowCatalogsStmt;
import com.starrocks.sql.ast.ShowCharsetStmt;
import com.starrocks.sql.ast.ShowCollationStmt;
Expand Down Expand Up @@ -2686,6 +2687,12 @@ public ShowResultSet visitShowBackendBlackListStatement(ShowBackendBlackListStmt
}

@Override
public ShowResultSet visitShowCatalogRecycleBinStatement(ShowCatalogRecycleBinStmt statement, ConnectContext context) {
List<List<String>> rowSet = GlobalStateMgr.getCurrentState().getRecycleBin().getCatalogRecycleBinInfo().stream()
.sorted(Comparator.comparing(o -> o.get(0))).collect(Collectors.toList());
return new ShowResultSet(statement.getMetaData(), rowSet);
}

public ShowResultSet visitShowWarehousesStatement(ShowWarehousesStmt statement, ConnectContext context) {
GlobalStateMgr globalStateMgr = GlobalStateMgr.getCurrentState();
WarehouseManager warehouseMgr = globalStateMgr.getWarehouseMgr();
Expand Down
Loading
Loading