-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathhotmic.py
More file actions
814 lines (700 loc) · 28.1 KB
/
hotmic.py
File metadata and controls
814 lines (700 loc) · 28.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
#!/usr/bin/env python3
import asyncio
import contextlib
import json
import os
import platform
import queue
import signal
import sys
import threading
import time
import tkinter as tk
from dataclasses import dataclass
from typing import Optional
# Runtime deps (install via requirements.txt):
# websockets, sounddevice, pynput, pyperclip
import websockets
import sounddevice as sd
from pynput import keyboard, mouse
import pyperclip
@dataclass
class Config:
    """Runtime settings for hotmic.

    The first nine fields are mandatory. The remaining fields are optional
    and carry the same defaults that ``load_config`` applies when a key is
    absent from config.json, so a ``Config`` can also be built directly
    (e.g. in tests) from the required values alone.
    """

    # --- Required settings -------------------------------------------------
    endpoint: str  # WebSocket URL handed to websockets.connect()
    hotkey: str  # pynput GlobalHotKeys string that toggles recording
    autopaste: bool  # send a paste keystroke after copying the transcript
    samplerate: int  # passed to sd.RawInputStream(samplerate=...)
    channels: int  # passed to sd.RawInputStream(channels=...)
    block_samples: int  # samples per audio block (RawInputStream blocksize)
    input_device: Optional[int]  # passed to sd.RawInputStream(device=...)
    connect_timeout: float  # seconds allowed for the WebSocket connect
    stop_flush_wait: float  # seconds to wait before draining final chunks

    # --- Hold-to-talk options ----------------------------------------------
    enable_hold_mode: bool = False
    hold_hotkey: Optional[str] = None  # combo such as '<ctrl>+<alt>'
    enable_mouse_button: bool = False
    mouse_button: str = "middle"
    show_visual_indicator: bool = True

    # --- Voice Activity Detection options ----------------------------------
    enable_vad: bool = True
    # A chunk counts as speech when its RMS energy exceeds this value.
    vad_energy_threshold: float = 500.0
    # Minimum seconds between first and last speech chunk to accept a take.
    vad_min_speech_duration: float = 0.3
    # NOTE(review): not referenced by any code visible in this file.
    silence_timeout: float = 2.0
    test_mic_on_startup: bool = True
def load_config(path: str = "config.json") -> "Config":
    """Read *path* as JSON and build a ``Config`` from it.

    Args:
        path: Location of the JSON configuration file.

    Returns:
        A ``Config`` populated from the file; optional keys fall back to
        the defaults spelled out below.

    Raises:
        SystemExit: If the file does not exist, is not valid JSON, does not
            hold a JSON object, lacks a required key, or holds a value that
            cannot be converted to the expected type.
    """
    # EAFP: open directly instead of checking for existence first, which
    # avoids a race between the check and the open.
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except FileNotFoundError:
        raise SystemExit(
            f"config.json not found at {os.path.abspath(path)}. Please create it."
        ) from None
    except json.JSONDecodeError as exc:
        # Report a hand-editing mistake the same way as every other
        # configuration problem instead of dumping a traceback.
        raise SystemExit(
            f"Invalid JSON in {os.path.abspath(path)}: {exc}"
        ) from None

    if not isinstance(data, dict):
        raise SystemExit(
            f"{os.path.abspath(path)} must contain a JSON object at the top level."
        )

    required = {
        "endpoint",
        "hotkey",
        "autopaste",
        "samplerate",
        "channels",
        "block_samples",
        "input_device",
        "connect_timeout",
        "stop_flush_wait",
    }
    missing = [k for k in sorted(required) if k not in data]
    if missing:
        raise SystemExit(f"Missing required config keys: {', '.join(missing)}")

    try:
        return Config(
            endpoint=data["endpoint"],
            hotkey=data["hotkey"],
            autopaste=bool(data["autopaste"]),
            samplerate=int(data["samplerate"]),
            channels=int(data["channels"]),
            block_samples=int(data["block_samples"]),
            # The key must be present, but its value may be null.
            input_device=data.get("input_device"),
            connect_timeout=float(data["connect_timeout"]),
            stop_flush_wait=float(data["stop_flush_wait"]),
            # Hold-to-talk options with defaults
            enable_hold_mode=bool(data.get("enable_hold_mode", False)),
            hold_hotkey=data.get("hold_hotkey"),
            enable_mouse_button=bool(data.get("enable_mouse_button", False)),
            mouse_button=data.get("mouse_button", "middle"),
            show_visual_indicator=bool(data.get("show_visual_indicator", True)),
            # Voice Activity Detection options
            enable_vad=bool(data.get("enable_vad", True)),
            vad_energy_threshold=float(data.get("vad_energy_threshold", 500.0)),
            vad_min_speech_duration=float(data.get("vad_min_speech_duration", 0.3)),
            silence_timeout=float(data.get("silence_timeout", 2.0)),
            test_mic_on_startup=bool(data.get("test_mic_on_startup", True)),
        )
    except (TypeError, ValueError) as exc:
        # e.g. "samplerate": "fast" or "channels": null
        raise SystemExit(f"Invalid value in {os.path.abspath(path)}: {exc}") from None
