diff --git a/docs/en/14-reference/01-components/01-taosd.md b/docs/en/14-reference/01-components/01-taosd.md index 306d5a8013b9..c6f15e020077 100644 --- a/docs/en/14-reference/01-components/01-taosd.md +++ b/docs/en/14-reference/01-components/01-taosd.md @@ -15,10 +15,63 @@ The command line parameters for taosd are as follows: - -s: Prints SDB information. - -C: Prints configuration information. - -e: Specifies environment variables, formatted like `-e 'TAOS_FQDN=td1'`. +- -r: Starts repair mode. Currently supports `--mode copy` (copy vnode data from a healthy source node). - -k: Retrieves the machine code. - -dm: Enables memory scheduling. - -V: Prints version information. +## Copy Mode Repair + +Copy mode copies files for specified vnodes directly from a healthy source node to the current (target) node. This is intended for scenarios where the volume of corrupted data is too large for regular repair mode to handle within acceptable time. + +### Syntax + +```bash +taosd -r --mode copy --node-type vnode --source-cfg \ + [--source-host ] --vnode [,|-]... +``` + +### Options + +| Option | Required | Description | +| --- | --- | --- | +| `--source-cfg` | Yes | Path to the source node's `taos.cfg` configuration file, or the directory containing it | +| `--source-host` | No | SSH host of the source node; omit for local source | +| `--vnode` | Yes | Comma-separated list of vnode IDs to copy; ranges with `-` are supported (e.g., `3,5-8,10`) | + +### Limitations + +- Only `--node-type vnode` is supported. +- Windows is not currently supported for copy mode. +- Remote mode requires passwordless SSH access (BatchMode). 
+ +### Examples + +Copy a single vnode from a local source node: + +```bash +taosd -r --mode copy --node-type vnode \ + --source-cfg /data/source-cluster/taos.cfg \ + --vnode 3 +``` + +Copy multiple vnodes from a local source node (specifying config directory): + +```bash +taosd -r --mode copy --node-type vnode \ + --source-cfg /etc/taos/ \ + --vnode 3,5,8 +``` + +Copy vnodes from a remote source node: + +```bash +taosd -r --mode copy --node-type vnode \ + --source-cfg /etc/taos/taos.cfg \ + --source-host 192.168.1.100 \ + --vnode 3,5 +``` + ## Configuration Parameters :::note diff --git a/docs/zh/14-reference/01-components/01-taosd.md b/docs/zh/14-reference/01-components/01-taosd.md index 14651bd7a094..19acd63dae30 100644 --- a/docs/zh/14-reference/01-components/01-taosd.md +++ b/docs/zh/14-reference/01-components/01-taosd.md @@ -17,10 +17,63 @@ taosd 命令行参数如下: - -e:指定环境变量的字符串,例如 `-e 'TAOS_FQDN=td1'`。 - -E:指定环境变量的文件路径,默认是 `./.env`,.env 文件中的内容可以是 `TAOS_FQDN=td1`。 - -o:指定日志输入方式,可选 `stdout`、`stderr`、`/dev/null`、``、`/`、``。 +- -r:启动修复模式。当前支持 `--mode copy`(从健康的源节点拷贝 vnode 数据)。 - -k:获取机器码 - -dm:启用内存调度 - -V:打印版本信息 +## 拷贝模式修复 + +拷贝模式用于从健康的源节点直接拷贝指定 vnode 的文件到当前(目标)节点。适用于损坏的数据量巨大、常规修复模式的性能无法满足要求的场景。 + +### 语法 + +```bash +taosd -r --mode copy --node-type vnode --source-cfg \ + [--source-host ] --vnode [,|-]... 
+``` + +### 参数 + +| 参数 | 必填 | 说明 | +| --- | --- | --- | +| `--source-cfg` | 是 | 源节点 `taos.cfg` 配置文件的路径,也可以指定配置文件所在目录 | +| `--source-host` | 否 | 源节点的 SSH 主机地址;省略时表示源数据在本地 | +| `--vnode` | 是 | 要拷贝的 vnode ID 列表,多个 ID 用逗号分隔,支持用 `-` 指定范围(如 `3,5-8,10`) | + +### 限制 + +- 当前只支持 `--node-type vnode`。 +- Windows 平台暂不支持拷贝模式。 +- 远程模式需要 SSH 免密登录(BatchMode)。 + +### 示例 + +从本地源节点拷贝单个 vnode: + +```bash +taosd -r --mode copy --node-type vnode \ + --source-cfg /data/source-cluster/taos.cfg \ + --vnode 3 +``` + +从本地源节点拷贝多个 vnode(指定配置目录): + +```bash +taosd -r --mode copy --node-type vnode \ + --source-cfg /etc/taos/ \ + --vnode 3,5,8 +``` + +从远程源节点拷贝 vnode: + +```bash +taosd -r --mode copy --node-type vnode \ + --source-cfg /etc/taos/taos.cfg \ + --source-host 192.168.1.100 \ + --vnode 3,5 +``` + ## 配置参数 :::note diff --git a/include/common/dmRepairCopy.h b/include/common/dmRepairCopy.h new file mode 100644 index 000000000000..ba003c0d2a0e --- /dev/null +++ b/include/common/dmRepairCopy.h @@ -0,0 +1,49 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ */ + +#ifndef _TD_DM_REPAIR_COPY_H_ +#define _TD_DM_REPAIR_COPY_H_ + +#include "os.h" +#include "tarray.h" +#include "tdef.h" + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct SRepairCopyOpts { + bool enabled; + char modeStr[32]; + char nodeType[32]; + char sourceHost[256]; + char sourceCfg[PATH_MAX]; + SArray *vnodeIds; // sorted ascending, deduplicated array of int32_t +} SRepairCopyOpts; + +// Parse a vnode ID list string like "2-5,8,3,2" into a sorted, deduplicated +// SArray of int32_t. Caller must call taosArrayDestroy() on the result. +// Returns NULL on parse error. +SArray *dmParseVnodeIds(const char *str); + +// Execute copy-mode repair. Returns exit code: 0=all ok, 1=bad args, +// 2=SSH fail, 3=partial failure, 4=all failed. +int32_t dmRepairCopyMode(const SRepairCopyOpts *pOpts); + +#ifdef __cplusplus +} +#endif + +#endif /* _TD_DM_REPAIR_COPY_H_ */ diff --git a/source/common/CMakeLists.txt b/source/common/CMakeLists.txt index ca5ca14b0030..e3c19425f631 100644 --- a/source/common/CMakeLists.txt +++ b/source/common/CMakeLists.txt @@ -53,6 +53,8 @@ target_include_directories( PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" PRIVATE "${GRANT_CFG_INCLUDE_DIR}" + PRIVATE "${TD_SOURCE_DIR}/include/libs/tfs" + PRIVATE "${TD_SOURCE_DIR}/include/libs/monitor" ) if(${TD_WINDOWS}) diff --git a/source/common/src/dmRepairCopy.c b/source/common/src/dmRepairCopy.c new file mode 100644 index 000000000000..46873708b9e2 --- /dev/null +++ b/source/common/src/dmRepairCopy.c @@ -0,0 +1,2139 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#define _DEFAULT_SOURCE +#include "dmRepairCopy.h" +#include "tconfig.h" +#include "tfs.h" +#include "tglobal.h" +#include "tjson.h" +#include "tlog.h" + +// Lightweight TFS model — uses SDiskCfg directly with a parallel array +// of tier-local IDs assigned during construction. +typedef struct SRepairTfs { + int32_t ndisk; + int32_t nlevel; + SDiskCfg *disks; // array of ndisk disks (owned) + int32_t primaryIdx; // index into disks[] for the primary disk +} SRepairTfs; + +// Lightweight representation of a single TSDB file parsed from current.json. +typedef struct SRepairFile { + int32_t type; // 0=head,1=data,2=sma,3=tomb,5=stt + SDiskID did; // source disk ID {level, id} + int32_t lcn; // last chunk number (S3) + int32_t fid; // file set id + int64_t cid; // commit id + int64_t size; // file size in bytes + int64_t minVer; // minimum version (-1 if absent) + int64_t maxVer; // maximum version (-1 if absent) + int32_t sttLevel; // STT compaction level (only for type 5) +} SRepairFile; + +// Lightweight representation of a TSDB file set parsed from current.json. +typedef struct SRepairFileSet { + int32_t fid; // file set id + SArray *files; // SArray of SRepairFile + int64_t lastCompact; // last compact timestamp + int64_t lastCommit; // last commit timestamp +} SRepairFileSet; + +// Per-vnode repair result. +typedef struct SRepairVnodeResult { + int8_t result; // 0=success, 1=failed, 2=skipped + bool hasS3File; // true if any source file has lcn > 1 + const char *reason; // failure/skip reason (string literal) +} SRepairVnodeResult; + +// Buffer sizes for shell-quoted paths and SSH commands. +// Shell quoting can expand a single quote to 4 chars ('\''). PATH_MAX*2 handles +// paths with a reasonable number of special chars; SSH_CMD_BUF covers two quoted +// paths + host + command prefix. 
+#define DM_SHELL_QUOTED_PATH_LEN (PATH_MAX * 2) +#define DM_SSH_CMD_BUF_LEN (PATH_MAX * 2 * 2 + 512) + +// Shell-quote a string for safe use in sh/bash commands. +// Produces output wrapped in single quotes with embedded single quotes escaped as '\''. +// Returns the number of bytes written (excluding null), or -1 if buffer too small. +static int32_t dmShellQuote(const char *in, char *out, size_t outLen) { + if (in == NULL || out == NULL || outLen < 3) return -1; + char *p = out; + char *end = out + outLen - 1; + + *p++ = '\''; + while (*in != '\0') { + if (*in == '\'') { + // Need 4 chars: '\'' plus we're inside a quote + if (p + 4 > end) return -1; + *p++ = '\''; // close current quote + *p++ = '\\'; + *p++ = '\''; // escaped literal quote + *p++ = '\''; // reopen quote + } else { + if (p + 1 > end) return -1; + *p++ = *in; + } + in++; + } + if (p + 1 > end) return -1; + *p++ = '\''; + *p = '\0'; + return (int32_t)(p - out); +} + +static int32_t compareInt32(const void *a, const void *b) { + int32_t va = *(const int32_t *)a; + int32_t vb = *(const int32_t *)b; + if (va < vb) return -1; + if (va > vb) return 1; + return 0; +} + +SArray *dmParseVnodeIds(const char *str) { + if (str == NULL || str[0] == '\0') return NULL; + + SArray *pArr = taosArrayInit(8, sizeof(int32_t)); + if (pArr == NULL) return NULL; + + const char *p = str; + while (*p != '\0') { + // skip leading whitespace + while (*p == ' ' || *p == '\t') p++; + if (*p == '\0') break; + + // parse first number + char *end = NULL; + int32_t lo = taosStr2Int32(p, &end, 10); + if (end == p || lo <= 0) goto _err; + + // skip whitespace + while (*end == ' ' || *end == '\t') end++; + + if (*end == '-') { + // range: lo-hi + end++; + int32_t hi = taosStr2Int32(end, &end, 10); + if (hi < lo) goto _err; + for (int32_t id = lo; id <= hi; id++) { + if (taosArrayPush(pArr, &id) == NULL) goto _err; + } + } else { + if (taosArrayPush(pArr, &lo) == NULL) goto _err; + } + + // skip whitespace + while (*end == ' ' || 
*end == '\t') end++; + + if (*end == ',') { + end++; + } else if (*end != '\0') { + goto _err; + } + p = end; + } + + if (taosArrayGetSize(pArr) == 0) goto _err; + + taosArraySort(pArr, compareInt32); + taosArrayRemoveDuplicate(pArr, compareInt32, NULL); + + return pArr; + +_err: + taosArrayDestroy(pArr); + return NULL; +} + +// Fetch a remote file to a local path via SSH. +// Returns 0 on success, -1 on error. +static int32_t dmSshFetchFile(const char *host, const char *remotePath, const char *localPath) { + char qHost[320], qRemote[DM_SHELL_QUOTED_PATH_LEN], qLocal[DM_SHELL_QUOTED_PATH_LEN]; + if (dmShellQuote(host, qHost, sizeof(qHost)) < 0 || dmShellQuote(remotePath, qRemote, sizeof(qRemote)) < 0 || + dmShellQuote(localPath, qLocal, sizeof(qLocal)) < 0) { + uError("repair: shell quote failed in dmSshFetchFile"); + return -1; + } + char cmd[DM_SSH_CMD_BUF_LEN]; + snprintf(cmd, sizeof(cmd), "ssh -o BatchMode=yes %s cat %s > %s 2>/dev/null", qHost, qRemote, qLocal); + TdCmdPtr pCmd = taosOpenCmd(cmd); + if (pCmd == NULL) { + uError("repair: failed to run ssh command"); + return -1; + } + char buf[256]; + while (taosGetsCmd(pCmd, sizeof(buf), buf) > 0) { + } + taosCloseCmd(&pCmd); + + // Verify file has content + int64_t fsize = 0; + if (taosStatFile(localPath, &fsize, NULL, NULL) != 0 || fsize <= 0) { + uError("repair: ssh fetch returned empty file for %s:%s", host, remotePath); + (void)taosRemoveFile(localPath); + return -1; + } + return 0; +} + +// Parse a taos.cfg file and extract SDiskCfg entries from the dataDir items. +// Returns 0 on success. On success, caller must free *ppDisks. 
+static int32_t dmParseSourceCfg(const char *cfgPath, SDiskCfg **ppDisks, int32_t *pNumDisks) { + if (!taosCheckExistFile(cfgPath)) { + uError("repair: config file does not exist: %s", cfgPath); + return -1; + } + + SConfig *pCfg = NULL; + int32_t code = cfgInit(&pCfg); + if (code != 0) { + uError("repair: cfgInit failed: %s", tstrerror(code)); + return -1; + } + + // Register dataDir so cfgLoad knows how to handle it + code = cfgAddDir(pCfg, "dataDir", "/tmp", CFG_SCOPE_SERVER, CFG_DYN_NONE, CFG_CATEGORY_LOCAL); + if (code != 0) { + uError("repair: cfgAddDir failed: %s", tstrerror(code)); + cfgCleanup(pCfg); + return -1; + } + + code = cfgLoad(pCfg, CFG_STYPE_CFG_FILE, cfgPath); + if (code != 0) { + uError("repair: cfgLoad failed for %s: %s", cfgPath, tstrerror(code)); + cfgCleanup(pCfg); + return -1; + } + + SConfigItem *pItem = cfgGetItem(pCfg, "dataDir"); + if (pItem == NULL) { + uError("repair: no dataDir found in %s", cfgPath); + cfgCleanup(pCfg); + return -1; + } + + int32_t ndisk = 0; + if (pItem->array != NULL) { + ndisk = taosArrayGetSize(pItem->array); + } + + SDiskCfg *disks = NULL; + if (ndisk <= 0) { + // Single default dataDir from pItem->str + ndisk = 1; + disks = taosMemoryCalloc(1, sizeof(SDiskCfg)); + if (disks == NULL) { + cfgCleanup(pCfg); + return -1; + } + tstrncpy(disks[0].dir, pItem->str, TSDB_FILENAME_LEN); + disks[0].level = 0; + disks[0].primary = 1; + disks[0].disable = 0; + } else { + if (ndisk > TFS_MAX_DISKS) ndisk = TFS_MAX_DISKS; + disks = taosMemoryCalloc(ndisk, sizeof(SDiskCfg)); + if (disks == NULL) { + cfgCleanup(pCfg); + return -1; + } + for (int32_t i = 0; i < ndisk; i++) { + SDiskCfg *pSrc = taosArrayGet(pItem->array, i); + disks[i] = *pSrc; + } + } + + cfgCleanup(pCfg); + *ppDisks = disks; + *pNumDisks = ndisk; + return 0; +} + +// Build a lightweight source TFS model from SDiskCfg array. +// Assigns tier-local IDs to each disk. Returns 0 on success. 
+static int32_t dmBuildRepairTfs(const SDiskCfg *pCfgArr, int32_t ndisk, SRepairTfs *pTfs) { + pTfs->ndisk = ndisk; + pTfs->nlevel = 0; + pTfs->primaryIdx = -1; + pTfs->disks = taosMemoryCalloc(ndisk, sizeof(SDiskCfg)); + if (pTfs->disks == NULL) return -1; + + for (int32_t i = 0; i < ndisk; i++) { + int32_t lvl = pCfgArr[i].level; + if (lvl < 0 || lvl >= TFS_MAX_TIERS) { + uError("repair: invalid disk level %d for %s", lvl, pCfgArr[i].dir); + taosMemoryFree(pTfs->disks); + pTfs->disks = NULL; + return -1; + } + + pTfs->disks[i] = pCfgArr[i]; + + if (lvl + 1 > pTfs->nlevel) { + pTfs->nlevel = lvl + 1; + } + if (pCfgArr[i].primary && lvl == 0) { + pTfs->primaryIdx = i; + } + } + + if (pTfs->primaryIdx < 0) { + uError("repair: no primary disk found in source config"); + taosMemoryFree(pTfs->disks); + pTfs->disks = NULL; + return -1; + } + + return 0; +} + +static void dmDestroyRepairTfs(SRepairTfs *pTfs) { + if (pTfs == NULL) return; + taosMemoryFreeClear(pTfs->disks); + pTfs->ndisk = 0; + pTfs->nlevel = 0; + pTfs->primaryIdx = -1; +} + +// Validate source disk paths exist (local mode only). +static int32_t dmValidateSourceDisksLocal(const SRepairTfs *pTfs) { + for (int32_t i = 0; i < pTfs->ndisk; i++) { + if (!taosDirExist(pTfs->disks[i].dir)) { + uError("repair: source dataDir does not exist: %s", pTfs->disks[i].dir); + return -1; + } + } + return 0; +} + +// Validate source disk paths exist (remote mode). 
+static int32_t dmValidateSourceDisksRemote(const char *host, const SRepairTfs *pTfs) { + for (int32_t i = 0; i < pTfs->ndisk; i++) { + char qHost[320], qDir[DM_SHELL_QUOTED_PATH_LEN]; + if (dmShellQuote(host, qHost, sizeof(qHost)) < 0 || dmShellQuote(pTfs->disks[i].dir, qDir, sizeof(qDir)) < 0) { + uError("repair: shell quote failed in dmValidateSourceDisksRemote"); + return -1; + } + char cmd[DM_SSH_CMD_BUF_LEN]; + snprintf(cmd, sizeof(cmd), "ssh -o BatchMode=yes %s test -d %s && echo YES", qHost, qDir); + TdCmdPtr pCmd = taosOpenCmd(cmd); + if (pCmd == NULL) { + uError("repair: ssh connectivity failed"); + return -1; + } + bool found = false; + char buf[64]; + while (taosGetsCmd(pCmd, sizeof(buf), buf) > 0) { + if (strncmp(buf, "YES", 3) == 0) found = true; + } + taosCloseCmd(&pCmd); + if (!found) { + uError("repair: remote dataDir does not exist: %s:%s", host, pTfs->disks[i].dir); + return -1; + } + } + return 0; +} + +// Read an entire file into a null-terminated malloc'd buffer. +// Caller must taosMemoryFree(*ppContent). +static int32_t dmReadFileContent(const char *path, char **ppContent, int64_t *pSize) { + int64_t fsize = 0; + if (taosStatFile(path, &fsize, NULL, NULL) != 0 || fsize <= 0) { + return -1; + } + TdFilePtr pFile = taosOpenFile(path, TD_FILE_READ); + if (pFile == NULL) return -1; + + char *buf = taosMemoryMalloc(fsize + 1); + if (buf == NULL) { + taosCloseFile(&pFile); + return -1; + } + int64_t nread = taosReadFile(pFile, buf, fsize); + taosCloseFile(&pFile); + if (nread != fsize) { + taosMemoryFree(buf); + return -1; + } + buf[nread] = '\0'; + *ppContent = buf; + if (pSize) *pSize = nread; + return 0; +} + +// Load dnodeId from {tsDataDir}/dnode/dnode.json. +// Returns 0 on success, -1 on error (file missing or parse failure). 
+static int32_t dmLoadDnodeInfo(int32_t *pDnodeId) { + char file[PATH_MAX] = {0}; + snprintf(file, sizeof(file), "%s%sdnode%sdnode.json", tsDataDir, TD_DIRSEP, TD_DIRSEP); + + char *content = NULL; + if (dmReadFileContent(file, &content, NULL) != 0) { + uError("repair: failed to read dnode.json: %s", file); + return -1; + } + + SJson *pJson = tjsonParse(content); + taosMemoryFree(content); + if (pJson == NULL) { + uError("repair: failed to parse dnode.json"); + return -1; + } + + int32_t code = 0; + int32_t dnodeId = 0; + tjsonGetInt32ValueFromDouble(pJson, "dnodeId", dnodeId, code); + if (code < 0 || dnodeId <= 0) { + uError("repair: invalid or missing dnodeId in dnode.json"); + tjsonDelete(pJson); + return -1; + } + + tjsonDelete(pJson); + *pDnodeId = dnodeId; + return 0; +} + +// Open target TFS from global tsDiskCfg[]/tsDiskCfgNum. +// Returns 0 on success. Caller must call tfsClose(ppTfs) to free. +static int32_t dmOpenTargetTfs(STfs **ppTfs) { + SDiskCfg *pDisks = tsDiskCfg; + int32_t numOfDisks = tsDiskCfgNum; + + SDiskCfg tmpDisk = {0}; + if (numOfDisks <= 0) { + // Fallback: single disk from tsDataDir + tmpDisk.level = 0; + tmpDisk.primary = 1; + tmpDisk.disable = 0; + tstrncpy(tmpDisk.dir, tsDataDir, TSDB_FILENAME_LEN); + pDisks = &tmpDisk; + numOfDisks = 1; + } + + int32_t code = tfsOpen(pDisks, numOfDisks, ppTfs); + if (code != 0) { + uError("repair: failed to open target TFS: %s", tstrerror(code)); + return -1; + } + return 0; +} + +// Check if vnode{vid}.bak exists on any target TFS disk. +// Returns true if .bak found on any disk. 
+static bool dmCheckBakExists(STfs *pTgtTfs, int32_t vnodeId) { + char relBak[TSDB_FILENAME_LEN]; + snprintf(relBak, sizeof(relBak), "vnode%svnode%d.bak", TD_DIRSEP, vnodeId); + int32_t nlevel = tfsGetLevel(pTgtTfs); + for (int32_t level = 0; level < nlevel; level++) { + int32_t ndisk = tfsGetDisksAtLevel(pTgtTfs, level); + for (int32_t id = 0; id < ndisk; id++) { + SDiskID did = {.level = level, .id = id}; + const char *diskPath = tfsGetDiskPath(pTgtTfs, did); + char fullPath[PATH_MAX]; + snprintf(fullPath, sizeof(fullPath), "%s%s%s", diskPath, TD_DIRSEP, relBak); + if (taosDirExist(fullPath)) return true; + } + } + return false; +} + +static void dmDestroyRepairFileSets(SArray *pSets) { + if (pSets == NULL) return; + for (int32_t i = 0; i < taosArrayGetSize(pSets); i++) { + SRepairFileSet *pSet = taosArrayGet(pSets, i); + taosArrayDestroy(pSet->files); + } + taosArrayDestroy(pSets); +} + +// File type suffix strings for all types including STT. +static const char *gRepairFTypeSuffixAll[] = {[0] = "head", [1] = "data", [2] = "sma", + [3] = "tomb", [4] = NULL, [5] = "stt"}; + +// Read a JSON number field as int64_t via tjsonGetDoubleValue. +static int32_t dmJsonGetInt64FromDouble(SJson *pJson, const char *pName, int64_t *pVal) { + double tmp = 0; + int32_t code = tjsonGetDoubleValue(pJson, pName, &tmp); + if (code == 0) *pVal = (int64_t)tmp; + return code; +} + +// Parse a single file's JSON fields into SRepairFile. 
static int32_t dmParseRepairFileJson(SJson *pJson, int32_t type, SRepairFile *pFile) {
  int32_t code = 0;
  pFile->type = type;
  // Disk location is mandatory — without it the on-disk path cannot be built.
  tjsonGetInt32ValueFromDouble(pJson, "did.level", pFile->did.level, code);
  if (code < 0) return -1;
  tjsonGetInt32ValueFromDouble(pJson, "did.id", pFile->did.id, code);
  if (code < 0) return -1;

  // "lcn" is optional; default stays 0 and the lookup result is deliberately ignored.
  pFile->lcn = 0;
  tjsonGetInt32ValueFromDouble(pJson, "lcn", pFile->lcn, code);

  tjsonGetInt32ValueFromDouble(pJson, "fid", pFile->fid, code);
  if (code < 0) return -1;

  if (dmJsonGetInt64FromDouble(pJson, "cid", &pFile->cid) < 0) return -1;
  if (dmJsonGetInt64FromDouble(pJson, "size", &pFile->size) < 0) return -1;

  // Version range is optional; -1 marks "absent".
  pFile->minVer = -1;
  pFile->maxVer = -1;
  (void)dmJsonGetInt64FromDouble(pJson, "minVer", &pFile->minVer);
  (void)dmJsonGetInt64FromDouble(pJson, "maxVer", &pFile->maxVer);

  pFile->sttLevel = 0;
  if (type == 5) {  // TSDB_FTYPE_STT
    // STT files additionally carry a compaction level, which is mandatory.
    tjsonGetInt32ValueFromDouble(pJson, "level", pFile->sttLevel, code);
    if (code < 0) return -1;
  }
  return 0;
}

// Parse current.json content into an SArray of SRepairFileSet.
// Only format version 1 is accepted. Returns NULL on error; on success the
// caller owns the result and must free it with dmDestroyRepairFileSets().
static SArray *dmParseCurrentJson(const char *content) {
  SJson *pRoot = tjsonParse(content);
  if (pRoot == NULL) {
    uError("repair: failed to parse current.json");
    return NULL;
  }

  // Check format version
  int32_t fmtv = 0;
  int32_t code = 0;
  tjsonGetInt32ValueFromDouble(pRoot, "fmtv", fmtv, code);
  if (code < 0 || fmtv != 1) {
    uError("repair: unsupported current.json format version: %d", fmtv);
    tjsonDelete(pRoot);
    return NULL;
  }

  SJson *pFsetArr = tjsonGetObjectItem(pRoot, "fset");
  if (pFsetArr == NULL) {
    uError("repair: missing 'fset' array in current.json");
    tjsonDelete(pRoot);
    return NULL;
  }

  int32_t nFsets = tjsonGetArraySize(pFsetArr);
  SArray *pSets = taosArrayInit(nFsets > 0 ? nFsets : 1, sizeof(SRepairFileSet));
  if (pSets == NULL) {
    tjsonDelete(pRoot);
    return NULL;
  }

  for (int32_t i = 0; i < nFsets; i++) {
    SJson *pFsetJson = tjsonGetArrayItem(pFsetArr, i);
    SRepairFileSet fset = {0};

    tjsonGetInt32ValueFromDouble(pFsetJson, "fid", fset.fid, code);
    if (code < 0) goto _err;

    fset.files = taosArrayInit(8, sizeof(SRepairFile));
    if (fset.files == NULL) goto _err;

    // Parse non-STT file types (head, data, sma, tomb); each is optional.
    for (int32_t t = 0; t < 4; t++) {
      SJson *pFileJson = tjsonGetObjectItem(pFsetJson, gRepairFTypeSuffixAll[t]);
      if (pFileJson == NULL) continue;
      SRepairFile rf = {0};
      if (dmParseRepairFileJson(pFileJson, t, &rf) != 0) {
        // fset.files is not yet owned by pSets; free it before bailing.
        taosArrayDestroy(fset.files);
        goto _err;
      }
      if (taosArrayPush(fset.files, &rf) == NULL) {
        taosArrayDestroy(fset.files);
        goto _err;
      }
    }

    // Parse STT levels — each level holds its own "files" array; all STT
    // files are flattened into the same fset.files list as type 5.
    SJson *pSttLvlArr = tjsonGetObjectItem(pFsetJson, "stt lvl");
    if (pSttLvlArr != NULL) {
      int32_t nLvls = tjsonGetArraySize(pSttLvlArr);
      for (int32_t l = 0; l < nLvls; l++) {
        SJson *pLvlJson = tjsonGetArrayItem(pSttLvlArr, l);
        SJson *pFilesArr = tjsonGetObjectItem(pLvlJson, "files");
        if (pFilesArr == NULL) continue;
        int32_t nFiles = tjsonGetArraySize(pFilesArr);
        for (int32_t f = 0; f < nFiles; f++) {
          SJson *pSttJson = tjsonGetArrayItem(pFilesArr, f);
          SRepairFile rf = {0};
          if (dmParseRepairFileJson(pSttJson, 5, &rf) != 0) {  // 5 = TSDB_FTYPE_STT
            taosArrayDestroy(fset.files);
            goto _err;
          }
          if (taosArrayPush(fset.files, &rf) == NULL) {
            taosArrayDestroy(fset.files);
            goto _err;
          }
        }
      }
    }

    // Parse file set level timestamps (both optional; default 0).
    fset.lastCompact = 0;
    fset.lastCommit = 0;
    (void)dmJsonGetInt64FromDouble(pFsetJson, "last compact", &fset.lastCompact);
    (void)dmJsonGetInt64FromDouble(pFsetJson, "last commit", &fset.lastCommit);

    if (taosArrayPush(pSets, &fset) == NULL) {
      taosArrayDestroy(fset.files);
      goto _err;
    }
  }

  tjsonDelete(pRoot);
  return pSets;

_err:
  // pSets may already hold earlier file sets; dmDestroyRepairFileSets frees
  // their per-set file arrays as well.
  tjsonDelete(pRoot);
  dmDestroyRepairFileSets(pSets);
  return NULL;
}

// Read and parse local (target) current.json for a vnode.
// Returns parsed SArray of SRepairFileSet, or NULL if file doesn't exist or fails to parse.
static SArray *dmReadLocalCurrentJson(STfs *pTgtTfs, int32_t vnodeId) {
  const char *primaryPath = tfsGetPrimaryPath(pTgtTfs);
  char path[PATH_MAX];
  snprintf(path, sizeof(path), "%s%svnode%svnode%d%stsdb%scurrent.json", primaryPath, TD_DIRSEP, TD_DIRSEP, vnodeId,
           TD_DIRSEP, TD_DIRSEP);

  char *content = NULL;
  if (dmReadFileContent(path, &content, NULL) != 0) return NULL;
  SArray *pSets = dmParseCurrentJson(content);
  taosMemoryFree(content);
  return pSets;
}

// Build the on-disk filename for a SRepairFile.
// Pattern: {diskPath}/vnode/vnode{vid}/tsdb/v{vid}f{fid}ver{cid}.{suffix}
// S3 variant (lcn>0): ...ver{cid}.{lcn}.{suffix}
static void dmBuildTsdbFilePath(const char *diskPath, int32_t vnodeId, const SRepairFile *pFile, char *buf,
                                int32_t bufLen) {
  const char *suffix = gRepairFTypeSuffixAll[pFile->type];
  if (pFile->lcn > 0) {
    snprintf(buf, bufLen, "%s%svnode%svnode%d%stsdb%sv%df%dver%" PRId64 ".%d.%s", diskPath, TD_DIRSEP, TD_DIRSEP,
             vnodeId, TD_DIRSEP, TD_DIRSEP, vnodeId, pFile->fid, pFile->cid, pFile->lcn, suffix);
  } else {
    snprintf(buf, bufLen, "%s%svnode%svnode%d%stsdb%sv%df%dver%" PRId64 ".%s", diskPath, TD_DIRSEP, TD_DIRSEP, vnodeId,
             TD_DIRSEP, TD_DIRSEP, vnodeId, pFile->fid, pFile->cid, suffix);
  }
}

// Determine which source file sets need to be copied vs retained from local.
// A source fid is "retained" only if the local also has that fid AND every file
// listed in the local file set physically exists on the target disk with a
// nonzero size (the size is not compared against the metadata's size field).
// Otherwise it needs copying from source.
// Sets *ppCopyFids to a new SArray of int32_t fids that need copying (caller frees).
// Sets *ppRetainFids to a new SArray of int32_t fids that can be hard-linked from backup.
static int32_t dmDiffFileSets(const SArray *srcSets, const SArray *localSets, STfs *pTgtTfs, int32_t vnodeId,
                              SArray **ppCopyFids, SArray **ppRetainFids) {
  int32_t nSrc = taosArrayGetSize(srcSets);
  *ppCopyFids = taosArrayInit(nSrc, sizeof(int32_t));
  *ppRetainFids = taosArrayInit(nSrc, sizeof(int32_t));
  if (*ppCopyFids == NULL || *ppRetainFids == NULL) {
    // taosArrayDestroy(NULL) is tolerated here, so it is safe to destroy both.
    taosArrayDestroy(*ppCopyFids);
    taosArrayDestroy(*ppRetainFids);
    *ppCopyFids = NULL;
    *ppRetainFids = NULL;
    return -1;
  }

  // Every source fid ends up in exactly one of the two output arrays.
  for (int32_t s = 0; s < nSrc; s++) {
    SRepairFileSet *pSrc = taosArrayGet(srcSets, s);
    int32_t srcFid = pSrc->fid;
    bool retained = false;

    if (localSets != NULL) {
      int32_t nLocal = taosArrayGetSize(localSets);
      for (int32_t l = 0; l < nLocal; l++) {
        SRepairFileSet *pLocal = taosArrayGet(localSets, l);
        if (pLocal->fid != srcFid) continue;

        // Same fid exists locally — verify every local file exists on disk
        // (existence + nonzero size only; sizes are not compared against the
        // local metadata's recorded size).
        bool allExist = true;
        int32_t nLocalFiles = taosArrayGetSize(pLocal->files);
        for (int32_t f = 0; f < nLocalFiles; f++) {
          SRepairFile *lf = taosArrayGet(pLocal->files, f);
          const char *diskPath = tfsGetDiskPath(pTgtTfs, lf->did);
          if (diskPath == NULL) {
            allExist = false;
            break;
          }

          char filePath[PATH_MAX];
          dmBuildTsdbFilePath(diskPath, vnodeId, lf, filePath, sizeof(filePath));

          int64_t actualSize = 0;
          if (taosStatFile(filePath, &actualSize, NULL, NULL) != 0 || actualSize <= 0) {
            allExist = false;
            break;
          }
        }
        // An empty local file set is NOT retained — there is nothing to link.
        if (allExist && nLocalFiles > 0) retained = true;
        break;
      }
    }

    SArray *arr = retained ? *ppRetainFids : *ppCopyFids;
    if (taosArrayPush(arr, &srcFid) == NULL) {
      taosArrayDestroy(*ppCopyFids);
      taosArrayDestroy(*ppRetainFids);
      *ppCopyFids = NULL;
      *ppRetainFids = NULL;
      return -1;
    }
  }
  return 0;
}

// Read and parse source current.json into an SArray of SRepairFileSet.
// Returns NULL on error (file missing, SSH failure, or parse error).
+static SArray *dmReadSourceCurrentJson(const SRepairTfs *pSrcTfs, const char *host, int32_t vnodeId) { + const char *primaryDir = pSrcTfs->disks[pSrcTfs->primaryIdx].dir; + char srcPath[PATH_MAX]; + snprintf(srcPath, sizeof(srcPath), "%s%svnode%svnode%d%stsdb%scurrent.json", primaryDir, TD_DIRSEP, TD_DIRSEP, + vnodeId, TD_DIRSEP, TD_DIRSEP); + + char *content = NULL; + if (host == NULL || host[0] == '\0') { + if (dmReadFileContent(srcPath, &content, NULL) != 0) return NULL; + } else { + char tmpPath[PATH_MAX]; + snprintf(tmpPath, sizeof(tmpPath), "/tmp/tdrepair_%d_v%d_current.json", (int)taosGetPId(), vnodeId); + if (dmSshFetchFile(host, srcPath, tmpPath) != 0) return NULL; + int32_t rc = dmReadFileContent(tmpPath, &content, NULL); + (void)taosRemoveFile(tmpPath); + if (rc != 0) return NULL; + } + + SArray *pSets = dmParseCurrentJson(content); + taosMemoryFree(content); + return pSets; +} + +// Step d: Backup vnodeN → vnodeN.bak on all target disks. +// Disks where vnodeN exists: rename to vnodeN.bak. +// Disks where vnodeN does not exist: create empty vnodeN.bak dir. 
+static int32_t dmBackupVnode(STfs *pTgtTfs, int32_t vnodeId) { + char relVnode[TSDB_FILENAME_LEN]; + char relBak[TSDB_FILENAME_LEN]; + snprintf(relVnode, sizeof(relVnode), "vnode%svnode%d", TD_DIRSEP, vnodeId); + snprintf(relBak, sizeof(relBak), "vnode%svnode%d.bak", TD_DIRSEP, vnodeId); + + int32_t nlevel = tfsGetLevel(pTgtTfs); + for (int32_t level = 0; level < nlevel; level++) { + int32_t ndisk = tfsGetDisksAtLevel(pTgtTfs, level); + for (int32_t id = 0; id < ndisk; id++) { + SDiskID did = {.level = level, .id = id}; + const char *diskPath = tfsGetDiskPath(pTgtTfs, did); + char srcPath[PATH_MAX]; + char dstPath[PATH_MAX]; + snprintf(srcPath, sizeof(srcPath), "%s%s%s", diskPath, TD_DIRSEP, relVnode); + snprintf(dstPath, sizeof(dstPath), "%s%s%s", diskPath, TD_DIRSEP, relBak); + + if (taosDirExist(srcPath)) { + if (taosRenameFile(srcPath, dstPath) != 0) { + uError("repair: vnode%d failed to rename %s to %s", vnodeId, srcPath, dstPath); + return -1; + } + uInfo("repair: vnode%d renamed %s to .bak", vnodeId, srcPath); + } else { + if (taosMulMkDir(dstPath) != 0) { + uError("repair: vnode%d failed to create backup dir %s", vnodeId, dstPath); + return -1; + } + } + } + } + return 0; +} + +// Step e: Create vnodeN/tsdb directory tree on all target disks. +static int32_t dmCreateVnodeDirs(STfs *pTgtTfs, int32_t vnodeId) { + char relTsdb[TSDB_FILENAME_LEN]; + snprintf(relTsdb, sizeof(relTsdb), "vnode%svnode%d%stsdb", TD_DIRSEP, vnodeId, TD_DIRSEP); + int32_t code = tfsMkdirRecur(pTgtTfs, relTsdb); + if (code != 0) { + uError("repair: vnode%d failed to create directories: %s", vnodeId, tstrerror(code)); + return -1; + } + return 0; +} + +// Step f: Hard-link retained tsdb files from vnodeN.bak to vnodeN. +// Each file is hard-linked on the same disk (same filesystem). 
static int32_t dmHardLinkRetainedFiles(STfs *pTgtTfs, int32_t vnodeId, const SArray *retainFids,
                                       const SArray *localFileSets) {
  int32_t nRetain = taosArrayGetSize(retainFids);
  int32_t nLocal = taosArrayGetSize(localFileSets);

  for (int32_t r = 0; r < nRetain; r++) {
    int32_t fid = *(int32_t *)taosArrayGet(retainFids, r);

    // Find the local file set for this fid
    SRepairFileSet *pLocal = NULL;
    for (int32_t l = 0; l < nLocal; l++) {
      SRepairFileSet *pSet = taosArrayGet(localFileSets, l);
      if (pSet->fid == fid) {
        pLocal = pSet;
        break;
      }
    }
    // A retained fid with no matching local set is silently skipped (should
    // not happen — retain fids are derived from localFileSets upstream).
    if (pLocal == NULL) continue;

    int32_t nFiles = taosArrayGetSize(pLocal->files);
    for (int32_t f = 0; f < nFiles; f++) {
      SRepairFile *pFile = taosArrayGet(pLocal->files, f);
      const char *diskPath = tfsGetDiskPath(pTgtTfs, pFile->did);
      if (diskPath == NULL) {
        uError("repair: vnode%d fid=%d invalid disk level=%d id=%d", vnodeId, fid, pFile->did.level, pFile->did.id);
        return -1;
      }

      // Rebuild the bare filename (same naming scheme as dmBuildTsdbFilePath,
      // duplicated here because the .bak source path needs the name alone).
      const char *suffix = gRepairFTypeSuffixAll[pFile->type];
      char fileName[256];
      if (pFile->lcn > 0) {
        snprintf(fileName, sizeof(fileName), "v%df%dver%" PRId64 ".%d.%s", vnodeId, pFile->fid, pFile->cid, pFile->lcn,
                 suffix);
      } else {
        snprintf(fileName, sizeof(fileName), "v%df%dver%" PRId64 ".%s", vnodeId, pFile->fid, pFile->cid, suffix);
      }

      // Both paths are on the same disk, so a hard link is always legal.
      char bakPath[PATH_MAX];
      char newPath[PATH_MAX];
      snprintf(bakPath, sizeof(bakPath), "%s%svnode%svnode%d.bak%stsdb%s%s", diskPath, TD_DIRSEP, TD_DIRSEP, vnodeId,
               TD_DIRSEP, TD_DIRSEP, fileName);
      snprintf(newPath, sizeof(newPath), "%s%svnode%svnode%d%stsdb%s%s", diskPath, TD_DIRSEP, TD_DIRSEP, vnodeId,
               TD_DIRSEP, TD_DIRSEP, fileName);

      // NOTE(review): assumes taosLinkFile(existing, new) — same argument
      // order as POSIX link(2); confirm against the OS layer.
      if (taosLinkFile(bakPath, newPath) != 0) {
        uError("repair: vnode%d failed to hard-link %s", vnodeId, fileName);
        return -1;
      }
      uInfo("repair: vnode%d hard-linked %s", vnodeId, fileName);
    }
  }
  return 0;
}

// Recursively copy a directory tree, optionally skipping one subdirectory by name.
+// When skipSubDir is non-NULL and matches a top-level entry, that subtree is skipped.
+// For files: copies with taosCopyFile(), verifies size, logs name+size.
+// For dirs: creates target dir, recurses (skipSubDir only applies at depth 0).
+static int32_t dmCopyDirRecursive(const char *srcDir, const char *dstDir, const char *skipSubDir, int32_t vnodeId) {
+  TdDirPtr pDir = taosOpenDir(srcDir);
+  if (pDir == NULL) {
+    uError("repair: vnode%d cannot open source dir %s", vnodeId, srcDir);
+    return -1;
+  }
+
+  TdDirEntryPtr pEntry;
+  while ((pEntry = taosReadDir(pDir)) != NULL) {
+    char *name = taosGetDirEntryName(pEntry);
+    if (name == NULL) continue;
+    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0) continue;
+
+    // Skip the excluded subdirectory at this level
+    if (skipSubDir != NULL && strcmp(name, skipSubDir) == 0) continue;
+
+    char srcPath[PATH_MAX];
+    char dstPath[PATH_MAX];
+    snprintf(srcPath, sizeof(srcPath), "%s%s%s", srcDir, TD_DIRSEP, name);
+    snprintf(dstPath, sizeof(dstPath), "%s%s%s", dstDir, TD_DIRSEP, name);
+
+    if (taosDirEntryIsDir(pEntry)) {
+      if (taosMulMkDir(dstPath) != 0) {
+        uError("repair: vnode%d failed to create dir %s", vnodeId, dstPath);
+        taosCloseDir(&pDir);
+        return -1;
+      }
+      uInfo("repair: vnode%d dir: %s", vnodeId, name);
+      // Recurse without skip — skipSubDir only applies at top level
+      if (dmCopyDirRecursive(srcPath, dstPath, NULL, vnodeId) != 0) {
+        taosCloseDir(&pDir);
+        return -1;
+      }
+    } else {
+      int64_t srcSize = 0;
+      if (taosStatFile(srcPath, &srcSize, NULL, NULL) != 0) {
+        uError("repair: vnode%d cannot stat source file %s", vnodeId, srcPath);
+        taosCloseDir(&pDir);
+        return -1;
+      }
+      uInfo("repair: vnode%d file: %s (%" PRId64 " bytes)", vnodeId, name, srcSize);
+
+      int64_t copied = taosCopyFile(srcPath, dstPath);
+      if (copied < 0) {
+        uError("repair: vnode%d failed to copy %s", vnodeId, srcPath);
+        taosCloseDir(&pDir);
+        return -1;
+      }
+
+      // Re-stat the destination: the copy is only trusted if sizes match.
+      int64_t dstSize = 0;
+      if (taosStatFile(dstPath, &dstSize, NULL, NULL) != 0 || dstSize != srcSize) {
+        uError("repair: vnode%d size mismatch after copy: %s (src=%" PRId64 " dst=%" PRId64 ")", vnodeId, name, srcSize,
+               dstSize);
+        taosCloseDir(&pDir);
+        return -1;
+      }
+    }
+  }
+  taosCloseDir(&pDir);
+  return 0;
+}
+
+// Step g: Copy non-tsdb files from source vnodeN to target primary disk.
+// Local mode: recursive copy skipping tsdb/.
+// Remote mode: scp -r then remove tsdb/ from the copy.
+static int32_t dmCopyNonTsdbFiles(const SRepairTfs *pSrcTfs, STfs *pTgtTfs, const char *host, int32_t vnodeId) {
+  const char *tgtPrimary = tfsGetPrimaryPath(pTgtTfs);
+  char dstVnodeDir[PATH_MAX];
+  snprintf(dstVnodeDir, sizeof(dstVnodeDir), "%s%svnode%svnode%d", tgtPrimary, TD_DIRSEP, TD_DIRSEP, vnodeId);
+
+  if (host == NULL || host[0] == '\0') {
+    // Local mode: recursive copy skipping "tsdb"
+    const char *srcPrimary = pSrcTfs->disks[pSrcTfs->primaryIdx].dir;
+    char srcVnodeDir[PATH_MAX];
+    snprintf(srcVnodeDir, sizeof(srcVnodeDir), "%s%svnode%svnode%d", srcPrimary, TD_DIRSEP, TD_DIRSEP, vnodeId);
+
+    return dmCopyDirRecursive(srcVnodeDir, dstVnodeDir, "tsdb", vnodeId);
+  }
+
+  // Remote mode: list source vnodeN/ entries, scp each non-tsdb item individually
+  const char *srcPrimary = pSrcTfs->disks[pSrcTfs->primaryIdx].dir;
+  char srcVnodeDir[PATH_MAX];
+  snprintf(srcVnodeDir, sizeof(srcVnodeDir), "%s%svnode%svnode%d", srcPrimary, TD_DIRSEP, TD_DIRSEP, vnodeId);
+
+  // List remote directory entries with type and size via ls -lA
+  struct SRemoteEntry {
+    char    name[256];
+    bool    isDir;
+    int64_t size;
+  };
+  char qHost[320], qSrcDir[DM_SHELL_QUOTED_PATH_LEN];
+  if (dmShellQuote(host, qHost, sizeof(qHost)) < 0 || dmShellQuote(srcVnodeDir, qSrcDir, sizeof(qSrcDir)) < 0) {
+    uError("repair: vnode%d shell quote failed", vnodeId);
+    return -1;
+  }
+  char cmd[DM_SSH_CMD_BUF_LEN];
+  snprintf(cmd, sizeof(cmd), "ssh -o BatchMode=yes %s ls -lA %s/ 2>/dev/null", qHost, qSrcDir);
+  TdCmdPtr pCmd = taosOpenCmd(cmd);
+  if (pCmd == NULL) {
+    uError("repair: vnode%d ssh ls command failed to start", vnodeId);
+    return -1;
+  }
+
+  // Collect entries with type and size
+  SArray *entries = taosArrayInit(8, sizeof(struct SRemoteEntry));
+  if (entries == NULL) {
+    taosCloseCmd(&pCmd);
+    uError("repair: vnode%d memory allocation failed", vnodeId);
+    return -1;
+  }
+  char line[512];
+  while (taosGetsCmd(pCmd, sizeof(line), line) > 0) {
+    // Strip trailing newline
+    int32_t len = (int32_t)strlen(line);
+    while (len > 0 && (line[len - 1] == '\n' || line[len - 1] == '\r')) line[--len] = '\0';
+    if (len == 0) continue;
+    // Skip "total NNN" line
+    if (strncmp(line, "total ", 6) == 0) continue;
+
+    // Parse: perms nlinks user group size mon day time name
+    // NOTE(review): %255s stops at whitespace, so file names containing spaces
+    // would be truncated/mis-parsed — assumes vnode dirs never contain such
+    // names; confirm.
+    char    perms[16] = {0}, user[64] = {0}, group[64] = {0}, name[256] = {0};
+    char    mon[8] = {0}, day[8] = {0}, timeOrYear[16] = {0};
+    int32_t nlinks = 0;
+    int64_t fsize = 0;
+    if (sscanf(line, "%15s %d %63s %63s %" PRId64 " %7s %7s %15s %255s", perms, &nlinks, user, group, &fsize, mon, day,
+               timeOrYear, name) < 9) {
+      continue;
+    }
+    if (strcmp(name, "tsdb") == 0) continue;
+
+    struct SRemoteEntry re = {.isDir = (perms[0] == 'd'), .size = fsize};
+    tstrncpy(re.name, name, sizeof(re.name));
+    if (taosArrayPush(entries, &re) == NULL) {
+      taosArrayDestroy(entries);
+      taosCloseCmd(&pCmd);
+      uError("repair: vnode%d memory allocation failed", vnodeId);
+      return -1;
+    }
+  }
+  taosCloseCmd(&pCmd);
+
+  // scp each entry
+  int32_t nEntries = taosArrayGetSize(entries);
+  for (int32_t i = 0; i < nEntries; i++) {
+    struct SRemoteEntry *re = taosArrayGet(entries, i);
+    if (re->isDir) {
+      uInfo("repair: vnode%d scp dir: %s", vnodeId, re->name);
+    } else {
+      uInfo("repair: vnode%d scp file: %s (%" PRId64 " bytes)", vnodeId, re->name, re->size);
+    }
+
+    char qSrcFile[DM_SHELL_QUOTED_PATH_LEN], qDstDir[DM_SHELL_QUOTED_PATH_LEN];
+    char srcFilePath[PATH_MAX];
+    snprintf(srcFilePath, sizeof(srcFilePath), "%s/%s", srcVnodeDir, re->name);
+    if (dmShellQuote(srcFilePath, qSrcFile, sizeof(qSrcFile)) < 0 ||
+        dmShellQuote(dstVnodeDir, qDstDir, sizeof(qDstDir)) < 0) {
+      uError("repair: vnode%d shell quote failed for %s", vnodeId, re->name);
+      taosArrayDestroy(entries);
+      return -1;
+    }
+    snprintf(cmd, sizeof(cmd), "scp -r -o BatchMode=yes %s:%s %s/ 2>/dev/null", qHost, qSrcFile, qDstDir);
+    pCmd = taosOpenCmd(cmd);
+    if (pCmd == NULL) {
+      uError("repair: vnode%d scp failed for %s", vnodeId, re->name);
+      taosArrayDestroy(entries);
+      return -1;
+    }
+    // Drain any scp output; the command's exit status itself is not checked —
+    // NOTE(review): failures are only detected by the size check below for
+    // regular files (directory copies are not verified here).
+    char buf[256];
+    while (taosGetsCmd(pCmd, sizeof(buf), buf) > 0) {
+    }
+    taosCloseCmd(&pCmd);
+
+    // Verify file size after copy
+    if (!re->isDir) {
+      char dstPath[PATH_MAX];
+      snprintf(dstPath, sizeof(dstPath), "%s%s%s", dstVnodeDir, TD_DIRSEP, re->name);
+      int64_t dstSize = 0;
+      if (taosStatFile(dstPath, &dstSize, NULL, NULL) != 0 || dstSize != re->size) {
+        uError("repair: vnode%d scp size mismatch: %s (src=%" PRId64 " dst=%" PRId64 ")", vnodeId, re->name, re->size,
+               dstSize);
+        taosArrayDestroy(entries);
+        return -1;
+      }
+    }
+  }
+  taosArrayDestroy(entries);
+
+  // Verify at least vnode.json was copied
+  char vnodeJson[PATH_MAX];
+  snprintf(vnodeJson, sizeof(vnodeJson), "%s%svnode.json", dstVnodeDir, TD_DIRSEP);
+  if (!taosCheckExistFile(vnodeJson)) {
+    uError("repair: vnode%d scp failed — vnode.json not found after copy", vnodeId);
+    return -1;
+  }
+  uInfo("repair: vnode%d remote non-tsdb files copied via scp", vnodeId);
+  return 0;
+}
+
+// Lookup source disk path by disk ID {level, id}.
+// Returns NULL if no matching disk found.
+// did.id is interpreted as the ordinal among disks sharing did.level,
+// in the order they appear in pTfs->disks[].
+static const char *dmGetSourceDiskPath(const SRepairTfs *pTfs, SDiskID did) {
+  int32_t count = 0;
+  for (int32_t i = 0; i < pTfs->ndisk; i++) {
+    if (pTfs->disks[i].level == did.level) {
+      if (count == did.id) return pTfs->disks[i].dir;
+      count++;
+    }
+  }
+  return NULL;
+}
+
+// Check if a target TFS disk is disabled, based on global tsDiskCfg[].
+// Returns true when the disk at {level, id} is marked disabled in tsDiskCfg[].
+// id is the ordinal among configured disks sharing this level.
+static bool dmIsTgtDiskDisabled(int32_t level, int32_t id) {
+  if (tsDiskCfgNum <= 0) return false;  // fallback single-disk mode: never disabled
+  int32_t count = 0;
+  for (int32_t i = 0; i < tsDiskCfgNum; i++) {
+    if (tsDiskCfg[i].level == level) {
+      if (count == id) return tsDiskCfg[i].disable;
+      count++;
+    }
+  }
+  // Not present in the config at all: treat as disabled (conservative).
+  return true;
+}
+
+// Per-tier round-robin state for target disk allocation.
+typedef struct SRepairDiskAlloc {
+  int32_t nextId[TFS_MAX_TIERS];  // next disk ordinal to try at each tier
+} SRepairDiskAlloc;
+
+// Remap a source file to a target disk using round-robin allocation.
+// Rules: same tier if exists → fold to highest available tier → skip disabled → check space.
+// Returns 0 on success, -1 if no disk has enough space.
+static int32_t dmRemapDiskId(STfs *pTgtTfs, int32_t srcLevel, int64_t fileSize, SRepairDiskAlloc *pAlloc,
+                             SDiskID *pTgtDid) {
+  int32_t tgtNlevel = tfsGetLevel(pTgtTfs);
+  int32_t level = srcLevel;
+  // Source tier does not exist on the target: fold down to the highest target tier.
+  if (level >= tgtNlevel) level = tgtNlevel - 1;
+
+  // Walk from the chosen tier toward tier 0 until a disk fits.
+  for (int32_t tryLevel = level; tryLevel >= 0; tryLevel--) {
+    int32_t ndisk = tfsGetDisksAtLevel(pTgtTfs, tryLevel);
+    if (ndisk <= 0) continue;
+
+    int32_t startId = pAlloc->nextId[tryLevel];
+    for (int32_t attempt = 0; attempt < ndisk; attempt++) {
+      int32_t id = (startId + attempt) % ndisk;
+      if (dmIsTgtDiskDisabled(tryLevel, id)) continue;
+
+      SDiskID     did = {.level = tryLevel, .id = id};
+      const char *diskPath = tfsGetDiskPath(pTgtTfs, did);
+      if (diskPath == NULL) continue;
+
+      // Require the file to fit while keeping tsMinDiskFreeSize headroom.
+      SDiskSize diskSize = {0};
+      if (taosGetDiskSize((char *)diskPath, &diskSize) != 0) continue;
+      if (diskSize.avail < fileSize + tsMinDiskFreeSize) continue;
+
+      // Advance round-robin cursor only on successful allocation.
+      pAlloc->nextId[tryLevel] = (id + 1) % ndisk;
+      *pTgtDid = did;
+      return 0;
+    }
+  }
+  return -1;
+}
+
+// Get remote file size via ssh stat.
+// Returns file size in bytes, or -1 on error.
+// NOTE(review): `stat -c %s` is GNU coreutils syntax — not portable to
+// BSD/macOS source hosts (there it is `stat -f %z`); confirm supported
+// source platforms.
+static int64_t dmGetRemoteFileSize(const char *host, const char *remotePath) {
+  char qHost[320], qPath[DM_SHELL_QUOTED_PATH_LEN];
+  if (dmShellQuote(host, qHost, sizeof(qHost)) < 0 || dmShellQuote(remotePath, qPath, sizeof(qPath)) < 0) return -1;
+  char cmd[DM_SSH_CMD_BUF_LEN];
+  snprintf(cmd, sizeof(cmd), "ssh -o BatchMode=yes %s stat -c %%s %s 2>/dev/null", qHost, qPath);
+  TdCmdPtr pCmd = taosOpenCmd(cmd);
+  if (pCmd == NULL) return -1;
+
+  // Read the single line of output ("<size>\n") and parse it as int64.
+  int64_t size = -1;
+  char    buf[64] = {0};
+  if (taosGetsCmd(pCmd, sizeof(buf), buf) > 0) {
+    int32_t len = (int32_t)strlen(buf);
+    while (len > 0 && (buf[len - 1] == '\n' || buf[len - 1] == '\r')) buf[--len] = '\0';
+    size = taosStr2Int64(buf, NULL, 10);
+  }
+  taosCloseCmd(&pCmd);
+  return size;
+}
+
+// Step h: Copy source TSDB file sets to target with disk ID remapping.
+// For each file set in copyFids, remap each file's disk ID to a target disk,
+// copy the file (local or remote), and verify size.
+// On success, *ppRemappedSets is set to a new SArray of SRepairFileSet with
+// target disk IDs (caller must free with dmDestroyRepairFileSets).
+static int32_t dmCopySourceFileSets(const SRepairTfs *pSrcTfs, STfs *pTgtTfs, const char *host, int32_t vnodeId,
+                                    const SArray *srcFileSets, const SArray *copyFids, SArray **ppRemappedSets) {
+  SRepairDiskAlloc alloc = {0};
+  int32_t nCopy = taosArrayGetSize(copyFids);
+  SArray *remapped = taosArrayInit(nCopy > 0 ? nCopy : 1, sizeof(SRepairFileSet));
+  if (remapped == NULL) return -1;
+
+  for (int32_t c = 0; c < nCopy; c++) {
+    int32_t fid = *(int32_t *)taosArrayGet(copyFids, c);
+
+    // Find source file set for this fid
+    SRepairFileSet *pSrcSet = NULL;
+    int32_t nSets = taosArrayGetSize(srcFileSets);
+    for (int32_t s = 0; s < nSets; s++) {
+      SRepairFileSet *pSet = taosArrayGet(srcFileSets, s);
+      if (pSet->fid == fid) {
+        pSrcSet = pSet;
+        break;
+      }
+    }
+    if (pSrcSet == NULL) {
+      uError("repair: vnode%d fid=%d file set not found", vnodeId, fid);
+      dmDestroyRepairFileSets(remapped);
+      return -1;
+    }
+
+    SRepairFileSet newSet = {.fid = fid};
+    newSet.files = taosArrayInit(taosArrayGetSize(pSrcSet->files), sizeof(SRepairFile));
+    if (newSet.files == NULL) {
+      uError("repair: vnode%d fid=%d memory allocation failed", vnodeId, fid);
+      dmDestroyRepairFileSets(remapped);
+      return -1;
+    }
+
+    int32_t nFiles = taosArrayGetSize(pSrcSet->files);
+    for (int32_t f = 0; f < nFiles; f++) {
+      SRepairFile *pFile = taosArrayGet(pSrcSet->files, f);
+
+      // Resolve source disk path
+      const char *srcDiskPath = dmGetSourceDiskPath(pSrcTfs, pFile->did);
+      if (srcDiskPath == NULL) {
+        uError("repair: vnode%d fid=%d source disk not found level=%d id=%d", vnodeId, fid, pFile->did.level,
+               pFile->did.id);
+        taosArrayDestroy(newSet.files);
+        dmDestroyRepairFileSets(remapped);
+        return -1;
+      }
+
+      char srcPath[PATH_MAX];
+      dmBuildTsdbFilePath(srcDiskPath, vnodeId, pFile, srcPath, sizeof(srcPath));
+
+      // Get source physical file size (local stat, or ssh stat for remote).
+      // The size feeds both disk-space remapping and the post-copy check.
+      int64_t srcSize = 0;
+      if (host == NULL || host[0] == '\0') {
+        if (taosStatFile(srcPath, &srcSize, NULL, NULL) != 0 || srcSize <= 0) {
+          uError("repair: vnode%d cannot stat source file %s", vnodeId, srcPath);
+          taosArrayDestroy(newSet.files);
+          dmDestroyRepairFileSets(remapped);
+          return -1;
+        }
+      } else {
+        srcSize = dmGetRemoteFileSize(host, srcPath);
+        if (srcSize <= 0) {
+          uError("repair: vnode%d cannot stat remote source file %s", vnodeId, srcPath);
+          taosArrayDestroy(newSet.files);
+          dmDestroyRepairFileSets(remapped);
+          return -1;
+        }
+      }
+
+      // Remap to target disk
+      SDiskID tgtDid = {0};
+      if (dmRemapDiskId(pTgtTfs, pFile->did.level, srcSize, &alloc, &tgtDid) != 0) {
+        uError("repair: vnode%d fid=%d no disk with enough space for %" PRId64 " bytes", vnodeId, fid, srcSize);
+        taosArrayDestroy(newSet.files);
+        dmDestroyRepairFileSets(remapped);
+        return -1;
+      }
+
+      // Build target path
+      SRepairFile tgtFile = *pFile;
+      tgtFile.did = tgtDid;
+      const char *tgtDiskPath = tfsGetDiskPath(pTgtTfs, tgtDid);
+      char dstPath[PATH_MAX];
+      dmBuildTsdbFilePath(tgtDiskPath, vnodeId, &tgtFile, dstPath, sizeof(dstPath));
+
+      // Build display name
+      const char *suffix = gRepairFTypeSuffixAll[pFile->type];
+      char fileName[256];
+      if (pFile->lcn > 0) {
+        snprintf(fileName, sizeof(fileName), "v%df%dver%" PRId64 ".%d.%s", vnodeId, pFile->fid, pFile->cid, pFile->lcn,
+                 suffix);
+      } else {
+        snprintf(fileName, sizeof(fileName), "v%df%dver%" PRId64 ".%s", vnodeId, pFile->fid, pFile->cid, suffix);
+      }
+
+      uInfo("repair: vnode%d copy %s (%" PRId64 " bytes) -> level=%d id=%d", vnodeId, fileName, srcSize, tgtDid.level,
+            tgtDid.id);
+
+      // Copy file
+      if (host == NULL || host[0] == '\0') {
+        int64_t copied = taosCopyFile(srcPath, dstPath);
+        if (copied < 0) {
+          uError("repair: vnode%d failed to copy %s", vnodeId, fileName);
+          taosArrayDestroy(newSet.files);
+          dmDestroyRepairFileSets(remapped);
+          return -1;
+        }
+      } else {
+        char qHost[320], qSrc[DM_SHELL_QUOTED_PATH_LEN], qDst[DM_SHELL_QUOTED_PATH_LEN];
+        if (dmShellQuote(host, qHost, sizeof(qHost)) < 0 || dmShellQuote(srcPath, qSrc, sizeof(qSrc)) < 0 ||
+            dmShellQuote(dstPath, qDst, sizeof(qDst)) < 0) {
+          uError("repair: vnode%d shell quote failed for %s", vnodeId, fileName);
+          taosArrayDestroy(newSet.files);
+          dmDestroyRepairFileSets(remapped);
+          return -1;
+        }
+        char cmd[DM_SSH_CMD_BUF_LEN];
+        snprintf(cmd, sizeof(cmd), "scp -o BatchMode=yes %s:%s %s 2>/dev/null", qHost, qSrc, qDst);
+        TdCmdPtr pCmd = taosOpenCmd(cmd);
+        if (pCmd == NULL) {
+          uError("repair: vnode%d scp failed for %s", vnodeId, fileName);
+          taosArrayDestroy(newSet.files);
+          dmDestroyRepairFileSets(remapped);
+          return -1;
+        }
+        // NOTE(review): scp exit status is not checked; correctness relies on
+        // the destination-size verification below.
+        char buf[256];
+        while (taosGetsCmd(pCmd, sizeof(buf), buf) > 0) {
+        }
+        taosCloseCmd(&pCmd);
+      }
+
+      // Verify destination file size
+      int64_t dstSize = 0;
+      if (taosStatFile(dstPath, &dstSize, NULL, NULL) != 0 || dstSize != srcSize) {
+        uError("repair: vnode%d size mismatch: %s (src=%" PRId64 " dst=%" PRId64 ")", vnodeId, fileName, srcSize,
+               dstSize);
+        taosArrayDestroy(newSet.files);
+        dmDestroyRepairFileSets(remapped);
+        return -1;
+      }
+
+      if (taosArrayPush(newSet.files, &tgtFile) == NULL) {
+        uError("repair: vnode%d fid=%d failed to push target file", vnodeId, pFile->fid);
+        taosArrayDestroy(newSet.files);
+        dmDestroyRepairFileSets(remapped);
+        return -1;
+      }
+    }
+
+    if (taosArrayPush(remapped, &newSet) == NULL) {
+      taosArrayDestroy(newSet.files);
+      dmDestroyRepairFileSets(remapped);
+      return -1;
+    }
+  }
+
+  *ppRemappedSets = remapped;
+  return 0;
+}
+
+// Step i: Generate target current.json from merged file sets.
+// Combines retained file sets (from local with original disk IDs) and
+// remapped file sets (copied from source with new disk IDs) into one
+// current.json written to the target primary disk.
+// The JSON format follows save_fs() in tsdbFS2.c.
+static int32_t dmGenerateCurrentJson(STfs *pTgtTfs, int32_t vnodeId, const SArray *retainFids,
+                                     const SArray *localFileSets, const SArray *remappedSets,
+                                     const SArray *srcFileSets) {
+  // Collect all file sets into one sorted array
+  // retained: use localFileSets entries with matching fids (they have correct target disk IDs)
+  // copied: use remappedSets entries (already have target disk IDs)
+  int32_t nRetain = retainFids ? taosArrayGetSize(retainFids) : 0;
+  int32_t nRemapped = remappedSets ? taosArrayGetSize(remappedSets) : 0;
+  int32_t totalSets = nRetain + nRemapped;
+
+  // Build sorted array of {fid, pointer to SRepairFileSet, pointer to source SRepairFileSet}
+  typedef struct {
+    int32_t fid;
+    const SRepairFileSet *pSet;
+    const SRepairFileSet *pSrcSet;
+  } FSEntry;
+  FSEntry *sorted =
+      (totalSets > 0) ? taosMemoryCalloc(totalSets, sizeof(FSEntry)) : taosMemoryCalloc(1, sizeof(FSEntry));
+  if (sorted == NULL) return -1;
+
+  int32_t idx = 0;
+  // Add retained file sets (from local)
+  for (int32_t r = 0; r < nRetain; r++) {
+    int32_t fid = *(int32_t *)taosArrayGet(retainFids, r);
+    int32_t nLocal = localFileSets ? taosArrayGetSize(localFileSets) : 0;
+    for (int32_t l = 0; l < nLocal; l++) {
+      SRepairFileSet *pLocal = taosArrayGet(localFileSets, l);
+      if (pLocal->fid == fid) {
+        // Find source file set for timestamps
+        const SRepairFileSet *pSrcSet = NULL;
+        int32_t nSrc = srcFileSets ? taosArrayGetSize(srcFileSets) : 0;
+        for (int32_t s = 0; s < nSrc; s++) {
+          SRepairFileSet *ps = taosArrayGet(srcFileSets, s);
+          if (ps->fid == fid) {
+            pSrcSet = ps;
+            break;
+          }
+        }
+        sorted[idx++] = (FSEntry){.fid = fid, .pSet = pLocal, .pSrcSet = pSrcSet};
+        break;
+      }
+    }
+  }
+  // Add remapped (copied) file sets
+  for (int32_t c = 0; c < nRemapped; c++) {
+    SRepairFileSet *pRemap = taosArrayGet(remappedSets, c);
+    // Find source file set for timestamps
+    const SRepairFileSet *pSrcSet = NULL;
+    int32_t nSrc = srcFileSets ? taosArrayGetSize(srcFileSets) : 0;
+    for (int32_t s = 0; s < nSrc; s++) {
+      SRepairFileSet *ps = taosArrayGet(srcFileSets, s);
+      if (ps->fid == pRemap->fid) {
+        pSrcSet = ps;
+        break;
+      }
+    }
+    sorted[idx++] = (FSEntry){.fid = pRemap->fid, .pSet = pRemap, .pSrcSet = pSrcSet};
+  }
+  totalSets = idx;
+
+  // Sort by fid ascending (selection-style swap sort; totalSets is small)
+  for (int32_t i = 0; i < totalSets - 1; i++) {
+    for (int32_t j = i + 1; j < totalSets; j++) {
+      if (sorted[j].fid < sorted[i].fid) {
+        FSEntry tmp = sorted[i];
+        sorted[i] = sorted[j];
+        sorted[j] = tmp;
+      }
+    }
+  }
+
+  // Build JSON: {"fmtv":1, "fset":[...]}
+  SJson *pRoot = tjsonCreateObject();
+  if (pRoot == NULL) {
+    taosMemoryFree(sorted);
+    return -1;
+  }
+  (void)tjsonAddDoubleToObject(pRoot, "fmtv", 1);
+
+  SJson *pFsetArr = tjsonAddArrayToObject(pRoot, "fset");
+  if (pFsetArr == NULL) {
+    tjsonDelete(pRoot);
+    taosMemoryFree(sorted);
+    return -1;
+  }
+
+  for (int32_t i = 0; i < totalSets; i++) {
+    const SRepairFileSet *pSet = sorted[i].pSet;
+    const SRepairFileSet *pSrcSet = sorted[i].pSrcSet;
+    if (pSet == NULL) continue;
+
+    SJson *pFsetJson = tjsonCreateObject();
+    if (pFsetJson == NULL) {
+      tjsonDelete(pRoot);
+      taosMemoryFree(sorted);
+      return -1;
+    }
+    (void)tjsonAddItemToArray(pFsetArr, pFsetJson);
+    (void)tjsonAddDoubleToObject(pFsetJson, "fid", pSet->fid);
+
+    int32_t nFiles = taosArrayGetSize(pSet->files);
+
+    // Serialize non-STT file types (head=0, data=1, sma=2, tomb=3) as named sub-objects
+    // NOTE(review): type 4 is neither serialized here nor as STT (type 5) —
+    // presumably a reserved/unused slot in gRepairFTypeSuffixAll; confirm.
+    for (int32_t t = 0; t < 4; t++) {
+      // Find file of this type
+      const SRepairFile *pFile = NULL;
+      for (int32_t f = 0; f < nFiles; f++) {
+        SRepairFile *pf = taosArrayGet(pSet->files, f);
+        if (pf->type == t) {
+          pFile = pf;
+          break;
+        }
+      }
+      if (pFile == NULL) continue;
+
+      SJson *pFileJson = tjsonCreateObject();
+      if (pFileJson == NULL) {
+        tjsonDelete(pRoot);
+        taosMemoryFree(sorted);
+        return -1;
+      }
+      (void)tjsonAddItemToObject(pFsetJson, gRepairFTypeSuffixAll[t], pFileJson);
+      (void)tjsonAddDoubleToObject(pFileJson, "did.level", pFile->did.level);
+      (void)tjsonAddDoubleToObject(pFileJson, "did.id", pFile->did.id);
+      (void)tjsonAddDoubleToObject(pFileJson, "lcn", pFile->lcn);
+      (void)tjsonAddDoubleToObject(pFileJson, "fid", pFile->fid);
+      (void)tjsonAddDoubleToObject(pFileJson, "cid", (double)pFile->cid);
+      (void)tjsonAddDoubleToObject(pFileJson, "size", (double)pFile->size);
+      // minVer/maxVer are emitted only when they form a valid range.
+      if (pFile->minVer >= 0 && pFile->minVer <= pFile->maxVer) {
+        (void)tjsonAddDoubleToObject(pFileJson, "minVer", (double)pFile->minVer);
+        (void)tjsonAddDoubleToObject(pFileJson, "maxVer", (double)pFile->maxVer);
+      }
+    }
+
+    // Serialize STT files grouped by sttLevel as "stt lvl" array
+    // Collect distinct STT levels
+    int32_t sttLevels[TSDB_STT_TRIGGER_ARRAY_SIZE] = {0};
+    int32_t nSttLevels = 0;
+    for (int32_t f = 0; f < nFiles; f++) {
+      SRepairFile *pf = taosArrayGet(pSet->files, f);
+      if (pf->type != 5) continue;
+      bool found = false;
+      for (int32_t sl = 0; sl < nSttLevels; sl++) {
+        if (sttLevels[sl] == pf->sttLevel) {
+          found = true;
+          break;
+        }
+      }
+      if (!found && nSttLevels < tListLen(sttLevels)) {
+        sttLevels[nSttLevels++] = pf->sttLevel;
+      }
+    }
+    // Sort STT levels ascending
+    for (int32_t a = 0; a < nSttLevels - 1; a++) {
+      for (int32_t b = a + 1; b < nSttLevels; b++) {
+        if (sttLevels[b] < sttLevels[a]) {
+          int32_t tmp = sttLevels[a];
+          sttLevels[a] = sttLevels[b];
+          sttLevels[b] = tmp;
+        }
+      }
+    }
+
+    {
+      SJson *pSttLvlArr = tjsonAddArrayToObject(pFsetJson, "stt lvl");
+      if (pSttLvlArr == NULL) {
+        tjsonDelete(pRoot);
+        taosMemoryFree(sorted);
+        return -1;
+      }
+
+      for (int32_t sl = 0; sl < nSttLevels; sl++) {
+        SJson *pLvlJson = tjsonCreateObject();
+        if (pLvlJson == NULL) {
+          tjsonDelete(pRoot);
+          taosMemoryFree(sorted);
+          return -1;
+        }
+        (void)tjsonAddItemToArray(pSttLvlArr, pLvlJson);
+        (void)tjsonAddDoubleToObject(pLvlJson, "level", sttLevels[sl]);
+
+        SJson *pFilesArr = tjsonAddArrayToObject(pLvlJson, "files");
+        if (pFilesArr == NULL) {
+          tjsonDelete(pRoot);
+          taosMemoryFree(sorted);
+          return -1;
+        }
+
+        for (int32_t f = 0; f < nFiles; f++) {
+          SRepairFile *pf = taosArrayGet(pSet->files, f);
+          if (pf->type != 5 || pf->sttLevel != sttLevels[sl]) continue;
+
+          SJson *pSttJson = tjsonCreateObject();
+          if (pSttJson == NULL) {
+            tjsonDelete(pRoot);
+            taosMemoryFree(sorted);
+            return -1;
+          }
+          (void)tjsonAddItemToArray(pFilesArr, pSttJson);
+          (void)tjsonAddDoubleToObject(pSttJson, "did.level", pf->did.level);
+          (void)tjsonAddDoubleToObject(pSttJson, "did.id", pf->did.id);
+          (void)tjsonAddDoubleToObject(pSttJson, "lcn", pf->lcn);
+          (void)tjsonAddDoubleToObject(pSttJson, "fid", pf->fid);
+          (void)tjsonAddDoubleToObject(pSttJson, "cid", (double)pf->cid);
+          (void)tjsonAddDoubleToObject(pSttJson, "size", (double)pf->size);
+          if (pf->minVer >= 0 && pf->minVer <= pf->maxVer) {
+            (void)tjsonAddDoubleToObject(pSttJson, "minVer", (double)pf->minVer);
+            (void)tjsonAddDoubleToObject(pSttJson, "maxVer", (double)pf->maxVer);
+          }
+          (void)tjsonAddDoubleToObject(pSttJson, "level", pf->sttLevel);
+        }
+      }
+    }
+
+    // Add file set level timestamps from source
+    int64_t lastCompact = pSrcSet ? pSrcSet->lastCompact : 0;
+    int64_t lastCommit = pSrcSet ? pSrcSet->lastCommit : 0;
+    (void)tjsonAddDoubleToObject(pFsetJson, "last compact", (double)lastCompact);
+    (void)tjsonAddDoubleToObject(pFsetJson, "last commit", (double)lastCommit);
+  }
+
+  // Serialize to string and write to file
+  char *jsonStr = tjsonToString(pRoot);
+  tjsonDelete(pRoot);
+  taosMemoryFree(sorted);
+  if (jsonStr == NULL) {
+    uError("repair: vnode%d failed to serialize current.json", vnodeId);
+    return -1;
+  }
+
+  const char *primaryPath = tfsGetPrimaryPath(pTgtTfs);
+  char outPath[PATH_MAX];
+  snprintf(outPath, sizeof(outPath), "%s%svnode%svnode%d%stsdb%scurrent.json", primaryPath, TD_DIRSEP, TD_DIRSEP,
+           vnodeId, TD_DIRSEP, TD_DIRSEP);
+
+  TdFilePtr pFile = taosOpenFile(outPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
+  if (pFile == NULL) {
+    uError("repair: vnode%d failed to create current.json: %s", vnodeId, outPath);
+    taosMemoryFree(jsonStr);
+    return -1;
+  }
+
+  // Write, fsync and close; any failure flips ret but cleanup still runs.
+  int32_t ret = 0;
+  int64_t len = (int64_t)strlen(jsonStr);
+  int64_t written = taosWriteFile(pFile, jsonStr, len);
+  if (written != len) {
+    uError("repair: vnode%d failed to write current.json (wrote %" PRId64 "/%" PRId64 ")", vnodeId, written, len);
+    ret = -1;
+  }
+  if (taosFsyncFile(pFile) != 0) {
+    uError("repair: vnode%d failed to fsync current.json", vnodeId);
+    ret = -1;
+  }
+  if (taosCloseFile(&pFile) != 0) {
+    uError("repair: vnode%d failed to close current.json", vnodeId);
+    ret = -1;
+  }
+  taosMemoryFree(jsonStr);
+
+  if (ret == 0) {
+    uInfo("repair: vnode%d current.json generated (%d file set(s))", vnodeId, totalSets);
+  }
+  return ret;
+}
+
+// Step j: Update syncCfg.myIndex in vnode.json and raft_config.json.
+// Finds the local dnodeId in nodeInfo[] and sets myIndex to that position.
+static int32_t dmUpdateSyncIndex(STfs *pTgtTfs, int32_t vnodeId, int32_t dnodeId) {
+  const char *primaryPath = tfsGetPrimaryPath(pTgtTfs);
+
+  // --- Update vnode.json ---
+  char vnodeJsonPath[PATH_MAX];
+  snprintf(vnodeJsonPath, sizeof(vnodeJsonPath), "%s%svnode%svnode%d%svnode.json", primaryPath, TD_DIRSEP, TD_DIRSEP,
+           vnodeId, TD_DIRSEP);
+
+  char *content = NULL;
+  if (dmReadFileContent(vnodeJsonPath, &content, NULL) != 0) {
+    uError("repair: vnode%d failed to read vnode.json", vnodeId);
+    return -1;
+  }
+
+  SJson *pRoot = tjsonParse(content);
+  taosMemoryFree(content);
+  if (pRoot == NULL) {
+    uError("repair: vnode%d failed to parse vnode.json", vnodeId);
+    return -1;
+  }
+
+  SJson *pConfig = tjsonGetObjectItem(pRoot, "config");
+  if (pConfig == NULL) {
+    uError("repair: vnode%d vnode.json missing 'config'", vnodeId);
+    tjsonDelete(pRoot);
+    return -1;
+  }
+
+  // Find myIndex by matching dnodeId in syncCfg.nodeInfo[]
+  // (note: "syncCfg.nodeInfo" is a literal flat key in this file, not a path).
+  SJson *pNodeInfoArr = tjsonGetObjectItem(pConfig, "syncCfg.nodeInfo");
+  int32_t myIndex = -1;
+  if (pNodeInfoArr != NULL) {
+    int32_t nNodes = tjsonGetArraySize(pNodeInfoArr);
+    for (int32_t i = 0; i < nNodes; i++) {
+      SJson *pNode = tjsonGetArrayItem(pNodeInfoArr, i);
+      int32_t nodeId = 0;
+      int32_t code = 0;
+      tjsonGetNumberValue(pNode, "nodeId", nodeId, code);
+      if (code >= 0 && nodeId == dnodeId) {
+        myIndex = i;
+        break;
+      }
+    }
+  }
+
+  if (myIndex < 0) {
+    uError("repair: vnode%d dnodeId %d not found in syncCfg.nodeInfo", vnodeId, dnodeId);
+    tjsonDelete(pRoot);
+    return -1;
+  }
+
+  // Replace syncCfg.myIndex
+  tjsonDeleteItemFromObject(pConfig, "syncCfg.myIndex");
+  (void)tjsonAddIntegerToObject(pConfig, "syncCfg.myIndex", myIndex);
+
+  // Write back vnode.json
+  char *jsonStr = tjsonToString(pRoot);
+  tjsonDelete(pRoot);
+  if (jsonStr == NULL) {
+    uError("repair: vnode%d failed to serialize vnode.json", vnodeId);
+    return -1;
+  }
+
+  TdFilePtr pFile = taosOpenFile(vnodeJsonPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
+  if (pFile == NULL) {
+    uError("repair: vnode%d failed to open vnode.json for write", vnodeId);
+    taosMemoryFree(jsonStr);
+    return -1;
+  }
+  int32_t ret = 0;
+  int64_t len = (int64_t)strlen(jsonStr);
+  int64_t written = taosWriteFile(pFile, jsonStr, len);
+  if (written != len) {
+    uError("repair: vnode%d failed to write vnode.json (wrote %" PRId64 "/%" PRId64 ")", vnodeId, written, len);
+    ret = -1;
+  }
+  if (taosFsyncFile(pFile) != 0) {
+    uError("repair: vnode%d failed to fsync vnode.json", vnodeId);
+    ret = -1;
+  }
+  if (taosCloseFile(&pFile) != 0) {
+    uError("repair: vnode%d failed to close vnode.json", vnodeId);
+    ret = -1;
+  }
+  taosMemoryFree(jsonStr);
+  if (ret != 0) {
+    return ret;
+  }
+  uInfo("repair: vnode%d vnode.json syncCfg.myIndex updated to %d", vnodeId, myIndex);
+
+  // --- Update raft_config.json ---
+  char raftCfgPath[PATH_MAX];
+  snprintf(raftCfgPath, sizeof(raftCfgPath), "%s%svnode%svnode%d%ssync%sraft_config.json", primaryPath, TD_DIRSEP,
+           TD_DIRSEP, vnodeId, TD_DIRSEP, TD_DIRSEP);
+
+  content = NULL;
+  if (dmReadFileContent(raftCfgPath, &content, NULL) != 0) {
+    // raft_config.json may not exist for single-replica vnodes — not an error
+    uInfo("repair: vnode%d raft_config.json not found, skipping", vnodeId);
+    return 0;
+  }
+
+  pRoot = tjsonParse(content);
+  taosMemoryFree(content);
+  if (pRoot == NULL) {
+    uError("repair: vnode%d failed to parse raft_config.json", vnodeId);
+    return -1;
+  }
+
+  SJson *pRaftCfg = tjsonGetObjectItem(pRoot, "RaftCfg");
+  SJson *pSyncCfg = pRaftCfg ? tjsonGetObjectItem(pRaftCfg, "SSyncCfg") : NULL;
+  if (pSyncCfg == NULL) {
+    uError("repair: vnode%d raft_config.json missing RaftCfg.SSyncCfg", vnodeId);
+    tjsonDelete(pRoot);
+    return -1;
+  }
+
+  // Find myIndex by matching dnodeId in nodeInfo[]
+  SJson *pRaftNodeInfo = tjsonGetObjectItem(pSyncCfg, "nodeInfo");
+  int32_t raftMyIndex = -1;
+  if (pRaftNodeInfo != NULL) {
+    int32_t nNodes = tjsonGetArraySize(pRaftNodeInfo);
+    for (int32_t i = 0; i < nNodes; i++) {
+      SJson *pNode = tjsonGetArrayItem(pRaftNodeInfo, i);
+      int32_t nodeId = 0;
+      int32_t code = 0;
+      tjsonGetNumberValue(pNode, "nodeId", nodeId, code);
+      if (code >= 0 && nodeId == dnodeId) {
+        raftMyIndex = i;
+        break;
+      }
+    }
+  }
+
+  if (raftMyIndex < 0) {
+    uError("repair: vnode%d dnodeId %d not found in raft_config nodeInfo", vnodeId, dnodeId);
+    tjsonDelete(pRoot);
+    return -1;
+  }
+
+  // Replace myIndex in SSyncCfg
+  // NOTE(review): vnode.json above writes myIndex via tjsonAddIntegerToObject
+  // while this uses tjsonAddDoubleToObject — confirm the Double form matches
+  // what the raft config reader expects.
+  tjsonDeleteItemFromObject(pSyncCfg, "myIndex");
+  (void)tjsonAddDoubleToObject(pSyncCfg, "myIndex", raftMyIndex);
+
+  // Write back raft_config.json
+  jsonStr = tjsonToString(pRoot);
+  tjsonDelete(pRoot);
+  if (jsonStr == NULL) {
+    uError("repair: vnode%d failed to serialize raft_config.json", vnodeId);
+    return -1;
+  }
+
+  pFile = taosOpenFile(raftCfgPath, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_WRITE_THROUGH);
+  if (pFile == NULL) {
+    uError("repair: vnode%d failed to open raft_config.json for write", vnodeId);
+    taosMemoryFree(jsonStr);
+    return -1;
+  }
+  ret = 0;
+  len = (int64_t)strlen(jsonStr);
+  written = taosWriteFile(pFile, jsonStr, len);
+  if (written != len) {
+    uError("repair: vnode%d failed to write raft_config.json (wrote %" PRId64 "/%" PRId64 ")", vnodeId, written, len);
+    ret = -1;
+  }
+  if (taosFsyncFile(pFile) != 0) {
+    uError("repair: vnode%d failed to fsync raft_config.json", vnodeId);
+    ret = -1;
+  }
+  if (taosCloseFile(&pFile) != 0) {
+    uError("repair: vnode%d failed to close raft_config.json", vnodeId);
+    ret = -1;
+  }
+  taosMemoryFree(jsonStr);
+  if (ret != 0) {
+    return ret;
+  }
+  uInfo("repair: vnode%d raft_config.json myIndex updated to %d", vnodeId, raftMyIndex);
+  return 0;
+}
+
+// Step k: Clean sync state — delete raft_store.json and *.bak files in sync/.
+// Best-effort: remove failures are deliberately ignored.
+static int32_t dmCleanSyncState(STfs *pTgtTfs, int32_t vnodeId) {
+  const char *primaryPath = tfsGetPrimaryPath(pTgtTfs);
+  char syncDir[PATH_MAX];
+  snprintf(syncDir, sizeof(syncDir), "%s%svnode%svnode%d%ssync", primaryPath, TD_DIRSEP, TD_DIRSEP, vnodeId, TD_DIRSEP);
+
+  // Delete raft_store.json
+  char raftStore[PATH_MAX];
+  snprintf(raftStore, sizeof(raftStore), "%s%sraft_store.json", syncDir, TD_DIRSEP);
+  if (taosCheckExistFile(raftStore)) {
+    (void)taosRemoveFile(raftStore);
+    uInfo("repair: vnode%d deleted raft_store.json", vnodeId);
+  }
+
+  // Delete *.bak files in sync/
+  TdDirPtr pDir = taosOpenDir(syncDir);
+  if (pDir == NULL) return 0;  // sync dir may not exist
+
+  TdDirEntryPtr pEntry;
+  while ((pEntry = taosReadDir(pDir)) != NULL) {
+    char *name = taosGetDirEntryName(pEntry);
+    if (name == NULL) continue;
+    int32_t nameLen = (int32_t)strlen(name);
+    if (nameLen > 4 && strcmp(name + nameLen - 4, ".bak") == 0) {
+      char bakPath[PATH_MAX];
+      snprintf(bakPath, sizeof(bakPath), "%s%s%s", syncDir, TD_DIRSEP, name);
+      (void)taosRemoveFile(bakPath);
+      uInfo("repair: vnode%d deleted sync/%s", vnodeId, name);
+    }
+  }
+  taosCloseDir(&pDir);
+  return 0;
+}
+
+// Step l: Delete vnodeN.bak on all target disks.
+static void dmDeleteBackup(STfs *pTgtTfs, int32_t vnodeId) { + char relBak[TSDB_FILENAME_LEN]; + snprintf(relBak, sizeof(relBak), "vnode%svnode%d.bak", TD_DIRSEP, vnodeId); + + int32_t nlevel = tfsGetLevel(pTgtTfs); + for (int32_t level = 0; level < nlevel; level++) { + int32_t ndisk = tfsGetDisksAtLevel(pTgtTfs, level); + for (int32_t id = 0; id < ndisk; id++) { + SDiskID did = {.level = level, .id = id}; + const char *diskPath = tfsGetDiskPath(pTgtTfs, did); + char fullPath[PATH_MAX]; + snprintf(fullPath, sizeof(fullPath), "%s%s%s", diskPath, TD_DIRSEP, relBak); + if (taosDirExist(fullPath)) { + taosRemoveDir(fullPath); + } + } + } + uInfo("repair: vnode%d backup deleted", vnodeId); +} + +// Step m: Rollback on failure — restore vnodeN.bak to vnodeN on all disks. +// For each disk independently: +// - Both vnodeN and vnodeN.bak exist: delete vnodeN, rename vnodeN.bak → vnodeN +// - Only vnodeN.bak exists: rename vnodeN.bak → vnodeN +// - Only vnodeN exists (or neither): do nothing +static void dmRollbackVnode(STfs *pTgtTfs, int32_t vnodeId) { + char relVnode[TSDB_FILENAME_LEN]; + char relBak[TSDB_FILENAME_LEN]; + snprintf(relVnode, sizeof(relVnode), "vnode%svnode%d", TD_DIRSEP, vnodeId); + snprintf(relBak, sizeof(relBak), "vnode%svnode%d.bak", TD_DIRSEP, vnodeId); + + int32_t nlevel = tfsGetLevel(pTgtTfs); + for (int32_t level = 0; level < nlevel; level++) { + int32_t ndisk = tfsGetDisksAtLevel(pTgtTfs, level); + for (int32_t id = 0; id < ndisk; id++) { + SDiskID did = {.level = level, .id = id}; + const char *diskPath = tfsGetDiskPath(pTgtTfs, did); + + char vnodePath[PATH_MAX]; + char bakPath[PATH_MAX]; + snprintf(vnodePath, sizeof(vnodePath), "%s%s%s", diskPath, TD_DIRSEP, relVnode); + snprintf(bakPath, sizeof(bakPath), "%s%s%s", diskPath, TD_DIRSEP, relBak); + + bool hasVnode = taosDirExist(vnodePath); + bool hasBak = taosDirExist(bakPath); + + if (hasVnode && hasBak) { + taosRemoveDir(vnodePath); + if (taosRenameFile(bakPath, vnodePath) != 0) { + 
uError("repair: vnode%d failed to rollback %s (rename failed)", vnodeId, vnodePath); + } + } else if (!hasVnode && hasBak) { + if (taosRenameFile(bakPath, vnodePath) != 0) { + uError("repair: vnode%d failed to rollback %s (rename failed)", vnodeId, vnodePath); + } + } + // If only vnodeN exists (no .bak) or neither exists: do nothing + } + } + uInfo("repair: vnode%d rollback completed", vnodeId); +} + +int32_t dmRepairCopyMode(const SRepairCopyOpts *pOpts) { +#ifdef WINDOWS + + uError("repair: copy-mode repair is not supported on Windows"); + return 2; + +#else + + bool isRemote = (pOpts->sourceHost[0] != '\0'); + + uInfo("repair: starting copy-mode repair (%s mode)", isRemote ? "remote" : "local"); + uInfo("repair: source config: %s", pOpts->sourceCfg); + if (isRemote) { + uInfo("repair: source host: %s", pOpts->sourceHost); + } + int32_t nVnodes = taosArrayGetSize(pOpts->vnodeIds); + uInfo("repair: number of vnodes to repair: %d", nVnodes); + + // Phase 2: Parse source config file + // If sourceCfg is a directory (not a regular file), use taos.cfg inside it + char resolvedCfg[PATH_MAX] = {0}; + tstrncpy(resolvedCfg, pOpts->sourceCfg, sizeof(resolvedCfg)); + if (!isRemote && taosIsDir(resolvedCfg)) { + size_t len = strlen(pOpts->sourceCfg); + const char *sep = (len > 0 && pOpts->sourceCfg[len - 1] == TD_DIRSEP_CHAR) ? 
"" : TD_DIRSEP; + snprintf(resolvedCfg, sizeof(resolvedCfg), "%s%staos.cfg", pOpts->sourceCfg, sep); + uInfo("repair: source-cfg is a directory, using %s", resolvedCfg); + } + + const char *cfgPathToLoad = resolvedCfg; + char tmpCfgPath[PATH_MAX] = {0}; + + if (isRemote) { + // Fetch remote config via SSH + snprintf(tmpCfgPath, sizeof(tmpCfgPath), "/tmp/tdrepair_%d.cfg", (int)taosGetPId()); + if (dmSshFetchFile(pOpts->sourceHost, resolvedCfg, tmpCfgPath) != 0) { + // Retry with taos.cfg appended in case sourceCfg is a remote directory + char remoteCfgRetry[PATH_MAX]; + size_t len = strlen(pOpts->sourceCfg); + const char *sep = (len > 0 && pOpts->sourceCfg[len - 1] == TD_DIRSEP_CHAR) ? "" : TD_DIRSEP; + snprintf(remoteCfgRetry, sizeof(remoteCfgRetry), "%s%staos.cfg", pOpts->sourceCfg, sep); + if (dmSshFetchFile(pOpts->sourceHost, remoteCfgRetry, tmpCfgPath) != 0) { + uError("repair: failed to fetch remote config via SSH (exit code 2)"); + return 2; + } + uInfo("repair: source-cfg is a remote directory, using %s", remoteCfgRetry); + } + cfgPathToLoad = tmpCfgPath; + } + + SDiskCfg *srcDisks = NULL; + int32_t srcDiskNum = 0; + int32_t code = dmParseSourceCfg(cfgPathToLoad, &srcDisks, &srcDiskNum); + if (tmpCfgPath[0] != '\0') { + (void)taosRemoveFile(tmpCfgPath); + } + if (code != 0) { + uError("repair: failed to parse source config file"); + return isRemote ? 
2 : 1; + } + + uInfo("repair: source config has %d disk(s)", srcDiskNum); + + // Build source TFS model + SRepairTfs srcTfs = {0}; + if (dmBuildRepairTfs(srcDisks, srcDiskNum, &srcTfs) != 0) { + uError("repair: failed to build source TFS model"); + taosMemoryFree(srcDisks); + return 1; + } + taosMemoryFree(srcDisks); + + uInfo("repair: source TFS: %d level(s), %d disk(s), primary=%s", srcTfs.nlevel, srcTfs.ndisk, + srcTfs.disks[srcTfs.primaryIdx].dir); + + // Validate source disk paths exist + if (isRemote) { + if (dmValidateSourceDisksRemote(pOpts->sourceHost, &srcTfs) != 0) { + uError("repair: remote source disk validation failed"); + dmDestroyRepairTfs(&srcTfs); + return 2; + } + } else { + if (dmValidateSourceDisksLocal(&srcTfs) != 0) { + uError("repair: local source disk validation failed"); + dmDestroyRepairTfs(&srcTfs); + return 1; + } + } + + uInfo("repair: source disk validation passed"); + + // Phase 3: Load dnode info and open target TFS + int32_t dnodeId = 0; + if (dmLoadDnodeInfo(&dnodeId) != 0) { + uError("repair: failed to load dnode.json (exit code 1)"); + dmDestroyRepairTfs(&srcTfs); + return 1; + } + uInfo("repair: local dnodeId = %d", dnodeId); + + STfs *pTgtTfs = NULL; + if (dmOpenTargetTfs(&pTgtTfs) != 0) { + uError("repair: failed to open target TFS (exit code 1)"); + dmDestroyRepairTfs(&srcTfs); + return 1; + } + uInfo("repair: target TFS: %d level(s), primary=%s", tfsGetLevel(pTgtTfs), tfsGetPrimaryPath(pTgtTfs)); + + // Phase 4: Per-vnode repair loop + const char *remoteHost = isRemote ? 
pOpts->sourceHost : NULL; + SRepairVnodeResult *vnResults = taosMemoryCalloc(nVnodes, sizeof(SRepairVnodeResult)); + if (vnResults == NULL) { + uError("repair: memory allocation failed"); + tfsClose(pTgtTfs); + dmDestroyRepairTfs(&srcTfs); + return 1; + } + + for (int32_t v = 0; v < nVnodes; v++) { + int32_t vnodeId = *(int32_t *)taosArrayGet(pOpts->vnodeIds, v); + uInfo("repair: === vnode%d [%d/%d] ===", vnodeId, v + 1, nVnodes); + + // Step a: Check for existing .bak on any target disk + if (dmCheckBakExists(pTgtTfs, vnodeId)) { + uInfo("repair: vnode%d SKIPPED — vnode%d.bak already exists on target", vnodeId, vnodeId); + vnResults[v].result = 2; + vnResults[v].reason = "vnode.bak already exists"; + continue; + } + + // Step b: Read and parse source current.json + SArray *srcFileSets = dmReadSourceCurrentJson(&srcTfs, remoteHost, vnodeId); + if (srcFileSets == NULL) { + uInfo("repair: vnode%d SKIPPED — source current.json not found or unreadable", vnodeId); + vnResults[v].result = 2; + vnResults[v].reason = "source current.json not found"; + continue; + } + + int32_t nSets = taosArrayGetSize(srcFileSets); + int32_t nTotalFiles = 0; + for (int32_t s = 0; s < nSets; s++) { + SRepairFileSet *pSet = taosArrayGet(srcFileSets, s); + int32_t nFiles = taosArrayGetSize(pSet->files); + nTotalFiles += nFiles; + for (int32_t f = 0; f < nFiles; f++) { + SRepairFile *pf = taosArrayGet(pSet->files, f); + if (pf->lcn > 1) vnResults[v].hasS3File = true; + } + } + uInfo("repair: vnode%d source has %d file set(s), %d file(s) total", vnodeId, nSets, nTotalFiles); + + // Step c: Read local current.json and diff against source + SArray *localFileSets = dmReadLocalCurrentJson(pTgtTfs, vnodeId); + SArray *copyFids = NULL; + SArray *retainFids = NULL; + if (dmDiffFileSets(srcFileSets, localFileSets, pTgtTfs, vnodeId, ©Fids, &retainFids) != 0) { + uError("repair: vnode%d FAILED \xe2\x80\x94 memory allocation failed in diff", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = 
"memory allocation failed"; + goto _vnodeCleanup; + } + + int32_t nCopy = taosArrayGetSize(copyFids); + int32_t nRetain = taosArrayGetSize(retainFids); + if (localFileSets != NULL) { + uInfo("repair: vnode%d local current.json found: %d file set(s)", vnodeId, (int)taosArrayGetSize(localFileSets)); + } else { + uInfo("repair: vnode%d local current.json not found, will copy all", vnodeId); + } + uInfo("repair: vnode%d file sets to copy: %d, to retain: %d", vnodeId, nCopy, nRetain); + + // Declare here so it's initialized before any goto _vnodeCleanup + SArray *remappedSets = NULL; + + // Step d: Backup vnodeN → vnodeN.bak on all disks + if (dmBackupVnode(pTgtTfs, vnodeId) != 0) { + uError("repair: vnode%d FAILED — backup failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "backup failed"; + goto _vnodeCleanup; + } + uInfo("repair: vnode%d backup completed", vnodeId); + + // Step e: Create vnodeN directories on all disks + if (dmCreateVnodeDirs(pTgtTfs, vnodeId) != 0) { + uError("repair: vnode%d FAILED — failed to create directories", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "create directories failed"; + goto _vnodeCleanup; + } + uInfo("repair: vnode%d directories created", vnodeId); + + // Step f: Hard-link retained tsdb files from backup + if (nRetain > 0 && localFileSets != NULL) { + if (dmHardLinkRetainedFiles(pTgtTfs, vnodeId, retainFids, localFileSets) != 0) { + uError("repair: vnode%d FAILED — hard-link retained files failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "hard-link retained files failed"; + goto _vnodeCleanup; + } + uInfo("repair: vnode%d hard-linked %d retained file set(s)", vnodeId, nRetain); + } + + // Step g: Copy non-tsdb files from source to target primary disk + if (dmCopyNonTsdbFiles(&srcTfs, pTgtTfs, remoteHost, vnodeId) != 0) { + uError("repair: vnode%d FAILED — copy non-tsdb files failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "copy non-tsdb files failed"; 
+ goto _vnodeCleanup; + } + uInfo("repair: vnode%d non-tsdb files copied", vnodeId); + + // Step h: Copy source TSDB file sets with disk ID remapping + if (nCopy > 0) { + if (dmCopySourceFileSets(&srcTfs, pTgtTfs, remoteHost, vnodeId, srcFileSets, copyFids, &remappedSets) != 0) { + uError("repair: vnode%d FAILED — copy source file sets failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "copy tsdb file sets failed"; + goto _vnodeCleanup; + } + uInfo("repair: vnode%d copied %d file set(s)", vnodeId, nCopy); + } + + // Step i: Generate target current.json + if (dmGenerateCurrentJson(pTgtTfs, vnodeId, retainFids, localFileSets, remappedSets, srcFileSets) != 0) { + uError("repair: vnode%d FAILED — generate current.json failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "generate current.json failed"; + goto _vnodeCleanup; + } + + // Step j: Update syncCfg.myIndex in vnode.json and raft_config.json + if (dmUpdateSyncIndex(pTgtTfs, vnodeId, dnodeId) != 0) { + uError("repair: vnode%d FAILED — update sync index failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "update sync index failed"; + goto _vnodeCleanup; + } + + // Step k: Clean sync state + if (dmCleanSyncState(pTgtTfs, vnodeId) != 0) { + uError("repair: vnode%d FAILED — clean sync state failed", vnodeId); + vnResults[v].result = 1; + vnResults[v].reason = "clean sync state failed"; + goto _vnodeCleanup; + } + + // Step l: Delete backup + dmDeleteBackup(pTgtTfs, vnodeId); + + _vnodeCleanup: + if (vnResults[v].result == 1) { + dmRollbackVnode(pTgtTfs, vnodeId); + } + dmDestroyRepairFileSets(remappedSets); + taosArrayDestroy(copyFids); + taosArrayDestroy(retainFids); + dmDestroyRepairFileSets(localFileSets); + dmDestroyRepairFileSets(srcFileSets); + } + + // Phase 5: Summary report + uInfo("repair: ========== SUMMARY =========="); + bool hasS3Files = false; + int32_t nSuccess = 0, nSkipped = 0, nFailed = 0; + for (int32_t v = 0; v < nVnodes; v++) { + int32_t 
vnodeId = *(int32_t *)taosArrayGet(pOpts->vnodeIds, v); + if (vnResults[v].hasS3File) hasS3Files = true; + if (vnResults[v].result == 1) { + nFailed++; + uInfo("repair: vnode%-6d FAILED (%s)", vnodeId, vnResults[v].reason); + } else if (vnResults[v].result == 2) { + nSkipped++; + uInfo("repair: vnode%-6d SKIPPED (%s)", vnodeId, vnResults[v].reason); + } else { + nSuccess++; + uInfo("repair: vnode%-6d SUCCESS", vnodeId); + } + } + uInfo("repair: total=%d success=%d skipped=%d failed=%d", nVnodes, nSuccess, nSkipped, nFailed); + if (hasS3Files) { + uInfo("repair: WARNING — some source files have lcn > 1 (S3 multi-chunk)."); + uInfo("repair: Only the last chunk was copied. You must manually sync the"); + uInfo("repair: remaining chunks."); + } + uInfo("repair: ================================"); + + taosMemoryFree(vnResults); + tfsClose(pTgtTfs); + dmDestroyRepairTfs(&srcTfs); + + if (nFailed > 0 && nSuccess == 0) return 4; + if (nFailed > 0) return 3; + return 0; + +#endif // WINDOWS +} diff --git a/source/dnode/mgmt/exe/dmMain.c b/source/dnode/mgmt/exe/dmMain.c index 16ad67ed9eda..a5833b2d6bf7 100644 --- a/source/dnode/mgmt/exe/dmMain.c +++ b/source/dnode/mgmt/exe/dmMain.c @@ -24,6 +24,7 @@ #include "dmUtil.h" #include "tcs.h" #include "qworker.h" +#include "dmRepairCopy.h" #ifdef TD_JEMALLOC_ENABLED #define ALLOW_FORBID_FUNC @@ -69,6 +70,7 @@ static struct { int64_t startTime; bool generateCode; char encryptKey[ENCRYPT_KEY_LEN + 1]; + SRepairCopyOpts repairCopy; } global = {0}; static void dmSetDebugFlag(int32_t signum, void *sigInfo, void *context) { (void)taosSetGlobalDebugFlag(143); } @@ -232,7 +234,53 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { return TSDB_CODE_INVALID_CFG; } } else if (strcmp(argv[i], "-r") == 0) { - generateNewMeta = true; + global.repairCopy.enabled = true; + } else if (strcmp(argv[i], "--mode") == 0) { + if (i < argc - 1) { + tstrncpy(global.repairCopy.modeStr, argv[++i], sizeof(global.repairCopy.modeStr)); + } else { 
+ printf("'--mode' requires a parameter\n"); + return TSDB_CODE_INVALID_CFG; + } + } else if (strcmp(argv[i], "--node-type") == 0) { + if (i < argc - 1) { + tstrncpy(global.repairCopy.nodeType, argv[++i], sizeof(global.repairCopy.nodeType)); + } else { + printf("'--node-type' requires a parameter\n"); + return TSDB_CODE_INVALID_CFG; + } + } else if (strcmp(argv[i], "--source-host") == 0) { + if (i < argc - 1) { + tstrncpy(global.repairCopy.sourceHost, argv[++i], sizeof(global.repairCopy.sourceHost)); + } else { + printf("'--source-host' requires a parameter\n"); + return TSDB_CODE_INVALID_CFG; + } + } else if (strcmp(argv[i], "--source-cfg") == 0) { + if (i < argc - 1) { + if (strlen(argv[++i]) >= PATH_MAX) { + printf("source config file path overflow\n"); + return TSDB_CODE_INVALID_CFG; + } + tstrncpy(global.repairCopy.sourceCfg, argv[i], PATH_MAX); + } else { + printf("'--source-cfg' requires a parameter\n"); + return TSDB_CODE_INVALID_CFG; + } + } else if (strcmp(argv[i], "--vnode") == 0) { + if (global.repairCopy.vnodeIds != NULL) { + printf("duplicate --vnode argument\n"); + return TSDB_CODE_INVALID_CFG; + } else if (i < argc - 1) { + global.repairCopy.vnodeIds = dmParseVnodeIds(argv[++i]); + if (global.repairCopy.vnodeIds == NULL) { + printf("invalid --vnode format: '%s'\n", argv[i]); + return TSDB_CODE_INVALID_CFG; + } + } else { + printf("'--vnode' requires a parameter\n"); + return TSDB_CODE_INVALID_CFG; + } } else if (strcmp(argv[i], "-E") == 0) { if (i < argc - 1) { if (strlen(argv[++i]) >= PATH_MAX) { @@ -314,6 +362,43 @@ static int32_t dmParseArgs(int32_t argc, char const *argv[]) { } } + // Resolve -r flag: if --mode is specified, route to repair copy mode; + // otherwise fall back to legacy meta regeneration behavior. 
+ if (global.repairCopy.enabled) { + if (global.repairCopy.modeStr[0] == '\0') { + // -r without --mode: legacy meta regeneration + if (global.repairCopy.vnodeIds != NULL) { + taosArrayDestroy(global.repairCopy.vnodeIds); + global.repairCopy.vnodeIds = NULL; + } + global.repairCopy.nodeType[0] = '\0'; + global.repairCopy.sourceCfg[0] = '\0'; + global.repairCopy.modeStr[0] = '\0'; + global.repairCopy.enabled = false; + generateNewMeta = true; + } else if (strcmp(global.repairCopy.modeStr, "copy") == 0) { + // -r --mode copy: validate required args + if (strcmp(global.repairCopy.nodeType, "vnode") != 0) { + printf("--mode copy requires --node-type vnode\n"); + return TSDB_CODE_INVALID_CFG; + } + if (global.repairCopy.sourceCfg[0] == '\0') { + printf("--mode copy requires --source-cfg\n"); + return TSDB_CODE_INVALID_CFG; + } + if (global.repairCopy.vnodeIds == NULL || taosArrayGetSize(global.repairCopy.vnodeIds) == 0) { + printf("--mode copy requires --vnode\n"); + return TSDB_CODE_INVALID_CFG; + } + } else { + printf("unsupported repair mode: '%s'\n", global.repairCopy.modeStr); + return TSDB_CODE_INVALID_CFG; + } + } else if (global.repairCopy.modeStr[0] != '\0') { + printf("--mode requires -r flag\n"); + return TSDB_CODE_INVALID_CFG; + } + return 0; } @@ -569,6 +654,17 @@ int mainWindows(int argc, char **argv) { } osSetProcPath(argc, (char **)argv); + + if (global.repairCopy.enabled) { + code = dmRepairCopyMode(&global.repairCopy); + taosArrayDestroy(global.repairCopy.vnodeIds); + taosCleanupCfg(); + taosCloseLog(); + taosCleanupArgs(); + taosConvDestroy(); + return code; + } + taosCleanupArgs(); if ((code = dmGetEncryptKey()) != 0) { diff --git a/test/cases/50-Others/02-Repair/test_copy_repair.py b/test/cases/50-Others/02-Repair/test_copy_repair.py new file mode 100644 index 000000000000..302a6b278db9 --- /dev/null +++ b/test/cases/50-Others/02-Repair/test_copy_repair.py @@ -0,0 +1,1343 @@ +################################################################### +# 
Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +from new_test_framework.utils import tdLog, tdSql, tdCom, tdDnodes +import getpass +import json +import os +import shutil +import subprocess + +import pytest + + +def _get_sim_path(): + """Return /sim, same as the test framework.""" + self_path = os.path.dirname(os.path.realpath(__file__)) + if "community" in self_path: + proj_path = self_path[:self_path.find("community")] + else: + proj_path = self_path[:self_path.find("test")] + return os.path.join(proj_path, "sim") + + +SIM_PATH = _get_sim_path() + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def write_file(path, content): + """Write text content to a file, creating parent dirs.""" + os.makedirs(os.path.dirname(path), exist_ok=True) + if isinstance(content, bytes): + with open(path, "wb") as f: + f.write(content) + else: + with open(path, "w") as f: + f.write(content) + + +def read_file(path): + with open(path, "r") as f: + return f.read() + + +def read_bin(path): + with open(path, "rb") as f: + return f.read() + + +def make_fake_file(path, size, seed=None): + """Create a file filled with deterministic data.""" + os.makedirs(os.path.dirname(path), exist_ok=True) + if seed is None: + seed = abs(hash(path)) & 0xFF + data = bytes([(seed + i) & 0xFF for i in range(size)]) + with open(path, "wb") as f: + f.write(data) + return data + + +def files_equal(path_a, path_b): + """Check if two files are byte-identical.""" + return read_bin(path_a) == read_bin(path_b) + + 
+def make_taos_cfg(cfg_dir, data_dirs, log_dir, extra=None): + """Write a taos.cfg file. + + data_dirs: list of (path, level, primary) tuples. + """ + os.makedirs(cfg_dir, exist_ok=True) + lines = [ + "firstEp localhost:6030", + f"logDir {log_dir}", + ] + for path, level, primary in data_dirs: + lines.append(f"dataDir {path} {level} {primary}") + if extra: + for k, v in extra.items(): + lines.append(f"{k} {v}") + write_file(os.path.join(cfg_dir, "taos.cfg"), "\n".join(lines) + "\n") + + +def make_dnode_json(data_dir, dnode_id): + """Create dnode/dnode.json with the given dnodeId.""" + dnode_dir = os.path.join(data_dir, "dnode") + os.makedirs(dnode_dir, exist_ok=True) + content = json.dumps({"dnodeId": dnode_id}) + write_file(os.path.join(dnode_dir, "dnode.json"), content) + + +def make_vnode_json(vnode_dir, vnode_id, dnode_ids, my_index=0): + """Create a vnode.json that repair code can parse. + + The repair code looks for: + - config.syncCfg.nodeInfo[] with "nodeId" per entry (string-encoded int) + - config.syncCfg.myIndex (string-encoded int) + Values are written as string-encoded integers to match tjsonAddIntegerToObject. 
+ """ + node_info = [] + for did in dnode_ids: + node_info.append({ + "nodeId": str(did), + "clusterId": "0", + "nodeFqdn": "localhost", + "nodePort": "6030", + }) + vnode_json = { + "config": { + "syncCfg.myIndex": str(my_index), + "syncCfg.nodeInfo": node_info, + } + } + write_file(os.path.join(vnode_dir, "vnode.json"), json.dumps(vnode_json)) + + +def make_raft_config_json(sync_dir, dnode_ids, my_index=0): + """Create sync/raft_config.json.""" + node_info = [] + for did in dnode_ids: + node_info.append({ + "nodeId": str(did), + "clusterId": "0", + "nodeFqdn": "localhost", + "nodePort": 6030, + }) + raft_cfg = { + "RaftCfg": { + "SSyncCfg": { + "myIndex": my_index, + "nodeInfo": node_info, + } + } + } + os.makedirs(sync_dir, exist_ok=True) + write_file(os.path.join(sync_dir, "raft_config.json"), json.dumps(raft_cfg)) + + +def make_raft_store_json(sync_dir): + """Create sync/raft_store.json (should be cleaned after repair).""" + os.makedirs(sync_dir, exist_ok=True) + write_file(os.path.join(sync_dir, "raft_store.json"), '{"vote":0}') + write_file(os.path.join(sync_dir, "some_state.bak"), "backup data") + + +def tsdb_filename(vnode_id, fid, cid, suffix): + """Build TSDB file name: v{vid}f{fid}ver{cid}.{suffix}""" + return f"v{vnode_id}f{fid}ver{cid}.{suffix}" + + +def make_current_json(fsets): + """Build current.json content from a list of file set dicts. 
+ + Each fset: { + "fid": int, + "files": [{type, did_level, did_id, fid, cid, size, lcn, ...}], + "last_compact": int, "last_commit": int, + } + """ + SUFFIXES = {0: "head", 1: "data", 2: "sma", 3: "tomb", 5: "stt"} + fset_arr = [] + for fs in fsets: + fset_json = {"fid": fs["fid"]} + # Non-STT files + for f in fs["files"]: + ftype = f["type"] + if ftype in (0, 1, 2, 3): + fset_json[SUFFIXES[ftype]] = { + "did.level": f.get("did_level", 0), + "did.id": f.get("did_id", 0), + "lcn": f.get("lcn", 0), + "fid": f["fid"], + "cid": f["cid"], + "size": f["size"], + "minVer": f.get("minVer", 0), + "maxVer": f.get("maxVer", 0), + } + # STT files grouped by level + stt_files = [f for f in fs["files"] if f["type"] == 5] + stt_levels = sorted(set(f.get("sttLevel", 0) for f in stt_files)) + stt_lvl_arr = [] + for sl in stt_levels: + level_files = [f for f in stt_files if f.get("sttLevel", 0) == sl] + files_arr = [] + for f in level_files: + files_arr.append({ + "did.level": f.get("did_level", 0), + "did.id": f.get("did_id", 0), + "lcn": f.get("lcn", 0), + "fid": f["fid"], + "cid": f["cid"], + "size": f["size"], + "minVer": f.get("minVer", 0), + "maxVer": f.get("maxVer", 0), + "level": sl, + }) + stt_lvl_arr.append({"level": sl, "files": files_arr}) + fset_json["stt lvl"] = stt_lvl_arr + fset_json["last compact"] = fs.get("last_compact", 0) + fset_json["last commit"] = fs.get("last_commit", 0) + fset_arr.append(fset_json) + return json.dumps({"fmtv": 1, "fset": fset_arr}) + + +def make_source_vnode(primary_data, vnode_id, fsets, dnode_ids, my_index=0, + extra_data_dirs=None, file_size=1024): + """Create a complete source vnode directory tree with fake TSDB files. 

    Args:
        primary_data: primary data dir path
        vnode_id: integer vnode id
        fsets: list of file set dicts (same as make_current_json)
        dnode_ids: list of dnode IDs for vnode.json nodeInfo
        my_index: source myIndex
        extra_data_dirs: list of (path, level) for non-primary disks with TSDB files
        file_size: size of fake files
    Returns:
        dict mapping (fid, cid, suffix) -> bytes content of each created file
    """
    SUFFIXES = {0: "head", 1: "data", 2: "sma", 3: "tomb", 5: "stt"}
    vnode_dir = os.path.join(primary_data, "vnode", f"vnode{vnode_id}")
    tsdb_dir = os.path.join(vnode_dir, "tsdb")
    sync_dir = os.path.join(vnode_dir, "sync")

    # Create vnode.json
    make_vnode_json(vnode_dir, vnode_id, dnode_ids, my_index)

    # Create sync state files (raft_store.json / *.bak must be cleaned by repair)
    make_raft_config_json(sync_dir, dnode_ids, my_index)
    make_raft_store_json(sync_dir)

    # Create a dummy wal file in wal/ subdir
    wal_dir = os.path.join(vnode_dir, "wal")
    write_file(os.path.join(wal_dir, "meta-ver0"), "wal meta content")

    # Build disk map: did (level, id) -> data dir path
    disk_map = {(0, 0): primary_data}
    if extra_data_dirs:
        for path, level in extra_data_dirs:
            # Count existing disks at this level — id is the next free slot
            existing = sum(1 for (l, _) in disk_map if l == level)
            disk_map[(level, existing)] = path

    # Create TSDB files on appropriate disks
    file_contents = {}
    for fs in fsets:
        for f in fs["files"]:
            ftype = f["type"]
            suffix = SUFFIXES[ftype]
            did_level = f.get("did_level", 0)
            did_id = f.get("did_id", 0)
            disk_path = disk_map.get((did_level, did_id), primary_data)
            fname = tsdb_filename(vnode_id, f["fid"], f["cid"], suffix)
            fpath = os.path.join(disk_path, "vnode", f"vnode{vnode_id}", "tsdb", fname)
            content = make_fake_file(fpath, f["size"], seed=hash(fname) & 0xFF)
            file_contents[(f["fid"], f["cid"], suffix)] = content
            # Update size in fset to match actual bytes written, so the
            # current.json generated below agrees with the files on disk
            f["size"] = len(content)

    # Write current.json
    os.makedirs(tsdb_dir, exist_ok=True)
    write_file(os.path.join(tsdb_dir, "current.json"), make_current_json(fsets))

    return file_contents


# ---------------------------------------------------------------------------
# Tests
# ---------------------------------------------------------------------------

class TestCopyModeRepair:
    """Repair-copy tests (single-disk and multi-disk), local and remote."""

    # Class-level state filled in by setup_class
    taosd_bin = None    # path to the taosd binary under test
    source_host = None  # user@127.0.0.1 for remote-mode runs
    _ssh_ok = None      # True when passwordless SSH to localhost works

    @classmethod
    def setup_class(cls):
        cls.taosd_bin = cls._find_taosd()
        if cls.taosd_bin is None:
            pytest.skip("taosd not found")
        cls.source_host = f"{getpass.getuser()}@127.0.0.1"
        cls._ssh_ok = cls._ssh_localhost_ok()

    @staticmethod
    def _ssh_localhost_ok():
        """Return True if passwordless SSH to 127.0.0.1 works."""
        user = getpass.getuser()
        try:
            # BatchMode forbids password prompts; any prompt means failure
            r = subprocess.run(
                ["ssh", "-o", "BatchMode=yes", "-o", "ConnectTimeout=5",
                 f"{user}@127.0.0.1", "true"],
                capture_output=True, timeout=10)
            return r.returncode == 0
        except Exception:
            return False

    @staticmethod
    def _find_taosd():
        """Locate a runnable taosd binary; also record it in tdDnodes.binPath."""
        candidates = []
        if tdDnodes.binPath:
            candidates.append(tdDnodes.binPath)

        taosd_bin = os.getenv("TAOSD_BIN")
        if taosd_bin:
            candidates.append(taosd_bin)

        taos_bin_path = os.getenv("TAOS_BIN_PATH")
        if taos_bin_path:
            candidates.append(os.path.join(taos_bin_path, "taosd"))

        for bin_path in candidates:
            if os.path.isfile(bin_path) and os.access(bin_path, os.X_OK):
                tdDnodes.binPath = bin_path
                tdLog.info("taosd found in %s" % bin_path)
                return bin_path
        return None

    def _run_repair(self, target_cfg_dir, source_cfg_path, vnode_ids_str,
                    source_host=None, timeout=60):
        """Run taosd in repair-copy mode and return the CompletedProcess."""
        cmd = [
            self.taosd_bin,
            "-c", target_cfg_dir,
            "-r",
            "--mode", "copy",
            "--node-type", "vnode",
            "--source-cfg", source_cfg_path,
            "--vnode", vnode_ids_str,
        ]
        if source_host:
            cmd.extend(["--source-host", source_host])
        return subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)

    def _setup_env(self, src_disks=None, tgt_disks=None,
                   vnode_id=2, dnode_ids=None, target_dnode_id=2):
        """Set up source and target environments.

        src_disks, tgt_disks: list of (subdir_name, level, primary) tuples.
        Defaults to single-disk: [("data", 0, 1)].
        """
        # Start from a clean slate under sim/
        for d in ("dnode1", "dnode2"):
            p = os.path.join(SIM_PATH, d)
            if os.path.exists(p):
                shutil.rmtree(p)
            os.makedirs(p, exist_ok=True)

        if src_disks is None:
            src_disks = [("data", 0, 1)]
        if tgt_disks is None:
            tgt_disks = [("data", 0, 1)]
        if dnode_ids is None:
            dnode_ids = [1, 2, 3]

        src_data_dirs = []
        tgt_data_dirs = []

        for name, level, primary in src_disks:
            path = os.path.join(SIM_PATH, "dnode1", name)
            os.makedirs(path, exist_ok=True)
            src_data_dirs.append((path, level, primary))

        for name, level, primary in tgt_disks:
            path = os.path.join(SIM_PATH, "dnode2", name)
            os.makedirs(os.path.join(path, "vnode"), exist_ok=True)
            tgt_data_dirs.append((path, level, primary))

        src_cfg = os.path.join(SIM_PATH, "dnode1", "cfg")
        tgt_cfg = os.path.join(SIM_PATH, "dnode2", "cfg")
        tgt_log = os.path.join(SIM_PATH, "dnode2", "log")
        os.makedirs(tgt_log, exist_ok=True)

        make_taos_cfg(src_cfg, src_data_dirs, os.path.join(SIM_PATH, "dnode1", "log"))
        make_taos_cfg(tgt_cfg, tgt_data_dirs, tgt_log)

        # dnode.json pins the target dnodeId used for myIndex remapping
        tgt_primary = next(p for p, l, pr in tgt_data_dirs if pr == 1)
        make_dnode_json(tgt_primary, target_dnode_id)

        return {
            "src_data_dirs": src_data_dirs,
            "tgt_data_dirs": tgt_data_dirs,
            "src_data": src_data_dirs[0][0],
            "tgt_data": tgt_data_dirs[0][0],
            "src_cfg": os.path.join(src_cfg, "taos.cfg"),
            "tgt_cfg_dir": tgt_cfg,
            "tgt_log": tgt_log,
            "vnode_id": vnode_id,
            "dnode_ids": dnode_ids,
            "target_dnode_id": target_dnode_id,
        }

    def _do_test_basic_copy(self, source_host=None):
        """Copy a vnode with 2 file sets (head+data+sma+tomb each) from source to empty target."""
        env = self._setup_env()
        vid = env["vnode_id"]

        fsets = [
            {
                "fid": 1, "last_compact": 100, "last_commit": 200,
                "files": [
                    {"type": 0, "fid": 1, "cid": 10, "size": 512, "did_level": 0, "did_id": 0, "lcn": 0},
                    {"type": 1, "fid": 1, "cid": 10, "size": 1024, "did_level": 0, "did_id": 0, "lcn": 0},
                    {"type": 2, "fid": 1, "cid": 10, "size": 256, "did_level": 0, "did_id": 0, "lcn": 0},
                    {"type": 3, "fid": 1, "cid": 10, "size": 128, "did_level": 0, "did_id": 0, "lcn": 0},
                ],
            },
            {
                "fid": 2, "last_compact": 300, "last_commit": 400,
                "files": [
                    {"type": 0, "fid": 2, "cid": 20, "size": 512, "did_level": 0, "did_id": 0, "lcn": 0},
                    {"type": 1, "fid": 2, "cid": 20, "size": 2048, "did_level": 0, "did_id": 0, "lcn": 0},
                ],
            },
        ]
        src_contents = make_source_vnode(
            env["src_data"], vid, fsets, env["dnode_ids"], my_index=0)

        result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host)
        tdSql.checkEqual(result.returncode, 0, f"taosd failed:\nstdout: {result.stdout}\nstderr: {result.stderr}")

        # Verify TSDB files are copied with correct content
        tgt_tsdb = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}", "tsdb")
        for (fid, cid, suffix), expected_content in src_contents.items():
            fname = tsdb_filename(vid, fid, cid, suffix)
            tgt_path = os.path.join(tgt_tsdb, fname)
            tdSql.checkEqual(os.path.isfile(tgt_path), True, f"Missing file: {fname}")
            tdSql.checkEqual(read_bin(tgt_path), expected_content, f"Content mismatch: {fname}")

        # Verify current.json was regenerated
        current_path = os.path.join(tgt_tsdb, "current.json")
        tdSql.checkEqual(os.path.isfile(current_path), True, f"Missing file: current.json")
        current = json.loads(read_file(current_path))
        tdSql.checkEqual(current["fmtv"], 1, "Incorrect fmtv in current.json")
        tdSql.checkEqual(len(current["fset"]), 2, "Incorrect number of fsets in current.json")
        # Check fids are present and sorted
        fids = [fs["fid"] for fs in current["fset"]]
        tdSql.checkEqual(fids, [1, 2], "Fids in current.json are not as expected")

        # Verify non-tsdb files were copied
        tgt_vnode = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}")
        tdSql.checkEqual(os.path.isfile(os.path.join(tgt_vnode, "vnode.json")), True, "Missing file: vnode.json")
        tdSql.checkEqual(os.path.isfile(os.path.join(tgt_vnode, "wal", "meta-ver0")), True, "Missing file: wal/meta-ver0")

        # Verify sync state cleaned: no raft_store.json, no .bak in sync/
        sync_dir = os.path.join(tgt_vnode, "sync")
        tdSql.checkEqual(not os.path.exists(os.path.join(sync_dir, "raft_store.json")), True, "Unexpected file: raft_store.json")
        for entry in os.listdir(sync_dir) if os.path.isdir(sync_dir) else []:
            tdSql.checkEqual(not entry.endswith(".bak"), True, f"Unexpected .bak file: {entry}")

        # Verify vnode.json myIndex updated
        vnode_json = json.loads(read_file(os.path.join(tgt_vnode, "vnode.json")))
        config = vnode_json["config"]
        # target_dnode_id=2 is at index 1 in dnode_ids=[1,2,3]
        tdSql.checkEqual(int(config["syncCfg.myIndex"]), 1, "Incorrect myIndex in vnode.json")

        # Verify .bak directories cleaned up
        bak_dir = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}.bak")
        tdSql.checkEqual(not os.path.exists(bak_dir), True, "Backup dir should be deleted after success")

    def test_basic_copy_local(self):
        """Basic local mode copy test

        1. Copy vnode from local source.

        Catalog:
            - Others:RepairCopy

        Since: v3.3.6.0

        Labels: common,ci

        Jira: None

        History:
            - 2026-5-6 Bomin Zhang created
        """
        self._do_test_basic_copy()

    def test_basic_copy_remote(self):
        """Basic remote mode copy test

        1. Copy vnode from remote source via SSH.

        Catalog:
            - Others:RepairCopy

        Since: v3.3.6.0

        Labels: common,ci

        Jira: None

        History:
            - 2026-5-6 Bomin Zhang created
        """
        if not self._ssh_ok:
            pytest.skip("passwordless SSH to 127.0.0.1 not available")
        self._do_test_basic_copy(source_host=self.source_host)

    def _do_test_stt_files(self, source_host=None):
        """Copy a vnode with STT files at multiple levels."""
        env = self._setup_env()
        vid = env["vnode_id"]

        fsets = [
            {
                "fid": 1, "last_compact": 0, "last_commit": 0,
                "files": [
                    {"type": 0, "fid": 1, "cid": 10, "size": 512, "did_level": 0, "did_id": 0, "lcn": 0},
                    {"type": 1, "fid": 1, "cid": 10, "size": 1024, "did_level": 0, "did_id": 0, "lcn": 0},
                    {"type": 5, "fid": 1, "cid": 11, "size": 768, "did_level": 0, "did_id": 0,
                     "lcn": 0, "sttLevel": 0, "minVer": 1, "maxVer": 100},
                    {"type": 5, "fid": 1, "cid": 12, "size": 512, "did_level": 0, "did_id": 0,
                     "lcn": 0, "sttLevel": 1, "minVer": 50, "maxVer": 200},
                ],
            },
        ]
        src_contents = make_source_vnode(
            env["src_data"], vid, fsets, env["dnode_ids"])

        result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host)
        tdSql.checkEqual(result.returncode, 0, f"taosd failed:\nstderr: {result.stderr}")

        # Verify all TSDB files are copied with correct content
        tgt_tsdb = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}", "tsdb")
        for (fid, cid, suffix), expected_content in src_contents.items():
            fname = tsdb_filename(vid, fid, cid, suffix)
            tgt_path = os.path.join(tgt_tsdb, fname)
            tdSql.checkEqual(os.path.isfile(tgt_path), True, f"Missing file: {fname}")
            tdSql.checkEqual(read_bin(tgt_path), expected_content, f"Mismatch: {fname}")

        # Verify current.json has stt lvl entries
        current = json.loads(read_file(os.path.join(tgt_tsdb, "current.json")))
        fset0 = current["fset"][0]
        tdSql.checkEqual("stt lvl" in fset0, True, "Missing 'stt lvl' in current.json")
        stt_lvl = fset0["stt lvl"]
        tdSql.checkEqual(len(stt_lvl), 2,
"Incorrect number of stt levels in current.json") + tdSql.checkEqual(stt_lvl[0]["level"], 0, "Incorrect stt level 0 in current.json") + tdSql.checkEqual(stt_lvl[1]["level"], 1, "Incorrect stt level 1 in current.json") + + def test_stt_files_local(self): + """STT files local mode copy test + + 1. Copy vnode with STT files at multiple levels from local source. + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_stt_files() + + def test_stt_files_remote(self): + """STT files remote mode copy test + + 1. Copy vnode with STT files at multiple levels from remote source via SSH. + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_stt_files(source_host=self.source_host) + + def _do_test_empty_target_no_local_current_json(self, source_host=None): + """When target has no vnode directory at all, all files should be copied.""" + env = self._setup_env() + vid = env["vnode_id"] + + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": 5, "size": 256, "did_level": 0, "did_id": 0, "lcn": 0}, + ], + }, + ] + make_source_vnode(env["src_data"], vid, fsets, env["dnode_ids"]) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: {result.stderr}") + + tgt_tsdb = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}", "tsdb") + tdSql.checkEqual(os.path.isfile(os.path.join(tgt_tsdb, "current.json")), True, "Missing file: current.json") + fname = tsdb_filename(vid, 1, 5, "head") + tdSql.checkEqual(os.path.isfile(os.path.join(tgt_tsdb, fname)), True, f"Missing file: {fname}") + + def test_empty_target_no_local_current_json_local(self): + """Empty 
target local mode copy test + + 1. Copy vnode when target has no vnode directory at all (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_empty_target_no_local_current_json() + + def test_empty_target_no_local_current_json_remote(self): + """Empty target remote mode copy test + + 1. Copy vnode when target has no vnode directory at all (remote source via SSH). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_empty_target_no_local_current_json(source_host=self.source_host) + + def _do_test_multiple_vnodes(self, source_host=None): + """Repair multiple vnodes in one invocation.""" + env = self._setup_env() + + for vid in [2, 5]: + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": vid * 10, "size": 256, + "did_level": 0, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": 1, "cid": vid * 10, "size": 512, + "did_level": 0, "did_id": 0, "lcn": 0}, + ], + }, + ] + make_source_vnode(env["src_data"], vid, fsets, env["dnode_ids"]) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], "2,5", source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: {result.stderr}") + + for vid in [2, 5]: + tgt_tsdb = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}", "tsdb") + tdSql.checkEqual(os.path.isfile(os.path.join(tgt_tsdb, "current.json")), True, "Missing file: current.json") + + def test_multiple_vnodes_local(self): + """Multiple vnodes local mode copy test + + 1. Repair multiple vnodes in one invocation from local source. 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_multiple_vnodes() + + def test_multiple_vnodes_remote(self): + """Multiple vnodes remote mode copy test + + 1. Repair multiple vnodes in one invocation from remote source via SSH. + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_multiple_vnodes(source_host=self.source_host) + + def _do_test_skip_missing_source_vnode(self, source_host=None): + """When source vnode doesn't exist, it should be skipped (not fail).""" + env = self._setup_env() + vid = 99 # Source vnode 99 doesn't exist + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + # Should succeed overall (vnode is skipped, not failed) + tdSql.checkEqual(result.returncode, 0, "Vnode repair failed for missing source vnode") + + def test_skip_missing_source_vnode_local(self): + """Skip missing source vnode local mode test + + 1. Verify missing source vnode is skipped without failure (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_skip_missing_source_vnode() + + def test_skip_missing_source_vnode_remote(self): + """Skip missing source vnode remote mode test + + 1. Verify missing source vnode is skipped without failure (remote source via SSH). 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_skip_missing_source_vnode(source_host=self.source_host) + + def _do_test_skip_existing_bak(self, source_host=None): + """When vnode.bak already exists on target, the vnode should be skipped.""" + env = self._setup_env() + vid = env["vnode_id"] + + # Create source vnode + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": 10, "size": 256, + "did_level": 0, "did_id": 0, "lcn": 0}, + ], + }, + ] + make_source_vnode(env["src_data"], vid, fsets, env["dnode_ids"]) + + # Create a pre-existing .bak on target + bak_dir = os.path.join(env["tgt_data"], "vnode", f"vnode{vid}.bak") + os.makedirs(bak_dir, exist_ok=True) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, "Vnode repair failed for existing .bak") + + # .bak should still be there (untouched) + tdSql.checkEqual(os.path.isdir(bak_dir), True, "Missing .bak directory") + + def test_skip_existing_bak_local(self): + """Skip existing .bak local mode test + + 1. Verify vnode is skipped when .bak already exists on target (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_skip_existing_bak() + + def test_skip_existing_bak_remote(self): + """Skip existing .bak remote mode test + + 1. Verify vnode is skipped when .bak already exists on target (remote source via SSH). 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_skip_existing_bak(source_host=self.source_host) + + def test_exit_code_bad_args(self): + """Bad arguments exit code test + + 1. Verify missing required arguments returns non-zero exit code. + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + env = self._setup_env() + # Missing --vnode + cmd = [ + self.taosd_bin, "-c", env["tgt_cfg_dir"], + "-r", "--mode", "copy", + "--node-type", "vnode", + "--source-cfg", env["src_cfg"], + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + tdSql.checkEqual(result.returncode != 0, True, "Expected non-zero exit code for bad args") + + def test_exit_code_missing_source_cfg(self): + """Missing source config exit code test + + 1. Verify non-existent --source-cfg path causes vnode to be skipped or non-zero exit. 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + env = self._setup_env() + bogus_cfg = os.path.join(SIM_PATH, "nonexistent", "taos.cfg") + cmd = [ + self.taosd_bin, "-c", env["tgt_cfg_dir"], + "-r", "--mode", "copy", + "--node-type", "vnode", + "--source-cfg", bogus_cfg, + "--vnode", "2", + ] + result = subprocess.run(cmd, capture_output=True, text=True, timeout=30) + # cfgLoad falls back to defaults; vnode is skipped (source dir wrong) + combined = result.stdout + result.stderr + tdSql.checkEqual("SKIPPED" in combined or result.returncode != 0, True, "Expected vnode to be skipped or non-zero exit code") + + # --- Multi-disk / multi-tier tests --- + + def _do_test_two_tier_to_single_tier(self, source_host=None): + """Source has 2 tiers, target has 1 tier — tier folding should work.""" + env = self._setup_env( + src_disks=[("data_l0_d0", 0, 1), ("data_l1_d0", 1, 0)], + tgt_disks=[("data_l0_d0", 0, 1)], + ) + vid = 2 + # Files on level 0 and level 1 of source + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": 10, "size": 512, + "did_level": 0, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": 1, "cid": 10, "size": 1024, + "did_level": 1, "did_id": 0, "lcn": 0}, + ], + }, + ] + src_primary = env["src_data_dirs"][0][0] + extra_data_dirs = [(env["src_data_dirs"][1][0], 1)] + src_contents = make_source_vnode( + src_primary, vid, fsets, env["dnode_ids"], + extra_data_dirs=extra_data_dirs) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: {result.stderr}") + + # All files should end up on target level 0 (the only tier) + tgt_primary = env["tgt_data_dirs"][0][0] + tgt_tsdb = os.path.join(tgt_primary, "vnode", f"vnode{vid}", "tsdb") + for (fid, cid, suffix), expected_content in src_contents.items(): + fname = tsdb_filename(vid, 
fid, cid, suffix) + tgt_path = os.path.join(tgt_tsdb, fname) + tdSql.checkEqual(os.path.isfile(tgt_path), True, f"Missing file: {fname}") + tdSql.checkEqual(read_bin(tgt_path), expected_content, f"Mismatch: {fname}") + + # Verify current.json has all files remapped to level 0 + current = json.loads(read_file(os.path.join(tgt_tsdb, "current.json"))) + for fset in current["fset"]: + for key in ("head", "data", "sma", "tomb"): + if key in fset: + tdSql.checkEqual(fset[key]["did.level"], 0, f"{key} should be on level 0") + + def test_two_tier_to_single_tier_local(self): + """Two-tier to single-tier local mode copy test + + 1. Copy vnode from 2-tier source to 1-tier target with tier folding (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_two_tier_to_single_tier() + + def test_two_tier_to_single_tier_remote(self): + """Two-tier to single-tier remote mode copy test + + 1. Copy vnode from 2-tier source to 1-tier target with tier folding (remote source via SSH). 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_two_tier_to_single_tier(source_host=self.source_host) + + def _do_test_multi_disk_round_robin(self, source_host=None): + """Files should be distributed across multiple disks at the same tier.""" + env = self._setup_env( + src_disks=[("data_l0_d0", 0, 1)], + tgt_disks=[("data_l0_d0", 0, 1), ("data_l0_d1", 0, 0)], + ) + vid = 2 + # Create multiple file sets — files should spread across tgt_d0 and tgt_d1 + fsets = [] + for fid in range(1, 5): + fsets.append({ + "fid": fid, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": fid, "cid": fid * 10, "size": 256, + "did_level": 0, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": fid, "cid": fid * 10, "size": 512, + "did_level": 0, "did_id": 0, "lcn": 0}, + ], + }) + make_source_vnode(env["src_data_dirs"][0][0], vid, fsets, env["dnode_ids"]) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: {result.stderr}") + + # Parse generated current.json and check disk distribution + tgt_primary = env["tgt_data_dirs"][0][0] + tgt_tsdb = os.path.join(tgt_primary, "vnode", f"vnode{vid}", "tsdb") + current = json.loads(read_file(os.path.join(tgt_tsdb, "current.json"))) + + # Collect all did.id values — should see both 0 and 1 + did_ids = set() + for fset in current["fset"]: + for key in ("head", "data"): + if key in fset: + did_ids.add(fset[key]["did.id"]) + tdSql.checkEqual(len(did_ids) > 1, True, f"Expected round-robin across disks, got did_ids={did_ids}") + + # Verify files exist on the respective disks + for fset in current["fset"]: + for key in ("head", "data"): + if key in fset: + did_id = fset[key]["did.id"] + disk_path = env["tgt_data_dirs"][did_id][0] + fid = fset["fid"] + 
cid = fset[key]["cid"] + fname = tsdb_filename(vid, fid, cid, key) + fpath = os.path.join(disk_path, "vnode", f"vnode{vid}", "tsdb", fname) + tdSql.checkEqual(os.path.isfile(fpath), True, f"Missing on disk {did_id}: {fname}") + + def test_multi_disk_round_robin_local(self): + """Multi-disk round-robin local mode copy test + + 1. Verify files are distributed across multiple target disks at same tier (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_multi_disk_round_robin() + + def test_multi_disk_round_robin_remote(self): + """Multi-disk round-robin remote mode copy test + + 1. Verify files are distributed across multiple target disks at same tier (remote source via SSH). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_multi_disk_round_robin(source_host=self.source_host) + + def _do_test_multi_source_disks_same_level(self, source_host=None): + """Source has multiple disks at level 0 — files from all disks are copied.""" + env = self._setup_env( + src_disks=[("data_l0_d0", 0, 1), ("data_l0_d1", 0, 0)], + tgt_disks=[("data_l0_d0", 0, 1)], + ) + vid = 2 + # Files spread across disk 0 and disk 1 at level 0 + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": 10, "size": 256, + "did_level": 0, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": 1, "cid": 10, "size": 512, + "did_level": 0, "did_id": 1, "lcn": 0}, + ], + }, + { + "fid": 2, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 2, "cid": 20, "size": 256, + "did_level": 0, "did_id": 1, "lcn": 0}, + {"type": 1, "fid": 2, "cid": 20, "size": 512, + "did_level": 0, "did_id": 0, "lcn": 0}, + ], + }, + ] + src_primary = 
env["src_data_dirs"][0][0] + extra_data_dirs = [(env["src_data_dirs"][1][0], 0)] + src_contents = make_source_vnode( + src_primary, vid, fsets, env["dnode_ids"], + extra_data_dirs=extra_data_dirs) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: {result.stderr}") + + # All files should land on the single target disk + tgt_primary = env["tgt_data_dirs"][0][0] + tgt_tsdb = os.path.join(tgt_primary, "vnode", f"vnode{vid}", "tsdb") + for (fid, cid, suffix), expected_content in src_contents.items(): + fname = tsdb_filename(vid, fid, cid, suffix) + tgt_path = os.path.join(tgt_tsdb, fname) + tdSql.checkEqual(os.path.isfile(tgt_path), True, f"Missing file: {fname}") + tdSql.checkEqual(read_bin(tgt_path), expected_content, f"Mismatch: {fname}") + + def test_multi_source_disks_same_level_local(self): + """Multi-source-disk same level local mode copy test + + 1. Copy files from multiple source disks at same level to single target disk (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_multi_source_disks_same_level() + + def test_multi_source_disks_same_level_remote(self): + """Multi-source-disk same level remote mode copy test + + 1. Copy files from multiple source disks at same level to single target disk (remote source via SSH). 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_multi_source_disks_same_level(source_host=self.source_host) + + def _do_test_s3_warning_lcn_gt_1(self, source_host=None): + """File sets with lcn > 1 should produce S3 warning in output.""" + env = self._setup_env( + src_disks=[("data_l0_d0", 0, 1)], + tgt_disks=[("data_l0_d0", 0, 1)], + ) + vid = 2 + # lcn=2 means only the last chunk is local; S3 warning expected + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": 10, "size": 256, + "did_level": 0, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": 1, "cid": 10, "size": 512, + "did_level": 0, "did_id": 0, "lcn": 2}, + ], + }, + ] + src_primary = env["src_data_dirs"][0][0] + # For lcn > 0, the local chunk file name is v{vid}f{fid}ver{cid}.{lcn}.{suffix} + vnode_dir = os.path.join(src_primary, "vnode", f"vnode{vid}") + tsdb_dir = os.path.join(vnode_dir, "tsdb") + os.makedirs(tsdb_dir, exist_ok=True) + + # Create normal head file (lcn=0) + head_name = tsdb_filename(vid, 1, 10, "head") + make_fake_file(os.path.join(tsdb_dir, head_name), 256) + + # Create S3 last-chunk data file: v2f1ver10.2.data + s3_data_name = f"v{vid}f1ver10.2.data" + make_fake_file(os.path.join(tsdb_dir, s3_data_name), 512) + + # Write current.json + write_file(os.path.join(tsdb_dir, "current.json"), make_current_json(fsets)) + + # Create supporting vnode files + make_vnode_json(vnode_dir, vid, env["dnode_ids"]) + sync_dir = os.path.join(vnode_dir, "sync") + make_raft_config_json(sync_dir, env["dnode_ids"]) + make_raft_store_json(sync_dir) + write_file(os.path.join(vnode_dir, "wal", "meta-ver0"), "wal") + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: 
{result.stderr}") + + # Verify S3 warning appears in stdout/stderr + combined_output = result.stdout + result.stderr + tdSql.checkEqual("s3" in combined_output.lower() or "S3" in combined_output, True, \ + f"Expected S3 warning in output, got:\n{combined_output}") + + def test_s3_warning_lcn_gt_1_local(self): + """S3 warning for lcn>1 local mode test + + 1. Verify S3 warning is produced for file sets with lcn > 1 (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_s3_warning_lcn_gt_1() + + def test_s3_warning_lcn_gt_1_remote(self): + """S3 warning for lcn>1 remote mode test + + 1. Verify S3 warning is produced for file sets with lcn > 1 (remote source via SSH). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_s3_warning_lcn_gt_1(source_host=self.source_host) + + def _do_test_three_tier_to_single_tier(self, source_host=None): + """Source has 3 tiers, target has 1 tier — all files map to {0,0}.""" + env = self._setup_env( + src_disks=[("data_l0_d0", 0, 1), ("data_l1_d0", 1, 0), ("data_l2_d0", 2, 0)], + tgt_disks=[("data_l0_d0", 0, 1)], + ) + vid = 2 + # Files spread across all three source tiers + fsets = [ + { + "fid": 1, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 1, "cid": 10, "size": 256, + "did_level": 0, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": 1, "cid": 10, "size": 512, + "did_level": 1, "did_id": 0, "lcn": 0}, + {"type": 2, "fid": 1, "cid": 10, "size": 128, + "did_level": 2, "did_id": 0, "lcn": 0}, + ], + }, + { + "fid": 2, "last_compact": 0, "last_commit": 0, + "files": [ + {"type": 0, "fid": 2, "cid": 20, "size": 256, + "did_level": 2, "did_id": 0, "lcn": 0}, + {"type": 1, "fid": 2, "cid": 20, "size": 512, + 
"did_level": 0, "did_id": 0, "lcn": 0}, + ], + }, + ] + src_primary = env["src_data_dirs"][0][0] + extra_data_dirs = [(env["src_data_dirs"][1][0], 1), (env["src_data_dirs"][2][0], 2)] + src_contents = make_source_vnode( + src_primary, vid, fsets, env["dnode_ids"], + extra_data_dirs=extra_data_dirs) + + result = self._run_repair(env["tgt_cfg_dir"], env["src_cfg"], str(vid), source_host=source_host) + tdSql.checkEqual(result.returncode, 0, f"stderr: {result.stderr}") + + # All files must land on the single target disk at level 0 + tgt_primary = env["tgt_data_dirs"][0][0] + tgt_tsdb = os.path.join(tgt_primary, "vnode", f"vnode{vid}", "tsdb") + for (fid, cid, suffix), expected_content in src_contents.items(): + fname = tsdb_filename(vid, fid, cid, suffix) + tgt_path = os.path.join(tgt_tsdb, fname) + tdSql.checkEqual(os.path.isfile(tgt_path), True, f"Missing file: {fname}") + tdSql.checkEqual(read_bin(tgt_path), expected_content, f"Mismatch: {fname}") + + # Verify current.json has all disk IDs remapped to {0, 0} + current = json.loads(read_file(os.path.join(tgt_tsdb, "current.json"))) + for fset in current["fset"]: + for key in ("head", "data", "sma", "tomb"): + if key in fset: + tdSql.checkEqual(fset[key]["did.level"], 0, f"fid={fset['fid']} {key} should be on level 0") + tdSql.checkEqual(fset[key]["did.id"], 0, f"fid={fset['fid']} {key} should be on disk 0") + + def test_three_tier_to_single_tier_local(self): + """Three-tier to single-tier local mode copy test + + 1. Copy vnode from 3-tier source to 1-tier target, all files map to {0,0} (local source). + + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + self._do_test_three_tier_to_single_tier() + + def test_three_tier_to_single_tier_remote(self): + """Three-tier to single-tier remote mode copy test + + 1. Copy vnode from 3-tier source to 1-tier target, all files map to {0,0} (remote source via SSH). 
+ + Catalog: + - Others:RepairCopy + + Since: v3.3.6.0 + + Labels: common,ci + + Jira: None + + History: + - 2026-5-6 Bomin Zhang created + """ + if not self._ssh_ok: + pytest.skip("passwordless SSH to 127.0.0.1 not available") + self._do_test_three_tier_to_single_tier(source_host=self.source_host) diff --git a/test/ci/cases.task b/test/ci/cases.task index 2b94607f17ec..6fb5d61f9f6d 100644 --- a/test/ci/cases.task +++ b/test/ci/cases.task @@ -630,3 +630,5 @@ ,,y,.,./ci/pytest.sh pytest cases/50-Others/01-Valgrind/test_valgrind_checkerror8.py #,,y,.,./ci/pytest.sh pytest cases/50-Others/01-Valgrind/test_valgrind_udf.py +## 02-Repair +,,y,.,./ci/pytest.sh pytest cases/50-Others/02-Repair/test_copy_repair.py