From 722e1ffa75d6d441fb13ee09a5f288318f224bee Mon Sep 17 00:00:00 2001 From: Amir Barshavit Date: Wed, 18 Feb 2026 19:42:35 +0200 Subject: [PATCH 1/2] Trim large payloads before sending report to avoid 413 errors Add _trim_payload helper that progressively halves large list fields (tool_calls, web_activity, etc.) until the JSON payload fits under 5 MB. Improves error message for HTTP 413 responses. Co-Authored-By: Claude Opus 4.6 --- openclaw_usage.py | 50 +++++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 48 insertions(+), 2 deletions(-) diff --git a/openclaw_usage.py b/openclaw_usage.py index a55c88a..8d0d3d2 100755 --- a/openclaw_usage.py +++ b/openclaw_usage.py @@ -481,6 +481,49 @@ def scan_session_logs(bot_config_dir: Path) -> Dict[str, Any]: return aggregate_tool_calls(tool_calls, sessions_count) +MAX_PAYLOAD_BYTES = 5 * 1024 * 1024 # 5 MB + + +def _trim_payload(payload: Dict[str, Any]) -> bytes: + """Serialize payload to JSON, trimming oldest entries from large lists if it exceeds MAX_PAYLOAD_BYTES.""" + data = json.dumps(payload).encode("utf-8") + if len(data) <= MAX_PAYLOAD_BYTES: + return data + + # Fields to trim, in priority order (most expendable first). + # Each is a dot-path into the payload dict. + trimmable = [ + ("session_analysis", "tool_calls"), + ("summary", "apps_commands"), + ("session_analysis", "web_activity", "browser_urls"), + ("session_analysis", "web_activity", "fetched_urls"), + ("session_analysis", "web_activity", "search_queries"), + ("summary", "web_activity", "browser_urls"), + ("summary", "web_activity", "fetched_urls"), + ("summary", "web_activity", "search_queries"), + ] + + for path in trimmable: + # Navigate to parent, get the list + obj = payload + for key in path[:-1]: + obj = obj.get(key, {}) if isinstance(obj, dict) else {} + field = path[-1] + if not isinstance(obj.get(field), list): + continue + lst = obj[field] + # Progressively halve the list until under limit + while len(lst) > 1: + lst = lst[: len(lst) // 2] + obj[field] = lst + data = json.dumps(payload).encode("utf-8") + if len(data) <= MAX_PAYLOAD_BYTES: + return data + + # If still too large after all trimming, return what we have + return json.dumps(payload).encode("utf-8") + + def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = True) -> Dict[str, Any]: """Send scan report to the API endpoint. @@ -515,7 +558,7 @@ def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = Tr try: req = urllib.request.Request( API_ENDPOINT, - data=json.dumps(payload).encode("utf-8"), + data=_trim_payload(payload), headers=headers, method="POST" ) @@ -530,10 +573,13 @@ def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = Tr except urllib.error.HTTPError as e: error_body = e.read().decode("utf-8") if e.fp else "" + error_msg = f"HTTP {e.code}: {e.reason}" + if e.code == 413: + error_msg += " - payload too large even after trimming; try reducing --limit" return { "success": False, "status_code": e.code, - "error": f"HTTP {e.code}: {e.reason}", + "error": error_msg, "response": error_body } except urllib.error.URLError as e: From 1b1dcb39136759551c308c134939d6c37838ff4c Mon Sep 17 00:00:00 2001 From: Amir Barshavit Date: Wed, 18 Feb 2026 20:01:13 +0200 Subject: [PATCH 2/2] Gzip compress payload instead of trimming old data Replace _trim_payload logic with gzip compression on the entire JSON payload. Sets Content-Encoding: gzip header so the server can decompress transparently. Co-Authored-By: Claude Opus 4.6 --- openclaw_usage.py | 51 ++++++----------------------------------------- 1 file changed, 6 insertions(+), 45 deletions(-) diff --git a/openclaw_usage.py b/openclaw_usage.py index 8d0d3d2..64ee83c 100755 --- a/openclaw_usage.py +++ b/openclaw_usage.py @@ -6,6 +6,7 @@ """ import argparse +import gzip import json import os import re @@ -481,48 +482,6 @@ def scan_session_logs(bot_config_dir: Path) -> Dict[str, Any]: return aggregate_tool_calls(tool_calls, sessions_count) -MAX_PAYLOAD_BYTES = 5 * 1024 * 1024 # 5 MB - - -def _trim_payload(payload: Dict[str, Any]) -> bytes: - """Serialize payload to JSON, trimming oldest entries from large lists if it exceeds MAX_PAYLOAD_BYTES.""" - data = json.dumps(payload).encode("utf-8") - if len(data) <= MAX_PAYLOAD_BYTES: - return data - - # Fields to trim, in priority order (most expendable first). - # Each is a dot-path into the payload dict. - trimmable = [ - ("session_analysis", "tool_calls"), - ("summary", "apps_commands"), - ("session_analysis", "web_activity", "browser_urls"), - ("session_analysis", "web_activity", "fetched_urls"), - ("session_analysis", "web_activity", "search_queries"), - ("summary", "web_activity", "browser_urls"), - ("summary", "web_activity", "fetched_urls"), - ("summary", "web_activity", "search_queries"), - ] - - for path in trimmable: - # Navigate to parent, get the list - obj = payload - for key in path[:-1]: - obj = obj.get(key, {}) if isinstance(obj, dict) else {} - field = path[-1] - if not isinstance(obj.get(field), list): - continue - lst = obj[field] - # Progressively halve the list until under limit - while len(lst) > 1: - lst = lst[: len(lst) // 2] - obj[field] = lst - data = json.dumps(payload).encode("utf-8") - if len(data) <= MAX_PAYLOAD_BYTES: - return data - - # If still too large after all trimming, return what we have - return json.dumps(payload).encode("utf-8") - def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = True) -> Dict[str, Any]: """Send scan report to the API endpoint. @@ -538,8 +497,12 @@ def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = Tr # Remove api_key from the payload (it's used in header, not body) payload = {k: v for k, v in report_data.items() if k != "api_key"} + data = json.dumps(payload).encode("utf-8") + compressed = gzip.compress(data) + headers = { "Content-Type": "application/json", + "Content-Encoding": "gzip", "Authorization": f"Bearer {api_key}", "User-Agent": "OpenClaw-Scanner/1.0" } @@ -558,7 +521,7 @@ def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = Tr try: req = urllib.request.Request( API_ENDPOINT, - data=_trim_payload(payload), + data=compressed, headers=headers, method="POST" ) @@ -574,8 +537,6 @@ def send_report(report_data: Dict[str, Any], api_key: str, verify_ssl: bool = Tr except urllib.error.HTTPError as e: error_body = e.read().decode("utf-8") if e.fp else "" error_msg = f"HTTP {e.code}: {e.reason}" - if e.code == 413: - error_msg += " - payload too large even after trimming; try reducing --limit" return { "success": False, "status_code": e.code,