Skip to content

Commit 0c80974

Browse files
authored
feat: implemented R sessions api (#2)
* feat: implemented R sessions api * fix: corrected lint errors * feat: updated R session api * chore: updated datashield with released version * feat: check for credentials on new connection * chore: updated resource test * chore: updated released versions of datashield and obiba-opal
1 parent 41c6b7c commit 0c80974

File tree

5 files changed

+1502
-37
lines changed

5 files changed

+1502
-37
lines changed

datashield_opal/impl.py

Lines changed: 156 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
from argparse import Namespace
66
from contextlib import suppress
77
from obiba_opal.core import OpalClient, UriBuilder, OpalRequest, OpalResponse, HTTPError
8-
from datashield.interface import DSLoginInfo, DSDriver, DSConnection, DSResult, DSError
8+
from datashield.interface import DSLoginInfo, DSDriver, DSConnection, DSResult, DSError, RSession
99

1010

1111
class OpalDSError(DSError):
@@ -23,15 +23,132 @@ def is_server_error(self) -> bool:
2323
return isinstance(self.exception, HTTPError) and self.exception.code >= 500
2424

2525

26+
class OpalRSession(RSession):
27+
def __init__(self, client: OpalClient, profile: str = None, restore: str = None, verbose: bool = False):
28+
self.client = client
29+
self.profile = profile
30+
self.restore = restore
31+
self.verbose = verbose
32+
self.id = None
33+
34+
def get_id(self) -> str:
35+
if self.id is None:
36+
self.start(False)
37+
return self.id
38+
39+
def start(self, asynchronous: bool = True) -> None:
40+
builder = UriBuilder(["datashield", "sessions"]).query("wait", not asynchronous)
41+
if self.profile is not None:
42+
builder.query("profile", self.profile)
43+
if self.restore is not None:
44+
builder.query("restore", self.restore)
45+
response = self._post(builder.build()).send()
46+
if response.code != 201:
47+
raise OpalDSError(ValueError(f"Failed to start R session: {response.code}"))
48+
session = response.from_json()
49+
if "id" not in session:
50+
raise OpalDSError(ValueError("Failed to start R session: no session id returned"))
51+
self.id = session["id"]
52+
53+
def is_started(self) -> bool:
54+
return self.id is not None
55+
56+
def is_ready(self) -> bool:
57+
if self.id is None:
58+
raise OpalDSError(ValueError("R session not started"))
59+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
60+
if response.code != 200:
61+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
62+
session = response.from_json()
63+
return session.get("state", "").lower() == "running"
64+
65+
def is_pending(self) -> bool:
66+
if self.id is None:
67+
raise OpalDSError(ValueError("R session not started"))
68+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
69+
if response.code != 200:
70+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
71+
session = response.from_json()
72+
return session.get("state", "").lower() == "pending"
73+
74+
def is_failed(self) -> bool:
75+
if self.id is None:
76+
raise OpalDSError(ValueError("R session not started"))
77+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
78+
if response.code != 200:
79+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
80+
session = response.from_json()
81+
return session.get("state", "").lower() == "failed"
82+
83+
def is_terminated(self) -> bool:
84+
if self.id is None:
85+
raise OpalDSError(ValueError("R session not started"))
86+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
87+
if response.code != 200:
88+
raise OpalDSError(ValueError(f"Failed to check R session status: {response.code}"))
89+
session = response.from_json()
90+
return session.get("state", "").lower() == "terminated"
91+
92+
def get_events(self) -> list:
93+
if self.id is None:
94+
raise OpalDSError(ValueError("R session not started"))
95+
response = self._get(UriBuilder(["datashield", "session", self.id]).build()).send()
96+
if response.code != 200:
97+
raise OpalDSError(ValueError(f"Failed to retrieve R session events: {response.code}"))
98+
session = response.from_json()
99+
events = [evt.split(";") for evt in session.get("events", [])]
100+
return events
101+
102+
def get_last_message(self) -> str:
103+
events = self.get_events()
104+
if events and len(events) > 0:
105+
last_event = events[-1]
106+
return last_event[2] if len(last_event) > 2 else "No message"
107+
return "No recent events"
108+
109+
def close(self) -> None:
110+
if self.id is not None:
111+
builder = UriBuilder(["datashield", "session", self.id])
112+
self._delete(builder.build()).send()
113+
self.id = None
114+
115+
def _post(self, ws: str) -> OpalRequest:
116+
request = self.client.new_request()
117+
if self.verbose:
118+
request.verbose()
119+
return request.accept_json().post().resource(ws)
120+
121+
def _get(self, ws: str) -> OpalRequest:
122+
request = self.client.new_request()
123+
if self.verbose:
124+
request.verbose()
125+
return request.accept_json().get().resource(ws)
126+
127+
def _delete(self, ws: str) -> OpalRequest:
128+
request = self.client.new_request()
129+
if self.verbose:
130+
request.verbose()
131+
return request.accept_json().delete().resource(ws)
132+
133+
26134
class OpalConnection(DSConnection):
27135
def __init__(self, name: str, loginInfo: OpalClient.LoginInfo, profile: str = "default", restore: str = None):
28136
self.name = name
29137
self.client = OpalClient.build(loginInfo)
30138
self.subject = None
31139
self.profile = profile
32140
self.restore = restore
33-
self.session = None
34141
self.verbose = False
142+
self.rsession = None
143+
self.rsession_started = False
144+
145+
def check_user(self) -> bool:
146+
"""Check if the user can authenticate by trying to retrieve the current subject profile."""
147+
try:
148+
self._get("/system/subject-profile/_current").fail_on_error().send()
149+
return True
150+
except Exception:
151+
return False
35152

36153
#
37154
# Content listing
@@ -68,6 +185,34 @@ def has_resource(self, name: str) -> bool:
68185
response = self._get(UriBuilder(["project", parts[0], "resource", parts[1]]).build()).send()
69186
return response.code == 200
70187

188+
#
189+
# R Session (server side)
190+
#
191+
192+
def has_session(self) -> bool:
193+
return self.rsession is not None
194+
195+
def start_session(self, asynchronous: bool = True) -> RSession:
196+
if self.rsession is not None:
197+
return self.rsession
198+
self.rsession = OpalRSession(self.client, profile=self.profile, restore=self.restore, verbose=self.verbose)
199+
self.rsession.start(asynchronous=asynchronous)
200+
self.rsession_started = not asynchronous or not self.rsession.is_pending()
201+
return self.rsession
202+
203+
def is_session_started(self) -> bool:
204+
if self.rsession is None:
205+
return False
206+
if self.rsession_started:
207+
return True
208+
self.rsession_started = not self.rsession.is_pending()
209+
return self.rsession_started
210+
211+
def get_session(self) -> RSession:
212+
if self.rsession is None:
213+
raise OpalDSError(ValueError("No R session established. Please start a session first."))
214+
return self.rsession
215+
71216
#
72217
# Assign
73218
#
@@ -249,10 +394,8 @@ def disconnect(self) -> None:
249394
"""
250395
Close DataSHIELD session, and then Opal session.
251396
"""
252-
if self.session is not None:
253-
builder = UriBuilder(["datashield", "session", self._get_session_id()])
254-
self._delete(builder.build()).send()
255-
self.session = None
397+
if self.rsession is not None:
398+
self.rsession.close()
256399
self.client.close()
257400

258401
#
@@ -267,21 +410,8 @@ def _get_subject(self):
267410
return self.subject
268411

269412
def _get_session_id(self) -> str:
270-
return self._get_session()["id"]
271-
272-
def _get_session(self):
273-
if self.session is None:
274-
builder = UriBuilder(["datashield", "sessions"])
275-
if self.profile is not None:
276-
builder.query("profile", self.profile)
277-
if self.restore is not None:
278-
builder.query("restore", self.restore)
279-
response = self._post(builder.build()).send()
280-
if response.code == 201:
281-
self.session = response.from_json()
282-
else:
283-
raise OpalDSError(ValueError(f"DataSHIELD session creation failed: {response.code}"))
284-
return self.session
413+
self.start_session(asynchronous=False)
414+
return self.rsession.get_id()
285415

286416
def _get(self, ws) -> OpalRequest:
287417
request = self.client.new_request()
@@ -313,7 +443,11 @@ class OpalDriver(DSDriver):
313443
def new_connection(cls, args: DSLoginInfo, restore: str = None) -> DSConnection:
314444
namedArgs = Namespace(opal=args.url, user=args.user, password=args.password, token=args.token)
315445
loginInfo = OpalClient.LoginInfo.parse(namedArgs)
316-
return OpalConnection(args.name, loginInfo, args.profile, restore)
446+
conn = OpalConnection(args.name, loginInfo, args.profile, restore)
447+
if not conn.check_user():
448+
creds = f"user {args.user}" if args.user else "token"
449+
raise OpalDSError(ValueError(f"Failed to authenticate on {args.url} with {creds}"))
450+
return conn
317451

318452

319453
class OpalResult(DSResult):

0 commit comments

Comments
 (0)