Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ These must be preserved for compatibility with the JS Crawlee `MemoryStorage` on
- Uses **PyO3 0.28** with **pyo3-async-runtimes** (tokio feature) for native Python coroutines.
- Each Rust client is wrapped in `Arc` so it can be cloned into async blocks (standard pattern for pyo3 async methods).
- JSON data crosses the FFI boundary as Python dicts/lists, converted to/from `serde_json::Value` via `value_to_py` / `py_to_value` helper functions.
- KVS binary values cross the FFI boundary as Python `bytes` ↔ `KvsValue::Binary(Vec<u8>)` directly — no base64 intermediary.
- KVS values are `bytes`-only in the Python bindings. `setValue` accepts `bytes` (PyO3 `Vec<u8>`) directly, and `getValue` returns raw file bytes as Python `bytes`. The caller is responsible for serialization/deserialization.
- The compiled native module is `crawlee_storage._native`, re-exported by `crawlee_storage/__init__.py`.

### Node.js Bindings
Expand Down
8 changes: 7 additions & 1 deletion crawlee-storage-node/dts-header.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,13 @@ export interface ProcessedRequest {
wasAlreadyHandled: boolean;
}

export interface UnprocessedRequest {
uniqueKey: string;
url: string;
method?: string | null;
}

export interface AddRequestsResponse {
processedRequests: ProcessedRequest[];
unprocessedRequests: unknown[];
unprocessedRequests: UnprocessedRequest[];
}
8 changes: 7 additions & 1 deletion crawlee-storage-node/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,15 @@ export interface ProcessedRequest {
wasAlreadyHandled: boolean;
}

export interface UnprocessedRequest {
uniqueKey: string;
url: string;
method?: string | null;
}

