Skip to content
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.

Commit 6831f3c

Browse files
committedMar 1, 2025·
WIP: Snapshot
1 parent e77bc7d commit 6831f3c

File tree

2 files changed

+814
-0
lines changed

2 files changed

+814
-0
lines changed
 

‎src/libtmux/snapshot.py

Lines changed: 720 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,720 @@
1+
"""Hierarchical snapshots of tmux objects.
2+
3+
libtmux.snapshot
4+
~~~~~~~~~~~~~~~~
5+
6+
This module provides read-only snapshot classes for tmux objects that preserve
7+
the object structure and relationships while preventing modifications or
8+
tmux command execution.
9+
"""
10+
11+
from __future__ import annotations
12+
13+
import contextlib
14+
import copy
15+
import typing as t
16+
from dataclasses import dataclass, field
17+
from datetime import datetime
18+
from types import TracebackType
19+
20+
from libtmux._internal.query_list import QueryList
21+
from libtmux.pane import Pane
22+
from libtmux.server import Server
23+
from libtmux.session import Session
24+
from libtmux.window import Window
25+
26+
if t.TYPE_CHECKING:
27+
pass
28+
29+
30+
@dataclass
31+
class PaneSnapshot(Pane):
32+
"""A read-only snapshot of a tmux pane.
33+
34+
This maintains compatibility with the original Pane class but prevents modification.
35+
"""
36+
37+
# Fields only present in snapshot
38+
pane_content: list[str] | None = None
39+
created_at: datetime = field(default_factory=datetime.now)
40+
window_snapshot: WindowSnapshot | None = None
41+
_read_only: bool = field(default=False, repr=False)
42+
43+
def __post_init__(self) -> None:
44+
"""Make instance effectively read-only after initialization."""
45+
object.__setattr__(self, "_read_only", True)
46+
47+
def __setattr__(self, name: str, value: t.Any) -> None:
48+
"""Prevent attribute modification after initialization."""
49+
if hasattr(self, "_read_only") and self._read_only:
50+
error_msg = f"Cannot modify '{name}' on read-only PaneSnapshot"
51+
raise AttributeError(error_msg)
52+
super().__setattr__(name, value)
53+
54+
def __enter__(self) -> PaneSnapshot:
55+
"""Context manager entry point."""
56+
return self
57+
58+
def __exit__(
59+
self,
60+
exc_type: type[BaseException] | None,
61+
exc_val: BaseException | None,
62+
exc_tb: TracebackType | None,
63+
) -> None:
64+
"""Context manager exit point."""
65+
pass
66+
67+
def cmd(self, *args: t.Any, **kwargs: t.Any) -> t.NoReturn:
68+
"""Prevent executing tmux commands on a snapshot."""
69+
error_msg = "PaneSnapshot is read-only and cannot execute tmux commands"
70+
raise NotImplementedError(error_msg)
71+
72+
def capture_pane(self, *args: t.Any, **kwargs: t.Any) -> list[str]:
73+
"""Return the previously captured content instead of capturing new content."""
74+
if self.pane_content is None:
75+
return []
76+
return self.pane_content
77+
78+
@property
79+
def window(self) -> WindowSnapshot | None:
80+
"""Return the WindowSnapshot parent, or None."""
81+
return self.window_snapshot
82+
83+
@property
84+
def session(self) -> SessionSnapshot | None:
85+
"""Return SessionSnapshot via window_snapshot's session_snapshot, or None."""
86+
if self.window_snapshot is not None:
87+
return self.window_snapshot.session_snapshot
88+
return None
89+
90+
@classmethod
91+
def from_pane(
92+
cls,
93+
pane: Pane,
94+
capture_content: bool = True,
95+
window_snapshot: WindowSnapshot | None = None,
96+
) -> PaneSnapshot:
97+
"""Create a PaneSnapshot from a live Pane.
98+
99+
Parameters
100+
----------
101+
pane : Pane
102+
Live pane to snapshot
103+
capture_content : bool, optional
104+
Whether to capture the current text from the pane
105+
window_snapshot : WindowSnapshot, optional
106+
Parent window snapshot to link back to
107+
108+
Returns
109+
-------
110+
PaneSnapshot
111+
A read-only snapshot of the pane
112+
"""
113+
# Try capturing the pane's content
114+
pane_content = None
115+
if capture_content:
116+
with contextlib.suppress(Exception):
117+
pane_content = pane.capture_pane()
118+
119+
# Gather fields from the parent Pane class
120+
# We need to use object.__setattr__ to bypass our own __setattr__ override
121+
snapshot = cls(server=pane.server)
122+
123+
# Copy all relevant attributes from the original pane
124+
for name, value in vars(pane).items():
125+
if not name.startswith("_"): # Skip private attributes
126+
object.__setattr__(snapshot, name, copy.deepcopy(value))
127+
128+
# Set snapshot-specific fields
129+
object.__setattr__(snapshot, "pane_content", pane_content)
130+
object.__setattr__(snapshot, "window_snapshot", window_snapshot)
131+
object.__setattr__(snapshot, "created_at", datetime.now())
132+
133+
return snapshot
134+
135+
136+
@dataclass
137+
class WindowSnapshot(Window):
138+
"""A read-only snapshot of a tmux window.
139+
140+
This maintains compatibility with the original Window class but prevents modification.
141+
"""
142+
143+
# Fields only present in snapshot
144+
created_at: datetime = field(default_factory=datetime.now)
145+
session_snapshot: SessionSnapshot | None = None
146+
panes_snapshot: list[PaneSnapshot] = field(default_factory=list)
147+
_read_only: bool = field(default=False, repr=False)
148+
149+
def __post_init__(self) -> None:
150+
"""Make instance effectively read-only after initialization."""
151+
object.__setattr__(self, "_read_only", True)
152+
153+
def __setattr__(self, name: str, value: t.Any) -> None:
154+
"""Prevent attribute modification after initialization."""
155+
if hasattr(self, "_read_only") and self._read_only:
156+
error_msg = f"Cannot modify '{name}' on read-only WindowSnapshot"
157+
raise AttributeError(error_msg)
158+
super().__setattr__(name, value)
159+
160+
def __enter__(self) -> WindowSnapshot:
161+
"""Context manager entry point."""
162+
return self
163+
164+
def __exit__(
165+
self,
166+
exc_type: type[BaseException] | None,
167+
exc_val: BaseException | None,
168+
exc_tb: TracebackType | None,
169+
) -> None:
170+
"""Context manager exit point."""
171+
pass
172+
173+
def cmd(self, *args: t.Any, **kwargs: t.Any) -> t.NoReturn:
174+
"""Prevent executing tmux commands on a snapshot."""
175+
error_msg = "WindowSnapshot is read-only and cannot execute tmux commands"
176+
raise NotImplementedError(error_msg)
177+
178+
@property
179+
def panes(self) -> QueryList[PaneSnapshot]:
180+
"""Return the list of pane snapshots."""
181+
return QueryList(self.panes_snapshot)
182+
183+
@property
184+
def session(self) -> SessionSnapshot | None:
185+
"""Return the SessionSnapshot parent, or None."""
186+
return self.session_snapshot
187+
188+
@property
189+
def active_pane(self) -> PaneSnapshot | None:
190+
"""Return the active pane from the pane snapshots."""
191+
active_panes = [
192+
p for p in self.panes_snapshot if getattr(p, "pane_active", "0") == "1"
193+
]
194+
return active_panes[0] if active_panes else None
195+
196+
@classmethod
197+
def from_window(
198+
cls,
199+
window: Window,
200+
capture_content: bool = True,
201+
session_snapshot: SessionSnapshot | None = None,
202+
) -> WindowSnapshot:
203+
"""Create a WindowSnapshot from a live Window.
204+
205+
Parameters
206+
----------
207+
window : Window
208+
Live window to snapshot
209+
capture_content : bool, optional
210+
Whether to capture the current content of all panes
211+
session_snapshot : SessionSnapshot, optional
212+
Parent session snapshot to link back to
213+
214+
Returns
215+
-------
216+
WindowSnapshot
217+
A read-only snapshot of the window
218+
"""
219+
# Create a new window snapshot instance
220+
snapshot = cls(server=window.server)
221+
222+
# Copy all relevant attributes from the original window
223+
for name, value in vars(window).items():
224+
if not name.startswith("_") and name not in ["panes", "session"]:
225+
object.__setattr__(snapshot, name, copy.deepcopy(value))
226+
227+
# Set snapshot-specific fields
228+
object.__setattr__(snapshot, "created_at", datetime.now())
229+
object.__setattr__(snapshot, "session_snapshot", session_snapshot)
230+
231+
# Now snapshot all panes
232+
panes_snapshot = []
233+
for p in window.panes:
234+
pane_snapshot = PaneSnapshot.from_pane(
235+
p, capture_content=capture_content, window_snapshot=snapshot
236+
)
237+
panes_snapshot.append(pane_snapshot)
238+
239+
object.__setattr__(snapshot, "panes_snapshot", panes_snapshot)
240+
241+
return snapshot
242+
243+
244+
@dataclass
245+
class SessionSnapshot(Session):
246+
"""A read-only snapshot of a tmux session.
247+
248+
This maintains compatibility with the original Session class but prevents modification.
249+
"""
250+
251+
# Make server field optional by giving it a default value
252+
server: t.Any = None # type: ignore
253+
254+
# Fields only present in snapshot
255+
created_at: datetime = field(default_factory=datetime.now)
256+
server_snapshot: ServerSnapshot | None = None
257+
windows_snapshot: list[WindowSnapshot] = field(default_factory=list)
258+
_read_only: bool = field(default=False, repr=False)
259+
260+
def __post_init__(self) -> None:
261+
"""Make instance effectively read-only after initialization."""
262+
object.__setattr__(self, "_read_only", True)
263+
264+
def __setattr__(self, name: str, value: t.Any) -> None:
265+
"""Prevent attribute modification after initialization."""
266+
if hasattr(self, "_read_only") and self._read_only:
267+
error_msg = f"Cannot modify '{name}' on read-only SessionSnapshot"
268+
raise AttributeError(error_msg)
269+
super().__setattr__(name, value)
270+
271+
def __enter__(self) -> SessionSnapshot:
272+
"""Context manager entry point."""
273+
return self
274+
275+
def __exit__(
276+
self,
277+
exc_type: type[BaseException] | None,
278+
exc_val: BaseException | None,
279+
exc_tb: TracebackType | None,
280+
) -> None:
281+
"""Context manager exit point."""
282+
pass
283+
284+
def cmd(self, *args: t.Any, **kwargs: t.Any) -> t.NoReturn:
285+
"""Prevent executing tmux commands on a snapshot."""
286+
error_msg = "SessionSnapshot is read-only and cannot execute tmux commands"
287+
raise NotImplementedError(error_msg)
288+
289+
@property
290+
def windows(self) -> QueryList[WindowSnapshot]:
291+
"""Return the list of window snapshots."""
292+
return QueryList(self.windows_snapshot)
293+
294+
@property
295+
def server(self) -> ServerSnapshot | None:
296+
"""Return the ServerSnapshot parent, or None."""
297+
return self.server_snapshot
298+
299+
@property
300+
def active_window(self) -> WindowSnapshot | None:
301+
"""Return the active window snapshot, if any."""
302+
for window in self.windows_snapshot:
303+
if getattr(window, "window_active", "0") == "1":
304+
return window
305+
return None
306+
307+
@property
308+
def active_pane(self) -> PaneSnapshot | None:
309+
"""Return the active pane from the active window, if it exists."""
310+
active_win = self.active_window
311+
return active_win.active_pane if active_win else None
312+
313+
@classmethod
314+
def from_session(
315+
cls,
316+
session: Session,
317+
*,
318+
capture_content: bool = False,
319+
server_snapshot: ServerSnapshot | None = None,
320+
) -> SessionSnapshot:
321+
"""Create a SessionSnapshot from a live Session.
322+
323+
Parameters
324+
----------
325+
session : Session
326+
Live session to snapshot
327+
capture_content : bool, optional
328+
Whether to capture the current content of all panes
329+
server_snapshot : ServerSnapshot, optional
330+
Parent server snapshot to link back to
331+
332+
Returns
333+
-------
334+
SessionSnapshot
335+
A read-only snapshot of the session
336+
"""
337+
# Create a new empty instance using __new__ to bypass __init__
338+
snapshot = cls.__new__(cls)
339+
340+
# Initialize _read_only to False to allow setting attributes
341+
object.__setattr__(snapshot, "_read_only", False)
342+
343+
# Copy all relevant attributes from the original session
344+
for name, value in vars(session).items():
345+
if not name.startswith("_") and name not in ["server", "windows"]:
346+
object.__setattr__(snapshot, name, copy.deepcopy(value))
347+
348+
# Set snapshot-specific fields
349+
object.__setattr__(snapshot, "created_at", datetime.now())
350+
object.__setattr__(snapshot, "server_snapshot", server_snapshot)
351+
352+
# Initialize empty lists
353+
object.__setattr__(snapshot, "windows_snapshot", [])
354+
355+
# Now snapshot all windows
356+
windows_snapshot = []
357+
for w in session.windows:
358+
window_snapshot = WindowSnapshot.from_window(
359+
w, capture_content=capture_content, session_snapshot=snapshot
360+
)
361+
windows_snapshot.append(window_snapshot)
362+
363+
object.__setattr__(snapshot, "windows_snapshot", windows_snapshot)
364+
365+
# Finally, set _read_only to True to prevent future modifications
366+
object.__setattr__(snapshot, "_read_only", True)
367+
368+
return snapshot
369+
370+
371+
@dataclass
372+
class ServerSnapshot(Server):
373+
"""A read-only snapshot of a tmux server.
374+
375+
This maintains compatibility with the original Server class but prevents modification.
376+
"""
377+
378+
# Fields only present in snapshot
379+
created_at: datetime = field(default_factory=datetime.now)
380+
sessions_snapshot: list[SessionSnapshot] = field(default_factory=list)
381+
windows_snapshot: list[WindowSnapshot] = field(default_factory=list)
382+
panes_snapshot: list[PaneSnapshot] = field(default_factory=list)
383+
_read_only: bool = field(default=False, repr=False)
384+
385+
def __post_init__(self) -> None:
386+
"""Make instance effectively read-only after initialization."""
387+
object.__setattr__(self, "_read_only", True)
388+
389+
def __setattr__(self, name: str, value: t.Any) -> None:
390+
"""Prevent attribute modification after initialization."""
391+
if hasattr(self, "_read_only") and self._read_only:
392+
error_msg = f"Cannot modify '{name}' on read-only ServerSnapshot"
393+
raise AttributeError(error_msg)
394+
super().__setattr__(name, value)
395+
396+
def __enter__(self) -> ServerSnapshot:
397+
"""Context manager entry point."""
398+
return self
399+
400+
def __exit__(
401+
self,
402+
exc_type: type[BaseException] | None,
403+
exc_val: BaseException | None,
404+
exc_tb: TracebackType | None,
405+
) -> None:
406+
"""Context manager exit point."""
407+
pass
408+
409+
def cmd(self, *args: t.Any, **kwargs: t.Any) -> t.NoReturn:
410+
"""Prevent executing tmux commands on a snapshot."""
411+
error_msg = "ServerSnapshot is read-only and cannot execute tmux commands"
412+
raise NotImplementedError(error_msg)
413+
414+
def is_alive(self) -> bool:
415+
"""Return False as snapshot servers are not connected to a live tmux instance."""
416+
return False
417+
418+
def raise_if_dead(self) -> t.NoReturn:
419+
"""Raise exception as snapshots are not connected to a live server."""
420+
error_msg = "ServerSnapshot is not connected to a live tmux server"
421+
raise NotImplementedError(error_msg)
422+
423+
@property
424+
def sessions(self) -> QueryList[SessionSnapshot]:
425+
"""Return the list of session snapshots."""
426+
return QueryList(self.sessions_snapshot)
427+
428+
@property
429+
def windows(self) -> QueryList[WindowSnapshot]:
430+
"""Return the list of all window snapshots across all sessions."""
431+
return QueryList(self.windows_snapshot)
432+
433+
@property
434+
def panes(self) -> QueryList[PaneSnapshot]:
435+
"""Return the list of all pane snapshots across all windows and sessions."""
436+
return QueryList(self.panes_snapshot)
437+
438+
@classmethod
439+
def from_server(
440+
cls, server: Server, include_content: bool = True
441+
) -> ServerSnapshot:
442+
"""Create a ServerSnapshot from a live Server.
443+
444+
Examples
445+
--------
446+
>>> server_snap = ServerSnapshot.from_server(server)
447+
>>> isinstance(server_snap, ServerSnapshot)
448+
True
449+
>>> # Check if it preserves the class hierarchy relationship
450+
>>> isinstance(server_snap, type(server))
451+
True
452+
>>> # Snapshot is read-only
453+
>>> try:
454+
... server_snap.cmd("list-sessions")
455+
... except NotImplementedError:
456+
... print("Cannot execute commands on snapshot")
457+
Cannot execute commands on snapshot
458+
>>> # Check that server is correctly snapshotted
459+
>>> server_snap.socket_name == server.socket_name
460+
True
461+
462+
Parameters
463+
----------
464+
server : Server
465+
Live server to snapshot
466+
include_content : bool, optional
467+
Whether to capture the current content of all panes
468+
469+
Returns
470+
-------
471+
ServerSnapshot
472+
A read-only snapshot of the server
473+
"""
474+
# Create a new server snapshot instance
475+
snapshot = cls()
476+
477+
# Copy all relevant attributes from the original server
478+
for name, value in vars(server).items():
479+
if not name.startswith("_") and name not in [
480+
"sessions",
481+
"windows",
482+
"panes",
483+
]:
484+
object.__setattr__(snapshot, name, copy.deepcopy(value))
485+
486+
# Set snapshot-specific fields
487+
object.__setattr__(snapshot, "created_at", datetime.now())
488+
489+
# Now snapshot all sessions
490+
sessions_snapshot = []
491+
windows_snapshot = []
492+
panes_snapshot = []
493+
494+
for s in server.sessions:
495+
session_snapshot = SessionSnapshot.from_session(
496+
s, capture_content=include_content, server_snapshot=snapshot
497+
)
498+
sessions_snapshot.append(session_snapshot)
499+
500+
# Also collect all windows and panes for quick access
501+
windows_snapshot.extend(session_snapshot.windows_snapshot)
502+
for w in session_snapshot.windows_snapshot:
503+
panes_snapshot.extend(w.panes_snapshot)
504+
505+
object.__setattr__(snapshot, "sessions_snapshot", sessions_snapshot)
506+
object.__setattr__(snapshot, "windows_snapshot", windows_snapshot)
507+
object.__setattr__(snapshot, "panes_snapshot", panes_snapshot)
508+
509+
return snapshot
510+
511+
512+
def filter_snapshot(
513+
snapshot: ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot,
514+
filter_func: t.Callable[
515+
[ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot], bool
516+
],
517+
) -> ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot | None:
518+
"""Filter a snapshot hierarchy based on a filter function.
519+
520+
This will prune the snapshot tree, removing any objects that don't match the filter.
521+
The filter is applied recursively down the hierarchy, and parent-child relationships
522+
are maintained in the filtered snapshot.
523+
524+
Parameters
525+
----------
526+
snapshot : ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot
527+
The snapshot to filter
528+
filter_func : Callable
529+
A function that takes a snapshot object and returns True to keep it
530+
or False to filter it out
531+
532+
Returns
533+
-------
534+
ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot | None
535+
A new filtered snapshot, or None if everything was filtered out
536+
"""
537+
# Handle filtering ServerSnapshot
538+
if isinstance(snapshot, ServerSnapshot):
539+
filtered_sessions = []
540+
541+
# Filter each session
542+
for sess in snapshot.sessions_snapshot:
543+
filtered_sess = filter_snapshot(sess, filter_func)
544+
if filtered_sess is not None:
545+
filtered_sessions.append(filtered_sess)
546+
547+
# If the server itself fails filter or everything is filtered out, return None
548+
if not filter_func(snapshot) and not filtered_sessions:
549+
return None
550+
551+
# Create a new server snapshot with filtered sessions
552+
server_copy = copy.deepcopy(snapshot)
553+
server_copy.sessions_snapshot = filtered_sessions
554+
555+
# Also update windows and panes lists to reflect filtered data
556+
server_copy.windows_snapshot = []
557+
server_copy.panes_snapshot = []
558+
for sess in filtered_sessions:
559+
server_copy.windows_snapshot.extend(sess.windows_snapshot)
560+
for w in sess.windows_snapshot:
561+
server_copy.panes_snapshot.extend(w.panes_snapshot)
562+
563+
return server_copy
564+
565+
# Handle filtering SessionSnapshot
566+
elif isinstance(snapshot, SessionSnapshot):
567+
filtered_windows = []
568+
569+
# Filter each window
570+
for w in snapshot.windows_snapshot:
571+
filtered_w = filter_snapshot(w, filter_func)
572+
if filtered_w is not None:
573+
filtered_windows.append(filtered_w)
574+
575+
# If the session itself fails filter or everything is filtered out, return None
576+
if not filter_func(snapshot) and not filtered_windows:
577+
return None
578+
579+
# Create a new session snapshot with filtered windows
580+
session_copy = copy.deepcopy(snapshot)
581+
session_copy.windows_snapshot = filtered_windows
582+
return session_copy
583+
584+
# Handle filtering WindowSnapshot
585+
elif isinstance(snapshot, WindowSnapshot):
586+
filtered_panes = []
587+
588+
# Filter each pane - panes are leaf nodes
589+
filtered_panes = [p for p in snapshot.panes_snapshot if filter_func(p)]
590+
591+
# If the window itself fails filter or everything is filtered out, return None
592+
if not filter_func(snapshot) and not filtered_panes:
593+
return None
594+
595+
# Create a new window snapshot with filtered panes
596+
window_copy = copy.deepcopy(snapshot)
597+
window_copy.panes_snapshot = filtered_panes
598+
return window_copy
599+
600+
# Handle filtering PaneSnapshot (leaf node)
601+
elif isinstance(snapshot, PaneSnapshot):
602+
if filter_func(snapshot):
603+
return snapshot
604+
return None
605+
606+
# Unhandled type
607+
return snapshot
608+
609+
610+
def snapshot_to_dict(
611+
snapshot: ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot | t.Any,
612+
) -> dict[str, t.Any]:
613+
"""Convert a snapshot to a dictionary, avoiding circular references.
614+
615+
This is useful for serializing snapshots to JSON or other formats.
616+
617+
Parameters
618+
----------
619+
snapshot : ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot | Any
620+
The snapshot to convert to a dictionary
621+
622+
Returns
623+
-------
624+
dict
625+
A dictionary representation of the snapshot
626+
"""
627+
# Base case: For non-snapshot objects, just return them directly
628+
if not isinstance(
629+
snapshot, (ServerSnapshot, SessionSnapshot, WindowSnapshot, PaneSnapshot)
630+
):
631+
return t.cast(dict[str, t.Any], snapshot)
632+
633+
# Convert dataclass to dict
634+
result: dict[str, t.Any] = {}
635+
636+
# Get all fields from the instance
637+
for name, value in vars(snapshot).items():
638+
# Skip internal and parent reference fields - we want a tree, not a graph with cycles
639+
if name.startswith("_") or name in [
640+
"server",
641+
"server_snapshot",
642+
"session_snapshot",
643+
"window_snapshot",
644+
]:
645+
continue
646+
647+
# Handle lists of snapshots
648+
if (
649+
isinstance(value, list)
650+
and value
651+
and isinstance(
652+
value[0],
653+
(ServerSnapshot, SessionSnapshot, WindowSnapshot, PaneSnapshot),
654+
)
655+
):
656+
result[name] = [snapshot_to_dict(item) for item in value]
657+
# Handle nested snapshots
658+
elif isinstance(
659+
value, (ServerSnapshot, SessionSnapshot, WindowSnapshot, PaneSnapshot)
660+
):
661+
result[name] = snapshot_to_dict(value)
662+
# Handle QueryList (convert to regular list first)
663+
elif hasattr(value, "list") and callable(getattr(value, "list", None)):
664+
try:
665+
# If it's a QueryList, convert to list of dicts
666+
items = value.list()
667+
result[name] = []
668+
for item in items:
669+
if isinstance(
670+
item,
671+
(ServerSnapshot, SessionSnapshot, WindowSnapshot, PaneSnapshot),
672+
):
673+
result[name].append(snapshot_to_dict(item))
674+
else:
675+
result[name] = str(value)
676+
except Exception:
677+
# If not a QueryList, just use the string representation
678+
result[name] = str(value)
679+
# Handle non-serializable objects
680+
elif isinstance(value, datetime):
681+
result[name] = str(value)
682+
# Handle remaining basic types
683+
else:
684+
result[name] = value
685+
686+
return result
687+
688+
689+
def snapshot_active_only(
690+
full_snapshot: ServerSnapshot,
691+
) -> ServerSnapshot:
692+
"""Return a filtered snapshot containing only active sessions, windows, and panes.
693+
694+
Parameters
695+
----------
696+
full_snapshot : ServerSnapshot
697+
The complete server snapshot to filter
698+
699+
Returns
700+
-------
701+
ServerSnapshot
702+
A filtered snapshot with only active components
703+
"""
704+
705+
def is_active(
706+
obj: ServerSnapshot | SessionSnapshot | WindowSnapshot | PaneSnapshot,
707+
) -> bool:
708+
"""Return True if the object is active."""
709+
if isinstance(obj, PaneSnapshot):
710+
return getattr(obj, "pane_active", "0") == "1"
711+
elif isinstance(obj, WindowSnapshot):
712+
return getattr(obj, "window_active", "0") == "1"
713+
# Servers and sessions are always considered active
714+
return isinstance(obj, (ServerSnapshot, SessionSnapshot))
715+
716+
filtered = filter_snapshot(full_snapshot, is_active)
717+
if filtered is None:
718+
error_msg = "No active objects found!"
719+
raise ValueError(error_msg)
720+
return t.cast(ServerSnapshot, filtered)