def calculate_audio_energy(audio_data: bytes) -> float:
    """Return the root-mean-square amplitude of a raw int16 sample buffer.

    Args:
        audio_data: Raw bytes interpreted as a flat sequence of int16
            samples (NumPy's native ``int16`` layout).

    Returns:
        The RMS of the samples as a Python float, or 0.0 for an empty
        buffer.
    """
    import numpy as np

    samples = np.frombuffer(audio_data, dtype=np.int16)
    if samples.size == 0:
        return 0.0

    # Widen to float32 before squaring so the int16 values cannot overflow.
    mean_square = np.mean(np.square(samples.astype(np.float32)))
    return float(np.sqrt(mean_square))
def test_microphone(cfg: Config) -> bool:
    """Record briefly from the configured input and report whether it is usable.

    Opens an ``sd.RawInputStream`` with the sample rate, channel count, block
    size and device from *cfg*, sleeps for two seconds while the stream
    callback collects blocks, then averages the per-block RMS energy.

    Returns:
        True when audio was captured and its average energy is at least
        10.0. False when nothing was captured, the level is below 10.0, or
        any exception was raised along the way.
    """
    print("[hotmic] Testing microphone...")

    duration_s = 2.0  # seconds
    captured = []

    def _collect(indata, frames, time_info, status):
        # Copy the block out of the stream's buffer before keeping it.
        captured.append(bytes(indata))

    try:
        # Record for a short duration; the stream is closed on block exit.
        with sd.RawInputStream(
            samplerate=cfg.samplerate,
            channels=cfg.channels,
            dtype="int16",
            blocksize=cfg.block_samples,
            callback=_collect,
            device=cfg.input_device,
        ):
            print(f"[hotmic] Recording for {duration_s} seconds to test microphone...")
            print("[hotmic] Please make some noise (speak, tap the mic, etc.)")
            time.sleep(duration_s)

        if not captured:
            print("[hotmic] ❌ ERROR: No audio data received from microphone!")
            return False

        # Mean of the per-block RMS values.
        mean_energy = sum(map(calculate_audio_energy, captured)) / len(captured)

        print("[hotmic] Microphone test results:")
        print(f"[hotmic] - Collected {len(captured)} audio chunks")
        print(f"[hotmic] - Average audio energy: {mean_energy:.2f}")

        if mean_energy >= 10.0:
            print("[hotmic] ✅ Microphone is working correctly!")
            return True

        print("[hotmic] ⚠️ WARNING: Microphone audio level is very low!")
        print("[hotmic] The microphone might be muted or not working properly.")
        print("[hotmic] Please check your microphone settings.")
        return False
    except Exception as e:
        print(f"[hotmic] ❌ ERROR testing microphone: {e}")
        return False
