Mymdc proxy #226

Merged · 3 commits · Apr 4, 2024
89 changes: 88 additions & 1 deletion damnit/ctxsupport/ctxrunner.py
@@ -28,6 +28,8 @@
import h5py
import numpy as np
import xarray as xr
+import requests
+import yaml

from damnit_ctx import RunData, Variable

@@ -45,6 +47,72 @@ class DataType(Enum):
    Timestamp = "timestamp"


+class MyMetadataClient:
+    def __init__(self, proposal, timeout=10, init_server="https://exfldadev01.desy.de/zwop"):
+        self.proposal = proposal
+        self.timeout = timeout
+        self._cache = {}
Review comment (Member):

With how the cache is used (the keys are basically a tuple of the arguments, as far as I can tell), it seems pretty similar to what the functools cache decorator would do, right? e.g.

    @cache
    def _run_info(self, run):
        response = requests.get(f"{self.server}/api/mymdc/proposals/by_number/{self.proposal}/runs/{run}",
                                headers=self._headers, timeout=self.timeout)
        ...

Reply (Member, Author):

That's true, but when we make this public the caching should probably be disabled by default (at least until myMdC implements caching headers), so I'll stick with the dictionary for now so that we can add an option to disable caching later.
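A sketch of what that later opt-in might look like (the `use_cache` flag and `_fetch_run_info` helper are hypothetical, not part of this PR):

    class MyMetadataClient:
        def __init__(self, proposal, timeout=10, use_cache=False):
            # Hypothetical: caching off by default until myMdC sends caching headers
            self.use_cache = use_cache
            self._cache = {}

        def _run_info(self, run):
            key = (run, "run_info")
            if self.use_cache and key in self._cache:
                return self._cache[key]
            info = self._fetch_run_info(run)  # hypothetical helper that does the GET
            if self.use_cache:
                self._cache[key] = info
            return info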


+        proposal_path = Path(extra_data.read_machinery.find_proposal(f"p{proposal:06d}"))
+        credentials_path = proposal_path / "usr/mymdc-credentials.yml"
+        if not credentials_path.is_file():
+            params = {
+                "proposal_no": str(proposal),
+                "kinds": "mymdc",
+                "overwrite": "false",
+                "dry_run": "false"
+            }
+            response = requests.post(f"{init_server}/api/write_tokens", params=params, timeout=timeout)
+            response.raise_for_status()
+
+        with open(credentials_path) as f:
+            document = yaml.safe_load(f)
+        self.token = document["token"]
+        self.server = document["server"]
+
+        self._headers = { "X-API-key": self.token }

+    def _run_info(self, run):
+        key = (run, "run_info")
+        if key not in self._cache:
+            response = requests.get(f"{self.server}/api/mymdc/proposals/by_number/{self.proposal}/runs/{run}",
+                                    headers=self._headers, timeout=self.timeout)
+            response.raise_for_status()
+            json = response.json()
+            if len(json["runs"]) == 0:
+                raise RuntimeError(f"Couldn't get run information from mymdc for p{self.proposal}, r{run}")
+
+            self._cache[key] = json["runs"][0]
+
+        return self._cache[key]
+
+    def sample_name(self, run):
+        key = (run, "sample_name")
+        if key not in self._cache:
+            run_info = self._run_info(run)
+            sample_id = run_info["sample_id"]
+            response = requests.get(f"{self.server}/api/mymdc/samples/{sample_id}",
+                                    headers=self._headers, timeout=self.timeout)
+            response.raise_for_status()
+
+            self._cache[key] = response.json()["name"]
+
+        return self._cache[key]
+
+    def run_type(self, run):
+        key = (run, "run_type")
+        if key not in self._cache:
+            run_info = self._run_info(run)
+            experiment_id = run_info["experiment_id"]
+            response = requests.get(f"{self.server}/api/mymdc/experiments/{experiment_id}",
+                                    headers=self._headers, timeout=self.timeout)
+            response.raise_for_status()
+
+            self._cache[key] = response.json()["name"]
+
+        return self._cache[key]


class ContextFileErrors(RuntimeError):
    def __init__(self, problems):
        self.problems = problems
@@ -107,6 +175,13 @@ def check(self):
                f"These Variables have duplicate titles between them: {', '.join(bad_variables)}"
            )

+        # Check that all mymdc dependencies are valid
+        for name, var in self.vars.items():
+            mymdc_args = var.arg_dependencies("mymdc#")
+            for arg_name, annotation in mymdc_args.items():
+                if annotation not in ["sample_name", "run_type"]:
+                    problems.append(f"Argument '{arg_name}' of variable '{name}' has an invalid MyMdC dependency: '{annotation}'")

        if problems:
            raise ContextFileErrors(problems)

@@ -191,6 +266,7 @@ def filter(self, run_data=RunData.ALL, cluster=True, name_matches=()):

    def execute(self, run_data, run_number, proposal, input_vars) -> 'Results':
        res = {'start_time': np.asarray(get_start_time(run_data))}
+        mymdc = None

        for name in self.ordered_vars():
            var = self.vars[name]
@@ -221,6 +297,17 @@ def execute(self, run_data, run_number, proposal, input_vars) -> 'Results':
                elif param.default is inspect.Parameter.empty:
                    missing_input.append(inp_name)

+                # Mymdc fields
+                elif annotation.startswith("mymdc#"):
+                    if mymdc is None:
+                        mymdc = MyMetadataClient(proposal)
+
+                    mymdc_field = annotation.removeprefix("mymdc#")
+                    if mymdc_field == "sample_name":
+                        kwargs[arg_name] = mymdc.sample_name(run_number)
+                    elif mymdc_field == "run_type":
+                        kwargs[arg_name] = mymdc.run_type(run_number)
+
                elif annotation == "meta#run_number":
                    kwargs[arg_name] = run_number
                elif annotation == "meta#proposal":
@@ -342,7 +429,7 @@ def get_proposal_path(xd_run):
    files = [f.filename for f in xd_run.files]
    p = Path(files[0])

-    return Path(*p.parts[:7])
+    return Path(*p.parts[:-3])


def add_to_h5_file(path) -> h5py.File:
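As an aside, a rough usage sketch of the new client (illustrative only: the proposal and run numbers are made up, and the credentials file or token service must be reachable):

    client = MyMetadataClient(proposal=1234)
    print(client.sample_name(42))  # fetched from myMdC on first call, then cached
    print(client.run_type(42))     # reuses the cached run info for run 42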
10 changes: 5 additions & 5 deletions damnit/ctxsupport/damnit_ctx.py
@@ -52,14 +52,14 @@ def data(self):
"""
return RunData.RAW if self._data is None else RunData(self._data)

def arg_dependencies(self):
def arg_dependencies(self, prefix="var#"):
"""
Get all direct dependencies of this Variable. Returns a dict of argument name
to variable name.
Get all direct dependencies of this Variable with a certain
type/prefix. Returns a dict of argument name to variable name.
"""
return { arg_name: annotation.removeprefix("var#")
return { arg_name: annotation.removeprefix(prefix)
for arg_name, annotation in self.annotations().items()
if annotation.startswith("var#") }
if annotation.startswith(prefix) }

    def annotations(self):
        """
9 changes: 5 additions & 4 deletions damnit/gui/editor.py
@@ -99,12 +99,13 @@ def test_context(self, db, db_dir):
        out_buffer = StringIO()
        reporter = Reporter(out_buffer, out_buffer)
        pyflakes_check(self.text(), "<ctx>", reporter)
-        # Disgusting hack to avoid getting warnings for "var#foo" and "meta#foo"
-        # type annotations. This needs some tweaking to avoid missing real
-        # errors.
+        # Disgusting hack to avoid getting warnings for "var#foo", "meta#foo",
+        # and "mymdc#foo" type annotations. This needs some tweaking to avoid
+        # missing real errors.
        pyflakes_output = "\n".join([line for line in out_buffer.getvalue().split("\n")
                                     if not line.endswith("undefined name 'var'") \
-                                     and not line.endswith("undefined name 'meta'")])
+                                     and not line.endswith("undefined name 'meta'") \
+                                     and not line.endswith("undefined name 'mymdc'")])

        if len(pyflakes_output) > 0:
            return ContextTestResult.WARNING, pyflakes_output
42 changes: 36 additions & 6 deletions damnit/gui/kafka.py
@@ -1,26 +1,30 @@
import pickle
import logging

-from kafka import KafkaConsumer
+from kafka import KafkaConsumer, KafkaProducer
from PyQt5 import QtCore

from ..backend.db import MsgKind, msg_dict
from ..definitions import UPDATE_BROKERS, UPDATE_TOPIC

log = logging.getLogger(__name__)


-class UpdateReceiver(QtCore.QObject):
+class UpdateAgent(QtCore.QObject):
    message = QtCore.pyqtSignal(object)

    def __init__(self, db_id: str) -> None:
        QtCore.QObject.__init__(self)
+        self.update_topic = UPDATE_TOPIC.format(db_id)
+
        self.kafka_cns = KafkaConsumer(
-            UPDATE_TOPIC.format(db_id), bootstrap_servers=UPDATE_BROKERS
+            self.update_topic, bootstrap_servers=UPDATE_BROKERS
        )
+        self.kafka_prd = KafkaProducer(bootstrap_servers=UPDATE_BROKERS,
+                                       value_serializer=lambda d: pickle.dumps(d))
        self.running = False

-    def loop(self) -> None:
+    def listen_loop(self) -> None:
        self.running = True

        while self.running:
@@ -38,12 +42,38 @@ def loop(self) -> None:

            self.message.emit(unpickled_msg)

+    def run_values_updated(self, proposal, run, name, value):
+        message = msg_dict(MsgKind.run_values_updated,
+                           {
+                               "proposal": proposal,
+                               "run": run,
+                               "values": {
+                                   name: value
+                               }
+                           })
+
+        # Note: the send() function returns a future that we don't await
+        # immediately, but we call kafka_prd.flush() in stop() which will ensure
+        # that all messages are sent.
+        self.kafka_prd.send(self.update_topic, message)
Review comment (Member):

If it's a concern, I guess one option would be to use loop.run_soon, or to store a reference to the coroutine and, only when the GUI is being closed, await it or .close() it?

Reply (Member, Author):

Good idea, I added a call to .flush() in fbced4c which should do the same thing.
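For reference, kafka-python's send() returns a FutureRecordMetadata that can also be resolved per message, which is the blocking alternative to a single flush() at shutdown (illustrative sketch; the broker address and topic are made up):

    import pickle
    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers=["broker:9092"],
                             value_serializer=pickle.dumps)
    future = producer.send("damnit-updates", {"proposal": 1234, "run": 42})
    # Block until the broker acknowledges this message, rather than
    # deferring to a flush() when the producer is stopped.
    metadata = future.get(timeout=10)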


+    def variable_set(self, name, title, description, variable_type):
+        message = msg_dict(MsgKind.variable_set,
+                           {
+                               "name": name,
+                               "title": title,
+                               "attributes": None,
+                               "type": variable_type
+                           })
+        self.kafka_prd.send(self.update_topic, message)

    def stop(self):
        self.running = False
+        self.kafka_prd.flush(timeout=10)


if __name__ == "__main__":
-    recevier = UpdateReceiver("tcp://localhost:5556")
+    monitor = UpdateAgent("tcp://localhost:5556")

-    for record in recevier.kafka_cns:
+    for record in monitor.kafka_cns:
        print(record.value.decode())
23 changes: 14 additions & 9 deletions damnit/gui/main_window.py
@@ -27,7 +27,7 @@
from ..definitions import UPDATE_BROKERS
from ..util import StatusbarStylesheet, fix_data_for_plotting, icon_path
from .editor import ContextTestResult, Editor
-from .kafka import UpdateReceiver
+from .kafka import UpdateAgent
from .open_dialog import OpenDBDialog
from .plot import Canvas, Plot
from .table import DamnitTableModel, TableView, prettify_notation
@@ -118,7 +118,7 @@ def closeEvent(self, event):

    def stop_update_listener_thread(self):
        if self._updates_thread is not None:
-            self.update_receiver.stop()
+            self.update_agent.stop()
            self._updates_thread.exit()
            self._updates_thread.wait()
            self._updates_thread = None
@@ -286,6 +286,9 @@ def add_variable(self, name, title, variable_type, description="", before=None):
        self.table_view.add_new_columns([title], [True], [before_pos - n_static_cols - 1])
        self.table.add_editable_column(name)

+        if self._connect_to_kafka:
+            self.update_agent.variable_set(name, title, description, variable_type)
+
    def open_column_dialog(self):
        if self._columns_dialog is None:
            self._columns_dialog = QtWidgets.QDialog(self)
@@ -498,18 +501,18 @@ def _updates_thread_launcher(self) -> None:
        assert self.db_id is not None

        try:
-            self.update_receiver = UpdateReceiver(self.db_id)
+            self.update_agent = UpdateAgent(self.db_id)
        except NoBrokersAvailable:
            QtWidgets.QMessageBox.warning(self, "Broker connection failed",
                f"Could not connect to any Kafka brokers at: {' '.join(UPDATE_BROKERS)}\n\n" +
                "DAMNIT can operate offline, but it will not receive any updates from new or reprocessed runs.")
            return

        self._updates_thread = QtCore.QThread()
-        self.update_receiver.moveToThread(self._updates_thread)
+        self.update_agent.moveToThread(self._updates_thread)

-        self._updates_thread.started.connect(self.update_receiver.loop)
-        self.update_receiver.message.connect(self.handle_update)
+        self._updates_thread.started.connect(self.update_agent.listen_loop)
+        self.update_agent.message.connect(self.handle_update)
        QtCore.QTimer.singleShot(0, self._updates_thread.start)

    def _set_comment_date(self):
@@ -828,13 +831,15 @@ def mark_context_saved(self):
        self._editor_status_message = str(self._context_path.resolve())
        self.on_tab_changed(self._tab_widget.currentIndex())

-    def save_value(self, prop, run, column_name, value):
+    def save_value(self, prop, run, name, value):
        if self.db is None:
            log.warning("No SQLite database in use, value not saved")
            return

-        log.debug("Saving data for column %s for prop %d run %d", column_name, prop, run)
-        self.db.set_variable(prop, run, column_name, ReducedData(value))
+        log.debug("Saving data for variable %s for prop %d run %d", name, prop, run)
+        self.db.set_variable(prop, run, name, ReducedData(value))
+        if self._connect_to_kafka:
+            self.update_agent.run_values_updated(prop, run, name, value)

    def save_time_comment(self, comment_id, value):
        if self.db is None:
8 changes: 8 additions & 0 deletions docs/backend.md
@@ -84,6 +84,12 @@ arguments if they have the right _annotations_:
- `meta#proposal_dir`: The root
  [Path](https://docs.python.org/3/library/pathlib.html) to the current
  proposal.
- `mymdc#sample_name`: The sample name from myMdC.
- `mymdc#run_type`: The run type from myMdC.

!!! warning
    The myMdC integration requires a special token to work properly; please
    contact the DA group if you would like to use this for your experiment.
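For example, a variable using these annotations might look like this (an illustrative sketch; the variable and argument names are made up):

    from damnit_ctx import Variable

    @Variable(title="Sample")
    def sample(run, name: "mymdc#sample_name"):
        # `name` is filled in by DAMNIT with the sample name from myMdC
        return name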

You can also use annotations to express a dependency between `Variable`s using
the `var#<name>` annotation:
@@ -195,6 +201,8 @@ $ amore-proto db-config context_python /path/to/your/python
The environment *must* have these dependencies installed for DAMNIT to work:

- `extra_data`
- `pyyaml`
- `requests`
- `scipy`

## Managing the backend
3 changes: 2 additions & 1 deletion pyproject.toml
@@ -20,9 +20,11 @@ dependencies = [
"numpy",
"pandas<2",
"xarray",
"requests",
"scipy",
"supervisor",
"termcolor",
"pyyaml"
]

[project.optional-dependencies]
@@ -34,7 +36,6 @@ gui = [
"PyQt5",
"pyflakes", # for checking context file in editor
"QScintilla==2.13",
"requests",
"tabulate", # used in pandas to make markdown tables (for Zulip)
]
test = [