-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathrtaa.py
More file actions
130 lines (111 loc) · 4.2 KB
/
rtaa.py
File metadata and controls
130 lines (111 loc) · 4.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
#!/usr/bin/env python3
"""
Realtime Agent Launcher

Starts two child processes with the current Python interpreter:
- transcription.py: streams audio to AssemblyAI, either from a stereo WAV file
  (--mode file, the default; path given with -f) or from a live input source
  (--mode live, with -i blackhole|microphone). The launcher gives it a short
  head start because it is expected to create assemblyai.log.
- agents.py: the log agents orchestrator; its combined stdout/stderr is echoed
  to this console, each line prefixed with "[agents]".
Shuts down both on Ctrl+C.

NOTE(review): presumably transcription.py and agents.py are the scripts formerly
described here as assemblyai_multistream_to_log.py (logs END-OF-TURN lines to
assemblyai.log) and log_agents.py (tails assemblyai.log and writes
transcripts.log, customer.log, recommendations.log) -- confirm those details
still hold for the renamed scripts.
"""
import os
import sys
# NOTE(review): `signal` is not referenced anywhere else in this file; candidate for removal.
import signal
import subprocess
import time
import argparse
import threading
from dotenv import load_dotenv
# Populate os.environ from a .env file at import time so that main() (and the
# child processes, which receive a copy of os.environ) can see ASSEMBLYAI_API_KEY.
load_dotenv()
def _pump_output(name: str, proc: subprocess.Popen) -> None:
    """Echo a child's piped stdout to this console, prefixing each line with ``[name]``.

    Returns immediately if the child's stdout is not piped. Otherwise reads
    until the pipe reaches EOF. Read/print errors end the echo quietly, because
    this is best-effort console mirroring running in a daemon thread.
    """
    if proc.stdout is None:
        return
    try:
        for line in proc.stdout:
            print(f"[{name}] {line}", end="")
    except (OSError, ValueError):
        # ValueError covers a pipe closed underneath us during shutdown and
        # console encode errors; neither should surface as a thread traceback.
        pass


def _stop_children(procs: list, grace: float = 1.0) -> None:
    """Terminate every still-running child, killing any that outlive *grace* seconds.

    *procs* is a list of ``(name, Popen)`` pairs. All children share a single
    deadline, so total shutdown time is bounded by *grace* rather than growing
    with the number of children. Killed children are reaped with ``wait()``.
    """
    running = [(name, p) for name, p in procs if p.poll() is None]
    for _name, p in running:
        try:
            p.terminate()
        except OSError:
            # The child exited between poll() and terminate().
            pass
    deadline = time.monotonic() + grace
    for _name, p in running:
        try:
            p.wait(timeout=max(0.0, deadline - time.monotonic()))
        except subprocess.TimeoutExpired:
            try:
                p.kill()
            except OSError:
                pass
            p.wait()


def main(input_source: str, mode: str, filepath: str, *, startup_delay: float = 0.5) -> None:
    """Launch transcription.py and agents.py and supervise them until they exit.

    Args:
        input_source: Audio input source, passed to transcription.py as ``-i``.
        mode: Streaming mode, passed as ``--mode``.
        filepath: Audio file, passed as ``-f`` only when ``mode == "file"``.
        startup_delay: Seconds to wait after starting transcription.py before
            starting agents.py (a head start so assemblyai.log exists).

    Both children are tracked from the moment they are created. When the
    launcher stops (Ctrl+C, an exception, or all children exiting), any child
    still running is terminated and, if it does not exit within one second,
    killed. Each child's exit code is printed when it exits on its own.
    """
    python_bin = sys.executable or "python3"
    # Verify API key early
    if not os.getenv("ASSEMBLYAI_API_KEY"):
        print("❌ ASSEMBLYAI_API_KEY not found in environment variables")
        print(" Make sure your .env contains ASSEMBLYAI_API_KEY and the env is loaded.")
        # Continue anyway; child scripts may also handle this

    env = os.environ.copy()
    env["PYTHONUNBUFFERED"] = "1"

    procs = []  # (name, Popen) pairs for every child still being supervised
    readers = []
    try:
        cmd = [python_bin, "-u", "transcription.py", "--mode", mode, "-i", input_source]
        if mode == "file":
            cmd += ["-f", filepath]
        # transcription.py inherits this console's stdout/stderr (no pipe), so
        # its output appears exactly as when it is run on its own.
        transcription = subprocess.Popen(cmd, env=env)
        # Track it immediately so it is cleaned up even if Ctrl+C arrives
        # during the head-start sleep or starting agents.py fails.
        procs.append(("transcription", transcription))
        time.sleep(startup_delay)

        agents = subprocess.Popen(
            [python_bin, "-u", "agents.py"],
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            # Never let undecodable output kill the reader thread: a dead
            # reader would leave agents.py blocked on a full pipe.
            errors="replace",
            bufsize=1,
            env=env,
        )
        procs.append(("agents", agents))
        print("🚀 Realtime agent started. Press Ctrl+C to stop.")

        # One reader thread per piped child (only agents.py is piped).
        for name, p in procs:
            if p.stdout is not None:
                reader = threading.Thread(target=_pump_output, args=(name, p), daemon=True)
                reader.start()
                readers.append(reader)

        # Poll until every child has exited, reporting each exit as it happens.
        while procs:
            still_running = []
            for name, p in procs:
                if p.poll() is None:
                    still_running.append((name, p))
                else:
                    print(f"[{name}] exited with code {p.returncode}")
            procs = still_running
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\n🛑 Ctrl+C received. Stopping...")
    finally:
        _stop_children(procs)
        # Let readers flush any trailing child output before the final message;
        # the timeout guards against a grandchild holding the pipe open.
        for reader in readers:
            reader.join(timeout=1.0)
        print("✅ Shutdown complete")
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Realtime Agent Launcher")
    parser.add_argument("-i", "--input", choices=["blackhole", "microphone"], default="blackhole", help="Audio input source (default: blackhole)")
    parser.add_argument("--mode", "-m", choices=["live", "file"], default="file", help="Streaming mode: file-based stereo (default) or live input")
    parser.add_argument("-f", "--filepath", default="./sample_call.wav", help="Stereo WAV file path for --mode file (default: sample_call.wav)")
    args = parser.parse_args()
    # Deliberately no os.setpgrp() here. Child processes inherit this process's
    # group by default, so a terminal Ctrl+C (delivered to the whole foreground
    # process group) already reaches them. Calling setpgrp() when the launcher
    # is not the job leader (e.g. started through a wrapper) would move it into
    # a new background group, and Ctrl+C would stop reaching it at all.
    main(args.input, args.mode, args.filepath)