class VisualIndicator:
    """Borderless always-on-top "Recording..." banner run on its own thread.

    All Tk calls happen on the UI thread started by ``start_ui_thread``.
    Other threads communicate with it only through ``threading.Event``
    flags, which the UI thread polls every ``_POLL_MS`` milliseconds.
    """

    _POLL_MS = 50  # how often the UI thread checks the flags
    _STARTUP_WAIT_S = 2.0  # upper bound on waiting for the window to exist
    _SHUTDOWN_WAIT_S = 1.0  # upper bound on waiting for the UI thread to end

    def __init__(self):
        self.root: Optional[tk.Tk] = None
        self._thread: Optional[threading.Thread] = None
        self._visible = threading.Event()
        self._should_exit = threading.Event()
        # Set once the window has been built, or once building it failed.
        self._ready = threading.Event()

    def start_ui_thread(self):
        """Start the UI thread (once) and wait until its window is built."""
        if self._thread is None:
            self._thread = threading.Thread(target=self._run_ui, daemon=True)
            self._thread.start()
            # Wait on an explicit signal rather than a fixed sleep, so a slow
            # start is not raced and a fast start does not waste time.
            self._ready.wait(timeout=self._STARTUP_WAIT_S)

    def _run_ui(self):
        """Create the window and run the Tk main loop until asked to exit."""
        try:
            root = tk.Tk()
        except tk.TclError as exc:
            # e.g. no display available; recording still works without it.
            print(f"[hotmic] visual indicator unavailable: {exc}")
            self._ready.set()
            return

        self.root = root
        try:
            self._build_window(root)
            self._ready.set()
            root.after(self._POLL_MS, self._poll, root)
            root.mainloop()
        finally:
            self._ready.set()
            self.root = None
            try:
                root.destroy()
            except tk.TclError:
                pass  # window already gone

    def _build_window(self, root):
        """Configure the hidden banner window and place it at top-centre."""
        root.withdraw()  # Start hidden
        root.overrideredirect(True)  # Remove window decorations
        root.attributes('-topmost', True)  # Always on top
        # Translucency is optional; ignore it where unsupported.
        try:
            root.attributes('-alpha', 0.9)
        except tk.TclError:
            pass

        label = tk.Label(
            root,
            text="🎤 Recording...",
            font=("Arial", 14, "bold"),
            bg="#FF4444",
            fg="white",
            padx=20,
            pady=10,
        )
        label.pack()

        # Let Tk compute the label size before measuring the window width.
        root.update_idletasks()
        x = (root.winfo_screenwidth() - root.winfo_width()) // 2
        y = 50
        root.geometry(f"+{x}+{y}")

    def _poll(self, root):
        """Apply the requested visibility, or leave the main loop on exit."""
        if self._should_exit.is_set():
            root.quit()
            return
        hidden = root.state() == 'withdrawn'
        if self._visible.is_set():
            if hidden:
                root.deiconify()
        elif not hidden:
            root.withdraw()
        root.after(self._POLL_MS, self._poll, root)

    def show(self):
        """Show the indicator"""
        self._visible.set()

    def hide(self):
        """Hide the indicator"""
        self._visible.clear()

    def shutdown(self):
        """Ask the UI thread to exit and wait briefly for it to finish.

        Tk is never touched from the calling thread: the UI thread sees the
        flag on its next poll and leaves its main loop by itself.
        """
        self._should_exit.set()
        worker = self._thread
        if worker is not None and worker is not threading.current_thread():
            worker.join(timeout=self._SHUTDOWN_WAIT_S)
