From 9a908b4bb8da757f467d125dd57ad3d0952d28b9 Mon Sep 17 00:00:00 2001 From: robert berry Date: Mon, 3 Mar 2025 19:07:27 -0800 Subject: [PATCH 1/2] Support for WARC resource record types. WARC resource records store "a resource retrieved over a network, but not necessarily as a direct result of a single HTTP request/response exchange." These records can omit HTTP headers, making the prior code, which relied on HTTP header presence, problematic. Here we add a simple method to resolve the attribute values necessary to process records for linearization. --- python/dolma/warc/processor.py | 46 ++++++++++++++++++++++++++-------- 1 file changed, 35 insertions(+), 11 deletions(-) diff --git a/python/dolma/warc/processor.py b/python/dolma/warc/processor.py index 474c6ca9..d3122c09 100644 --- a/python/dolma/warc/processor.py +++ b/python/dolma/warc/processor.py @@ -1,4 +1,5 @@ import datetime +from logging import warning import multiprocessing import tempfile from contextlib import ExitStack @@ -73,6 +74,30 @@ def increment_progressbar( # type: ignore # we call the super method to increment the progress bar return super().increment_progressbar(queue, files=files, records=records, extracted=extracted) + @classmethod + def resolve_record_info(cls, record): + payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower() + target_uri = record.headers.get("WARC-Target-URI") + + if record.record_type == WarcRecordType.response: + ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";") + + return dict( + payload_id=payload_id, + target_uri=target_uri, + ctype=ctype, + date=cls._parse_warc_timestamp(record.http_headers.get("Date")), + ) + elif record.record_type == WarcRecordType.resource: + return dict( + payload_id=payload_id, + target_uri=target_uri, + ctype=record.headers.get("Content-Type"), + date=cls._parse_warc_timestamp(record.headers.get("WARC-Date")), + ) + else: + warning("Unsupported WARC record type: {record.record_type}") + @classmethod 
def process_single( cls, @@ -138,7 +163,7 @@ def process_single( smart_open.open(source_path, "rb") as warc_file, smart_open.open(destination_path, "wb") as output_file, ): - it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo) + it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo | WarcRecordType.resource) for record in it: if record.record_type == WarcRecordType.warcinfo: warc_date = record.record_date or None @@ -164,24 +189,23 @@ def process_single( if not decoded_content: continue - # metadata - ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";") - date = cls._parse_warc_timestamp(record.http_headers.get("Date")) - target_uri = record.headers.get("WARC-Target-URI") - payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower() + warc_record_info = cls.resolve_record_info(record) + if not warc_record_info: + continue + metadata = dict( - warc_url=target_uri, - url=url_normalizer(target_uri), + warc_url=warc_record_info["target_uri"], + url=url_normalizer(warc_record_info["target_uri"]), html=decoded_content, warc_date=cls._format_to_dolma_timestamp(warc_date), warc_filename=warc_filename or "", - content_type=ctype, + content_type=warc_record_info["ctype"], uncompressed_offset=record.stream_pos, ) doc = InputSpecWithMetadataAndAttributes( source=source_name, version=source_version, - id=payload_id, + id=warc_record_info["payload_id"], text="", # this will come later metadata=metadata, ) @@ -205,7 +229,7 @@ def process_single( for a_name, attr_values in attributes.items() } - doc.created = cls._format_to_dolma_timestamp(date) + doc.created = cls._format_to_dolma_timestamp(warc_record_info["date"]) doc.added = cls._format_to_dolma_timestamp(date_now) if not store_html_in_metadata: From 2adbaa16a62ca308d67b3324d101e07a8e1f41b3 Mon Sep 17 00:00:00 2001 From: robert berry Date: Mon, 3 Mar 2025 20:21:48 -0800 Subject: [PATCH 2/2] WarcRecordInfo 
class refactor. Some cleanup on the code organization to introduce support for resource WARC record types. Improved safety. Also linting. --- python/dolma/warc/processor.py | 57 ++++------------- python/dolma/warc/record_info.py | 51 +++++++++++++++ tests/python/test_warc_record_info.py | 91 +++++++++++++++++++++++++++ 3 files changed, 154 insertions(+), 45 deletions(-) create mode 100644 python/dolma/warc/record_info.py create mode 100644 tests/python/test_warc_record_info.py diff --git a/python/dolma/warc/processor.py b/python/dolma/warc/processor.py index d3122c09..b8ecde32 100644 --- a/python/dolma/warc/processor.py +++ b/python/dolma/warc/processor.py @@ -1,5 +1,4 @@ import datetime -from logging import warning import multiprocessing import tempfile from contextlib import ExitStack @@ -21,6 +20,7 @@ # from .documents import WarcDocument, WarcDocumentMetadata # from .filters import FilterInputType, partition_extractors from .linearizers import LinearizerRegistry +from .record_info import WarcRecordInfo from .utils import UrlNormalizer, raise_warc_dependency_error with necessary("fastwarc", soft=True) as FASTWARC_AVAILABLE: @@ -29,10 +29,7 @@ with necessary("dateparser", soft=True) as DATEPARSER_AVAILABLE: if DATEPARSER_AVAILABLE or TYPE_CHECKING: - import dateparser - - -DATE_FORMATS = ["%a, %d %b %Y %H:%M:%S %Z", "%Y-%m-%dT%H:%M:%SZ"] + import dateparser # noqa: F401 class WarcProcessor(BaseParallelProcessor): @@ -52,14 +49,6 @@ def _format_to_dolma_timestamp(timestamp: Optional[datetime.datetime] = None) -> timestamp = datetime.datetime.now() return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] + "Z" - @staticmethod - def _parse_warc_timestamp(timestamp_str: Optional[str]) -> datetime.datetime: - """Parse a WARC timestamp into a datetime object.""" - if not timestamp_str: - return datetime.datetime.now() - - return dateparser.parse(date_string=timestamp_str, date_formats=DATE_FORMATS) or datetime.datetime.now() - @classmethod def increment_progressbar( # 
type: ignore cls, @@ -74,30 +63,6 @@ def increment_progressbar( # type: ignore # we call the super method to increment the progress bar return super().increment_progressbar(queue, files=files, records=records, extracted=extracted) - @classmethod - def resolve_record_info(cls, record): - payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower() - target_uri = record.headers.get("WARC-Target-URI") - - if record.record_type == WarcRecordType.response: - ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";") - - return dict( - payload_id=payload_id, - target_uri=target_uri, - ctype=ctype, - date=cls._parse_warc_timestamp(record.http_headers.get("Date")), - ) - elif record.record_type == WarcRecordType.resource: - return dict( - payload_id=payload_id, - target_uri=target_uri, - ctype=record.headers.get("Content-Type"), - date=cls._parse_warc_timestamp(record.headers.get("WARC-Date")), - ) - else: - warning("Unsupported WARC record type: {record.record_type}") - @classmethod def process_single( cls, @@ -163,7 +128,9 @@ def process_single( smart_open.open(source_path, "rb") as warc_file, smart_open.open(destination_path, "wb") as output_file, ): - it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo | WarcRecordType.resource) + it = ArchiveIterator( + warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo | WarcRecordType.resource + ) for record in it: if record.record_type == WarcRecordType.warcinfo: warc_date = record.record_date or None @@ -189,23 +156,23 @@ def process_single( if not decoded_content: continue - warc_record_info = cls.resolve_record_info(record) - if not warc_record_info: + warc_record_info = WarcRecordInfo(record) + if not warc_record_info.is_valid: continue metadata = dict( - warc_url=warc_record_info["target_uri"], - url=url_normalizer(warc_record_info["target_uri"]), + warc_url=warc_record_info.target_uri, + 
url=url_normalizer(warc_record_info.target_uri), html=decoded_content, warc_date=cls._format_to_dolma_timestamp(warc_date), warc_filename=warc_filename or "", - content_type=warc_record_info["ctype"], + content_type=warc_record_info.ctype, uncompressed_offset=record.stream_pos, ) doc = InputSpecWithMetadataAndAttributes( source=source_name, version=source_version, - id=warc_record_info["payload_id"], + id=warc_record_info.payload_id, text="", # this will come later metadata=metadata, ) @@ -229,7 +196,7 @@ def process_single( for a_name, attr_values in attributes.items() } - doc.created = cls._format_to_dolma_timestamp(warc_record_info["date"]) + doc.created = cls._format_to_dolma_timestamp(warc_record_info.date) doc.added = cls._format_to_dolma_timestamp(date_now) if not store_html_in_metadata: diff --git a/python/dolma/warc/record_info.py b/python/dolma/warc/record_info.py new file mode 100644 index 00000000..f519bf15 --- /dev/null +++ b/python/dolma/warc/record_info.py @@ -0,0 +1,51 @@ +import datetime +from typing import TYPE_CHECKING, Optional + +from fastwarc.warc import WarcRecordType +from necessary import necessary + +with necessary("dateparser", soft=True) as DATEPARSER_AVAILABLE: + if DATEPARSER_AVAILABLE or TYPE_CHECKING: + import dateparser + + +DATE_FORMATS = ["%a, %d %b %Y %H:%M:%S %Z", "%Y-%m-%dT%H:%M:%SZ"] + + +class WarcRecordInfo: + def __init__(self, record): + self.record = record + + if not self.is_valid: + return None + + self.payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower() + self.target_uri = record.headers.get("WARC-Target-URI") + + if record.record_type == WarcRecordType.response: + ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";") + self.ctype = ctype + self.date = WarcRecordInfo._parse_warc_timestamp(record.http_headers.get("Date")) + elif record.record_type == WarcRecordType.resource: + self.ctype, *_ = (record.headers.get("Content-Type") or "").split(";") + self.date = 
WarcRecordInfo._parse_warc_timestamp(record.headers.get("WARC-Date")) + else: + raise ValueError(f"Unsupported record type: {record.record_type}") + + @property + def is_valid(self) -> bool: + if not self.record.headers.get("WARC-Payload-Digest"): + return False + + if not self.record.headers.get("WARC-Target-URI"): + return False + + return True + + @staticmethod + def _parse_warc_timestamp(timestamp_str: Optional[str]) -> datetime.datetime: + """Parse a WARC timestamp into a datetime object.""" + if not timestamp_str: + return datetime.datetime.now() + + return dateparser.parse(date_string=timestamp_str, date_formats=DATE_FORMATS) or datetime.datetime.now() diff --git a/tests/python/test_warc_record_info.py b/tests/python/test_warc_record_info.py new file mode 100644 index 00000000..3a2611ea --- /dev/null +++ b/tests/python/test_warc_record_info.py @@ -0,0 +1,91 @@ +import datetime +from unittest import TestCase +from unittest.mock import MagicMock, patch + +from fastwarc.warc import WarcRecord, WarcRecordType + +from dolma.warc.record_info import WarcRecordInfo + + +class TestWarcRecordInfo(TestCase): + def test_response_record(self): + record_mock = MagicMock(spec=WarcRecord) + record_mock.record_type = WarcRecordType.response + record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"} + record_mock.http_headers = { + "Content-Type": "text/html; charset=utf-8", + "Date": "Thu, 20 Apr 2023 12:00:00 GMT", + } + + record_info = WarcRecordInfo(record_mock) + + self.assertEqual(record_info.payload_id, "payload_id") + self.assertEqual(record_info.target_uri, "http://example.com") + self.assertEqual(record_info.ctype, "text/html") + self.assertEqual(record_info.date, datetime.datetime(2023, 4, 20, 12, 0, 0)) + + def test_resource_record(self): + record_mock = MagicMock(spec=WarcRecord) + record_mock.record_type = WarcRecordType.resource + record_mock.headers = { + "WARC-Payload-Digest": "sha1:payload_id", + 
"WARC-Target-URI": "http://example.com", + "Content-Type": "application/json", + "WARC-Date": "2023-04-20T12:00:00Z", + } + record_mock.http_headers = {} + + record_info = WarcRecordInfo(record_mock) + + self.assertEqual(record_info.payload_id, "payload_id") + self.assertEqual(record_info.target_uri, "http://example.com") + self.assertEqual(record_info.ctype, "application/json") + self.assertEqual(record_info.date, datetime.datetime(2023, 4, 20, 12, 0, 0)) + + def test_unsupported_record_type(self): + record_mock = MagicMock(spec=WarcRecord) + record_mock.record_type = "unsupported" + record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"} + record_mock.http_headers = {} + + with self.assertRaises(ValueError): + WarcRecordInfo(record_mock) + + def test_missing_headers(self): + record_mock = MagicMock(spec=WarcRecord) + record_mock.record_type = WarcRecordType.response + record_mock.headers = {} + record_mock.http_headers = {} + + with patch("dolma.warc.record_info.datetime") as datetime_mock: + now = datetime.datetime.now() + datetime_mock.datetime.now.return_value = now + record_info = WarcRecordInfo(record_mock) + assert not record_info.is_valid + + def test_content_type_with_extra_info(self): + record_mock = MagicMock(spec=WarcRecord) + record_mock.record_type = WarcRecordType.response + record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"} + record_mock.http_headers = { + "Content-Type": "text/html; charset=utf-8; boundary=---123", + "Date": "Thu, 20 Apr 2023 12:00:00 GMT", + } + + record_info = WarcRecordInfo(record_mock) + + self.assertEqual(record_info.ctype, "text/html") + + def test_invalid_date_format(self): + record_mock = MagicMock(spec=WarcRecord) + record_mock.record_type = WarcRecordType.response + record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"} + record_mock.http_headers = {"Content-Type": 
"text/html; charset=utf-8", "Date": "Invalid Date"} + + with patch("dolma.warc.record_info.datetime") as datetime_mock: + now = datetime.datetime.now() + datetime_mock.datetime.now.return_value = now + record_info = WarcRecordInfo(record_mock) + + # Assert that the date is "close enough" to now, since it's hard to mock perfectly + self.assertAlmostEqual(record_info.date.timestamp(), now.timestamp(), delta=1)