diff --git a/Sources/Fuzzilli/Corpus/RedisCorpus.swift b/Sources/Fuzzilli/Corpus/RedisCorpus.swift
index d10c07fcc..ee24391a3 100644
--- a/Sources/Fuzzilli/Corpus/RedisCorpus.swift
+++ b/Sources/Fuzzilli/Corpus/RedisCorpus.swift
@@ -37,7 +37,20 @@ public class RedisCorpus: ComponentBase, Collection, Corpus {
         }
         eventLoopGroup = MultiThreadedEventLoopGroup(numberOfThreads: 1)
         if let group = eventLoopGroup {
-            let address = try? SocketAddress(ipAddress: "127.0.0.1", port: 6379)
+            // Use Redis container hostname from environment or default to localhost
+            let redisHost = ProcessInfo.processInfo.environment["REDIS_HOST"] ?? "127.0.0.1"
+            let redisPort = Int(ProcessInfo.processInfo.environment["REDIS_PORT"] ?? "6379") ?? 6379
+            let dockerNetwork = ProcessInfo.processInfo.environment["DOCKER_NETWORK"] ?? "false"
+
+            logger.info("RedisCorpus: Connecting to Redis at \(redisHost):\(redisPort)")
+            if dockerNetwork.lowercased() == "true" {
+                logger.info("RedisCorpus: Running in Docker network mode")
+            }
+
+            // Resolve through DNS instead of SocketAddress(ipAddress:): REDIS_HOST may be a
+            // hostname (docker-compose sets it to "redis"), which the IP-literal initializer
+            // rejects, leaving `address` nil for the force-unwrap below.
+            let address = try? SocketAddress.makeAddressResolvingHost(redisHost, port: redisPort)
             let config = RedisConnectionPool.Configuration(
                 initialServerConnectionAddresses: [address!],
                 maximumConnectionCount: .maximumActiveConnections(1),
diff --git a/vrig_docker/Dockerfile.fuzzillai b/vrig_docker/Dockerfile.fuzzillai
new file mode 100644
index 000000000..105e5b5a9
--- /dev/null
+++ b/vrig_docker/Dockerfile.fuzzillai
@@ -0,0 +1,112 @@
+FROM ubuntu:24.04
+
+ENV DEBIAN_FRONTEND=noninteractive
+ENV SWIFT_VERSION=6.2
+
+# Install system dependencies including V8 build requirements
+RUN apt-get update && apt-get install -y \
+    git \
+    wget \
+    curl \
+    build-essential \
+    cmake \
+    ninja-build \
+    clang \
+    llvm \
+    python3 \
+    python3-pip \
+    python3-venv \
+    pkg-config \
+    unzip \
+    redis-tools \
+    libnss3-dev \
+    libatk-bridge2.0-dev \
+    libdrm2 \
+    libxcomposite1 \
+    libxdamage1 \
+    libxrandr2 \
+    libgbm1 \
+    libxss1 \
+    libasound2t64 \
+    && rm -rf /var/lib/apt/lists/*
+
+# Install Swift 6.2 (using Ubuntu 22.04 package as 24.04 is not available yet)
+RUN wget -q https://download.swift.org/swift-${SWIFT_VERSION}-release/ubuntu2204/swift-${SWIFT_VERSION}-RELEASE/swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04.tar.gz \
+    && tar xzf swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04.tar.gz \
+    && mv swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04 /opt/swift \
+    && rm swift-${SWIFT_VERSION}-RELEASE-ubuntu22.04.tar.gz
+
+ENV PATH="/opt/swift/usr/bin:${PATH}"
+
+WORKDIR /app
+
+# Clone and setup FuzzilliAI
+RUN git clone https://github.com/VRIG-Ritsec/fuzzillai.git .
+
+# Build FuzzilliAI
+RUN swift build -c release
+
+# Create necessary directories and clean up any old corpus
+RUN mkdir -p ./Corpus ./logs
+RUN rm -rf ./Corpus/old_corpus ./Corpus/corpus
+
+# Create a startup script that runs FuzzilliAI with Redis corpus
+RUN echo '#!/bin/bash\n\
+# Start FuzzilliAI with Redis corpus support\n\
+\n\
+# Check if we are in Docker network\n\
+if [ "${DOCKER_NETWORK}" = "true" ]; then\n\
+    echo "Running in Docker network mode"\n\
+    echo "Service: ${SERVICE_NAME:-fuzzillai}"\n\
+    echo "Redis Host: ${REDIS_HOST:-redis}"\n\
+    echo "Redis Port: ${REDIS_PORT:-6379}"\n\
+    echo "Profile: ${FUZZILLI_PROFILE:-v8}"\n\
+    echo "Engine: ${FUZZILLI_ENGINE:-multi}"\n\
+fi\n\
+\n\
+# Check if V8 d8 binary exists in mounted directory\n\
+if [ -f "/v8/out/fuzzbuild/d8" ]; then\n\
+    echo "Using mounted V8 binary from /v8/out/fuzzbuild/d8"\n\
+    V8_BINARY="/v8/out/fuzzbuild/d8"\n\
+elif [ -f "/v8/out/d8" ]; then\n\
+    echo "Using mounted V8 binary from /v8/out/d8"\n\
+    V8_BINARY="/v8/out/d8"\n\
+else\n\
+    echo "V8 binary not found in mounted directory. Available files:"\n\
+    find /v8 -name "d8" -type f 2>/dev/null || echo "No d8 binary found"\n\
+    exit 1\n\
+fi\n\
+\n\
+# Wait for Redis to be ready\n\
+echo "Waiting for Redis to be ready..."\n\
+REDIS_HOST=${REDIS_HOST:-redis}\n\
+REDIS_PORT=${REDIS_PORT:-6379}\n\
+\n\
+for i in {1..30}; do\n\
+    if redis-cli -h "$REDIS_HOST" -p "$REDIS_PORT" ping > /dev/null 2>&1; then\n\
+        echo "Redis is ready at $REDIS_HOST:$REDIS_PORT!"\n\
+        break\n\
+    fi\n\
+    echo "Redis not ready at $REDIS_HOST:$REDIS_PORT, waiting... (attempt $i/30)"\n\
+    sleep 2\n\
+    if [ $i -eq 30 ]; then\n\
+        echo "Redis connection timeout, proceeding anyway..."\n\
+    fi\n\
+done\n\
+\n\
+# Clean up any problematic corpus directories\n\
+echo "Cleaning up corpus directories..."\n\
+rm -rf ./Corpus/old_corpus\n\
+\n\
+echo "Starting FuzzilliAI..."\n\
+exec swift run -c release FuzzilliCli \\\n\
+    --profile=${FUZZILLI_PROFILE:-v8} \\\n\
+    --engine=${FUZZILLI_ENGINE:-multi} \\\n\
+    --corpus=redis \\\n\
+    --storagePath=./Corpus \\\n\
+    --resume \\\n\
+    "$V8_BINARY"' > /app/start.sh && chmod +x /app/start.sh
+
+EXPOSE 6379
+
+CMD ["/app/start.sh"]
diff --git a/vrig_docker/Dockerfile.sync b/vrig_docker/Dockerfile.sync
new file mode 100644
index 000000000..a68ce831b
--- /dev/null
+++ b/vrig_docker/Dockerfile.sync
@@ -0,0 +1,27 @@
+FROM python:3.11-slim
+
+# Install system dependencies (procps provides pgrep, which the compose healthcheck runs)
+RUN apt-get update && apt-get install -y \
+    gcc \
+    procps \
+    && rm -rf /var/lib/apt/lists/*
+
+WORKDIR /app
+
+# Copy requirements and install Python dependencies
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+
+# Copy the sync script
+COPY sync.py .
+
+# Create a startup script
+RUN echo '#!/bin/bash\n\
+echo "Starting sync service..."\n\
+echo "PostgreSQL target: ${PG_DSN##*@}"\n\
+echo "Redis Streams: ${STREAMS}"\n\
+echo "Group: ${GROUP}"\n\
+echo "Consumer: ${CONSUMER}"\n\
+python sync.py' > /app/start.sh && chmod +x /app/start.sh
+
+CMD ["/app/start.sh"]
diff --git a/vrig_docker/__pycache__/sync.cpython-312.pyc b/vrig_docker/__pycache__/sync.cpython-312.pyc
new file mode 100644
index 000000000..93e4993eb
Binary files /dev/null and b/vrig_docker/__pycache__/sync.cpython-312.pyc differ
diff --git a/vrig_docker/database.sql b/vrig_docker/database.sql
new file mode 100644
index 000000000..a69b10443
--- /dev/null
+++ b/vrig_docker/database.sql
@@ -0,0 +1,51 @@
+CREATE TABLE main (
+    fuzzer_id SERIAL PRIMARY KEY,
+    created_at TIMESTAMP DEFAULT NOW()
+);
+
+CREATE TABLE fuzzer (
+    program_base64 TEXT PRIMARY KEY, -- Base64-encoded fuzzer program (unique identifier)
+    fuzzer_id INT NOT NULL REFERENCES main(fuzzer_id) ON DELETE CASCADE, -- Links to parent fuzzer instance
+    inserted_at TIMESTAMP DEFAULT NOW()
+);
+
+CREATE TABLE execution_type (
+    id SERIAL PRIMARY KEY,
+    title VARCHAR(32) NOT NULL UNIQUE
+);
+
+-- Program table: Stores generated test programs
+CREATE TABLE program (
+    program_base64 TEXT PRIMARY KEY, -- Base64-encoded test program (unique identifier)
+    fuzzer_id INT NOT NULL REFERENCES main(fuzzer_id) ON DELETE CASCADE, -- Links to parent fuzzer instance
+    created_at TIMESTAMP DEFAULT NOW()
+);
+
+CREATE TABLE execution (
+    execution_id SERIAL PRIMARY KEY, -- Unique identifier for each execution
+    program_base64 TEXT NOT NULL REFERENCES program(program_base64) ON DELETE CASCADE, -- Links to the executed program
+    execution_type_id INTEGER NOT NULL REFERENCES execution_type(id), -- Links to execution type
+    feedback_vector JSONB, -- JSON structure containing execution feedback data
+    turboshaft_ir TEXT, -- Turboshaft intermediate representation output
+    coverage_total NUMERIC(5,2), -- Total code coverage percentage (0.00 to 999.99)
+    created_at TIMESTAMP DEFAULT NOW(), -- Timestamp when execution occurred
+    execution_flags TEXT[] -- Array of flags/options used during execution
+);
+
+ALTER TABLE program
+ADD CONSTRAINT fk_program_fuzzer
+FOREIGN KEY (program_base64)
+REFERENCES fuzzer(program_base64);
+
+INSERT INTO execution_type (title) VALUES
+    ('agentic_analysis'),
+    ('delta_analysis'),
+    ('directed_testcases'),
+    ('generalistic_testcases');
+
+
+-- Indexes for performance, just query indexes to save on speed
+CREATE INDEX idx_execution_program ON execution(program_base64);
+CREATE INDEX idx_execution_type ON execution(execution_type_id);
+CREATE INDEX idx_execution_created ON execution(created_at);
+CREATE INDEX idx_execution_coverage ON execution(coverage_total);
\ No newline at end of file
diff --git a/vrig_docker/docker-compose.yml b/vrig_docker/docker-compose.yml
new file mode 100644
index 000000000..23684d00a
--- /dev/null
+++ b/vrig_docker/docker-compose.yml
@@ -0,0 +1,114 @@
+services:
+  # PostgreSQL Database
+  postgres:
+    image: postgres:15
+    environment:
+      POSTGRES_DB: main
+      POSTGRES_USER: fuzzuser
+      POSTGRES_PASSWORD: pass
+      DOCKER_NETWORK: "true"
+      SERVICE_NAME: "postgres"
+    volumes:
+      - postgres_data:/var/lib/postgresql/data
+      - ./database.sql:/docker-entrypoint-initdb.d/01-init.sql
+    ports:
+      - "5432:5432"
+    networks:
+      - fuzzillai_network
+    healthcheck:
+      test: ["CMD-SHELL", "pg_isready -U fuzzuser -d main"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  # Redis for FuzzilliAI communication
+  redis:
+    image: redis:7-alpine
+    environment:
+      DOCKER_NETWORK: "true"
+      SERVICE_NAME: "redis"
+    ports:
+      - "6379:6379"
+    volumes:
+      - redis_data:/data
+    networks:
+      - fuzzillai_network
+    healthcheck:
+      test: ["CMD", "redis-cli", "ping"]
+      interval: 10s
+      timeout: 5s
+      retries: 5
+
+  # FuzzilliAI Fuzzer Container
+  fuzzillai:
+    build:
+      context: ..
+      dockerfile: vrig_docker/Dockerfile.fuzzillai
+    environment:
+      - REDIS_HOST=redis
+      - REDIS_PORT=6379
+      - FUZZILLI_PROFILE=v8
+      - FUZZILLI_ENGINE=multi
+      - DOCKER_NETWORK=true
+      - SERVICE_NAME=fuzzillai
+    volumes:
+      - fuzzillai_corpus:/app/Corpus
+      - fuzzillai_logs:/app/logs
+      - ${V8_PATH:-/home/tropic/vrig/v8/v8}:/v8:ro
+      - fuzzillai_corpus_data:/app/Corpus/corpus
+    depends_on:
+      redis:
+        condition: service_healthy
+    restart: unless-stopped
+    networks:
+      - fuzzillai_network
+    healthcheck:
+      test: ["CMD", "pgrep", "-f", "FuzzilliCli"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 60s
+
+  # Sync Service - Processes Redis streams and sends to PostgreSQL
+  sync:
+    build:
+      context: .
+      dockerfile: Dockerfile.sync
+    environment:
+      - PG_DSN=postgres://fuzzuser:pass@postgres:5432/main
+      - STREAMS=redis1=redis://redis:6379
+      - GROUP=g_fuzz
+      - CONSUMER=c_sync_1
+      - DB_WORKER_THREADS=4
+      - BATCH_SIZE=400
+      - BATCH_TIMEOUT=0.1
+      - DOCKER_NETWORK=true
+      - SERVICE_NAME=sync
+    depends_on:
+      postgres:
+        condition: service_healthy
+      redis:
+        condition: service_healthy
+    restart: unless-stopped
+    networks:
+      - fuzzillai_network
+    healthcheck:
+      test: ["CMD", "pgrep", "-f", "sync.py"]
+      interval: 30s
+      timeout: 10s
+      retries: 3
+      start_period: 60s
+
+volumes:
+  postgres_data:
+  redis_data:
+  fuzzillai_corpus:
+  fuzzillai_corpus_data:
+  fuzzillai_logs:
+
+networks:
+  fuzzillai_network:
+    driver: bridge
+    ipam:
+      config:
+        - subnet: 172.20.0.0/16
diff --git a/vrig_docker/requirements.txt b/vrig_docker/requirements.txt
new file mode 100644
index 000000000..eeca2902f
--- /dev/null
+++ b/vrig_docker/requirements.txt
@@ -0,0 +1,4 @@
+asyncpg==0.29.0
+redis[hiredis]==5.0.1
+asyncio-mqtt==0.16.1
+backoff==2.2.1
diff --git a/vrig_docker/sync.py b/vrig_docker/sync.py
index cddfd5215..8bd5d0da4 100644
--- a/vrig_docker/sync.py
+++ b/vrig_docker/sync.py
@@ -1,30 +1,263 @@
 import asyncio, os, time
 from redis.asyncio import Redis
 import asyncpg
+from typing import List, Dict, Any
 
 GROUP = os.getenv("GROUP", "g_fuzz")
 CONSUMER = os.getenv("CONSUMER", "c_sync_1")
-STREAMS = os.getenv("STREAMS", "redis1=redis://redis1:6379,redis2=redis://redis2:6379").split(",")
+STREAMS = os.getenv("STREAMS", "redis1=redis://redis:6379,redis2=redis://redis2:6379").split(",")
 STREAM_NAME = "stream:fuzz:updates"
 PG_DSN = os.getenv("PG_DSN", "postgres://fuzzuser:pass@pg:5432/main")
+DB_WORKER_THREADS = int(os.getenv("DB_WORKER_THREADS", "4"))
+BATCH_SIZE = int(os.getenv("BATCH_SIZE", "400"))
+BATCH_TIMEOUT = float(os.getenv("BATCH_TIMEOUT", "0.1"))
 
-CREATE_GROUP_OK = {"OK", "BUSYGROUP Consumer Group name already exists"}
+UPSERT_MAIN_SQL = """
+INSERT INTO main (fuzzer_id, created_at)
+VALUES ($1, NOW())
+ON CONFLICT (fuzzer_id) DO UPDATE SET
+    created_at = NOW();
+"""
+
+UPSERT_FUZZER_SQL = """
+INSERT INTO fuzzer (program_base64, fuzzer_id, inserted_at)
+VALUES ($1, $2, NOW())
+ON CONFLICT (program_base64) DO UPDATE SET
+    inserted_at = NOW();
+"""
 
-UPSERT_SQL = """
-INSERT INTO program (program_base64, fuzzer_id, feedback_vector, turboshaft_ir, coverage_total, created_at)
-VALUES ($1, $2, $3, $4, $5, NOW())
+UPSERT_PROGRAM_SQL = """
+INSERT INTO program (program_base64, fuzzer_id, created_at)
+VALUES ($1, $2, NOW())
 ON CONFLICT (program_base64) DO UPDATE SET
-    feedback_vector = EXCLUDED.feedback_vector,
-    turboshaft_ir = EXCLUDED.turboshaft_ir,
-    coverage_total = EXCLUDED.coverage_total,
     created_at = NOW();
 """
 
+UPSERT_EXECUTION_SQL = """
+INSERT INTO execution (program_base64, execution_type_id, feedback_vector, turboshaft_ir, coverage_total, execution_flags, created_at)
+VALUES ($1, $2, $3, $4, $5, $6, NOW());
+"""
+
 UPDATE_FEEDBACK_SQL = """
-UPDATE program SET feedback_vector = $2
+UPDATE execution SET feedback_vector = $2
 WHERE program_base64 = $1;
 """
 
+DELETE_SQL = """
+DELETE FROM program WHERE program_base64 = $1;
+"""
+
+class DatabaseWorker:
+    def __init__(self, worker_id: int, dsn: str):
+        self.worker_id = worker_id
+        self.dsn = dsn
+        self.connection_pool = None
+
+    async def initialize(self):
+        self.connection_pool = await asyncpg.create_pool(
+            self.dsn,
+            min_size=2,
+            max_size=10,
+            command_timeout=30
+        )
+
+    async def process_batch(self, batch: List[Dict[str, Any]]):
+        """Apply one batch of operations atomically, in arrival order."""
+        if not batch:
+            return
+
+        # Group consecutive operations of the same kind so each run can still
+        # use executemany() while statements are applied in arrival order
+        # (a delete followed by a re-insert of the same program keeps the row).
+        runs = []
+        for operation in batch:
+            op_type = operation.get('op')
+            kind = op_type if op_type in ('del', 'update_feedback') else 'upsert'
+            if runs and runs[-1][0] == kind:
+                runs[-1][1].append(operation)
+            else:
+                runs.append((kind, [operation]))
+
+        async with self.connection_pool.acquire() as conn:
+            # One transaction per batch: a failure rolls back every statement
+            # instead of leaving the batch half-applied.
+            async with conn.transaction():
+                for kind, operations in runs:
+                    if kind == 'del':
+                        await self._batch_delete(conn, operations)
+                    elif kind == 'update_feedback':
+                        await self._batch_update_feedback(conn, operations)
+                    else:
+                        # Parent rows first: program references fuzzer, and
+                        # both reference main.
+                        await self._batch_upsert_main(conn, operations)
+                        await self._batch_upsert_fuzzer(conn, operations)
+                        await self._batch_upsert_program(conn, operations)
+                        executions = [op for op in operations if op.get('execution_type_id')]
+                        await self._batch_upsert_execution(conn, executions)
+
+    async def _batch_upsert_main(self, conn, operations: List[Dict[str, Any]]):
+        if not operations:
+            return
+
+        values = []
+        for op in operations:
+            fuzzer_id = int(op.get('fuzzer_id', 0) or 0)
+            values.append((fuzzer_id,))
+
+        await conn.executemany(UPSERT_MAIN_SQL, values)
+
+    async def _batch_upsert_fuzzer(self, conn, operations: List[Dict[str, Any]]):
+        if not operations:
+            return
+
+        values = []
+        for op in operations:
+            program_base64 = op.get('program_base64', '')
+            fuzzer_id = int(op.get('fuzzer_id', 0) or 0)
+            values.append((program_base64, fuzzer_id))
+
+        await conn.executemany(UPSERT_FUZZER_SQL, values)
+
+    async def _batch_upsert_program(self, conn, operations: List[Dict[str, Any]]):
+        if not operations:
+            return
+
+        values = []
+        for op in operations:
+            program_base64 = op.get('program_base64', '')
+            fuzzer_id = int(op.get('fuzzer_id', 0) or 0)
+            values.append((program_base64, fuzzer_id))
+
+        await conn.executemany(UPSERT_PROGRAM_SQL, values)
+
+    async def _batch_upsert_execution(self, conn, operations: List[Dict[str, Any]]):
+        if not operations:
+            return
+
+        values = []
+        for op in operations:
+            program_base64 = op.get('program_base64', '')
+            execution_type_id = int(op.get('execution_type_id', 1) or 1)
+            feedback_vector = op.get('feedback_vector', 'null')
+            turboshaft_ir = op.get('turboshaft_ir', '')
+            coverage_total = float(op.get('coverage_total', 0) or 0)
+            execution_flags = op.get('execution_flags', [])
+
+            values.append((program_base64, execution_type_id, feedback_vector, turboshaft_ir, coverage_total, execution_flags))
+
+        await conn.executemany(UPSERT_EXECUTION_SQL, values)
+
+    async def _batch_update_feedback(self, conn, operations: List[Dict[str, Any]]):
+        if not operations:
+            return
+
+        values = []
+        for op in operations:
+            program_base64 = op.get('program_base64', '')
+            feedback_vector = op.get('feedback_vector', 'null')
+            values.append((program_base64, feedback_vector))
+
+        await conn.executemany(UPDATE_FEEDBACK_SQL, values)
+
+    async def _batch_delete(self, conn, operations: List[Dict[str, Any]]):
+        if not operations:
+            return
+
+        values = [(op.get('program_base64', ''),) for op in operations]
+        await conn.executemany(DELETE_SQL, values)
+
+    async def close(self):
+        if self.connection_pool:
+            await self.connection_pool.close()
+
+class DatabaseBatchProcessor:
+    def __init__(self, dsn: str, num_workers: int = DB_WORKER_THREADS):
+        self.dsn = dsn
+        self.num_workers = num_workers
+        self.workers: List[DatabaseWorker] = []
+        self.operation_queue = asyncio.Queue()
+        self.running = False
+        # Operations taken off the queue but not yet written; kept on the
+        # instance so close() can still flush them.
+        self._pending: List[Dict[str, Any]] = []
+
+    async def initialize(self):
+        self.workers = []
+        for i in range(self.num_workers):
+            worker = DatabaseWorker(i, self.dsn)
+            await worker.initialize()
+            self.workers.append(worker)
+
+        self.running = True
+
+    def add_operation(self, operation: Dict[str, Any]):
+        self.operation_queue.put_nowait(operation)
+
+    async def process_operations(self):
+        """Collect queued operations into batches and hand them to a worker."""
+        last_batch_time = time.time()
+
+        while self.running:
+            try:
+                try:
+                    operation = await asyncio.wait_for(
+                        self.operation_queue.get(),
+                        timeout=0.1
+                    )
+                    self._pending.append(operation)
+                    # Take everything that is already queued (up to BATCH_SIZE)
+                    # instead of dequeuing one operation per loop iteration.
+                    while len(self._pending) < BATCH_SIZE:
+                        self._pending.append(self.operation_queue.get_nowait())
+                except (asyncio.TimeoutError, asyncio.QueueEmpty):
+                    pass
+
+                current_time = time.time()
+                should_process_batch = (
+                    len(self._pending) >= BATCH_SIZE or
+                    (self._pending and current_time - last_batch_time >= BATCH_TIMEOUT)
+                )
+
+                if should_process_batch and self._pending:
+                    worker_index = hash(self._pending[0].get('program_base64', '')) % len(self.workers)
+                    worker = self.workers[worker_index]
+                    # Detach the batch before awaiting so close() never flushes
+                    # operations that are already being written.
+                    batch, self._pending = self._pending, []
+                    last_batch_time = current_time
+
+                    try:
+                        await worker.process_batch(batch)
+                    except Exception as e:
+                        print(f"Worker {worker.worker_id}: dropping batch of {len(batch)} operations: {e!r}")
+
+            except Exception as e:
+                print(f"Batch processing loop error: {e!r}")
+                await asyncio.sleep(0.1)
+
+    async def close(self):
+        """Stop batching, flush operations not yet written, then close the pools."""
+        self.running = False
+
+        remaining_ops, self._pending = self._pending, []
+        while True:
+            try:
+                remaining_ops.append(self.operation_queue.get_nowait())
+            except asyncio.QueueEmpty:
+                break
+
+        if remaining_ops:
+            for worker in self.workers:
+                try:
+                    await worker.process_batch(remaining_ops)
+                    break
+                except Exception as e:
+                    print(f"Worker {worker.worker_id}: failed to flush {len(remaining_ops)} operations on close: {e!r}")
+
+        for worker in self.workers:
+            await worker.close()
+
 async def ensure_group(r: Redis, stream: str):
     try:
         await r.xgroup_create(stream, GROUP, id="$", mkstream=True)
@@ -32,50 +265,119 @@ async def ensure_group(r: Redis, stream: str):
         if "BUSYGROUP" not in str(e):
             raise
 
-async def consume_stream(label: str, redis_url: str, pg):
+async def consume_stream(label: str, redis_url: str, batch_processor: DatabaseBatchProcessor):
     r = Redis.from_url(redis_url)
     await ensure_group(r, STREAM_NAME)
+
     while True:
         try:
-            # Read new messages for this consumer
             resp = await r.xreadgroup(GROUP, CONSUMER, {STREAM_NAME: ">"}, count=100, block=5000)
             if not resp:
                 continue
-            # resp = [(b'stream:fuzz:updates', [(id, {b'k':b'v', ...}), ...])]
+
             for _, entries in resp:
                 for msg_id, data in entries:
                     op = data.get(b'op', b'').decode()
                     program_base64 = data.get(b'program_base64', b'').decode()
+                    operation = {
+                        'op': op,
+                        'program_base64': program_base64,
+                        'msg_id': msg_id
+                    }
+
                     if op == "del":
-                        # Delete program entry
-                        await pg.execute(
-                            "DELETE FROM program WHERE program_base64=$1", program_base64
-                        )
+                        pass
                     elif op == "update_feedback":
-                        # Update only the feedback_vector field
-                        feedback_vector = data.get(b'feedback_vector', b'null').decode()
-                        await pg.execute(UPDATE_FEEDBACK_SQL, program_base64, feedback_vector)
+                        operation['feedback_vector'] = data.get(b'feedback_vector', b'null').decode()
                     else:
-                        # Full upsert (op == "set" or default)
-                        fuzzer_id = int(data.get(b'fuzzer_id', b'0').decode() or 0)
-                        feedback_vector = data.get(b'feedback_vector', b'null').decode()
-                        turboshaft_ir = data.get(b'turboshaft_ir', b'').decode()
-                        coverage_total = float(data.get(b'coverage_total', b'0').decode() or 0)
+                        operation['fuzzer_id'] = data.get(b'fuzzer_id', b'0').decode()
+                        operation['execution_type_id'] = data.get(b'execution_type_id', b'1').decode()
+                        operation['feedback_vector'] = data.get(b'feedback_vector', b'null').decode()
+                        operation['turboshaft_ir'] = data.get(b'turboshaft_ir', b'').decode()
+                        operation['coverage_total'] = data.get(b'coverage_total', b'0').decode()
 
-                        await pg.execute(UPSERT_SQL, program_base64, fuzzer_id, feedback_vector, turboshaft_ir, coverage_total)
+                        execution_flags_str = data.get(b'execution_flags', b'').decode()
+                        if execution_flags_str:
+                            operation['execution_flags'] = execution_flags_str.split(',')
+                        else:
+                            operation['execution_flags'] = [
+                                'is_debug=false',
+                                'v8_enable_i18n_support=false',
+                                'dcheck_always_on=true',
+                                'v8_static_library=true',
+                                'v8_enable_verify_heap=true',
+                                'v8_fuzzilli=true',
+                                'sanitizer_coverage_flags=trace-pc-guard',
+                                'target_cpu=x64'
+                            ]
+
+                    batch_processor.add_operation(operation)
 
+                    # NOTE: the message is acknowledged once it is queued in memory, not
+                    # once it is committed to PostgreSQL; a crash or a failed batch after
+                    # this point loses the operation. TODO: acknowledge after commit.
                     await r.xack(STREAM_NAME, GROUP, msg_id)
+
         except Exception as e:
-            # backoff on errors
+            # Back off on errors, but report them so a broken stream is visible.
+            print(f"[{label}] stream consumer error, retrying in 1s: {e!r}")
             await asyncio.sleep(1)
 
 async def main():
-    pg = await asyncpg.connect(PG_DSN)
-    tasks = []
+    # Check if we're in a Docker network
+    docker_network = os.getenv("DOCKER_NETWORK", "false").lower() == "true"
+    service_name = os.getenv("SERVICE_NAME", "sync")
+
+    print(f"Starting {service_name} service...")
+    if docker_network:
+        print("Running in Docker network mode")
+    # Log only the part after "@" (host/database); the DSN embeds the password.
+    print(f"PostgreSQL target: {PG_DSN.rsplit('@', 1)[-1]}")
+    print(f"Redis Streams: {STREAMS}")
+    print(f"Group: {GROUP}")
+    print(f"Consumer: {CONSUMER}")
+
+    # Wait for PostgreSQL to be ready with retries
+    print("Waiting for PostgreSQL to be ready...")
+    max_retries = 60  # Increased retries
+    retry_count = 0
+
+    # Initial delay to let the network settle (longer in Docker)
+    initial_delay = 10 if docker_network else 5
+    print(f"Initial network settle delay: {initial_delay}s")
+    await asyncio.sleep(initial_delay)
+
+    while retry_count < max_retries:
+        try:
+            batch_processor = DatabaseBatchProcessor(PG_DSN, DB_WORKER_THREADS)
+            await batch_processor.initialize()
+            print("Successfully connected to PostgreSQL")
+            break
+        except Exception as e:
+            retry_count += 1
+            print(f"Failed to connect to PostgreSQL (attempt {retry_count}/{max_retries}): {e}")
+            if retry_count >= max_retries:
+                print("Max retries reached, exiting")
+                return
+            # Progressive backoff: start with 2s, increase to 5s
+            delay = min(2 + (retry_count * 0.1), 5)
+            await asyncio.sleep(delay)
+
+    batch_task = asyncio.create_task(batch_processor.process_operations())
+
+    stream_tasks = []
     for pair in STREAMS:
-        label, url = pair.split("=")
-        tasks.append(asyncio.create_task(consume_stream(label, url, pg)))
-    await asyncio.gather(*tasks)
+        # Split on the first "=" only; a Redis URL may contain "=" in its query string.
+        label, url = pair.split("=", 1)
+        task = asyncio.create_task(consume_stream(label, url, batch_processor))
+        stream_tasks.append(task)
+
+    try:
+        await asyncio.gather(batch_task, *stream_tasks)
+    except KeyboardInterrupt:
+        pass
+    finally:
+        await batch_processor.close()
 
 if __name__ == "__main__":
     asyncio.run(main())
diff --git a/vrig_docker/test_sync.py b/vrig_docker/test_sync.py
new file mode 100644
index 000000000..23e872c81
--- /dev/null
+++ b/vrig_docker/test_sync.py
@@ -0,0 +1,118 @@
+import asyncio
+import asyncpg
+import json
+import base64
+from sync import DatabaseWorker, DatabaseBatchProcessor
+
+async def setup_test_database():
+    dsn = "postgres://fuzzuser:pass@localhost:5432/main"
+    conn = await asyncpg.connect(dsn)
+
+    try:
+        await conn.execute("DELETE FROM execution")
+        await conn.execute("DELETE FROM program")
+        await conn.execute("DELETE FROM fuzzer")
+        await conn.execute("DELETE FROM main")
+        print("✓ Test database cleared")
+    finally:
+        await conn.close()
+
+async def test_database_worker():
+    dsn = "postgres://fuzzuser:pass@localhost:5432/main"
+
+    worker = DatabaseWorker(0, dsn)
+    await worker.initialize()
+
+    test_operations = [
+        {
+            'op': 'set',
+            'program_base64': base64.b64encode(b'test_program_1').decode(),
+            'fuzzer_id': 1,
+            'execution_type_id': 1,
+            'feedback_vector': '{"edges": [1, 2, 3]}',
+            'turboshaft_ir': 'test_ir_data',
+            'coverage_total': 85.5,
+            'execution_flags': ['is_debug=false', 'v8_fuzzilli=true']
+        },
+        {
+            'op': 'update_feedback',
+            'program_base64': base64.b64encode(b'test_program_1').decode(),
+            'feedback_vector': '{"edges": [1, 2, 3, 4]}'
+        },
+        {
+            'op': 'del',
+            'program_base64': base64.b64encode(b'test_program_2').decode()
+        }
+    ]
+
+    try:
+        await worker.process_batch(test_operations)
+        print("✓ Database worker test passed")
+    except Exception as e:
+        print(f"✗ Database worker test failed: {e}")
+    finally:
+        await worker.close()
+
+async def test_batch_processor():
+    dsn = "postgres://fuzzuser:pass@localhost:5432/main"
+
+    processor = DatabaseBatchProcessor(dsn, 2)
+    await processor.initialize()
+
+    for i in range(20):
+        operation = {
+            'op': 'set',
+            'program_base64': base64.b64encode(f'test_program_{i}'.encode()).decode(),
+            'fuzzer_id': (i % 3) + 1,
+            'execution_type_id': (i % 4) + 1,
+            'feedback_vector': f'{{"edges": [{i}, {i+1}, {i+2}]}}',
+            'turboshaft_ir': f'test_ir_{i}',
+            'coverage_total': float(i * 5),
+            'execution_flags': ['is_debug=false', 'v8_fuzzilli=true', 'target_cpu=x64']
+        }
+        processor.add_operation(operation)
+
+    await asyncio.sleep(3)
+    await processor.close()
+    print("✓ Batch processor test passed")
+
+async def verify_results():
+    dsn = "postgres://fuzzuser:pass@localhost:5432/main"
+    conn = await asyncpg.connect(dsn)
+
+    try:
+        program_count = await conn.fetchval("SELECT COUNT(*) FROM program")
+        execution_count = await conn.fetchval("SELECT COUNT(*) FROM execution")
+
+        print(f"✓ Programs in database: {program_count}")
+        print(f"✓ Executions in database: {execution_count}")
+
+        if execution_count > 0:
+            sample = await conn.fetchrow("""
+                SELECT program_base64, execution_type_id, coverage_total, execution_flags
+                FROM execution
+                ORDER BY created_at DESC
+                LIMIT 1
+            """)
+
+            print(f"✓ Sample execution:")
+            print(f"  Program: {sample['program_base64']}")
+            print(f"  Type: {sample['execution_type_id']}")
+            print(f"  Coverage: {sample['coverage_total']}%")
+            print(f"  Flags: {sample['execution_flags']}")
+
+    finally:
+        await conn.close()
+
+async def main():
+    print("Testing multi-threaded database sync with mock data...")
+
+    await setup_test_database()
+    await test_database_worker()
+    await test_batch_processor()
+    await verify_results()
+
+    print("All tests completed")
+
+if __name__ == "__main__":
+    asyncio.run(main())