class Recorder:
    """Capture microphone blocks into a queue and track energy-based VAD.

    Audio arrives on the ``sd.RawInputStream`` callback, is copied to
    ``bytes`` and queued for the consumer. When ``cfg.enable_vad`` is set,
    each block's RMS energy is compared with ``cfg.vad_energy_threshold``
    to record when speech was first and last heard.
    """

    def __init__(self, cfg: "Config"):
        self.cfg = cfg
        self._stream: Optional[sd.RawInputStream] = None
        self._q: "queue.Queue[bytes]" = queue.Queue()
        self._running = threading.Event()
        self._last_chunk_time = 0.0
        self._reset_vad_state()

    def _reset_vad_state(self):
        """Forget everything learned about speech in the previous take."""
        self._has_speech = False
        self._speech_start_time = 0.0
        self._total_audio_energy = 0.0
        self._chunk_count = 0
        self._last_speech_time = 0.0

    def _discard_queued(self):
        """Drop every chunk currently waiting in the queue."""
        while True:
            try:
                self._q.get_nowait()
            except queue.Empty:
                return

    def _on_audio(self, indata, frames, time_info, status):
        """Stream callback: queue the block and update the VAD counters."""
        # `status` reports non-fatal overflows/underflows; it is ignored.
        if not self._running.is_set():
            return
        chunk = bytes(indata)  # copy out of the stream's buffer
        self._q.put(chunk)
        now = time.time()
        self._last_chunk_time = now

        if self.cfg.enable_vad:
            energy = calculate_audio_energy(chunk)
            self._total_audio_energy += energy
            self._chunk_count += 1
            if energy > self.cfg.vad_energy_threshold:
                if not self._has_speech:
                    self._speech_start_time = now
                    self._has_speech = True
                self._last_speech_time = now

    def start(self):
        """Open the input stream and begin queueing audio.

        Does nothing when already running. If the stream cannot be opened
        the recorder is returned to its stopped state and the error is
        re-raised, so a later ``start()`` can try again.
        """
        if self._running.is_set():
            return

        # Chunks left over from a previous take (for example one that was
        # discarded without being drained) must not reach the next one.
        self._discard_queued()
        self._reset_vad_state()
        self._running.set()

        try:
            self._stream = sd.RawInputStream(
                samplerate=self.cfg.samplerate,
                channels=self.cfg.channels,
                dtype="int16",
                blocksize=self.cfg.block_samples,
                callback=self._on_audio,
                device=self.cfg.input_device,
            )
            self._stream.start()
        except Exception:
            # Without this, _running would stay set and every later start()
            # would return early without ever opening a stream.
            self._running.clear()
            stream, self._stream = self._stream, None
            if stream is not None:
                with contextlib.suppress(Exception):
                    stream.close()
            raise

    def stop(self):
        """Stop queueing audio and release the input stream, if any."""
        self._running.clear()
        if self._stream is not None:
            with contextlib.suppress(Exception):
                self._stream.stop()
            with contextlib.suppress(Exception):
                self._stream.close()
            self._stream = None

    def get_chunk_nowait(self) -> Optional[bytes]:
        """Return the next queued chunk, or None when the queue is empty."""
        try:
            return self._q.get_nowait()
        except queue.Empty:
            return None

    def drain_remaining(self, timeout: float = 0.5) -> list[bytes]:
        """Wait *timeout* seconds, then return all queued chunks in order.

        The wait gives any final callback a moment to enqueue its block.
        A zero or negative *timeout* skips the wait.
        """
        if timeout > 0:
            time.sleep(timeout)
        chunks: list[bytes] = []
        while True:
            try:
                chunks.append(self._q.get_nowait())
            except queue.Empty:
                break
        return chunks

    def has_speech_activity(self) -> bool:
        """Check if speech activity was detected during recording.

        Always True when VAD is disabled. Otherwise at least one block must
        have exceeded the threshold, and the span between the first and
        last such block must reach ``cfg.vad_min_speech_duration``.
        """
        if not self.cfg.enable_vad:
            return True
        if not self._has_speech:
            return False
        if self._speech_start_time > 0:
            speech_duration = self._last_speech_time - self._speech_start_time
            if speech_duration < self.cfg.vad_min_speech_duration:
                return False
        return True

    def get_average_energy(self) -> float:
        """Return the mean per-block energy of the take (0.0 if no blocks)."""
        if self._chunk_count == 0:
            return 0.0
        return self._total_audio_energy / self._chunk_count
