diff --git a/src/qlever/commands/update_wikidata.py b/src/qlever/commands/update_wikidata.py
index 3c686540..0208f4aa 100644
--- a/src/qlever/commands/update_wikidata.py
+++ b/src/qlever/commands/update_wikidata.py
@@ -202,13 +202,6 @@ def additional_arguments(self, subparser) -> None:
             default=0,
             help="The partition to consume from the SSE stream (default: 0)",
         )
-        subparser.add_argument(
-            "--min-or-max-date",
-            choices=["min", "max"],
-            default="max",
-            help="Use the minimum or maximum date of the batch for the "
-            "`updatesCompleteUntil` property (default: maximum)",
-        )
         subparser.add_argument(
             "--wait-between-batches",
             type=int,
@@ -347,8 +340,7 @@ def execute(self, args) -> bool:
             if result and result != '""':
                 args.offset = int(result.strip('"'))
                 log.info(
-                    f"Resuming from offset from endpoint: "
-                    f"{args.offset}"
+                    f"Resuming from offset from endpoint: {args.offset}"
                 )
         except Exception as e:
             log.debug(
@@ -659,9 +651,9 @@ def execute(self, args) -> bool:
                 rdf_linked_shared_data = event_data.get(
                     "rdf_linked_shared_data"
                 )
-                rdf_unlinked_shared_data = event_data.get(
-                    "rdf_unlinked_shared_data"
-                )
+                # rdf_unlinked_shared_data = event_data.get(
+                #     "rdf_unlinked_shared_data"
+                # )
 
                 # Check batch completion conditions BEFORE processing the
                 # data of this message. If any of the conditions is met,
@@ -763,10 +755,14 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
                 )
 
                 # Process the to-be-deleted triples.
-                for rdf_to_be_deleted in (
-                    rdf_deleted_data,
-                    rdf_unlinked_shared_data,
-                ):
+                #
+                # NOTE: The triples from `rdf_unlinked_shared_data`
+                # must not be deleted, because they are only
+                # unlinked from the current entity, but may still
+                # be linked from other entities. If they are not
+                # linked from any other entity, they will be
+                # orphaned, but we don't mind that.
+                for rdf_to_be_deleted in (rdf_deleted_data,):
                     if rdf_to_be_deleted is not None:
                         try:
                             rdf_to_be_deleted_data = (
@@ -909,28 +905,13 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
             f"min delta to NOW: {min_delta_to_now_s}s]"
         )
 
-        # Add the min and max date of the batch to `insert_triples`.
-        #
-        # NOTE: The min date means that we have *all* updates until that
-        # date. The max date is the date of the latest update we have seen.
-        # However, there may still be earlier updates that we have not seen
-        # yet. Wikidata uses `schema:dateModified` for the latter semantics,
-        # so we use it here as well. For the other semantics, we invent
-        # a new property `wikibase:updatesCompleteUntil`.
-        insert_triples.add(
-            f" "
-            f" "
-            f'"{date_list[-1]}"^^'
-        )
-        updates_complete_until = (
-            date_list[-1]
-            if args.min_or_max_date == "max"
-            else date_list[0]
-        )
+        # Add the triples `wikibase:Dump wikibase:updatesCompleteUntil
+        # DATE` and `wikibase:Dump wikibase:updateStreamNextOffset
+        # OFFSET`.
         insert_triples.add(
             f" "
             f" "
-            f'"{updates_complete_until}"'
+            f'"{date_list[-1]}"'
             f"^^"
         )
         insert_triples.add(
             f" "
             f" "
             f'"{args.offset}"^^'
             f""
         )
@@ -1028,13 +1009,19 @@ def node_to_sparql(node: rdflib.term.Node) -> str:
             update_files[offset].append(file_path)
 
         # Sort by offset (newest last)
-        sorted_offsets = sorted(update_files.keys(), key=lambda x: int(x))
+        sorted_offsets = sorted(
+            update_files.keys(), key=lambda x: int(x)
+        )
 
         # Determine which to keep
         if args.keep_update_requests == "none":
             files_to_keep = []
        elif args.keep_update_requests == "last":
-            files_to_keep = update_files[sorted_offsets[-1]] if sorted_offsets else []
+            files_to_keep = (
+                update_files[sorted_offsets[-1]]
+                if sorted_offsets
+                else []
+            )
         elif args.keep_update_requests == "last-three":
             files_to_keep = []
             for offset in sorted_offsets[-3:]:
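
For reference, a minimal sketch of the two batch-metadata triples that the patch adds to `insert_triples`, written out as N-Triples strings. The IRIs and datatypes are assumptions (the `wikibase:` prefix taken to expand to `http://wikiba.se/ontology#`, plain XSD datatypes for the literals), and `date_list` and `next_offset` are example stand-ins for the corresponding values in `execute()`; the exact strings built by the patch may differ.

    # Sketch only: assumed IRIs and datatypes, with example values for the
    # batch date and the stream offset.
    insert_triples = set()
    date_list = ["2024-05-01T12:00:00Z"]  # modification dates of the batch, sorted
    next_offset = 4711                    # next offset in the update stream

    # wikibase:Dump wikibase:updatesCompleteUntil DATE
    insert_triples.add(
        "<http://wikiba.se/ontology#Dump> "
        "<http://wikiba.se/ontology#updatesCompleteUntil> "
        f'"{date_list[-1]}"'
        "^^<http://www.w3.org/2001/XMLSchema#dateTime>"
    )

    # wikibase:Dump wikibase:updateStreamNextOffset OFFSET
    insert_triples.add(
        "<http://wikiba.se/ontology#Dump> "
        "<http://wikiba.se/ontology#updateStreamNextOffset> "
        f'"{next_offset}"^^<http://www.w3.org/2001/XMLSchema#integer>'
    )

A later run can then query these two triples from the endpoint to see how up to date the data is and where to resume consuming the stream, which is presumably what the "Resuming from offset from endpoint" logic near the top of the patch relies on.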