Skip to content

Commit

Permalink
Add support for sending updates from the GUI
Browse files Browse the repository at this point in the history
Currently only for the supported message types. Caveat: adding a new editable
column doesn't preserve the column position in the update.
  • Loading branch information
JamesWrigley committed Apr 4, 2024
1 parent 991e693 commit 8c34590
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 17 deletions.
42 changes: 36 additions & 6 deletions damnit/gui/kafka.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import pickle
import logging

from kafka import KafkaConsumer
from kafka import KafkaConsumer, KafkaProducer
from PyQt5 import QtCore

from ..backend.db import MsgKind, msg_dict
from ..definitions import UPDATE_BROKERS, UPDATE_TOPIC

log = logging.getLogger(__name__)


class UpdateReceiver(QtCore.QObject):
class UpdateAgent(QtCore.QObject):
message = QtCore.pyqtSignal(object)

def __init__(self, db_id: str) -> None:
QtCore.QObject.__init__(self)
self.update_topic = UPDATE_TOPIC.format(db_id)

self.kafka_cns = KafkaConsumer(
UPDATE_TOPIC.format(db_id), bootstrap_servers=UPDATE_BROKERS
self.update_topic, bootstrap_servers=UPDATE_BROKERS
)
self.kafka_prd = KafkaProducer(bootstrap_servers=UPDATE_BROKERS,
value_serializer=lambda d: pickle.dumps(d))
self.running = False

def loop(self) -> None:
def listen_loop(self) -> None:
self.running = True

while self.running:
Expand All @@ -38,12 +42,38 @@ def loop(self) -> None:

self.message.emit(unpickled_msg)

def run_values_updated(self, proposal, run, name, value):
message = msg_dict(MsgKind.run_values_updated,
{
"proposal": proposal,
"run": run,
"values": {
name: value
}
})

# Note: the send() function returns a future that we don't await
# immediately, but we call kafka_prd.flush() in stop() which will ensure
# that all messages are sent.
self.kafka_prd.send(self.update_topic, message)

def variable_set(self, name, title, description, variable_type):
message = msg_dict(MsgKind.variable_set,
{
"name": name,
"title": title,
"attributes": None,
"type": variable_type
})
self.kafka_prd.send(self.update_topic, message)

def stop(self):
self.running = False
self.kafka_prd.flush(timeout=10)


if __name__ == "__main__":
recevier = UpdateReceiver("tcp://localhost:5556")
monitor = UpdateAgent("tcp://localhost:5556")

for record in recevier.kafka_cns:
for record in monitor.kafka_cns:
print(record.value.decode())
23 changes: 14 additions & 9 deletions damnit/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from ..definitions import UPDATE_BROKERS
from ..util import StatusbarStylesheet, fix_data_for_plotting, icon_path
from .editor import ContextTestResult, Editor
from .kafka import UpdateReceiver
from .kafka import UpdateAgent
from .open_dialog import OpenDBDialog
from .plot import Canvas, Plot
from .table import DamnitTableModel, TableView, prettify_notation
Expand Down Expand Up @@ -118,7 +118,7 @@ def closeEvent(self, event):

def stop_update_listener_thread(self):
if self._updates_thread is not None:
self.update_receiver.stop()
self.update_agent.stop()
self._updates_thread.exit()
self._updates_thread.wait()
self._updates_thread = None
Expand Down Expand Up @@ -286,6 +286,9 @@ def add_variable(self, name, title, variable_type, description="", before=None):
self.table_view.add_new_columns([title], [True], [before_pos - n_static_cols - 1])
self.table.add_editable_column(name)

if self._connect_to_kafka:
self.update_agent.variable_set(name, title, description, variable_type)

def open_column_dialog(self):
if self._columns_dialog is None:
self._columns_dialog = QtWidgets.QDialog(self)
Expand Down Expand Up @@ -498,18 +501,18 @@ def _updates_thread_launcher(self) -> None:
assert self.db_id is not None

try:
self.update_receiver = UpdateReceiver(self.db_id)
self.update_agent = UpdateAgent(self.db_id)
except NoBrokersAvailable:
QtWidgets.QMessageBox.warning(self, "Broker connection failed",
f"Could not connect to any Kafka brokers at: {' '.join(UPDATE_BROKERS)}\n\n" +
"DAMNIT can operate offline, but it will not receive any updates from new or reprocessed runs.")
return

self._updates_thread = QtCore.QThread()
self.update_receiver.moveToThread(self._updates_thread)
self.update_agent.moveToThread(self._updates_thread)

self._updates_thread.started.connect(self.update_receiver.loop)
self.update_receiver.message.connect(self.handle_update)
self._updates_thread.started.connect(self.update_agent.listen_loop)
self.update_agent.message.connect(self.handle_update)
QtCore.QTimer.singleShot(0, self._updates_thread.start)

def _set_comment_date(self):
Expand Down Expand Up @@ -828,13 +831,15 @@ def mark_context_saved(self):
self._editor_status_message = str(self._context_path.resolve())
self.on_tab_changed(self._tab_widget.currentIndex())

def save_value(self, prop, run, column_name, value):
def save_value(self, prop, run, name, value):
if self.db is None:
log.warning("No SQLite database in use, value not saved")
return

log.debug("Saving data for column %s for prop %d run %d", column_name, prop, run)
self.db.set_variable(prop, run, column_name, ReducedData(value))
log.debug("Saving data for variable %s for prop %d run %d", name, prop, run)
self.db.set_variable(prop, run, name, ReducedData(value))
if self._connect_to_kafka:
self.update_agent.run_values_updated(prop, run, name, value)

def save_time_comment(self, comment_id, value):
if self.db is None:
Expand Down
8 changes: 6 additions & 2 deletions tests/test_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ def test_connect_to_kafka(mock_db, qtbot):
db_dir, db = mock_db
pkg = "damnit.gui.kafka"

with patch(f"{pkg}.KafkaConsumer") as kafka_cns:
with patch(f"{pkg}.KafkaConsumer") as kafka_cns, \
patch(f"{pkg}.KafkaProducer") as kafka_prd:
MainWindow(db_dir, False).close()
kafka_cns.assert_not_called()
kafka_prd.assert_not_called()

with patch(f"{pkg}.KafkaConsumer") as kafka_cns:
with patch(f"{pkg}.KafkaConsumer") as kafka_cns, \
patch(f"{pkg}.KafkaProducer") as kafka_prd:
MainWindow(db_dir, True).close()
kafka_cns.assert_called_once()
kafka_prd.assert_called_once()

def test_editor(mock_db, mock_ctx, qtbot):
db_dir, db = mock_db
Expand Down

0 comments on commit 8c34590

Please sign in to comment.