‎tests/test_snapshot.py

Lines changed: 94 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
#!/usr/bin/env python3
2+
"""Test the snapshot functionality of libtmux."""
3+
4+
from __future__ import annotations
5+
6+
import json
7+
import sys
8+
from pathlib import Path
9+
10+
# Add the src directory to the Python path
11+
sys.path.insert(0, str(Path(__file__).parent / "src"))
12+
13+
from libtmux.server import Server
14+
from libtmux.snapshot import (
15+
ServerSnapshot,
16+
snapshot_active_only,
17+
snapshot_to_dict,
18+
)
19+
20+
21+
def main():
22+
"""Demonstrate the snapshot functionality."""
23+
# Create a test server
24+
server = Server()
25+
26+
# Take a complete snapshot of the server
27+
print("Creating a complete snapshot of the server...")
28+
server_snapshot = ServerSnapshot.from_server(server)
29+
30+
# Print some information about the snapshot
31+
print(f"Server snapshot created at: {server_snapshot.created_at}")
32+
print(f"Number of sessions: {len(server_snapshot.sessions)}")
33+
34+
# Test that the snapshot is read-only
35+
try:
36+
server_snapshot.cmd("list-sessions")
37+
except NotImplementedError as e:
38+
print(f"Expected error when trying to execute a command: {e}")
39+
40+
# If there are sessions, print information about the first one
41+
if server_snapshot.sessions:
42+
session = server_snapshot.sessions[0]
43+
print(f"\nFirst session ID: {session.id}")
44+
print(f"First session name: {session.name}")
45+
print(f"Number of windows: {len(session.windows)}")
46+
47+
# If there are windows, print information about the first one
48+
if session.windows:
49+
window = session.windows[0]
50+
print(f"\nFirst window ID: {window.id}")
51+
print(f"First window name: {window.name}")
52+
print(f"Number of panes: {len(window.panes)}")
53+
54+
# If there are panes, print information about the first one
55+
if window.panes:
56+
pane = window.panes[0]
57+
print(f"\nFirst pane ID: {pane.id}")
58+
print(
59+
f"First pane content (up to 5 lines): {pane.pane_content[:5] if pane.pane_content else 'No content captured'}"
60+
)
61+
62+
# Demonstrate filtering
63+
print("\nFiltering snapshot to get only active components...")
64+
try:
65+
filtered_snapshot = snapshot_active_only(server)
66+
print(f"Active sessions: {len(filtered_snapshot.sessions)}")
67+
68+
active_windows = 0
69+
active_panes = 0
70+
for session in filtered_snapshot.sessions:
71+
active_windows += len(session.windows)
72+
for window in session.windows:
73+
active_panes += len(window.panes)
74+
75+
print(f"Active windows: {active_windows}")
76+
print(f"Active panes: {active_panes}")
77+
except ValueError as e:
78+
print(f"No active components found: {e}")
79+
80+
# Demonstrate serialization
81+
print("\nSerializing snapshot to dictionary...")
82+
snapshot_dict = snapshot_to_dict(server_snapshot)
83+
print(f"Dictionary has {len(snapshot_dict)} top-level keys")
84+
print(f"Top-level keys: {', '.join(sorted(key for key in snapshot_dict.keys()))}")
85+
86+
# Output to JSON (just to show it's possible)
87+
json_file = "server_snapshot.json"
88+
with open(json_file, "w") as f:
89+
json.dump(snapshot_dict, f, indent=2, default=str)
90+
print(f"Snapshot saved to {json_file}")
91+
92+
93+
if __name__ == "__main__":
94+
main()

0 commit comments

Comments
 (0)
Please sign in to comment.