61 changes: 24 additions & 37 deletions src/qlever/commands/update_wikidata.py
@@ -202,13 +202,6 @@ def additional_arguments(self, subparser) -> None:
default=0,
help="The partition to consume from the SSE stream (default: 0)",
)
subparser.add_argument(
"--min-or-max-date",
choices=["min", "max"],
default="max",
help="Use the minimum or maximum date of the batch for the "
"`updatesCompleteUntil` property (default: maximum)",
)
subparser.add_argument(
"--wait-between-batches",
type=int,
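A minimal sketch, for orientation only, of how options like the ones in this hunk are declared via argparse subparsers; the program and subcommand names are assumptions, and the real default for --wait-between-batches lies outside the hunk:

import argparse

parser = argparse.ArgumentParser(prog="qlever")
subparsers = parser.add_subparsers(dest="command")
subparser = subparsers.add_parser("update-wikidata")  # subcommand name assumed
subparser.add_argument(
    "--partition",
    type=int,
    default=0,
    help="The partition to consume from the SSE stream (default: 0)",
)
subparser.add_argument(
    "--wait-between-batches",
    type=int,
    default=0,  # assumed default; the real value is outside this hunk
    help="Seconds to wait between update batches",
)
args = parser.parse_args(["update-wikidata", "--wait-between-batches", "5"])
print(args.partition, args.wait_between_batches)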
@@ -347,8 +340,7 @@ def execute(self, args) -> bool:
if result and result != '""':
args.offset = int(result.strip('"'))
log.info(
f"Resuming from offset from endpoint: "
f"{args.offset}"
f"Resuming from offset from endpoint: {args.offset}"
)
except Exception as e:
log.debug(
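A minimal sketch of the resume logic shown above, with an illustrative helper name (resume_offset is not a function of the actual command): the endpoint returns the stored offset as a quoted literal such as '"12345"', and an empty or missing result keeps the previously configured offset.

def resume_offset(result, fallback):
    # An empty result or the literal '""' means the endpoint knows no offset yet.
    if result and result != '""':
        return int(result.strip('"'))
    return fallback

assert resume_offset('"12345"', 0) == 12345
assert resume_offset('""', 7) == 7
assert resume_offset(None, 7) == 7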
@@ -659,9 +651,9 @@ def execute(self, args) -> bool:
rdf_linked_shared_data = event_data.get(
"rdf_linked_shared_data"
)
rdf_unlinked_shared_data = event_data.get(
"rdf_unlinked_shared_data"
)
# rdf_unlinked_shared_data = event_data.get(
# "rdf_unlinked_shared_data"
# )

# Check batch completion conditions BEFORE processing the
# data of this message. If any of the conditions is met,
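A minimal sketch of how the optional RDF payloads are read from one SSE message, assuming the event body is a JSON object; the field names come from the hunk above, the example payload is made up:

import json

event_data = json.loads(
    '{"rdf_linked_shared_data": "<http://example.org/s> <http://example.org/p> <http://example.org/o> .",'
    ' "rdf_deleted_data": null}'
)
# dict.get returns None for absent or null fields, which the caller must handle.
rdf_linked_shared_data = event_data.get("rdf_linked_shared_data")
rdf_deleted_data = event_data.get("rdf_deleted_data")
rdf_unlinked_shared_data = event_data.get("rdf_unlinked_shared_data")
print(rdf_linked_shared_data, rdf_deleted_data, rdf_unlinked_shared_data)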
@@ -763,10 +755,14 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
)

# Process the to-be-deleted triples.
for rdf_to_be_deleted in (
rdf_deleted_data,
rdf_unlinked_shared_data,
):
#
# NOTE: The triples from `rdf_unlinked_shared_data`
# must not be deleted, because they are only
# unlinked from the current entity, but may still
# be linked from other entities. If they are not
# linked from any other entity, they will be
# orphaned, but we don't mind that.
for rdf_to_be_deleted in (rdf_deleted_data,):
if rdf_to_be_deleted is not None:
try:
rdf_to_be_deleted_data = (
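A minimal sketch of the behavioural change in this hunk, with made-up data: only rdf_deleted_data is parsed for deletion, while rdf_unlinked_shared_data is left alone because other entities may still reference those triples (the "turtle" parse format is an assumption):

import rdflib

rdf_deleted_data = '<http://example.org/s> <http://example.org/p> "old" .'
rdf_unlinked_shared_data = '<http://example.org/shared> <http://example.org/p> "x" .'

triples_to_delete = set()
# Note: rdf_unlinked_shared_data is deliberately not in this tuple.
for rdf_to_be_deleted in (rdf_deleted_data,):
    if rdf_to_be_deleted is not None:
        graph = rdflib.Graph()
        graph.parse(data=rdf_to_be_deleted, format="turtle")
        triples_to_delete.update(graph)

print(len(triples_to_delete))  # 1: the unlinked shared triple is not scheduled for deletion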
@@ -909,28 +905,13 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
f"min delta to NOW: {min_delta_to_now_s}s]"
)

# Add the min and max date of the batch to `insert_triples`.
#
# NOTE: The min date means that we have *all* updates until that
# date. The max date is the date of the latest update we have seen.
# However, there may still be earlier updates that we have not seen
# yet. Wikidata uses `schema:dateModified` for the latter semantics,
# so we use it here as well. For the other semantics, we invent
# a new property `wikibase:updatesCompleteUntil`.
insert_triples.add(
f"<http://wikiba.se/ontology#Dump> "
f"<http://schema.org/dateModified> "
f'"{date_list[-1]}"^^<http://www.w3.org/2001/XMLSchema#dateTime>'
)
updates_complete_until = (
date_list[-1]
if args.min_or_max_date == "max"
else date_list[0]
)
# Add the triples `wikibase:Dump wikibase:updatesCompleteUntil
# DATE` and `wikibase:Dump wikibase:updateStreamNextOffset
# OFFSET`.
insert_triples.add(
f"<http://wikiba.se/ontology#Dump> "
f"<http://wikiba.se/ontology#updatesCompleteUntil> "
f'"{updates_complete_until}"'
f'"{date_list[-1]}"'
f"^^<http://www.w3.org/2001/XMLSchema#dateTime>"
)
insert_triples.add(
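A minimal sketch of the per-batch metadata triples described in the comment above, with made-up dates and offset; the updateStreamNextOffset triple is cut off by the diff, so its object and datatype are assumptions:

date_list = ["2024-05-01T12:00:00Z", "2024-05-01T12:05:00Z"]
next_offset = 12345  # assumed shape of the offset value

insert_triples = set()
insert_triples.add(
    f"<http://wikiba.se/ontology#Dump> "
    f"<http://schema.org/dateModified> "
    f'"{date_list[-1]}"^^<http://www.w3.org/2001/XMLSchema#dateTime>'
)
insert_triples.add(
    f"<http://wikiba.se/ontology#Dump> "
    f"<http://wikiba.se/ontology#updatesCompleteUntil> "
    f'"{date_list[-1]}"^^<http://www.w3.org/2001/XMLSchema#dateTime>'
)
insert_triples.add(
    f"<http://wikiba.se/ontology#Dump> "
    f"<http://wikiba.se/ontology#updateStreamNextOffset> "
    f'"{next_offset}"^^<http://www.w3.org/2001/XMLSchema#integer>'  # datatype assumed
)
for triple in sorted(insert_triples):
    print(triple)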
@@ -1028,13 +1009,19 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
update_files[offset].append(file_path)

# Sort by offset (newest last)
sorted_offsets = sorted(update_files.keys(), key=lambda x: int(x))
sorted_offsets = sorted(
update_files.keys(), key=lambda x: int(x)
)

# Determine which to keep
if args.keep_update_requests == "none":
files_to_keep = []
elif args.keep_update_requests == "last":
files_to_keep = update_files[sorted_offsets[-1]] if sorted_offsets else []
files_to_keep = (
update_files[sorted_offsets[-1]]
if sorted_offsets
else []
)
elif args.keep_update_requests == "last-three":
files_to_keep = []
for offset in sorted_offsets[-3:]:
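A minimal sketch of the retention logic in this hunk, wrapped in an illustrative helper (select_files_to_keep is not a function of the actual command, and the handling of any remaining --keep-update-requests value is an assumption):

def select_files_to_keep(update_files, keep_update_requests):
    # Sort offsets numerically so that the newest batch comes last.
    sorted_offsets = sorted(update_files.keys(), key=lambda x: int(x))
    if keep_update_requests == "none":
        return []
    if keep_update_requests == "last":
        return update_files[sorted_offsets[-1]] if sorted_offsets else []
    if keep_update_requests == "last-three":
        keep = []
        for offset in sorted_offsets[-3:]:
            keep.extend(update_files[offset])
        return keep
    # Assumed fallback: keep everything.
    return [f for files in update_files.values() for f in files]

example = {"10": ["update-10.ttl"], "2": ["update-2.ttl"], "30": ["update-30.ttl"]}
print(select_files_to_keep(example, "last"))        # ['update-30.ttl']
print(select_files_to_keep(example, "last-three"))  # ['update-2.ttl', 'update-10.ttl', 'update-30.ttl']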