Skip to content

Commit 52a3f16

Browse files
authored
Flag categorize labels on streams response (#10419)
We recently introduced support for ingesting and querying structured metadata in Loki. This adds a new dimension to Loki's labels since now we arguably have three categories of labels: _stream_, _structured metadata_, and _parsed_ labels. Depending on the origin of the labels, they should be used in LogQL expressions differently to achieve optimal performance. _stream_ labels should be added to stream matchers, _structured metadata_ labels should be used in a filter expression before any parsing expression, and _parsed_ labels should be placed after the parser expression extracting them. The Grafana UI has a hard time dealing with this same problem. Before grafana/grafana#73955, the filtering functionality in Grafana was broken since it was not able to distinguish between _stream_ and _structured metadata_ labels. Also, as soon as a parser expression was added to the query, filters added by Grafana would be appended to the end of the query regardless of the label category. The PR above implements a workaround for this problem but needs a better API on Loki's end to mitigate all corner cases. Loki currently returns the following JSON for log queries: ```json ... { "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", "level": "info", "traceID": "68810cf0c94bfcca" }, "values": [ [ "1693996529000222496", "1693996529000222496 aaaaaaaaa.....\n" ], ... }, { "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", "level": "debug", "traceID": "a7116cj54c4bjz8s" }, "values": [ [ "1693996529000222497", "1693996529000222497 bbbbbbbbb.....\n" ], ... }, ... ``` As can be seen, there is no way we can distinguish the category of each label. This PR introduces a new flag `X-Loki-Response-Encoding-Flags: categorize-labels` that makes Loki return categorized labels as follows: ```json ... 
{ "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki" }, "values": [ [ "1693996529000222496", "1693996529000222496 aaaaaaaaa.....\n", { "structuredMetadata": { "traceID": "68810cf0c94bfcca" }, "parsed": { "level": "info" } } ], [ "1693996529000222497", "1693996529000222497 bbbbbbbbb.....\n", { "structuredMetadata": { "traceID": "a7116cj54c4bjz8s" }, "parsed": { "level": "debug" } } ], ... }, ... ``` Note that this PR only supports log queries, not metric queries. From a UX perspective, being able to categorize labels in metric queries doesn't have any benefit yet. Having said that, supporting this for metric queries would require some minor refactoring on top of what has been implemented here. If we decide to do that, I think we should do it in a separate PR to avoid making this PR even larger. I also decided to leave out support for Tail queries to avoid making this PR even larger. Once this one gets merged, we can work to support tailing. --- **Note to reviewers** This PR is huge since we need to forward categorized labels all over the codebase (from parsing logs all the way to marshaling). Fortunately, many of the changes come from updating tests and refactoring iterators. Tested out in a dev cell with query `{stream="stdout"} | label_format new="text"`. - Without the new flag: ``` $ http http://127.0.0.1:3100/loki/api/v1/query_range\?direction\=BACKWARD\&end\=1693996529322486000\&limit\=30\&query\=%7Bstream%3D%22stdout%22%7D+%7C+label_format+new%3D%22text%22\&start\=1693992929322486000 X-Scope-Orgid:REDACTED { "data": { "result": [ { "stream": { "new": "text", "pod": "loki-canary-986bd6f4b-xqmb7", "stream": "stdout" }, "values": [ [ "1693996529000222496", "1693996529000222496 pppppppppppp...\n" ], [ "1693996528499160852", "1693996528499160852 pppppppppppp...\n" ], ... 
``` - With the new flag ``` $ http http://127.0.0.1:3100/loki/api/v1/query_range\?direction\=BACKWARD\&end\=1693996529322486000\&limit\=30\&query\=%7Bstream%3D%22stdout%22%7D+%7C+label_format+new%3D%22text%22\&start\=1693992929322486000 X-Scope-Orgid:REDACTED X-Loki-Response-Encoding-Flags:categorize-labels { "data": { "encodingFlags": [ "categorize-labels" ], "result": [ { "stream": { "pod": "loki-canary-986bd6f4b-xqmb7", "stream": "stdout" }, "values": [ [ "1693996529000222496", "1693996529000222496 pppppppppppp...\n", { "parsed": { "new": "text" } } ], [ "1693996528499160852", "1693996528499160852 pppppppppppp...\n", { "parsed": { "new": "text" } } ], ... ```
1 parent 60ea954 commit 52a3f16

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2190
-813
lines changed

go.mod

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ require (
123123
github.com/efficientgo/core v1.0.0-rc.2
124124
github.com/fsnotify/fsnotify v1.6.0
125125
github.com/gogo/googleapis v1.4.0
126-
github.com/grafana/loki/pkg/push v0.0.0-20231017172654-cfc4f0e84adc
126+
github.com/grafana/loki/pkg/push v0.0.0-20231023154132-0a7737e7c7eb
127127
github.com/heroku/x v0.0.61
128128
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
129129
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.86.0

integration/client/client.go

+55-14
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"strings"
1414
"time"
1515

16+
"github.com/buger/jsonparser"
1617
"github.com/grafana/dskit/user"
1718
"github.com/prometheus/prometheus/model/labels"
1819
"go.opentelemetry.io/collector/pdata/pcommon"
@@ -335,10 +336,40 @@ func (c *Client) GetDeleteRequests() (DeleteRequests, error) {
335336
return deleteReqs, nil
336337
}
337338

339+
// Entry is one element of a stream's "values" array, decoded as a flat list
// of strings. Judging by the response examples in the commit message, an
// entry is [timestamp, line] and, when label categorization is requested,
// carries a third element holding the categorized labels object.
type Entry []string
340+
341+
// UnmarshalJSON decodes a JSON array into e by appending one string per
// array element. Elements must be JSON strings or JSON objects; any other
// element type yields jsonparser.MalformedStringError.
//
// If e is already non-nil, decoded elements are appended to its existing
// contents rather than replacing them.
func (e *Entry) UnmarshalJSON(data []byte) error {
342+
if *e == nil {
343+
// Room for timestamp, line and the optional labels object.
*e = make([]string, 0, 3)
344+
}
345+
346+
// The ArrayEach callback has no return value, so failures are carried out
// of the closure through this variable.
var parseError error
347+
_, err := jsonparser.ArrayEach(data, func(value []byte, t jsonparser.ValueType, _ int, _ error) {
348+
// The timestamp and the line are strings. The labels are a JSON object,
349+
// but we will parse them as strings.
350+
if t != jsonparser.String && t != jsonparser.Object {
351+
parseError = jsonparser.MalformedStringError
352+
// NOTE(review): returning here only ends this callback invocation;
// presumably ArrayEach keeps visiting the remaining elements, so later
// elements may still be appended or overwrite parseError — confirm.
return
353+
}
354+
355+
// NOTE(review): ParseString is also applied to the raw bytes of an
// object element. It presumably unescapes backslash sequences, which
// could alter an object whose string values contain escapes — confirm
// against the jsonparser documentation.
v, err := jsonparser.ParseString(value)
356+
if err != nil {
357+
parseError = err
358+
return
359+
}
360+
*e = append(*e, v)
361+
})
362+
363+
// An error recorded by the callback takes precedence over the error
// returned by ArrayEach itself.
if parseError != nil {
364+
return parseError
365+
}
366+
return err
367+
}
368+
338369
// StreamValues holds the label key-value pairs for the stream and a list of value lists
339370
type StreamValues struct {
340371
Stream map[string]string
341-
Values [][]string
372+
Values []Entry
342373
}
343374

344375
// MatrixValues holds the label key-value pairs for the metric and a list of value lists
@@ -377,17 +408,19 @@ func (a *VectorValues) UnmarshalJSON(b []byte) error {
377408

378409
// DataType holds the result type and a list of StreamValues
379410
type DataType struct {
380-
ResultType string
381-
Stream []StreamValues
382-
Matrix []MatrixValues
383-
Vector []VectorValues
411+
ResultType string
412+
Stream []StreamValues
413+
Matrix []MatrixValues
414+
Vector []VectorValues
415+
EncodingFlags []string
384416
}
385417

386418
func (a *DataType) UnmarshalJSON(b []byte) error {
387419
// get the result type
388420
var s struct {
389-
ResultType string `json:"resultType"`
390-
Result json.RawMessage `json:"result"`
421+
ResultType string `json:"resultType"`
422+
EncodingFlags []string `json:"encodingFlags"`
423+
Result json.RawMessage `json:"result"`
391424
}
392425
if err := json.Unmarshal(b, &s); err != nil {
393426
return err
@@ -410,6 +443,7 @@ func (a *DataType) UnmarshalJSON(b []byte) error {
410443
return fmt.Errorf("unknown result type %s", s.ResultType)
411444
}
412445
a.ResultType = s.ResultType
446+
a.EncodingFlags = s.EncodingFlags
413447
return nil
414448
}
415449

@@ -434,12 +468,16 @@ type Rules struct {
434468
Rules []interface{}
435469
}
436470

471+
// Header is an HTTP header name/value pair. The client's query helpers accept
// a variadic list of them as extraHeaders, and request adds each one to the
// outgoing request with req.Header.Add.
type Header struct {
472+
Name, Value string
473+
}
474+
437475
// RunRangeQuery runs a query and returns an error if anything went wrong
438-
func (c *Client) RunRangeQuery(ctx context.Context, query string) (*Response, error) {
476+
func (c *Client) RunRangeQuery(ctx context.Context, query string, extraHeaders ...Header) (*Response, error) {
439477
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
440478
defer cancelFunc()
441479

442-
buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query))
480+
buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query), extraHeaders...)
443481
if err != nil {
444482
return nil, err
445483
}
@@ -448,7 +486,7 @@ func (c *Client) RunRangeQuery(ctx context.Context, query string) (*Response, er
448486
}
449487

450488
// RunQuery runs a query and returns an error if anything went wrong
451-
func (c *Client) RunQuery(ctx context.Context, query string) (*Response, error) {
489+
func (c *Client) RunQuery(ctx context.Context, query string, extraHeaders ...Header) (*Response, error) {
452490
ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
453491
defer cancelFunc()
454492

@@ -463,7 +501,7 @@ func (c *Client) RunQuery(ctx context.Context, query string) (*Response, error)
463501
u.Path = "/loki/api/v1/query"
464502
u.RawQuery = v.Encode()
465503

466-
buf, statusCode, err := c.run(ctx, u.String())
504+
buf, statusCode, err := c.run(ctx, u.String(), extraHeaders...)
467505
if err != nil {
468506
return nil, err
469507
}
@@ -617,18 +655,21 @@ func (c *Client) Series(ctx context.Context, matcher string) ([]map[string]strin
617655
return values.Data, nil
618656
}
619657

620-
func (c *Client) request(ctx context.Context, method string, url string) (*http.Request, error) {
658+
func (c *Client) request(ctx context.Context, method string, url string, extraHeaders ...Header) (*http.Request, error) {
621659
ctx = user.InjectOrgID(ctx, c.instanceID)
622660
req, err := http.NewRequestWithContext(ctx, method, url, nil)
623661
if err != nil {
624662
return nil, err
625663
}
626664
req.Header.Set("X-Scope-OrgID", c.instanceID)
665+
for _, h := range extraHeaders {
666+
req.Header.Add(h.Name, h.Value)
667+
}
627668
return req, nil
628669
}
629670

630-
func (c *Client) run(ctx context.Context, u string) ([]byte, int, error) {
631-
req, err := c.request(ctx, "GET", u)
671+
func (c *Client) run(ctx context.Context, u string, extraHeaders ...Header) ([]byte, int, error) {
672+
req, err := c.request(ctx, "GET", u, extraHeaders...)
632673
if err != nil {
633674
return nil, 0, err
634675
}

integration/loki_micro_services_delete_test.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -408,7 +408,7 @@ func getMetricValue(t *testing.T, metricName, metrics string) float64 {
408408
}
409409

410410
func pushRequestToClientStreamValues(t *testing.T, p pushRequest) []client.StreamValues {
411-
logsByStream := map[string][][]string{}
411+
logsByStream := map[string][]client.Entry{}
412412
for _, entry := range p.entries {
413413
lb := labels.NewBuilder(labels.FromMap(p.stream))
414414
for _, l := range entry.StructuredMetadata {

0 commit comments

Comments
 (0)