Skip to content

Commit

Permalink
Migrate WARC download to local WARC cache (WIP)
Browse files Browse the repository at this point in the history
  • Loading branch information
janheinrichmerker committed Feb 26, 2025
1 parent c53fd49 commit 028bc73
Showing 1 changed file with 57 additions and 20 deletions.
77 changes: 57 additions & 20 deletions archive_query_log/downloaders/warc.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
from datetime import datetime
from itertools import chain
from typing import Iterable, Iterator, TypeVar, Generic, Type, Callable
from json import dumps, loads, JSONEncoder, JSONDecoder
from typing import Iterable, Iterator, TypeVar, Generic, Type, Callable, Any
from uuid import uuid5
from warnings import warn

from click import echo
from elasticsearch_dsl import Search
from elasticsearch_dsl import Document, Search
from elasticsearch_dsl.function import RandomScore
from elasticsearch_dsl.query import Exists, FunctionScore, Term, RankFeature
from requests import ConnectionError as RequestsConnectionError
Expand All @@ -21,14 +22,28 @@
from archive_query_log.orm import Serp, InnerDownloader, WarcLocation, Result
from archive_query_log.utils.es import safe_iter_scan, update_action
from archive_query_log.utils.time import utc_now
from archive_query_log.utils.warc_cache import WarcCacheRecord

_T = TypeVar("_T")
_T = TypeVar("_T", bound=Document)


class _JsonEncoderDecoder(JSONEncoder, JSONDecoder):
def default(self, object: Any) -> str | Any:
if isinstance(object, datetime):
return object.isoformat()
return super().default(object)

def decode(self, s: str, *args) -> Any:
try:
return datetime.fromisoformat(s)
except ValueError:
return super().decode(s, *args)


class _WrapperArcWarcRecord(ArcWarcRecord, Generic[_T]):
wrapped: _T
_wrapped_type: Type[_T]

def __init__(self, wrapped: _T, record: ArcWarcRecord):
def __init__(self, record: ArcWarcRecord, wrapped: _T | Type[_T]) -> None:
super().__init__(
record.format,
record.rec_type,
Expand All @@ -40,12 +55,24 @@ def __init__(self, wrapped: _T, record: ArcWarcRecord):
payload_length=record.payload_length,
digest_checker=record.digest_checker,
)
self.wrapped = wrapped
if isinstance(wrapped, type):
self._wrapped_type = wrapped
else:
self._wrapped_type = type(wrapped)
self.rec_headers["WARC-Wrapped"] = dumps(wrapped.to_dict (include_meta=True), cls=_JsonEncoderDecoder)

@property
def wrapped(self) -> _T:
    """Rebuild the wrapped document from the ``WARC-Wrapped`` record header.

    The header value is JSON-decoded with ``_JsonEncoderDecoder`` and the
    result is handed to ``_from_dict`` of the stored wrapped type.
    """
    return self._wrapped_type._from_dict(loads(self.rec_headers["WARC-Wrapped"], cls=_JsonEncoderDecoder))

# The setter must carry the same name as the property it extends; under a
# different name, ``wrapped`` would stay read-only and assignment would fail.
@wrapped.setter
def wrapped(self, value: _T) -> None:
    """Serialize ``value`` (including meta fields) into the ``WARC-Wrapped`` header."""
    self.rec_headers["WARC-Wrapped"] = dumps(value.to_dict(include_meta=True), cls=_JsonEncoderDecoder)


def _unwrap(
warc_record: WarcS3Record,
wrapper_type: Type[_WrapperArcWarcRecord[_T]],
warc_record: WarcS3Record | WarcCacheRecord,
wrapper_type: Type[_WrapperArcWarcRecord[_T]],
) -> tuple[_T, WarcLocation]:
record: ArcWarcRecord = warc_record.record
if not isinstance(record, wrapper_type):
Expand Down Expand Up @@ -86,7 +113,7 @@ def _download_serp_warc(
))
return
for record in records:
yield _SerpArcWarcRecord(serp, record)
yield _SerpArcWarcRecord(record, serp)


def download_serps_warc(config: Config) -> None:
Expand Down Expand Up @@ -118,23 +145,33 @@ def download_serps_warc(config: Config) -> None:
)
changed_serps = safe_iter_scan(changed_serps)
# noinspection PyTypeChecker
changed_serps = tqdm(changed_serps, total=num_changed_serps,
desc="Downloading WARCs", unit="SERP")
changed_serps = tqdm(
changed_serps, total=num_changed_serps, desc="Downloading WARCs", unit="SERP"
)

# Download from Memento API.
serp_records = chain.from_iterable(
_download_serp_warc(config, serp)
for serp in changed_serps
serp_records: Iterable[_SerpArcWarcRecord] = chain.from_iterable(
_download_serp_warc(config, serp) for serp in changed_serps
)

# Write to S3.
stored_records: Iterator[WarcS3Record] = (
config.s3.warc_store.write(serp_records))
stored_serps = (
_unwrap(record, _SerpArcWarcRecord)
for record in stored_records
# Write to cache.
stored_records: Iterator[WarcCacheRecord] = config.warc_cache_store.write(
serp_records
)

# Consume iterator to write to cache.
for _ in stored_records:
pass


def upload_serps_warc(config: Config) -> None:
raise NotImplementedError()

# Write to S3.
# stored_records: Iterator[WarcS3Record] = (
# config.s3.warc_store.write(serp_records))
stored_serps = (_unwrap(record, _SerpArcWarcRecord) for record in stored_records)

downloader_id_components = (
config.s3.endpoint_url,
config.s3.bucket_name,
Expand Down

0 comments on commit 028bc73

Please sign in to comment.