class Session:
    """One WebSocket conversation with the transcription endpoint.

    The client sends JSON control frames (``start_recording`` /
    ``stop_recording``) and binary audio chunks. A background receiver task
    parses JSON frames from the server: ``text`` frames build up
    ``transcript``, a ``status`` frame with ``idle`` (while a final result
    is awaited) or an ``error`` frame releases ``wait_final``.
    """

    def __init__(self, cfg: "Config"):
        self.cfg = cfg
        self.ws: Optional[websockets.WebSocketClientProtocol] = None
        self._rx_task: Optional[asyncio.Task] = None
        self._rx_stop = asyncio.Event()
        self.transcript = ""
        self._final_event = asyncio.Event()  # set when the utterance is over
        self._open = False
        self._awaiting_final = False

    def _is_connected(self) -> bool:
        """Return True while a socket exists and the receiver is running.

        ``_open`` is cleared by the receiver when the socket ends. A
        ``closed`` attribute is honoured when the connection object has
        one; a missing attribute is not treated as "closed".
        """
        return (
            self.ws is not None
            and self._open
            and not getattr(self.ws, "closed", False)
        )

    async def connect(self):
        """Open a fresh connection and start the receiver task.

        Raises whatever ``websockets.connect`` raises, or
        ``asyncio.TimeoutError`` after ``cfg.connect_timeout`` seconds.
        """
        # Tear down any earlier socket/receiver so they are not leaked.
        if self.ws is not None or self._rx_task is not None:
            await self.close()
        self._final_event.clear()
        self._rx_stop.clear()
        self.transcript = ""
        self.ws = await asyncio.wait_for(
            websockets.connect(self.cfg.endpoint), timeout=self.cfg.connect_timeout
        )
        self._open = True
        self._rx_task = asyncio.create_task(self._receiver())

    async def _receiver(self):
        """Consume server frames until the socket ends."""
        try:
            async for message in self.ws:
                # Server uses JSON text frames; skip anything else.
                try:
                    data = json.loads(message)
                except (TypeError, ValueError):
                    continue
                if not isinstance(data, dict):
                    # Valid JSON but not an object: nothing to dispatch on.
                    continue
                typ = data.get("type")
                if typ == "text":
                    if data.get("isNewResponse"):
                        self.transcript = data.get("content", "")
                    else:
                        self.transcript += data.get("content", "")
                elif typ == "status":
                    # After stop_recording flow completes, server sends 'idle'
                    if data.get("status") == "idle" and self._awaiting_final:
                        self._final_event.set()
                elif typ == "error":
                    # Treat errors as terminal for this utterance
                    self._final_event.set()
        except Exception as exc:
            print(f"[hotmic] connection lost: {exc}")
        finally:
            self._open = False
            self._rx_stop.set()
            # No further frames can arrive, so a caller blocked in
            # wait_final() must not sit out its whole timeout.
            self._final_event.set()

    async def start_recording(self):
        """Connect if necessary, then tell the server recording has begun."""
        if not self._is_connected():
            await self.connect()
        await self.ws.send(json.dumps({"type": "start_recording"}))

    async def send_audio(self, chunk: bytes):
        """Send one audio chunk; silently skipped when not connected."""
        if self.ws and self._open and chunk:
            await self.ws.send(chunk)

    async def stop_recording(self):
        """Tell the server recording has ended and start awaiting the result.

        The caller is expected to have sent any remaining audio first. When
        there is no live connection, nothing is sent and ``wait_final`` is
        released immediately, since no reply can arrive.
        """
        self._awaiting_final = True
        if not self._is_connected():
            self._final_event.set()
            return
        await self.ws.send(json.dumps({"type": "stop_recording"}))

    async def wait_final(self, timeout: float = 20.0):
        """Wait up to *timeout* seconds for the utterance to finish.

        Returns normally on timeout as well; read ``transcript`` afterwards.
        """
        try:
            await asyncio.wait_for(self._final_event.wait(), timeout=timeout)
        except asyncio.TimeoutError:
            pass
        finally:
            self._awaiting_final = False
            self._final_event.clear()

    async def close(self):
        """Close the socket, stop the receiver and reset connection state."""
        if self.ws:
            with contextlib.suppress(Exception):
                await self.ws.close()
        if self._rx_task:
            self._rx_task.cancel()
            # Awaiting a cancelled task raises CancelledError, which is not
            # an Exception subclass, so it is named explicitly.
            with contextlib.suppress(asyncio.CancelledError, Exception):
                await self._rx_task
        self._open = False
        self.ws = None
        self._rx_task = None
        self._awaiting_final = False
