Skip to content

Commit

Permalink
Merge pull request #248 from allenai/warc_resource_support
Browse files Browse the repository at this point in the history
Support for WARC resource record types.
  • Loading branch information
no0p authored Mar 4, 2025
2 parents a1755cd + 2adbaa1 commit ca1727f
Show file tree
Hide file tree
Showing 3 changed files with 156 additions and 23 deletions.
37 changes: 14 additions & 23 deletions python/dolma/warc/processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# from .documents import WarcDocument, WarcDocumentMetadata
# from .filters import FilterInputType, partition_extractors
from .linearizers import LinearizerRegistry
from .record_info import WarcRecordInfo
from .utils import UrlNormalizer, raise_warc_dependency_error

with necessary("fastwarc", soft=True) as FASTWARC_AVAILABLE:
Expand All @@ -28,10 +29,7 @@

with necessary("dateparser", soft=True) as DATEPARSER_AVAILABLE:
if DATEPARSER_AVAILABLE or TYPE_CHECKING:
import dateparser


DATE_FORMATS = ["%a, %d %b %Y %H:%M:%S %Z", "%Y-%m-%dT%H:%M:%SZ"]
import dateparser # noqa: F401


class WarcProcessor(BaseParallelProcessor):
Expand All @@ -51,14 +49,6 @@ def _format_to_dolma_timestamp(timestamp: Optional[datetime.datetime] = None) ->
timestamp = datetime.datetime.now()
return timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f")[:23] + "Z"

@staticmethod
def _parse_warc_timestamp(timestamp_str: Optional[str]) -> datetime.datetime:
"""Parse a WARC timestamp into a datetime object."""
if not timestamp_str:
return datetime.datetime.now()

return dateparser.parse(date_string=timestamp_str, date_formats=DATE_FORMATS) or datetime.datetime.now()

