Skip to content

Commit 2e0614d

Browse files
committed
feat: first draft of a daemon
feat: first step of daemon mode (not ready) [ci skip]
1 parent d15446c commit 2e0614d

File tree

7 files changed

+692
-80
lines changed

7 files changed

+692
-80
lines changed

sqlmesh/cli/daemon_connector.py

Lines changed: 205 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,205 @@
1+
import json
2+
import typing as t
3+
import uuid
4+
from pathlib import Path
5+
6+
from sqlmesh.core.console import JanitorState, JanitorStateRenderer
7+
from sqlmesh.lsp.cli_calls import (
8+
DaemonCommunicationModeTCP,
9+
DaemonCommunicationModeUnixSocket,
10+
LockFile,
11+
generate_lock_file,
12+
return_lock_file_path,
13+
)
14+
from sqlmesh.utils.pydantic import PydanticModel
15+
16+
17+
class LSPCLICallRequest(PydanticModel):
18+
"""Request to call a CLI command through the LSP."""
19+
20+
arguments: t.List[str]
21+
22+
23+
class SocketMessageFinished(PydanticModel):
24+
state: t.Literal["finished"] = "finished"
25+
26+
27+
class SocketMessageOngoing(PydanticModel):
28+
state: t.Literal["ongoing"] = "ongoing"
29+
message: t.Dict[str, t.Any]
30+
31+
32+
class SocketMessageError(PydanticModel):
33+
state: t.Literal["error"] = "error"
34+
message: str
35+
36+
37+
SocketMessage = t.Union[SocketMessageFinished, SocketMessageOngoing, SocketMessageError]
38+
39+
40+
def _validate_lock_file(lock_file_path: Path) -> LockFile:
41+
"""Validate that the lock file is compatible with current version."""
42+
current_lock = generate_lock_file()
43+
try:
44+
read_file = LockFile.model_validate_json(lock_file_path.read_text())
45+
except Exception as e:
46+
raise ValueError(f"Failed to parse lock file: {e}")
47+
48+
if not read_file.validate_lock_file(current_lock):
49+
raise ValueError(
50+
f"Lock file version mismatch. Expected: {current_lock.version}, "
51+
f"Got: {read_file.version}"
52+
)
53+
return read_file
54+
55+
56+
import socket
57+
58+
59+
class DaemonConnector:
60+
"""Connects to the LSP daemon via socket to execute commands."""
61+
62+
def __init__(self, project_path: Path, lock_file: LockFile):
63+
self.project_path = project_path
64+
self.lock_file = lock_file
65+
self.renderer = JanitorStateRenderer()
66+
67+
def _open_connection(self) -> tuple[t.BinaryIO, t.BinaryIO]:
68+
lock_file = self.lock_file
69+
communication = lock_file.communication
70+
print(f"Using communication mode: {communication}")
71+
if communication is None:
72+
raise ValueError("not correct")
73+
74+
if isinstance(communication.type, DaemonCommunicationModeUnixSocket):
75+
print("Opening Unix socket connection...")
76+
sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
77+
sock.connect(communication.type.socket)
78+
print(f"Connected to Unix socket at {communication.type.socket}")
79+
rfile = sock.makefile("rb", buffering=0)
80+
wfile = sock.makefile("wb", buffering=0)
81+
print("Connected to daemon via Unix socket.")
82+
return rfile, wfile
83+
else:
84+
raise ValueError("Only Unix socket communication is supported")
85+
86+
def _send_jsonrpc_request(self, connection: t.Any, method: str, params: dict) -> str:
87+
"""Send a JSON-RPC request over the connection and return the request ID."""
88+
request_id = str(uuid.uuid4())
89+
jsonrpc_request = {"jsonrpc": "2.0", "method": method, "params": params, "id": request_id}
90+
91+
# JSON-RPC over connection uses Content-Length header (LSP protocol style)
92+
message = json.dumps(jsonrpc_request)
93+
content_length = len(message.encode("utf-8"))
94+
95+
# Send with Content-Length header
96+
header = f"Content-Length: {content_length}\r\n\r\n"
97+
full_message = header.encode("utf-8") + message.encode("utf-8")
98+
connection.write(full_message)
99+
connection.flush()
100+
101+
return request_id
102+
103+
def _read_jsonrpc_message(self, connection: t.Any) -> t.Dict[str, t.Any]:
104+
"""Read any JSON-RPC message (response or notification) from the connection."""
105+
# Read headers
106+
headers = b""
107+
while b"\r\n\r\n" not in headers:
108+
chunk = connection.read(1)
109+
if not chunk:
110+
raise ValueError("Connection closed while reading headers")
111+
headers += chunk
112+
113+
# Parse Content-Length header
114+
header_str = headers.decode("utf-8")
115+
content_length = None
116+
for line in header_str.split("\r\n"):
117+
if line.startswith("Content-Length:"):
118+
content_length = int(line.split(":")[1].strip())
119+
break
120+
121+
if content_length is None:
122+
raise ValueError("No Content-Length header found")
123+
124+
# Read the content
125+
content = connection.read(content_length)
126+
if len(content) < content_length:
127+
raise ValueError("Connection closed while reading content")
128+
129+
# Parse JSON-RPC message
130+
message = json.loads(content.decode("utf-8"))
131+
return message
132+
133+
def _read_jsonrpc_response(self, connection: t.Any, expected_id: str) -> t.Any:
134+
"""Read a JSON-RPC response from the connection."""
135+
message = self._read_jsonrpc_message(connection)
136+
137+
if message.get("id") != expected_id:
138+
raise ValueError(f"Unexpected response ID: {message.get('id')}")
139+
140+
if "error" in message:
141+
raise ValueError(f"JSON-RPC error: {message['error']}")
142+
143+
return message.get("result", {})
144+
145+
def call_janitor(self, ignore_ttl: bool = False) -> bool:
146+
rfile = wfile = None
147+
try:
148+
rfile, wfile = self._open_connection()
149+
150+
request = LSPCLICallRequest(
151+
arguments=["janitor"] + (["--ignore-ttl"] if ignore_ttl else [])
152+
)
153+
request_id = self._send_jsonrpc_request(wfile, "sqlmesh/cli/call", request.model_dump())
154+
155+
with self.renderer as renderer:
156+
while True:
157+
try:
158+
message_data = self._read_jsonrpc_message(rfile)
159+
if "id" in message_data and message_data["id"] == request_id:
160+
result = message_data.get("result", {})
161+
if result.get("state") == "finished":
162+
return True
163+
elif result.get("state") == "error":
164+
print(f"Error from daemon: {result.get('message', 'Unknown error')}")
165+
return False
166+
elif message_data.get("method") == "sqlmesh/cli/update":
167+
params = message_data.get("params", {})
168+
if params.get("state") == "ongoing":
169+
message = params.get("message", {})
170+
if "state" in message:
171+
janitor_state = JanitorState.model_validate(message)
172+
renderer.render(janitor_state.state)
173+
except Exception as stream_error:
174+
print(f"Stream ended: {stream_error}")
175+
break
176+
return True
177+
except Exception as e:
178+
print(f"Failed to communicate with daemon: {e}")
179+
return False
180+
finally:
181+
try:
182+
if rfile: rfile.close()
183+
finally:
184+
if wfile: wfile.close()
185+
186+
def get_daemon_connector(project_path: Path) -> t.Optional[DaemonConnector]:
187+
"""Get a daemon connector if a valid lock file exists."""
188+
lock_path = return_lock_file_path(project_path)
189+
190+
if not lock_path.exists():
191+
return None
192+
193+
try:
194+
# Validate the lock file
195+
lock_file = _validate_lock_file(lock_path)
196+
197+
# Check if communication info is present
198+
if lock_file.communication is None:
199+
return None
200+
201+
return DaemonConnector(project_path, lock_file)
202+
except Exception as e:
203+
# Log the error but don't fail - fall back to direct execution
204+
print(f"Warning: Could not connect to daemon: {e}")
205+
return None

sqlmesh/cli/main.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
from sqlmesh.utils import Verbosity
2525
from sqlmesh.utils.date import TimeLike
2626
from sqlmesh.utils.errors import MissingDependencyError, SQLMeshError
27+
from sqlmesh.cli.daemon_connector import get_daemon_connector
2728

2829
logger = logging.getLogger(__name__)
2930

@@ -640,7 +641,16 @@ def janitor(ctx: click.Context, ignore_ttl: bool, **kwargs: t.Any) -> None:
640641
641642
The janitor cleans up old environments and expired snapshots.
642643
"""
643-
ctx.obj.run_janitor(ignore_ttl, **kwargs)
644+
project_path = Path.cwd()
645+
daemon = get_daemon_connector(project_path)
646+
if daemon:
647+
print("Connecting to SQLMesh daemon...")
648+
success = daemon.call_janitor(ignore_ttl)
649+
print("Janitor completed, with success:", success)
650+
else:
651+
# No daemon available, run directly
652+
# ctx.obj.run_janitor(ignore_ttl, **kwargs)
653+
raise click.ClickException("no socket found")
644654

645655

646656
@cli.command("destroy")

0 commit comments

Comments
 (0)