-
-
Notifications
You must be signed in to change notification settings - Fork 177
Expand file tree
/
Copy pathstore.py
More file actions
354 lines (315 loc) · 13 KB
/
store.py
File metadata and controls
354 lines (315 loc) · 13 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
"""JSONL message persistence for the chat room with observer callbacks."""
import json
import os
import time
import threading
import uuid
from pathlib import Path
class MessageStore:
def __init__(self, path: str):
self._path = Path(path)
self._path.parent.mkdir(parents=True, exist_ok=True)
self._todos_path = self._path.parent / "todos.json"
self._messages: list[dict] = []
self._next_id: int = 0 # monotonically increasing, survives deletions
self._todos: dict[int, str] = {} # msg_id → "todo" | "done"
self._lock = threading.Lock()
self._callbacks: list = [] # called on each new message
self._todo_callbacks: list = [] # called on todo changes
self._delete_callbacks: list = [] # called on message deletion
self.upload_dir = self._path.parent.parent / "uploads" # Default fallback
self._load()
self._load_todos()
def _load(self):
if not self._path.exists():
return
max_id = -1
with open(self._path, "r", encoding="utf-8") as f:
for i, line in enumerate(f):
line = line.strip()
if not line:
continue
try:
msg = json.loads(line)
# Preserve persisted ID; fall back to line number for legacy data
if "id" not in msg:
msg["id"] = i
if msg["id"] > max_id:
max_id = msg["id"]
self._messages.append(msg)
except json.JSONDecodeError:
continue
self._next_id = max_id + 1
def on_message(self, callback):
"""Register a callback(msg) called whenever a message is added."""
self._callbacks.append(callback)
def add(self, sender: str, text: str, msg_type: str = "chat",
attachments: list | None = None, reply_to: int | None = None,
channel: str = "general",
metadata: dict | None = None,
uid: str | None = None,
timestamp: float | None = None,
time_str: str | None = None,
_bulk: bool = False) -> dict:
with self._lock:
ts = timestamp if timestamp is not None else time.time()
msg = {
"id": self._next_id,
"uid": uid or str(uuid.uuid4()),
"sender": sender,
"text": text,
"type": msg_type,
"timestamp": ts,
"time": time_str or time.strftime("%H:%M:%S"),
"attachments": attachments or [],
"channel": channel,
}
if reply_to is not None:
msg["reply_to"] = reply_to
if metadata:
msg["metadata"] = metadata
self._next_id += 1
self._messages.append(msg)
if not _bulk:
with open(self._path, "a", encoding="utf-8") as f:
f.write(json.dumps(msg, ensure_ascii=False) + "\n")
f.flush()
os.fsync(f.fileno())
# Fire callbacks outside the lock (skip during bulk import)
if not _bulk:
for cb in self._callbacks:
try:
cb(msg)
except Exception:
pass
return msg
def flush_bulk(self):
"""Write all in-memory messages to disk. Call after bulk add operations."""
with self._lock:
self._rewrite()
def update_reply_to(self, msg_id: int, reply_to: int):
"""Set reply_to on an existing message (used by import to rebuild links)."""
with self._lock:
for m in self._messages:
if m["id"] == msg_id:
m["reply_to"] = reply_to
self._rewrite()
return
def _rewrite(self):
"""Rewrite the full JSONL file from memory (used after bulk edits)."""
with open(self._path, "w", encoding="utf-8") as f:
for m in self._messages:
f.write(json.dumps(m, ensure_ascii=False) + "\n")
f.flush()
os.fsync(f.fileno())
def get_by_id(self, msg_id: int) -> dict | None:
with self._lock:
for m in self._messages:
if m["id"] == msg_id:
return m
return None
def get_recent(self, count: int = 50, channel: str | None = None) -> list[dict]:
with self._lock:
msgs = self._messages
if channel:
msgs = [m for m in msgs if m.get("channel", "general") == channel]
return list(msgs[-count:])
def get_since(self, since_id: int = 0, channel: str | None = None) -> list[dict]:
with self._lock:
msgs = [m for m in self._messages if m["id"] > since_id]
if channel:
msgs = [m for m in msgs if m.get("channel", "general") == channel]
return msgs
def delete(self, msg_ids: list[int]) -> list[int]:
"""Delete messages by ID. Returns list of IDs actually deleted."""
deleted = []
deleted_attachments = []
with self._lock:
for mid in msg_ids:
for i, m in enumerate(self._messages):
if m["id"] == mid:
# Collect attachment files for cleanup
for att in m.get("attachments", []):
url = att.get("url", "")
if url.startswith("/uploads/"):
deleted_attachments.append(url.split("/")[-1])
# Remove any associated todo
if mid in self._todos:
del self._todos[mid]
self._messages.pop(i)
deleted.append(mid)
break
if deleted:
self._rewrite_jsonl()
self._save_todos()
# Clean up uploaded images outside the lock
for filename in deleted_attachments:
filepath = self.upload_dir / filename
if filepath.exists():
try:
filepath.unlink()
except Exception:
pass
# Fire callbacks
for cb in self._delete_callbacks:
try:
cb(deleted)
except Exception:
pass
return deleted
def on_delete(self, callback):
"""Register a callback(ids) called when messages are deleted."""
self._delete_callbacks.append(callback)
def update_message(self, msg_id: int, updates: dict) -> dict | None:
"""Update fields on a message in-place. Returns the updated message or None."""
with self._lock:
for m in self._messages:
if m["id"] == msg_id:
m.update(updates)
self._rewrite_jsonl()
return dict(m)
return None
def _rewrite_jsonl(self):
"""Rewrite the JSONL file from current in-memory messages."""
with open(self._path, "w", encoding="utf-8") as f:
for m in self._messages:
f.write(json.dumps(m, ensure_ascii=False) + "\n")
f.flush()
os.fsync(f.fileno())
def clear(self, channel: str | None = None):
"""Wipe messages and rewrite the log file.
If channel is given, only clear messages in that channel."""
with self._lock:
if channel:
removed_ids = {m["id"] for m in self._messages if m.get("channel", "general") == channel}
self._messages = [m for m in self._messages if m.get("channel", "general") != channel]
self._rewrite_jsonl()
# Clean up todos for cleared messages
for tid in list(self._todos.keys()):
if tid in removed_ids:
del self._todos[tid]
if removed_ids:
self._save_todos()
else:
self._messages.clear()
self._path.write_text("")
self._todos.clear()
self._save_todos()
def rename_channel(self, old_name: str, new_name: str):
"""Migrate all messages from old_name to new_name."""
with self._lock:
modified = False
for m in self._messages:
if m.get("channel") == old_name:
m["channel"] = new_name
modified = True
if modified:
self._rewrite_jsonl()
def rename_sender(self, old_name: str, new_name: str) -> int:
"""Rename sender on all messages from old_name to new_name. Returns count updated."""
with self._lock:
count = 0
for m in self._messages:
if m.get("sender") == old_name:
m["sender"] = new_name
count += 1
if count:
self._rewrite_jsonl()
return count
def delete_channel(self, name: str):
"""Remove all messages belonging to a deleted channel."""
with self._lock:
original_len = len(self._messages)
# Collect IDs of messages being removed so we can clean up their todos
removed_ids = {m["id"] for m in self._messages if m.get("channel") == name}
self._messages = [m for m in self._messages if m.get("channel") != name]
if len(self._messages) != original_len:
self._rewrite_jsonl()
# Clean up todos that referenced deleted messages
for tid in list(self._todos.keys()):
if tid in removed_ids:
del self._todos[tid]
self._save_todos()
# --- Todos ---
def _load_todos(self):
# Migrate old pins.json (list of ints) → todos.json (dict of id→status)
old_pins = self._todos_path.parent / "pins.json"
if old_pins.exists() and not self._todos_path.exists():
try:
ids = json.loads(old_pins.read_text("utf-8"))
if isinstance(ids, list):
self._todos = {int(i): "todo" for i in ids}
self._save_todos()
old_pins.unlink()
except Exception:
pass
if self._todos_path.exists():
try:
raw = json.loads(self._todos_path.read_text("utf-8"))
self._todos = {int(k): v for k, v in raw.items()}
except Exception:
self._todos = {}
def _save_todos(self):
self._todos_path.write_text(
json.dumps({str(k): v for k, v in self._todos.items()}, indent=2),
"utf-8"
)
def on_todo(self, callback):
"""Register a callback(msg_id, status) called on todo changes.
status is 'todo', 'done', or None (removed)."""
self._todo_callbacks.append(callback)
def _fire_todo(self, msg_id: int, status: str | None):
for cb in self._todo_callbacks:
try:
cb(msg_id, status)
except Exception:
pass
def add_todo(self, msg_id: int) -> bool:
with self._lock:
if not any(m["id"] == msg_id for m in self._messages):
return False
self._todos[msg_id] = "todo"
self._save_todos()
self._fire_todo(msg_id, "todo")
return True
def complete_todo(self, msg_id: int) -> bool:
with self._lock:
if msg_id not in self._todos:
return False
self._todos[msg_id] = "done"
self._save_todos()
self._fire_todo(msg_id, "done")
return True
def reopen_todo(self, msg_id: int) -> bool:
with self._lock:
if msg_id not in self._todos:
return False
self._todos[msg_id] = "todo"
self._save_todos()
self._fire_todo(msg_id, "todo")
return True
def remove_todo(self, msg_id: int) -> bool:
with self._lock:
if msg_id not in self._todos:
return False
del self._todos[msg_id]
self._save_todos()
self._fire_todo(msg_id, None)
return True
def get_todo_status(self, msg_id: int) -> str | None:
return self._todos.get(msg_id)
def get_todos(self) -> dict[int, str]:
"""Returns {msg_id: status} for all todos."""
return dict(self._todos)
def get_todo_messages(self, status: str | None = None) -> list[dict]:
"""Get todo messages, optionally filtered by status."""
with self._lock:
if status:
ids = {k for k, v in self._todos.items() if v == status}
else:
ids = set(self._todos.keys())
return [m for m in self._messages if m["id"] in ids]
@property
def last_id(self) -> int:
with self._lock:
return self._messages[-1]["id"] if self._messages else -1