Skip to content

Commit c633123

Browse files
amrdbdrrtuy
authored andcommitted
feat: add vacuum_partition functionality with initialization and execution logic
1 parent 74486da commit c633123

File tree

18 files changed

+832
-11
lines changed

18 files changed

+832
-11
lines changed

dbcon/mysql/ha_mcs_client_udfs.cpp

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1230,6 +1230,67 @@ extern "C"
12301230
{
12311231
}
12321232

1233+
my_bool vacuum_partition_init(UDF_INIT* initid, UDF_ARGS* args, char* message, const char* funcname)
1234+
{
1235+
if (args->arg_count != 3 ||
1236+
args->arg_type[0] != STRING_RESULT ||
1237+
args->arg_type[1] != STRING_RESULT ||
1238+
args->arg_type[2] != STRING_RESULT)
1239+
{
1240+
sprintf(message, "%s() requires three string arguments", funcname);
1241+
return 1;
1242+
}
1243+
1244+
initid->maybe_null = 0;
1245+
initid->max_length = 255;
1246+
1247+
return 0;
1248+
}
1249+
1250+
my_bool mcs_vacuum_partition_init(UDF_INIT* initid, UDF_ARGS* args, char* message)
1251+
{
1252+
return vacuum_partition_init(initid, args, message, "MCSVACUUMPARTITION");
1253+
}
1254+
1255+
const char* mcs_vacuum_partition(UDF_INIT* /*initid*/, UDF_ARGS* args, char* result,
1256+
unsigned long* length, char* /*is_null*/, char* /*error*/)
1257+
{
1258+
THD* thd = current_thd;
1259+
1260+
if (get_fe_conn_info_ptr() == NULL)
1261+
{
1262+
set_fe_conn_info_ptr((void*)new cal_connection_info());
1263+
thd_set_ha_data(thd, mcs_hton, get_fe_conn_info_ptr());
1264+
}
1265+
1266+
cal_connection_info* ci = reinterpret_cast<cal_connection_info*>(get_fe_conn_info_ptr());
1267+
execplan::CalpontSystemCatalog::TableName tableName;
1268+
1269+
tableName.schema = args->args[0];
1270+
tableName.table = args->args[1];
1271+
std::string partition = args->args[2];
1272+
1273+
if (lower_case_table_names) {
1274+
boost::algorithm::to_lower(tableName.schema);
1275+
boost::algorithm::to_lower(tableName.table);
1276+
}
1277+
1278+
if (!ci->dmlProc)
1279+
{
1280+
ci->dmlProc = new MessageQueueClient("DMLProc");
1281+
}
1282+
1283+
std::string vacuumResult = ha_mcs_impl_vacuum_partition(*ci, tableName, partition);
1284+
1285+
memcpy(result, vacuumResult.c_str(), vacuumResult.length());
1286+
*length = vacuumResult.length();
1287+
return result;
1288+
}
1289+
1290+
void mcs_vacuum_partition_deinit(UDF_INIT* /*initid*/)
1291+
{
1292+
}
1293+
12331294
my_bool analyze_table_bloat_init(UDF_INIT* initid, UDF_ARGS* args, char* message, const char* funcname)
12341295
{
12351296
if (args->arg_count != 2 ||

dbcon/mysql/ha_mcs_dml.cpp

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
#include <unordered.h>
2929
#include <fstream>
3030
#include <sstream>
31+
#include <vector>
3132
#include <cerrno>
3233
#include <cstring>
3334

@@ -1077,6 +1078,158 @@ std::string ha_mcs_impl_analyze_partition_bloat(cal_impl_if::cal_connection_info
10771078
return analysisResult;
10781079
}
10791080

1081+
std::string ha_mcs_impl_vacuum_partition(cal_impl_if::cal_connection_info& ci,
1082+
execplan::CalpontSystemCatalog::TableName& tablename,
1083+
const std::string& partition)
1084+
{
1085+
THD* thd = current_thd;
1086+
ulong sessionID = tid2sid(thd->thread_id);
1087+
std::string result;
1088+
1089+
try
1090+
{
1091+
// Parse partition triplet string: "dbroot.segment.partition"
1092+
uint16_t dbRoot;
1093+
uint16_t segmentNum;
1094+
uint32_t partitionNum;
1095+
1096+
std::istringstream ss(partition);
1097+
std::string item;
1098+
std::vector<std::string> tokens;
1099+
1100+
while (std::getline(ss, item, '.')) {
1101+
tokens.push_back(item);
1102+
}
1103+
1104+
if (tokens.size() != 3) {
1105+
return "Error: Invalid partition triplet format. Expected: dbroot.segment.partition";
1106+
}
1107+
1108+
try
1109+
{
1110+
dbRoot = static_cast<uint16_t>(std::stoul(tokens[0]));
1111+
segmentNum = static_cast<uint16_t>(std::stoul(tokens[1]));
1112+
partitionNum = std::stoul(tokens[2]);
1113+
}
1114+
catch (const std::exception&)
1115+
{
1116+
return "Error: Invalid numeric values in partition triplet";
1117+
}
1118+
1119+
// Get system catalog
1120+
boost::shared_ptr<execplan::CalpontSystemCatalog> systemCatalogPtr =
1121+
execplan::CalpontSystemCatalog::makeCalpontSystemCatalog(sessionID);
1122+
systemCatalogPtr->identity(execplan::CalpontSystemCatalog::EC);
1123+
1124+
// Get table information
1125+
execplan::CalpontSystemCatalog::TableName sysCatalogTableName;
1126+
sysCatalogTableName.schema = tablename.schema;
1127+
sysCatalogTableName.table = tablename.table;
1128+
1129+
// Check if table exists
1130+
execplan::CalpontSystemCatalog::RIDList ridList;
1131+
try
1132+
{
1133+
ridList = systemCatalogPtr->columnRIDs(sysCatalogTableName);
1134+
}
1135+
catch (const std::exception& ex)
1136+
{
1137+
return std::string("Error: Table not found - ") + ex.what();
1138+
}
1139+
1140+
if (ridList.empty())
1141+
{
1142+
return "Error: Table has no columns";
1143+
}
1144+
1145+
// Get AUX column OID
1146+
execplan::CalpontSystemCatalog::OID auxColumnOid = systemCatalogPtr->tableAUXColumnOID(sysCatalogTableName);
1147+
1148+
// Build column list for createHiddenStripeColumnExtents
1149+
std::vector<BRM::CreateStripeColumnExtentsArgIn> cols;
1150+
1151+
// Add regular columns
1152+
for (const auto& roPair : ridList)
1153+
{
1154+
BRM::CreateStripeColumnExtentsArgIn colArg;
1155+
colArg.oid = roPair.objnum;
1156+
1157+
execplan::CalpontSystemCatalog::ColType colType = systemCatalogPtr->colType(roPair.objnum);
1158+
colArg.colDataType = colType.colDataType;
1159+
1160+
// Set width based on whether it's a dictionary column
1161+
if (colType.ddn.dictOID > 3000)
1162+
{
1163+
colArg.width = 8; // Dictionary columns use 8-byte tokens
1164+
}
1165+
else
1166+
{
1167+
colArg.width = colType.colWidth;
1168+
}
1169+
1170+
cols.push_back(colArg);
1171+
}
1172+
1173+
// Add AUX column if it exists
1174+
if (auxColumnOid > 3000)
1175+
{
1176+
BRM::CreateStripeColumnExtentsArgIn auxColArg;
1177+
auxColArg.oid = auxColumnOid;
1178+
execplan::CalpontSystemCatalog::ColType auxColType = systemCatalogPtr->colType(auxColumnOid);
1179+
auxColArg.colDataType = auxColType.colDataType;
1180+
auxColArg.width = auxColType.colWidth;
1181+
cols.push_back(auxColArg);
1182+
}
1183+
1184+
std::vector<BRM::CreateStripeColumnExtentsArgOut> extents;
1185+
1186+
// Store original values for comparison
1187+
uint16_t originalSegmentNum = segmentNum;
1188+
uint32_t originalPartitionNum = partitionNum;
1189+
1190+
// Call createHiddenStripeColumnExtents
1191+
BRM::DBRM dbrm;
1192+
int rc = dbrm.createHiddenStripeColumnExtents(cols, dbRoot, partitionNum, segmentNum, extents);
1193+
1194+
if (rc != 0)
1195+
{
1196+
std::ostringstream oss;
1197+
oss << "Error: Failed to create hidden stripe column extents, error code: " << rc;
1198+
return oss.str();
1199+
}
1200+
1201+
// Build success message
1202+
std::ostringstream successMsg;
1203+
successMsg << "Successfully created hidden partition on DBRoot " << dbRoot;
1204+
1205+
// Check if the function used our requested values or assigned different ones
1206+
if (partitionNum != originalPartitionNum || segmentNum != originalSegmentNum)
1207+
{
1208+
successMsg << " (requested: " << originalPartitionNum << "." << originalSegmentNum
1209+
<< ", assigned: " << partitionNum << "." << segmentNum << ")";
1210+
}
1211+
else
1212+
{
1213+
successMsg << " at partition " << partitionNum << ", segment " << segmentNum;
1214+
}
1215+
1216+
successMsg << " with " << extents.size() << " extents";
1217+
1218+
result = successMsg.str();
1219+
}
1220+
catch (const std::exception& ex)
1221+
{
1222+
std::ostringstream errorMsg;
1223+
errorMsg << "Error: " << ex.what();
1224+
result = errorMsg.str();
1225+
}
1226+
catch (...)
1227+
{
1228+
result = "Error: Unknown exception occurred during vacuum partition operation";
1229+
}
1230+
1231+
return result;
1232+
}
10801233

10811234
std::string ha_mcs_impl_analyze_table_bloat(cal_impl_if::cal_connection_info& ci,
10821235
execplan::CalpontSystemCatalog::TableName& tablename)

dbcon/mysql/ha_mcs_impl.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,4 +80,7 @@ extern std::string ha_mcs_impl_analyze_partition_bloat(cal_impl_if::cal_connecti
8080
const std::string& partition);
8181
extern std::string ha_mcs_impl_analyze_table_bloat(cal_impl_if::cal_connection_info& ci,
8282
execplan::CalpontSystemCatalog::TableName& tablename);
83+
extern std::string ha_mcs_impl_vacuum_partition(cal_impl_if::cal_connection_info& ci,
84+
execplan::CalpontSystemCatalog::TableName& tablename,
85+
const std::string& partition);
8386
#endif

dbcon/mysql/install_mcs_mysql.sh.in

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ CREATE OR REPLACE FUNCTION mcs_emindex_size RETURNS INTEGER SONAME 'ha_columnsto
5858
CREATE OR REPLACE FUNCTION mcs_emindex_free RETURNS INTEGER SONAME 'ha_columnstore.so';
5959
CREATE OR REPLACE FUNCTION mcs_analyze_partition_bloat RETURNS STRING SONAME 'ha_columnstore.so';
6060
CREATE OR REPLACE FUNCTION mcs_analyze_table_bloat RETURNS STRING SONAME 'ha_columnstore.so';
61+
CREATE OR REPLACE FUNCTION mcs_vacuum_partition RETURNS STRING SONAME 'ha_columnstore.so';
6162
CREATE OR REPLACE FUNCTION columnstore_dataload RETURNS STRING SONAME 'ha_columnstore.so';
6263
CREATE OR REPLACE AGGREGATE FUNCTION regr_avgx RETURNS REAL SONAME 'libregr_mysql.so';
6364
CREATE OR REPLACE AGGREGATE FUNCTION regr_avgy RETURNS REAL SONAME 'libregr_mysql.so';

utils/libmarias3/libmarias3

Submodule libmarias3 updated 104 files

versioning/BRM/brmtypes.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -481,6 +481,8 @@ const uint8_t MARK_ALL_PARTITION_FOR_DELETION = 41;
481481
const uint8_t CREATE_COLUMN_EXTENT_EXACT_FILE = 42;
482482
const uint8_t DELETE_DBROOT = 43;
483483
const uint8_t CREATE_STRIPE_COLUMN_EXTENTS = 44;
484+
const uint8_t CREATE_HIDDEN_STRIPE_COLUMN_EXTENTS = 107;
485+
const uint8_t MAKE_PARTITION_VISIBLE = 108;
484486

485487
/* SessionManager interface */
486488
const uint8_t VER_ID = 45;

versioning/BRM/dbrm.cpp

Lines changed: 108 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -991,6 +991,114 @@ int DBRM::createStripeColumnExtents(const std::vector<CreateStripeColumnExtentsA
991991
return 0;
992992
}
993993

994+
//------------------------------------------------------------------------------
995+
// Send a request to create hidden stripe column extents
996+
//------------------------------------------------------------------------------
997+
int DBRM::createHiddenStripeColumnExtents(const std::vector<CreateStripeColumnExtentsArgIn>& cols, uint16_t dbRoot,
998+
uint32_t& partitionNum, uint16_t& segmentNum,
999+
std::vector<CreateStripeColumnExtentsArgOut>& extents) DBRM_THROW
1000+
{
1001+
#ifdef BRM_INFO
1002+
1003+
if (fDebug)
1004+
{
1005+
TRACER_WRITELATER("createHiddenStripeColumnExtents");
1006+
TRACER_WRITE;
1007+
}
1008+
1009+
#endif
1010+
1011+
ByteStream command, response;
1012+
uint8_t err;
1013+
uint16_t tmp16;
1014+
uint32_t tmp32;
1015+
1016+
command << CREATE_HIDDEN_STRIPE_COLUMN_EXTENTS;
1017+
serializeInlineVector(command, cols);
1018+
command << dbRoot << partitionNum;
1019+
1020+
err = send_recv(command, response);
1021+
1022+
if (err != ERR_OK)
1023+
return err;
1024+
1025+
if (response.length() == 0)
1026+
return ERR_NETWORK;
1027+
1028+
try
1029+
{
1030+
response >> err;
1031+
1032+
if (err != 0)
1033+
return (int)err;
1034+
1035+
response >> tmp32;
1036+
partitionNum = tmp32;
1037+
response >> tmp16;
1038+
segmentNum = tmp16;
1039+
deserializeInlineVector(response, extents);
1040+
}
1041+
catch (exception& e)
1042+
{
1043+
cerr << e.what() << endl;
1044+
return ERR_FAILURE;
1045+
}
1046+
1047+
CHECK_EMPTY(response);
1048+
return 0;
1049+
}
1050+
1051+
//------------------------------------------------------------------------------
1052+
// Send a request to make a hidden partition visible
1053+
//------------------------------------------------------------------------------
1054+
int DBRM::makePartitionVisible(const std::set<OID_t>& oids, uint16_t dbRoot, uint32_t partitionNum) DBRM_THROW
1055+
{
1056+
#ifdef BRM_INFO
1057+
1058+
if (fDebug)
1059+
{
1060+
TRACER_WRITELATER("makePartitionVisible");
1061+
TRACER_WRITE;
1062+
}
1063+
1064+
#endif
1065+
1066+
ByteStream command, response;
1067+
uint8_t err;
1068+
1069+
command << MAKE_PARTITION_VISIBLE;
1070+
command << static_cast<uint32_t>(oids.size());
1071+
for (const auto& oid : oids)
1072+
{
1073+
command << oid;
1074+
}
1075+
command << dbRoot << partitionNum;
1076+
1077+
err = send_recv(command, response);
1078+
1079+
if (err != ERR_OK)
1080+
return err;
1081+
1082+
if (response.length() == 0)
1083+
return ERR_NETWORK;
1084+
1085+
try
1086+
{
1087+
response >> err;
1088+
1089+
if (err != 0)
1090+
return (int)err;
1091+
}
1092+
catch (exception& e)
1093+
{
1094+
cerr << e.what() << endl;
1095+
return ERR_FAILURE;
1096+
}
1097+
1098+
CHECK_EMPTY(response);
1099+
return 0;
1100+
}
1101+
9941102
//------------------------------------------------------------------------------
9951103
// Send a request to create a column extent for the specified OID and DBRoot.
9961104
//------------------------------------------------------------------------------

0 commit comments

Comments
 (0)