diff --git a/backend/app/services/simulation_runner.py b/backend/app/services/simulation_runner.py index 19cf16ad3d..d460dc1718 100644 --- a/backend/app/services/simulation_runner.py +++ b/backend/app/services/simulation_runner.py @@ -610,83 +610,91 @@ def _read_action_log( try: with open(log_path, 'r', encoding='utf-8') as f: f.seek(position) - for line in f: - line = line.strip() - if line: - try: - action_data = json.loads(line) - - # Handle event-type entries - if "event_type" in action_data: - event_type = action_data.get("event_type") - - # Detect simulation_end event and mark platform as completed - if event_type == "simulation_end": - if platform == "twitter": - state.twitter_completed = True - state.twitter_running = False - logger.info(f"Twitter simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") - elif platform == "reddit": - state.reddit_completed = True - state.reddit_running = False - logger.info(f"Reddit simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") - - # Check if all enabled platforms have completed. - # If only one platform is running, check only that one. - # If both platforms are running, both must complete. - all_completed = cls._check_all_platforms_completed(state) - if all_completed: - state.runner_status = RunnerStatus.COMPLETED - state.completed_at = datetime.now().isoformat() - logger.info(f"All platform simulations completed: {state.simulation_id}") - - # Update round info (from round_end events) - elif event_type == "round_end": - round_num = action_data.get("round", 0) - simulated_hours = action_data.get("simulated_hours", 0) - - # Update per-platform independent rounds and time - if platform == "twitter": - if round_num > state.twitter_current_round: - state.twitter_current_round = round_num - state.twitter_simulated_hours = simulated_hours - elif platform == "reddit": - if round_num > state.reddit_current_round: - state.reddit_current_round = round_num - state.reddit_simulated_hours = simulated_hours - - # Overall round is the maximum across both platforms - if round_num > state.current_round: - state.current_round = round_num - # Overall time is the maximum across both platforms - state.simulated_hours = max(state.twitter_simulated_hours, state.reddit_simulated_hours) - - continue - - action = AgentAction( - round_num=action_data.get("round", 0), - timestamp=action_data.get("timestamp", datetime.now().isoformat()), - platform=platform, - agent_id=action_data.get("agent_id", 0), - agent_name=action_data.get("agent_name", ""), - action_type=action_data.get("action_type", ""), - action_args=action_data.get("action_args", {}), - result=action_data.get("result"), - success=action_data.get("success", True), - ) - state.add_action(action) - - # Update round number - if action.round_num and action.round_num > state.current_round: - state.current_round = action.round_num - - # If graph memory update is enabled, send activity to Zep - if graph_updater: - graph_updater.add_activity_from_dict(action_data, platform) - - except json.JSONDecodeError: - pass - return f.tell() + safe_position = position + while True: + raw_line = f.readline() + if not raw_line: # EOF + break + if not raw_line.endswith('\n'): # Partial line — wait for flush + break + safe_position = f.tell() + line = raw_line.strip() + if not line: + continue + try: + action_data = json.loads(line) + + # Handle event-type entries + if "event_type" in action_data: + event_type = action_data.get("event_type") + + # Detect simulation_end event and mark platform as completed + if event_type == "simulation_end": + if platform == "twitter": + state.twitter_completed = True + state.twitter_running = False + logger.info(f"Twitter simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") + elif platform == "reddit": + state.reddit_completed = True + state.reddit_running = False + logger.info(f"Reddit simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}") + + # Check if all enabled platforms have completed. + # If only one platform is running, check only that one. + # If both platforms are running, both must complete. + all_completed = cls._check_all_platforms_completed(state) + if all_completed: + state.runner_status = RunnerStatus.COMPLETED + state.completed_at = datetime.now().isoformat() + logger.info(f"All platform simulations completed: {state.simulation_id}") + + # Update round info (from round_end events) + elif event_type == "round_end": + round_num = action_data.get("round", 0) + simulated_hours = action_data.get("simulated_hours", 0) + + # Update per-platform independent rounds and time + if platform == "twitter": + if round_num > state.twitter_current_round: + state.twitter_current_round = round_num + state.twitter_simulated_hours = simulated_hours + elif platform == "reddit": + if round_num > state.reddit_current_round: + state.reddit_current_round = round_num + state.reddit_simulated_hours = simulated_hours + + # Overall round is the maximum across both platforms + if round_num > state.current_round: + state.current_round = round_num + # Overall time is the maximum across both platforms + state.simulated_hours = max(state.twitter_simulated_hours, state.reddit_simulated_hours) + + continue + + action = AgentAction( + round_num=action_data.get("round", 0), + timestamp=action_data.get("timestamp", datetime.now().isoformat()), + platform=platform, + agent_id=action_data.get("agent_id", 0), + agent_name=action_data.get("agent_name", ""), + action_type=action_data.get("action_type", ""), + action_args=action_data.get("action_args", {}), + result=action_data.get("result"), + success=action_data.get("success", True), + ) + state.add_action(action) + + # Update round number + if action.round_num and action.round_num > state.current_round: + state.current_round = action.round_num + + # If graph memory update is enabled, send activity to Zep + if graph_updater: + graph_updater.add_activity_from_dict(action_data, platform) + + except json.JSONDecodeError: + pass + return safe_position except Exception as e: logger.warning(f"Failed to read action log: {log_path}, error={e}") return position diff --git a/backend/tests/test_read_action_log.py b/backend/tests/test_read_action_log.py new file mode 100644 index 0000000000..bc6c1e9958 --- /dev/null +++ b/backend/tests/test_read_action_log.py @@ -0,0 +1,91 @@ +import json +import os +import tempfile +import pytest +from unittest.mock import patch + + +def _make_state(sim_id="test-sim"): + from app.services.simulation_runner import SimulationRunState + return SimulationRunState(simulation_id=sim_id) + + +def _call_read(path, position, state, platform="twitter"): + from app.services.simulation_runner import SimulationRunner + with patch.dict(SimulationRunner._graph_memory_enabled, {}, clear=False): + return SimulationRunner._read_action_log(path, position, state, platform) + + +_ACTION = { + "action_type": "post", "agent_id": 1, "agent_name": "Alice", + "round": 1, "timestamp": "2026-01-01T00:00:00", + "action_args": {}, "result": None, "success": True, +} + + +def test_complete_lines_all_processed(): + """All lines ending with \n are processed; final position equals file size.""" + state = _make_state() + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + f.write(json.dumps(_ACTION) + '\n') + f.write(json.dumps({**_ACTION, "agent_id": 2, "agent_name": "Bob"}) + '\n') + path = f.name + try: + new_pos = _call_read(path, 0, state) + assert len(state.recent_actions) == 2 + assert new_pos == os.path.getsize(path) + finally: + os.unlink(path) + + +def test_partial_last_line_not_processed(): + """Partial last line (no trailing \n) is NOT processed; position stays before it.""" + state = _make_state("test-partial") + complete = json.dumps(_ACTION) + '\n' + partial = '{"action_type": "like", "agent_id": 2' # no \n — in-progress write + + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + f.write(complete) + f.write(partial) + path = f.name + try: + new_pos = _call_read(path, 0, state) + assert len(state.recent_actions) == 1 + assert state.recent_actions[0].action_type == 'post' + # Position must be at end of the complete line, before the partial + assert new_pos == len(complete.encode('utf-8')) + finally: + os.unlink(path) + + +def test_incremental_reads_pick_up_new_lines(): + """Second read from returned position picks up lines added after first read.""" + state = _make_state("test-incr") + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + f.write(json.dumps(_ACTION) + '\n') + path = f.name + try: + pos1 = _call_read(path, 0, state) + assert len(state.recent_actions) == 1 + + with open(path, 'a') as f: + f.write(json.dumps({**_ACTION, "agent_id": 3, "agent_name": "Charlie"}) + '\n') + + pos2 = _call_read(path, pos1, state) + assert len(state.recent_actions) == 2 + assert pos2 > pos1 + finally: + os.unlink(path) + + +def test_empty_file_returns_zero(): + """Empty file returns position 0 and processes nothing.""" + state = _make_state("test-empty") + with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f: + path = f.name + try: + new_pos = _call_read(path, 0, state) + assert new_pos == 0 + assert len(state.recent_actions) == 0 + finally: + os.unlink(path)