diff --git a/AGENTS.md b/AGENTS.md index 2d70072..1ef6a94 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -143,7 +143,7 @@ These must be preserved for compatibility with the JS Crawlee `MemoryStorage` on - Uses **PyO3 0.28** with **pyo3-async-runtimes** (tokio feature) for native Python coroutines. - Each Rust client is wrapped in `Arc` so it can be cloned into async blocks (standard pattern for pyo3 async methods). - JSON data crosses the FFI boundary as Python dicts/lists, converted to/from `serde_json::Value` via `value_to_py` / `py_to_value` helper functions. -- KVS binary values cross the FFI boundary as Python `bytes` ↔ `KvsValue::Binary(Vec)` directly — no base64 intermediary. +- KVS values are `bytes`-only in the Python bindings. `setValue` accepts `bytes` (PyO3 `Vec`) directly, and `getValue` returns raw file bytes as Python `bytes`. The caller is responsible for serialization/deserialization. - The compiled native module is `crawlee_storage._native`, re-exported by `crawlee_storage/__init__.py`. ### Node.js Bindings diff --git a/crawlee-storage-node/dts-header.d.ts b/crawlee-storage-node/dts-header.d.ts index 91fbfb7..08a8981 100644 --- a/crawlee-storage-node/dts-header.d.ts +++ b/crawlee-storage-node/dts-header.d.ts @@ -63,7 +63,13 @@ export interface ProcessedRequest { wasAlreadyHandled: boolean; } +export interface UnprocessedRequest { + uniqueKey: string; + url: string; + method?: string | null; +} + export interface AddRequestsResponse { processedRequests: ProcessedRequest[]; - unprocessedRequests: unknown[]; + unprocessedRequests: UnprocessedRequest[]; } diff --git a/crawlee-storage-node/index.d.ts b/crawlee-storage-node/index.d.ts index 7e19c1a..7b06bd3 100644 --- a/crawlee-storage-node/index.d.ts +++ b/crawlee-storage-node/index.d.ts @@ -63,9 +63,15 @@ export interface ProcessedRequest { wasAlreadyHandled: boolean; } +export interface UnprocessedRequest { + uniqueKey: string; + url: string; + method?: string | null; +} + export interface AddRequestsResponse { processedRequests: ProcessedRequest[]; - unprocessedRequests: unknown[]; + unprocessedRequests: UnprocessedRequest[]; } export declare class DatasetItemIterator { /** Fetch the next item. Returns null when iteration is exhausted. */ diff --git a/crawlee-storage-node/src/lib.rs b/crawlee-storage-node/src/lib.rs index 06a64b0..e52d475 100644 --- a/crawlee-storage-node/src/lib.rs +++ b/crawlee-storage-node/src/lib.rs @@ -87,7 +87,7 @@ impl FileSystemDatasetClient { .inner .get_data( offset.unwrap_or(0) as usize, - limit.unwrap_or(999_999_999) as usize, + limit.map_or(999_999_999_999_usize, |l| l as usize), desc.unwrap_or(false), skip_empty.unwrap_or(false), ) diff --git a/crawlee-storage-python/python/crawlee_storage/_native/__init__.pyi b/crawlee-storage-python/python/crawlee_storage/_native/__init__.pyi index b6a6e88..fb8e525 100644 --- a/crawlee-storage-python/python/crawlee_storage/_native/__init__.pyi +++ b/crawlee-storage-python/python/crawlee_storage/_native/__init__.pyi @@ -44,9 +44,9 @@ class KeyValueStoreRecordMetadata(typing.TypedDict): class KeyValueStoreRecord(typing.TypedDict): key: builtins.str - content_type: builtins.str + contentType: builtins.str size: typing.Optional[builtins.int] - value: typing.Any + value: builtins.bytes class RequestQueueMetadata(typing.TypedDict): id: builtins.str @@ -150,7 +150,10 @@ class FileSystemKeyValueStoreClient: async def purge(self) -> None: ... async def get_value(self, key: builtins.str) -> typing.Optional[KeyValueStoreRecord]: ... async def set_value( - self, key: builtins.str, value: typing.Any, content_type: typing.Optional[builtins.str] = None + self, + key: builtins.str, + value: typing.Sequence[builtins.int], + content_type: typing.Optional[builtins.str] = None, ) -> None: ... async def delete_value(self, key: builtins.str) -> None: ... def iterate_keys( diff --git a/crawlee-storage-python/src/bin/stub_gen.rs b/crawlee-storage-python/src/bin/stub_gen.rs index 264e4dc..0a60626 100644 --- a/crawlee-storage-python/src/bin/stub_gen.rs +++ b/crawlee-storage-python/src/bin/stub_gen.rs @@ -234,7 +234,7 @@ fn generate_typed_dicts() -> String { py_type: "builtins.str".into(), }, TypedDictField { - name: "content_type".into(), + name: "contentType".into(), py_type: "builtins.str".into(), }, TypedDictField { @@ -243,7 +243,7 @@ fn generate_typed_dicts() -> String { }, TypedDictField { name: "value".into(), - py_type: "typing.Any".into(), + py_type: "builtins.bytes".into(), }, ], }, diff --git a/crawlee-storage-python/src/lib.rs b/crawlee-storage-python/src/lib.rs index 1400811..0154e9e 100644 --- a/crawlee-storage-python/src/lib.rs +++ b/crawlee-storage-python/src/lib.rs @@ -122,25 +122,21 @@ fn metadata_to_py(py: Python<'_>, meta: &T) -> PyResult, - record: &crawlee_storage::models::KeyValueStoreRecord, + key: &str, + content_type: &str, + size: Option, + data: &[u8], ) -> PyResult> { - use crawlee_storage::models::KvsValue; use pyo3::IntoPyObject; let dict = PyDict::new(py); - dict.set_item("key", &record.key)?; - dict.set_item("content_type", &record.content_type)?; - dict.set_item("size", record.size)?; - - match &record.value { - KvsValue::None => dict.set_item("value", py.None())?, - KvsValue::Json(v) => dict.set_item("value", value_to_py(py, v)?)?, - KvsValue::Text(s) => dict.set_item("value", s)?, - KvsValue::Binary(bytes) => dict.set_item("value", pyo3::types::PyBytes::new(py, bytes))?, - } + dict.set_item("key", key)?; + dict.set_item("contentType", content_type)?; + dict.set_item("size", size)?; + dict.set_item("value", pyo3::types::PyBytes::new(py, data))?; Ok(dict.into_pyobject(py)?.into_any().unbind()) } @@ -522,11 +518,18 @@ impl FileSystemKeyValueStoreClient { fn get_value<'py>(&self, py: Python<'py>, key: String) -> PyResult> { let client = self.inner.clone(); pyo3_async_runtimes::tokio::future_into_py(py, async move { - let record = client.get_value(&key).await.map_err(storage_err)?; - Python::attach(|py| match record { - Some(r) => record_to_py(py, &r), - None => Ok(py.None()), - }) + let result = client.get_value_file(&key).await.map_err(storage_err)?; + match result { + Some((path, meta)) => { + let data = tokio::fs::read(&path) + .await + .map_err(|e| storage_err(e.into()))?; + Python::attach(|py| { + record_file_to_py(py, &key, &meta.content_type, meta.size, &data) + }) + } + None => Ok(Python::attach(|py| py.None())), + } }) } @@ -536,47 +539,14 @@ impl FileSystemKeyValueStoreClient { &self, py: Python<'py>, key: String, - value: &Bound<'py, pyo3::PyAny>, + value: Vec, content_type: Option, ) -> PyResult> { - use crawlee_storage::models::KvsValue; - - // Convert Python value → KvsValue directly, no base64 intermediary. - let is_bytes = value.is_instance_of::() - || value.is_instance_of::(); - - let (kvs_value, content_type) = if value.is_none() { - (KvsValue::None, content_type) - } else if is_bytes { - let bytes = value.extract::>()?; - let ct = content_type.or_else(|| Some("application/octet-stream".to_string())); - (KvsValue::Binary(bytes), ct) - } else if let Some(ref ct) = content_type { - if ct.starts_with("text/") { - // Caller explicitly requested text content type — extract as string. - let s: String = value - .extract() - .or_else(|_| value.str().and_then(|s| s.extract()))?; - (KvsValue::Text(s), content_type) - } else { - // Explicit content type that isn't text — treat value as JSON. - let val = py_to_value(value)?; - (KvsValue::Json(val), content_type) - } - } else { - // No content type given, not bytes, not None — infer from Python type. - if let Ok(s) = value.extract::() { - (KvsValue::Text(s), None) - } else { - let val = py_to_value(value)?; - (KvsValue::Json(val), None) - } - }; - + let ct = content_type.unwrap_or_else(|| "application/octet-stream".to_string()); let client = self.inner.clone(); pyo3_async_runtimes::tokio::future_into_py(py, async move { client - .set_value(&key, kvs_value, content_type) + .set_value_raw(&key, &value, ct) .await .map_err(storage_err)?; Ok(())