Skip to content

Commit d6819ae

Browse files
authored
Implement GetLogs queries without grouping and filter columns (#44)
### TL;DR

Updated API response structure, improved query filtering, and added aggregation support for logs.

### What changed?

- Modified `QueryResponse` structure in `api.go` to use `interface{}` for the `Data` field and `map[string]string` for `Aggregations`.
- Updated the `ParseQueryParams` function to handle the `event_name` filter.
- Changed the `GroupBy` parameter in `QueryFilter` to accept an array of strings.
- Refactored the `GetLogs` function in `clickhouse.go` to support advanced filtering, sorting, and aggregations.
- Updated the `IMainStorage` interface so `GetLogs` returns a `QueryResult` instead of `[]common.Log`.
- Added debug binary files to `.gitignore`.

### How to test?

1. Run the API and make requests to the logs endpoint with various query parameters.
2. Test filtering using `filter_event_name` and other filter parameters.
3. Verify that grouping, sorting, and pagination work as expected.
4. Check that aggregations are correctly calculated and returned in the response.

### Why make this change?

These changes improve the flexibility and functionality of the API, allowing for more complex queries and data analysis. The updated structure supports better filtering, aggregation, and response formatting, which enhances the overall usability of the API for clients.
1 parent f7f11bc commit d6819ae

File tree

6 files changed

+113
-27
lines changed

6 files changed

+113
-27
lines changed

.gitignore

+3-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ go.work.sum
2424
# env file
2525
.env
2626

27+
# dev files
28+
cmd/api/__debug_bin*
2729
.vscode
2830

2931
configs/secrets*
30-
!configs/secrets.example.yml
32+
!configs/secrets.example.yml

api/api.go

+9-4
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,9 @@ type Meta struct {
3939
}
4040

4141
type QueryResponse struct {
42-
Meta Meta `json:"meta"`
43-
Data []interface{} `json:"data"`
44-
Aggregations map[string]interface{} `json:"aggregations,omitempty"`
42+
Meta Meta `json:"meta"`
43+
Data interface{} `json:"data,omitempty"`
44+
Aggregations map[string]string `json:"aggregations,omitempty"`
4545
}
4646

4747
func writeError(w http.ResponseWriter, message string, code int) {
@@ -75,7 +75,12 @@ func ParseQueryParams(r *http.Request) (QueryParams, error) {
7575
params.FilterParams = make(map[string]string)
7676
for key, values := range rawQueryParams {
7777
if strings.HasPrefix(key, "filter_") {
78-
params.FilterParams[key] = values[0]
78+
// TODO: tmp hack remove it once we implement filtering with operators
79+
strippedKey := strings.Replace(key, "filter_", "", 1)
80+
if strippedKey == "event_name" {
81+
strippedKey = "data"
82+
}
83+
params.FilterParams[strippedKey] = values[0]
7984
delete(rawQueryParams, key)
8085
}
8186
}

internal/handlers/logs_handlers.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func handleLogsRequest(w http.ResponseWriter, r *http.Request, contractAddress,
5656

5757
logs, err := mainStorage.GetLogs(storage.QueryFilter{
5858
FilterParams: queryParams.FilterParams,
59-
GroupBy: queryParams.GroupBy,
59+
GroupBy: []string{queryParams.GroupBy},
6060
SortBy: queryParams.SortBy,
6161
SortOrder: queryParams.SortOrder,
6262
Page: queryParams.Page,
@@ -81,10 +81,8 @@ func handleLogsRequest(w http.ResponseWriter, r *http.Request, contractAddress,
8181
TotalItems: 0, // TODO: Implement total items count
8282
TotalPages: 0, // TODO: Implement total pages count
8383
},
84-
Data: []interface{}{logs},
85-
Aggregations: map[string]interface{}{
86-
"aggregates": queryParams.Aggregates,
87-
},
84+
Data: logs.Data,
85+
Aggregations: logs.Aggregates,
8886
}
8987

9088
sendJSONResponse(w, response)

internal/handlers/transactions_handlers.go

+3-5
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ func handleTransactionsRequest(w http.ResponseWriter, r *http.Request, contractA
4646

4747
transactions, err := mainStorage.GetTransactions(storage.QueryFilter{
4848
FilterParams: queryParams.FilterParams,
49-
GroupBy: queryParams.GroupBy,
49+
GroupBy: []string{queryParams.GroupBy},
5050
SortBy: queryParams.SortBy,
5151
SortOrder: queryParams.SortOrder,
5252
Page: queryParams.Page,
@@ -71,10 +71,8 @@ func handleTransactionsRequest(w http.ResponseWriter, r *http.Request, contractA
7171
TotalItems: 0, // TODO: Implement total items count
7272
TotalPages: 0, // TODO: Implement total pages count
7373
},
74-
Data: []interface{}{transactions},
75-
Aggregations: map[string]interface{}{
76-
"aggregates": queryParams.Aggregates,
77-
},
74+
Data: transactions,
75+
// Aggregations: queryParams.Aggregates,
7876
}
7977

8078
sendJSONResponse(w, response)

internal/storage/clickhouse.go

+89-9
Original file line numberDiff line numberDiff line change
@@ -312,17 +312,53 @@ func (c *ClickHouseConnector) GetTransactions(qf QueryFilter) (txs []common.Tran
312312
return txs, nil
313313
}
314314

315-
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (logs []common.Log, err error) {
315+
func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (QueryResult, error) {
316316
columns := "chain_id, block_number, block_hash, block_timestamp, transaction_hash, transaction_index, log_index, address, data, topic_0, topic_1, topic_2, topic_3"
317-
query := fmt.Sprintf("SELECT %s FROM %s.logs FINAL WHERE is_deleted = 0%s",
318-
columns, c.cfg.Database, getLimitClause(int(qf.Limit)))
319-
// TODO: add aggregations, filters and grouping
317+
318+
// Start building the query
319+
query := fmt.Sprintf("SELECT %s FROM %s.logs FINAL WHERE is_deleted = 0", columns, c.cfg.Database)
320+
321+
// Add filters
322+
if qf.ContractAddress != "" {
323+
query += fmt.Sprintf(" AND address = '%s'", qf.ContractAddress)
324+
}
325+
if qf.FunctionSig != "" {
326+
query += fmt.Sprintf(" AND topic_0 = '%s'", qf.FunctionSig)
327+
}
328+
if len(qf.FilterParams) > 0 {
329+
for key, value := range qf.FilterParams {
330+
query += fmt.Sprintf(" AND %s = '%s'", key, value)
331+
}
332+
}
333+
334+
// Add grouping (TODO: fix it by adding all needed columns)
335+
// if len(qf.GroupBy) > 0 {
336+
// query += " GROUP BY " + strings.Join(qf.GroupBy, ", ")
337+
// }
338+
339+
// Add sorting
340+
if qf.SortBy != "" {
341+
query += fmt.Sprintf(" ORDER BY %s %s", qf.SortBy, qf.SortOrder)
342+
}
343+
344+
// Add pagination
345+
if qf.Page > 0 && qf.Limit > 0 {
346+
offset := (qf.Page - 1) * qf.Limit
347+
query += fmt.Sprintf(" LIMIT %d OFFSET %d", qf.Limit, offset)
348+
} else {
349+
query += getLimitClause(int(qf.Limit))
350+
}
351+
352+
// Execute the query
320353
rows, err := c.conn.Query(context.Background(), query)
321354
if err != nil {
322-
return nil, err
355+
return QueryResult{Data: []common.Log{}, Aggregates: map[string]string{}}, err
323356
}
324357
defer rows.Close()
325-
358+
queryResult := QueryResult{
359+
Data: []common.Log{},
360+
Aggregates: map[string]string{},
361+
}
326362
for rows.Next() {
327363
var log common.Log
328364
var timestamp uint64
@@ -344,17 +380,61 @@ func (c *ClickHouseConnector) GetLogs(qf QueryFilter) (logs []common.Log, err er
344380
)
345381
if err != nil {
346382
zLog.Error().Err(err).Msg("Error scanning log")
347-
return nil, err
383+
return QueryResult{Data: []common.Log{}}, err
348384
}
349385
log.BlockTimestamp = time.Unix(int64(timestamp), 0)
350386
for _, topic := range topics {
351387
if topic != "" {
352388
log.Topics = append(log.Topics, topic)
353389
}
354390
}
355-
logs = append(logs, log)
391+
queryResult.Data = append(queryResult.Data, log)
356392
}
357-
return logs, nil
393+
394+
// Handle aggregations
395+
if len(qf.Aggregates) > 0 {
396+
aggregateQuery := "SELECT "
397+
for i, agg := range qf.Aggregates {
398+
if i > 0 {
399+
aggregateQuery += ", "
400+
}
401+
aggregateQuery += agg
402+
}
403+
aggregateQuery += " FROM " + c.cfg.Database + ".logs FINAL WHERE is_deleted = 0"
404+
405+
// Add the same filters as the main query
406+
if qf.ContractAddress != "" {
407+
aggregateQuery += fmt.Sprintf(" AND address = '%s'", qf.ContractAddress)
408+
}
409+
if qf.FunctionSig != "" {
410+
aggregateQuery += fmt.Sprintf(" AND topic_0 = '%s'", qf.FunctionSig)
411+
}
412+
if len(qf.FilterParams) > 0 {
413+
for key, value := range qf.FilterParams {
414+
// TODO: add parsing for filters to get the column name and operator
415+
aggregateQuery += fmt.Sprintf(" AND %s = '%s'", key, value)
416+
}
417+
}
418+
419+
row := c.conn.QueryRow(context.Background(), aggregateQuery)
420+
// Marshal the result to JSON
421+
// TODO: needs a typing scan
422+
aggregateResultsJSON, err := json.Marshal(row)
423+
if err != nil {
424+
zLog.Error().Err(err).Msg("Error marshaling aggregate results to JSON")
425+
return queryResult, err
426+
}
427+
428+
var aggregateResultsMap map[string]string
429+
err = json.Unmarshal(aggregateResultsJSON, &aggregateResultsMap)
430+
if err != nil {
431+
zLog.Error().Err(err).Msg("Error unmarshaling aggregate results JSON to map")
432+
return queryResult, err
433+
}
434+
queryResult.Aggregates = aggregateResultsMap
435+
}
436+
437+
return queryResult, nil
358438
}
359439

360440
func (c *ClickHouseConnector) GetMaxBlockNumber() (maxBlockNumber *big.Int, err error) {

internal/storage/connector.go

+6-3
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ import (
1111
type QueryFilter struct {
1212
BlockNumbers []*big.Int
1313
FilterParams map[string]string
14-
GroupBy string
14+
GroupBy []string
1515
SortBy string
1616
SortOrder string
1717
Page int
@@ -20,7 +20,10 @@ type QueryFilter struct {
2020
ContractAddress string
2121
FunctionSig string
2222
}
23-
23+
type QueryResult struct {
24+
Data []common.Log `json:"data"`
25+
Aggregates map[string]string `json:"aggregates"`
26+
}
2427
type IStorage struct {
2528
OrchestratorStorage IOrchestratorStorage
2629
MainStorage IMainStorage
@@ -50,7 +53,7 @@ type IMainStorage interface {
5053

5154
GetBlocks(qf QueryFilter) (logs []common.Block, err error)
5255
GetTransactions(qf QueryFilter) (logs []common.Transaction, err error)
53-
GetLogs(qf QueryFilter) (logs []common.Log, err error)
56+
GetLogs(qf QueryFilter) (logs QueryResult, err error)
5457
GetTraces(qf QueryFilter) (traces []common.Trace, err error)
5558
GetMaxBlockNumber() (maxBlockNumber *big.Int, err error)
5659
}

0 commit comments

Comments
 (0)