Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 85 additions & 77 deletions backend/app/services/simulation_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -610,83 +610,91 @@ def _read_action_log(
try:
with open(log_path, 'r', encoding='utf-8') as f:
f.seek(position)
for line in f:
line = line.strip()
if line:
try:
action_data = json.loads(line)

# Handle event-type entries
if "event_type" in action_data:
event_type = action_data.get("event_type")

# Detect simulation_end event and mark platform as completed
if event_type == "simulation_end":
if platform == "twitter":
state.twitter_completed = True
state.twitter_running = False
logger.info(f"Twitter simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}")
elif platform == "reddit":
state.reddit_completed = True
state.reddit_running = False
logger.info(f"Reddit simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}")

# Check if all enabled platforms have completed.
# If only one platform is running, check only that one.
# If both platforms are running, both must complete.
all_completed = cls._check_all_platforms_completed(state)
if all_completed:
state.runner_status = RunnerStatus.COMPLETED
state.completed_at = datetime.now().isoformat()
logger.info(f"All platform simulations completed: {state.simulation_id}")

# Update round info (from round_end events)
elif event_type == "round_end":
round_num = action_data.get("round", 0)
simulated_hours = action_data.get("simulated_hours", 0)

# Update per-platform independent rounds and time
if platform == "twitter":
if round_num > state.twitter_current_round:
state.twitter_current_round = round_num
state.twitter_simulated_hours = simulated_hours
elif platform == "reddit":
if round_num > state.reddit_current_round:
state.reddit_current_round = round_num
state.reddit_simulated_hours = simulated_hours

# Overall round is the maximum across both platforms
if round_num > state.current_round:
state.current_round = round_num
# Overall time is the maximum across both platforms
state.simulated_hours = max(state.twitter_simulated_hours, state.reddit_simulated_hours)

continue

action = AgentAction(
round_num=action_data.get("round", 0),
timestamp=action_data.get("timestamp", datetime.now().isoformat()),
platform=platform,
agent_id=action_data.get("agent_id", 0),
agent_name=action_data.get("agent_name", ""),
action_type=action_data.get("action_type", ""),
action_args=action_data.get("action_args", {}),
result=action_data.get("result"),
success=action_data.get("success", True),
)
state.add_action(action)

# Update round number
if action.round_num and action.round_num > state.current_round:
state.current_round = action.round_num

# If graph memory update is enabled, send activity to Zep
if graph_updater:
graph_updater.add_activity_from_dict(action_data, platform)

except json.JSONDecodeError:
pass
return f.tell()
safe_position = position
while True:
raw_line = f.readline()
if not raw_line: # EOF
break
if not raw_line.endswith('\n'): # Partial line — wait for flush
break
safe_position = f.tell()
line = raw_line.strip()
if not line:
continue
try:
action_data = json.loads(line)

# Handle event-type entries
if "event_type" in action_data:
event_type = action_data.get("event_type")

# Detect simulation_end event and mark platform as completed
if event_type == "simulation_end":
if platform == "twitter":
state.twitter_completed = True
state.twitter_running = False
logger.info(f"Twitter simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}")
elif platform == "reddit":
state.reddit_completed = True
state.reddit_running = False
logger.info(f"Reddit simulation completed: {state.simulation_id}, total_rounds={action_data.get('total_rounds')}, total_actions={action_data.get('total_actions')}")

# Check if all enabled platforms have completed.
# If only one platform is running, check only that one.
# If both platforms are running, both must complete.
all_completed = cls._check_all_platforms_completed(state)
if all_completed:
state.runner_status = RunnerStatus.COMPLETED
state.completed_at = datetime.now().isoformat()
logger.info(f"All platform simulations completed: {state.simulation_id}")

# Update round info (from round_end events)
elif event_type == "round_end":
round_num = action_data.get("round", 0)
simulated_hours = action_data.get("simulated_hours", 0)

# Update per-platform independent rounds and time
if platform == "twitter":
if round_num > state.twitter_current_round:
state.twitter_current_round = round_num
state.twitter_simulated_hours = simulated_hours
elif platform == "reddit":
if round_num > state.reddit_current_round:
state.reddit_current_round = round_num
state.reddit_simulated_hours = simulated_hours

# Overall round is the maximum across both platforms
if round_num > state.current_round:
state.current_round = round_num
# Overall time is the maximum across both platforms
state.simulated_hours = max(state.twitter_simulated_hours, state.reddit_simulated_hours)

continue

action = AgentAction(
round_num=action_data.get("round", 0),
timestamp=action_data.get("timestamp", datetime.now().isoformat()),
platform=platform,
agent_id=action_data.get("agent_id", 0),
agent_name=action_data.get("agent_name", ""),
action_type=action_data.get("action_type", ""),
action_args=action_data.get("action_args", {}),
result=action_data.get("result"),
success=action_data.get("success", True),
)
state.add_action(action)

# Update round number
if action.round_num and action.round_num > state.current_round:
state.current_round = action.round_num

# If graph memory update is enabled, send activity to Zep
if graph_updater:
graph_updater.add_activity_from_dict(action_data, platform)

except json.JSONDecodeError:
pass
return safe_position
except Exception as e:
logger.warning(f"Failed to read action log: {log_path}, error={e}")
return position
Expand Down
91 changes: 91 additions & 0 deletions backend/tests/test_read_action_log.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import json
import os
import tempfile
import pytest
from unittest.mock import patch


def _make_state(sim_id="test-sim"):
from app.services.simulation_runner import SimulationRunState
return SimulationRunState(simulation_id=sim_id)


def _call_read(path, position, state, platform="twitter"):
from app.services.simulation_runner import SimulationRunner
with patch.dict(SimulationRunner._graph_memory_enabled, {}, clear=False):
return SimulationRunner._read_action_log(path, position, state, platform)


_ACTION = {
"action_type": "post", "agent_id": 1, "agent_name": "Alice",
"round": 1, "timestamp": "2026-01-01T00:00:00",
"action_args": {}, "result": None, "success": True,
}


def test_complete_lines_all_processed():
"""All lines ending with \n are processed; final position equals file size."""
state = _make_state()
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write(json.dumps(_ACTION) + '\n')
f.write(json.dumps({**_ACTION, "agent_id": 2, "agent_name": "Bob"}) + '\n')
path = f.name
try:
new_pos = _call_read(path, 0, state)
assert len(state.recent_actions) == 2
assert new_pos == os.path.getsize(path)
finally:
os.unlink(path)


def test_partial_last_line_not_processed():
"""Partial last line (no trailing \n) is NOT processed; position stays before it."""
state = _make_state("test-partial")
complete = json.dumps(_ACTION) + '\n'
partial = '{"action_type": "like", "agent_id": 2' # no \n — in-progress write

with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write(complete)
f.write(partial)
path = f.name
try:
new_pos = _call_read(path, 0, state)
assert len(state.recent_actions) == 1
assert state.recent_actions[0].action_type == 'post'
# Position must be at end of the complete line, before the partial
assert new_pos == len(complete.encode('utf-8'))
finally:
os.unlink(path)


def test_incremental_reads_pick_up_new_lines():
"""Second read from returned position picks up lines added after first read."""
state = _make_state("test-incr")
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
f.write(json.dumps(_ACTION) + '\n')
path = f.name
try:
pos1 = _call_read(path, 0, state)
assert len(state.recent_actions) == 1

with open(path, 'a') as f:
f.write(json.dumps({**_ACTION, "agent_id": 3, "agent_name": "Charlie"}) + '\n')

pos2 = _call_read(path, pos1, state)
assert len(state.recent_actions) == 2
assert pos2 > pos1
finally:
os.unlink(path)


def test_empty_file_returns_zero():
"""Empty file returns position 0 and processes nothing."""
state = _make_state("test-empty")
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
path = f.name
try:
new_pos = _call_read(path, 0, state)
assert new_pos == 0
assert len(state.recent_actions) == 0
finally:
os.unlink(path)