diff --git a/ee/query-service/app/api/api.go b/ee/query-service/app/api/api.go index 4291b3f488..2c1938cd92 100644 --- a/ee/query-service/app/api/api.go +++ b/ee/query-service/app/api/api.go @@ -38,9 +38,10 @@ type APIHandlerOptions struct { Cache cache.Cache Gateway *httputil.ReverseProxy // Querier Flux Interval - FluxInterval time.Duration - UseLogsNewSchema bool - UseLicensesV3 bool + FluxInterval time.Duration + UseLogsNewSchema bool + UseTraceNewSchema bool + UseLicensesV3 bool } type APIHandler struct { @@ -66,6 +67,7 @@ func NewAPIHandler(opts APIHandlerOptions) (*APIHandler, error) { Cache: opts.Cache, FluxInterval: opts.FluxInterval, UseLogsNewSchema: opts.UseLogsNewSchema, + UseTraceNewSchema: opts.UseTraceNewSchema, UseLicensesV3: opts.UseLicensesV3, }) diff --git a/ee/query-service/app/api/traces.go b/ee/query-service/app/api/traces.go index 3864fc672e..5c65089cd0 100644 --- a/ee/query-service/app/api/traces.go +++ b/ee/query-service/app/api/traces.go @@ -2,32 +2,31 @@ package api import ( "net/http" - - "go.signoz.io/signoz/ee/query-service/app/db" - "go.signoz.io/signoz/ee/query-service/model" - baseapp "go.signoz.io/signoz/pkg/query-service/app" - basemodel "go.signoz.io/signoz/pkg/query-service/model" - "go.uber.org/zap" ) func (ah *APIHandler) searchTraces(w http.ResponseWriter, r *http.Request) { - if !ah.CheckFeature(basemodel.SmartTraceDetail) { - zap.L().Info("SmartTraceDetail feature is not enabled in this plan") - ah.APIHandler.SearchTraces(w, r) - return - } - searchTracesParams, err := baseapp.ParseSearchTracesParams(r) - if err != nil { - RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params") - return - } - - result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm) - if ah.HandleError(w, err, http.StatusBadRequest) { - return - } - - ah.WriteJSON(w, r, result) + ah.APIHandler.SearchTraces(w, r) + return + + // This is commented out since it will be taken care of by the new trace API + + // if !ah.CheckFeature(basemodel.SmartTraceDetail) { + // zap.L().Info("SmartTraceDetail feature is not enabled in this plan") + // ah.APIHandler.SearchTraces(w, r) + // return + // } + // searchTracesParams, err := baseapp.ParseSearchTracesParams(r) + // if err != nil { + // RespondError(w, &model.ApiError{Typ: model.ErrorBadData, Err: err}, "Error reading params") + // return + // } + + // result, err := ah.opts.DataConnector.SearchTraces(r.Context(), searchTracesParams, db.SmartTraceAlgorithm) + // if ah.HandleError(w, err, http.StatusBadRequest) { + // return + // } + + // ah.WriteJSON(w, r, result) } diff --git a/ee/query-service/app/db/reader.go b/ee/query-service/app/db/reader.go index fcab1cb991..9794abd013 100644 --- a/ee/query-service/app/db/reader.go +++ b/ee/query-service/app/db/reader.go @@ -26,8 +26,9 @@ func NewDataConnector( dialTimeout time.Duration, cluster string, useLogsNewSchema bool, + useTraceNewSchema bool, ) *ClickhouseReader { - ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema) + ch := basechr.NewReader(localDB, promConfigPath, lm, maxIdleConns, maxOpenConns, dialTimeout, cluster, useLogsNewSchema, useTraceNewSchema) return &ClickhouseReader{ conn: ch.GetConn(), appdb: localDB, diff --git a/ee/query-service/app/server.go b/ee/query-service/app/server.go index a8acbc46e9..b58a90b9c4 100644 --- a/ee/query-service/app/server.go +++ b/ee/query-service/app/server.go @@ -77,6 +77,7 @@ type ServerOptions struct {
Cluster string GatewayUrl string UseLogsNewSchema bool + UseTraceNewSchema bool UseLicensesV3 bool } @@ -156,6 +157,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.DialTimeout, serverOptions.Cluster, serverOptions.UseLogsNewSchema, + serverOptions.UseTraceNewSchema, ) go qb.Start(readerReady) reader = qb @@ -189,6 +191,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.DisableRules, lm, serverOptions.UseLogsNewSchema, + serverOptions.UseTraceNewSchema, ) if err != nil { @@ -270,6 +273,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { FluxInterval: fluxInterval, Gateway: gatewayProxy, UseLogsNewSchema: serverOptions.UseLogsNewSchema, + UseTraceNewSchema: serverOptions.UseTraceNewSchema, UseLicensesV3: serverOptions.UseLicensesV3, } @@ -737,7 +741,8 @@ func makeRulesManager( cache cache.Cache, disableRules bool, fm baseint.FeatureLookup, - useLogsNewSchema bool) (*baserules.Manager, error) { + useLogsNewSchema bool, + useTraceNewSchema bool) (*baserules.Manager, error) { // create engine pqle, err := pqle.FromConfigPath(promConfigPath) @@ -767,8 +772,9 @@ func makeRulesManager( EvalDelay: baseconst.GetEvalDelay(), PrepareTaskFunc: rules.PrepareTaskFunc, - PrepareTestRuleFunc: rules.TestNotification, UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, + PrepareTestRuleFunc: rules.TestNotification, } // create Manager diff --git a/ee/query-service/main.go b/ee/query-service/main.go index 55e70893e6..23824bd636 100644 --- a/ee/query-service/main.go +++ b/ee/query-service/main.go @@ -94,6 +94,7 @@ func main() { var cluster string var useLogsNewSchema bool + var useTraceNewSchema bool var useLicensesV3 bool var cacheConfigPath, fluxInterval string var enableQueryServiceLogOTLPExport bool @@ -105,6 +106,7 @@ func main() { var gatewayUrl string flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") + flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces") flag.BoolVar(&useLicensesV3, "use-licenses-v3", false, "use licenses_v3 schema for licenses") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") @@ -145,6 +147,7 @@ func main() { Cluster: cluster, GatewayUrl: gatewayUrl, UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, UseLicensesV3: useLicensesV3, } diff --git a/ee/query-service/rules/manager.go b/ee/query-service/rules/manager.go index 9843d108d8..00e0882f36 100644 --- a/ee/query-service/rules/manager.go +++ b/ee/query-service/rules/manager.go @@ -26,6 +26,7 @@ func PrepareTaskFunc(opts baserules.PrepareTaskOptions) (baserules.Task, error) opts.FF, opts.Reader, opts.UseLogsNewSchema, + opts.UseTraceNewSchema, baserules.WithEvalDelay(opts.ManagerOpts.EvalDelay), ) @@ -122,6 +123,7 @@ func TestNotification(opts baserules.PrepareTestRuleOptions) (int, *basemodel.Ap opts.FF, opts.Reader, opts.UseLogsNewSchema, + opts.UseTraceNewSchema, baserules.WithSendAlways(), baserules.WithSendUnmatched(), ) diff --git a/pkg/query-service/app/clickhouseReader/options.go b/pkg/query-service/app/clickhouseReader/options.go index 695bef8570..25eea0c7ff 100644 --- a/pkg/query-service/app/clickhouseReader/options.go +++ b/pkg/query-service/app/clickhouseReader/options.go @@ -22,6 +22,7 @@ const ( defaultTraceDB string = "signoz_traces" 
defaultOperationsTable string = "distributed_signoz_operations" defaultIndexTable string = "distributed_signoz_index_v2" + defaultLocalIndexTable string = "signoz_index_v2" defaultErrorTable string = "distributed_signoz_error_index_v2" defaultDurationTable string = "distributed_durationSort" defaultUsageExplorerTable string = "distributed_usage_explorer" @@ -45,6 +46,11 @@ const ( defaultLogsTableV2 string = "distributed_logs_v2" defaultLogsResourceLocalTableV2 string = "logs_v2_resource" defaultLogsResourceTableV2 string = "distributed_logs_v2_resource" + + defaultTraceIndexTableV3 string = "distributed_signoz_index_v3" + defaultTraceLocalTableName string = "signoz_index_v3" + defaultTraceResourceTableV3 string = "distributed_traces_v3_resource" + defaultTraceSummaryTable string = "distributed_trace_summary" ) // NamespaceConfig is Clickhouse's internal configuration data @@ -58,6 +64,7 @@ type namespaceConfig struct { TraceDB string OperationsTable string IndexTable string + LocalIndexTable string DurationTable string UsageExplorerTable string SpansTable string @@ -82,6 +89,11 @@ type namespaceConfig struct { LogsTableV2 string LogsResourceLocalTableV2 string LogsResourceTableV2 string + + TraceIndexTableV3 string + TraceLocalTableNameV3 string + TraceResourceTableV3 string + TraceSummaryTable string } // Connector defines how to connect to the database @@ -150,6 +162,7 @@ func NewOptions( TraceDB: defaultTraceDB, OperationsTable: defaultOperationsTable, IndexTable: defaultIndexTable, + LocalIndexTable: defaultLocalIndexTable, ErrorTable: defaultErrorTable, DurationTable: defaultDurationTable, UsageExplorerTable: defaultUsageExplorerTable, @@ -174,6 +187,11 @@ func NewOptions( LogsLocalTableV2: defaultLogsLocalTableV2, LogsResourceTableV2: defaultLogsResourceTableV2, LogsResourceLocalTableV2: defaultLogsResourceLocalTableV2, + + TraceIndexTableV3: defaultTraceIndexTableV3, + TraceLocalTableNameV3: defaultTraceLocalTableName, + TraceResourceTableV3: defaultTraceResourceTableV3, + TraceSummaryTable: defaultTraceSummaryTable, }, others: make(map[string]*namespaceConfig, len(otherNamespaces)), } diff --git a/pkg/query-service/app/clickhouseReader/reader.go b/pkg/query-service/app/clickhouseReader/reader.go index a6811dc2eb..7c2946d383 100644 --- a/pkg/query-service/app/clickhouseReader/reader.go +++ b/pkg/query-service/app/clickhouseReader/reader.go @@ -145,9 +145,16 @@ type ClickHouseReader struct { liveTailRefreshSeconds int cluster string - useLogsNewSchema bool + useLogsNewSchema bool + useTraceNewSchema bool + logsTableName string logsLocalTableName string + + traceTableName string + traceLocalTableName string + traceResourceTableV3 string + traceSummaryTable string } // NewTraceReader returns a TraceReader for the database func NewReader( @@ -160,6 +167,7 @@ dialTimeout time.Duration, cluster string, useLogsNewSchema bool, + useTraceNewSchema bool, ) *ClickHouseReader { datasource := os.Getenv("ClickHouseUrl") @@ -170,7 +178,7 @@ zap.L().Fatal("failed to initialize ClickHouse", zap.Error(err)) } - return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema) + return NewReaderFromClickhouseConnection(db, options, localDB, configFile, featureFlag, cluster, useLogsNewSchema, useTraceNewSchema) } func NewReaderFromClickhouseConnection( @@ -181,6 +189,7 @@ featureFlag interfaces.FeatureLookup, cluster string, useLogsNewSchema bool, + useTraceNewSchema bool, ) *ClickHouseReader {
alertManager, err := am.New() if err != nil { @@ -218,6 +227,13 @@ func NewReaderFromClickhouseConnection( logsLocalTableName = options.primary.LogsLocalTableV2 } + traceTableName := options.primary.IndexTable + traceLocalTableName := options.primary.LocalIndexTable + if useTraceNewSchema { + traceTableName = options.primary.TraceIndexTableV3 + traceLocalTableName = options.primary.TraceLocalTableNameV3 + } + return &ClickHouseReader{ db: wrap, localDB: localDB, @@ -245,7 +261,8 @@ func NewReaderFromClickhouseConnection( cluster: cluster, queryProgressTracker: queryprogress.NewQueryProgressTracker(), - useLogsNewSchema: useLogsNewSchema, + useLogsNewSchema: useLogsNewSchema, + useTraceNewSchema: useTraceNewSchema, logsTableV2: options.primary.LogsTableV2, logsLocalTableV2: options.primary.LogsLocalTableV2, @@ -253,6 +270,11 @@ func NewReaderFromClickhouseConnection( logsResourceLocalTableV2: options.primary.LogsResourceLocalTableV2, logsTableName: logsTableName, logsLocalTableName: logsLocalTableName, + + traceLocalTableName: traceLocalTableName, + traceTableName: traceTableName, + traceResourceTableV3: options.primary.TraceResourceTableV3, + traceSummaryTable: options.primary.TraceSummaryTable, } } @@ -463,9 +485,8 @@ func (r *ClickHouseReader) GetQueryRangeResult(ctx context.Context, query *model } func (r *ClickHouseReader) GetServicesList(ctx context.Context) (*[]string, error) { - services := []string{} - query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.indexTable) + query := fmt.Sprintf(`SELECT DISTINCT serviceName FROM %s.%s WHERE ts_bucket_start > (toUnixTimestamp(now()) - 1800) AND toDate(timestamp) > now() - INTERVAL 1 DAY`, r.TraceDB, r.traceLocalTableName) rows, err := r.db.Query(ctx, query) @@ -574,14 +595,14 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G count(*) as numCalls FROM %s.%s WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) errorQuery := fmt.Sprintf( `SELECT count(*) as numErrors FROM %s.%s WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) args := []interface{}{} @@ -591,6 +612,17 @@ func (r *ClickHouseReader) GetServices(ctx context.Context, queryParams *model.G clickhouse.Named("serviceName", svc), clickhouse.Named("names", ops), ) + + if r.useTraceNewSchema { + bFilter := " AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket" + query += bFilter + errorQuery += bFilter + args = append(args, + clickhouse.Named("start_bucket", strconv.FormatInt(queryParams.Start.Unix()-1800, 10)), + clickhouse.Named("end_bucket", strconv.FormatInt(queryParams.End.Unix(), 10)), + ) + } + // create TagQuery from TagQueryParams tags := createTagQueryFromTagQueryParams(queryParams.Tags) subQuery, argsSubQuery, errStatus := buildQueryWithTagParams(ctx, tags) @@ -673,7 +705,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams * count(*) as numCalls FROM %s.%s WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) args := []interface{}{} args = append(args, namedArgs...) 
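The `ts_bucket_start` predicates added in the hunks above are what make the v3 schema effective: the new index table is bucketed on coarse start timestamps, so bounding the bucket column lets ClickHouse prune whole parts before it ever compares `timestamp`. The lower bound is widened by 1800 s because a span can fall into a bucket that opened up to 30 minutes before the span itself. A minimal sketch of the pattern; `appendBucketFilter` is a hypothetical helper, not part of this PR:

```go
package main

import (
	"fmt"
	"strconv"

	"github.com/ClickHouse/clickhouse-go/v2"
)

// appendBucketFilter shows the pattern the hunks above apply inline: bound the
// bucket column alongside the span timestamp so ClickHouse can skip parts.
func appendBucketFilter(query string, args []interface{}, startUnix, endUnix int64) (string, []interface{}) {
	query += " AND ts_bucket_start >= @start_bucket AND ts_bucket_start <= @end_bucket"
	args = append(args,
		// widen the lower bound by one bucket (1800s): a span may sit in a
		// bucket that started up to 30 minutes before the span itself
		clickhouse.Named("start_bucket", strconv.FormatInt(startUnix-1800, 10)),
		clickhouse.Named("end_bucket", strconv.FormatInt(endUnix, 10)),
	)
	return query, args
}

func main() {
	q := "SELECT count(*) FROM signoz_traces.distributed_signoz_index_v3 WHERE timestamp >= @start AND timestamp <= @end"
	q, args := appendBucketFilter(q, nil, 1700000000, 1700003600)
	fmt.Println(q, len(args))
}
```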
@@ -704,7 +736,7 @@ func (r *ClickHouseReader) GetServiceOverview(ctx context.Context, queryParams * count(*) as numErrors FROM %s.%s WHERE serviceName = @serviceName AND name In @names AND timestamp>= @start AND timestamp<= @end AND statusCode=2`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) args = []interface{}{} args = append(args, namedArgs...) @@ -841,7 +873,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode case constants.TraceID: continue case constants.ServiceName: - finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT serviceName, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY serviceName" var dBResponse []model.DBResponseServiceName @@ -858,7 +890,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpRoute: - finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpRoute, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY httpRoute" var dBResponse []model.DBResponseHttpRoute @@ -875,7 +907,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpUrl: - finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpUrl, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY httpUrl" var dBResponse []model.DBResponseHttpUrl @@ -892,7 +924,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpMethod: - finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY httpMethod" var dBResponse []model.DBResponseHttpMethod @@ -909,7 +941,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.HttpHost: - finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT httpHost, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY httpHost" var dBResponse []model.DBResponseHttpHost @@ -926,7 +958,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.OperationRequest: - finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT name, count() as count FROM %s.%s 
WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY name" var dBResponse []model.DBResponseOperation @@ -943,7 +975,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.Status: - finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = true", r.TraceDB, r.traceTableName) finalQuery += query var dBResponse []model.DBResponseTotal err := r.db.Select(ctx, &dBResponse, finalQuery, args...) @@ -954,7 +986,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode return nil, &model.ApiError{Typ: model.ErrorExec, Err: fmt.Errorf("error in processing sql query: %s", err)} } - finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.indexTable) + finalQuery2 := fmt.Sprintf("SELECT COUNT(*) as numTotal FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU AND hasError = false", r.TraceDB, r.traceTableName) finalQuery2 += query var dBResponse2 []model.DBResponseTotal err = r.db.Select(ctx, &dBResponse2, finalQuery2, args...) @@ -979,7 +1011,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode finalQuery := "" if !durationSortEnabled { // if duration sort is not enabled, we need to get the min and max duration from the index table - finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery = fmt.Sprintf("SELECT min(durationNano) as min, max(durationNano) as max FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query var dBResponse []model.DBResponseMinMax err = r.db.Select(ctx, &dBResponse, finalQuery, args...) 
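All of the `r.indexTable` → `r.traceTableName` swaps in this stretch rely on the table names being resolved exactly once, at reader construction, from the `useTraceNewSchema` flag (see `NewReaderFromClickhouseConnection` earlier in this diff). A self-contained sketch of that selection, with a stub standing in for `namespaceConfig` and the default names copied from `options.go`:

```go
package main

import "fmt"

// primaryOptions stubs the relevant namespaceConfig fields from options.go.
type primaryOptions struct {
	IndexTable          string
	LocalIndexTable     string
	TraceIndexTableV3   string
	TraceLocalTableName string
}

// pickTraceTables mirrors the constructor-time selection: every later query
// builder just reads the chosen names, so no call site branches on the flag.
func pickTraceTables(opts primaryOptions, useTraceNewSchema bool) (table, localTable string) {
	table, localTable = opts.IndexTable, opts.LocalIndexTable
	if useTraceNewSchema {
		table, localTable = opts.TraceIndexTableV3, opts.TraceLocalTableName
	}
	return table, localTable
}

func main() {
	opts := primaryOptions{
		IndexTable:          "distributed_signoz_index_v2",
		LocalIndexTable:     "signoz_index_v2",
		TraceIndexTableV3:   "distributed_signoz_index_v3",
		TraceLocalTableName: "signoz_index_v3",
	}
	t, lt := pickTraceTables(opts, true)
	fmt.Println(t, lt) // distributed_signoz_index_v3 signoz_index_v3
}
```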
@@ -1024,7 +1056,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } } case constants.RPCMethod: - finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT rpcMethod, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY rpcMethod" var dBResponse []model.DBResponseRPCMethod @@ -1042,7 +1074,7 @@ func (r *ClickHouseReader) GetSpanFilters(ctx context.Context, queryParams *mode } case constants.ResponseStatusCode: - finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf("SELECT responseStatusCode, count() as count FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " GROUP BY responseStatusCode" var dBResponse []model.DBResponseStatusCodeMethod @@ -1090,7 +1122,7 @@ func getStatusFilters(query string, statusParams []string, excludeMap map[string func (r *ClickHouseReader) GetFilteredSpans(ctx context.Context, queryParams *model.GetFilteredSpansParams) (*model.GetFilterSpansResponse, *model.ApiError) { - queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.indexTable) + queryTable := fmt.Sprintf("%s.%s", r.TraceDB, r.traceTableName) excludeMap := make(map[string]struct{}) for _, e := range queryParams.Exclude { @@ -1436,8 +1468,8 @@ func (r *ClickHouseReader) GetTagFilters(ctx context.Context, queryParams *model tagFilters := []model.TagFilters{} - // Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) - finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) + // Alternative finalQuery := fmt.Sprintf(`SELECT DISTINCT arrayJoin(tagMap.keys) as tagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, indexTable) + finalQuery := fmt.Sprintf(`SELECT groupUniqArrayArray(mapKeys(stringTagMap)) as stringTagKeys, groupUniqArrayArray(mapKeys(numberTagMap)) as numberTagKeys, groupUniqArrayArray(mapKeys(boolTagMap)) as boolTagKeys FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.traceTableName) finalQuery += query err := r.db.Select(ctx, &tagFilters, finalQuery, args...) @@ -1548,7 +1580,7 @@ func (r *ClickHouseReader) GetTagValues(ctx context.Context, queryParams *model. 
tagValues := []model.TagValues{} - finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT stringTagMap[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.indexTable) + finalQuery := fmt.Sprintf(`SELECT groupArray(DISTINCT attributes_string[@key]) as stringTagValues FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU`, r.TraceDB, r.traceTableName) finalQuery += query finalQuery += " LIMIT @limit" @@ -1599,7 +1631,7 @@ func (r *ClickHouseReader) GetTopOperations(ctx context.Context, queryParams *mo name FROM %s.%s WHERE serviceName = @serviceName AND timestamp>= @start AND timestamp<= @end`, - r.TraceDB, r.indexTable, + r.TraceDB, r.traceTableName, ) args := []interface{}{} args = append(args, namedArgs...) @@ -1666,10 +1698,137 @@ func (r *ClickHouseReader) GetUsage(ctx context.Context, queryParams *model.GetU return &usageItems, nil } +func (r *ClickHouseReader) SearchTracesV2(ctx context.Context, params *model.SearchTracesParams) (*[]model.SearchSpansResult, error) { + searchSpansResult := []model.SearchSpansResult{ + { + Columns: []string{"__time", "SpanId", "TraceId", "ServiceName", "Name", "Kind", "DurationNano", "TagsKeys", "TagsValues", "References", "Events", "HasError", "StatusMessage", "StatusCodeString", "SpanKind"}, + IsSubTree: false, + Events: make([][]interface{}, 0), + }, + } + + var traceSummary model.TraceSummary + summaryQuery := fmt.Sprintf("SELECT * from %s.%s WHERE traceID=$1", r.TraceDB, r.traceSummaryTable) + err := r.db.QueryRow(ctx, summaryQuery, params.TraceID).Scan(&traceSummary.TraceID, &traceSummary.Start, &traceSummary.End, &traceSummary.NumSpans) + if err != nil { + if err == sql.ErrNoRows { + return &searchSpansResult, nil + } + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in processing sql query") + } + + if traceSummary.NumSpans > uint64(params.MaxSpansInTrace) { + zap.L().Error("Max spans allowed in a trace limit reached", zap.Int("MaxSpansInTrace", params.MaxSpansInTrace), + zap.Uint64("Count", traceSummary.NumSpans)) + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + data := map[string]interface{}{ + "traceSize": traceSummary.NumSpans, + "maxSpansInTraceLimit": params.MaxSpansInTrace, + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_MAX_SPANS_ALLOWED_LIMIT_REACHED, data, userEmail, true, false) + } + return nil, fmt.Errorf("max spans allowed in trace limit reached, please contact support for more details") + } + + userEmail, err := auth.GetEmailFromJwt(ctx) + if err == nil { + data := map[string]interface{}{ + "traceSize": traceSummary.NumSpans, + } + telemetry.GetInstance().SendEvent(telemetry.TELEMETRY_EVENT_TRACE_DETAIL_API, data, userEmail, true, false) + } + + var startTime, endTime, durationNano uint64 + var searchScanResponses []model.SearchSpanResponseItemV2 + + query := fmt.Sprintf("SELECT timestamp, durationNano, spanID, traceID, hasError, kind, serviceName, name, references, attributes_string, events, statusMessage, statusCodeString, spanKind FROM %s.%s WHERE traceID=$1 and ts_bucket_start>=$2 and ts_bucket_start<=$3", r.TraceDB, r.traceTableName) + + start := time.Now() + + err = r.db.Select(ctx, &searchScanResponses, query, params.TraceID, strconv.FormatInt(traceSummary.Start.Unix()-1800, 10), strconv.FormatInt(traceSummary.End.Unix(), 10)) + + zap.L().Info(query) + + if err != nil { + zap.L().Error("Error in processing sql query", zap.Error(err)) + return nil, fmt.Errorf("error in 
processing sql query") + } + end := time.Now() + zap.L().Debug("getTraceSQLQuery took: ", zap.Duration("duration", end.Sub(start))) + + searchSpansResult[0].Events = make([][]interface{}, len(searchScanResponses)) + + searchSpanResponses := []model.SearchSpanResponseItem{} + start = time.Now() + for _, item := range searchScanResponses { + ref := []model.OtelSpanRef{} + err := json.Unmarshal([]byte(item.References), &ref) + if err != nil { + zap.L().Error("Error unmarshalling references", zap.Error(err)) + return nil, err + } + + // merge attributes_number and attributes_bool to attributes_string + for k, v := range item.Attributes_bool { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + for k, v := range item.Attributes_number { + item.Attributes_string[k] = fmt.Sprintf("%v", v) + } + + jsonItem := model.SearchSpanResponseItem{ + SpanID: item.SpanID, + TraceID: item.TraceID, + ServiceName: item.ServiceName, + Name: item.Name, + Kind: int32(item.Kind), + DurationNano: int64(item.DurationNano), + HasError: item.HasError, + StatusMessage: item.StatusMessage, + StatusCodeString: item.StatusCodeString, + SpanKind: item.SpanKind, + References: ref, + Events: item.Events, + TagMap: item.Attributes_string, + } + + jsonItem.TimeUnixNano = uint64(item.TimeUnixNano.UnixNano() / 1000000) + + searchSpanResponses = append(searchSpanResponses, jsonItem) + if startTime == 0 || jsonItem.TimeUnixNano < startTime { + startTime = jsonItem.TimeUnixNano + } + if endTime == 0 || jsonItem.TimeUnixNano > endTime { + endTime = jsonItem.TimeUnixNano + } + if durationNano == 0 || uint64(jsonItem.DurationNano) > durationNano { + durationNano = uint64(jsonItem.DurationNano) + } + } + end = time.Now() + zap.L().Debug("getTraceSQLQuery unmarshal took: ", zap.Duration("duration", end.Sub(start))) + + for i, item := range searchSpanResponses { + spanEvents := item.GetValues() + searchSpansResult[0].Events[i] = spanEvents + } + + searchSpansResult[0].StartTimestampMillis = startTime - (durationNano / 1000000) + searchSpansResult[0].EndTimestampMillis = endTime + (durationNano / 1000000) + + return &searchSpansResult, nil +} + func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.SearchTracesParams, smartTraceAlgorithm func(payload []model.SearchSpanResponseItem, targetSpanId string, levelUp int, levelDown int, spanLimit int) ([]model.SearchSpansResult, error)) (*[]model.SearchSpansResult, error) { + if r.useTraceNewSchema { + return r.SearchTracesV2(ctx, params) + } + var countSpans uint64 countQuery := fmt.Sprintf("SELECT count() as count from %s.%s WHERE traceID=$1", r.TraceDB, r.SpansTable) err := r.db.QueryRow(ctx, countQuery, params.TraceID).Scan(&countSpans) @@ -1746,6 +1905,7 @@ func (r *ClickHouseReader) SearchTraces(ctx context.Context, params *model.Searc err = r.featureFlags.CheckFeature(model.SmartTraceDetail) smartAlgoEnabled := err == nil + // TODO(nitya): this will never run remove it if len(searchScanResponses) > params.SpansRenderLimit && smartAlgoEnabled { start = time.Now() searchSpansResult, err = smartTraceAlgorithm(searchSpanResponses, params.SpanID, params.LevelUp, params.LevelDown, params.SpansRenderLimit) @@ -1824,7 +1984,6 @@ func (r *ClickHouseReader) GetDependencyGraph(ctx context.Context, queryParams * } func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, queryParams *model.GetFilteredSpanAggregatesParams) (*model.GetFilteredSpansAggregatesResponse, *model.ApiError) { - excludeMap := make(map[string]struct{}) for _, e := range 
queryParams.Exclude { if e == constants.OperationRequest { @@ -1870,7 +2029,7 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query // Using %s for groupBy params as it can be a custom column and custom columns are not supported by clickhouse-go yet: // issue link: https://github.com/ClickHouse/clickhouse-go/issues/870 if queryParams.GroupBy != "" && columnExists { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, queryParams.GroupBy, aggregation_query, r.TraceDB, r.traceTableName) args = append(args, clickhouse.Named("groupByVar", queryParams.GroupBy)) } else if queryParams.GroupBy != "" { customStr = strings.Split(queryParams.GroupBy, ".(") @@ -1878,17 +2037,17 @@ func (r *ClickHouseReader) GetFilteredSpansAggregates(ctx context.Context, query return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} } if customStr[1] == string(model.TagTypeString)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, stringTagMap['%s'] as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName) } else if customStr[1] == string(model.TagTypeNumber)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(numberTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName) } else if customStr[1] == string(model.TagTypeBool)+")" { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, toString(boolTagMap['%s']) as groupBy, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, customStr[0], aggregation_query, r.TraceDB, r.traceTableName) } else { // return error for unsupported group by return nil, &model.ApiError{Typ: model.ErrorBadData, Err: fmt.Errorf("GroupBy: %s not supported", queryParams.GroupBy)} } } else { - query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= 
@timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.indexTable) + query = fmt.Sprintf("SELECT toStartOfInterval(timestamp, INTERVAL %d minute) as time, %s FROM %s.%s WHERE timestamp >= @timestampL AND timestamp <= @timestampU", queryParams.StepSeconds/60, aggregation_query, r.TraceDB, r.traceTableName) } if len(queryParams.TraceID) > 0 { @@ -3056,11 +3215,10 @@ func (r *ClickHouseReader) GetLogsInfoInLastHeartBeatInterval(ctx context.Contex } func (r *ClickHouseReader) GetTagsInfoInLastHeartBeatInterval(ctx context.Context, interval time.Duration) (*model.TagsInfo, error) { - queryStr := fmt.Sprintf(`select serviceName, stringTagMap['deployment.environment'] as env, stringTagMap['telemetry.sdk.language'] as language from %s.%s where timestamp > toUnixTimestamp(now()-toIntervalMinute(%d)) - group by serviceName, env, language;`, r.TraceDB, r.indexTable, int(interval.Minutes())) + group by serviceName, env, language;`, r.TraceDB, r.traceTableName, int(interval.Minutes())) tagTelemetryDataList := []model.TagTelemetryData{} err := r.db.Select(ctx, &tagTelemetryDataList, queryStr) @@ -4575,8 +4733,6 @@ func (r *ClickHouseReader) GetTraceAggregateAttributes(ctx context.Context, req if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) } - // TODO: Remove this once the column name are updated in the table - tagKey = tempHandleFixedColumns(tagKey) key := v3.AttributeKey{ Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), @@ -4616,8 +4772,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) } - // TODO: Remove this once the column name are updated in the table - tagKey = tempHandleFixedColumns(tagKey) key := v3.AttributeKey{ Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), @@ -4629,19 +4783,6 @@ func (r *ClickHouseReader) GetTraceAttributeKeys(ctx context.Context, req *v3.Fi return &response, nil } -// tempHandleFixedColumns is a temporary function to handle the fixed columns whose name has been changed in AttributeKeys Table -func tempHandleFixedColumns(tagKey string) string { - switch { - case tagKey == "traceId": - tagKey = "traceID" - case tagKey == "spanId": - tagKey = "spanID" - case tagKey == "parentSpanId": - tagKey = "parentSpanID" - } - return tagKey -} - func (r *ClickHouseReader) GetTraceAttributeValues(ctx context.Context, req *v3.FilterAttributeValueRequest) (*v3.FilterAttributeValueResponse, error) { var query string @@ -4702,31 +4843,38 @@ func (r *ClickHouseReader) GetSpanAttributeKeys(ctx context.Context) (map[string var rows driver.Rows response := map[string]v3.AttributeKey{} - query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType, isColumn FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable) + query = fmt.Sprintf("SELECT DISTINCT(tagKey), tagType, dataType FROM %s.%s", r.TraceDB, r.spanAttributesKeysTable) rows, err = r.db.Query(ctx, query) - if err != nil { zap.L().Error("Error while executing query", zap.Error(err)) return nil, fmt.Errorf("error while executing query: %s", err.Error()) } defer rows.Close() + statements := []model.ShowCreateTableStatement{} + query = fmt.Sprintf("SHOW CREATE TABLE %s.%s", r.TraceDB, r.traceTableName) + err = r.db.Select(ctx, &statements, query) + if err != nil { + return nil, fmt.Errorf("error while fetching 
traces schema: %s", err.Error()) + } + var tagKey string var dataType string var tagType string - var isColumn bool for rows.Next() { - if err := rows.Scan(&tagKey, &tagType, &dataType, &isColumn); err != nil { + if err := rows.Scan(&tagKey, &tagType, &dataType); err != nil { return nil, fmt.Errorf("error while scanning rows: %s", err.Error()) } key := v3.AttributeKey{ Key: tagKey, DataType: v3.AttributeKeyDataType(dataType), Type: v3.AttributeKeyType(tagType), - IsColumn: isColumn, + IsColumn: isColumn(r.useLogsNewSchema, statements[0].Statement, tagType, tagKey, dataType), } - response[tagKey] = key + + name := tagKey + "##" + tagType + "##" + strings.ToLower(dataType) + response[name] = key } return response, nil } diff --git a/pkg/query-service/app/http_handler.go b/pkg/query-service/app/http_handler.go index 6586e21d98..9c44e01508 100644 --- a/pkg/query-service/app/http_handler.go +++ b/pkg/query-service/app/http_handler.go @@ -39,6 +39,7 @@ import ( querierV2 "go.signoz.io/signoz/pkg/query-service/app/querier/v2" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4" "go.signoz.io/signoz/pkg/query-service/auth" "go.signoz.io/signoz/pkg/query-service/cache" "go.signoz.io/signoz/pkg/query-service/common" @@ -110,8 +111,9 @@ type APIHandler struct { // Websocket connection upgrader Upgrader *websocket.Upgrader - UseLogsNewSchema bool - UseLicensesV3 bool + UseLogsNewSchema bool + UseTraceNewSchema bool + UseLicensesV3 bool hostsRepo *inframetrics.HostsRepo processesRepo *inframetrics.ProcessesRepo @@ -163,6 +165,7 @@ type APIHandlerOpts struct { // Use Logs New schema UseLogsNewSchema bool + UseTraceNewSchema bool // Use Licenses V3 structure UseLicensesV3 bool } @@ -176,21 +179,23 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { } querierOpts := querier.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - FeatureLookup: opts.FeatureFlags, - UseLogsNewSchema: opts.UseLogsNewSchema, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + UseLogsNewSchema: opts.UseLogsNewSchema, + UseTraceNewSchema: opts.UseTraceNewSchema, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: opts.Reader, - Cache: opts.Cache, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FluxInterval: opts.FluxInterval, - FeatureLookup: opts.FeatureFlags, - UseLogsNewSchema: opts.UseLogsNewSchema, + Reader: opts.Reader, + Cache: opts.Cache, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FluxInterval: opts.FluxInterval, + FeatureLookup: opts.FeatureFlags, + UseLogsNewSchema: opts.UseLogsNewSchema, + UseTraceNewSchema: opts.UseTraceNewSchema, } querier := querier.NewQuerier(querierOpts) @@ -224,6 +229,7 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { querier: querier, querierV2: querierv2, UseLogsNewSchema: opts.UseLogsNewSchema, + UseTraceNewSchema: opts.UseTraceNewSchema, UseLicensesV3: opts.UseLicensesV3, hostsRepo: hostsRepo, processesRepo: processesRepo, @@ -242,9 +248,14 @@ func NewAPIHandler(opts APIHandlerOpts) (*APIHandler, error) { logsQueryBuilder = logsv4.PrepareLogsQuery } + tracesQueryBuilder := tracesV3.PrepareTracesQuery + if opts.UseTraceNewSchema { + tracesQueryBuilder = tracesV4.PrepareTracesQuery + } + builderOpts :=
queryBuilder.QueryBuilderOptions{ BuildMetricQuery: metricsv3.PrepareMetricQuery, - BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildTraceQuery: tracesQueryBuilder, BuildLogQuery: logsQueryBuilder, } aH.queryBuilder = queryBuilder.NewQueryBuilder(builderOpts, aH.featureFlags) @@ -4433,7 +4444,12 @@ func (aH *APIHandler) queryRangeV3(ctx context.Context, queryRangeParams *v3.Que RespondError(w, apiErrObj, errQuriesByName) return } - tracesV3.Enrich(queryRangeParams, spanKeys) + if aH.UseTraceNewSchema { + tracesV4.Enrich(queryRangeParams, spanKeys) + } else { + tracesV3.Enrich(queryRangeParams, spanKeys) + } + } // WARN: Only works for AND operator in traces query diff --git a/pkg/query-service/app/logs/v3/enrich_query.go b/pkg/query-service/app/logs/v3/enrich_query.go index cd5c2d6a0c..2f853a12b9 100644 --- a/pkg/query-service/app/logs/v3/enrich_query.go +++ b/pkg/query-service/app/logs/v3/enrich_query.go @@ -142,7 +142,7 @@ func enrichFieldWithMetadata(field v3.AttributeKey, fields map[string]v3.Attribu } // check if the field is present in the fields map - for _, key := range utils.GenerateLogEnrichmentKeys(field) { + for _, key := range utils.GenerateEnrichmentKeys(field) { if val, ok := fields[key]; ok { return val } diff --git a/pkg/query-service/app/querier/helper.go b/pkg/query-service/app/querier/helper.go index 798eb8f0b7..1b2acbab8b 100644 --- a/pkg/query-service/app/querier/helper.go +++ b/pkg/query-service/app/querier/helper.go @@ -10,6 +10,7 @@ import ( logsV4 "go.signoz.io/signoz/pkg/query-service/app/logs/v4" metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceTraces { + tracesQueryBuilder := tracesV3.PrepareTracesQuery + if q.UseTraceNewSchema { + tracesQueryBuilder = tracesV4.PrepareTracesQuery + } + var query string var err error // for ts query with group by and limit form two queries if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { - limitQuery, err := tracesV3.PrepareTracesQuery( + limitQuery, err := tracesQueryBuilder( start, end, params.CompositeQuery.PanelType, @@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery( ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} return } - placeholderQuery, err := tracesV3.PrepareTracesQuery( + placeholderQuery, err := tracesQueryBuilder( start, end, params.CompositeQuery.PanelType, @@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery( } query = fmt.Sprintf(placeholderQuery, limitQuery) } else { - query, err = tracesV3.PrepareTracesQuery( + query, err = tracesQueryBuilder( start, end, params.CompositeQuery.PanelType, diff --git a/pkg/query-service/app/querier/querier.go b/pkg/query-service/app/querier/querier.go index fd7198b334..d9d6e14129 100644 --- a/pkg/query-service/app/querier/querier.go +++ b/pkg/query-service/app/querier/querier.go @@ -11,6 +11,8 @@ import ( metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4" + 
"go.signoz.io/signoz/pkg/query-service/common" chErrors "go.signoz.io/signoz/pkg/query-service/errors" "go.signoz.io/signoz/pkg/query-service/querycache" @@ -52,7 +54,8 @@ type querier struct { returnedSeries []*v3.Series returnedErr error - UseLogsNewSchema bool + UseLogsNewSchema bool + UseTraceNewSchema bool } type QuerierOptions struct { @@ -63,10 +66,11 @@ type QuerierOptions struct { FeatureLookup interfaces.FeatureLookup // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error - UseLogsNewSchema bool + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error + UseLogsNewSchema bool + UseTraceNewSchema bool } func NewQuerier(opts QuerierOptions) interfaces.Querier { @@ -74,6 +78,10 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { if opts.UseLogsNewSchema { logsQueryBuilder = logsV4.PrepareLogsQuery } + tracesQueryBuilder := tracesV3.PrepareTracesQuery + if opts.UseTraceNewSchema { + tracesQueryBuilder = tracesV4.PrepareTracesQuery + } qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) @@ -85,16 +93,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { fluxInterval: opts.FluxInterval, builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ - BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildTraceQuery: tracesQueryBuilder, BuildLogQuery: logsQueryBuilder, BuildMetricQuery: metricsV3.PrepareMetricQuery, }, opts.FeatureLookup), featureLookUp: opts.FeatureLookup, - testingMode: opts.TestingMode, - returnedSeries: opts.ReturnedSeries, - returnedErr: opts.ReturnedErr, - UseLogsNewSchema: opts.UseLogsNewSchema, + testingMode: opts.TestingMode, + returnedSeries: opts.ReturnedSeries, + returnedErr: opts.ReturnedErr, + UseLogsNewSchema: opts.UseLogsNewSchema, + UseTraceNewSchema: opts.UseTraceNewSchema, } } @@ -308,15 +317,17 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang return results, errQueriesByName, err } -func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { +func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { res := make([]*v3.Result, 0) qName := "" pageSize := uint64(0) + limit := uint64(0) // se we are considering only one query for name, v := range params.CompositeQuery.BuilderQueries { qName = name pageSize = v.PageSize + limit = v.Limit } data := []*v3.Row{} @@ -343,21 +354,41 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar data = append(data, rowList...) 
} - // append a filter to the params - if len(data) > 0 { - params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{ - Key: "id", - IsColumn: true, - DataType: "string", - }, - Operator: v3.FilterOperatorLessThan, - Value: data[len(data)-1].Data["id"], - }) - } + if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { + if len(data) > 0 { + params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "id", + IsColumn: true, + DataType: "string", + }, + Operator: v3.FilterOperatorLessThan, + Value: data[len(data)-1].Data["id"], + }) + } - if uint64(len(data)) >= pageSize { - break + if uint64(len(data)) >= pageSize { + break + } + } else { + // we are updating the offset and limit based on the number of traces we have found in the current timerange + // e.g. + // 1) offset = 0, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30] + // if 100 traces are there in [t1, t10] then they will be returned immediately. + // if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, and search in the next timerange of [t10, t20] + // + // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30] + // If we find 100 traces in [t1, t10] then we return immediately + // If we find 50 in [t1, t10] then it will set offset = 0 and limit = 50 and search in the next timerange of [t10, t20] + // if we don't find any trace in [t1, t10], then we search in [t10, t20] with offset=50, limit=100 + if len(data) > 0 { + params.CompositeQuery.BuilderQueries[qName].Offset = 0 + params.CompositeQuery.BuilderQueries[qName].Limit = limit - uint64(len(data)) + } + + if uint64(len(data)) >= limit { + break + } } } res = append(res, &v3.Result{ @@ -368,15 +399,25 @@ func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { - // List query has support for only one query.
- if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 { + // List query has support for only one query + // we are skipping for PanelTypeTrace as it has a custom order by regardless of what's in the payload + if params.CompositeQuery != nil && + len(params.CompositeQuery.BuilderQueries) == 1 && + params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { + if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) || + (v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) { + break + } + // only allow logs/traces queries with timestamp ordering desc - if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" { - startEndArr := utils.GetLogsListTsRanges(params.Start, params.End) - if len(startEndArr) > 0 { - return q.runLogsListQuery(ctx, params, startEndArr) - } + // TODO(nitya): allow for timestamp asc + if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && + len(v.OrderBy) == 1 && + v.OrderBy[0].ColumnName == "timestamp" && + v.OrderBy[0].Order == "desc" { + startEndArr := utils.GetListTsRanges(params.Start, params.End) + return q.runWindowBasedListQuery(ctx, params, startEndArr) } } } diff --git a/pkg/query-service/app/querier/v2/helper.go b/pkg/query-service/app/querier/v2/helper.go index 09d6cc2309..b62fffe106 100644 --- a/pkg/query-service/app/querier/v2/helper.go +++ b/pkg/query-service/app/querier/v2/helper.go @@ -11,6 +11,7 @@ import ( metricsV3 "go.signoz.io/signoz/pkg/query-service/app/metrics/v3" metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" tracesV3 "go.signoz.io/signoz/pkg/query-service/app/traces/v3" + tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4" "go.signoz.io/signoz/pkg/query-service/common" "go.signoz.io/signoz/pkg/query-service/constants" v3 "go.signoz.io/signoz/pkg/query-service/model/v3" @@ -158,11 +159,16 @@ func (q *querier) runBuilderQuery( if builderQuery.DataSource == v3.DataSourceTraces { + tracesQueryBuilder := tracesV3.PrepareTracesQuery + if q.UseTraceNewSchema { + tracesQueryBuilder = tracesV4.PrepareTracesQuery + } + var query string var err error // for ts query with group by and limit form two queries if params.CompositeQuery.PanelType == v3.PanelTypeGraph && builderQuery.Limit > 0 && len(builderQuery.GroupBy) > 0 { - limitQuery, err := tracesV3.PrepareTracesQuery( + limitQuery, err := tracesQueryBuilder( start, end, params.CompositeQuery.PanelType, @@ -173,7 +179,7 @@ func (q *querier) runBuilderQuery( ch <- channelResult{Err: err, Name: queryName, Query: limitQuery, Series: nil} return } - placeholderQuery, err := tracesV3.PrepareTracesQuery( + placeholderQuery, err := tracesQueryBuilder( start, end, params.CompositeQuery.PanelType, @@ -186,7 +192,7 @@ func (q *querier) runBuilderQuery( } query = fmt.Sprintf(placeholderQuery, limitQuery) } else { - query, err = tracesV3.PrepareTracesQuery( + query, err = tracesQueryBuilder( start, end, params.CompositeQuery.PanelType, diff --git a/pkg/query-service/app/querier/v2/querier.go b/pkg/query-service/app/querier/v2/querier.go index 311d213656..06c96a4a0e 100644 --- a/pkg/query-service/app/querier/v2/querier.go +++ b/pkg/query-service/app/querier/v2/querier.go @@ -11,6 +11,7 @@ import ( metricsV4 "go.signoz.io/signoz/pkg/query-service/app/metrics/v4" "go.signoz.io/signoz/pkg/query-service/app/queryBuilder" tracesV3
"go.signoz.io/signoz/pkg/query-service/app/traces/v3" + tracesV4 "go.signoz.io/signoz/pkg/query-service/app/traces/v4" "go.signoz.io/signoz/pkg/query-service/common" chErrors "go.signoz.io/signoz/pkg/query-service/errors" "go.signoz.io/signoz/pkg/query-service/querycache" @@ -48,10 +49,11 @@ type querier struct { testingMode bool queriesExecuted []string // tuple of start and end time in milliseconds - timeRanges [][]int - returnedSeries []*v3.Series - returnedErr error - UseLogsNewSchema bool + timeRanges [][]int + returnedSeries []*v3.Series + returnedErr error + UseLogsNewSchema bool + UseTraceNewSchema bool } type QuerierOptions struct { @@ -62,10 +64,11 @@ type QuerierOptions struct { FeatureLookup interfaces.FeatureLookup // used for testing - TestingMode bool - ReturnedSeries []*v3.Series - ReturnedErr error - UseLogsNewSchema bool + TestingMode bool + ReturnedSeries []*v3.Series + ReturnedErr error + UseLogsNewSchema bool + UseTraceNewSchema bool } func NewQuerier(opts QuerierOptions) interfaces.Querier { @@ -74,6 +77,11 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { logsQueryBuilder = logsV4.PrepareLogsQuery } + tracesQueryBuilder := tracesV3.PrepareTracesQuery + if opts.UseTraceNewSchema { + tracesQueryBuilder = tracesV4.PrepareTracesQuery + } + qc := querycache.NewQueryCache(querycache.WithCache(opts.Cache), querycache.WithFluxInterval(opts.FluxInterval)) return &querier{ @@ -84,16 +92,17 @@ func NewQuerier(opts QuerierOptions) interfaces.Querier { fluxInterval: opts.FluxInterval, builder: queryBuilder.NewQueryBuilder(queryBuilder.QueryBuilderOptions{ - BuildTraceQuery: tracesV3.PrepareTracesQuery, + BuildTraceQuery: tracesQueryBuilder, BuildLogQuery: logsQueryBuilder, BuildMetricQuery: metricsV4.PrepareMetricQuery, }, opts.FeatureLookup), featureLookUp: opts.FeatureLookup, - testingMode: opts.TestingMode, - returnedSeries: opts.ReturnedSeries, - returnedErr: opts.ReturnedErr, - UseLogsNewSchema: opts.UseLogsNewSchema, + testingMode: opts.TestingMode, + returnedSeries: opts.ReturnedSeries, + returnedErr: opts.ReturnedErr, + UseLogsNewSchema: opts.UseLogsNewSchema, + UseTraceNewSchema: opts.UseTraceNewSchema, } } @@ -308,15 +317,17 @@ func (q *querier) runClickHouseQueries(ctx context.Context, params *v3.QueryRang return results, errQueriesByName, err } -func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { +func (q *querier) runWindowBasedListQuery(ctx context.Context, params *v3.QueryRangeParamsV3, tsRanges []utils.LogsListTsRange) ([]*v3.Result, map[string]error, error) { res := make([]*v3.Result, 0) qName := "" pageSize := uint64(0) + limit := uint64(0) // se we are considering only one query for name, v := range params.CompositeQuery.BuilderQueries { qName = name pageSize = v.PageSize + limit = v.Limit } data := []*v3.Row{} @@ -343,17 +354,41 @@ func (q *querier) runLogsListQuery(ctx context.Context, params *v3.QueryRangePar data = append(data, rowList...) 
} - // append a filter to the params - if len(data) > 0 { - params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ - Key: v3.AttributeKey{ - Key: "id", - IsColumn: true, - DataType: "string", - }, - Operator: v3.FilterOperatorLessThan, - Value: data[len(data)-1].Data["id"], - }) + if params.CompositeQuery.BuilderQueries[qName].DataSource == v3.DataSourceLogs { + if len(data) > 0 { + params.CompositeQuery.BuilderQueries[qName].Filters.Items = append(params.CompositeQuery.BuilderQueries[qName].Filters.Items, v3.FilterItem{ + Key: v3.AttributeKey{ + Key: "id", + IsColumn: true, + DataType: "string", + }, + Operator: v3.FilterOperatorLessThan, + Value: data[len(data)-1].Data["id"], + }) + } + + if uint64(len(data)) >= pageSize { + break + } + } else { + // we are updating the offset and limit based on the number of traces we have found in the current timerange + // e.g. + // 1) offset = 0, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30] + // if 100 traces are there in [t1, t10] then they will be returned immediately. + // if 10 traces are there in [t1, t10] then we get 10, set offset to 0 and limit to 90, and search in the next timerange of [t10, t20] + // + // 2) offset = 50, limit = 100, tsRanges = [t1, t10], [t10, t20], [t20, t30] + // If we find 100 traces in [t1, t10] then we return immediately + // If we find 50 in [t1, t10] then it will set offset = 0 and limit = 50 and search in the next timerange of [t10, t20] + // if we don't find any trace in [t1, t10], then we search in [t10, t20] with offset=50, limit=100 + if len(data) > 0 { + params.CompositeQuery.BuilderQueries[qName].Offset = 0 + params.CompositeQuery.BuilderQueries[qName].Limit = limit - uint64(len(data)) + } + + if uint64(len(data)) >= limit { + break + } } if uint64(len(data)) >= pageSize { @@ -366,17 +401,26 @@ }) return res, nil, nil } - func (q *querier) runBuilderListQueries(ctx context.Context, params *v3.QueryRangeParamsV3) ([]*v3.Result, map[string]error, error) { - // List query has support for only one query.
- if q.UseLogsNewSchema && params.CompositeQuery != nil && len(params.CompositeQuery.BuilderQueries) == 1 { + // List query has support for only one query + // we skip PanelTypeTrace because it applies a custom order by regardless of what's in the payload + if params.CompositeQuery != nil && + len(params.CompositeQuery.BuilderQueries) == 1 && + params.CompositeQuery.PanelType != v3.PanelTypeTrace { for _, v := range params.CompositeQuery.BuilderQueries { + if (v.DataSource == v3.DataSourceLogs && !q.UseLogsNewSchema) || + (v.DataSource == v3.DataSourceTraces && !q.UseTraceNewSchema) { + break + } + // only allow logs/traces queries with timestamp ordering desc - if v.DataSource == v3.DataSourceLogs && len(v.OrderBy) == 1 && v.OrderBy[0].ColumnName == "timestamp" && v.OrderBy[0].Order == "desc" { - startEndArr := utils.GetLogsListTsRanges(params.Start, params.End) - if len(startEndArr) > 0 { - return q.runLogsListQuery(ctx, params, startEndArr) - } + // TODO(nitya): allow for timestamp asc + if (v.DataSource == v3.DataSourceLogs || v.DataSource == v3.DataSourceTraces) && + len(v.OrderBy) == 1 && + v.OrderBy[0].ColumnName == "timestamp" && + v.OrderBy[0].Order == "desc" { + startEndArr := utils.GetListTsRanges(params.Start, params.End) + return q.runWindowBasedListQuery(ctx, params, startEndArr) } } } diff --git a/pkg/query-service/app/server.go b/pkg/query-service/app/server.go index dc6ac21e15..8eef01dbac 100644 --- a/pkg/query-service/app/server.go +++ b/pkg/query-service/app/server.go @@ -67,6 +67,7 @@ type ServerOptions struct { FluxInterval string Cluster string UseLogsNewSchema bool + UseTraceNewSchema bool } // Server runs HTTP, Mux and a grpc server @@ -130,6 +131,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { serverOptions.DialTimeout, serverOptions.Cluster, serverOptions.UseLogsNewSchema, + serverOptions.UseTraceNewSchema, ) go clickhouseReader.Start(readerReady) reader = clickhouseReader @@ -157,7 +159,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { rm, err := makeRulesManager( serverOptions.PromConfigPath, constants.GetAlertManagerApiPrefix(), - serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema) + serverOptions.RuleRepoURL, localDB, reader, c, serverOptions.DisableRules, fm, serverOptions.UseLogsNewSchema, serverOptions.UseTraceNewSchema) if err != nil { return nil, err } @@ -202,6 +204,7 @@ func NewServer(serverOptions *ServerOptions) (*Server, error) { Cache: c, FluxInterval: fluxInterval, UseLogsNewSchema: serverOptions.UseLogsNewSchema, + UseTraceNewSchema: serverOptions.UseTraceNewSchema, }) if err != nil { return nil, err @@ -721,7 +724,8 @@ func makeRulesManager( cache cache.Cache, disableRules bool, fm interfaces.FeatureLookup, - useLogsNewSchema bool) (*rules.Manager, error) { + useLogsNewSchema bool, + useTraceNewSchema bool) (*rules.Manager, error) { // create engine pqle, err := pqle.FromReader(ch) @@ -738,18 +742,19 @@ func makeRulesManager( // create manager opts managerOpts := &rules.ManagerOptions{ - NotifierOpts: notifierOpts, - PqlEngine: pqle, - RepoURL: ruleRepoURL, - DBConn: db, - Context: context.Background(), - Logger: zap.L(), - DisableRules: disableRules, - FeatureFlags: fm, - Reader: ch, - Cache: cache, - EvalDelay: constants.GetEvalDelay(), - UseLogsNewSchema: useLogsNewSchema, + NotifierOpts: notifierOpts, + PqlEngine: pqle, + RepoURL: ruleRepoURL, + DBConn: db, + Context: context.Background(), + Logger: zap.L(), + DisableRules: disableRules, + 
FeatureFlags: fm, + Reader: ch, + Cache: cache, + EvalDelay: constants.GetEvalDelay(), + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, } // create Manager diff --git a/pkg/query-service/app/traces/v4/enrich.go b/pkg/query-service/app/traces/v4/enrich.go new file mode 100644 index 0000000000..69dbb2e49c --- /dev/null +++ b/pkg/query-service/app/traces/v4/enrich.go @@ -0,0 +1,104 @@ +package v4 + +import ( + "go.signoz.io/signoz/pkg/query-service/constants" + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" +) + +func isEnriched(field v3.AttributeKey) bool { + // if it is timestamp/id, don't check + if field.Key == "timestamp" || field.Key == "id" || field.Key == constants.SigNozOrderByValue { + return true + } + + // no need to enrich the static fields as they will always be used as a column + if _, ok := constants.StaticFieldsTraces[field.Key]; ok && field.IsColumn { + return true + } + + return false + } + +func enrichKeyWithMetadata(key v3.AttributeKey, keys map[string]v3.AttributeKey) v3.AttributeKey { + if isEnriched(key) { + return key + } + + if v, ok := constants.StaticFieldsTraces[key.Key]; ok { + return v + } + + for _, key := range utils.GenerateEnrichmentKeys(key) { + if val, ok := keys[key]; ok { + return val + } + } + + // enrich with default values if metadata is not found + if key.Type == "" { + key.Type = v3.AttributeKeyTypeTag + } + if key.DataType == "" { + key.DataType = v3.AttributeKeyDataTypeString + } + return key +} + +func Enrich(params *v3.QueryRangeParamsV3, keys map[string]v3.AttributeKey) { + if params.CompositeQuery.QueryType == v3.QueryTypeBuilder { + for _, query := range params.CompositeQuery.BuilderQueries { + if query.DataSource == v3.DataSourceTraces { + EnrichTracesQuery(query, keys) + } + } + } +} + +func EnrichTracesQuery(query *v3.BuilderQuery, keys map[string]v3.AttributeKey) { + // enrich aggregate attribute + query.AggregateAttribute = enrichKeyWithMetadata(query.AggregateAttribute, keys) + // enrich filter items + if query.Filters != nil && len(query.Filters.Items) > 0 { + for idx, filter := range query.Filters.Items { + query.Filters.Items[idx].Key = enrichKeyWithMetadata(filter.Key, keys) + // if the serviceName column is used, use the corresponding resource attribute as well during filtering + if filter.Key.Key == "serviceName" && filter.Key.IsColumn { + query.Filters.Items[idx].Key = v3.AttributeKey{ + Key: "service.name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeResource, + IsColumn: false, + } + } + } + } + // enrich group by + for idx, groupBy := range query.GroupBy { + query.GroupBy[idx] = enrichKeyWithMetadata(groupBy, keys) + } + // enrich order by + query.OrderBy = enrichOrderBy(query.OrderBy, keys) + // enrich select columns + for idx, selectColumn := range query.SelectColumns { + query.SelectColumns[idx] = enrichKeyWithMetadata(selectColumn, keys) + } +} + +func enrichOrderBy(items []v3.OrderBy, keys map[string]v3.AttributeKey) []v3.OrderBy { + enrichedItems := []v3.OrderBy{} + for i := 0; i < len(items); i++ { + attributeKey := enrichKeyWithMetadata(v3.AttributeKey{ + Key: items[i].ColumnName, + }, keys) + enrichedItems = append(enrichedItems, v3.OrderBy{ + ColumnName: items[i].ColumnName, + Order: items[i].Order, + Key: attributeKey.Key, + DataType: attributeKey.DataType, + Type: attributeKey.Type, + IsColumn: attributeKey.IsColumn, + }) + } + return enrichedItems +} diff --git a/pkg/query-service/app/traces/v4/enrich_test.go
b/pkg/query-service/app/traces/v4/enrich_test.go new file mode 100644 index 0000000000..063fa5c72d --- /dev/null +++ b/pkg/query-service/app/traces/v4/enrich_test.go @@ -0,0 +1,97 @@ +package v4 + +import ( + "reflect" + "testing" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" +) + +func TestEnrichTracesQuery(t *testing.T) { + type args struct { + query *v3.BuilderQuery + keys map[string]v3.AttributeKey + want *v3.BuilderQuery + } + tests := []struct { + name string + args args + }{ + { + name: "test 1", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag}, Value: 100, Operator: ">"}, + }, + }, + }, + keys: map[string]v3.AttributeKey{ + "bytes##tag##int64": {Key: "bytes", DataType: v3.AttributeKeyDataTypeInt64, Type: v3.AttributeKeyTypeTag}, + }, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "bytes", Type: v3.AttributeKeyTypeTag, DataType: v3.AttributeKeyDataTypeInt64}, Value: 100, Operator: ">"}, + }, + }, + }, + }, + }, + { + name: "test service name", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "serviceName", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "myservice", Operator: "="}, + }, + }, + }, + keys: map[string]v3.AttributeKey{}, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "service.name", Type: v3.AttributeKeyTypeResource, DataType: v3.AttributeKeyDataTypeString}, Value: "myservice", Operator: "="}, + }, + }, + }, + }, + }, + { + name: "test mat attrs", + args: args{ + query: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, IsColumn: true}, Value: "/api", Operator: "="}, + }, + }, + }, + keys: map[string]v3.AttributeKey{}, + want: &v3.BuilderQuery{ + Filters: &v3.FilterSet{ + Operator: "AND", + Items: []v3.FilterItem{ + {Key: v3.AttributeKey{Key: "http.route", DataType: v3.AttributeKeyDataTypeString, Type: v3.AttributeKeyTypeTag, IsColumn: true}, Value: "/api", Operator: "="}, + }, + }, + }, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + EnrichTracesQuery(tt.args.query, tt.args.keys) + if !reflect.DeepEqual(tt.args.query.Filters.Items[0].Key, tt.args.want.Filters.Items[0].Key) { + t.Errorf("EnrichTracesQuery() = %v, want %v", tt.args.query, tt.args.want) + } + }) + } +} diff --git a/pkg/query-service/app/traces/v4/utils.go b/pkg/query-service/app/traces/v4/utils.go new file mode 100644 index 0000000000..ae132f6d9e --- /dev/null +++ b/pkg/query-service/app/traces/v4/utils.go @@ -0,0 +1,216 @@ +package v4 + +import ( + "strconv" + + v3 "go.signoz.io/signoz/pkg/query-service/model/v3" + "go.signoz.io/signoz/pkg/query-service/utils" + "go.uber.org/zap" +) + +var TracesListViewDefaultSelectedColumns = []v3.AttributeKey{ + { + Key: "serviceName", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + { + Key: "name", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + { + Key: "durationNano", + DataType: v3.AttributeKeyDataTypeArrayFloat64, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + { + Key: "httpMethod", + DataType: 
v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, + { + Key: "responseStatusCode", + DataType: v3.AttributeKeyDataTypeString, + Type: v3.AttributeKeyTypeTag, + IsColumn: true, + }, +} + +// check if a traceID filter is used in the traces query and return the list of trace IDs +func TraceIdFilterUsedWithEqual(params *v3.QueryRangeParamsV3) (bool, []string) { + compositeQuery := params.CompositeQuery + if compositeQuery == nil { + return false, []string{} + } + var traceIds []string + var traceIdFilterUsed bool + + // inspect the filters of each builder query + for queryName, query := range compositeQuery.BuilderQueries { + if query.Expression != queryName && query.DataSource != v3.DataSourceTraces { + continue + } + + // check filter attribute + if query.Filters != nil && len(query.Filters.Items) != 0 { + for _, item := range query.Filters.Items { + + if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn || + item.Operator == v3.FilterOperatorEqual) { + traceIdFilterUsed = true + // validate value + var err error + val := item.Value + val, err = utils.ValidateAndCastValue(val, item.Key.DataType) + if err != nil { + zap.L().Error("invalid value for key", zap.String("key", item.Key.Key), zap.Error(err)) + return false, []string{} + } + if val != nil { + fmtVal := extractFormattedStringValues(val) + traceIds = append(traceIds, fmtVal...) + } + } + } + } + + } + + zap.L().Debug("traceIds", zap.Any("traceIds", traceIds)) + return traceIdFilterUsed, traceIds +} + +func extractFormattedStringValues(v interface{}) []string { + // if it's a pointer, dereference it to a value + v = getPointerValue(v) + + switch x := v.(type) { + case string: + return []string{x} + + case []interface{}: + if len(x) == 0 { + return []string{} + } + switch x[0].(type) { + case string: + values := []string{} + for _, val := range x { + values = append(values, val.(string)) + } + return values + default: + return []string{} + } + default: + return []string{} + } +} + +func getPointerValue(v interface{}) interface{} { + switch x := v.(type) { + case *uint8: + return *x + case *uint16: + return *x + case *uint32: + return *x + case *uint64: + return *x + case *int: + return *x + case *int8: + return *x + case *int16: + return *x + case *int32: + return *x + case *int64: + return *x + case *float32: + return *x + case *float64: + return *x + case *string: + return *x + case *bool: + return *x + case []interface{}: + values := []interface{}{} + for _, val := range x { + values = append(values, getPointerValue(val)) + } + return values + default: + return v + } +} + +func AddTimestampFilters(minTime int64, maxTime int64, params *v3.QueryRangeParamsV3) { + if minTime == 0 && maxTime == 0 { + return + } + + compositeQuery := params.CompositeQuery + if compositeQuery == nil { + return + } + // inspect each builder query and apply the timestamp filter only if a traceID filter is present + for queryName, query := range compositeQuery.BuilderQueries { + if query.Expression != queryName && query.DataSource != v3.DataSourceTraces { + continue + } + + addTimeStampFilter := false + + // check filter attribute + if query.Filters != nil && len(query.Filters.Items) != 0 { + for _, item := range query.Filters.Items { + if item.Key.Key == "traceID" && (item.Operator == v3.FilterOperatorIn || + item.Operator == v3.FilterOperatorEqual) { + addTimeStampFilter = true + } + } + } + + // add the timestamp filter only if a traceID filter with an equal/in operator is used + if addTimeStampFilter { 
timeFilters := []v3.FilterItem{ + { + Key: v3.AttributeKey{ + Key: "timestamp", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + Value: strconv.FormatUint(uint64(minTime), 10), + Operator: v3.FilterOperatorGreaterThanOrEq, + }, + { + Key: v3.AttributeKey{ + Key: "timestamp", + Type: v3.AttributeKeyTypeTag, + DataType: v3.AttributeKeyDataTypeString, + IsColumn: true, + }, + Value: strconv.FormatUint(uint64(maxTime), 10), + Operator: v3.FilterOperatorLessThanOrEq, + }, + } + + // add new timestamp filter to query + if query.Filters == nil { + query.Filters = &v3.FilterSet{ + Items: timeFilters, + } + } else { + query.Filters.Items = append(query.Filters.Items, timeFilters...) + } + } + } +} diff --git a/pkg/query-service/constants/constants.go b/pkg/query-service/constants/constants.go index dc52f6fd88..968f69b930 100644 --- a/pkg/query-service/constants/constants.go +++ b/pkg/query-service/constants/constants.go @@ -238,8 +238,8 @@ const ( SIGNOZ_EXP_HISTOGRAM_TABLENAME = "distributed_exp_hist" SIGNOZ_TRACE_DBNAME = "signoz_traces" SIGNOZ_SPAN_INDEX_TABLENAME = "distributed_signoz_index_v2" - SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2" SIGNOZ_SPAN_INDEX_V3 = "distributed_signoz_index_v3" + SIGNOZ_SPAN_INDEX_LOCAL_TABLENAME = "signoz_index_v2" SIGNOZ_SPAN_INDEX_V3_LOCAL_TABLENAME = "signoz_index_v3" SIGNOZ_TIMESERIES_v4_LOCAL_TABLENAME = "time_series_v4" SIGNOZ_TIMESERIES_v4_6HRS_LOCAL_TABLENAME = "time_series_v4_6hrs" diff --git a/pkg/query-service/contextlinks/links.go b/pkg/query-service/contextlinks/links.go index 260745eda3..9e48dfb1a2 100644 --- a/pkg/query-service/contextlinks/links.go +++ b/pkg/query-service/contextlinks/links.go @@ -183,7 +183,7 @@ func PrepareFilters(labels map[string]string, whereClauseItems []v3.FilterItem, var attrFound bool // as of now this logic will only apply for logs - for _, tKey := range utils.GenerateLogEnrichmentKeys(v3.AttributeKey{Key: key}) { + for _, tKey := range utils.GenerateEnrichmentKeys(v3.AttributeKey{Key: key}) { if val, ok := keys[tKey]; ok { attributeKey = val attrFound = true diff --git a/pkg/query-service/main.go b/pkg/query-service/main.go index cd6a542220..1f1c54b961 100644 --- a/pkg/query-service/main.go +++ b/pkg/query-service/main.go @@ -39,6 +39,7 @@ func main() { var disableRules bool var useLogsNewSchema bool + var useTraceNewSchema bool // the url used to build link in the alert messages in slack and other systems var ruleRepoURL, cacheConfigPath, fluxInterval string var cluster string @@ -50,6 +51,7 @@ func main() { var dialTimeout time.Duration flag.BoolVar(&useLogsNewSchema, "use-logs-new-schema", false, "use logs_v2 schema for logs") + flag.BoolVar(&useTraceNewSchema, "use-trace-new-schema", false, "use new schema for traces") flag.StringVar(&promConfigPath, "config", "./config/prometheus.yml", "(prometheus config to read metrics)") flag.StringVar(&skipTopLvlOpsPath, "skip-top-level-ops", "", "(config file to skip top level operations)") flag.BoolVar(&disableRules, "rules.disable", false, "(disable rule evaluation)") @@ -87,6 +89,7 @@ func main() { FluxInterval: fluxInterval, Cluster: cluster, UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, } // Read the jwt secret key diff --git a/pkg/query-service/model/response.go b/pkg/query-service/model/response.go index 61be36f170..259501f24f 100644 --- a/pkg/query-service/model/response.go +++ b/pkg/query-service/model/response.go @@ -269,6 +269,32 @@ type SearchSpanResponseItem struct { 
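AddTimestampFilters above is the scan-narrowing half of the traceID fast path: once TraceIdFilterUsedWithEqual reports concrete trace IDs, the caller can look up the traces' min/max timestamps (the TraceSummary type added below fits that role) and bound the span scan to that window. A rough sketch of the filters it appends, with a simplified stand-in for v3.FilterItem:

```go
package main

import (
	"fmt"
	"strconv"
)

// filterItem is a simplified stand-in for v3.FilterItem.
type filterItem struct {
	Key, Operator, Value string
}

// timestampFilters mirrors the shape of what AddTimestampFilters appends:
// a >= on the trace's min time and a <= on its max time, both formatted
// as strings, and nothing at all when no summary was found.
func timestampFilters(minTime, maxTime uint64) []filterItem {
	if minTime == 0 && maxTime == 0 {
		return nil // nothing known about the trace; scan the full range
	}
	return []filterItem{
		{Key: "timestamp", Operator: ">=", Value: strconv.FormatUint(minTime, 10)},
		{Key: "timestamp", Operator: "<=", Value: strconv.FormatUint(maxTime, 10)},
	}
}

func main() {
	fmt.Println(timestampFilters(1722262800000000000, 1722263800000000000))
}
```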
SpanKind string `json:"spanKind"` } +type SearchSpanResponseItemV2 struct { + TimeUnixNano time.Time `json:"timestamp" ch:"timestamp"` + DurationNano uint64 `json:"durationNano" ch:"durationNano"` + SpanID string `json:"spanId" ch:"spanID"` + TraceID string `json:"traceId" ch:"traceID"` + HasError bool `json:"hasError" ch:"hasError"` + Kind int8 `json:"kind" ch:"kind"` + ServiceName string `json:"serviceName" ch:"serviceName"` + Name string `json:"name" ch:"name"` + References string `json:"references,omitempty" ch:"references"` + Attributes_string map[string]string `json:"attributes_string" ch:"attributes_string"` + Attributes_number map[string]float64 `json:"attributes_number" ch:"attributes_number"` + Attributes_bool map[string]bool `json:"attributes_bool" ch:"attributes_bool"` + Events []string `json:"event" ch:"events"` + StatusMessage string `json:"statusMessage" ch:"statusMessage"` + StatusCodeString string `json:"statusCodeString" ch:"statusCodeString"` + SpanKind string `json:"spanKind" ch:"spanKind"` +} + +type TraceSummary struct { + TraceID string `json:"traceId" ch:"trace_id"` + Start time.Time `json:"start" ch:"start"` + End time.Time `json:"end" ch:"end"` + NumSpans uint64 `json:"numSpans" ch:"num_spans"` +} + type OtelSpanRef struct { TraceId string `json:"traceId,omitempty"` SpanId string `json:"spanId,omitempty"` diff --git a/pkg/query-service/rules/manager.go b/pkg/query-service/rules/manager.go index 50ad7b5430..84e055cfa0 100644 --- a/pkg/query-service/rules/manager.go +++ b/pkg/query-service/rules/manager.go @@ -35,7 +35,8 @@ type PrepareTaskOptions struct { ManagerOpts *ManagerOptions NotifyFunc NotifyFunc - UseLogsNewSchema bool + UseLogsNewSchema bool + UseTraceNewSchema bool } type PrepareTestRuleOptions struct { @@ -48,7 +49,8 @@ type PrepareTestRuleOptions struct { ManagerOpts *ManagerOptions NotifyFunc NotifyFunc - UseLogsNewSchema bool + UseLogsNewSchema bool + UseTraceNewSchema bool } const taskNamesuffix = "webAppEditor" @@ -91,9 +93,9 @@ type ManagerOptions struct { PrepareTaskFunc func(opts PrepareTaskOptions) (Task, error) + UseLogsNewSchema bool + UseTraceNewSchema bool PrepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError) - - UseLogsNewSchema bool } // The Manager manages recording and alerting rules. 
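The ch struct tags on SearchSpanResponseItemV2 and TraceSummary above suggest these types are scanned directly from ClickHouse rows via clickhouse-go's struct binding. A sketch of what reading a trace summary could look like; the table name, address, and query here are assumptions for illustration, not taken from this diff:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/ClickHouse/clickhouse-go/v2"
)

// TraceSummary mirrors the type added above; the ch tags name the columns.
type TraceSummary struct {
	TraceID  string    `ch:"trace_id"`
	Start    time.Time `ch:"start"`
	End      time.Time `ch:"end"`
	NumSpans uint64    `ch:"num_spans"`
}

func main() {
	conn, err := clickhouse.Open(&clickhouse.Options{
		Addr: []string{"localhost:9000"}, // assumed address
	})
	if err != nil {
		panic(err)
	}
	var summaries []TraceSummary
	// Select scans rows into the slice using the ch tags; the table name
	// signoz_traces.trace_summary is an assumption for this sketch.
	err = conn.Select(context.Background(), &summaries,
		"SELECT trace_id, start, end, num_spans FROM signoz_traces.trace_summary WHERE trace_id = ?",
		"0123456789abcdef")
	if err != nil {
		panic(err)
	}
	fmt.Println(summaries)
}
```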
@@ -117,7 +119,8 @@ type Manager struct { prepareTaskFunc func(opts PrepareTaskOptions) (Task, error) prepareTestRuleFunc func(opts PrepareTestRuleOptions) (int, *model.ApiError) - UseLogsNewSchema bool + UseLogsNewSchema bool + UseTraceNewSchema bool } func defaultOptions(o *ManagerOptions) *ManagerOptions { @@ -156,6 +159,7 @@ func defaultPrepareTaskFunc(opts PrepareTaskOptions) (Task, error) { opts.FF, opts.Reader, opts.UseLogsNewSchema, + opts.UseTraceNewSchema, WithEvalDelay(opts.ManagerOpts.EvalDelay), ) @@ -368,7 +372,8 @@ func (m *Manager) editTask(rule *PostableRule, taskName string) error { ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), - UseLogsNewSchema: m.opts.UseLogsNewSchema, + UseLogsNewSchema: m.opts.UseLogsNewSchema, + UseTraceNewSchema: m.opts.UseTraceNewSchema, }) if err != nil { @@ -490,7 +495,8 @@ func (m *Manager) addTask(rule *PostableRule, taskName string) error { ManagerOpts: m.opts, NotifyFunc: m.prepareNotifyFunc(), - UseLogsNewSchema: m.opts.UseLogsNewSchema, + UseLogsNewSchema: m.opts.UseLogsNewSchema, + UseTraceNewSchema: m.opts.UseTraceNewSchema, }) if err != nil { @@ -809,15 +815,16 @@ func (m *Manager) TestNotification(ctx context.Context, ruleStr string) (int, *m } alertCount, apiErr := m.prepareTestRuleFunc(PrepareTestRuleOptions{ - Rule: parsedRule, - RuleDB: m.ruleDB, - Logger: m.logger, - Reader: m.reader, - Cache: m.cache, - FF: m.featureFlags, - ManagerOpts: m.opts, - NotifyFunc: m.prepareNotifyFunc(), - UseLogsNewSchema: m.opts.UseLogsNewSchema, + Rule: parsedRule, + RuleDB: m.ruleDB, + Logger: m.logger, + Reader: m.reader, + Cache: m.cache, + FF: m.featureFlags, + ManagerOpts: m.opts, + NotifyFunc: m.prepareNotifyFunc(), + UseLogsNewSchema: m.opts.UseLogsNewSchema, + UseTraceNewSchema: m.opts.UseTraceNewSchema, }) return alertCount, apiErr diff --git a/pkg/query-service/rules/test_notification.go b/pkg/query-service/rules/test_notification.go index 37fb2e5f1b..e30b7db94f 100644 --- a/pkg/query-service/rules/test_notification.go +++ b/pkg/query-service/rules/test_notification.go @@ -49,6 +49,7 @@ func defaultTestNotification(opts PrepareTestRuleOptions) (int, *model.ApiError) opts.FF, opts.Reader, opts.UseLogsNewSchema, + opts.UseTraceNewSchema, WithSendAlways(), WithSendUnmatched(), ) diff --git a/pkg/query-service/rules/threshold_rule.go b/pkg/query-service/rules/threshold_rule.go index 3971597ec2..1c0d475983 100644 --- a/pkg/query-service/rules/threshold_rule.go +++ b/pkg/query-service/rules/threshold_rule.go @@ -58,6 +58,7 @@ func NewThresholdRule( featureFlags interfaces.FeatureLookup, reader interfaces.Reader, useLogsNewSchema bool, + useTraceNewSchema bool, opts ...RuleOption, ) (*ThresholdRule, error) { @@ -74,19 +75,21 @@ func NewThresholdRule( } querierOption := querier.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FeatureLookup: featureFlags, - UseLogsNewSchema: useLogsNewSchema, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, } querierOptsV2 := querierV2.QuerierOptions{ - Reader: reader, - Cache: nil, - KeyGenerator: queryBuilder.NewKeyGenerator(), - FeatureLookup: featureFlags, - UseLogsNewSchema: useLogsNewSchema, + Reader: reader, + Cache: nil, + KeyGenerator: queryBuilder.NewKeyGenerator(), + FeatureLookup: featureFlags, + UseLogsNewSchema: useLogsNewSchema, + UseTraceNewSchema: useTraceNewSchema, } t.querier = 
querier.NewQuerier(querierOption) diff --git a/pkg/query-service/rules/threshold_rule_test.go b/pkg/query-service/rules/threshold_rule_test.go index e75c82b1a0..30daf9de56 100644 --- a/pkg/query-service/rules/threshold_rule_test.go +++ b/pkg/query-service/rules/threshold_rule_test.go @@ -791,7 +791,7 @@ func TestThresholdRuleShouldAlert(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -880,7 +880,7 @@ func TestPrepareLinksToLogs(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -922,7 +922,7 @@ func TestPrepareLinksToTraces(t *testing.T) { } fm := featureManager.StartManager() - rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -998,7 +998,7 @@ func TestThresholdRuleLabelNormalization(t *testing.T) { postableRule.RuleCondition.MatchType = MatchType(c.matchType) postableRule.RuleCondition.Target = &c.target - rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1051,7 +1051,7 @@ func TestThresholdRuleEvalDelay(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, fm, nil, true) // no eval delay + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true) // no eval delay if err != nil { assert.NoError(t, err) } @@ -1100,7 +1100,7 @@ func TestThresholdRuleClickHouseTmpl(t *testing.T) { fm := featureManager.StartManager() for idx, c := range cases { - rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, WithEvalDelay(2*time.Minute)) + rule, err := NewThresholdRule("69", &postableRule, fm, nil, true, true, WithEvalDelay(2*time.Minute)) if err != nil { assert.NoError(t, err) } @@ -1241,9 +1241,9 @@ func TestThresholdRuleUnitCombinations(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) - rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1340,9 +1340,9 @@ func TestThresholdRuleNoData(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) - rule, err := 
NewThresholdRule("69", &postableRule, fm, reader, true) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1445,9 +1445,9 @@ func TestThresholdRuleTracesLink(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) - rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1570,9 +1570,9 @@ func TestThresholdRuleLogsLink(t *testing.T) { } options := clickhouseReader.NewOptions("", 0, 0, 0, "", "archiveNamespace") - reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true) + reader := clickhouseReader.NewReaderFromClickhouseConnection(mock, options, nil, "", fm, "", true, true) - rule, err := NewThresholdRule("69", &postableRule, fm, reader, true) + rule, err := NewThresholdRule("69", &postableRule, fm, reader, true, true) rule.TemporalityMap = map[string]map[v3.Temporality]bool{ "signoz_calls_total": { v3.Delta: true, @@ -1648,7 +1648,7 @@ func TestThresholdRuleShiftBy(t *testing.T) { }, } - rule, err := NewThresholdRule("69", &postableRule, nil, nil, true) + rule, err := NewThresholdRule("69", &postableRule, nil, nil, true, true) if err != nil { assert.NoError(t, err) } diff --git a/pkg/query-service/tests/integration/test_utils.go b/pkg/query-service/tests/integration/test_utils.go index d060433dba..c1c71808a9 100644 --- a/pkg/query-service/tests/integration/test_utils.go +++ b/pkg/query-service/tests/integration/test_utils.go @@ -46,6 +46,7 @@ func NewMockClickhouseReader( featureFlags, "", true, + true, ) return reader, mockDB diff --git a/pkg/query-service/utils/logs.go b/pkg/query-service/utils/logs.go index 8efa026b52..3dc4a50f48 100644 --- a/pkg/query-service/utils/logs.go +++ b/pkg/query-service/utils/logs.go @@ -9,7 +9,7 @@ type LogsListTsRange struct { End int64 } -func GetLogsListTsRanges(start, end int64) []LogsListTsRange { +func GetListTsRanges(start, end int64) []LogsListTsRange { startNano := GetEpochNanoSecs(start) endNano := GetEpochNanoSecs(end) result := []LogsListTsRange{} @@ -35,13 +35,15 @@ func GetLogsListTsRanges(start, end int64) []LogsListTsRange { tStartNano = startNano } } + } else { + result = append(result, LogsListTsRange{Start: startNano, End: endNano}) } return result } // This tries to see all possible fields that it can fall back to if some meta is missing -// check Test_GenerateLogEnrichmentKeys for example -func GenerateLogEnrichmentKeys(field v3.AttributeKey) []string { +// check Test_GenerateEnrichmentKeys for example +func GenerateEnrichmentKeys(field v3.AttributeKey) []string { names := []string{} if field.Type != v3.AttributeKeyTypeUnspecified && field.DataType != v3.AttributeKeyDataTypeUnspecified { names = append(names, field.Key+"##"+field.Type.String()+"##"+field.DataType.String()) diff --git a/pkg/query-service/utils/logs_test.go b/pkg/query-service/utils/logs_test.go index e1efd813d1..dc0c9663fa 100644 --- a/pkg/query-service/utils/logs_test.go +++ b/pkg/query-service/utils/logs_test.go @@ -7,7 +7,7 @@ import ( v3 
"go.signoz.io/signoz/pkg/query-service/model/v3" ) -func TestLogsListTsRange(t *testing.T) { +func TestListTsRange(t *testing.T) { startEndData := []struct { name string start int64 @@ -18,7 +18,9 @@ func TestLogsListTsRange(t *testing.T) { name: "testing for less then one hour", start: 1722262800000000000, // July 29, 2024 7:50:00 PM end: 1722263800000000000, // July 29, 2024 8:06:40 PM - res: []LogsListTsRange{}, + res: []LogsListTsRange{ + {1722262800000000000, 1722263800000000000}, + }, }, { name: "testing for more than one hour", @@ -44,7 +46,7 @@ func TestLogsListTsRange(t *testing.T) { } for _, test := range startEndData { - res := GetLogsListTsRanges(test.start, test.end) + res := GetListTsRanges(test.start, test.end) for i, v := range res { if test.res[i].Start != v.Start || test.res[i].End != v.End { t.Errorf("expected range was %v - %v, got %v - %v", v.Start, v.End, test.res[i].Start, test.res[i].End) @@ -53,7 +55,7 @@ func TestLogsListTsRange(t *testing.T) { } } -func Test_GenerateLogEnrichmentKeys(t *testing.T) { +func Test_GenerateEnrichmentKeys(t *testing.T) { type args struct { field v3.AttributeKey } @@ -96,8 +98,8 @@ func Test_GenerateLogEnrichmentKeys(t *testing.T) { } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - if got := GenerateLogEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) { - t.Errorf("generateLogEnrichmentKeys() = %v, want %v", got, tt.want) + if got := GenerateEnrichmentKeys(tt.args.field); !reflect.DeepEqual(got, tt.want) { + t.Errorf("generateEnrichmentKeys() = %v, want %v", got, tt.want) } }) }