class HotMic:
    """Glue between the recorder, the WebSocket session and the desktop.

    ``start()`` begins capturing and streaming audio; ``stop()`` ends the
    take, waits for the transcript, copies it to the clipboard and, when
    ``cfg.autopaste`` is set, sends a paste keystroke. All network work runs
    on a private asyncio loop in a daemon thread; the public methods are
    blocking and are called from listener threads.
    """

    def __init__(self, cfg: "Config"):
        self.cfg = cfg
        self.rec = Recorder(cfg)
        self.loop = asyncio.new_event_loop()
        self._loop_thread = threading.Thread(target=self._run_loop, daemon=True)
        self._loop_thread.start()
        self._sending_task: Optional[asyncio.Future] = None
        self._sess: Optional[Session] = None
        self._active = False
        self._kb_controller = keyboard.Controller()
        # Visual indicator
        self.indicator: Optional[VisualIndicator] = None
        if cfg.show_visual_indicator:
            self.indicator = VisualIndicator()
            self.indicator.start_ui_thread()

    def _run_loop(self):
        """Thread target: run the private event loop forever."""
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def _call_soon(self, coro):
        """Schedule *coro* on the private loop; return a concurrent Future."""
        return asyncio.run_coroutine_threadsafe(coro, self.loop)

    def _ensure_session(self) -> "Session":
        """Return the current session, creating it on first use."""
        if not self._sess:
            self._sess = Session(self.cfg)
        return self._sess

    def _abort_recording(self):
        """Best-effort rollback of a recording that could not be started."""
        self._active = False
        with contextlib.suppress(Exception):
            self.rec.stop()
        with contextlib.suppress(Exception):
            self.rec.drain_remaining(0)  # throw away what was captured
        if self.indicator:
            with contextlib.suppress(Exception):
                self.indicator.hide()
        if self._sess:
            with contextlib.suppress(Exception):
                self._call_soon(self._sess.close())

    def start(self):
        """Begin recording and streaming; no-op when already active.

        Raises whatever the recorder or the connection attempt raises. In
        that case the recorder, indicator and ``_active`` flag are rolled
        back first, so the next ``start()`` begins from a clean state.
        """
        if self._active:
            return
        self._active = True
        print("[hotmic] start recording …")
        try:
            if self.indicator:
                self.indicator.show()
            self.rec.start()
            sess = self._ensure_session()
            # Connect and send start (blocks until done or timed out)
            self._call_soon(sess.start_recording()).result()
            # Spawn sender
            self._sending_task = self._call_soon(self._sender_loop())
        except Exception:
            self._abort_recording()
            raise

    async def _sender_loop(self):
        """Forward queued audio chunks to the server while recording."""
        sess = self._sess
        if sess is None:
            return
        try:
            while self._active:
                chunk = self.rec.get_chunk_nowait()
                if chunk is None:
                    await asyncio.sleep(0.01)
                    continue
                await sess.send_audio(chunk)
        except Exception as e:
            print(f"[hotmic] audio sender stopped: {e}")

    def stop(self):
        """End the take, fetch the transcript and deliver it.

        When VAD is enabled and no speech was detected, the captured audio
        is discarded and the session closed without asking for a result.
        """
        if not self._active:
            return
        print("[hotmic] stop recording …")
        self._active = False
        self.rec.stop()
        if self.indicator:
            self.indicator.hide()

        has_speech = self.rec.has_speech_activity()
        avg_energy = self.rec.get_average_energy()
        if self.cfg.enable_vad and not has_speech:
            print(f"[hotmic] No speech detected (avg energy: {avg_energy:.2f})")
            print("[hotmic] Skipping empty audio - not sending to server")
            # Empty the queue so this audio cannot leak into the next take.
            self.rec.drain_remaining(0)
            if self._sess:
                self._call_soon(self._sess.close())
            return
        print(f"[hotmic] Speech detected (avg energy: {avg_energy:.2f})")

        # Flush any remaining chunks
        remaining = self.rec.drain_remaining(self.cfg.stop_flush_wait)
        sess = self._sess

        async def _finalize():
            if sess is None:
                return ""
            try:
                for c in remaining:
                    await sess.send_audio(c)
                # server expects small delay before stop
                await asyncio.sleep(0.1)
                await sess.stop_recording()
                await sess.wait_final(timeout=30.0)
                return sess.transcript.strip()
            finally:
                # Close even on failure so the next start is clean.
                await sess.close()

        text = self._call_soon(_finalize()).result()
        if text:
            self._to_clipboard(text)
            if self.cfg.autopaste:
                self._paste_keystroke()
        else:
            print("[hotmic] (no transcript received)")

    def _to_clipboard(self, text: str):
        """Copy *text* to the system clipboard, reporting any failure."""
        try:
            pyperclip.copy(text)
            print("[hotmic] copied transcript to clipboard")
        except Exception as e:
            print(f"[hotmic] failed to copy to clipboard: {e}")

    def _paste_keystroke(self):
        """Send the platform paste shortcut to the focused window."""
        try:
            # Use Cmd on macOS, Ctrl on Windows/Linux
            modifier = keyboard.Key.cmd if platform.system() == "Darwin" else keyboard.Key.ctrl
            # The context manager releases the modifier even if sending 'v'
            # fails, so the key cannot be left logically held down.
            with self._kb_controller.pressed(modifier):
                self._kb_controller.press('v')
                self._kb_controller.release('v')
            print("[hotmic] pasted at cursor")
        except Exception as e:
            print(f"[hotmic] failed to paste: {e}")

    def shutdown(self):
        """Stop any recording and tear down indicator, session and loop."""
        with contextlib.suppress(Exception):
            self.stop()
        if self.indicator:
            with contextlib.suppress(Exception):
                self.indicator.shutdown()
        if self._sess:
            # A slow or failing close must not prevent stopping the loop.
            with contextlib.suppress(Exception):
                self._call_soon(self._sess.close()).result(timeout=5)
        with contextlib.suppress(Exception):
            self.loop.call_soon_threadsafe(self.loop.stop)
        if self._loop_thread.is_alive():
            self._loop_thread.join(timeout=1)

    # Single-key toggle support
    def toggle(self):
        """Stop when recording, start otherwise."""
        if self._active:
            self.stop()
        else:
            self.start()

    # Safe wrappers to avoid crashing hotkey listener on exceptions
    def safe_toggle(self):
        """``toggle()`` that reports errors instead of raising."""
        try:
            self.toggle()
        except Exception as e:
            print(f"[hotmic] toggle error: {e}")

    def safe_start(self):
        """``start()`` that reports errors instead of raising."""
        try:
            self.start()
        except Exception as e:
            print(f"[hotmic] start error: {e}")

    def safe_stop(self):
        """``stop()`` that reports errors instead of raising."""
        try:
            self.stop()
        except Exception as e:
            print(f"[hotmic] stop error: {e}")