@classmethod
def increment_progressbar( # type: ignore
cls,
Expand Down Expand Up @@ -138,7 +128,9 @@ def process_single(
smart_open.open(source_path, "rb") as warc_file,
smart_open.open(destination_path, "wb") as output_file,
):
it = ArchiveIterator(warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo)
it = ArchiveIterator(
warc_file, record_types=WarcRecordType.response | WarcRecordType.warcinfo | WarcRecordType.resource
)
for record in it:
if record.record_type == WarcRecordType.warcinfo:
warc_date = record.record_date or None
Expand All @@ -164,24 +156,23 @@ def process_single(
if not decoded_content:
continue

# metadata
ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";")
date = cls._parse_warc_timestamp(record.http_headers.get("Date"))
target_uri = record.headers.get("WARC-Target-URI")
payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower()
warc_record_info = WarcRecordInfo(record)
if not warc_record_info.is_valid:
continue

metadata = dict(
warc_url=target_uri,
url=url_normalizer(target_uri),
warc_url=warc_record_info.target_uri,
url=url_normalizer(warc_record_info.target_uri),
html=decoded_content,
warc_date=cls._format_to_dolma_timestamp(warc_date),
warc_filename=warc_filename or "",
content_type=ctype,
content_type=warc_record_info.ctype,
uncompressed_offset=record.stream_pos,
)
doc = InputSpecWithMetadataAndAttributes(
source=source_name,
version=source_version,
id=payload_id,
id=warc_record_info.payload_id,
text="", # this will come later
metadata=metadata,
)
Expand All @@ -205,7 +196,7 @@ def process_single(
for a_name, attr_values in attributes.items()
}

doc.created = cls._format_to_dolma_timestamp(date)
doc.created = cls._format_to_dolma_timestamp(warc_record_info.date)
doc.added = cls._format_to_dolma_timestamp(date_now)

if not store_html_in_metadata:
Expand Down
51 changes: 51 additions & 0 deletions python/dolma/warc/record_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import datetime
from typing import TYPE_CHECKING, Optional

from fastwarc.warc import WarcRecordType
from necessary import necessary

with necessary("dateparser", soft=True) as DATEPARSER_AVAILABLE:
if DATEPARSER_AVAILABLE or TYPE_CHECKING:
import dateparser


DATE_FORMATS = ["%a, %d %b %Y %H:%M:%S %Z", "%Y-%m-%dT%H:%M:%SZ"]


class WarcRecordInfo:
def __init__(self, record):
self.record = record

if not self.is_valid:
return None

self.payload_id = record.headers.get("WARC-Payload-Digest").split(":")[1].lower()
self.target_uri = record.headers.get("WARC-Target-URI")

if record.record_type == WarcRecordType.response:
ctype, *_ = (record.http_headers.get("Content-Type") or "").split(";")
self.ctype = ctype
self.date = WarcRecordInfo._parse_warc_timestamp(record.http_headers.get("Date"))
elif record.record_type == WarcRecordType.resource:
self.ctype, *_ = (record.headers.get("Content-Type") or "").split(";")
self.date = WarcRecordInfo._parse_warc_timestamp(record.headers.get("WARC-Date"))
else:
raise ValueError(f"Unsupported record type: {record.record_type}")

@property
def is_valid(self) -> bool:
if not self.record.headers.get("WARC-Payload-Digest"):
return False

if not self.record.headers.get("WARC-Target-URI"):
return False

return True

@staticmethod
def _parse_warc_timestamp(timestamp_str: Optional[str]) -> datetime.datetime:
"""Parse a WARC timestamp into a datetime object."""
if not timestamp_str:
return datetime.datetime.now()

return dateparser.parse(date_string=timestamp_str, date_formats=DATE_FORMATS) or datetime.datetime.now()
91 changes: 91 additions & 0 deletions tests/python/test_warc_record_info.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
import datetime
from unittest import TestCase
from unittest.mock import MagicMock, patch

from fastwarc.warc import WarcRecord, WarcRecordType

from dolma.warc.record_info import WarcRecordInfo


class TestWarcRecordInfo(TestCase):
def test_response_record(self):
record_mock = MagicMock(spec=WarcRecord)
record_mock.record_type = WarcRecordType.response
record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"}
record_mock.http_headers = {
"Content-Type": "text/html; charset=utf-8",
"Date": "Thu, 20 Apr 2023 12:00:00 GMT",
}

record_info = WarcRecordInfo(record_mock)

self.assertEqual(record_info.payload_id, "payload_id")
self.assertEqual(record_info.target_uri, "http://example.com")
self.assertEqual(record_info.ctype, "text/html")
self.assertEqual(record_info.date, datetime.datetime(2023, 4, 20, 12, 0, 0))

def test_resource_record(self):
record_mock = MagicMock(spec=WarcRecord)
record_mock.record_type = WarcRecordType.resource
record_mock.headers = {
"WARC-Payload-Digest": "sha1:payload_id",
"WARC-Target-URI": "http://example.com",
"Content-Type": "application/json",
"WARC-Date": "2023-04-20T12:00:00Z",
}
record_mock.http_headers = {}

record_info = WarcRecordInfo(record_mock)

self.assertEqual(record_info.payload_id, "payload_id")
self.assertEqual(record_info.target_uri, "http://example.com")
self.assertEqual(record_info.ctype, "application/json")
self.assertEqual(record_info.date, datetime.datetime(2023, 4, 20, 12, 0, 0))

def test_unsupported_record_type(self):
record_mock = MagicMock(spec=WarcRecord)
record_mock.record_type = "unsupported"
record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"}
record_mock.http_headers = {}

with self.assertRaises(ValueError):
WarcRecordInfo(record_mock)

def test_missing_headers(self):
record_mock = MagicMock(spec=WarcRecord)
record_mock.record_type = WarcRecordType.response
record_mock.headers = {}
record_mock.http_headers = {}

with patch("dolma.warc.record_info.datetime") as datetime_mock:
now = datetime.datetime.now()
datetime_mock.datetime.now.return_value = now
record_info = WarcRecordInfo(record_mock)
assert not record_info.is_valid

def test_content_type_with_extra_info(self):
record_mock = MagicMock(spec=WarcRecord)
record_mock.record_type = WarcRecordType.response
record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"}
record_mock.http_headers = {
"Content-Type": "text/html; charset=utf-8; boundary=---123",
"Date": "Thu, 20 Apr 2023 12:00:00 GMT",
}

record_info = WarcRecordInfo(record_mock)

self.assertEqual(record_info.ctype, "text/html")

def test_invalid_date_format(self):
record_mock = MagicMock(spec=WarcRecord)
record_mock.record_type = WarcRecordType.response
record_mock.headers = {"WARC-Payload-Digest": "sha1:payload_id", "WARC-Target-URI": "http://example.com"}
record_mock.http_headers = {"Content-Type": "text/html; charset=utf-8", "Date": "Invalid Date"}

with patch("dolma.warc.record_info.datetime") as datetime_mock:
now = datetime.datetime.now()
datetime_mock.datetime.now.return_value = now
record_info = WarcRecordInfo(record_mock)

# Assert that the date is "close enough" to now, since it's hard to mock perfectly
self.assertAlmostEqual(record_info.date.timestamp(), now.timestamp(), delta=1)

0 comments on commit ca1727f

Please sign in to comment.