127 changes: 76 additions & 51 deletions src/qlever/commands/update_wikidata.py
@@ -10,6 +10,7 @@
from datetime import datetime, timezone
from enum import Enum, auto
from pathlib import Path
from threading import Event

import rdflib.term
import requests_sse
@@ -32,43 +33,6 @@ def custom_cast_lexical_to_python(lexical, datatype):
rdflib.term._castLexicalToPython = custom_cast_lexical_to_python


def retry_with_backoff(operation, operation_name, max_retries, log):
"""
Retry an operation with exponential backoff, see backoff intervals below
(in seconds). Returns the result of the operation if successful, or raises
the last exception.
"""
backoff_intervals = [5, 10, 30, 60, 300, 900, 1800, 3600]

for attempt in range(max_retries):
try:
return operation()
except Exception as e:
if attempt < max_retries - 1:
# Use the appropriate backoff interval (once we get to the end
# of the list, keep using the last interval).
retry_delay = (
backoff_intervals[attempt]
if attempt < len(backoff_intervals)
else backoff_intervals[-1]
)
# Show the delay as seconds, minutes, or hours.
if retry_delay >= 3600:
delay_str = f"{retry_delay // 3600}h"
elif retry_delay >= 60:
delay_str = f"{retry_delay // 60}min"
else:
delay_str = f"{retry_delay}s"
log.warn(
f"{operation_name} failed (attempt {attempt + 1}/{max_retries}): {e}. "
f"Retrying in {delay_str} ..."
)
time.sleep(retry_delay)
else:
# If this was the last attempt, re-raise the exception.
raise


def connect_to_sse_stream(sse_stream_url, since=None, event_id=None):
"""
Connect to the SSE stream and return the connected EventSource.
@@ -128,7 +92,7 @@ def __init__(self):
"stream/rdf-streaming-updater.mutation.v2"
)
# Remember if Ctrl+C was pressed, so we can handle it gracefully.
self.ctrl_c_pressed = False
self.ctrl_c_pressed = Event()
# Set to `True` when finished.
self.finished = False

@@ -266,12 +230,52 @@ def additional_arguments(self, subparser) -> None:
"last-three (keep the three most recent) (default: last)",
)

def retry_with_backoff(self, operation, operation_name, max_retries, log):
"""
Retry an operation with exponential backoff, see backoff intervals below
(in seconds). Returns the result of the operation if successful, or raises
the last exception.
"""
backoff_intervals = [5, 10, 30, 60, 300, 900, 1800, 3600]

for attempt in range(max_retries):
try:
return operation()
except Exception as e:
if self.ctrl_c_pressed.is_set():
raise KeyboardInterrupt()
if attempt < max_retries - 1:
# Use the appropriate backoff interval (once we get to the end
# of the list, keep using the last interval).
retry_delay = (
backoff_intervals[attempt]
if attempt < len(backoff_intervals)
else backoff_intervals[-1]
)
# Show the delay as seconds, minutes, or hours.
if retry_delay >= 3600:
delay_str = f"{retry_delay // 3600}h"
elif retry_delay >= 60:
delay_str = f"{retry_delay // 60}min"
else:
delay_str = f"{retry_delay}s"
log.warn(
f"{operation_name} failed (attempt {attempt + 1}/{max_retries}): {e}. "
f"Retrying in {delay_str} ..."
)
# `wait` returns True if the event was set before the timeout expired.
if self.ctrl_c_pressed.wait(timeout=retry_delay):
raise KeyboardInterrupt()
else:
# If this was the last attempt, re-raise the exception.
raise
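
For reference, a minimal standalone sketch of the resulting backoff schedule (hypothetical helper, not part of this diff): the delay grows from 5s up to 1h, stays at 1h for any further retries, and the final attempt re-raises without waiting.

def backoff_delay(attempt: int) -> int:
    # Same intervals as above; attempts past the end of the list keep
    # using the last (one hour) interval.
    intervals = [5, 10, 30, 60, 300, 900, 1800, 3600]
    return intervals[attempt] if attempt < len(intervals) else intervals[-1]

# With num_retries = 10, the waits after failed attempts would be:
# 5, 10, 30, 60, 300, 900, 1800, 3600, 3600 (seconds)
print([backoff_delay(a) for a in range(9)])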

# Handle Ctrl+C gracefully by finishing the current batch and then exiting.
def handle_ctrl_c(self, signal_received, frame):
if self.ctrl_c_pressed:
if self.ctrl_c_pressed.is_set():
log.warn("\rCtrl+C pressed again, watch your blood pressure")
else:
self.ctrl_c_pressed = True
self.ctrl_c_pressed.set()
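
For illustration, a minimal standalone sketch of the signal-handler-plus-Event pattern used here (hypothetical names, not part of this diff): because `Event.wait(timeout)` returns as soon as the event is set, a Ctrl+C during a long backoff or inter-batch wait is noticed immediately instead of only after the full sleep.

import signal
from threading import Event

stop_requested = Event()

def on_sigint(signum, frame):
    # First Ctrl+C requests a graceful stop; the main loop checks the flag.
    stop_requested.set()

signal.signal(signal.SIGINT, on_sigint)

# Wait up to 30 seconds, but wake up immediately if Ctrl+C was pressed.
if stop_requested.wait(timeout=30):
    print("Interrupted, finishing the current batch and exiting")
else:
    print("Wait elapsed without interruption")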

def execute(self, args) -> bool:
# cURL command to get the date until which the updates of the
@@ -360,7 +364,7 @@ def execute(self, args) -> bool:
# message from the SSE stream using the `since` date.
if not args.offset:
try:
source = retry_with_backoff(
source = self.retry_with_backoff(
lambda: connect_to_sse_stream(
args.sse_stream_url, since=since
),
@@ -385,6 +389,12 @@ def execute(self, args) -> bool:
f"No event with topic {args.topic} found in stream"
)
args.offset = offset
except KeyboardInterrupt:
log.warn(
"\rCtrl+C pressed while determing current state, "
"exiting"
)
return True
except Exception as e:
log.error(f"Error determining offset from stream: {e}")
return False
@@ -424,11 +434,8 @@ def execute(self, args) -> bool:
)
log.info("")
wait_before_next_batch = False
for _ in range(args.wait_between_batches):
if self.ctrl_c_pressed:
break
time.sleep(1)
if self.ctrl_c_pressed:
self.ctrl_c_pressed.wait(args.wait_between_batches)
if self.ctrl_c_pressed.is_set():
log.warn(
"\rCtrl+C pressed while waiting in between batches, "
"exiting"
@@ -457,7 +464,7 @@ def execute(self, args) -> bool:

# Connect to the SSE stream with retry logic
try:
source = retry_with_backoff(
source = self.retry_with_backoff(
lambda: connect_to_sse_stream(
args.sse_stream_url,
since=since if not event_id_for_next_batch else None,
@@ -467,6 +474,12 @@ def execute(self, args) -> bool:
args.num_retries,
log,
)
except KeyboardInterrupt:
log.warn(
"\rCtrl+C pressed while while connecting to stream, "
"exiting"
)
break
except Exception as e:
log.error(
f"Failed to connect to SSE stream after "
@@ -515,7 +528,7 @@ def execute(self, args) -> bool:
)
# Verify offset with retry logic
try:
result = retry_with_backoff(
result = self.retry_with_backoff(
lambda: run_command(
f"{curl_cmd_check_offset} | sed 1d",
return_output=True,
@@ -570,6 +583,12 @@ def execute(self, args) -> bool:
f"out of order or some updates are missing"
)
return False
except KeyboardInterrupt:
log.warn(
"\rCtrl+C pressed while while verifying state, "
"exiting"
)
break
except Exception as e:
log.error(
f"Failed to retrieve or verify offset from "
@@ -868,7 +887,7 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
# Ctrl+C finishes the current batch (this should come at the
# end of the inner event loop so that always at least one
# message is processed).
if self.ctrl_c_pressed:
if self.ctrl_c_pressed.is_set():
log.warn(
"\rCtrl+C pressed while processing a batch, "
"finishing it and exiting"
@@ -1003,7 +1022,7 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
# Run it (using `curl` for batch size up to 1000, otherwise
# `requests`) with retry logic.
try:
result = retry_with_backoff(
result = self.retry_with_backoff(
lambda: run_command(curl_cmd, return_output=True),
"UPDATE request",
args.num_retries,
@@ -1049,6 +1068,12 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
except Exception:
pass # Ignore errors during cleanup

except KeyboardInterrupt:
log.warn(
"\rCtrl+C pressed while executing update, "
"exiting"
)
return True
except Exception as e:
log.error(
f"Failed to execute UPDATE request after "
@@ -1271,7 +1296,7 @@ def get_time_ms(
# If Ctrl+C was pressed, we reached `--until`, or we processed
# exactly `--num-messages`, finish.
if (
self.ctrl_c_pressed
self.ctrl_c_pressed.is_set()
or self.finished
or (
args.num_messages is not None