def parse_hotkey(hotkey: str) -> str:
    """Normalise a hand-written hotkey into pynput's GlobalHotKeys form.

    pynput GlobalHotKeys uses strings such as '<cmd>+<alt>+r'. Whitespace
    around each '+'-separated token is removed and ``<name>`` tokens are
    lower-cased. A string that is already in canonical form is returned
    unchanged. A token made only of whitespace is left as it is, so a
    literal space key is not turned into an empty token.

    Args:
        hotkey: Hotkey string as written in the configuration.

    Returns:
        The normalised hotkey string.
    """

    def _clean(token: str) -> str:
        stripped = token.strip()
        if not stripped:
            return token
        if stripped.startswith("<") and stripped.endswith(">"):
            return stripped.lower()
        return stripped

    return "+".join(_clean(token) for token in hotkey.split("+"))
def _parse_hold_combo(combo_str: str) -> list:
    """Turn a combo such as '<ctrl>+<alt>' into a list of key groups.

    Each group is a frozenset of interchangeable keys (for example the
    generic, left and right Ctrl keys). Unknown tokens are reported and
    skipped.
    """
    modifier_names = {
        "ctrl": ("ctrl", "ctrl_l", "ctrl_r"),
        "alt": ("alt", "alt_l", "alt_r"),
        "shift": ("shift", "shift_l", "shift_r"),
        "cmd": ("cmd", "cmd_l", "cmd_r"),
        # Windows key is mapped to cmd in pynput
        "win": ("cmd", "cmd_l", "cmd_r"),
    }
    known_keys = keyboard.Key.__members__
    groups = []
    for raw in combo_str.split('+'):
        token = raw.strip()
        if not token:
            continue
        if token.startswith('<') and token.endswith('>'):
            name = token[1:-1].lower()
            # Modifiers expand to all their variants; any other name (e.g.
            # 'space', 'f9') is looked up directly among pynput's keys.
            members = [
                known_keys[n]
                for n in modifier_names.get(name, (name,))
                if n in known_keys
            ]
        elif len(token) == 1:
            # Accept either case so a held Shift does not break the match.
            members = [
                keyboard.KeyCode.from_char(token.lower()),
                keyboard.KeyCode.from_char(token.upper()),
            ]
        else:
            members = []
        if members:
            groups.append(frozenset(members))
        else:
            print(f"[hotmic] ignoring unknown key {token!r} in hold_hotkey")
    return groups


