diff --git a/Makefile b/Makefile
index 29ead49f..c4384238 100644
--- a/Makefile
+++ b/Makefile
@@ -4,27 +4,35 @@
# Pre-requisite: Python virtual environment is active (source .venv/bin/activate)
# Run Pytest tests in virtualenv: .venv/bin/pytest tests/test_unit_tests.py -v
+# Removes build artifacts, distribution files, and other generated content
clean:
rm -rf artifacts/ build/ dist/
+# Performs a complete cleanup including uninstalling the c2pa package and clearing pip cache
clean-c2pa-env: clean
python3 -m pip uninstall -y c2pa
python3 -m pip cache purge
+# Installs all required dependencies from requirements.txt and requirements-dev.txt
install-deps:
python3 -m pip install -r requirements.txt
python3 -m pip install -r requirements-dev.txt
pip install -e .
+# Installs the package in development mode
build-python:
pip install -e .
+# Performs a complete rebuild of the development environment
rebuild: clean-c2pa-env install-deps download-native-artifacts build-python
@echo "Development rebuild done!"
+# Runs the unit tests
test:
python3 ./tests/test_unit_tests.py
+# Tests building and installing a local wheel package
+# Downloads required artifacts, builds the wheel, installs it, and verifies the installation
test-local-wheel-build:
# Clean any existing builds
rm -rf build/ dist/
@@ -41,6 +49,8 @@ test-local-wheel-build:
# Verify wheel structure
twine check dist/*
+# Tests building and installing a local source distribution package
+# Downloads required artifacts, builds the sdist, installs it, and verifies the installation
test-local-sdist-build:
# Clean any existing builds
rm -rf build/ dist/
@@ -58,17 +68,21 @@ test-local-sdist-build:
# Verify sdist structure
twine check dist/*
+# Verifies the wheel build process and checks the built package and its metadata
verify-wheel-build:
rm -rf build/ dist/ src/*.egg-info/
python -m build
twine check dist/*
+# Manually publishes the package to PyPI after creating a release
publish: release
python3 -m pip install twine
python3 -m twine upload dist/*
+# Formats Python source code using autopep8 with aggressive settings
format:
autopep8 --aggressive --aggressive --in-place src/c2pa/*.py
+# Downloads the required native artifacts for the specified version
download-native-artifacts:
python3 scripts/download_artifacts.py c2pa-v0.55.0
diff --git a/README.md b/README.md
index cb5a2cf2..3395ac8c 100644
--- a/README.md
+++ b/README.md
@@ -12,26 +12,29 @@ This project provides a Python API for working with [C2PA](https://c2pa.org/) (C
## Project Structure
```bash
-python_api/
- ├── examples/ # Example scripts demonstrating usage
- ├── src/ # Source code for the C2PA Python API
- │ └── c2pa/ # Main package directory
- │ └── libs/ # Platform-specific libraries
- ├── tests/ # Unit tests and benchmarks
- ├── artifacts/ # Platform-specific libraries for building
- │ ├── win_amd64/ # Windows x64 libraries
- │ ├── win_arm64/ # Windows ARM64 libraries
- │ ├── macosx_x86_64/ # macOS x64 libraries
- │ ├── macosx_arm64/ # macOS ARM64 libraries
- │ ├── linux_x86_64/ # Linux x64 libraries
- │ └── linux_aarch64/ # Linux ARM64 libraries
- ├── requirements.txt # Python dependencies
- └── README.md # Project documentation
+.
+├── .github/ # GitHub configuration files
+├── artifacts/ # Platform-specific libraries for building (per subfolder)
+│ └── linux_x86_64/ # Linux ARM64 libraries
+├── docs/ # Project documentation
+├── examples/ # Example scripts demonstrating usage
+├── scripts/ # Utility scripts (eg. artifacts download)
+├── src/ # Source code
+│ └── c2pa/ # Main package directory
+│ └── libs/ # Platform-specific libraries
+├── tests/ # Unit tests and benchmarks
+├── .gitignore # Git ignore rules
+├── Makefile # Build and development commands
+├── pyproject.toml # Python project configuration
+├── requirements.txt # Python dependencies
+├── requirements-dev.txt # Development dependencies
+└── setup.py # Package setup script
```
## Development Setup
1. Create and activate a virtual environment:
+
```bash
# Create virtual environment
python -m venv .venv
@@ -43,19 +46,20 @@ python -m venv .venv
source .venv/bin/activate
# load project dependencies
-pip install -r requirements.txt
+pip install -r requirements.txt
-# download library artifacts for the current version you want
-python scripts/download_artifacts.py c2pa-v0.49.5
+# download library artifacts for the current version you want, eg v0.55.0
+python scripts/download_artifacts.py c2pa-v0.55.0
```
-
2. Install the package in development mode:
+
```bash
pip install -e .
```
This will:
+
- Copy the appropriate libraries for your platform from `artifacts/` to `src/c2pa/libs/`
- Install the package in development mode, allowing you to make changes to the Python code without reinstalling
@@ -67,16 +71,30 @@ To build wheels for all platforms that have libraries in the `artifacts/` direct
python setup.py bdist_wheel
```
+You can use `twine` to verify the wheels have correct metadata:
+
+```bash
+twine check dist/*
+```
+
This will create platform-specific wheels in the `dist/` directory.
## Running Tests
-Install pytest (if not already installed):
+Run the tests:
+
+```bash
+make test
+```
+
+Alternatively, install pytest (if not already installed):
+
```bash
pip install pytest
```
-Run the tests:
+And run:
+
```bash
pytest
```
@@ -84,9 +102,11 @@ pytest
## Examples
### Adding a "Do Not Train" Assertion
+
The `examples/training.py` script demonstrates how to add a "Do Not Train" assertion to an asset and verify it.
### Signing and Verifying Assets
+
The `examples/test.py` script shows how to sign an asset with a C2PA manifest and verify it.
## Contributing
@@ -95,4 +115,4 @@ Contributions are welcome! Please fork the repository and submit a pull request.
## License
-This project is licensed under the Apache License 2.0 or the MIT License. See the LICENSE-MIT and LICENSE-APACHE files for details.
\ No newline at end of file
+This project is licensed under the Apache License 2.0 or the MIT License. See the LICENSE-MIT and LICENSE-APACHE files for details.
diff --git a/README_saved.md b/README_saved.md
deleted file mode 100644
index 59456388..00000000
--- a/README_saved.md
+++ /dev/null
@@ -1,59 +0,0 @@
-# C2PA Python library
-
-The [c2pa-python](https://github.com/contentauth/c2pa-python) repository implements Python bindings for the Content Authenticity Initiative (CAI) SDK.
-It enables you to read and validate C2PA manifest data from and add signed manifests to media files in supported formats.
-
-**NOTE**: Starting with version 0.5.0, this package has a completely different API from version 0.4.0. See [Release notes](docs/release-notes.md) for more information.
-
-**WARNING**: This is an prerelease version of this library. There may be bugs and unimplemented features, and the API is subject to change.
-
-
-
-Additional documentation:
-- [Using the Python library](docs/usage.md)
-- [Release notes](docs/release-notes.md)
-- [Contributing to the project](docs/project-contributions.md)
-
-
-
-## Installation
-
-Install from PyPI by entering this command:
-
-```bash
-pip install -U c2pa-python
-```
-
-This is a platform wheel built with Rust that works on Windows, macOS, and most Linux distributions (using [manylinux](https://github.com/pypa/manylinux)). If you need to run on another platform, see [Project contributions - Development](docs/project-contributions.md#development) for information on how to build from source.
-
-### Updating
-
-Determine what version you've got by entering this command:
-
-```bash
-pip list | grep c2pa-python
-```
-
-If the version shown is lower than the most recent version, then update by [reinstalling](#installation).
-
-### Reinstalling
-
-If you tried unsuccessfully to install this package before the [0.40 release](https://github.com/contentauth/c2pa-python/releases/tag/v0.4), then use this command to reinstall:
-
-```bash
-pip install --upgrade --force-reinstall c2pa-python
-```
-
-## Supported formats
-
-The Python library [supports the same media file formats](https://github.com/contentauth/c2pa-rs/blob/main/docs/supported-formats.md) as the Rust library.
-
-## License
-
-This package is distributed under the terms of both the [MIT license](https://github.com/contentauth/c2pa-python/blob/main/LICENSE-MIT) and the [Apache License (Version 2.0)](https://github.com/contentauth/c2pa-python/blob/main/LICENSE-APACHE).
-
-Note that some components and dependent crates are licensed under different terms; please check the license terms for each crate and component for details.
-
-### Contributions and feedback
-
-We welcome contributions to this project. For information on contributing, providing feedback, and about ongoing work, see [Contributing](https://github.com/contentauth/c2pa-python/blob/main/CONTRIBUTING.md).
diff --git a/requirements-dev.txt b/requirements-dev.txt
index 7f647197..cd916d6e 100644
--- a/requirements-dev.txt
+++ b/requirements-dev.txt
@@ -5,7 +5,8 @@ build==1.0.3 # For building packages using PEP 517/518
toml==0.10.2 # For reading pyproject.toml files
# Testing dependencies
-pytest==7.4.0
+pytest>=8.1.0
+pytest-benchmark>=5.1.0
# for downloading the library artifacts
requests>=2.0.0
diff --git a/setup.py b/setup.py
index 489c070d..7ab8a3dc 100644
--- a/setup.py
+++ b/setup.py
@@ -103,9 +103,6 @@ def copy_platform_libraries(platform_name, clean_first=False):
clean_first: If True, remove existing files in PACKAGE_LIBS_DIR first
"""
platform_dir = ARTIFACTS_DIR / platform_name
- print(" ")
- print('##### Found platform dir: ', platform_dir)
- print(" ")
# Ensure the platform directory exists and contains files
if not platform_dir.exists():
@@ -113,9 +110,6 @@ def copy_platform_libraries(platform_name, clean_first=False):
# Get list of all files in the platform directory
platform_files = list(platform_dir.glob('*'))
- print(" ")
- print('##### Platform files: ', platform_files)
- print(" ")
if not platform_files:
raise ValueError(f"No files found in platform directory: {platform_dir}")
@@ -124,9 +118,6 @@ def copy_platform_libraries(platform_name, clean_first=False):
shutil.rmtree(PACKAGE_LIBS_DIR)
# Ensure the package libs directory exists
- print(" ")
- print('##### Package libs dir will be in: ', PACKAGE_LIBS_DIR)
- print(" ")
PACKAGE_LIBS_DIR.mkdir(parents=True, exist_ok=True)
# Copy files from platform-specific directory to the package libs directory
@@ -189,9 +180,6 @@ def find_available_platforms():
print(f"\nBuilding wheel for {platform_name}...")
try:
# Copy libraries for this platform (cleaning first)
- print(" ")
- print('##### Preparing to copy libraries to where they belong')
- print(" ")
copy_platform_libraries(platform_name, clean_first=True)
# Build the wheel
diff --git a/src/c2pa/__init__.py b/src/c2pa/__init__.py
index f5cd278e..548f638b 100644
--- a/src/c2pa/__init__.py
+++ b/src/c2pa/__init__.py
@@ -7,6 +7,7 @@
C2paSigningAlg,
C2paSignerInfo,
Signer,
+ Stream,
sdk_version
) # NOQA
@@ -18,5 +19,6 @@
'C2paSigningAlg',
'C2paSignerInfo',
'Signer',
+ 'Stream',
'sdk_version'
-]
\ No newline at end of file
+]
diff --git a/src/c2pa/build.py b/src/c2pa/build.py
index e3b4c86f..032ca776 100644
--- a/src/c2pa/build.py
+++ b/src/c2pa/build.py
@@ -13,6 +13,7 @@
GITHUB_API_BASE = "https://api.github.com"
ARTIFACTS_DIR = Path("artifacts")
+
def get_latest_release() -> dict:
"""Get the latest release information from GitHub."""
url = f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/releases/latest"
@@ -20,6 +21,7 @@ def get_latest_release() -> dict:
response.raise_for_status()
return response.json()
+
def download_artifact(url: str, platform_name: str) -> None:
"""Download and extract an artifact to the appropriate platform directory."""
print(f"Downloading artifact for {platform_name}...")
@@ -37,7 +39,9 @@ def download_artifact(url: str, platform_name: str) -> None:
# Extract all files to the platform directory
zip_ref.extractall(platform_dir)
- print(f"Successfully downloaded and extracted artifacts for {platform_name}")
+ print(f"Successfully downloaded and extracted artifacts for {
+ platform_name}")
+
def download_artifacts() -> None:
"""Main function to download artifacts. Can be called as a script or from hatch."""
@@ -72,11 +76,22 @@ def download_artifacts() -> None:
print(f"Unexpected error: {e}", file=sys.stderr)
sys.exit(1)
+
def inject_version():
"""Inject the version from pyproject.toml into src/c2pa/__init__.py as __version__."""
import toml
- pyproject_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "..", "pyproject.toml"))
- init_path = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "c2pa", "__init__.py"))
+ pyproject_path = os.path.abspath(
+ os.path.join(
+ os.path.dirname(__file__),
+ "..",
+ "..",
+ "pyproject.toml"))
+ init_path = os.path.abspath(
+ os.path.join(
+ os.path.dirname(__file__),
+ "..",
+ "c2pa",
+ "__init__.py"))
with open(pyproject_path, "r") as f:
pyproject = toml.load(f)
version = pyproject["project"]["version"]
@@ -89,11 +104,13 @@ def inject_version():
with open(init_path, "w") as f:
f.writelines(lines)
+
def initialize_build() -> None:
"""Initialize the build process by downloading artifacts."""
inject_version()
download_artifacts()
+
if __name__ == "__main__":
inject_version()
- download_artifacts()
\ No newline at end of file
+ download_artifacts()
diff --git a/src/c2pa/c2pa.py b/src/c2pa/c2pa.py
index 5f6fdfa4..fda5aa0a 100644
--- a/src/c2pa/c2pa.py
+++ b/src/c2pa/c2pa.py
@@ -43,6 +43,7 @@
'c2pa_signature_free',
]
+
def _validate_library_exports(lib):
"""Validate that all required functions are present in the loaded library.
@@ -82,6 +83,7 @@ def _validate_library_exports(lib):
"This could indicate an incomplete or corrupted library installation or a version mismatch between the library and this Python wrapper"
)
+
# Determine the library name based on the platform
if sys.platform == "win32":
_lib_name_default = "c2pa_c.dll"
@@ -101,12 +103,14 @@ def _validate_library_exports(lib):
_validate_library_exports(_lib)
+
class C2paSeekMode(enum.IntEnum):
"""Seek mode for stream operations."""
START = 0
CURRENT = 1
END = 2
+
class C2paSigningAlg(enum.IntEnum):
"""Supported signing algorithms."""
ES256 = 0
@@ -117,23 +121,44 @@ class C2paSigningAlg(enum.IntEnum):
PS512 = 5
ED25519 = 6
+
# Define callback types
-ReadCallback = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_void_p, ctypes.POINTER(ctypes.c_uint8), ctypes.c_ssize_t)
-SeekCallback = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_void_p, ctypes.c_ssize_t, ctypes.c_int)
+ReadCallback = ctypes.CFUNCTYPE(
+ ctypes.c_ssize_t,
+ ctypes.c_void_p,
+ ctypes.POINTER(
+ ctypes.c_uint8),
+ ctypes.c_ssize_t)
+SeekCallback = ctypes.CFUNCTYPE(
+ ctypes.c_ssize_t,
+ ctypes.c_void_p,
+ ctypes.c_ssize_t,
+ ctypes.c_int)
# Additional callback types
-WriteCallback = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_void_p, ctypes.POINTER(ctypes.c_uint8), ctypes.c_ssize_t)
+WriteCallback = ctypes.CFUNCTYPE(
+ ctypes.c_ssize_t,
+ ctypes.c_void_p,
+ ctypes.POINTER(
+ ctypes.c_uint8),
+ ctypes.c_ssize_t)
FlushCallback = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_void_p)
-SignerCallback = ctypes.CFUNCTYPE(ctypes.c_ssize_t, ctypes.c_void_p, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_size_t, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_size_t)
+SignerCallback = ctypes.CFUNCTYPE(
+ ctypes.c_ssize_t, ctypes.c_void_p, ctypes.POINTER(
+ ctypes.c_ubyte), ctypes.c_size_t, ctypes.POINTER(
+ ctypes.c_ubyte), ctypes.c_size_t)
+
class StreamContext(ctypes.Structure):
"""Opaque structure for stream context."""
_fields_ = [] # Empty as it's opaque in the C API
+
class C2paSigner(ctypes.Structure):
"""Opaque structure for signer context."""
_fields_ = [] # Empty as it's opaque in the C API
+
class C2paStream(ctypes.Structure):
"""A C2paStream is a Rust Read/Write/Seek stream that can be created in C.
@@ -157,13 +182,19 @@ class C2paStream(ctypes.Structure):
between Python and the C2PA library without intermediate copies.
"""
_fields_ = [
- ("context", ctypes.POINTER(StreamContext)), # Opaque context pointer for the stream
- ("reader", ReadCallback), # Function to read data from the stream
- ("seeker", SeekCallback), # Function to change stream position
- ("writer", WriteCallback), # Function to write data to the stream
- ("flusher", FlushCallback), # Function to flush buffered data
+ # Opaque context pointer for the stream
+ ("context", ctypes.POINTER(StreamContext)),
+ # Function to read data from the stream
+ ("reader", ReadCallback),
+ # Function to change stream position
+ ("seeker", SeekCallback),
+ # Function to write data to the stream
+ ("writer", WriteCallback),
+ # Function to flush buffered data
+ ("flusher", FlushCallback),
]
+
class C2paSignerInfo(ctypes.Structure):
"""Configuration for a Signer."""
_fields_ = [
@@ -173,23 +204,32 @@ class C2paSignerInfo(ctypes.Structure):
("ta_url", ctypes.c_char_p),
]
+
class C2paReader(ctypes.Structure):
"""Opaque structure for reader context."""
_fields_ = [] # Empty as it's opaque in the C API
+
class C2paBuilder(ctypes.Structure):
"""Opaque structure for builder context."""
_fields_ = [] # Empty as it's opaque in the C API
# Helper function to set function prototypes
+
+
def _setup_function(func, argtypes, restype=None):
func.argtypes = argtypes
func.restype = restype
+
# Set up function prototypes
_setup_function(_lib.c2pa_create_stream,
- [ctypes.POINTER(StreamContext), ReadCallback, SeekCallback, WriteCallback, FlushCallback],
- ctypes.POINTER(C2paStream))
+ [ctypes.POINTER(StreamContext),
+ ReadCallback,
+ SeekCallback,
+ WriteCallback,
+ FlushCallback],
+ ctypes.POINTER(C2paStream))
# Add release_stream prototype
_setup_function(_lib.c2pa_release_stream, [ctypes.POINTER(C2paStream)], None)
@@ -198,78 +238,127 @@ def _setup_function(func, argtypes, restype=None):
_setup_function(_lib.c2pa_version, [], ctypes.c_void_p)
_setup_function(_lib.c2pa_error, [], ctypes.c_void_p)
_setup_function(_lib.c2pa_string_free, [ctypes.c_void_p], None)
-_setup_function(_lib.c2pa_load_settings, [ctypes.c_char_p, ctypes.c_char_p], ctypes.c_int)
-_setup_function(_lib.c2pa_read_file, [ctypes.c_char_p, ctypes.c_char_p], ctypes.c_void_p)
-_setup_function(_lib.c2pa_read_ingredient_file, [ctypes.c_char_p, ctypes.c_char_p], ctypes.c_void_p)
-_setup_function(_lib.c2pa_sign_file,
- [ctypes.c_char_p, ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paSignerInfo), ctypes.c_char_p],
- ctypes.c_void_p)
+_setup_function(
+ _lib.c2pa_load_settings, [
+ ctypes.c_char_p, ctypes.c_char_p], ctypes.c_int)
+_setup_function(
+ _lib.c2pa_read_file, [
+ ctypes.c_char_p, ctypes.c_char_p], ctypes.c_void_p)
+_setup_function(
+ _lib.c2pa_read_ingredient_file, [
+ ctypes.c_char_p, ctypes.c_char_p], ctypes.c_void_p)
+_setup_function(_lib.c2pa_sign_file,
+ [ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.POINTER(C2paSignerInfo),
+ ctypes.c_char_p],
+ ctypes.c_void_p)
# Set up Reader and Builder function prototypes
-_setup_function(_lib.c2pa_reader_from_stream,
- [ctypes.c_char_p, ctypes.POINTER(C2paStream)],
- ctypes.POINTER(C2paReader))
-_setup_function(_lib.c2pa_reader_from_manifest_data_and_stream,
- [ctypes.c_char_p, ctypes.POINTER(C2paStream), ctypes.POINTER(ctypes.c_ubyte), ctypes.c_size_t],
- ctypes.POINTER(C2paReader))
+_setup_function(_lib.c2pa_reader_from_stream,
+ [ctypes.c_char_p, ctypes.POINTER(C2paStream)],
+ ctypes.POINTER(C2paReader))
+_setup_function(
+ _lib.c2pa_reader_from_manifest_data_and_stream, [
+ ctypes.c_char_p, ctypes.POINTER(C2paStream), ctypes.POINTER(
+ ctypes.c_ubyte), ctypes.c_size_t], ctypes.POINTER(C2paReader))
_setup_function(_lib.c2pa_reader_free, [ctypes.POINTER(C2paReader)], None)
-_setup_function(_lib.c2pa_reader_json, [ctypes.POINTER(C2paReader)], ctypes.c_void_p)
-_setup_function(_lib.c2pa_reader_resource_to_stream,
- [ctypes.POINTER(C2paReader), ctypes.c_char_p, ctypes.POINTER(C2paStream)],
- ctypes.c_int64)
+_setup_function(
+ _lib.c2pa_reader_json, [
+ ctypes.POINTER(C2paReader)], ctypes.c_void_p)
+_setup_function(_lib.c2pa_reader_resource_to_stream, [ctypes.POINTER(
+ C2paReader), ctypes.c_char_p, ctypes.POINTER(C2paStream)], ctypes.c_int64)
# Set up Builder function prototypes
-_setup_function(_lib.c2pa_builder_from_json, [ctypes.c_char_p], ctypes.POINTER(C2paBuilder))
-_setup_function(_lib.c2pa_builder_from_archive, [ctypes.POINTER(C2paStream)], ctypes.POINTER(C2paBuilder))
+_setup_function(
+ _lib.c2pa_builder_from_json, [
+ ctypes.c_char_p], ctypes.POINTER(C2paBuilder))
+_setup_function(_lib.c2pa_builder_from_archive,
+ [ctypes.POINTER(C2paStream)],
+ ctypes.POINTER(C2paBuilder))
_setup_function(_lib.c2pa_builder_free, [ctypes.POINTER(C2paBuilder)], None)
-_setup_function(_lib.c2pa_builder_set_no_embed, [ctypes.POINTER(C2paBuilder)], None)
-_setup_function(_lib.c2pa_builder_set_remote_url, [ctypes.POINTER(C2paBuilder), ctypes.c_char_p], ctypes.c_int)
-_setup_function(_lib.c2pa_builder_add_resource,
- [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.POINTER(C2paStream)],
- ctypes.c_int)
+_setup_function(_lib.c2pa_builder_set_no_embed, [
+ ctypes.POINTER(C2paBuilder)], None)
+_setup_function(
+ _lib.c2pa_builder_set_remote_url, [
+ ctypes.POINTER(C2paBuilder), ctypes.c_char_p], ctypes.c_int)
+_setup_function(_lib.c2pa_builder_add_resource, [ctypes.POINTER(
+ C2paBuilder), ctypes.c_char_p, ctypes.POINTER(C2paStream)], ctypes.c_int)
_setup_function(_lib.c2pa_builder_add_ingredient_from_stream,
- [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)],
- ctypes.c_int)
+ [ctypes.POINTER(C2paBuilder),
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.POINTER(C2paStream)],
+ ctypes.c_int)
# Set up additional Builder function prototypes
_setup_function(_lib.c2pa_builder_to_archive,
- [ctypes.POINTER(C2paBuilder), ctypes.POINTER(C2paStream)],
- ctypes.c_int)
+ [ctypes.POINTER(C2paBuilder), ctypes.POINTER(C2paStream)],
+ ctypes.c_int)
_setup_function(_lib.c2pa_builder_sign,
- [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.POINTER(C2paStream),
- ctypes.POINTER(C2paStream), ctypes.POINTER(C2paSigner), ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte))],
- ctypes.c_int64)
-_setup_function(_lib.c2pa_manifest_bytes_free, [ctypes.POINTER(ctypes.c_ubyte)], None)
-_setup_function(_lib.c2pa_builder_data_hashed_placeholder,
- [ctypes.POINTER(C2paBuilder), ctypes.c_size_t, ctypes.c_char_p, ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte))],
- ctypes.c_int64)
+ [ctypes.POINTER(C2paBuilder),
+ ctypes.c_char_p,
+ ctypes.POINTER(C2paStream),
+ ctypes.POINTER(C2paStream),
+ ctypes.POINTER(C2paSigner),
+ ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte))],
+ ctypes.c_int64)
+_setup_function(
+ _lib.c2pa_manifest_bytes_free, [
+ ctypes.POINTER(
+ ctypes.c_ubyte)], None)
+_setup_function(
+ _lib.c2pa_builder_data_hashed_placeholder, [
+ ctypes.POINTER(C2paBuilder), ctypes.c_size_t, ctypes.c_char_p, ctypes.POINTER(
+ ctypes.POINTER(
+ ctypes.c_ubyte))], ctypes.c_int64)
# Set up additional function prototypes
_setup_function(_lib.c2pa_builder_sign_data_hashed_embeddable,
- [ctypes.POINTER(C2paBuilder), ctypes.POINTER(C2paSigner), ctypes.c_char_p, ctypes.c_char_p,
- ctypes.POINTER(C2paStream), ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte))],
- ctypes.c_int64)
-_setup_function(_lib.c2pa_format_embeddable,
- [ctypes.c_char_p, ctypes.POINTER(ctypes.c_ubyte), ctypes.c_size_t,
- ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte))],
- ctypes.c_int64)
+ [ctypes.POINTER(C2paBuilder),
+ ctypes.POINTER(C2paSigner),
+ ctypes.c_char_p,
+ ctypes.c_char_p,
+ ctypes.POINTER(C2paStream),
+ ctypes.POINTER(ctypes.POINTER(ctypes.c_ubyte))],
+ ctypes.c_int64)
+_setup_function(
+ _lib.c2pa_format_embeddable, [
+ ctypes.c_char_p, ctypes.POINTER(
+ ctypes.c_ubyte), ctypes.c_size_t, ctypes.POINTER(
+ ctypes.POINTER(
+ ctypes.c_ubyte))], ctypes.c_int64)
_setup_function(_lib.c2pa_signer_create,
- [ctypes.c_void_p, SignerCallback, ctypes.c_int, ctypes.c_char_p, ctypes.c_char_p],
- ctypes.POINTER(C2paSigner))
+ [ctypes.c_void_p,
+ SignerCallback,
+ ctypes.c_int,
+ ctypes.c_char_p,
+ ctypes.c_char_p],
+ ctypes.POINTER(C2paSigner))
_setup_function(_lib.c2pa_signer_from_info,
- [ctypes.POINTER(C2paSignerInfo)],
- ctypes.POINTER(C2paSigner))
+ [ctypes.POINTER(C2paSignerInfo)],
+ ctypes.POINTER(C2paSigner))
# Set up final function prototypes
-_setup_function(_lib.c2pa_signer_reserve_size, [ctypes.POINTER(C2paSigner)], ctypes.c_int64)
+_setup_function(
+ _lib.c2pa_signer_reserve_size, [
+ ctypes.POINTER(C2paSigner)], ctypes.c_int64)
_setup_function(_lib.c2pa_signer_free, [ctypes.POINTER(C2paSigner)], None)
-_setup_function(_lib.c2pa_ed25519_sign,
- [ctypes.POINTER(ctypes.c_ubyte), ctypes.c_size_t, ctypes.c_char_p],
- ctypes.POINTER(ctypes.c_ubyte))
-_setup_function(_lib.c2pa_signature_free, [ctypes.POINTER(ctypes.c_ubyte)], None)
+_setup_function(
+ _lib.c2pa_ed25519_sign, [
+ ctypes.POINTER(
+ ctypes.c_ubyte), ctypes.c_size_t, ctypes.c_char_p], ctypes.POINTER(
+ ctypes.c_ubyte))
+_setup_function(
+ _lib.c2pa_signature_free, [
+ ctypes.POINTER(
+ ctypes.c_ubyte)], None)
+
class C2paError(Exception):
"""Exception raised for C2PA errors."""
+
def __init__(self, message: str = ""):
self.message = message
super().__init__(message)
@@ -334,6 +423,7 @@ class Verify(Exception):
"""Exception raised for verification errors."""
pass
+
class _StringContainer:
"""Container class to hold encoded strings and prevent them from being garbage collected.
@@ -343,17 +433,22 @@ class _StringContainer:
This is an internal implementation detail and should not be used outside this module.
"""
+
def __init__(self):
"""Initialize an empty string container."""
pass
-def _parse_operation_result_for_error(result: ctypes.c_void_p, check_error: bool = True) -> Optional[str]:
+
+def _parse_operation_result_for_error(
+ result: ctypes.c_void_p,
+ check_error: bool = True) -> Optional[str]:
"""Helper function to handle string results from C2PA functions."""
if not result:
if check_error:
error = _lib.c2pa_error()
if error:
- error_str = ctypes.cast(error, ctypes.c_char_p).value.decode('utf-8')
+ error_str = ctypes.cast(
+ error, ctypes.c_char_p).value.decode('utf-8')
_lib.c2pa_string_free(error)
print("## error_str:", error_str)
parts = error_str.split(' ', 1)
@@ -398,6 +493,7 @@ def _parse_operation_result_for_error(result: ctypes.c_void_p, check_error: bool
return py_string
+
def sdk_version() -> str:
"""
Returns the c2pa-rs version string, e.g., "0.49.5".
@@ -409,15 +505,17 @@ def sdk_version() -> str:
return part.split("/", 1)[1]
return vstr # fallback if not found
+
def version() -> str:
"""Get the C2PA library version."""
result = _lib.c2pa_version()
- #print(f"Type: {type(result)}")
- #print(f"Address: {hex(result)}")
+ # print(f"Type: {type(result)}")
+ # print(f"Address: {hex(result)}")
py_string = ctypes.cast(result, ctypes.c_char_p).value.decode("utf-8")
_lib.c2pa_string_free(result) # Free the Rust-allocated memory
return py_string
+
def load_settings(settings: str, format: str = "json") -> None:
"""Load C2PA settings from a string.
@@ -437,7 +535,9 @@ def load_settings(settings: str, format: str = "json") -> None:
if error:
raise C2paError(error)
-def read_file(path: Union[str, Path], data_dir: Optional[Union[str, Path]] = None) -> str:
+
+def read_file(path: Union[str, Path],
+ data_dir: Optional[Union[str, Path]] = None) -> str:
"""Read a C2PA manifest from a file.
Args:
@@ -453,12 +553,15 @@ def read_file(path: Union[str, Path], data_dir: Optional[Union[str, Path]] = Non
container = _StringContainer()
container._path_str = str(path).encode('utf-8')
- container._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None
+ container._data_dir_str = str(data_dir).encode(
+ 'utf-8') if data_dir else None
result = _lib.c2pa_read_file(container._path_str, container._data_dir_str)
return _parse_operation_result_for_error(result)
-def read_ingredient_file(path: Union[str, Path], data_dir: Optional[Union[str, Path]] = None) -> str:
+
+def read_ingredient_file(
+ path: Union[str, Path], data_dir: Optional[Union[str, Path]] = None) -> str:
"""Read a C2PA ingredient from a file.
Args:
@@ -474,11 +577,14 @@ def read_ingredient_file(path: Union[str, Path], data_dir: Optional[Union[str, P
container = _StringContainer()
container._path_str = str(path).encode('utf-8')
- container._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None
+ container._data_dir_str = str(data_dir).encode(
+ 'utf-8') if data_dir else None
- result = _lib.c2pa_read_ingredient_file(container._path_str, container._data_dir_str)
+ result = _lib.c2pa_read_ingredient_file(
+ container._path_str, container._data_dir_str)
return _parse_operation_result_for_error(result)
+
def sign_file(
source_path: Union[str, Path],
dest_path: Union[str, Path],
@@ -507,9 +613,11 @@ def sign_file(
signer_info._source_str = str(source_path).encode('utf-8')
signer_info._dest_str = str(dest_path).encode('utf-8')
signer_info._manifest_str = manifest.encode('utf-8')
- signer_info._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None
+ signer_info._data_dir_str = str(data_dir).encode(
+ 'utf-8') if data_dir else None
except UnicodeError as e:
- raise C2paError.Encoding(f"Invalid UTF-8 characters in input strings: {str(e)}")
+ raise C2paError.Encoding(
+ f"Invalid UTF-8 characters in input strings: {str(e)}")
result = _lib.c2pa_sign_file(
signer_info._source_str,
@@ -520,6 +628,7 @@ def sign_file(
)
return _parse_operation_result_for_error(result)
+
class Stream:
# Class-level counter for generating unique stream IDs
# (useful for tracing streams usage in debug)
@@ -542,6 +651,11 @@ def __init__(self, file):
Raises:
TypeError: If the file object doesn't implement all required methods
"""
+ # Initialize _closed first to prevent AttributeError during garbage collection
+ self._closed = False
+ self._initialized = False
+ self._stream = None
+
# Generate unique stream ID with timestamp
timestamp = int(time.time() * 1000) # milliseconds since epoch
@@ -553,34 +667,16 @@ def __init__(self, file):
# Rest of the existing initialization code...
required_methods = ['read', 'write', 'seek', 'tell', 'flush']
- missing_methods = [method for method in required_methods if not hasattr(file, method)]
+ missing_methods = [
+ method for method in required_methods if not hasattr(
+ file, method)]
if missing_methods:
- raise TypeError("Object must be a stream-like object with methods: {}. Missing: {}".format(
- ', '.join(required_methods),
- ', '.join(missing_methods)
- ))
+ raise TypeError(
+ "Object must be a stream-like object with methods: {}. Missing: {}".format(
+ ', '.join(required_methods),
+ ', '.join(missing_methods)))
self._file = file
- self._stream = None # Initialize to None to track if stream was created
- self._closed = False # Track if the stream has been closed
- self._initialized = False # Track if stream was successfully initialized
-
- # print(f'## Created stream {self._stream_id} for file {self._file}')
-
- # Pre-allocate error message strings to avoid string formatting overhead
- self._error_messages = {
- 'read': "Error: Attempted to read from uninitialized or closed stream",
- 'seek': "Error: Attempted to seek in uninitialized or closed stream",
- 'write': "Error: Attempted to write to uninitialized or closed stream",
- 'flush': "Error: Attempted to flush uninitialized or closed stream",
- 'read_error': "Error reading from stream: {}",
- 'seek_error': "Error seeking in stream: {}",
- 'write_error': "Error writing to stream: {}",
- 'flush_error': "Error flushing stream: {}",
- 'cleanup_error': "Error during cleanup: {}",
- 'callback_error': "Error cleaning up callback {}: {}",
- 'stream_error': "Error releasing stream: {}"
- }
def read_callback(ctx, data, length):
"""Callback function for reading data from the Python stream.
@@ -615,7 +711,9 @@ def read_callback(ctx, data, length):
# Ensure we don't write beyond the allocated memory
actual_length = min(len(buffer), length)
# Create a view of the buffer to avoid copying
- buffer_view = (ctypes.c_ubyte * actual_length).from_buffer_copy(buffer)
+ buffer_view = (
+ ctypes.c_ubyte *
+ actual_length).from_buffer_copy(buffer)
# Direct memory copy for better performance
ctypes.memmove(data, buffer_view, actual_length)
return actual_length
@@ -747,7 +845,8 @@ def __exit__(self, exc_type, exc_val, exc_tb):
def __del__(self):
"""Ensure resources are cleaned up if close() wasn't called."""
- self.close()
+ if hasattr(self, '_closed'):
+ self.close()
def close(self):
"""Release the stream resources.
@@ -766,7 +865,9 @@ def close(self):
try:
_lib.c2pa_release_stream(self._stream)
except Exception as e:
- print(self._error_messages['stream_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['stream_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._stream = None
@@ -776,11 +877,15 @@ def close(self):
try:
setattr(self, attr, None)
except Exception as e:
- print(self._error_messages['callback_error'].format(attr, str(e)), file=sys.stderr)
+ print(
+ self._error_messages['callback_error'].format(
+ attr, str(e)), file=sys.stderr)
# Note: We don't close self._file as we don't own it
except Exception as e:
- print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['cleanup_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._closed = True
self._initialized = False
@@ -803,10 +908,15 @@ def initialized(self) -> bool:
"""
return self._initialized
+
class Reader:
"""High-level wrapper for C2PA Reader operations."""
- def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = None, manifest_data: Optional[Any] = None):
+ def __init__(self,
+ format_or_path: Union[str,
+ Path],
+ stream: Optional[Any] = None,
+ manifest_data: Optional[Any] = None):
"""Create a new Reader.
Args:
@@ -849,13 +959,16 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
mimetypes = sys.modules['mimetypes']
path = str(format_or_path)
- mime_type = mimetypes.guess_type(path)[0] or 'application/octet-stream'
+ mime_type = mimetypes.guess_type(
+ path)[0] or 'application/octet-stream'
# Keep mime_type string alive
try:
self._mime_type_str = mime_type.encode('utf-8')
except UnicodeError as e:
- raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e)))
+ raise C2paError.Encoding(
+ self._error_messages['encoding_error'].format(
+ str(e)))
try:
# Open the file and create a stream
@@ -870,10 +983,12 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
if not self._reader:
self._own_stream.close()
file.close()
- error = _parse_operation_result_for_error(_lib.c2pa_error())
+ error = _parse_operation_result_for_error(
+ _lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['reader_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['reader_error'].format("Unknown error"))
# Store the file to close it later
self._file = file
@@ -883,7 +998,9 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
self._own_stream.close()
if hasattr(self, '_file'):
self._file.close()
- raise C2paError.Io(self._error_messages['io_error'].format(str(e)))
+ raise C2paError.Io(
+ self._error_messages['io_error'].format(
+ str(e)))
elif isinstance(stream, str):
# If stream is a string, treat it as a path and try to open it
try:
@@ -892,11 +1009,16 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
self._format_str = format_or_path.encode('utf-8')
if manifest_data is None:
- self._reader = _lib.c2pa_reader_from_stream(self._format_str, self._own_stream._stream)
+ self._reader = _lib.c2pa_reader_from_stream(
+ self._format_str, self._own_stream._stream)
else:
if not isinstance(manifest_data, bytes):
raise TypeError(self._error_messages['manifest_error'])
- manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data)
+ manifest_array = (
+ ctypes.c_ubyte *
+ len(manifest_data))(
+ *
+ manifest_data)
self._reader = _lib.c2pa_reader_from_manifest_data_and_stream(
self._format_str,
self._own_stream._stream,
@@ -907,10 +1029,12 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
if not self._reader:
self._own_stream.close()
file.close()
- error = _parse_operation_result_for_error(_lib.c2pa_error())
+ error = _parse_operation_result_for_error(
+ _lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['reader_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['reader_error'].format("Unknown error"))
self._file = file
except Exception as e:
@@ -918,7 +1042,9 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
self._own_stream.close()
if hasattr(self, '_file'):
self._file.close()
- raise C2paError.Io(self._error_messages['io_error'].format(str(e)))
+ raise C2paError.Io(
+ self._error_messages['io_error'].format(
+ str(e)))
else:
# Use the provided stream
# Keep format string alive
@@ -926,23 +1052,26 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
with Stream(stream) as stream_obj:
if manifest_data is None:
- self._reader = _lib.c2pa_reader_from_stream(self._format_str, stream_obj._stream)
+ self._reader = _lib.c2pa_reader_from_stream(
+ self._format_str, stream_obj._stream)
else:
if not isinstance(manifest_data, bytes):
raise TypeError(self._error_messages['manifest_error'])
- manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data)
+ manifest_array = (
+ ctypes.c_ubyte *
+ len(manifest_data))(
+ *
+ manifest_data)
self._reader = _lib.c2pa_reader_from_manifest_data_and_stream(
- self._format_str,
- stream_obj._stream,
- manifest_array,
- len(manifest_data)
- )
+ self._format_str, stream_obj._stream, manifest_array, len(manifest_data))
if not self._reader:
- error = _parse_operation_result_for_error(_lib.c2pa_error())
+ error = _parse_operation_result_for_error(
+ _lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['reader_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['reader_error'].format("Unknown error"))
def __enter__(self):
return self
@@ -971,7 +1100,9 @@ def close(self):
try:
_lib.c2pa_reader_free(self._reader)
except Exception as e:
- print(self._error_messages['reader_cleanup'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['reader_cleanup'].format(
+ str(e)), file=sys.stderr)
finally:
self._reader = None
@@ -980,7 +1111,9 @@ def close(self):
try:
self._own_stream.close()
except Exception as e:
- print(self._error_messages['stream_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['stream_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._own_stream = None
@@ -989,7 +1122,9 @@ def close(self):
try:
self._file.close()
except Exception as e:
- print(self._error_messages['file_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['file_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._file = None
@@ -997,7 +1132,9 @@ def close(self):
if hasattr(self, '_strings'):
self._strings.clear()
except Exception as e:
- print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['cleanup_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._closed = True
@@ -1035,7 +1172,8 @@ def resource_to_stream(self, uri: str, stream: Any) -> int:
# Keep uri string alive
self._uri_str = uri.encode('utf-8')
with Stream(stream) as stream_obj:
- result = _lib.c2pa_reader_resource_to_stream(self._reader, self._uri_str, stream_obj._stream)
+ result = _lib.c2pa_reader_resource_to_stream(
+ self._reader, self._uri_str, stream_obj._stream)
if result < 0:
error = _parse_operation_result_for_error(_lib.c2pa_error())
@@ -1044,6 +1182,7 @@ def resource_to_stream(self, uri: str, stream: Any) -> int:
return result
+
class Signer:
"""High-level wrapper for C2PA Signer operations."""
@@ -1091,7 +1230,8 @@ def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer':
if error:
# More detailed error message when possible
raise C2paError(error)
- raise C2paError("Failed to create signer from configured signer info")
+ raise C2paError(
+ "Failed to create signer from configured signer info")
return cls(signer_ptr)
@@ -1120,10 +1260,12 @@ def from_callback(
"""
# Validate inputs before creating
if not certs:
- raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate data"))
+ raise C2paError(
+ cls._error_messages['invalid_certs'].format("Missing certificate data"))
if tsa_url and not tsa_url.startswith(('http://', 'https://')):
- raise C2paError(cls._error_messages['invalid_tsa'].format("Invalid TSA URL format"))
+ raise C2paError(
+ cls._error_messages['invalid_tsa'].format("Invalid TSA URL format"))
# Create a wrapper callback that handles errors and memory management
def wrapped_callback(data: bytes) -> bytes:
@@ -1132,7 +1274,9 @@ def wrapped_callback(data: bytes) -> bytes:
raise ValueError("Empty data provided for signing")
return callback(data)
except Exception as e:
- print(cls._error_messages['callback_error'].format(str(e)), file=sys.stderr)
+ print(
+ cls._error_messages['callback_error'].format(
+ str(e)), file=sys.stderr)
raise C2paError.Signature(str(e))
# Encode strings with error handling
@@ -1140,7 +1284,9 @@ def wrapped_callback(data: bytes) -> bytes:
certs_bytes = certs.encode('utf-8')
tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None
except UnicodeError as e:
- raise C2paError.Encoding(cls._error_messages['encoding_error'].format(str(e)))
+ raise C2paError.Encoding(
+ cls._error_messages['encoding_error'].format(
+ str(e)))
# Create the signer with the wrapped callback
signer_ptr = _lib.c2pa_signer_create(
@@ -1184,11 +1330,15 @@ def close(self):
try:
_lib.c2pa_signer_free(self._signer)
except Exception as e:
- print(self._error_messages['signer_cleanup'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['signer_cleanup'].format(
+ str(e)), file=sys.stderr)
finally:
self._signer = None
except Exception as e:
- print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['cleanup_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._closed = True
@@ -1226,6 +1376,7 @@ def closed(self) -> bool:
"""
return self._closed
+
class Builder:
"""High-level wrapper for C2PA Builder operations."""
@@ -1260,12 +1411,16 @@ def __init__(self, manifest_json: Any):
try:
manifest_json = json.dumps(manifest_json)
except (TypeError, ValueError) as e:
- raise C2paError.Json(self._error_messages['json_error'].format(str(e)))
+ raise C2paError.Json(
+ self._error_messages['json_error'].format(
+ str(e)))
try:
json_str = manifest_json.encode('utf-8')
except UnicodeError as e:
- raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e)))
+ raise C2paError.Encoding(
+ self._error_messages['encoding_error'].format(
+ str(e)))
self._builder = _lib.c2pa_builder_from_json(json_str)
@@ -1273,7 +1428,8 @@ def __init__(self, manifest_json: Any):
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['builder_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['builder_error'].format("Unknown error"))
@classmethod
def from_json(cls, manifest_json: Any) -> 'Builder':
@@ -1317,7 +1473,8 @@ def from_archive(cls, stream: Any) -> 'Builder':
def __del__(self):
"""Ensure resources are cleaned up if close() wasn't called."""
- self.close()
+ if hasattr(self, '_closed'):
+ self.close()
def close(self):
"""Release the builder resources.
@@ -1339,11 +1496,15 @@ def close(self):
try:
_lib.c2pa_builder_free(self._builder)
except Exception as e:
- print(self._error_messages['builder_cleanup'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['builder_cleanup'].format(
+ str(e)), file=sys.stderr)
finally:
self._builder = None
except Exception as e:
- print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr)
+ print(
+ self._error_messages['cleanup_error'].format(
+ str(e)), file=sys.stderr)
finally:
self._closed = True
@@ -1391,7 +1552,8 @@ def set_remote_url(self, remote_url: str):
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['url_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['url_error'].format("Unknown error"))
def add_resource(self, uri: str, stream: Any):
"""Add a resource to the builder.
@@ -1408,13 +1570,15 @@ def add_resource(self, uri: str, stream: Any):
uri_str = uri.encode('utf-8')
with Stream(stream) as stream_obj:
- result = _lib.c2pa_builder_add_resource(self._builder, uri_str, stream_obj._stream)
+ result = _lib.c2pa_builder_add_resource(
+ self._builder, uri_str, stream_obj._stream)
if result != 0:
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['resource_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['resource_error'].format("Unknown error"))
def add_ingredient(self, ingredient_json: str, format: str, source: Any):
"""Add an ingredient to the builder.
@@ -1435,18 +1599,26 @@ def add_ingredient(self, ingredient_json: str, format: str, source: Any):
ingredient_str = ingredient_json.encode('utf-8')
format_str = format.encode('utf-8')
except UnicodeError as e:
- raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e)))
+ raise C2paError.Encoding(
+ self._error_messages['encoding_error'].format(
+ str(e)))
source_stream = Stream(source)
- result = _lib.c2pa_builder_add_ingredient_from_stream(self._builder, ingredient_str, format_str, source_stream._stream)
+ result = _lib.c2pa_builder_add_ingredient_from_stream(
+ self._builder, ingredient_str, format_str, source_stream._stream)
if result != 0:
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['ingredient_error'].format("Unknown error"))
-
- def add_ingredient_from_stream(self, ingredient_json: str, format: str, source: Any):
+ raise C2paError(
+ self._error_messages['ingredient_error'].format("Unknown error"))
+
+ def add_ingredient_from_stream(
+ self,
+ ingredient_json: str,
+ format: str,
+ source: Any):
"""Add an ingredient from a stream to the builder.
Args:
@@ -1465,7 +1637,9 @@ def add_ingredient_from_stream(self, ingredient_json: str, format: str, source:
ingredient_str = ingredient_json.encode('utf-8')
format_str = format.encode('utf-8')
except UnicodeError as e:
- raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e)))
+ raise C2paError.Encoding(
+ self._error_messages['encoding_error'].format(
+ str(e)))
with Stream(source) as source_stream:
result = _lib.c2pa_builder_add_ingredient_from_stream(
@@ -1475,7 +1649,8 @@ def add_ingredient_from_stream(self, ingredient_json: str, format: str, source:
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['ingredient_error'].format("Unknown error"))
+ raise C2paError(
+ self._error_messages['ingredient_error'].format("Unknown error"))
def to_archive(self, stream: Any):
"""Write an archive of the builder to a stream.
@@ -1490,15 +1665,22 @@ def to_archive(self, stream: Any):
raise C2paError(self._error_messages['closed_error'])
with Stream(stream) as stream_obj:
- result = _lib.c2pa_builder_to_archive(self._builder, stream_obj._stream)
+ result = _lib.c2pa_builder_to_archive(
+ self._builder, stream_obj._stream)
if result != 0:
error = _parse_operation_result_for_error(_lib.c2pa_error())
if error:
raise C2paError(error)
- raise C2paError(self._error_messages['archive_error'].format("Unknown error"))
-
- def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Optional[bytes]:
+ raise C2paError(
+ self._error_messages['archive_error'].format("Unknown error"))
+
+ def sign(
+ self,
+ signer: Signer,
+ format: str,
+ source: Any,
+ dest: Any = None) -> Optional[bytes]:
"""Sign the builder's content and write to a destination stream.
Args:
@@ -1551,7 +1733,13 @@ def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Op
source_stream.close()
dest_stream.close()
- def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], signer: Signer) -> tuple[int, Optional[bytes]]:
+ def sign_file(self,
+ source_path: Union[str,
+ Path],
+ dest_path: Union[str,
+ Path],
+ signer: Signer) -> tuple[int,
+ Optional[bytes]]:
"""Sign a file and write the signed data to an output file.
Args:
@@ -1593,6 +1781,7 @@ def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path],
return result, manifest_bytes
+
def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]:
"""Convert a binary C2PA manifest into an embeddable version.
@@ -1630,6 +1819,7 @@ def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]:
return size, result_bytes
+
def create_signer(
callback: Callable[[bytes], bytes],
alg: C2paSigningAlg,
@@ -1655,7 +1845,8 @@ def create_signer(
certs_bytes = certs.encode('utf-8')
tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None
except UnicodeError as e:
- raise C2paError.Encoding(f"Invalid UTF-8 characters in certificate data or TSA URL: {str(e)}")
+ raise C2paError.Encoding(
+ f"Invalid UTF-8 characters in certificate data or TSA URL: {str(e)}")
signer_ptr = _lib.c2pa_signer_create(
None, # context
@@ -1674,6 +1865,7 @@ def create_signer(
return Signer(signer_ptr)
+
def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer:
"""Create a signer from signer information.
@@ -1697,9 +1889,11 @@ def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer:
return Signer(signer_ptr)
+
# Rename the old create_signer to _create_signer since it's now internal
_create_signer = create_signer
+
def ed25519_sign(data: bytes, private_key: str) -> bytes:
"""Sign data using the Ed25519 algorithm.
@@ -1718,7 +1912,8 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes:
try:
key_str = private_key.encode('utf-8')
except UnicodeError as e:
- raise C2paError.Encoding(f"Invalid UTF-8 characters in private key: {str(e)}")
+ raise C2paError.Encoding(
+ f"Invalid UTF-8 characters in private key: {str(e)}")
signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str)
@@ -1736,6 +1931,7 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes:
return signature
+
__all__ = [
'C2paError',
'C2paSeekMode',
@@ -1753,4 +1949,4 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes:
'format_embeddable',
'ed25519_sign',
'sdk_version'
-]
\ No newline at end of file
+]
diff --git a/src/c2pa/lib.py b/src/c2pa/lib.py
index 466f85c6..fb38645c 100644
--- a/src/c2pa/lib.py
+++ b/src/c2pa/lib.py
@@ -24,11 +24,13 @@
)
logger = logging.getLogger(__name__)
+
class CPUArchitecture(Enum):
"""CPU architecture enum for platform-specific identifiers."""
AARCH64 = "aarch64"
X86_64 = "x86_64"
+
def get_platform_identifier(cpu_arch: Optional[CPUArchitecture] = None) -> str:
"""Get the full platform identifier (arch-os) for the current system,
matching the downloaded identifiers used by the Github publisher.
@@ -55,7 +57,8 @@ def get_platform_identifier(cpu_arch: Optional[CPUArchitecture] = None) -> str:
elif cpu_arch == CPUArchitecture.X86_64:
return "x86_64-apple-darwin"
else:
- raise ValueError(f"Unsupported CPU architecture for macOS: {cpu_arch}")
+ raise ValueError(
+ f"Unsupported CPU architecture for macOS: {cpu_arch}")
elif system == "windows":
return "x86_64-pc-windows-msvc"
elif system == "linux":
@@ -63,6 +66,7 @@ def get_platform_identifier(cpu_arch: Optional[CPUArchitecture] = None) -> str:
else:
raise ValueError(f"Unsupported operating system: {system}")
+
def _get_architecture() -> str:
"""
Get the current system architecture.
@@ -102,7 +106,9 @@ def _get_platform_dir() -> str:
else:
raise RuntimeError(f"Unsupported platform: {sys.platform}")
-def _load_single_library(lib_name: str, search_paths: list[Path]) -> Optional[ctypes.CDLL]:
+
+def _load_single_library(lib_name: str,
+ search_paths: list[Path]) -> Optional[ctypes.CDLL]:
"""
Load a single library from the given search paths.
@@ -139,6 +145,7 @@ def _load_single_library(lib_name: str, search_paths: list[Path]) -> Optional[ct
logger.debug(f"Library not found at: {lib_path}")
return None
+
def _get_possible_search_paths() -> list[Path]:
"""
Get a list of possible paths where the library might be located.
@@ -179,11 +186,14 @@ def _get_possible_search_paths() -> list[Path]:
possible_paths.append(base_path / platform_id)
# Add system library paths
- possible_paths.extend([Path(p) for p in os.environ.get("LD_LIBRARY_PATH", "").split(os.pathsep) if p])
+ possible_paths.extend([Path(p) for p in os.environ.get(
+ "LD_LIBRARY_PATH", "").split(os.pathsep) if p])
return possible_paths
-def dynamically_load_library(lib_name: Optional[str] = None) -> Optional[ctypes.CDLL]:
+
+def dynamically_load_library(
+ lib_name: Optional[str] = None) -> Optional[ctypes.CDLL]:
"""
Load the dynamic library containing the C-API based on the platform.
@@ -213,7 +223,8 @@ def dynamically_load_library(lib_name: Optional[str] = None) -> Optional[ctypes.
env_lib_name = os.environ.get("C2PA_LIBRARY_NAME")
if env_lib_name:
if DEBUG_LIBRARY_LOADING:
- logger.info(f"Using library name from env var C2PA_LIBRARY_NAME: {env_lib_name}")
+ logger.info(
+ f"Using library name from env var C2PA_LIBRARY_NAME: {env_lib_name}")
try:
possible_paths = _get_possible_search_paths()
lib = _load_single_library(env_lib_name, possible_paths)
@@ -221,10 +232,12 @@ def dynamically_load_library(lib_name: Optional[str] = None) -> Optional[ctypes.
return lib
else:
logger.error(f"Could not find library {env_lib_name} in any of the search paths")
- # Continue with normal loading if environment variable library name fails
+ # Continue with normal loading if environment variable library
+ # name fails
except Exception as e:
logger.error(f"Failed to load library from C2PA_LIBRARY_NAME: {e}")
- # Continue with normal loading if environment variable library name fails
+ # Continue with normal loading if environment variable library name
+ # fails
possible_paths = _get_possible_search_paths()
diff --git a/tests/benchmark.py b/tests/benchmark.py
index 7b1e7df6..58e138cf 100644
--- a/tests/benchmark.py
+++ b/tests/benchmark.py
@@ -1,9 +1,16 @@
-from c2pa import Builder, Error, Reader, SigningAlg, create_signer, sdk_version, sign_ps256
import os
import io
+import pytest
+from c2pa import Reader, Builder, Signer, C2paSigningAlg, C2paSignerInfo
+
PROJECT_PATH = os.getcwd()
-testPath = os.path.join(PROJECT_PATH, "tests", "fixtures", "C.jpg")
+# Test paths
+test_path = os.path.join(PROJECT_PATH, "tests", "fixtures", "C.jpg")
+output_path = "target/python_out.jpg"
+
+# Ensure target directory exists
+os.makedirs(os.path.dirname(output_path), exist_ok=True)
manifestDefinition = {
"claim_generator": "python_test",
@@ -29,43 +36,70 @@
}
]
}
-private_key = open("tests/fixtures/ps256.pem","rb").read()
-
-# Define a function that signs data with PS256 using a private key
-def sign(data: bytes) -> bytes:
- print("date len = ", len(data))
- return sign_ps256(data, private_key)
-# load the public keys from a pem file
-certs = open("tests/fixtures/ps256.pub","rb").read()
+# Load private key and certificates
+private_key = open("tests/fixtures/ps256.pem", "rb").read()
+certs = open("tests/fixtures/ps256.pub", "rb").read()
# Create a local Ps256 signer with certs and a timestamp server
-signer = create_signer(sign, SigningAlg.PS256, certs, "http://timestamp.digicert.com")
-
+signer_info = C2paSignerInfo(
+ alg=b"ps256",
+ sign_cert=certs,
+ private_key=private_key,
+ ta_url=b"http://timestamp.digicert.com"
+)
+signer = Signer.from_info(signer_info)
builder = Builder(manifestDefinition)
-source = open(testPath, "rb").read()
+# Load source image
+source = open(test_path, "rb").read()
+
+# Run the benchmark: python -m pytest tests/benchmark.py -v
-testPath = "/Users/gpeacock/Pictures/Lightroom Saved Photos/IMG_0483.jpg"
-testPath = "tests/fixtures/c.jpg"
-outputPath = "target/python_out.jpg"
+def test_files_read():
+ """Benchmark reading a C2PA asset from a file."""
+ with open(test_path, "rb") as f:
+ reader = Reader("image/jpeg", f)
+ result = reader.json()
+ assert result is not None
+ return result
+
+def test_streams_read():
+ """Benchmark reading a C2PA asset from a stream."""
+ with open(test_path, "rb") as file:
+ source = file.read()
+ reader = Reader("image/jpeg", io.BytesIO(source))
+ result = reader.json()
+ assert result is not None
+ return result
def test_files_build():
- # Delete the output file if it exists
- if os.path.exists(outputPath):
- os.remove(outputPath)
- builder.sign_file(signer, testPath, outputPath)
+ """Benchmark building a C2PA asset from a file."""
+ # Delete the output file if it exists
+ if os.path.exists(output_path):
+ os.remove(output_path)
+ with open(test_path, "rb") as source_file:
+ with open(output_path, "wb") as dest_file:
+ builder.sign(signer, "image/jpeg", source_file, dest_file)
def test_streams_build():
- #with open(testPath, "rb") as file:
+ """Benchmark building a C2PA asset from a stream."""
output = io.BytesIO(bytearray())
- builder.sign(signer, "image/jpeg", io.BytesIO(source), output)
+ with open(test_path, "rb") as source_file:
+ builder.sign(signer, "image/jpeg", source_file, output)
-def test_func(benchmark):
- benchmark(test_files_build)
+def test_files_reading(benchmark):
+ """Benchmark file-based reading."""
+ benchmark(test_files_read)
-def test_streams(benchmark):
- benchmark(test_streams_build)
+def test_streams_reading(benchmark):
+ """Benchmark stream-based reading."""
+ benchmark(test_streams_read)
+
+def test_files_builder_signer_benchmark(benchmark):
+ """Benchmark file-based building."""
+ benchmark(test_files_build)
-#def test_signer(benchmark):
-# benchmark(sign_ps256, data, private_key)
\ No newline at end of file
+def test_streams_builder_benchmark(benchmark):
+ """Benchmark stream-based building."""
+ benchmark(test_streams_build)
\ No newline at end of file
diff --git a/tests/fixtures/files-for-reading-tests/CA.jpg b/tests/fixtures/files-for-reading-tests/CA.jpg
new file mode 100644
index 00000000..551e611e
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/CA.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/CACAE-uri-CA.jpg b/tests/fixtures/files-for-reading-tests/CACAE-uri-CA.jpg
new file mode 100644
index 00000000..1ca39817
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/CACAE-uri-CA.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/CA_ct.jpg b/tests/fixtures/files-for-reading-tests/CA_ct.jpg
new file mode 100644
index 00000000..8f464fd7
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/CA_ct.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/CIE-sig-CA.jpg b/tests/fixtures/files-for-reading-tests/CIE-sig-CA.jpg
new file mode 100644
index 00000000..400d27c3
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/CIE-sig-CA.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/C_with_CAWG_data.jpg b/tests/fixtures/files-for-reading-tests/C_with_CAWG_data.jpg
new file mode 100644
index 00000000..4088d990
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/C_with_CAWG_data.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/E-sig-CA.jpg b/tests/fixtures/files-for-reading-tests/E-sig-CA.jpg
new file mode 100644
index 00000000..2bbedb96
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/E-sig-CA.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/XCA.jpg b/tests/fixtures/files-for-reading-tests/XCA.jpg
new file mode 100644
index 00000000..18723ffa
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/XCA.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/boxhash.jpg b/tests/fixtures/files-for-reading-tests/boxhash.jpg
new file mode 100644
index 00000000..96a2bedc
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/boxhash.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/cloud.jpg b/tests/fixtures/files-for-reading-tests/cloud.jpg
new file mode 100644
index 00000000..9735b328
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/cloud.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/legacy_ingredient_hash.jpg b/tests/fixtures/files-for-reading-tests/legacy_ingredient_hash.jpg
new file mode 100644
index 00000000..b7213f17
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/legacy_ingredient_hash.jpg differ
diff --git a/tests/fixtures/files-for-reading-tests/video1.mp4 b/tests/fixtures/files-for-reading-tests/video1.mp4
new file mode 100644
index 00000000..5802d5d2
Binary files /dev/null and b/tests/fixtures/files-for-reading-tests/video1.mp4 differ
diff --git a/tests/fixtures/files-for-signing-tests/IMG_0003.jpg b/tests/fixtures/files-for-signing-tests/IMG_0003.jpg
new file mode 100644
index 00000000..be277a89
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/IMG_0003.jpg differ
diff --git a/tests/fixtures/files-for-signing-tests/P1000827.jpg b/tests/fixtures/files-for-signing-tests/P1000827.jpg
new file mode 100644
index 00000000..819e6360
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/P1000827.jpg differ
diff --git a/tests/fixtures/files-for-signing-tests/TUSCANY.TIF b/tests/fixtures/files-for-signing-tests/TUSCANY.TIF
new file mode 100644
index 00000000..048f0017
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/TUSCANY.TIF differ
diff --git a/tests/fixtures/files-for-signing-tests/cloudx.jpg b/tests/fixtures/files-for-signing-tests/cloudx.jpg
new file mode 100755
index 00000000..30b68ca0
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/cloudx.jpg differ
diff --git a/tests/fixtures/files-for-signing-tests/earth_apollo17.jpg b/tests/fixtures/files-for-signing-tests/earth_apollo17.jpg
new file mode 100644
index 00000000..4d7ec6a5
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/earth_apollo17.jpg differ
diff --git a/tests/fixtures/files-for-signing-tests/exp-test1.png b/tests/fixtures/files-for-signing-tests/exp-test1.png
new file mode 100644
index 00000000..6b8dc108
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/exp-test1.png differ
diff --git a/tests/fixtures/files-for-signing-tests/legacy.mp4 b/tests/fixtures/files-for-signing-tests/legacy.mp4
new file mode 100644
index 00000000..51f9f093
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/legacy.mp4 differ
diff --git a/tests/fixtures/files-for-signing-tests/libpng-test.png b/tests/fixtures/files-for-signing-tests/libpng-test.png
new file mode 100644
index 00000000..c4af2ada
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/libpng-test.png differ
diff --git a/tests/fixtures/files-for-signing-tests/mars.webp b/tests/fixtures/files-for-signing-tests/mars.webp
new file mode 100644
index 00000000..31446651
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/mars.webp differ
diff --git a/tests/fixtures/files-for-signing-tests/prerelease.jpg b/tests/fixtures/files-for-signing-tests/prerelease.jpg
new file mode 100644
index 00000000..e142ed12
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/prerelease.jpg differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.avif b/tests/fixtures/files-for-signing-tests/sample1.avif
new file mode 100644
index 00000000..755463c6
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.avif differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.gif b/tests/fixtures/files-for-signing-tests/sample1.gif
new file mode 100644
index 00000000..7d0d1a41
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.gif differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.heic b/tests/fixtures/files-for-signing-tests/sample1.heic
new file mode 100644
index 00000000..00cc549c
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.heic differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.heif b/tests/fixtures/files-for-signing-tests/sample1.heif
new file mode 100644
index 00000000..7a68f35d
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.heif differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.m4a b/tests/fixtures/files-for-signing-tests/sample1.m4a
new file mode 100644
index 00000000..f6d5e925
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.m4a differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.mp3 b/tests/fixtures/files-for-signing-tests/sample1.mp3
new file mode 100644
index 00000000..d134f76d
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.mp3 differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.png b/tests/fixtures/files-for-signing-tests/sample1.png
new file mode 100644
index 00000000..cfd2f19a
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.png differ
diff --git a/tests/fixtures/files-for-signing-tests/sample1.webp b/tests/fixtures/files-for-signing-tests/sample1.webp
new file mode 100644
index 00000000..abc8d790
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/sample1.webp differ
diff --git a/tests/fixtures/files-for-signing-tests/test.avi b/tests/fixtures/files-for-signing-tests/test.avi
new file mode 100644
index 00000000..cc5ef6a7
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/test.avi differ
diff --git a/tests/fixtures/files-for-signing-tests/test.webp b/tests/fixtures/files-for-signing-tests/test.webp
new file mode 100644
index 00000000..c4a7b16c
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/test.webp differ
diff --git a/tests/fixtures/files-for-signing-tests/test_lossless.webp b/tests/fixtures/files-for-signing-tests/test_lossless.webp
new file mode 100644
index 00000000..288a232c
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/test_lossless.webp differ
diff --git a/tests/fixtures/files-for-signing-tests/test_xmp.webp b/tests/fixtures/files-for-signing-tests/test_xmp.webp
new file mode 100644
index 00000000..da71bcdd
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/test_xmp.webp differ
diff --git a/tests/fixtures/files-for-signing-tests/thumbnail.jpg b/tests/fixtures/files-for-signing-tests/thumbnail.jpg
new file mode 100644
index 00000000..be277a89
Binary files /dev/null and b/tests/fixtures/files-for-signing-tests/thumbnail.jpg differ
diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py
index 81cd1f8a..46713057 100644
--- a/tests/test_unit_tests.py
+++ b/tests/test_unit_tests.py
@@ -16,13 +16,16 @@
import json
import unittest
from unittest.mock import mock_open, patch
+import ctypes
-from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version # load_settings_file
+from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version
+from c2pa.c2pa import Stream
PROJECT_PATH = os.getcwd()
testPath = os.path.join(PROJECT_PATH, "tests", "fixtures", "C.jpg")
+
class TestC2paSdk(unittest.TestCase):
def test_version(self):
self.assertIn("0.55.0", sdk_version())
@@ -47,23 +50,101 @@ def test_stream_read_and_parse(self):
title = manifest_store["manifests"][manifest_store["active_manifest"]]["title"]
self.assertEqual(title, "C.jpg")
- #def test_json_decode_err(self):
- # """Test that attempting to read from an invalid file path raises an IO error"""
- # with self.assertRaises(Error.Io):
- # manifest_store = Reader("image/jpeg", "foo")
-
def test_reader_bad_format(self):
with self.assertRaises(Error.NotSupported):
with open(self.testPath, "rb") as file:
reader = Reader("badFormat", file)
def test_settings_trust(self):
- #load_settings_file("tests/fixtures/settings.toml")
+ # load_settings_file("tests/fixtures/settings.toml")
with open(self.testPath, "rb") as file:
reader = Reader("image/jpeg", file)
json_data = reader.json()
self.assertIn("C.jpg", json_data)
+ def test_reader_double_close(self):
+ """Test that multiple close calls are handled gracefully."""
+ with open(self.testPath, "rb") as file:
+ reader = Reader("image/jpeg", file)
+ reader.close()
+ # Second close should not raise an exception
+ reader.close()
+ # Verify reader is closed
+ with self.assertRaises(Error):
+ reader.json()
+
+ def test_reader_close_cleanup(self):
+ """Test that close properly cleans up all resources."""
+ with open(self.testPath, "rb") as file:
+ reader = Reader("image/jpeg", file)
+ # Store references to internal objects
+ reader_ref = reader._reader
+ stream_ref = reader._own_stream
+ # Close the reader
+ reader.close()
+ # Verify all resources are cleaned up
+ self.assertIsNone(reader._reader)
+ self.assertIsNone(reader._own_stream)
+ # Verify reader is marked as closed
+ self.assertTrue(reader._closed)
+
+ def test_read_all_files(self):
+ """Test reading C2PA metadata from all files in the fixtures/files-for-reading-tests directory"""
+ reading_dir = os.path.join(self.data_dir, "files-for-reading-tests")
+
+ # Map of file extensions to MIME types
+ mime_types = {
+ '.jpg': 'image/jpeg',
+ '.jpeg': 'image/jpeg',
+ '.png': 'image/png',
+ '.gif': 'image/gif',
+ '.webp': 'image/webp',
+ '.heic': 'image/heic',
+ '.heif': 'image/heif',
+ '.avif': 'image/avif',
+ '.tif': 'image/tiff',
+ '.tiff': 'image/tiff',
+ '.mp4': 'video/mp4',
+ '.avi': 'video/x-msvideo',
+ '.mp3': 'audio/mpeg',
+ '.m4a': 'audio/mp4',
+ '.wav': 'audio/wav'
+ }
+
+ # Skip system files
+ skip_files = {
+ '.DS_Store'
+ }
+
+ for filename in os.listdir(reading_dir):
+ if filename in skip_files:
+ continue
+
+ file_path = os.path.join(reading_dir, filename)
+ if not os.path.isfile(file_path):
+ continue
+
+ # Get file extension and corresponding MIME type
+ _, ext = os.path.splitext(filename)
+ ext = ext.lower()
+ if ext not in mime_types:
+ continue
+
+ mime_type = mime_types[ext]
+
+ try:
+ with open(file_path, "rb") as file:
+ reader = Reader(mime_type, file)
+ json_data = reader.json()
+ self.assertIsInstance(json_data, str)
+ # Verify the manifest contains expected fields
+ manifest = json.loads(json_data)
+ self.assertIn("manifests", manifest)
+ self.assertIn("active_manifest", manifest)
+ except Exception as e:
+ self.fail(f"Failed to read metadata from {filename}: {str(e)}")
+
+
class TestBuilder(unittest.TestCase):
def setUp(self):
# Use the fixtures_dir fixture to set up paths
@@ -95,12 +176,12 @@ def setUp(self):
"title": "Python Test Image",
"ingredients": [],
"assertions": [
- { 'label': 'stds.schema-org.CreativeWork',
+ {'label': 'stds.schema-org.CreativeWork',
'data': {
'@context': 'http://schema.org/',
'@type': 'CreativeWork',
'author': [
- { '@type': 'Person',
+ {'@type': 'Person',
'name': 'Gavin Peacock'
}
]
@@ -110,7 +191,6 @@ def setUp(self):
]
}
-
def test_streams_sign(self):
with open(self.testPath, "rb") as file:
builder = Builder(self.manifestDefinition)
@@ -144,7 +224,8 @@ def test_remote_sign(self):
builder = Builder(self.manifestDefinition)
builder.set_no_embed()
output = io.BytesIO(bytearray())
- manifest_data = builder.sign(self.signer, "image/jpeg", file, output)
+ manifest_data = builder.sign(
+ self.signer, "image/jpeg", file, output)
output.seek(0)
reader = Reader("image/jpeg", output, manifest_data)
json_data = reader.json()
@@ -152,5 +233,218 @@ def test_remote_sign(self):
self.assertNotIn("validation_status", json_data)
output.close()
+ def test_builder_double_close(self):
+ """Test that multiple close calls are handled gracefully."""
+ builder = Builder(self.manifestDefinition)
+ # First close
+ builder.close()
+ # Second close should not raise an exception
+ builder.close()
+ # Verify builder is closed
+ with self.assertRaises(Error):
+ builder.set_no_embed()
+
+ def test_sign_all_files(self):
+ """Test signing all files in both fixtures directories"""
+ signing_dir = os.path.join(self.data_dir, "files-for-signing-tests")
+ reading_dir = os.path.join(self.data_dir, "files-for-reading-tests")
+
+ # Map of file extensions to MIME types
+ mime_types = {
+ '.jpg': 'image/jpeg',
+ '.jpeg': 'image/jpeg',
+ '.png': 'image/png',
+ '.gif': 'image/gif',
+ '.webp': 'image/webp',
+ '.heic': 'image/heic',
+ '.heif': 'image/heif',
+ '.avif': 'image/avif',
+ '.tif': 'image/tiff',
+ '.tiff': 'image/tiff',
+ '.mp4': 'video/mp4',
+ '.avi': 'video/x-msvideo',
+ '.mp3': 'audio/mpeg',
+ '.m4a': 'audio/mp4',
+ '.wav': 'audio/wav'
+ }
+
+ # Skip files that are known to be invalid or unsupported
+ skip_files = {
+ 'sample3.invalid.wav', # Invalid file
+ }
+
+ # Process both directories
+ for directory in [signing_dir, reading_dir]:
+ for filename in os.listdir(directory):
+ if filename in skip_files:
+ continue
+
+ file_path = os.path.join(directory, filename)
+ if not os.path.isfile(file_path):
+ continue
+
+ # Get file extension and corresponding MIME type
+ _, ext = os.path.splitext(filename)
+ ext = ext.lower()
+ if ext not in mime_types:
+ continue
+
+ mime_type = mime_types[ext]
+
+ try:
+ with open(file_path, "rb") as file:
+ builder = Builder(self.manifestDefinition)
+ output = io.BytesIO(bytearray())
+ builder.sign(self.signer, mime_type, file, output)
+ output.seek(0)
+ reader = Reader(mime_type, output)
+ json_data = reader.json()
+ self.assertIn("Python Test", json_data)
+ self.assertNotIn("validation_status", json_data)
+ output.close()
+ except Error.NotSupported:
+ continue
+ except Exception as e:
+ self.fail(f"Failed to sign {filename}: {str(e)}")
+
+
+class TestStream(unittest.TestCase):
+ def setUp(self):
+ # Create a temporary file for testing
+ self.temp_file = io.BytesIO()
+ self.test_data = b"Hello, World!"
+ self.temp_file.write(self.test_data)
+ self.temp_file.seek(0)
+
+ def tearDown(self):
+ self.temp_file.close()
+
+ def test_stream_initialization(self):
+ """Test proper initialization of Stream class."""
+ stream = Stream(self.temp_file)
+ self.assertTrue(stream.initialized)
+ self.assertFalse(stream.closed)
+ stream.close()
+
+ def test_stream_initialization_with_invalid_object(self):
+ """Test initialization with an invalid object."""
+ with self.assertRaises(TypeError):
+ Stream("not a file-like object")
+
+ def test_stream_read(self):
+ """Test reading from a stream."""
+ stream = Stream(self.temp_file)
+ try:
+ # Create a buffer to read into
+ buffer = (ctypes.c_ubyte * 13)()
+ # Read the data
+ bytes_read = stream._read_cb(None, buffer, 13)
+ # Verify the data
+ self.assertEqual(bytes_read, 13)
+ self.assertEqual(bytes(buffer[:bytes_read]), self.test_data)
+ finally:
+ stream.close()
+
+ def test_stream_write(self):
+ """Test writing to a stream."""
+ output = io.BytesIO()
+ stream = Stream(output)
+ try:
+ # Create test data
+ test_data = b"Test Write"
+ buffer = (ctypes.c_ubyte * len(test_data))(*test_data)
+ # Write the data
+ bytes_written = stream._write_cb(None, buffer, len(test_data))
+ # Verify the data
+ self.assertEqual(bytes_written, len(test_data))
+ output.seek(0)
+ self.assertEqual(output.read(), test_data)
+ finally:
+ stream.close()
+
+ def test_stream_seek(self):
+ """Test seeking in a stream."""
+ stream = Stream(self.temp_file)
+ try:
+ # Seek to position 7 (after "Hello, ")
+ new_pos = stream._seek_cb(None, 7, 0) # 0 = SEEK_SET
+ self.assertEqual(new_pos, 7)
+ # Read from new position
+ buffer = (ctypes.c_ubyte * 6)()
+ bytes_read = stream._read_cb(None, buffer, 6)
+ self.assertEqual(bytes(buffer[:bytes_read]), b"World!")
+ finally:
+ stream.close()
+
+ def test_stream_flush(self):
+ """Test flushing a stream."""
+ output = io.BytesIO()
+ stream = Stream(output)
+ try:
+ # Write some data
+ test_data = b"Test Flush"
+ buffer = (ctypes.c_ubyte * len(test_data))(*test_data)
+ stream._write_cb(None, buffer, len(test_data))
+ # Flush the stream
+ result = stream._flush_cb(None)
+ self.assertEqual(result, 0)
+ finally:
+ stream.close()
+
+ def test_stream_context_manager(self):
+ """Test stream as a context manager."""
+ with Stream(self.temp_file) as stream:
+ self.assertTrue(stream.initialized)
+ self.assertFalse(stream.closed)
+ self.assertTrue(stream.closed)
+
+ def test_stream_double_close(self):
+ """Test that multiple close calls are handled gracefully."""
+ stream = Stream(self.temp_file)
+ stream.close()
+ # Second close should not raise an exception
+ stream.close()
+ self.assertTrue(stream.closed)
+
+ def test_stream_read_after_close(self):
+ """Test reading from a closed stream."""
+ stream = Stream(self.temp_file)
+ # Store callbacks before closing
+ read_cb = stream._read_cb
+ stream.close()
+ buffer = (ctypes.c_ubyte * 13)()
+ # Reading from closed stream should return -1
+ self.assertEqual(read_cb(None, buffer, 13), -1)
+
+ def test_stream_write_after_close(self):
+ """Test writing to a closed stream."""
+ stream = Stream(self.temp_file)
+ # Store callbacks before closing
+ write_cb = stream._write_cb
+ stream.close()
+ test_data = b"Test Write"
+ buffer = (ctypes.c_ubyte * len(test_data))(*test_data)
+ # Writing to closed stream should return -1
+ self.assertEqual(write_cb(None, buffer, len(test_data)), -1)
+
+ def test_stream_seek_after_close(self):
+ """Test seeking in a closed stream."""
+ stream = Stream(self.temp_file)
+ # Store callbacks before closing
+ seek_cb = stream._seek_cb
+ stream.close()
+ # Seeking in closed stream should return -1
+ self.assertEqual(seek_cb(None, 5, 0), -1)
+
+ def test_stream_flush_after_close(self):
+ """Test flushing a closed stream."""
+ stream = Stream(self.temp_file)
+ # Store callbacks before closing
+ flush_cb = stream._flush_cb
+ stream.close()
+ # Flushing closed stream should return -1
+ self.assertEqual(flush_cb(None), -1)
+
+
if __name__ == '__main__':
unittest.main()