diff --git a/src/qlever/commands/update_wikidata.py b/src/qlever/commands/update_wikidata.py index 3c686540..fcf88848 100644 --- a/src/qlever/commands/update_wikidata.py +++ b/src/qlever/commands/update_wikidata.py @@ -10,6 +10,7 @@ from datetime import datetime, timezone from enum import Enum, auto from pathlib import Path +from threading import Event import rdflib.term import requests_sse @@ -32,43 +33,6 @@ def custom_cast_lexical_to_python(lexical, datatype): rdflib.term._castLexicalToPython = custom_cast_lexical_to_python -def retry_with_backoff(operation, operation_name, max_retries, log): - """ - Retry an operation with exponential backoff, see backoff intervals below - (in seconds). Returns the result of the operation if successful, or raises - the last exception. - """ - backoff_intervals = [5, 10, 30, 60, 300, 900, 1800, 3600] - - for attempt in range(max_retries): - try: - return operation() - except Exception as e: - if attempt < max_retries - 1: - # Use the appropriate backoff interval (once we get to the end - # of the list, keep using the last interval). - retry_delay = ( - backoff_intervals[attempt] - if attempt < len(backoff_intervals) - else backoff_intervals[-1] - ) - # Show the delay as seconds, minutes, or hours. - if retry_delay >= 3600: - delay_str = f"{retry_delay // 3600}h" - elif retry_delay >= 60: - delay_str = f"{retry_delay // 60}min" - else: - delay_str = f"{retry_delay}s" - log.warn( - f"{operation_name} failed (attempt {attempt + 1}/{max_retries}): {e}. " - f"Retrying in {delay_str} ..." - ) - time.sleep(retry_delay) - else: - # If this was the last attempt, re-raise the exception. - raise - - def connect_to_sse_stream(sse_stream_url, since=None, event_id=None): """ Connect to the SSE stream and return the connected EventSource. @@ -128,7 +92,7 @@ def __init__(self): "stream/rdf-streaming-updater.mutation.v2" ) # Remember if Ctrl+C was pressed, so we can handle it gracefully. 
- self.ctrl_c_pressed = False + self.ctrl_c_pressed = Event() # Set to `True` when finished. self.finished = False @@ -266,12 +230,52 @@ def additional_arguments(self, subparser) -> None: "last-three (keep the three most recent) (default: last)", ) + def retry_with_backoff(self, operation, operation_name, max_retries, log): + """ + Retry an operation with exponential backoff, see backoff intervals below + (in seconds). Returns the result of the operation if successful, or raises + the last exception. + """ + backoff_intervals = [5, 10, 30, 60, 300, 900, 1800, 3600] + + for attempt in range(max_retries): + try: + return operation() + except Exception as e: + if self.ctrl_c_pressed.is_set(): + raise KeyboardInterrupt() + if attempt < max_retries - 1: + # Use the appropriate backoff interval (once we get to the end + # of the list, keep using the last interval). + retry_delay = ( + backoff_intervals[attempt] + if attempt < len(backoff_intervals) + else backoff_intervals[-1] + ) + # Show the delay as seconds, minutes, or hours. + if retry_delay >= 3600: + delay_str = f"{retry_delay // 3600}h" + elif retry_delay >= 60: + delay_str = f"{retry_delay // 60}min" + else: + delay_str = f"{retry_delay}s" + log.warn( + f"{operation_name} failed (attempt {attempt + 1}/{max_retries}): {e}. " + f"Retrying in {delay_str} ..." + ) + # Returns true if the wait ended because of the flag being set. + if self.ctrl_c_pressed.wait(timeout=retry_delay): + raise KeyboardInterrupt() + else: + # If this was the last attempt, re-raise the exception. + raise + # Handle Ctrl+C gracefully by finishing the current batch and then exiting. 
def handle_ctrl_c(self, signal_received, frame): - if self.ctrl_c_pressed: + if self.ctrl_c_pressed.is_set(): log.warn("\rCtrl+C pressed again, watch your blood pressure") else: - self.ctrl_c_pressed = True + self.ctrl_c_pressed.set() def execute(self, args) -> bool: # cURL command to get the date until which the updates of the @@ -360,7 +364,7 @@ def execute(self, args) -> bool: # message from the SSE stream using the `since` date. if not args.offset: try: - source = retry_with_backoff( + source = self.retry_with_backoff( lambda: connect_to_sse_stream( args.sse_stream_url, since=since ), @@ -385,6 +389,12 @@ def execute(self, args) -> bool: f"No event with topic {args.topic} found in stream" ) args.offset = offset + except KeyboardInterrupt: + log.warn( + "\rCtrl+C pressed while determining current state, " + "exiting" + ) + return True except Exception as e: log.error(f"Error determining offset from stream: {e}") return False @@ -424,11 +434,8 @@ def execute(self, args) -> bool: ) log.info("") wait_before_next_batch = False - for _ in range(args.wait_between_batches): - if self.ctrl_c_pressed: - break - time.sleep(1) - if self.ctrl_c_pressed: + self.ctrl_c_pressed.wait(args.wait_between_batches) + if self.ctrl_c_pressed.is_set(): log.warn( "\rCtrl+C pressed while waiting in between batches, " "exiting" @@ -457,7 +464,7 @@ def execute(self, args) -> bool: # Connect to the SSE stream with retry logic try: - source = retry_with_backoff( + source = self.retry_with_backoff( lambda: connect_to_sse_stream( args.sse_stream_url, since=since if not event_id_for_next_batch else None, @@ -467,6 +474,12 @@ def execute(self, args) -> bool: args.num_retries, log, ) + except KeyboardInterrupt: + log.warn( + "\rCtrl+C pressed while connecting to stream, " + "exiting" + ) + break except Exception as e: log.error( f"Failed to connect to SSE stream after " @@ -515,7 +528,7 @@ def execute(self, args) -> bool: ) # Verify offset with retry logic try: - result = retry_with_backoff
+ result = self.retry_with_backoff( lambda: run_command( f"{curl_cmd_check_offset} | sed 1d", return_output=True, ), @@ -570,6 +583,12 @@ def execute(self, args) -> bool: f"out of order or some updates are missing" ) return False + except KeyboardInterrupt: + log.warn( + "\rCtrl+C pressed while verifying state, " + "exiting" + ) + break except Exception as e: log.error( f"Failed to retrieve or verify offset from " @@ -868,7 +887,7 @@ def node_to_sparql(node: rdflib.term.Node) -> str: # Ctrl+C finishes the current batch (this should come at the # end of the inner event loop so that always at least one # message is processed). - if self.ctrl_c_pressed: + if self.ctrl_c_pressed.is_set(): log.warn( "\rCtrl+C pressed while processing a batch, " "finishing it and exiting" @@ -1003,7 +1022,7 @@ def node_to_sparql(node: rdflib.term.Node) -> str: # Run it (using `curl` for batch size up to 1000, otherwise # `requests`) with retry logic. try: - result = retry_with_backoff( + result = self.retry_with_backoff( lambda: run_command(curl_cmd, return_output=True), "UPDATE request", args.num_retries, @@ -1049,6 +1068,12 @@ def node_to_sparql(node: rdflib.term.Node) -> str: except Exception: pass # Ignore errors during cleanup + except KeyboardInterrupt: + log.warn( + "\rCtrl+C pressed while executing update, " + "exiting" + ) + return True except Exception as e: log.error( f"Failed to execute UPDATE request after " @@ -1271,7 +1296,7 @@ def get_time_ms( # If Ctrl+C was pressed, we reached `--until`, or we processed # exactly `--num-messages`, finish. if ( - self.ctrl_c_pressed + self.ctrl_c_pressed.is_set() or self.finished or ( args.num_messages is not None