Skip to content

Commit

Permalink
Support for WARC resource record types.
Browse files Browse the repository at this point in the history
WARC resource records store "a resource retrieved over a network,
but not necessarily as a direct result of a single HTTP request/response
exchange."

These records can omit HTTP headers, which makes the prior code (which relied on
HTTP headers being present) problematic. Here we add a simple method to resolve
the attribute values necessary to process records for linearization.
  • Loading branch information
no0p committed Mar 4, 2025
1 parent a1755cd commit 9a908b4
Showing 1 changed file with 35 additions and 11 deletions.
46 changes: 35 additions & 11 deletions python/dolma/warc/processor.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import datetime
from logging import warning
import multiprocessing
import tempfile
from contextlib import ExitStack
Expand Down Expand Up @@ -73,6 +74,30 @@ def increment_progressbar( # type: ignore
# we call the super method to increment the progress bar
return super().increment_progressbar(queue, files=files, records=records, extracted=extracted)

@classmethod
def resolve_record_info(cls, record):
    """Resolve the metadata needed to process a WARC record.

    ``response`` records carry HTTP headers, so their content type and date
    are read from there. ``resource`` records can omit HTTP headers, so the
    same information is read from the WARC headers instead.

    Args:
        record: A WARC record exposing ``headers``, ``http_headers`` and
            ``record_type``.

    Returns:
        A dict with the keys ``payload_id``, ``target_uri``, ``ctype`` and
        ``date`` (``date`` is whatever ``cls._parse_warc_timestamp``
        returns), or ``None`` when the record type is neither ``response``
        nor ``resource`` (a warning is logged in that case).

    Raises:
        AttributeError: If the ``WARC-Payload-Digest`` header is missing.
        IndexError: If the ``WARC-Payload-Digest`` value contains no ``:``.
    """
    # The digest is presumably "<algorithm>:<value>"; only the lower-cased
    # part after the first ":" is used as the document id.
    # NOTE(review): WARC-Payload-Digest is dereferenced unconditionally, so a
    # record without it raises here -- confirm that is the intended policy.
    payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower()
    target_uri = record.headers.get("WARC-Target-URI")

    if record.record_type == WarcRecordType.response:
        raw_ctype = record.http_headers.get("Content-Type")
        raw_date = record.http_headers.get("Date")
    elif record.record_type == WarcRecordType.resource:
        raw_ctype = record.headers.get("Content-Type")
        raw_date = record.headers.get("WARC-Date")
    else:
        # Lazy %-formatting so the actual record type ends up in the message
        # (the previous plain string logged the literal placeholder).
        warning("Unsupported WARC record type: %s", record.record_type)
        return None

    # Keep only the media type: drop parameters such as "; charset=utf-8" and
    # fall back to "" when the header is absent, for both record types alike.
    ctype, *_ = (raw_ctype or "").split(";")

    return dict(
        payload_id=payload_id,
        target_uri=target_uri,
        ctype=ctype,
        date=cls._parse_warc_timestamp(raw_date),
    )

@classmethod
def process_single(
cls,
Expand Down Expand Up @@ -138,7 +163,7 @@ def process_single(
smart_open.open(source_path, "rb") as warc_file,
smart_open.open(destination_path, "wb") as output_file,
):
it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo)
it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo | WarcRecordType.resource)
for record in it:
if record.record_type == WarcRecordType.warcinfo:
warc_date = record.record_date or None
Expand All @@ -164,24 +189,23 @@ def process_single(
if not decoded_content:
continue

# metadata
ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";")
date = cls._parse_warc_timestamp(record.http_headers.get("Date"))
target_uri = record.headers.get("WARC-Target-URI")
payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower()
warc_record_info = cls.resolve_record_info(record)
if not warc_record_info:
continue

metadata = dict(
warc_url=target_uri,
url=url_normalizer(target_uri),
warc_url=warc_record_info["target_uri"],
url=url_normalizer(warc_record_info["target_uri"]),
html=decoded_content,
warc_date=cls._format_to_dolma_timestamp(warc_date),
warc_filename=warc_filename or "",
content_type=ctype,
content_type=warc_record_info["ctype"],
uncompressed_offset=record.stream_pos,
)
doc = InputSpecWithMetadataAndAttributes(
source=source_name,
version=source_version,
id=payload_id,
id=warc_record_info["payload_id"],
text="", # this will come later
metadata=metadata,
)
Expand All @@ -205,7 +229,7 @@ def process_single(
for a_name, attr_values in attributes.items()
}

doc.created = cls._format_to_dolma_timestamp(date)
doc.created = cls._format_to_dolma_timestamp(warc_record_info["date"])
doc.added = cls._format_to_dolma_timestamp(date_now)

if not store_html_in_metadata:
Expand Down

0 comments on commit 9a908b4

Please sign in to comment.