export interface AddRequestsResponse {
processedRequests: ProcessedRequest[];
unprocessedRequests: unknown[];
unprocessedRequests: UnprocessedRequest[];
}
export declare class DatasetItemIterator {
/** Fetch the next item. Returns null when iteration is exhausted. */
Expand Down
2 changes: 1 addition & 1 deletion crawlee-storage-node/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ impl FileSystemDatasetClient {
.inner
.get_data(
offset.unwrap_or(0) as usize,
limit.unwrap_or(999_999_999) as usize,
limit.map_or(999_999_999_999_usize, |l| l as usize),
desc.unwrap_or(false),
skip_empty.unwrap_or(false),
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class KeyValueStoreRecordMetadata(typing.TypedDict):

class KeyValueStoreRecord(typing.TypedDict):
key: builtins.str
content_type: builtins.str
contentType: builtins.str
size: typing.Optional[builtins.int]
value: typing.Any
value: builtins.bytes

class RequestQueueMetadata(typing.TypedDict):
id: builtins.str
Expand Down Expand Up @@ -150,7 +150,10 @@ class FileSystemKeyValueStoreClient:
async def purge(self) -> None: ...
async def get_value(self, key: builtins.str) -> typing.Optional[KeyValueStoreRecord]: ...
async def set_value(
self, key: builtins.str, value: typing.Any, content_type: typing.Optional[builtins.str] = None
self,
key: builtins.str,
value: typing.Sequence[builtins.int],
content_type: typing.Optional[builtins.str] = None,
) -> None: ...
async def delete_value(self, key: builtins.str) -> None: ...
def iterate_keys(
Expand Down
4 changes: 2 additions & 2 deletions crawlee-storage-python/src/bin/stub_gen.rs
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ fn generate_typed_dicts() -> String {
py_type: "builtins.str".into(),
},
TypedDictField {
name: "content_type".into(),
name: "contentType".into(),
py_type: "builtins.str".into(),
},
TypedDictField {
Expand All @@ -243,7 +243,7 @@ fn generate_typed_dicts() -> String {
},
TypedDictField {
name: "value".into(),
py_type: "typing.Any".into(),
py_type: "builtins.bytes".into(),
},
],
},
Expand Down
80 changes: 25 additions & 55 deletions crawlee-storage-python/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,25 +122,21 @@ fn metadata_to_py<T: serde::Serialize>(py: Python<'_>, meta: &T) -> PyResult<Py<
value_to_py(py, &val)
}

/// Convert a KVS record to a Python dict, mapping [`KvsValue`] variants to native Python types.
fn record_to_py(
/// Convert a KVS file record (raw bytes) to a Python dict with `bytes` value.
fn record_file_to_py(
py: Python<'_>,
record: &crawlee_storage::models::KeyValueStoreRecord,
key: &str,
content_type: &str,
size: Option<usize>,
data: &[u8],
) -> PyResult<Py<PyAny>> {
use crawlee_storage::models::KvsValue;
use pyo3::IntoPyObject;

let dict = PyDict::new(py);
dict.set_item("key", &record.key)?;
dict.set_item("content_type", &record.content_type)?;
dict.set_item("size", record.size)?;

match &record.value {
KvsValue::None => dict.set_item("value", py.None())?,
KvsValue::Json(v) => dict.set_item("value", value_to_py(py, v)?)?,
KvsValue::Text(s) => dict.set_item("value", s)?,
KvsValue::Binary(bytes) => dict.set_item("value", pyo3::types::PyBytes::new(py, bytes))?,
}
dict.set_item("key", key)?;
dict.set_item("contentType", content_type)?;
dict.set_item("size", size)?;
dict.set_item("value", pyo3::types::PyBytes::new(py, data))?;

Ok(dict.into_pyobject(py)?.into_any().unbind())
}
Expand Down Expand Up @@ -522,11 +518,18 @@ impl FileSystemKeyValueStoreClient {
fn get_value<'py>(&self, py: Python<'py>, key: String) -> PyResult<Bound<'py, pyo3::PyAny>> {
let client = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let record = client.get_value(&key).await.map_err(storage_err)?;
Python::attach(|py| match record {
Some(r) => record_to_py(py, &r),
None => Ok(py.None()),
})
let result = client.get_value_file(&key).await.map_err(storage_err)?;
match result {
Some((path, meta)) => {
let data = tokio::fs::read(&path)
.await
.map_err(|e| storage_err(e.into()))?;
Python::attach(|py| {
record_file_to_py(py, &key, &meta.content_type, meta.size, &data)
})
}
None => Ok(Python::attach(|py| py.None())),
}
})
}

Expand All @@ -536,47 +539,14 @@ impl FileSystemKeyValueStoreClient {
&self,
py: Python<'py>,
key: String,
value: &Bound<'py, pyo3::PyAny>,
value: Vec<u8>,
content_type: Option<String>,
) -> PyResult<Bound<'py, pyo3::PyAny>> {
use crawlee_storage::models::KvsValue;

// Convert Python value → KvsValue directly, no base64 intermediary.
let is_bytes = value.is_instance_of::<pyo3::types::PyBytes>()
|| value.is_instance_of::<pyo3::types::PyByteArray>();

let (kvs_value, content_type) = if value.is_none() {
(KvsValue::None, content_type)
} else if is_bytes {
let bytes = value.extract::<Vec<u8>>()?;
let ct = content_type.or_else(|| Some("application/octet-stream".to_string()));
(KvsValue::Binary(bytes), ct)
} else if let Some(ref ct) = content_type {
if ct.starts_with("text/") {
// Caller explicitly requested text content type — extract as string.
let s: String = value
.extract()
.or_else(|_| value.str().and_then(|s| s.extract()))?;
(KvsValue::Text(s), content_type)
} else {
// Explicit content type that isn't text — treat value as JSON.
let val = py_to_value(value)?;
(KvsValue::Json(val), content_type)
}
} else {
// No content type given, not bytes, not None — infer from Python type.
if let Ok(s) = value.extract::<String>() {
(KvsValue::Text(s), None)
} else {
let val = py_to_value(value)?;
(KvsValue::Json(val), None)
}
};

let ct = content_type.unwrap_or_else(|| "application/octet-stream".to_string());
let client = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
client
.set_value(&key, kvs_value, content_type)
.set_value_raw(&key, &value, ct)
.await
.map_err(storage_err)?;
Ok(())
Expand Down
Loading