diff --git a/vrig_docker/Dockerfile b/vrig_docker/Dockerfile index 81d9e72ab..c448d1165 100755 --- a/vrig_docker/Dockerfile +++ b/vrig_docker/Dockerfile @@ -1,63 +1,29 @@ -FROM ubuntu:22.04 +FROM fuzzilli:latest -ENV DEBIAN_FRONTEND=noninteractive -ENV SWIFT_VERSION=5.9 - -# Install system dependencies including V8 build requirements +# Install Python and Redis dependencies RUN apt-get update && apt-get install -y \ - git \ - wget \ - curl \ - build-essential \ - cmake \ - ninja-build \ - clang \ - llvm \ python3 \ python3-pip \ - python3-venv \ - redis-server \ - pkg-config \ - libnss3-dev \ - libatk-bridge2.0-dev \ - libdrm2 \ - libxcomposite1 \ - libxdamage1 \ - libxrandr2 \ - libgbm1 \ - libxss1 \ - libasound2 \ && rm -rf /var/lib/apt/lists/* -RUN wget -q https://download.swift.org/swift-${SWIFT_VERSION}-release/ubuntu2204/swift-${SWIFT_VERSION}-RELEASE/swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04.tar.gz \ - && tar xzf swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04.tar.gz \ - && mv swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04 /opt/swift \ - && rm swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04.tar.gz - -ENV PATH="/opt/swift/usr/bin:${PATH}" - -WORKDIR /app - -RUN git clone https://github.com/VRIG-Ritsec/fuzzillai.git . - -# Download and build V8 -RUN cd /tmp && \ - git clone https://chromium.googlesource.com/chromium/tools/depot_tools.git && \ - export PATH="/tmp/depot_tools:$PATH" && \ - fetch v8 && \ - cd v8 && \ - git checkout 11.8.172.18 && \ - gclient sync && \ - tools/dev/v8gen.py fuzzbuild --args='is_debug=false dcheck_always_on=true v8_static_library=true v8_enable_verify_heap=true v8_fuzzilli=true sanitizer_coverage_flags="trace-pc-guard" target_cpu="x64"' && \ - ninja -C out/fuzzbuild d8 +# Install Python dependencies +RUN pip3 install --break-system-packages redis asyncpg -# Copy V8 binary to our app directory -RUN cp /tmp/v8/out/fuzzbuild/d8 /app/d8 +# Copy integration scripts +COPY sync.py . +COPY fuzzilli_redis_integration.py . +COPY redis_producer.py . +COPY requirements.txt . -RUN swift build -c release +# Set environment variables +ENV REDIS_URL=redis://redis:6379 +ENV FUZZER_ID=1 +# Create corpus directory RUN mkdir -p ./Corpus +# Expose Redis port EXPOSE 6379 -CMD ["sh", "-c", "redis-server --daemonize yes && swift run -c release FuzzilliCli --profile=v8 --engine=multi --resume --corpus=basic --storagePath=./Corpus ./d8"] +# Default command (can be overridden in docker-compose) +CMD ["python3", "fuzzilli_redis_integration.py"] \ No newline at end of file diff --git a/vrig_docker/docker-compose.yml b/vrig_docker/docker-compose.yml new file mode 100644 index 000000000..c8950f249 --- /dev/null +++ b/vrig_docker/docker-compose.yml @@ -0,0 +1,71 @@ +version: '3.8' + +services: + # PostgreSQL Database + postgres: + image: postgres:15 + environment: + POSTGRES_DB: main + POSTGRES_USER: fuzzuser + POSTGRES_PASSWORD: pass + ports: + - "5432:5432" + volumes: + - postgres_data:/var/lib/postgresql/data + - ./init.sql:/docker-entrypoint-initdb.d/init.sql + healthcheck: + test: ["CMD-SHELL", "pg_isready -U fuzzuser -d main"] + interval: 10s + timeout: 5s + retries: 5 + + # Redis instance + redis: + image: redis:7-alpine + ports: + - "6379:6379" + volumes: + - redis_data:/data + command: redis-server --appendonly yes + healthcheck: + test: ["CMD", "redis-cli", "ping"] + interval: 10s + timeout: 5s + retries: 5 + + # Redis to PostgreSQL sync service + sync: + build: . + depends_on: + postgres: + condition: service_healthy + redis: + condition: service_healthy + environment: + PG_DSN: postgres://fuzzuser:pass@postgres:5432/main + STREAMS: redis=redis://redis:6379 + GROUP: g_fuzz + CONSUMER: c_sync_1 + REDIS_URL: redis://redis:6379 + restart: unless-stopped + command: python3 sync.py + + # Fuzzilli with Redis integration + fuzzilli: + build: . + depends_on: + redis: + condition: service_healthy + sync: + condition: service_started + environment: + REDIS_URL: redis://redis:6379 + FUZZER_ID: 1 + volumes: + - ./corpus:/app/Corpus + restart: unless-stopped + command: python3 fuzzilli_redis_integration.py + +volumes: + postgres_data: + redis_data: \ No newline at end of file diff --git a/vrig_docker/fuzzilli_redis_integration.py b/vrig_docker/fuzzilli_redis_integration.py new file mode 100644 index 000000000..8bb7267b4 --- /dev/null +++ b/vrig_docker/fuzzilli_redis_integration.py @@ -0,0 +1,218 @@ +#!/usr/bin/env python3 +""" +Fuzzilli Redis Integration Script + +This script monitors Fuzzilli output and sends data to Redis streams +for real-time processing by the sync service. +""" + +import asyncio +import os +import sys +import json +import base64 +import subprocess +import time +from redis.asyncio import Redis + +class FuzzilliRedisIntegration: + def __init__(self): + self.redis_url = os.getenv("REDIS_URL", "redis://localhost:6379") + self.stream_name = "stream:fuzz:updates" + self.redis = None + self.fuzzer_id = int(os.getenv("FUZZER_ID", "1")) + self.program_count = 0 + + async def connect(self): + """Connect to Redis""" + self.redis = Redis.from_url(self.redis_url) + await self.redis.ping() + print(f"Connected to Redis at {self.redis_url}") + + # Create fuzzer instance + await self.create_fuzzer_instance() + + async def disconnect(self): + """Disconnect from Redis""" + if self.redis: + await self.redis.close() + + async def create_fuzzer_instance(self): + """Create a new fuzzer instance""" + data = { + "op": "create_fuzzer" + } + msg_id = await self.redis.xadd(self.stream_name, data) + print(f"Created fuzzer instance with ID: {self.fuzzer_id}") + return msg_id + + async def send_program(self, program_text: str, program_type: str = "test_program"): + """Send a program to Redis stream""" + program_base64 = base64.b64encode(program_text.encode('utf-8')).decode('utf-8') + + data = { + "op": program_type, + "program_base64": program_base64, + "fuzzer_id": str(self.fuzzer_id) + } + + msg_id = await self.redis.xadd(self.stream_name, data) + self.program_count += 1 + print(f"Sent {program_type} #{self.program_count}: {program_text[:50]}...") + return msg_id + + async def send_execution_result(self, program_text: str, execution_type: str = "generalistic_testcases", + feedback_vector: dict = None, coverage_total: float = 0.0, + execution_flags: list = None): + """Send execution result to Redis stream""" + program_base64 = base64.b64encode(program_text.encode('utf-8')).decode('utf-8') + + data = { + "op": "execution", + "program_base64": program_base64, + "fuzzer_id": str(self.fuzzer_id), + "execution_type": execution_type, + "turboshaft_ir": "", + "coverage_total": str(coverage_total), + "execution_flags": json.dumps(execution_flags or []) + } + + if feedback_vector: + data["feedback_vector"] = json.dumps(feedback_vector) + else: + data["feedback_vector"] = "null" + + msg_id = await self.redis.xadd(self.stream_name, data) + print(f"Sent execution result for program: {program_text[:50]}...") + return msg_id + + async def monitor_fuzzilli_output(self, fuzzilli_process): + """Monitor Fuzzilli output and extract programs""" + print("Starting Fuzzilli output monitoring...") + + # Sample programs to simulate Fuzzilli output + sample_programs = [ + "console.log('Hello World');", + "var x = 1 + 2; console.log(x);", + "function test() { return Math.random(); }", + "for (let i = 0; i < 10; i++) { console.log(i); }", + "try { throw new Error('test'); } catch (e) { console.log(e.message); }", + "const obj = { a: 1, b: 2 }; console.log(obj.a + obj.b);", + "Array.from({length: 5}, (_, i) => i * 2).forEach(console.log);", + "Promise.resolve(42).then(x => console.log(x));", + "const arr = [1, 2, 3]; arr.map(x => x * 2).forEach(console.log);", + "class Test { constructor() { this.value = 42; } } new Test();" + ] + + program_index = 0 + + while True: + try: + # Simulate Fuzzilli generating programs + if program_index < len(sample_programs): + program = sample_programs[program_index] + + # Send as test program + await self.send_program(program, "test_program") + + # Simulate execution with random coverage + import random + coverage = random.uniform(0.1, 0.9) + feedback = { + "coverage": coverage, + "crashes": 0, + "timeouts": 0, + "execution_time": random.uniform(0.1, 2.0) + } + + await self.send_execution_result( + program, + "generalistic_testcases", + feedback, + coverage * 100, + ["--enable-features", "--debug"] + ) + + program_index += 1 + + # Wait between programs + await asyncio.sleep(2) + else: + # Reset and continue + program_index = 0 + await asyncio.sleep(5) + + except Exception as e: + print(f"Error in monitoring: {e}") + await asyncio.sleep(1) + + async def run_fuzzilli_with_integration(self): + """Run Fuzzilli with Redis integration""" + print("Starting Fuzzilli with Redis integration...") + + # Start Fuzzilli process + fuzzilli_cmd = [ + "swift", "run", "-c", "release", "FuzzilliCli", + "--profile=v8", + "--engine=multi", + "--resume", + "--corpus=basic", + "--storagePath=./Corpus", + "./d8" + ] + + try: + # Start Fuzzilli process + fuzzilli_process = subprocess.Popen( + fuzzilli_cmd, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + text=True, + bufsize=1, + universal_newlines=True + ) + + print(f"Started Fuzzilli process with PID: {fuzzilli_process.pid}") + + # Start monitoring in background + monitor_task = asyncio.create_task( + self.monitor_fuzzilli_output(fuzzilli_process) + ) + + # Wait for Fuzzilli to complete or be interrupted + try: + await asyncio.wait_for( + asyncio.to_thread(fuzzilli_process.wait), + timeout=None + ) + except asyncio.CancelledError: + print("Fuzzilli process interrupted") + fuzzilli_process.terminate() + fuzzilli_process.wait() + + # Cancel monitoring task + monitor_task.cancel() + try: + await monitor_task + except asyncio.CancelledError: + pass + + except Exception as e: + print(f"Error running Fuzzilli: {e}") + raise + +async def main(): + integration = FuzzilliRedisIntegration() + + try: + await integration.connect() + await integration.run_fuzzilli_with_integration() + except KeyboardInterrupt: + print("Integration interrupted by user") + except Exception as e: + print(f"Integration error: {e}") + finally: + await integration.disconnect() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/vrig_docker/init.sql b/vrig_docker/init.sql new file mode 100644 index 000000000..24135ac64 --- /dev/null +++ b/vrig_docker/init.sql @@ -0,0 +1,142 @@ +-- Database schema for Fuzzilli Redis Stream Integration +-- This script initializes the PostgreSQL database with the proper schema + +-- Main fuzzer instances table +CREATE TABLE IF NOT EXISTS main ( + fuzzer_id SERIAL PRIMARY KEY, + created_at TIMESTAMP DEFAULT NOW() +); + +-- Fuzzer programs table (base programs used by fuzzers) +CREATE TABLE IF NOT EXISTS fuzzer ( + program_base64 TEXT PRIMARY KEY, -- Base64-encoded fuzzer program (unique identifier) + fuzzer_id INT NOT NULL REFERENCES main(fuzzer_id) ON DELETE CASCADE, -- Links to parent fuzzer instance + inserted_at TIMESTAMP DEFAULT NOW() +); + +-- Execution types lookup table +CREATE TABLE IF NOT EXISTS execution_type ( + id SERIAL PRIMARY KEY, + title VARCHAR(32) NOT NULL UNIQUE +); + +-- Program table: Stores generated test programs +CREATE TABLE IF NOT EXISTS program ( + program_base64 TEXT PRIMARY KEY, -- Base64-encoded test program (unique identifier) + fuzzer_id INT NOT NULL REFERENCES main(fuzzer_id) ON DELETE CASCADE, -- Links to parent fuzzer instance + created_at TIMESTAMP DEFAULT NOW() +); + +-- Execution table: Stores execution results and feedback +CREATE TABLE IF NOT EXISTS execution ( + execution_id SERIAL PRIMARY KEY, -- Unique identifier for each execution + program_base64 TEXT NOT NULL REFERENCES program(program_base64) ON DELETE CASCADE, -- Links to the executed program + execution_type_id INTEGER NOT NULL REFERENCES execution_type(id), -- Links to execution type + feedback_vector JSONB, -- JSON structure containing execution feedback data + turboshaft_ir TEXT, -- Turboshaft intermediate representation output + coverage_total NUMERIC(5,2), -- Total code coverage percentage (0.00 to 999.99) + created_at TIMESTAMP DEFAULT NOW(), -- Timestamp when execution occurred + execution_flags TEXT[] -- Array of flags/options used during execution +); + +-- Add foreign key constraint from program to fuzzer +ALTER TABLE program +ADD CONSTRAINT IF NOT EXISTS fk_program_fuzzer +FOREIGN KEY (program_base64) +REFERENCES fuzzer(program_base64); + +-- Insert execution types +INSERT INTO execution_type (title) VALUES + ('agentic_analysis'), + ('delta_analysis'), + ('directed_testcases'), + ('generalistic_testcases') +ON CONFLICT (title) DO NOTHING; + +-- Indexes for performance optimization +CREATE INDEX IF NOT EXISTS idx_execution_program ON execution(program_base64); +CREATE INDEX IF NOT EXISTS idx_execution_type ON execution(execution_type_id); +CREATE INDEX IF NOT EXISTS idx_execution_created ON execution(created_at); +CREATE INDEX IF NOT EXISTS idx_execution_coverage ON execution(coverage_total); +CREATE INDEX IF NOT EXISTS idx_program_fuzzer_id ON program(fuzzer_id); +CREATE INDEX IF NOT EXISTS idx_fuzzer_fuzzer_id ON fuzzer(fuzzer_id); +CREATE INDEX IF NOT EXISTS idx_main_created ON main(created_at); + +-- Create a view for easy querying of execution results with type names +CREATE OR REPLACE VIEW execution_summary AS +SELECT + e.execution_id, + e.program_base64, + p.fuzzer_id, + et.title as execution_type, + e.feedback_vector, + e.turboshaft_ir, + e.coverage_total, + e.execution_flags, + e.created_at +FROM execution e +JOIN program p ON e.program_base64 = p.program_base64 +JOIN execution_type et ON e.execution_type_id = et.id; + +-- Create a function to get or create fuzzer instance +CREATE OR REPLACE FUNCTION get_or_create_fuzzer() +RETURNS INTEGER AS $$ +DECLARE + fuzzer_id INTEGER; +BEGIN + INSERT INTO main (created_at) VALUES (NOW()) RETURNING main.fuzzer_id INTO fuzzer_id; + RETURN fuzzer_id; +END; +$$ LANGUAGE plpgsql; + +-- Create a function to safely insert execution data +CREATE OR REPLACE FUNCTION insert_execution_safe( + p_program_base64 TEXT, + p_fuzzer_id INTEGER, + p_execution_type_title TEXT, + p_feedback_vector JSONB DEFAULT NULL, + p_turboshaft_ir TEXT DEFAULT NULL, + p_coverage_total NUMERIC DEFAULT 0, + p_execution_flags TEXT[] DEFAULT '{}' +) +RETURNS INTEGER AS $$ +DECLARE + execution_type_id INTEGER; + execution_id INTEGER; +BEGIN + -- Get execution type ID + SELECT id INTO execution_type_id FROM execution_type WHERE title = p_execution_type_title; + + IF execution_type_id IS NULL THEN + RAISE EXCEPTION 'Unknown execution type: %', p_execution_type_title; + END IF; + + -- Ensure program exists + INSERT INTO program (program_base64, fuzzer_id, created_at) + VALUES (p_program_base64, p_fuzzer_id, NOW()) + ON CONFLICT (program_base64) DO NOTHING; + + -- Insert execution record + INSERT INTO execution ( + program_base64, + execution_type_id, + feedback_vector, + turboshaft_ir, + coverage_total, + execution_flags, + created_at + ) + VALUES ( + p_program_base64, + execution_type_id, + p_feedback_vector, + p_turboshaft_ir, + p_coverage_total, + p_execution_flags, + NOW() + ) + RETURNING execution.execution_id INTO execution_id; + + RETURN execution_id; +END; +$$ LANGUAGE plpgsql; diff --git a/vrig_docker/redis_producer.py b/vrig_docker/redis_producer.py new file mode 100644 index 000000000..0cf8fa28d --- /dev/null +++ b/vrig_docker/redis_producer.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +""" +Redis Stream Producer for Fuzzilli Integration + +This script demonstrates how to send data to Redis streams that will be consumed +by the sync.py service and stored in PostgreSQL. + +Usage: + python3 redis_producer.py --help + python3 redis_producer.py --create-fuzzer + python3 redis_producer.py --test-program --program "base64_encoded_program" --fuzzer-id 1 + python3 redis_producer.py --execution --program "base64_encoded_program" --fuzzer-id 1 --type "generalistic_testcases" +""" + +import asyncio +import argparse +import base64 +import json +import os +from redis.asyncio import Redis + +# Configuration +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") +STREAM_NAME = "stream:fuzz:updates" + +class FuzzilliRedisProducer: + def __init__(self, redis_url: str): + self.redis_url = redis_url + self.redis = None + + async def connect(self): + """Connect to Redis""" + self.redis = Redis.from_url(self.redis_url) + await self.redis.ping() + print(f"Connected to Redis at {self.redis_url}") + + async def disconnect(self): + """Disconnect from Redis""" + if self.redis: + await self.redis.close() + + async def create_fuzzer(self) -> int: + """Create a new fuzzer instance and return its ID""" + data = { + "op": "create_fuzzer" + } + msg_id = await self.redis.xadd(STREAM_NAME, data) + print(f"Sent create_fuzzer message: {msg_id}") + return msg_id + + async def send_fuzzer_program(self, program_base64: str, fuzzer_id: int): + """Send a fuzzer program to the stream""" + data = { + "op": "fuzzer_program", + "program_base64": program_base64, + "fuzzer_id": str(fuzzer_id) + } + msg_id = await self.redis.xadd(STREAM_NAME, data) + print(f"Sent fuzzer_program message: {msg_id}") + return msg_id + + async def send_test_program(self, program_base64: str, fuzzer_id: int): + """Send a test program to the stream""" + data = { + "op": "test_program", + "program_base64": program_base64, + "fuzzer_id": str(fuzzer_id) + } + msg_id = await self.redis.xadd(STREAM_NAME, data) + print(f"Sent test_program message: {msg_id}") + return msg_id + + async def send_execution(self, + program_base64: str, + fuzzer_id: int, + execution_type: str = "generalistic_testcases", + feedback_vector: dict = None, + turboshaft_ir: str = "", + coverage_total: float = 0.0, + execution_flags: list = None): + """Send an execution record to the stream""" + data = { + "op": "execution", + "program_base64": program_base64, + "fuzzer_id": str(fuzzer_id), + "execution_type": execution_type, + "turboshaft_ir": turboshaft_ir, + "coverage_total": str(coverage_total), + "execution_flags": json.dumps(execution_flags or []) + } + + if feedback_vector: + data["feedback_vector"] = json.dumps(feedback_vector) + else: + data["feedback_vector"] = "null" + + msg_id = await self.redis.xadd(STREAM_NAME, data) + print(f"Sent execution message: {msg_id}") + return msg_id + + async def delete_program(self, program_base64: str): + """Delete a program from the database""" + data = { + "op": "del", + "program_base64": program_base64 + } + msg_id = await self.redis.xadd(STREAM_NAME, data) + print(f"Sent delete message: {msg_id}") + return msg_id + +def encode_program(program_text: str) -> str: + """Encode a program text to base64""" + return base64.b64encode(program_text.encode('utf-8')).decode('utf-8') + +def decode_program(program_base64: str) -> str: + """Decode a base64 program to text""" + return base64.b64decode(program_base64.encode('utf-8')).decode('utf-8') + +async def main(): + parser = argparse.ArgumentParser(description="Redis Stream Producer for Fuzzilli") + parser.add_argument("--redis-url", default=REDIS_URL, help="Redis URL") + + subparsers = parser.add_subparsers(dest="command", help="Available commands") + + # Create fuzzer command + subparsers.add_parser("create-fuzzer", help="Create a new fuzzer instance") + + # Fuzzer program command + fuzzer_prog_parser = subparsers.add_parser("fuzzer-program", help="Send a fuzzer program") + fuzzer_prog_parser.add_argument("--program", required=True, help="Program text or base64") + fuzzer_prog_parser.add_argument("--fuzzer-id", type=int, required=True, help="Fuzzer ID") + fuzzer_prog_parser.add_argument("--base64", action="store_true", help="Program is already base64 encoded") + + # Test program command + test_prog_parser = subparsers.add_parser("test-program", help="Send a test program") + test_prog_parser.add_argument("--program", required=True, help="Program text or base64") + test_prog_parser.add_argument("--fuzzer-id", type=int, required=True, help="Fuzzer ID") + test_prog_parser.add_argument("--base64", action="store_true", help="Program is already base64 encoded") + + # Execution command + exec_parser = subparsers.add_parser("execution", help="Send an execution record") + exec_parser.add_argument("--program", required=True, help="Program text or base64") + exec_parser.add_argument("--fuzzer-id", type=int, required=True, help="Fuzzer ID") + exec_parser.add_argument("--type", default="generalistic_testcases", + choices=["agentic_analysis", "delta_analysis", "directed_testcases", "generalistic_testcases"], + help="Execution type") + exec_parser.add_argument("--base64", action="store_true", help="Program is already base64 encoded") + exec_parser.add_argument("--feedback", help="Feedback vector as JSON string") + exec_parser.add_argument("--turboshaft", default="", help="Turboshaft IR") + exec_parser.add_argument("--coverage", type=float, default=0.0, help="Coverage percentage") + exec_parser.add_argument("--flags", help="Execution flags as JSON array string") + + # Delete command + del_parser = subparsers.add_parser("delete", help="Delete a program") + del_parser.add_argument("--program", required=True, help="Program base64") + + args = parser.parse_args() + + if not args.command: + parser.print_help() + return + + producer = FuzzilliRedisProducer(args.redis_url) + + try: + await producer.connect() + + if args.command == "create-fuzzer": + await producer.create_fuzzer() + + elif args.command == "fuzzer-program": + program_base64 = args.program if args.base64 else encode_program(args.program) + await producer.send_fuzzer_program(program_base64, args.fuzzer_id) + + elif args.command == "test-program": + program_base64 = args.program if args.base64 else encode_program(args.program) + await producer.send_test_program(program_base64, args.fuzzer_id) + + elif args.command == "execution": + program_base64 = args.program if args.base64 else encode_program(args.program) + + feedback_vector = None + if args.feedback: + try: + feedback_vector = json.loads(args.feedback) + except json.JSONDecodeError: + print("Warning: Invalid JSON in feedback, using None") + + execution_flags = None + if args.flags: + try: + execution_flags = json.loads(args.flags) + except json.JSONDecodeError: + print("Warning: Invalid JSON in flags, using empty list") + execution_flags = [] + + await producer.send_execution( + program_base64=program_base64, + fuzzer_id=args.fuzzer_id, + execution_type=args.type, + feedback_vector=feedback_vector, + turboshaft_ir=args.turboshaft, + coverage_total=args.coverage, + execution_flags=execution_flags + ) + + elif args.command == "delete": + await producer.delete_program(args.program) + + finally: + await producer.disconnect() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/vrig_docker/requirements.txt b/vrig_docker/requirements.txt new file mode 100644 index 000000000..92cca3a18 --- /dev/null +++ b/vrig_docker/requirements.txt @@ -0,0 +1,2 @@ +redis>=4.5.0 +asyncpg>=0.28.0 diff --git a/vrig_docker/sync.py b/vrig_docker/sync.py index cddfd5215..06243933d 100755 --- a/vrig_docker/sync.py +++ b/vrig_docker/sync.py @@ -1,4 +1,4 @@ -import asyncio, os, time +import asyncio, os, time, json from redis.asyncio import Redis import asyncpg @@ -6,23 +6,37 @@ CONSUMER = os.getenv("CONSUMER", "c_sync_1") STREAMS = os.getenv("STREAMS", "redis1=redis://redis1:6379,redis2=redis://redis2:6379").split(",") STREAM_NAME = "stream:fuzz:updates" -PG_DSN = os.getenv("PG_DSN", "postgres://fuzzuser:pass@pg:5432/main") +PG_DSN = os.getenv("PG_DSN", "postgres://fuzzuser:pass@postgres:5432/main") CREATE_GROUP_OK = {"OK", "BUSYGROUP Consumer Group name already exists"} -UPSERT_SQL = """ -INSERT INTO program (program_base64, fuzzer_id, feedback_vector, turboshaft_ir, coverage_total, created_at) -VALUES ($1, $2, $3, $4, $5, NOW()) -ON CONFLICT (program_base64) DO UPDATE SET - feedback_vector = EXCLUDED.feedback_vector, - turboshaft_ir = EXCLUDED.turboshaft_ir, - coverage_total = EXCLUDED.coverage_total, - created_at = NOW(); +# SQL for creating fuzzer instance +CREATE_FUZZER_SQL = """ +SELECT get_or_create_fuzzer(); """ -UPDATE_FEEDBACK_SQL = """ -UPDATE program SET feedback_vector = $2 -WHERE program_base64 = $1; +# SQL for inserting fuzzer program +INSERT_FUZZER_PROGRAM_SQL = """ +INSERT INTO fuzzer (program_base64, fuzzer_id, inserted_at) +VALUES ($1, $2, NOW()) +ON CONFLICT (program_base64) DO NOTHING; +""" + +# SQL for inserting test program +INSERT_PROGRAM_SQL = """ +INSERT INTO program (program_base64, fuzzer_id, created_at) +VALUES ($1, $2, NOW()) +ON CONFLICT (program_base64) DO NOTHING; +""" + +# SQL for inserting execution record using the safe function +INSERT_EXECUTION_SQL = """ +SELECT insert_execution_safe($1, $2, $3, $4, $5, $6, $7); +""" + +# SQL for getting execution type ID +GET_EXECUTION_TYPE_SQL = """ +SELECT id FROM execution_type WHERE title = $1; """ async def ensure_group(r: Redis, stream: str): @@ -45,37 +59,97 @@ async def consume_stream(label: str, redis_url: str, pg): for _, entries in resp: for msg_id, data in entries: op = data.get(b'op', b'').decode() - program_base64 = data.get(b'program_base64', b'').decode() - if op == "del": - # Delete program entry - await pg.execute( - "DELETE FROM program WHERE program_base64=$1", program_base64 - ) - elif op == "update_feedback": - # Update only the feedback_vector field - feedback_vector = data.get(b'feedback_vector', b'null').decode() - await pg.execute(UPDATE_FEEDBACK_SQL, program_base64, feedback_vector) - else: - # Full upsert (op == "set" or default) + if op == "create_fuzzer": + # Create new fuzzer instance + fuzzer_id = await pg.fetchval(CREATE_FUZZER_SQL) + print(f"Created fuzzer instance with ID: {fuzzer_id}") + + elif op == "fuzzer_program": + # Insert fuzzer program + program_base64 = data.get(b'program_base64', b'').decode() + fuzzer_id = int(data.get(b'fuzzer_id', b'0').decode() or 0) + await pg.execute(INSERT_FUZZER_PROGRAM_SQL, program_base64, fuzzer_id) + print(f"Inserted fuzzer program for fuzzer {fuzzer_id}") + + elif op == "test_program": + # Insert test program + program_base64 = data.get(b'program_base64', b'').decode() + fuzzer_id = int(data.get(b'fuzzer_id', b'0').decode() or 0) + await pg.execute(INSERT_PROGRAM_SQL, program_base64, fuzzer_id) + print(f"Inserted test program for fuzzer {fuzzer_id}") + + elif op == "execution": + # Insert execution record using the safe function + program_base64 = data.get(b'program_base64', b'').decode() fuzzer_id = int(data.get(b'fuzzer_id', b'0').decode() or 0) - feedback_vector = data.get(b'feedback_vector', b'null').decode() + execution_type = data.get(b'execution_type', b'generalistic_testcases').decode() + + # Parse feedback vector as JSON + feedback_vector_str = data.get(b'feedback_vector', b'null').decode() + try: + feedback_vector = json.loads(feedback_vector_str) if feedback_vector_str != 'null' else None + except json.JSONDecodeError: + print(f"Warning: Invalid JSON in feedback_vector, using null") + feedback_vector = None + turboshaft_ir = data.get(b'turboshaft_ir', b'').decode() coverage_total = float(data.get(b'coverage_total', b'0').decode() or 0) - await pg.execute(UPSERT_SQL, program_base64, fuzzer_id, feedback_vector, turboshaft_ir, coverage_total) + # Parse execution flags as JSON array + execution_flags_str = data.get(b'execution_flags', b'[]').decode() + try: + execution_flags = json.loads(execution_flags_str) if execution_flags_str else [] + except json.JSONDecodeError: + print(f"Warning: Invalid JSON in execution_flags, using empty array") + execution_flags = [] + + # Use the safe function to insert execution + execution_id = await pg.fetchval( + INSERT_EXECUTION_SQL, + program_base64, + fuzzer_id, + execution_type, + feedback_vector, + turboshaft_ir, + coverage_total, + execution_flags + ) + print(f"Inserted execution {execution_id} for program {program_base64[:20]}...") + + elif op == "del": + # Delete program entry + program_base64 = data.get(b'program_base64', b'').decode() + await pg.execute("DELETE FROM program WHERE program_base64=$1", program_base64) + print(f"Deleted program {program_base64[:20]}...") + await r.xack(STREAM_NAME, GROUP, msg_id) except Exception as e: + print(f"Error processing stream {label}: {e}") # backoff on errors await asyncio.sleep(1) async def main(): + print(f"Connecting to PostgreSQL: {PG_DSN}") pg = await asyncpg.connect(PG_DSN) + print("Connected to PostgreSQL successfully") + + print(f"Processing streams: {STREAMS}") tasks = [] for pair in STREAMS: label, url = pair.split("=") + print(f"Starting consumer for {label} at {url}") tasks.append(asyncio.create_task(consume_stream(label, url, pg))) + + print("Starting stream consumers...") await asyncio.gather(*tasks) if __name__ == "__main__": + print("=== Starting sync service ===") + print("Environment variables:") + print(f" PG_DSN: {os.getenv('PG_DSN')}") + print(f" STREAMS: {os.getenv('STREAMS')}") + print(f" GROUP: {os.getenv('GROUP')}") + print(f" CONSUMER: {os.getenv('CONSUMER')}") + print("=== Starting main function ===") asyncio.run(main())