|
1 | | -#!/usr/bin/env python3 |
2 | | -import requests |
| 1 | +import json |
| 2 | +import subprocess |
3 | 3 |
|
4 | 4 |
|
5 | 5 | def check_sent_events(): |
6 | | - response = requests.post( |
7 | | - "http://airflow-vector-aggregator:8686/graphql", |
8 | | - json={ |
9 | | - "query": """ |
10 | | - { |
11 | | - transforms(first:100) { |
12 | | - nodes { |
13 | | - componentId |
14 | | - metrics { |
15 | | - sentEventsTotal { |
16 | | - sentEventsTotal |
17 | | - } |
18 | | - } |
19 | | - } |
20 | | - } |
21 | | - } |
22 | | - """ |
23 | | - }, |
| 6 | + response = subprocess.run( |
| 7 | + [ |
| 8 | + "grpcurl", |
| 9 | + "-plaintext", |
| 10 | + "-d", |
| 11 | + '{"limit": 100}', |
| 12 | + "airflow-vector-aggregator:8686", |
| 13 | + "vector.observability.v1.ObservabilityService/GetComponents", |
| 14 | + ], |
| 15 | + capture_output=True, |
| 16 | + text=True, |
| 17 | + check=True, # Raise a CalledProcessError if non-zero return |
| 18 | + timeout=20, # seconds |
24 | 19 | ) |
| 20 | + result = json.loads(response.stdout) |
| 21 | + components = result.get("components", []) |
| 22 | + transforms = [ |
| 23 | + c for c in components if c.get("componentType") == "COMPONENT_TYPE_TRANSFORM" |
| 24 | + ] |
25 | 25 |
|
26 | | - assert response.status_code == 200, ( |
27 | | - "Cannot access the API of the vector aggregator." |
28 | | - ) |
29 | | - |
30 | | - result = response.json() |
| 26 | + assert len(transforms) > 0, "No transform components found" |
31 | 27 |
|
32 | | - transforms = result["data"]["transforms"]["nodes"] |
33 | 28 | for transform in transforms: |
34 | 29 | sentEvents = transform["metrics"]["sentEventsTotal"] |
35 | 30 | componentId = transform["componentId"] |
36 | 31 |
|
37 | 32 | if componentId == "filteredInvalidEvents": |
38 | | - assert sentEvents is None or sentEvents["sentEventsTotal"] == 0, ( |
| 33 | + assert sentEvents is None or int(sentEvents) == 0, ( |
39 | 34 | "Invalid log events were sent." |
40 | 35 | ) |
41 | 36 | else: |
42 | | - assert sentEvents is not None and sentEvents["sentEventsTotal"] > 0, ( |
| 37 | + assert sentEvents is not None and int(sentEvents) > 0, ( |
43 | 38 | f'No events were sent in "{componentId}".' |
44 | 39 | ) |
45 | 40 |
|
|
0 commit comments