diff --git a/TD-xxx-try-copilot-self-audit b/TD-xxx-try-copilot-self-audit new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 75a7d7711eeb..c74db359d8c9 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -245,6 +245,7 @@ extern int32_t tsAuditLevel; extern int32_t tsAuditInterval; extern bool tsAuditHttps; extern bool tsAuditUseToken; +extern bool tsAuditLocalWrite; // Enable direct write to local cluster // telem extern bool tsEnableTelem; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 3057fc93f767..d389e018d404 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -241,6 +241,7 @@ int32_t tsAuditInterval = 5000; int32_t tsAuditLevel = AUDIT_LEVEL_DATABASE; bool tsAuditHttps = false; bool tsAuditUseToken = true; +bool tsAuditLocalWrite = true; // Enable direct write to local cluster by default in enterprise #else bool tsEnableAudit = false; bool tsEnableAuditCreateTable = false; @@ -251,6 +252,7 @@ int32_t tsAuditInterval = 200000; int32_t tsAuditLevel = AUDIT_LEVEL_NONE; bool tsAuditHttps = false; bool tsAuditUseToken = true; +bool tsAuditLocalWrite = false; // Disable in community edition #endif // telem @@ -1041,6 +1043,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "auditInterval", tsAuditInterval, 500, 200000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditHttps", tsAuditHttps, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditUseToken", tsAuditUseToken, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); + TAOS_CHECK_RETURN(cfgAddBool(pCfg, "auditLocalWrite", tsAuditLocalWrite, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_AUDIT)); TAOS_CHECK_RETURN(cfgAddBool(pCfg, "telemetryReporting", tsEnableTelem, CFG_SCOPE_SERVER, CFG_DYN_ENT_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_SYSTEM)); TAOS_CHECK_RETURN(cfgAddInt32(pCfg, "telemetryInterval", tsTelemInterval, 1, 200000, CFG_SCOPE_SERVER, CFG_DYN_SERVER,CFG_CATEGORY_GLOBAL, CFG_PRIV_SYSTEM)); TAOS_CHECK_RETURN(cfgAddString(pCfg, "telemetryServer", tsTelemServer, CFG_SCOPE_SERVER, CFG_DYN_BOTH,CFG_CATEGORY_GLOBAL, CFG_PRIV_SYSTEM)); @@ -1883,6 +1886,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditUseToken"); tsAuditUseToken = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditLocalWrite"); + tsAuditLocalWrite = pItem->bval; + TAOS_CHECK_GET_CFG_ITEM(pCfg, pItem, "auditInterval"); tsAuditInterval = pItem->i32; #endif @@ -2924,6 +2930,7 @@ static int32_t taosCfgDynamicOptionsForServer(SConfig *pCfg, const char *name) { {"auditLevel", &tsAuditLevel}, {"auditHttps", &tsAuditHttps}, {"auditUseToken", &tsAuditUseToken}, + {"auditLocalWrite", &tsAuditLocalWrite}, {"slowLogThreshold", &tsSlowLogThreshold}, {"compressMsgSize", &tsCompressMsgSize}, {"compressor", &tsCompressor}, diff --git a/source/libs/audit/inc/auditInt.h b/source/libs/audit/inc/auditInt.h index 7e269747fe18..aaf2f39341fa 100644 --- a/source/libs/audit/inc/auditInt.h +++ b/source/libs/audit/inc/auditInt.h @@ -27,6 +27,12 @@ typedef struct { TdThreadRwlock infoLock; char auditDB[TSDB_DB_FNAME_LEN]; char auditToken[TSDB_TOKEN_LEN]; + void *pLocalConn; // Local TAOS connection for direct write + TdThreadMutex connLock; // Lock for local connection + TdThread preConnThread; + int8_t preConnThreadCreated; + int8_t preConnThreadRunning; + int8_t stopPreConnThread; } SAudit; #endif /*_TD_AUDIT_INT_H_*/ diff --git a/source/libs/audit/src/auditMain.c b/source/libs/audit/src/auditMain.c index 5ca106a03ac4..367a7c0e4ec6 100644 --- a/source/libs/audit/src/auditMain.c +++ b/source/libs/audit/src/auditMain.c @@ -15,20 +15,24 @@ #define _DEFAULT_SOURCE -#include "tarray.h" +#include "audit.h" #include "auditInt.h" +#include "osMemory.h" +#include "taos.h" #include "taoserror.h" +#include "tarray.h" +#include "tglobal.h" #include "thttp.h" -#include "ttime.h" #include "tjson.h" -#include "tglobal.h" -#include "audit.h" -#include "osMemory.h" +#include "ttime.h" SAudit tsAudit = {0}; char* tsAuditUri = "/audit_v2"; char* tsAuditBatchUri = "/audit-batch"; +extern int32_t auditPreconnectLocal(); +extern void auditStopPreconnectLocal(); + static FORCE_INLINE void auditDeleteRecord(SAuditRecord *record) { if (record) { taosMemoryFree(record->detail); @@ -50,6 +54,18 @@ int32_t auditInit(const SAuditCfg *pCfg) { taosArrayDestroyP(tsAudit.records, (FDelete)auditDeleteRecord); return -1; } + if (taosThreadMutexInit(&tsAudit.connLock, NULL) != 0) { + (void)taosThreadMutexDestroy(&tsAudit.recordLock); + (void)taosThreadRwlockDestroy(&tsAudit.infoLock); + taosArrayDestroyP(tsAudit.records, (FDelete)auditDeleteRecord); + return -1; + } + + // Start non-blocking preconnect in background so startup and RPC threads never wait on taos_connect. + if (auditPreconnectLocal() != 0) { + uWarn("failed to start local TDengine preconnect thread, will retry on demand"); + } + return 0; } @@ -57,6 +73,18 @@ void auditSetDnodeId(int32_t dnodeId) { tsAudit.dnodeId = dnodeId; } void auditCleanup() { tsLogFp = NULL; + + auditStopPreconnectLocal(); + + // Close local connection + (void)taosThreadMutexLock(&tsAudit.connLock); + if (tsAudit.pLocalConn != NULL) { + taos_close(tsAudit.pLocalConn); + tsAudit.pLocalConn = NULL; + } + (void)taosThreadMutexUnlock(&tsAudit.connLock); + (void)taosThreadMutexDestroy(&tsAudit.connLock); + (void)taosThreadMutexLock(&tsAudit.recordLock); taosArrayDestroyP(tsAudit.records, (FDelete)auditDeleteRecord); (void)taosThreadMutexUnlock(&tsAudit.recordLock); @@ -93,6 +121,10 @@ void auditRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *tar void auditAddRecordImp(SRpcMsg *pReq, int64_t clusterId, char *operation, char *target1, char *target2, char *detail, int32_t len, double duration, int64_t affectedRows) {} +int32_t auditPreconnectLocal() { return 0; } + +void auditStopPreconnectLocal() {} + void auditSendRecordsInBatchImp(){ } diff --git a/tests/script/sh/stop_dnodes.sh b/tests/script/sh/stop_dnodes.sh new file mode 100755 index 000000000000..c3aae9459a95 --- /dev/null +++ b/tests/script/sh/stop_dnodes.sh @@ -0,0 +1,64 @@ +#!/bin/sh + +set +e +#set -x + +unset LD_PRELOAD +UNAME_BIN=`which uname` +OS_TYPE=`$UNAME_BIN` + +psby() { + if [ "$OS_TYPE" != "Darwin" ]; then + ps -C $1 + else + ps a -c + fi +} + +PID=`ps -efww | grep /usr/bin/taosd | grep -v grep | awk '{print $2}'` +if [ -n "$PID" ]; then + echo systemctl stop taosd + systemctl stop taosd +fi + +PID=`psby taosd | grep -w "[t]aosd" | awk '{print $1}' | head -n 1` +while [ -n "$PID" ]; do + echo kill -9 $PID + #pkill -9 taosd + kill -9 $PID + echo "Killing taosd processes" + if [ "$OS_TYPE" != "Darwin" ]; then + fuser -k -n tcp 6030 + else + lsof -nti:6030 | xargs kill -9 + fi + PID=`psby taosd | grep -w "[t]aosd" | awk '{print $1}' | head -n 1` +done + +PID=`psby taos | grep -w "[t]aos" | awk '{print $1}' | head -n 1` +while [ -n "$PID" ]; do + echo kill -9 $PID + #pkill -9 taos + kill -9 $PID + echo "Killing taos processes" + if [ "$OS_TYPE" != "Darwin" ]; then + fuser -k -n tcp 6030 + else + lsof -nti:6030 | xargs kill -9 + fi + PID=`psby taos | grep -w "[t]aos" | awk '{print $1}' | head -n 1` +done + +PID=`psby tmq_sim | grep -w "[t]mq_sim" | awk '{print $1}' | head -n 1` +while [ -n "$PID" ]; do + echo kill -9 $PID + #pkill -9 tmq_sim + kill -9 $PID + echo "Killing tmq_sim processes" + if [ "$OS_TYPE" != "Darwin" ]; then + fuser -k -n tcp 6030 + else + lsof -nti:6030 | xargs kill -9 + fi + PID=`psby tmq_sim | grep -w "[t]mq_sim" | awk '{print $1}' | head -n 1` +done diff --git a/tests/system-test/0-others/audit_local_write.py b/tests/system-test/0-others/audit_local_write.py new file mode 100644 index 000000000000..361668f234a7 --- /dev/null +++ b/tests/system-test/0-others/audit_local_write.py @@ -0,0 +1,59 @@ +from util.log import * +from util.cases import * +from util.sql import * + +import time + +from ..common.basic import BasicFun + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to execute {__file__}") + self.replicaVar = int(replicaVar) + self.Fun = BasicFun() + + def run(self): + self.Fun.config_cluster(1) + + for dnode in self.Fun.TDDnodes.dnodes: + dnode.addExtraCfg("audit", "1") + dnode.addExtraCfg("auditInterval", "500") + dnode.addExtraCfg("auditLevel", "5") + dnode.addExtraCfg("enableAuditDelete", "1") + dnode.addExtraCfg("enableAuditSelect", "1") + dnode.addExtraCfg("enableAuditInsert", "1") + dnode.addExtraCfg("auditLocalWrite", "1") + dnode.addExtraCfg("auditUseToken", "0") + dnode.addExtraCfg("monitorFqdn", "127.0.0.1") + dnode.addExtraCfg("monitorPort", "1") + + self.Fun.deploy_start_cluster() + self.Fun.connect() + + tdSql.execute("drop database if exists audit_case") + tdSql.execute("create database audit_case") + tdSql.execute("use audit_case") + tdSql.execute("create table if not exists t(ts timestamp, c1 int)") + tdSql.execute("insert into t values(now, 1)") + tdSql.query("select * from t") + tdSql.checkRows(1) + + # auditRecord is sync for these statements, but wait briefly for robustness. + count = 0 + for _ in range(20): + tdSql.query("select count(*) from audit.audit_events") + count = tdSql.queryResult[0][0] + if count > 0: + break + time.sleep(1) + + tdSql.checkGreater(count, 0) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())