def _combo_is_held(groups, pressed) -> bool:
    """Return True when every key group has at least one pressed member."""
    return bool(groups) and all(group & pressed for group in groups)


def _resolve_mouse_button(button_name: str):
    """Map a configured button name to a pynput button (default: middle)."""
    wanted = button_name.lower()
    button = mouse.Button.__members__.get(wanted)
    if button is None or wanted == "unknown":
        if wanted != "middle":
            print(f"[hotmic] unknown mouse button {button_name!r}; using middle")
        return mouse.Button.middle
    return button


def main():
    """Load the configuration, wire up the listeners and run until signalled.

    Registers the toggle hotkey, and optionally a hold-to-talk key combo and
    a hold-to-talk mouse button. Blocks on the toggle listener; SIGINT and
    SIGTERM stop all listeners, shut the recorder down and exit with 0.
    """
    cfg = load_config()

    # Test microphone on startup if enabled
    if cfg.test_mic_on_startup:
        mic_ok = test_microphone(cfg)
        if not mic_ok:
            print("\n[hotmic] ⚠️ Warning: Microphone test failed!")
            print("[hotmic] The program will continue, but recording may not work properly.")
            print("[hotmic] To skip this test, set 'test_mic_on_startup': false in config.json")
            # Wait a bit so user can see the warning
            time.sleep(2)
        print()  # Empty line for readability

    hotmic = HotMic(cfg)

    # Toggle mode hotkey (original)
    kb_bindings = {parse_hotkey(cfg.hotkey): hotmic.safe_toggle}
    print("Hotkey Configuration:")
    print(f" Toggle mode: {cfg.hotkey}")

    # Hold-to-talk mode hotkey (if enabled). GlobalHotKeys only reports
    # activation, so press and release are tracked with a plain listener.
    kb_listener_with_hold = None
    if cfg.enable_hold_mode and cfg.hold_hotkey:
        print(f" Hold-to-talk: {cfg.hold_hotkey} (press and hold)")
        hold_groups = _parse_hold_combo(cfg.hold_hotkey)
        if not hold_groups:
            print("[hotmic] hold_hotkey contains no usable keys; hold-to-talk is inactive")
        hold_key_active = threading.Event()
        pressed_keys = set()

        def on_press(key):
            pressed_keys.add(key)
            if not hold_key_active.is_set() and _combo_is_held(hold_groups, pressed_keys):
                hold_key_active.set()
                hotmic.safe_start()

        def on_release(key):
            pressed_keys.discard(key)
            # Releasing any key the combo still needs ends the recording.
            if hold_key_active.is_set() and not _combo_is_held(hold_groups, pressed_keys):
                hold_key_active.clear()
                hotmic.safe_stop()

        kb_listener_with_hold = keyboard.Listener(on_press=on_press, on_release=on_release)
        kb_listener_with_hold.start()

    print(f"Endpoint: {cfg.endpoint}")

    # Mouse button listener (if enabled)
    mouse_listener = None
    if cfg.enable_mouse_button:
        print(f" Mouse button: {cfg.mouse_button} (press and hold)")
        target_button = _resolve_mouse_button(cfg.mouse_button)

        def on_click(x, y, button, pressed):
            if button == target_button:
                if pressed:
                    hotmic.safe_start()
                else:
                    hotmic.safe_stop()

        mouse_listener = mouse.Listener(on_click=on_click)
        mouse_listener.start()

    print("\nReady! Use the configured hotkeys or mouse button to control recording.")
    if cfg.show_visual_indicator:
        print("A visual indicator will appear when recording is active.")

    # Setup keyboard toggle hotkey listener
    toggle_listener = keyboard.GlobalHotKeys(kb_bindings)

    # Handle Ctrl+C and SIGTERM gracefully
    def _sig_handler(signum, frame):
        print("\n[hotmic] exiting…")
        for listener in (toggle_listener, kb_listener_with_hold, mouse_listener):
            if listener:
                with contextlib.suppress(Exception):
                    listener.stop()
        hotmic.shutdown()
        sys.exit(0)

    for s in (signal.SIGINT, signal.SIGTERM):
        # Some signals cannot be installed on every platform; skip those.
        with contextlib.suppress(Exception):
            signal.signal(s, _sig_handler)

    # Keep the program running until a signal arrives
    with toggle_listener:
        toggle_listener.join()
# Start the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()