feat: add native GCS watch directory support #5
Conversation
- Add GCSWatchSource in gcs_watcher.py for listing/downloading GCS blobs
- Modify scan_and_process_once() to detect gs:// URIs and process them
- Dedup key = gs://bucket/path (stable across runs, not temp file path)
- Add 'gcs' optional dependency: pip install dmaf[gcs]
- Temp files cleaned up after processing (finally block)
- Clear ImportError if google-cloud-storage not installed
- Local directory scanning unchanged
- Add tests with mocked GCS client
Pull request overview
Adds support for scanning Google Cloud Storage (GCS) buckets/prefixes as “watch directories” during batch scans, so images can be listed/downloaded from gs://... sources and deduplicated using stable GCS paths rather than local temp file paths.
Changes:
- Introduces `dmaf.gcs_watcher` to parse `gs://` URIs, list image blobs, download blobs to temp files, and clean up temps.
- Refactors `scan_and_process_once()` to detect `gs://` inputs and process GCS blobs while using `gs://bucket/object` as the dedup key.
- Adds tests for URI parsing/cleanup and an integration-style scan test using a mocked GCS client; updates extras and cloud config example.
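A minimal sketch of the URI parsing the overview describes. This is an illustrative stand-in, not the actual `dmaf.gcs_watcher.parse_gcs_uri` implementation, which may handle edge cases differently:

```python
from urllib.parse import urlparse

def parse_gcs_uri(uri: str) -> tuple[str, str]:
    """Split a gs://bucket/prefix URI into (bucket, object-or-prefix).

    Sketch only; the real dmaf.gcs_watcher helper may differ.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "gs" or not parsed.netloc:
        raise ValueError(f"Not a valid GCS URI: {uri}")
    # Strip the leading slash so the result is a bare object key/prefix.
    return parsed.netloc, parsed.path.lstrip("/")
```

For example, `parse_gcs_uri("gs://my-bucket/photos/cat.jpg")` yields `("my-bucket", "photos/cat.jpg")`, which the watcher can reassemble into the stable `gs://bucket/object` dedup key.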
Reviewed changes
Copilot reviewed 5 out of 5 changed files in this pull request and generated 7 comments.
| File | Description |
|---|---|
| tests/test_gcs_watcher.py | Adds unit/integration tests for GCS URI helpers and scan-once dedup behavior. |
| src/dmaf/watcher.py | Adds shared image-processing helper and extends scan-once to support gs:// sources and stable dedup keys. |
| src/dmaf/gcs_watcher.py | New module for GCS listing/downloading and temp file cleanup. |
| pyproject.toml | Adds gcs optional dependency and includes it in all. |
| config.cloud.example.yaml | Documents use of gs:// watch dirs and dedup behavior for cloud runs. |
```python
client = _get_storage_client()
bucket_name, prefix = parse_gcs_uri(uri)
bucket = client.bucket(bucket_name)

gcs_paths = []
for blob in bucket.list_blobs(prefix=prefix):
    # Skip "directory" markers
    if blob.name.endswith("/"):
        continue
    suffix = Path(blob.name).suffix.lower()
    if suffix in IMAGE_EXTENSIONS:
        gcs_paths.append(f"gs://{bucket_name}/{blob.name}")
return gcs_paths
```

```python
def download_gcs_blob(gcs_path: str) -> Path:
    """
    Download a GCS blob to a temporary file.

    Args:
        gcs_path: Full GCS path like 'gs://bucket/path/to/image.jpg'

    Returns:
        Path to the downloaded temporary file. Caller must clean up with cleanup_temp_file().
    """
    client = _get_storage_client()
    bucket_name, blob_name = parse_gcs_uri(gcs_path)
    # blob_name from parse_gcs_uri is the prefix, but for a full path it's the object key
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
```
_get_storage_client() is called inside both list_gcs_images() and download_gcs_blob(), and scan_and_process_once() calls download_gcs_blob() per blob. This will create a new GCS client for every object download, which is expensive and can become a bottleneck. Consider caching the client (e.g., module-level singleton or @functools.lru_cache) and/or passing a client/bucket object through to avoid repeated initialization.
```python
logger.info(f"Match {Path(dedup_key).name} -> {who}")
try:
    handler.on_match(image_path, who)
    if handler.cfg.delete_source_after_upload:
        try:
            image_path.unlink()
            logger.info(f"Deleted source: {image_path.name}")
        except Exception as e:
            logger.warning(f"Failed to delete {image_path.name}: {e}")
except Exception as e:
    logger.error(f"Upload failed for {Path(dedup_key).name}: {e}")
    had_error = True
    if handler.alert_manager:
        handler.alert_manager.record_error("upload", str(e), dedup_key)
```
For GCS processing, dedup_key is the gs:// path but on_match() is still called with image_path (the downloaded temp file). Existing on_match implementations (e.g., in main.py) call db_conn.mark_uploaded(str(p)), which will use the temp path rather than the gs:// dedup key; for the Firestore backend this can raise (document not found) and cause uploads to be treated as failures. Consider passing dedup_key into on_match (or moving mark_uploaded(dedup_key) into _process_image_file after a successful upload) so uploaded status is recorded against the same key that was added to the DB.
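One way to apply the suggestion, sketched with a hypothetical `FakeDB` stand-in for the real database backend: mark the upload against the dedup key (the `gs://` path that was added to the DB), not the temp file path:

```python
from pathlib import Path

class FakeDB:
    """Hypothetical stand-in for the Firestore/SQL backend in dmaf."""
    def __init__(self):
        self.uploaded = []
    def mark_uploaded(self, key: str) -> None:
        self.uploaded.append(key)

def record_upload(db: FakeDB, image_path: Path, dedup_key: str) -> None:
    # After a successful upload, record status against the *dedup key*
    # (gs://bucket/object), which is the key stored in the DB --
    # not image_path, which is a throwaway temp file.
    db.mark_uploaded(dedup_key)

db = FakeDB()
record_upload(db, Path("/tmp/tmpabc.jpg"), "gs://bucket/photos/cat.jpg")
assert db.uploaded == ["gs://bucket/photos/cat.jpg"]
```

Keeping the `mark_uploaded()` call inside the shared processing helper (rather than inside each `on_match` implementation) guarantees the same key is used on both the insert and the update path.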
```python
    Supports both local directories and GCS URIs (gs://bucket/prefix/).
    For GCS URIs, the dedup key is the full gs:// path (not the temp file path).

    Args:
        dirs: List of directory paths or GCS URIs to scan
```
scan_and_process_once() now supports gs:// URIs, but run_watch() (continuous watcher mode) still treats every entry as a local filesystem path (Path(d).mkdir / watchdog schedule). If a user configures watch_dirs with a gs:// entry and runs without --scan-once, this will likely create invalid local directories or error. Consider rejecting/ignoring gs:// entries in run_watch() (or documenting that GCS is scan-once only).
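A hedged sketch of the guard this comment asks for. `split_watch_dirs` is a hypothetical helper, not an existing `dmaf` function; `run_watch()` would schedule watchdog observers only for the local entries and log (or reject) the GCS ones:

```python
def split_watch_dirs(dirs: list[str]) -> tuple[list[str], list[str]]:
    """Separate local paths from gs:// URIs so run_watch() never tries
    to mkdir or schedule a filesystem watcher on a GCS URI."""
    local, gcs = [], []
    for d in dirs:
        (gcs if str(d).startswith("gs://") else local).append(d)
    return local, gcs

local, gcs = split_watch_dirs(["/data/inbox", "gs://bucket/prefix/"])
assert local == ["/data/inbox"]
assert gcs == ["gs://bucket/prefix/"]
```

Whichever option is chosen, the `gs://` entries should produce a clear log message ("GCS sources are scan-once only") rather than a silent `mkdir` of a literal `gs:` directory on disk.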
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
Fix PR #5 CI failures in GCS temp-file handling and watcher batch-mode tests