From 0df2b97bb651e94a3c2682e18dc726983b3aea3c Mon Sep 17 00:00:00 2001 From: aagbsn Date: Wed, 12 Nov 2025 11:06:03 +0100 Subject: [PATCH 1/8] in test_settings, add clickhouse_server fixture (#1028) * in test_settings, add clickhouse_server fixture * delay and decrease probe frequency of is_clickhouse_running to reduce logspam * remove docker_ip and docker_services included by clickhouse_server --- ooniapi/services/oonimeasurements/tests/conftest.py | 4 +++- ooniapi/services/ooniprobe/tests/conftest.py | 10 ++++++---- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/ooniapi/services/oonimeasurements/tests/conftest.py b/ooniapi/services/oonimeasurements/tests/conftest.py index b85c4371..5c553e46 100644 --- a/ooniapi/services/oonimeasurements/tests/conftest.py +++ b/ooniapi/services/oonimeasurements/tests/conftest.py @@ -40,6 +40,8 @@ def maybe_download_fixtures(): def is_clickhouse_running(url): + # using ClickhouseClient as probe spams WARN messages with logger in clickhouse_driver + time.sleep(2) try: with ClickhouseClient.from_url(url) as client: client.execute("SELECT 1") @@ -53,7 +55,7 @@ def clickhouse_server(maybe_download_fixtures, docker_ip, docker_services): port = docker_services.port_for("clickhouse", 9000) url = "clickhouse://test:test@{}:{}".format(docker_ip, port) docker_services.wait_until_responsive( - timeout=30.0, pause=0.1, check=lambda: is_clickhouse_running(url) + timeout=30.0, pause=1.0, check=lambda: is_clickhouse_running(url) ) yield url diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py index f1543893..efac76ff 100644 --- a/ooniapi/services/ooniprobe/tests/conftest.py +++ b/ooniapi/services/ooniprobe/tests/conftest.py @@ -4,6 +4,7 @@ import pytest import shutil import os +import time from urllib.request import urlopen from fastapi.testclient import TestClient @@ -107,13 +108,12 @@ def client(clickhouse_server, test_settings, geoip_db_dir): @pytest.fixture -def test_settings(alembic_migration, docker_ip, docker_services, geoip_db_dir, fastpath_server): - port = docker_services.port_for("clickhouse", 9000) +def test_settings(alembic_migration, geoip_db_dir, clickhouse_server, fastpath_server): yield make_override_get_settings( postgresql_url=alembic_migration, jwt_encryption_key=JWT_ENCRYPTION_KEY, prometheus_metrics_password="super_secure", - clickhouse_url=f"clickhouse://test:test@{docker_ip}:{port}", + clickhouse_url=clickhouse_server, geoip_db_dir=geoip_db_dir, collector_id="1", fastpath_url=fastpath_server @@ -126,6 +126,8 @@ def jwt_encryption_key(): def is_clickhouse_running(url): + # using ClickhouseClient as probe spams WARN messages with logger in clickhouse_driver + time.sleep(2) try: with ClickhouseClient.from_url(url) as client: client.execute("SELECT 1") @@ -140,7 +142,7 @@ def clickhouse_server(docker_ip, docker_services): # See password in docker compose url = "clickhouse://test:test@{}:{}".format(docker_ip, port) docker_services.wait_until_responsive( - timeout=30.0, pause=0.1, check=lambda: is_clickhouse_running(url) + timeout=30.0, pause=1.0, check=lambda: is_clickhouse_running(url) ) yield url From 921fb7cc597a52b94d334f5f332345df51b41aa7 Mon Sep 17 00:00:00 2001 From: aagbsn Date: Wed, 12 Nov 2025 11:11:25 +0100 Subject: [PATCH 2/8] add collectors to ooniapi (#1032) * add collectors to ooniapi * Make method "GET" on route for list_collectors * Use SettingsDep from ooniprobe.dependencies --- ooniapi/common/src/common/config.py | 20 +++++++++++++++- .../ooniprobe/routers/v1/probe_services.py | 23 ++++++++++++++++--- .../tests/integ/test_list_collectors.py | 5 ++++ 3 files changed, 44 insertions(+), 4 deletions(-) create mode 100644 ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py diff --git a/ooniapi/common/src/common/config.py b/ooniapi/common/src/common/config.py index 803493c3..6512af33 100644 --- a/ooniapi/common/src/common/config.py +++ b/ooniapi/common/src/common/config.py @@ -1,4 +1,4 @@ -from typing import List +from typing import List, Dict from pydantic_settings import BaseSettings @@ -41,3 +41,21 @@ class Settings(BaseSettings): failed_reports_bucket: str = ( "" # for uploading reports that couldn't be sent to fastpath ) + + # ooniprobe client configuration + collectors: List[Dict[str, str]] = [ + {"address": "httpo://guegdifjy7bjpequ.onion", "type": "onion"}, + {"address": "https://ams-pg.ooni.org:443", "type": "https"}, + { + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + }, + {"address": "httpo://guegdifjy7bjpequ.onion", "type": "onion"}, + {"address": "https://ams-pg.ooni.org:443", "type": "https"}, + { + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + }, + ] diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 8d4bc2e6..1ce84572 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -17,7 +17,6 @@ lookup_probe_network, ) from ...dependencies import CCReaderDep, ASNReaderDep, ClickhouseDep, SettingsDep -from ...common.dependencies import get_settings from ...common.routers import BaseModel from ...common.auth import create_jwt, decode_jwt, jwt from ...common.config import Settings @@ -81,7 +80,7 @@ class ProbeLoginResponse(BaseModel): def probe_login_post( probe_login: ProbeLogin, response: Response, - settings: Settings = Depends(get_settings), + settings: SettingsDep, ) -> ProbeLoginResponse: if probe_login.username is None or probe_login.password is None: @@ -151,7 +150,7 @@ class ProbeRegisterResponse(BaseModel): def probe_register_post( probe_register: ProbeRegister, response: Response, - settings: Settings = Depends(get_settings), + settings: SettingsDep, ) -> ProbeRegisterResponse: """Probe Services: Register @@ -590,3 +589,21 @@ def random_web_test_helpers(th_list: List[str]) -> List[Dict]: for th_addr in th_list: out.append({"address": th_addr, "type": "https"}) return out + + +class CollectorEntry(BaseModel): + # not actually used but necessary to be compliant with the old API schema + address: str = Field(description="Address of collector") + front: Optional[str] = Field(default=None, description="Fronted domain") + type: Optional[str] = Field(default=None, description="Type of collector") + +@router.get("/collectors", tags=["ooniprobe"]) +def list_collectors( + settings: SettingsDep, + ) -> List[CollectorEntry]: + config_collectors = settings.collectors + collectors_response = [] + for entry in config_collectors: + collector = CollectorEntry(**entry) + collectors_response.append(collector) + return collectors_response diff --git a/ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py b/ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py new file mode 100644 index 00000000..c79942a3 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/integ/test_list_collectors.py @@ -0,0 +1,5 @@ +def test_list_collectors(client): + c = client.get("/api/v1/collectors").json() + assert len(c) == 6 + for entry in c: + assert "address" in entry From a310fd34fddab95f69e481804cfe1c25abbc7a0b Mon Sep 17 00:00:00 2001 From: aagbsn Date: Mon, 17 Nov 2025 13:35:20 +0100 Subject: [PATCH 3/8] add geolookup to ooniapi probe services (#1031) * add geolookup to ooniapi probe services this includes a small refactor to make the probe_geoip method re-usable * Replace class docstring with pydantic.Field annotation * use pydantic field validation for geolookup endpoint * probe_services: validate probe_geoip; add type GeoLookupResult * monkeypatch lookup_probe_cc and lookup_probe_network to provide static responses --- .../ooniprobe/routers/v1/probe_services.py | 49 +++++++++++++++++-- .../ooniprobe/tests/integ/test_geolookup.py | 27 ++++++++++ 2 files changed, 71 insertions(+), 5 deletions(-) create mode 100644 ooniapi/services/ooniprobe/tests/integ/test_geolookup.py diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py index 1ce84572..6fcca9f1 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/v1/probe_services.py @@ -8,7 +8,7 @@ import geoip2.errors from fastapi import APIRouter, Depends, HTTPException, Response, Request from prometheus_client import Counter, Info, Gauge -from pydantic import Field +from pydantic import Field, IPvAnyAddress from ...utils import ( generate_report_id, @@ -348,9 +348,9 @@ def check_in( probe_asn = check_in.probe_asn software_name = check_in.software_name software_version = check_in.software_version - + ipaddr = extract_probe_ipaddr(request) resp, probe_cc, asn_i = probe_geoip( - request, + ipaddr, probe_cc, probe_asn, cc_reader, @@ -473,7 +473,7 @@ def check_in( def probe_geoip( - request: Request, + ipaddr: str, probe_cc: str, asn: str, cc_reader: CCReaderDep, @@ -486,7 +486,6 @@ def probe_geoip( db_asn = "AS0" db_probe_network_name = None try: - ipaddr = extract_probe_ipaddr(request) db_probe_cc = lookup_probe_cc(ipaddr, cc_reader) db_asn, db_probe_network_name = lookup_probe_network(ipaddr, asn_reader) Metrics.GEOIP_ADDR_FOUND.labels(probe_cc=db_probe_cc, asn=db_asn).inc() @@ -591,6 +590,46 @@ def random_web_test_helpers(th_list: List[str]) -> List[Dict]: return out +class GeoLookupResult(BaseModel): + cc: str = Field(description="Country Code") + asn: str = Field(description="Autonomous System Number (ASN)") + as_name: str = Field(description="Autonomous System Name") + + +class GeoLookupRequest(BaseModel): + addresses: List[IPvAnyAddress] = Field(description="list of IPv4 or IPv6 address to geolookup") + + +class GeoLookupResponse(BaseModel): + v: int = Field(description="response format version", default="1") + geolocation: Dict[IPvAnyAddress, GeoLookupResult] = Field(description="Dict of IP addresses to GeoLookupResult") + + +@router.post("/geolookup", tags=["ooniprobe"]) +async def geolookup( + data: GeoLookupRequest, + response: Response, + cc_reader: CCReaderDep, + asn_reader: ASNReaderDep, +) -> GeoLookupResponse: + + # initial values probe_geoip compares with + probe_cc = "ZZ" + asn = "AS0" + geolookup_resp = {"geolocation": {}} + + # for each address provided, call probe_geoip and add the data to our response + for ipaddr in data.addresses: + # call probe_geoip() and map the keys to the geolookup v1 API + resp, _, _ = probe_geoip(ipaddr, probe_cc, asn, cc_reader, asn_reader) + # it doesn't seem possible to have separate aliases for (de)serialization + geolookup_resp["geolocation"][ipaddr] = GeoLookupResult(cc=resp["probe_cc"], + asn=resp["probe_asn"], as_name=resp["probe_network_name"]) + + setnocacheresponse(response) + return geolookup_resp + + class CollectorEntry(BaseModel): # not actually used but necessary to be compliant with the old API schema address: str = Field(description="Address of collector") diff --git a/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py b/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py new file mode 100644 index 00000000..8dd7fec2 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/integ/test_geolookup.py @@ -0,0 +1,27 @@ +from typing import Dict, Tuple +import ooniprobe.routers.v1.probe_services as ps +from ooniprobe.utils import lookup_probe_cc, lookup_probe_network +from ooniprobe.dependencies import CCReaderDep, ASNReaderDep + +def fake_lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, str]: + return ("AS4242", "Testing Networks") + +def fake_lookup_probe_cc(ipaddr: str, cc_reader: CCReaderDep) -> str: + return "US" + + +def test_geolookup(client, monkeypatch): + monkeypatch.setattr(ps, "lookup_probe_network", fake_lookup_probe_network) + monkeypatch.setattr(ps, "lookup_probe_cc", fake_lookup_probe_cc) + j = dict( + addresses=["192.33.4.12", "170.247.170.2", "2801:1b8:10::b", "2001:500:2::c"] + ) + c = client.post("/api/v1/geolookup", json=j).json() + assert "geolocation" in c + assert "v" in c + g = c["geolocation"] + + for ip in j["addresses"]: + assert g[ip]["cc"] == "US" + assert g[ip]["asn"] == "AS4242" + assert g[ip]["as_name"] == "Testing Networks" From eecc132d07c47e80b20eb9cc1f0a87bf5b91acd6 Mon Sep 17 00:00:00 2001 From: aagbsn Date: Fri, 21 Nov 2025 16:20:23 +0100 Subject: [PATCH 4/8] add bouncer/net-tests to ooniapi under new router bouncer (#1036) * add bouncer/net-tests to ooniapi under new router bouncer * fix pylint import complaints * let fastapi validate input * exclude unset parameters in bouncer/net-tests * add bouncer tests from api/tests/integ/test_probe_services.py I modified the tests to expect any error (other than http response 200) because FastAPI returns HTTP error 422 Unprocessable Entity instead of 400, as the tests do not supply a request that the model can validate * raise HTTPException with status 400 for backwards-compatibility * check status code on error is 400 for backwards compatibility with old probes --- .../services/ooniprobe/src/ooniprobe/main.py | 3 +- .../src/ooniprobe/routers/bouncer.py | 110 ++++++++++++++++++ .../services/ooniprobe/tests/test_bouncer.py | 74 ++++++++++++ 3 files changed, 186 insertions(+), 1 deletion(-) create mode 100644 ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py create mode 100644 ooniapi/services/ooniprobe/tests/test_bouncer.py diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/main.py b/ooniapi/services/ooniprobe/src/ooniprobe/main.py index ca5909be..0ad5adf6 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/main.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/main.py @@ -16,7 +16,7 @@ from . import models from .routers.v2 import vpn from .routers.v1 import probe_services -from .routers import reports +from .routers import reports, bouncer from .download_geoip import try_update from .dependencies import get_postgresql_session, get_clickhouse_session, SettingsDep @@ -80,6 +80,7 @@ def update_geoip_task(): app.include_router(vpn.router, prefix="/api") app.include_router(probe_services.router, prefix="/api") app.include_router(reports.router) +app.include_router(bouncer.router) @app.get("/version") diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py new file mode 100644 index 00000000..d59188f3 --- /dev/null +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/bouncer.py @@ -0,0 +1,110 @@ +import logging +from typing import List, Optional, Any, Dict +from json.decoder import JSONDecodeError + +from fastapi import APIRouter, HTTPException, Request, Response +from pydantic import Field, ValidationError + +from ooniprobe.common.utils import setnocacheresponse +from ooniprobe.common.routers import BaseModel + +router = APIRouter(prefix="/bouncer") + +log = logging.getLogger(__name__) + + +class TestHelperEntry(BaseModel): + address: str + type: str + front: Optional[str] = None + + +class CollectorEntry(BaseModel): + address: str + type: str + front: Optional[str] = None + + +class NetTest(BaseModel): + name: str + collector: str + altcollector: List[CollectorEntry] = Field(alias="collector-alternate") + hashes: Optional[Any] = Field(None, alias="input-hashes") + helpers: Dict[str, str] = Field(alias="test-helpers") + althelpers: Dict[str, List[TestHelperEntry]] = Field(alias="test-helpers-alternate") + version: str + + +class NetTestRequest(BaseModel): + name: str + version: str + + +class NetTestsRequest(BaseModel): + nettests: List[NetTestRequest] = Field(alias="net-tests") + + +class NetTestResponse(BaseModel): + nettests: List[NetTest] = Field(alias="net-tests") + + +@router.post("/net-tests", tags=["bouncer"], response_model=NetTestResponse, response_model_exclude_unset=True) +async def bouncer_net_tests( + response: Response, + request: Request, +) -> Dict[str, List[NetTest]]: + + try: + j = await request.json() + m = NetTestsRequest(**j) + except ValidationError as e: + raise HTTPException(400, detail=e.errors()) + except JSONDecodeError as e: + raise HTTPException(400, detail=str(e)) + except Exception as e: + log.warning("Unexpected Exception:" + str(e)) + raise HTTPException(400, detail=str(e)) + + try: + name = m.nettests[0].name + version = m.nettests[0].version + except IndexError: + raise HTTPException(status_code=400, detail="invalid net-tests request") + + # TODO: load this json from environment or filepath + j = { + "net-tests": [ + { + "collector": "httpo://guegdifjy7bjpequ.onion", + "collector-alternate": [ + {"type": "https", "address": "https://ams-pg.ooni.org"}, + { + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + }, + ], + "input-hashes": None, + "name": name, + "test-helpers": { + "tcp-echo": "37.218.241.93", + "http-return-json-headers": "http://37.218.241.94:80", + "web-connectivity": "httpo://y3zq5fwelrzkkv3s.onion", + }, + "test-helpers-alternate": { + "web-connectivity": [ + {"type": "https", "address": "https://wcth.ooni.io"}, + { + "front": "d33d1gs9kpq1c5.cloudfront.net", + "type": "cloudfront", + "address": "https://d33d1gs9kpq1c5.cloudfront.net", + }, + ] + }, + "version": version, + } + ] + } + resp = NetTestResponse(**j) + setnocacheresponse(response) + return resp diff --git a/ooniapi/services/ooniprobe/tests/test_bouncer.py b/ooniapi/services/ooniprobe/tests/test_bouncer.py new file mode 100644 index 00000000..3a94cc0f --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/test_bouncer.py @@ -0,0 +1,74 @@ +""" +Integration test for OONIProbe API +""" + + +def test_get_bouncer_nettests(client): + version = "1" + + r = client.post("/bouncer/net-tests", json={"net-tests": [{"name": "foo", "version": version}]}) + j = r.json() + assert "net-tests" in j + for v in j["net-tests"]: + for x in ["collector", "input-hashes", "name", "test-helpers", "test-helpers-alternate", "version"]: + assert x in v + +def test_bouncer_net_tests(client): + j = { + "net-tests": [ + { + "input-hashes": None, + "name": "web_connectivity", + "test-helpers": ["web-connectivity"], + "version": "0.0.1", + } + ] + } + c = client.post("/bouncer/net-tests", json=j) + expected = { + "net-tests": [ + { + "collector": "httpo://guegdifjy7bjpequ.onion", + "collector-alternate": [ + {"type": "https", "address": "https://ams-pg.ooni.org"}, + { + "front": "dkyhjv0wpi2dk.cloudfront.net", + "type": "cloudfront", + "address": "https://dkyhjv0wpi2dk.cloudfront.net", + }, + ], + "name": "web_connectivity", + "test-helpers": { + "tcp-echo": "37.218.241.93", + "http-return-json-headers": "http://37.218.241.94:80", + "web-connectivity": "httpo://y3zq5fwelrzkkv3s.onion", + }, + "test-helpers-alternate": { + "web-connectivity": [ + {"type": "https", "address": "https://wcth.ooni.io"}, + { + "front": "d33d1gs9kpq1c5.cloudfront.net", + "type": "cloudfront", + "address": "https://d33d1gs9kpq1c5.cloudfront.net", + }, + ] + }, + "version": "0.0.1", + "input-hashes": None, + } + ] + } + assert c.json() == expected + + +def test_bouncer_net_tests_bad_request1(client): + resp = client.post("/bouncer/net-tests") + # XXX: returns status code 400 for backwards compatibility + assert resp.status_code == 400 + + +def test_bouncer_net_tests_bad_request2(client): + j = {"net-tests": []} + resp = client.post("/bouncer/net-tests", json=j) + # XXX: returns status code 400 for backwards compatibility + assert resp.status_code == 400 From 4bd8d207a97d62b7508d957eec4724ad4c9d210c Mon Sep 17 00:00:00 2001 From: Luis Diaz <41093870+LDiazN@users.noreply.github.com> Date: Wed, 3 Dec 2025 11:09:35 +0100 Subject: [PATCH 5/8] Fix bad geoip reporting (#1022) * Add logging to failing comparision function * Trigger CI * Trigger CI * Fix bad db_asn logging * Improve logging of geoip mismatches * Fix extract_probe_ipaddr --- .../ooniprobe/src/ooniprobe/routers/reports.py | 2 -- ooniapi/services/ooniprobe/src/ooniprobe/utils.py | 13 ++++++++++++- ooniapi/services/ooniprobe/tests/test_utils.py | 7 +++++++ 3 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 ooniapi/services/ooniprobe/tests/test_utils.py diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py index 213669ee..439b0d85 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/reports.py @@ -273,10 +273,8 @@ def compare_probe_msmt_cc_asn( if db_probe_cc == cc and db_asn == asn: Metrics.PROBE_CC_ASN_MATCH.inc() elif db_probe_cc != cc: - log.error(f"db_cc != cc: {db_probe_cc} != {cc}") Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="cc", reported=cc, detected=db_probe_cc).inc() elif db_asn != asn: - log.error(f"db_asn != asn: {db_asn} != {asn}") Metrics.PROBE_CC_ASN_NO_MATCH.labels(mismatch="asn", reported=asn, detected=db_asn).inc() except Exception: pass diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py index ee0b8388..4a6dc663 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py @@ -128,7 +128,7 @@ def extract_probe_ipaddr(request: Request) -> str: for h in real_ip_headers: if h in request.headers: - return request.headers.getlist(h)[0].rpartition(" ")[-1] + return get_first_ip(request.headers.getlist(h)[0]) return request.client.host if request.client else "" @@ -145,3 +145,14 @@ def lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, st "AS{}".format(resp.autonomous_system_number), resp.autonomous_system_organization or "0", ) + +def get_first_ip(headers: str) -> str: + """ + parse the first ip from a comma-separated list of ips encoded as a string + + example: + in: '123.123.123, 1.1.1.1' + out: '123.123.123' + """ + + return headers.partition(',')[0] \ No newline at end of file diff --git a/ooniapi/services/ooniprobe/tests/test_utils.py b/ooniapi/services/ooniprobe/tests/test_utils.py new file mode 100644 index 00000000..f34ee403 --- /dev/null +++ b/ooniapi/services/ooniprobe/tests/test_utils.py @@ -0,0 +1,7 @@ +from ooniprobe.utils import get_first_ip + +def test_get_first_ip(): + + assert get_first_ip("1.1.1.1") == "1.1.1.1" + assert get_first_ip("1.1.1.1, 2.2.2.2") == "1.1.1.1" + assert get_first_ip("") == "" \ No newline at end of file From f6871568598627300d4040641fb6ec7ef4281cd7 Mon Sep 17 00:00:00 2001 From: Luis Diaz <41093870+LDiazN@users.noreply.github.com> Date: Wed, 3 Dec 2025 11:10:47 +0100 Subject: [PATCH 6/8] Cusum changepoint api (#1027) * Add initial endpoint shape for event detector changepoints * Add cusum changepoint list endpoint * Remove unused request model * Add event detector changepoint * Fix event detector table migration for tests * Add basic test for changepoint endpoint * Add basic filtering test to changepoint endpoint * Remove unused log * Add tests to check filtering by date * Add utils module with useful types * Remove unused types * Remove unused import * Add enum for change direction * Black reformat * Add testing data from the DB * Remove out dated comment * Trigger ci * fix typo in changepoint api endpoint * trigger ci * Modify changepoints table to add block_type column * replace bad quotes * Add block_type field to changepoints API --- .../src/oonimeasurements/dependencies.py | 7 +- .../routers/data/aggregate_analysis.py | 175 ++++++++++++++++-- .../routers/v1/measurements.py | 5 +- .../src/oonimeasurements/utils/__init__.py | 0 .../src/oonimeasurements/utils/api.py | 10 + .../migrations/0_clickhouse_init_tables.sql | 45 ++++- .../4_clickhouse_populate_changepoints.sql | 11 ++ .../tests/test_event_detection.py | 136 ++++++++++++++ .../tests/test_measurements.py | 15 +- ooniapi/services/ooniprobe/tests/conftest.py | 6 +- 10 files changed, 389 insertions(+), 21 deletions(-) create mode 100644 ooniapi/services/oonimeasurements/src/oonimeasurements/utils/__init__.py create mode 100644 ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py create mode 100644 ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql create mode 100644 ooniapi/services/oonimeasurements/tests/test_event_detection.py diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py index 60b177c8..c37a0107 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/dependencies.py @@ -7,10 +7,15 @@ from .common.config import Settings from .common.dependencies import get_settings +SettingsDep = Annotated[Settings, Depends(get_settings)] -def get_clickhouse_session(settings: Annotated[Settings, Depends(get_settings)]): + +def get_clickhouse_session(settings: SettingsDep): db = Clickhouse.from_url(settings.clickhouse_url) try: yield db finally: db.disconnect() + + +ClickhouseDep = Annotated[Clickhouse, Depends(get_clickhouse_session)] diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py index 09924584..1851671a 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/data/aggregate_analysis.py @@ -1,15 +1,17 @@ +from enum import Enum import time import math from datetime import datetime -from typing import List, Literal, Optional, Tuple, Union, Dict +from ...common.clickhouse_utils import query_click +from typing import Any, List, Literal, Optional, Self, Tuple, Dict from typing_extensions import Annotated from fastapi import APIRouter, Depends, Query -from pydantic import BaseModel +from pydantic import BaseModel, Field +import sqlalchemy as sql from .utils import get_measurement_start_day_agg, TimeGrains, parse_probe_asn_to_int -from ...dependencies import ( - get_clickhouse_session, -) +from ...utils.api import ProbeCCOrNone, ProbeASNOrNone +from ...dependencies import get_clickhouse_session, ClickhouseDep from .list_analysis import ( SinceUntil, utc_30_days_ago, @@ -279,6 +281,10 @@ def format_aggregate_query(extra_cols: Dict[str, str], where: str): ) """ +def nan_to_none(val): + if math.isnan(val): + return None + return val @router.get( "/v1/aggregation/analysis", @@ -292,8 +298,8 @@ async def get_aggregation_analysis( test_name: Annotated[Optional[str], Query()] = None, domain: Annotated[Optional[str], Query()] = None, input: Annotated[Optional[str], Query()] = None, - probe_asn: Annotated[Union[int, str, None], Query()] = None, - probe_cc: Annotated[Optional[str], Query(min_length=2, max_length=2)] = None, + probe_asn: ProbeASNOrNone = None, + probe_cc: ProbeCCOrNone = None, ooni_run_link_id: Annotated[Optional[str], Query()] = None, since: SinceUntil = utc_30_days_ago(), until: SinceUntil = utc_today(), @@ -397,10 +403,6 @@ async def get_aggregation_analysis( d = dict(zip(list(extra_cols.keys()) + fixed_cols, row)) blocked_max_protocol = d["blocked_max_protocol"] - def nan_to_none(val): - if math.isnan(val): - return None - return val loni = Loni( dns_blocked=nan_to_none(d["dns_blocked"]), @@ -447,3 +449,154 @@ def nan_to_none(val): dimension_count=dimension_count, results=results, ) + + +class ChangeDir(str, Enum): + up = "up" + down = "down" + + @classmethod + def from_n_or_i(cls, i: int | None) -> Self | None: + if i is None: + return None + + return cls("down") if i == -1 else cls("up") + + +class ChangePointEntry(BaseModel): + # TODO Double check which fields are actually necessary + probe_asn: int + probe_cc: str + domain: str + start_time: datetime # TODO double check the naming of these datetime fields + end_time: datetime + count_isp_resolver: int + count_other_resolver: int + count: int + dns_isp_blocked: float | None + dns_other_blocked: float | None + tcp_blocked: float | None + tls_blocked: float | None + dns_isp_blocked_obs_w_sum: float | None + dns_isp_blocked_w_sum: float | None + dns_isp_blocked_s_pos: float | None + dns_isp_blocked_s_neg: float | None + dns_other_blocked_obs_w_sum: float | None + dns_other_blocked_w_sum: float | None + dns_other_blocked_s_pos: float | None + dns_other_blocked_s_neg: float | None + tcp_blocked_obs_w_sum: float | None + tcp_blocked_w_sum: float | None + tcp_blocked_s_pos: float | None + tcp_blocked_s_neg: float | None + tls_blocked_obs_w_sum: float | None + tls_blocked_w_sum: float | None + tls_blocked_s_pos: float | None + tls_blocked_s_neg: float | None + change_dir: ChangeDir | None = Field( + description="If blocking behaviour goes up or down" + ) + s_pos: float | None + s_neg: float | None + current_mean: float | None + h: float | None + block_type: str + + @classmethod + def from_row(cls, row: Dict[str, Any]) -> Self: + """ + Takes a row as it comes from the clickhouse table 'event_detector_changepoints' + and converts it to a changepoint entry + """ + + def g(s : str) -> Any | None: + return row.get(s) + + return ChangePointEntry( + probe_asn=g("probe_asn"), + probe_cc=g("probe_cc"), + domain=g("domain"), + start_time=g("ts"), + end_time=g("last_ts"), + count_isp_resolver=g("count_isp_resolver"), + count_other_resolver=g("count_other_resolver"), + count=g("count"), + dns_isp_blocked= nan_to_none(g("dns_isp_blocked")), + dns_other_blocked=nan_to_none(g("dns_other_blocked")), + tcp_blocked=nan_to_none(g("tcp_blocked")), + tls_blocked=nan_to_none(g("tls_blocked")), + dns_isp_blocked_obs_w_sum=nan_to_none(g("dns_isp_blocked_obs_w_sum")), + dns_isp_blocked_w_sum=nan_to_none(g("dns_isp_blocked_w_sum")), + dns_isp_blocked_s_pos=nan_to_none(g("dns_isp_blocked_s_pos")), + dns_isp_blocked_s_neg=nan_to_none(g("dns_isp_blocked_s_neg")), + dns_other_blocked_obs_w_sum=nan_to_none(g("dns_other_blocked_obs_w_sum")), + dns_other_blocked_w_sum=nan_to_none(g("dns_other_blocked_w_sum")), + dns_other_blocked_s_pos=nan_to_none(g("dns_other_blocked_s_pos")), + dns_other_blocked_s_neg=nan_to_none(g("dns_other_blocked_s_neg")), + tcp_blocked_obs_w_sum=nan_to_none(g("tcp_blocked_obs_w_sum")), + tcp_blocked_w_sum=nan_to_none(g("tcp_blocked_w_sum")), + tcp_blocked_s_pos=nan_to_none(g("tcp_blocked_s_pos")), + tcp_blocked_s_neg=nan_to_none(g("tcp_blocked_s_neg")), + tls_blocked_obs_w_sum=nan_to_none(g("tls_blocked_obs_w_sum")), + tls_blocked_w_sum=nan_to_none(g("tls_blocked_w_sum")), + tls_blocked_s_pos=nan_to_none(g("tls_blocked_s_pos")), + tls_blocked_s_neg=nan_to_none(g("tls_blocked_s_neg")), + change_dir=ChangeDir.from_n_or_i(g("change_dir")), + s_pos=nan_to_none(g("s_pos")), + s_neg=nan_to_none(g("s_neg")), + current_mean=nan_to_none(g("current_mean")), + h=nan_to_none(g("h")), + block_type= g("block_type") + ) # type: ignore + + +class ListChangePointsResponse(BaseModel): + results: List[ChangePointEntry] + + +@router.get( + "/v1/detector/changepoints", + tags=["detector"], + description="List changepoints detected by the event detector using the cusum algorithm", + response_model=ListChangePointsResponse, +) +@parse_probe_asn_to_int +async def list_changepoints( + clickhouse: ClickhouseDep, + probe_asn: ProbeASNOrNone = None, + probe_cc: ProbeCCOrNone = None, + domain: str | None = Query(default=None), + since: SinceUntil = utc_30_days_ago(), + until: SinceUntil = utc_today(), +) -> ListChangePointsResponse: + conditions = [] + query_params = {} + + if probe_asn: + conditions.append(sql.text("probe_asn = :probe_asn")) + query_params["probe_asn"] = probe_asn + + if probe_cc: + conditions.append(sql.text("probe_cc = :probe_cc")) + query_params["probe_cc"] = probe_cc + + if domain: + conditions.append( + sql.text("domain = :domain") + ) # TODO should this be 'like %domain%'? + query_params["domain"] = domain + + conditions.append(sql.text("ts >= :since")) + query_params["since"] = since + + conditions.append(sql.text("ts <= :until")) + query_params["until"] = until + + changepoints = sql.table("event_detector_changepoints") + q = sql.select("*").select_from(changepoints).where(sql.and_(*conditions)) + + query_result = query_click(clickhouse, q, query_params) + + results = [ChangePointEntry.from_row(entry) for entry in query_result] + + return ListChangePointsResponse(results=results) diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py index 11251491..06c9381d 100644 --- a/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/routers/v1/measurements.py @@ -470,6 +470,7 @@ def report_id_validator(cls, report_id: str) -> str: return report_id + def validate_report_id(report_id: str) -> str: if len(report_id) < 15 or len(report_id) > 100: raise HTTPException( @@ -483,6 +484,7 @@ def validate_report_id(report_id: str) -> str: return report_id + @router.get("/v1/measurement_meta", response_model_exclude_unset=True) async def get_measurement_meta( response: Response, @@ -505,7 +507,7 @@ async def get_measurement_meta( else: raise HTTPException( status_code=status.HTTP_400_BAD_REQUEST, - detail="Missing measurement_uid or report_id. You should provide at the least one" + detail="Missing measurement_uid or report_id. You should provide at the least one", ) if msmt_meta.probe_asn is not None and isinstance(msmt_meta.probe_asn, str): @@ -1019,6 +1021,7 @@ def get_bucket_url(bucket_name: str) -> str: def asn_to_int(asn_str: str) -> int: return int(asn_str.strip("AS")) + def is_in_charset(s: str, charset: str, error_msg: str): """Ensure `s` contains only valid characters listed in `charset`""" for c in s: diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/__init__.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py b/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py new file mode 100644 index 00000000..881ea358 --- /dev/null +++ b/ooniapi/services/oonimeasurements/src/oonimeasurements/utils/api.py @@ -0,0 +1,10 @@ +""" +Utility functions and types to assist API development +""" + +from typing import Annotated, Optional, Union +from fastapi import Query + + +ProbeCCOrNone = Annotated[Optional[str], Query(min_length=2, max_length=2)] +ProbeASNOrNone = Annotated[Union[int, str, None], Query()] diff --git a/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql b/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql index 7245f4ae..e5b9e554 100644 --- a/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql +++ b/ooniapi/services/oonimeasurements/tests/migrations/0_clickhouse_init_tables.sql @@ -42,7 +42,7 @@ ENGINE = ReplacingMergeTree ORDER BY (measurement_start_time, report_id, input) SETTINGS index_granularity = 8192; -CREATE TABLE IF NOT EXISTS default.citizenlab +CREATE TABLE IF NOT EXISTS default.citizenlab ( `domain` String, `url` String, @@ -64,3 +64,46 @@ CREATE TABLE IF NOT EXISTS default.jsonl ENGINE = MergeTree ORDER BY (report_id, input) SETTINGS index_granularity = 8192; + +CREATE TABLE IF NOT EXISTS default.event_detector_changepoints +( + `probe_asn` UInt32, + `probe_cc` String, + `domain` String, + `ts` DateTime64(3, 'UTC'), + `count_isp_resolver` Nullable(UInt32), + `count_other_resolver` Nullable(UInt32), + `count` Nullable(UInt32), + `dns_isp_blocked` Nullable(Float32), + `dns_other_blocked` Nullable(Float32), + `tcp_blocked` Nullable(Float32), + `tls_blocked` Nullable(Float32), + `last_ts` DateTime64(3, 'UTC'), + `dns_isp_blocked_obs_w_sum` Nullable(Float32), + `dns_isp_blocked_w_sum` Nullable(Float32), + `dns_isp_blocked_s_pos` Nullable(Float32), + `dns_isp_blocked_s_neg` Nullable(Float32), + `dns_other_blocked_obs_w_sum` Nullable(Float32), + `dns_other_blocked_w_sum` Nullable(Float32), + `dns_other_blocked_s_pos` Nullable(Float32), + `dns_other_blocked_s_neg` Nullable(Float32), + `tcp_blocked_obs_w_sum` Nullable(Float32), + `tcp_blocked_w_sum` Nullable(Float32), + `tcp_blocked_s_pos` Nullable(Float32), + `tcp_blocked_s_neg` Nullable(Float32), + `tls_blocked_obs_w_sum` Nullable(Float32), + `tls_blocked_w_sum` Nullable(Float32), + `tls_blocked_s_pos` Nullable(Float32), + `tls_blocked_s_neg` Nullable(Float32), + `change_dir` Nullable(Int8), + `s_pos` Nullable(Float32), + `s_neg` Nullable(Float32), + `current_mean` Nullable(Float32), + `h` Nullable(Float32), +) +ENGINE = MergeTree +PARTITION BY toYYYYMM(ts) +ORDER BY (probe_asn, probe_cc, ts, domain) +SETTINGS index_granularity = 8192; + +ALTER TABLE default.event_detector_changepoints ADD COLUMN `block_type` String; \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql b/ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql new file mode 100644 index 00000000..83f0f32f --- /dev/null +++ b/ooniapi/services/oonimeasurements/tests/migrations/4_clickhouse_populate_changepoints.sql @@ -0,0 +1,11 @@ +INSERT INTO default.event_detector_changepoints VALUES +(945,'US','www.facebook.com','2024-01-15 18:00:00.000',0,2,2,nan,0,0.75,0,'2024-01-10 23:00:00.000',nan,0,0,0,0,373,0,0,1.5,373,0,0,0,373,0,0,-1,3.6899884,0,0.04597198,3.5, 'tcp_block'), +(15169,'VE','google.com','2024-01-25 23:00:00.000',6,0,6,0,nan,0,0.7,'2024-01-10 23:00:00.000',nan,660,0,0,nan,2,0,0,0,662,0,0,42.9,662,0,0,-1,3.6650198,0,0.084454976,3.5, 'tls_block'), +(8346,'SN','www.tiktok.com','2024-01-31 15:00:00.000',2,0,2,0,nan,0,0.75,'2024-01-10 23:00:00.000',nan,660,0,0,nan,2,0,0,0,662,0,0,42.9,662,0,0,1,3.7752855,0,0.098714285,3.5, 'tcp_block'), +(8767,'DE','preview.redd.it','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,56,0,0,nan,26,0,0,0,82,0,0,0,82,0,0,-1,3.5091832,0,0.08009709,3.5, 'dns_isp_block'), +(8767,'DE','twitter.com','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,132,0,0,nan,37,0,0,0,169,0,0,0,169,0,0,1,3.7883966,0,0.03966346,3.5, 'tcp_block'), +(8767,'DE','www.facebook.com','2024-01-12 22:00:00.000',1,0,1,0,nan,0.75,0,'2024-01-10 23:00:00.000',nan,141,0,0,nan,37,0,0,0,178,0,0,0,178,0,0,1,3.800651,0,0.037844036,3.5, 'dns_other_block'), +(8767,'DE','external.xx.fbcdn.net','2024-01-14 22:00:00.000',2,0,2,0,nan,0.75,0,'2024-01-09 01:00:00.000',nan,37,0,0,nan,13,0,0,0,50,0,0,0,50,0,0,1,3.5141737,0,0.13356164,3.5, 'tcp_block'), +(12668,'RU','www.facebook.com','2024-01-20 19:00:00.000',0,1,1,nan,0,0,0.7,'2024-01-10 19:00:00.000',nan,0,0,0,0,184,0,0,0,184,0,0,48.3,184,0,0,1,3.500514,0,0.31609195,3.5, 'tcp_block'), +(8048,'VE','amazon.com','2024-01-29 22:00:00.000',0,1,1,nan,0,0.75,0,'2024-01-10 20:00:00.000',nan,1,0,0,nan,213,0,0,19.5,214,0,0,0,214,0,0,1,3.647134,0,0.1875,3.5, 'tcp_block'), +(8048,'VE','google.com','2024-01-23 04:00:00.000',0,1,1,nan,0,0,0.7,'2024-01-10 21:00:00.000',nan,16,0,0,nan,17,0,0,0,33,0,0,0,33,0,0,-1,3.6120336,0,0.19636363,3.5, 'tcp_block') \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/tests/test_event_detection.py b/ooniapi/services/oonimeasurements/tests/test_event_detection.py new file mode 100644 index 00000000..bbd4b20f --- /dev/null +++ b/ooniapi/services/oonimeasurements/tests/test_event_detection.py @@ -0,0 +1,136 @@ +from typing import Dict, Any +import httpx +from datetime import datetime, UTC +import pytest + + +def getj( + client: httpx.Client, url: str, params: Dict[str, Any] | None = None +) -> Dict[str, Any]: + resp = client.get(url, params=params) + assert ( + resp.status_code == 200 + ), f"Unexpected status code: {resp.status_code}. {resp.content}" + return resp.json() + + +# reasonable default since and until for the testing data +since = datetime(2024, 1, 1, tzinfo=UTC) +until = datetime(2024, 1, 30, tzinfo=UTC) + + +def getjsu(client, url, params={}): + # use this default since and until + # since the testing data has a fixed date that might + # become "obsolete" in the future for the default + # '30 days ago' since and until limits + params["since"] = since + params["until"] = until + return getj(client, url, params) + + +def test_changepoint_list_basic(client): + resp = getjsu( + client, + "/api/v1/detector/changepoints", + ) + + assert "results" in resp, resp + assert len(resp["results"]) > 0, resp["results"] + + +def normalize_asn(asn: int | str): + if isinstance(asn, int): + return asn + if isinstance(asn, str) and asn.upper().startswith("AS"): + asn = asn.upper().strip("AS") + + return int(asn) + + +def parse_dt(dt: str) -> datetime: + return datetime.fromisoformat(dt) + + +@pytest.mark.parametrize( + "filter_param, filter_value", + [ + ("probe_asn", 8048), + ("probe_asn", "AS8048"), # filter asn by string is also valid + ("probe_asn", "8048"), + ("probe_cc", "VE"), + ("domain", "google.com"), + ], +) +def test_changepoint_filter_basic(client, filter_param, filter_value): + + resp = getjsu( + client, "/api/v1/detector/changepoints", params={filter_param: filter_value} + ) + + assert len(resp["results"]) > 0, "No results to validate" + + if filter_param == "probe_asn": + normalize = normalize_asn + else: + normalize = id + + if filter_param == "probe_asn": + for r in resp["results"]: + assert r[filter_param] == normalize(filter_value), r + + +@pytest.mark.parametrize( + "since_param, until_param, expect_emtpy", + [ + (since, until, False), + (datetime(2021, 1, 1, tzinfo=UTC), datetime(2021, 1, 30, tzinfo=UTC), True), + ], +) +def test_changepoint_date_filter(client, since_param, until_param, expect_emtpy): + + resp = getj( + client, + "/api/v1/detector/changepoints", + params={"since": since_param, "until": until_param}, + ) + + assert len(resp["results"]) > 0 or expect_emtpy, "Not enough results to validate" + assert len(resp["results"]) == 0 or not expect_emtpy, "Result should be empty" + + for r in resp["results"]: + assert parse_dt(r["start_time"]) >= since, r["start_time"] + assert parse_dt(r["end_time"]) <= until, r["end_time"] + + +def test_changepoint_change_dir_values(client): + + resp = getjsu( + client, + "/api/v1/detector/changepoints", + ) + + assert len(resp["results"]) > 0, "Not enough data to validate" + for r in resp["results"]: + assert r["change_dir"] in ["up", "down"] + + resp = getjsu( + client, + "/api/v1/detector/changepoints", + {"probe_cc": "VE", "probe_asn": 15169, "domain": "google.com"}, + ) + + assert resp["results"][0]["change_dir"] == "down" # this one has change_dir == -1 + + resp = getjsu( + client, + "/api/v1/detector/changepoints", + {"probe_cc": "VE", "probe_asn": 8048, "domain": "amazon.com"}, + ) + + assert resp["results"][0]["change_dir"] == "up" # this one has change_dir == 1 + +def test_changepoint_field_present(client): + + resp = getj(client, "/api/v1/detector/changepoints") + assert all('block_type' in r for r in resp['results']) \ No newline at end of file diff --git a/ooniapi/services/oonimeasurements/tests/test_measurements.py b/ooniapi/services/oonimeasurements/tests/test_measurements.py index 5d5e2039..5435368a 100644 --- a/ooniapi/services/oonimeasurements/tests/test_measurements.py +++ b/ooniapi/services/oonimeasurements/tests/test_measurements.py @@ -269,6 +269,7 @@ def test_msm_meta_probe_asn_int(client, monkeypatch): j = resp.json() assert isinstance(j["probe_asn"], int), "probe_asn should be int" + def test_no_report_id_msm_uid_400(client): """ measurement_meta should return 400 if neither report_id nor measurement_uid are provided @@ -276,6 +277,7 @@ def test_no_report_id_msm_uid_400(client): resp = client.get("/api/v1/measurement_meta") assert resp.status_code == 400 + def test_fix_msm_date_parsing(client): # This query was raising an error parsing the date: @@ -316,7 +318,7 @@ def test_get_measurement_meta_basic(client): # You can also query by measurment uid uid = "20210709005529.664022_MY_webconnectivity_68e5bea1060d1874" - response = client.get("/api/v1/measurement_meta", params={'measurement_uid' : uid}) + response = client.get("/api/v1/measurement_meta", params={"measurement_uid": uid}) assert response.status_code == 200, response.status_code @@ -383,13 +385,18 @@ def test_get_measurement_meta_full(client, monkeypatch): } assert raw_msm + def test_bad_report_id_wont_validate(client): - resp = client.get("/api/v1/measurement_meta", params={ - "report_id" : "20210709T004340Z_webconnectivity_MY_4818_n1_YCM7J9mGcEHds#$%" # bad suffix - }) + resp = client.get( + "/api/v1/measurement_meta", + params={ + "report_id": "20210709T004340Z_webconnectivity_MY_4818_n1_YCM7J9mGcEHds#$%" # bad suffix + }, + ) assert resp.status_code == 422, resp.json() + def test_no_measurements_before_30_days(client): """ The default filtering should not retrieve measurements older than 30 days since tomorrow diff --git a/ooniapi/services/ooniprobe/tests/conftest.py b/ooniapi/services/ooniprobe/tests/conftest.py index efac76ff..a8f53a0e 100644 --- a/ooniapi/services/ooniprobe/tests/conftest.py +++ b/ooniapi/services/ooniprobe/tests/conftest.py @@ -79,7 +79,7 @@ def client_with_bad_settings(): @pytest.fixture(scope="session") def fixture_path(): """ - Directory for this fixtures used to store temporary data, will be + Directory for this fixtures used to store temporary data, will be deleted after the tests are finished """ FIXTURE_PATH = Path(os.path.dirname(os.path.realpath(__file__))) / "data" @@ -171,8 +171,8 @@ def fastpath_server(docker_ip, docker_services): ) yield url -def is_fastpath_running(url: str) -> bool: - try: +def is_fastpath_running(url: str) -> bool: + try: resp = urlopen(url) return resp.status == 200 except: From 458cbbb922e13f7b08692c53a5c8e6999deb40c6 Mon Sep 17 00:00:00 2001 From: aagbsn Date: Thu, 4 Dec 2025 15:07:28 +0100 Subject: [PATCH 7/8] Add prioritization CRUD methods (#1040) * Add prioritization CRUD methods /api/_/show_countries_prioritization /api/_/debug_prioritization * rename routers/prio to routers/prio_crud * use raw string in regex --- .../services/ooniprobe/src/ooniprobe/main.py | 5 +- .../src/ooniprobe/routers/prio_crud.py | 87 +++++++++++++++++++ ooniapi/services/ooniprobe/tests/test_prio.py | 27 +++++- 3 files changed, 116 insertions(+), 3 deletions(-) create mode 100644 ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/main.py b/ooniapi/services/ooniprobe/src/ooniprobe/main.py index 0ad5adf6..0cb1a0e8 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/main.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/main.py @@ -16,7 +16,7 @@ from . import models from .routers.v2 import vpn from .routers.v1 import probe_services -from .routers import reports, bouncer +from .routers import reports, bouncer, prio_crud from .download_geoip import try_update from .dependencies import get_postgresql_session, get_clickhouse_session, SettingsDep @@ -71,7 +71,7 @@ def update_geoip_task(): app.add_middleware( CORSMiddleware, - allow_origin_regex="^https://[-A-Za-z0-9]+(\.test)?\.ooni\.(org|io)$", + allow_origin_regex=r"^https://[-A-Za-z0-9]+(\.test)?\.ooni\.(org|io)$", allow_credentials=True, allow_methods=["*"], allow_headers=["*"], @@ -81,6 +81,7 @@ def update_geoip_task(): app.include_router(probe_services.router, prefix="/api") app.include_router(reports.router) app.include_router(bouncer.router) +app.include_router(prio_crud.router, prefix="/api") @app.get("/version") diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py b/ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py new file mode 100644 index 00000000..6a997b1b --- /dev/null +++ b/ooniapi/services/ooniprobe/src/ooniprobe/routers/prio_crud.py @@ -0,0 +1,87 @@ +import logging + +from typing import List, Optional, Tuple + +from fastapi import Response, APIRouter, HTTPException, Query +from pydantic import BaseModel, Field +from sqlalchemy import sql as sa + +from ooniprobe.prio import compute_priorities, generate_test_list +from ooniprobe.dependencies import ClickhouseDep +from ooniprobe.common.clickhouse_utils import query_click +from ooniprobe.common.utils import convert_to_csv + +router = APIRouter() + +log = logging.getLogger(__name__) + + +class PrioritizationType(BaseModel): + anomaly_perc: Optional[str] = Field(description="Anomaly percent") + category_code: Optional[str] = Field(description="Category code") + cc: Optional[str] = Field(description="Country Code") + domain: Optional[str] = Field(description="Domain or wildcard (*)") + msmt_cnt: Optional[int] = Field(description="msmt_cnt") + priority: Optional[int] = Field("Priority weight") + url: Optional[str] = Field("URL or wildcard (*)") + + +@router.get("/_/show_countries_prioritization", tags=["prioritization"], response_model=None) +def show_countries_prioritization(clickhouse: ClickhouseDep, + format: Optional[str] = Query(default="JSON", description="Format of response, CSV or JSON") + ) -> List[PrioritizationType]: + sql = """ + SELECT domain, url, cc, category_code, msmt_cnt, anomaly_perc + FROM citizenlab + LEFT JOIN ( + SELECT input, probe_cc, count() AS msmt_cnt, + toInt8(countIf(anomaly = 't') / msmt_cnt * 100) AS anomaly_perc + FROM fastpath + WHERE measurement_start_time > now() - interval 1 week + AND measurement_start_time < now() + GROUP BY input, probe_cc + ) AS x ON x.input = citizenlab.url AND x.probe_cc = UPPER(citizenlab.cc) + """ + cz = tuple(query_click(clickhouse, sa.text(sql), {})) # cc can be "ZZ" here + + # Fetch priority rules and apply them to URLs + sql = "SELECT category_code, cc, domain, url, priority FROM url_priorities" + prio_rules = tuple(query_click(clickhouse, sa.text(sql), {})) + li = compute_priorities(cz, prio_rules) + for x in li: + x.pop("weight") + x["cc"] = x["cc"].upper() + + li = sorted(li, key=lambda x: (x["cc"], -x["priority"])) + + if len(li)== 0: + raise HTTPException(status_code=400, detail="no data") + + if format.upper() == "CSV": + csv_data = convert_to_csv(li) + response = Response(content=csv_data, media_type="text/csv") + return response + + return li + + +class DebugPrioritization(BaseModel): + test_items: List + entries: Tuple + prio_rules: Tuple + + +@router.get("/_/debug_prioritization", tags=["prioritization"], response_model=DebugPrioritization) +def debug_prioritization( + clickhouse: ClickhouseDep, + probe_cc: Optional[str] = Query(description="2-letter Country-Code", default="ZZ"), + category_codes: str = Query(description="Comma separated list of uppercase URL categories"), + probe_asn: int = Query(description="Probe ASN"), + limit: Optional[int] = Query(description="Maximum number of URLs to return", default=-1), + ) -> DebugPrioritization: + + test_items, entries, prio_rules = generate_test_list(clickhouse, + probe_cc, category_codes, probe_asn, limit, True + ) + return {"test_items": test_items, "entries": entries, "prio_rules": prio_rules} + diff --git a/ooniapi/services/ooniprobe/tests/test_prio.py b/ooniapi/services/ooniprobe/tests/test_prio.py index ba1a85b6..7229e649 100644 --- a/ooniapi/services/ooniprobe/tests/test_prio.py +++ b/ooniapi/services/ooniprobe/tests/test_prio.py @@ -146,4 +146,29 @@ def test_compute_priorities_country_list(): "url": "https://ooni.org/", "weight": 11.052631578947368, } - ] \ No newline at end of file + ] + +def test_show_countries_prioritization(client): + c = client.get("/api/_/show_countries_prioritization").json() + assert len(c) > 10 + assert len(c) < 60000 + assert sorted(c[0].keys()) == [ + "anomaly_perc", + "category_code", + "cc", + "domain", + "msmt_cnt", + "priority", + "url", + ] + + +def test_show_countries_prioritization_csv(client): + resp = client.get("/api/_/show_countries_prioritization?format=CSV") + assert resp.status_code == 200 + assert resp.headers["content-type"] != "application/json" + +def test_debug_prioritization(client): + resp = client.get("/api/_/debug_prioritization?probe_cc=ZZ&category_codes=GOVT&probe_asn=4242") + assert resp.status_code == 200 + assert resp.headers["content-type"] == "application/json" From 66574ad67966905af574fa5ee591f99d44adadea Mon Sep 17 00:00:00 2001 From: Luis Diaz <41093870+LDiazN@users.noreply.github.com> Date: Tue, 16 Dec 2025 11:31:12 +0100 Subject: [PATCH 8/8] S3 config parameter (#1039) * Add config bucket setting and function to read file from bucket * trigger ci --- ooniapi/common/src/common/config.py | 3 +++ .../services/ooniprobe/src/ooniprobe/utils.py | 17 ++++++++++++++--- 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/ooniapi/common/src/common/config.py b/ooniapi/common/src/common/config.py index 6512af33..f2c9c406 100644 --- a/ooniapi/common/src/common/config.py +++ b/ooniapi/common/src/common/config.py @@ -33,6 +33,9 @@ class Settings(BaseSettings): vpn_credential_refresh_hours: int = 24 + # Bucket used to store configuration files + config_bucket: str = "" + # Where the geoip DBs are downloaded to geoip_db_dir: str = "/var/lib/ooni/geoip" # Ooniprobe only diff --git a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py index 4a6dc663..83504d74 100644 --- a/ooniapi/services/ooniprobe/src/ooniprobe/utils.py +++ b/ooniapi/services/ooniprobe/src/ooniprobe/utils.py @@ -9,9 +9,11 @@ from datetime import datetime, timezone import itertools import logging -from typing import Dict, List, Mapping, TypedDict, Tuple +from typing import List, TypedDict, Tuple +import io from fastapi import Request +from mypy_boto3_s3 import S3Client from sqlalchemy.orm import Session import pem import httpx @@ -145,7 +147,7 @@ def lookup_probe_network(ipaddr: str, asn_reader: ASNReaderDep) -> Tuple[str, st "AS{}".format(resp.autonomous_system_number), resp.autonomous_system_organization or "0", ) - + def get_first_ip(headers: str) -> str: """ parse the first ip from a comma-separated list of ips encoded as a string @@ -154,5 +156,14 @@ def get_first_ip(headers: str) -> str: in: '123.123.123, 1.1.1.1' out: '123.123.123' """ + return headers.partition(',')[0] + +def read_file(s3_client : S3Client, bucket: str, file : str) -> str: + """ + Reads the content of `file` within `bucket` into a string - return headers.partition(',')[0] \ No newline at end of file + Useful for reading config files from the s3 bucket + """ + buff = io.BytesIO() + s3_client.download_fileobj(bucket, file, buff) + return buff.getvalue().decode() \ No newline at end of file