Skip to content

Commit

Permalink
Add support for sending updates from the GUI
Browse files Browse the repository at this point in the history
  • Loading branch information
JamesWrigley committed Mar 23, 2024
1 parent 43ca406 commit 24b5a36
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 17 deletions.
33 changes: 27 additions & 6 deletions damnit/gui/kafka.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,30 @@
import pickle
import logging

from kafka import KafkaConsumer
from kafka import KafkaConsumer, KafkaProducer
from PyQt5 import QtCore

from ..backend.db import MsgKind, msg_dict
from ..definitions import UPDATE_BROKERS, UPDATE_TOPIC

log = logging.getLogger(__name__)


class UpdateReceiver(QtCore.QObject):
class UpdateAgent(QtCore.QObject):
message = QtCore.pyqtSignal(object)

def __init__(self, db_id: str) -> None:
QtCore.QObject.__init__(self)
self.update_topic = UPDATE_TOPIC.format(db_id)

self.kafka_cns = KafkaConsumer(
UPDATE_TOPIC.format(db_id), bootstrap_servers=UPDATE_BROKERS
self.update_topic, bootstrap_servers=UPDATE_BROKERS
)
self.kafka_prd = KafkaProducer(bootstrap_servers=UPDATE_BROKERS,
value_serializer=lambda d: pickle.dumps(d))
self.running = False

def loop(self) -> None:
def listen_loop(self) -> None:
self.running = True

while self.running:
Expand All @@ -38,12 +42,29 @@ def loop(self) -> None:

self.message.emit(unpickled_msg)

def send_update(self, proposal, run, name, value):
message = msg_dict(MsgKind.run_values_updated,
{
"proposal": proposal,
"run": run,
"values": {
name: value
}
})

# Note: the send() function returns a future that we don't await. This
# is potentially dangerous, but the reason is because it would cause a
# delay in the GUI and because people don't usually close the GUI
# immediately after editing a variable so it's unlikely that the process
# will die before the update is sent (as happened with the backend).
self.kafka_prd.send(self.update_topic, message)

def stop(self):
self.running = False


if __name__ == "__main__":
recevier = UpdateReceiver("tcp://localhost:5556")
monitor = UpdateAgent("tcp://localhost:5556")

for record in recevier.kafka_cns:
for record in monitor.kafka_cns:
print(record.value.decode())
20 changes: 11 additions & 9 deletions damnit/gui/main_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
from ..definitions import UPDATE_BROKERS
from ..util import StatusbarStylesheet, fix_data_for_plotting, icon_path
from .editor import ContextTestResult, Editor
from .kafka import UpdateReceiver
from .kafka import UpdateAgent
from .open_dialog import OpenDBDialog
from .plot import Canvas, Plot
from .table import DamnitTableModel, TableView, prettify_notation
Expand Down Expand Up @@ -119,7 +119,7 @@ def closeEvent(self, event):

def stop_update_listener_thread(self):
if self._updates_thread is not None:
self.update_receiver.stop()
self.update_agent.stop()
self._updates_thread.exit()
self._updates_thread.wait()
self._updates_thread = None
Expand Down Expand Up @@ -506,18 +506,18 @@ def _updates_thread_launcher(self) -> None:
assert self.db_id is not None

try:
self.update_receiver = UpdateReceiver(self.db_id)
self.update_agent = UpdateAgent(self.db_id)
except NoBrokersAvailable:
QtWidgets.QMessageBox.warning(self, "Broker connection failed",
f"Could not connect to any Kafka brokers at: {' '.join(UPDATE_BROKERS)}\n\n" +
"DAMNIT can operate offline, but it will not receive any updates from new or reprocessed runs.")
return

self._updates_thread = QtCore.QThread()
self.update_receiver.moveToThread(self._updates_thread)
self.update_agent.moveToThread(self._updates_thread)

self._updates_thread.started.connect(self.update_receiver.loop)
self.update_receiver.message.connect(self.handle_update)
self._updates_thread.started.connect(self.update_agent.listen_loop)
self.update_agent.message.connect(self.handle_update)
QtCore.QTimer.singleShot(0, self._updates_thread.start)

def _set_comment_date(self):
Expand Down Expand Up @@ -835,13 +835,15 @@ def mark_context_saved(self):
self._editor_status_message = str(self._context_path.resolve())
self.on_tab_changed(self._tab_widget.currentIndex())

def save_value(self, prop, run, column_name, value):
def save_value(self, prop, run, name, value):
if self.db is None:
log.warning("No SQLite database in use, value not saved")
return

log.debug("Saving data for column %s for prop %d run %d", column_name, prop, run)
self.db.set_variable(prop, run, column_name, ReducedData(value))
log.debug("Saving data for variable %s for prop %d run %d", name, prop, run)
self.db.set_variable(prop, run, name, ReducedData(value))
if self._connect_to_kafka:
self.update_agent.send_update(prop, run, name, value)

def save_time_comment(self, comment_id, value):
if self.db is None:
Expand Down
8 changes: 6 additions & 2 deletions tests/test_gui.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,17 @@ def test_connect_to_kafka(mock_db, qtbot):
db_dir, db = mock_db
pkg = "damnit.gui.kafka"

with patch(f"{pkg}.KafkaConsumer") as kafka_cns:
with patch(f"{pkg}.KafkaConsumer") as kafka_cns, \
patch(f"{pkg}.KafkaProducer") as kafka_prd:
MainWindow(db_dir, False).close()
kafka_cns.assert_not_called()
kafka_prd.assert_not_called()

with patch(f"{pkg}.KafkaConsumer") as kafka_cns:
with patch(f"{pkg}.KafkaConsumer") as kafka_cns, \
patch(f"{pkg}.KafkaProducer") as kafka_prd:
MainWindow(db_dir, True).close()
kafka_cns.assert_called_once()
kafka_prd.assert_called_once()

def test_editor(mock_db, mock_ctx, qtbot):
db_dir, db = mock_db
Expand Down

0 comments on commit 24b5a36

Please sign in to comment.