diff --git a/tests/c2pa.py b/tests/c2pa.py index e84d3f4c..270ef070 100644 --- a/tests/c2pa.py +++ b/tests/c2pa.py @@ -5,6 +5,7 @@ import os from pathlib import Path from typing import Optional, Union, Callable, Any +import time # Determine the library name based on the platform if sys.platform == "win32": @@ -51,27 +52,27 @@ def _validate_library_exports(lib): """Validate that all required functions are present in the loaded library. - + This validation is crucial for several security and reliability reasons: - + 1. Security: - Prevents loading of libraries that might be missing critical functions - Ensures the library has all expected functionality before any code execution - Helps detect tampered or incomplete libraries - + 2. Reliability: - Fails fast if the library is incomplete or corrupted - Prevents runtime errors from missing functions - Ensures all required functionality is available before use - + 3. Version Compatibility: - Helps detect version mismatches where the library doesn't have all expected functions - Prevents partial functionality that could lead to undefined behavior - Ensures the library matches the expected API version - + Args: lib: The loaded library object - + Raises: ImportError: If any required function is missing, with a detailed message listing the missing functions. This helps diagnose issues with the library @@ -81,7 +82,7 @@ def _validate_library_exports(lib): for func_name in _REQUIRED_FUNCTIONS: if not hasattr(lib, func_name): missing_functions.append(func_name) - + if missing_functions: raise ImportError( f"Library is missing required functions symbols: {', '.join(missing_functions)}\n" @@ -90,13 +91,13 @@ def _validate_library_exports(lib): def _try_load_library(path): """Attempt to load and validate a library from the given path. - + Args: path: Path to the library file - + Returns: The loaded library object if successful - + Raises: ImportError: If the library cannot be loaded, with specific error messages for: - Permission errors @@ -108,7 +109,7 @@ def _try_load_library(path): # Check file permissions if not os.access(path, os.R_OK): raise ImportError(f"Permission denied: Cannot read library at {path}") - + # Try to load the library try: lib = ctypes.CDLL(str(path)) @@ -119,12 +120,12 @@ def _try_load_library(path): raise ImportError(f"Corrupted or invalid library at {path}") else: raise ImportError(f"Failed to load library at {path}: {e}") - + # Validate the library exports _validate_library_exports(lib) - + return lib - + except ImportError: raise except Exception as e: @@ -187,23 +188,23 @@ class C2paSigner(ctypes.Structure): class C2paStream(ctypes.Structure): """A C2paStream is a Rust Read/Write/Seek stream that can be created in C. - + This class represents a low-level stream interface that bridges Python and Rust/C code. It implements the Rust Read/Write/Seek traits in C, allowing for efficient data transfer between Python and the C2PA library without unnecessary copying. - + The stream is used for various operations including: - Reading manifest data from files - Writing signed content to files - Handling binary resources - Managing ingredient data - + The structure contains function pointers that implement the stream operations: - reader: Function to read data from the stream - seeker: Function to change the stream position - writer: Function to write data to the stream - flusher: Function to flush any buffered data - + This is a critical component for performance as it allows direct memory access between Python and the C2PA library without intermediate copies. """ @@ -278,14 +279,11 @@ def _setup_function(func, argtypes, restype=None): _setup_function(_lib.c2pa_builder_add_resource, [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.POINTER(C2paStream)], ctypes.c_int) -#_setup_function(_lib.c2pa_builder_add_ingredient, -# [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)], -# ctypes.c_int) - -# Set up additional Builder function prototypes _setup_function(_lib.c2pa_builder_add_ingredient_from_stream, [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)], ctypes.c_int) + +# Set up additional Builder function prototypes _setup_function(_lib.c2pa_builder_to_archive, [ctypes.POINTER(C2paBuilder), ctypes.POINTER(C2paStream)], ctypes.c_int) @@ -390,11 +388,11 @@ class Verify(Exception): class _StringContainer: """Container class to hold encoded strings and prevent them from being garbage collected. - + This class is used to store encoded strings that need to remain in memory while being used by C functions. The strings are stored as instance attributes to prevent them from being garbage collected. - + This is an internal implementation detail and should not be used outside this module. """ def __init__(self): @@ -409,6 +407,7 @@ def _handle_string_result(result: ctypes.c_void_p, check_error: bool = True) -> if error: error_str = ctypes.cast(error, ctypes.c_char_p).value.decode('utf-8') _lib.c2pa_string_free(error) + print("## error_str:", error_str) parts = error_str.split(' ', 1) if len(parts) > 1: error_type, message = parts @@ -444,10 +443,11 @@ def _handle_string_result(result: ctypes.c_void_p, check_error: bool = True) -> raise C2paError.Verify(message) return error_str return None - + # Convert to Python string and free the Rust-allocated memory py_string = ctypes.cast(result, ctypes.c_char_p).value.decode('utf-8') _lib.c2pa_string_free(result) + return py_string def sdk_version() -> str: @@ -464,11 +464,11 @@ def version() -> str: def load_settings(settings: str, format: str = "json") -> None: """Load C2PA settings from a string. - + Args: settings: The settings string to load format: The format of the settings string (default: "json") - + Raises: C2paError: If there was an error loading the settings """ @@ -483,43 +483,43 @@ def load_settings(settings: str, format: str = "json") -> None: def read_file(path: Union[str, Path], data_dir: Optional[Union[str, Path]] = None) -> str: """Read a C2PA manifest from a file. - + Args: path: Path to the file to read data_dir: Optional directory to write binary resources to - + Returns: The manifest as a JSON string - + Raises: C2paError: If there was an error reading the file """ container = _StringContainer() - + container._path_str = str(path).encode('utf-8') container._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None - + result = _lib.c2pa_read_file(container._path_str, container._data_dir_str) return _handle_string_result(result) def read_ingredient_file(path: Union[str, Path], data_dir: Optional[Union[str, Path]] = None) -> str: """Read a C2PA ingredient from a file. - + Args: path: Path to the file to read data_dir: Optional directory to write binary resources to - + Returns: The ingredient as a JSON string - + Raises: C2paError: If there was an error reading the file """ container = _StringContainer() - + container._path_str = str(path).encode('utf-8') container._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None - + result = _lib.c2pa_read_ingredient_file(container._path_str, container._data_dir_str) return _handle_string_result(result) @@ -531,17 +531,17 @@ def sign_file( data_dir: Optional[Union[str, Path]] = None ) -> str: """Sign a file with a C2PA manifest. - + Args: source_path: Path to the source file dest_path: Path to write the signed file to manifest: The manifest JSON string signer_info: Signing configuration data_dir: Optional directory to write binary resources to - + Returns: Result information as a JSON string - + Raises: C2paError: If there was an error signing the file C2paError.Encoding: If any of the string inputs contain invalid UTF-8 characters @@ -554,7 +554,7 @@ def sign_file( signer_info._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None except UnicodeError as e: raise C2paError.Encoding(f"Invalid UTF-8 characters in input strings: {str(e)}") - + result = _lib.c2pa_sign_file( signer_info._source_str, signer_info._dest_str, @@ -565,44 +565,37 @@ def sign_file( return _handle_string_result(result) class Stream: - """High-level wrapper for C2paStream operations that provides a Python-friendly interface. - - This class serves as a bridge between Python's file-like objects and the low-level C2paStream - interface. It provides several important benefits: - - 1. Memory Safety: - - Manages memory allocation and deallocation for C callbacks - - Prevents memory leaks through proper cleanup in __del__ and close() - - Handles buffer management for read/write operations - - 2. Error Handling: - - Provides detailed error messages for stream operations - - Implements proper error propagation to Python exceptions - - Ensures resources are cleaned up even when errors occur - - 3. Resource Management: - - Implements context manager protocol (__enter__/__exit__) - - Ensures proper cleanup of C resources - - Handles file descriptor lifecycle - - 4. Performance: - - Minimizes data copying between Python and C - - Uses direct memory access where possible - - Implements efficient buffer management - - The class wraps any Python file-like object that implements the standard stream interface - (read, write, seek, tell, flush) and provides the necessary callbacks for the C2PA library. - """ + # Class-level counter for generating unique stream IDs + # (useful for tracing streams usage in debug) + _next_stream_id = 0 + # Maximum value for a 32-bit signed integer (2^31 - 1) + # This prevents integer overflow which could cause: + # 1. Unexpected behavior in stream ID generation + # 2. Potential security issues if IDs wrap around + # 3. Memory issues if the number grows too large + # When this limit is reached, we reset to 0 since the timestamp component + # of the stream ID ensures uniqueness even after counter reset + _MAX_STREAM_ID = 2**31 - 1 + def __init__(self, file): """Initialize a new Stream wrapper around a file-like object. - + Args: file: A file-like object that implements read, write, seek, tell, and flush methods - + Raises: TypeError: If the file object doesn't implement all required methods """ - # Validate that the object has the required stream-like methods + # Generate unique stream ID with timestamp + timestamp = int(time.time() * 1000) # milliseconds since epoch + + # Safely increment stream ID with overflow protection + if Stream._next_stream_id >= Stream._MAX_STREAM_ID: + Stream._next_stream_id = 0 # Reset to 0 if we hit the maximum + self._stream_id = f"{timestamp}-{Stream._next_stream_id}" + Stream._next_stream_id += 1 + + # Rest of the existing initialization code... required_methods = ['read', 'write', 'seek', 'tell', 'flush'] missing_methods = [method for method in required_methods if not hasattr(file, method)] if missing_methods: @@ -610,12 +603,14 @@ def __init__(self, file): ', '.join(required_methods), ', '.join(missing_methods) )) - + self._file = file self._stream = None # Initialize to None to track if stream was created self._closed = False # Track if the stream has been closed self._initialized = False # Track if stream was successfully initialized - + + # print(f'## Created stream {self._stream_id} for file {self._file}') + # Pre-allocate error message strings to avoid string formatting overhead self._error_messages = { 'read': "Error: Attempted to read from uninitialized or closed stream", @@ -630,22 +625,22 @@ def __init__(self, file): 'callback_error': "Error cleaning up callback {}: {}", 'stream_error': "Error releasing stream: {}" } - + def read_callback(ctx, data, length): """Callback function for reading data from the Python stream. - + This function is called by the C2PA library when it needs to read data. It handles: - Stream state validation - Memory safety - Error handling - Buffer management - + Args: ctx: The stream context (unused) data: Pointer to the buffer to read into length: Maximum number of bytes to read - + Returns: Number of bytes read, or -1 on error """ @@ -656,11 +651,11 @@ def read_callback(ctx, data, length): if not data or length <= 0: # print(self._error_messages['memory_error'].format("Invalid read parameters"), file=sys.stderr) return -1 - + buffer = self._file.read(length) if not buffer: # EOF return 0 - + # Ensure we don't write beyond the allocated memory actual_length = min(len(buffer), length) # Create a view of the buffer to avoid copying @@ -671,21 +666,21 @@ def read_callback(ctx, data, length): except Exception as e: # print(self._error_messages['read_error'].format(str(e)), file=sys.stderr) return -1 - + def seek_callback(ctx, offset, whence): """Callback function for seeking in the Python stream. - + This function is called by the C2PA library when it needs to change the stream position. It handles: - Stream state validation - Position validation - Error handling - + Args: ctx: The stream context (unused) offset: The offset to seek to whence: The reference point (0=start, 1=current, 2=end) - + Returns: New position in the stream, or -1 on error """ @@ -698,22 +693,22 @@ def seek_callback(ctx, offset, whence): except Exception as e: # print(self._error_messages['seek_error'].format(str(e)), file=sys.stderr) return -1 - + def write_callback(ctx, data, length): """Callback function for writing data to the Python stream. - + This function is called by the C2PA library when it needs to write data. It handles: - Stream state validation - Memory safety - Error handling - Buffer management - + Args: ctx: The stream context (unused) data: Pointer to the data to write length: Number of bytes to write - + Returns: Number of bytes written, or -1 on error """ @@ -724,7 +719,7 @@ def write_callback(ctx, data, length): if not data or length <= 0: # print(self._error_messages['memory_error'].format("Invalid write parameters"), file=sys.stderr) return -1 - + # Create a temporary buffer to safely handle the data temp_buffer = (ctypes.c_ubyte * length)() try: @@ -739,18 +734,18 @@ def write_callback(ctx, data, length): except Exception as e: # print(self._error_messages['write_error'].format(str(e)), file=sys.stderr) return -1 - + def flush_callback(ctx): """Callback function for flushing the Python stream. - + This function is called by the C2PA library when it needs to ensure all buffered data is written. It handles: - Stream state validation - Error handling - + Args: ctx: The stream context (unused) - + Returns: 0 on success, -1 on error """ @@ -763,13 +758,13 @@ def flush_callback(ctx): except Exception as e: # print(self._error_messages['flush_error'].format(str(e)), file=sys.stderr) return -1 - + # Create callbacks that will be kept alive by being instance attributes self._read_cb = ReadCallback(read_callback) self._seek_cb = SeekCallback(seek_callback) self._write_cb = WriteCallback(write_callback) self._flush_cb = FlushCallback(flush_callback) - + # Create the stream self._stream = _lib.c2pa_create_stream( None, # context @@ -781,7 +776,7 @@ def flush_callback(ctx): if not self._stream: error = _handle_string_result(_lib.c2pa_error()) raise Exception("Failed to create stream: {}".format(error)) - + self._initialized = True def __enter__(self): @@ -800,11 +795,12 @@ def __del__(self): def close(self): """Release the stream resources. - + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. Errors during cleanup are logged but not raised to ensure cleanup completes. Multiple calls to close() are handled gracefully. """ + if self._closed: return @@ -836,7 +832,7 @@ def close(self): @property def closed(self) -> bool: """Check if the stream is closed. - + Returns: bool: True if the stream is closed, False otherwise """ @@ -845,7 +841,7 @@ def closed(self) -> bool: @property def initialized(self) -> bool: """Check if the stream is properly initialized. - + Returns: bool: True if the stream is initialized, False otherwise """ @@ -853,19 +849,20 @@ def initialized(self) -> bool: class Reader: """High-level wrapper for C2PA Reader operations.""" - + def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = None, manifest_data: Optional[Any] = None): """Create a new Reader. - + Args: format_or_path: The format or path to read from stream: Optional stream to read from (any Python stream-like object) manifest_data: Optional manifest data in bytes - + Raises: C2paError: If there was an error creating the reader C2paError.Encoding: If any of the string inputs contain invalid UTF-8 characters """ + self._reader = None self._own_stream = None self._error_messages = { @@ -879,11 +876,11 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non 'reader_cleanup': "Error cleaning up reader: {}", 'encoding_error': "Invalid UTF-8 characters in input: {}" } - + # Check for unsupported format if format_or_path == "badFormat": raise C2paError.NotSupported(self._error_messages['unsupported']) - + if stream is None: # Create a stream from the file path @@ -894,49 +891,50 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non import mimetypes else: mimetypes = sys.modules['mimetypes'] - + path = str(format_or_path) mime_type = mimetypes.guess_type(path)[0] or 'application/octet-stream' - + # Keep mime_type string alive try: self._mime_type_str = mime_type.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) - + try: # Open the file and create a stream file = open(path, 'rb') self._own_stream = Stream(file) - - # Create reader from the file stream + self._reader = _lib.c2pa_reader_from_stream( self._mime_type_str, self._own_stream._stream ) - + if not self._reader: self._own_stream.close() file.close() - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['reader_error'].format("Unknown error")) + # Store the file to close it later self._file = file - + except Exception as e: if self._own_stream: self._own_stream.close() if hasattr(self, '_file'): self._file.close() raise C2paError.Io(self._error_messages['io_error'].format(str(e))) - elif isinstance(stream, str): # If stream is a string, treat it as a path and try to open it try: file = open(stream, 'rb') self._own_stream = Stream(file) self._format_str = format_or_path.encode('utf-8') - + if manifest_data is None: self._reader = _lib.c2pa_reader_from_stream(self._format_str, self._own_stream._stream) else: @@ -944,17 +942,20 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non raise TypeError(self._error_messages['manifest_error']) manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data) self._reader = _lib.c2pa_reader_from_manifest_data_and_stream( - self._format_str, - self._own_stream._stream, - manifest_array, + self._format_str, + self._own_stream._stream, + manifest_array, len(manifest_data) ) - + if not self._reader: self._own_stream.close() file.close() - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['reader_error'].format("Unknown error")) + self._file = file except Exception as e: if self._own_stream: @@ -966,6 +967,7 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non # Use the provided stream # Keep format string alive self._format_str = format_or_path.encode('utf-8') + with Stream(stream) as stream_obj: if manifest_data is None: self._reader = _lib.c2pa_reader_from_stream(self._format_str, stream_obj._stream) @@ -974,28 +976,32 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non raise TypeError(self._error_messages['manifest_error']) manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data) self._reader = _lib.c2pa_reader_from_manifest_data_and_stream( - self._format_str, - stream_obj._stream, - manifest_array, + self._format_str, + stream_obj._stream, + manifest_array, len(manifest_data) ) - + if not self._reader: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['reader_error'].format("Unknown error")) + def __enter__(self): return self - + def __exit__(self, exc_type, exc_val, exc_tb): self.close() - + def close(self): """Release the reader resources. - + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. Errors during cleanup are logged but not raised to ensure cleanup completes. Multiple calls to close() are handled gracefully. """ + # Track if we've already cleaned up if not hasattr(self, '_closed'): self._closed = False @@ -1012,7 +1018,7 @@ def close(self): print(self._error_messages['reader_cleanup'].format(str(e)), file=sys.stderr) finally: self._reader = None - + # Clean up stream if hasattr(self, '_own_stream') and self._own_stream: try: @@ -1021,7 +1027,7 @@ def close(self): print(self._error_messages['stream_error'].format(str(e)), file=sys.stderr) finally: self._own_stream = None - + # Clean up file if hasattr(self, '_file'): try: @@ -1038,53 +1044,56 @@ def close(self): print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr) finally: self._closed = True - + def json(self) -> str: """Get the manifest store as a JSON string. - + Returns: The manifest store as a JSON string - + Raises: C2paError: If there was an error getting the JSON """ + if not self._reader: raise C2paError("Reader is closed") result = _lib.c2pa_reader_json(self._reader) return _handle_string_result(result) - + def resource_to_stream(self, uri: str, stream: Any) -> int: """Write a resource to a stream. - + Args: uri: The URI of the resource to write stream: The stream to write to (any Python stream-like object) - + Returns: The number of bytes written - + Raises: C2paError: If there was an error writing the resource """ if not self._reader: raise C2paError("Reader is closed") - + # Keep uri string alive self._uri_str = uri.encode('utf-8') with Stream(stream) as stream_obj: result = _lib.c2pa_reader_resource_to_stream(self._reader, self._uri_str, stream_obj._stream) - + if result < 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + return result class Signer: """High-level wrapper for C2PA Signer operations.""" - + def __init__(self, signer_ptr: ctypes.POINTER(C2paSigner)): """Initialize a new Signer instance. - + Note: This constructor is not meant to be called directly. Use from_info() or from_callback() instead. """ @@ -1101,31 +1110,35 @@ def __init__(self, signer_ptr: ctypes.POINTER(C2paSigner)): 'invalid_certs': "Invalid certificate data: {}", 'invalid_tsa': "Invalid TSA URL: {}" } - + @classmethod def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': """Create a new Signer from signer information. - + Args: signer_info: The signer configuration - + Returns: A new Signer instance - + Raises: C2paError: If there was an error creating the signer """ # Validate signer info before creating if not signer_info.sign_cert or not signer_info.private_key: - raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate or private key")) - + raise C2paError("Missing certificate or private key") + signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) - + if not signer_ptr: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + # More detailed error message when possible + raise C2paError(error) + raise C2paError("Failed to create signer from configured signer info") + return cls(signer_ptr) - + @classmethod def from_callback( cls, @@ -1135,16 +1148,16 @@ def from_callback( tsa_url: Optional[str] = None ) -> 'Signer': """Create a signer from a callback function. - + Args: callback: Function that signs data and returns the signature alg: The signing algorithm to use certs: Certificate chain in PEM format tsa_url: Optional RFC 3161 timestamp authority URL - + Returns: A new Signer instance - + Raises: C2paError: If there was an error creating the signer C2paError.Encoding: If the certificate data or TSA URL contains invalid UTF-8 characters @@ -1152,10 +1165,10 @@ def from_callback( # Validate inputs before creating if not certs: raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate data")) - + if tsa_url and not tsa_url.startswith(('http://', 'https://')): raise C2paError(cls._error_messages['invalid_tsa'].format("Invalid TSA URL format")) - + # Create a wrapper callback that handles errors and memory management def wrapped_callback(data: bytes) -> bytes: try: @@ -1165,14 +1178,14 @@ def wrapped_callback(data: bytes) -> bytes: except Exception as e: print(cls._error_messages['callback_error'].format(str(e)), file=sys.stderr) raise C2paError.Signature(str(e)) - + # Encode strings with error handling try: certs_bytes = certs.encode('utf-8') tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None except UnicodeError as e: raise C2paError.Encoding(cls._error_messages['encoding_error'].format(str(e))) - + # Create the signer with the wrapped callback signer_ptr = _lib.c2pa_signer_create( None, # context @@ -1181,32 +1194,35 @@ def wrapped_callback(data: bytes) -> bytes: certs_bytes, tsa_url_bytes ) - + if not signer_ptr: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to create signer") + return cls(signer_ptr) - + def __enter__(self): """Context manager entry.""" if self._closed: raise C2paError(self._error_messages['closed_error']) return self - + def __exit__(self, exc_type, exc_val, exc_tb): """Context manager exit.""" self.close() - + def close(self): """Release the signer resources. - + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. Errors during cleanup are logged but not raised to ensure cleanup completes. Multiple calls to close() are handled gracefully. """ if self._closed: return - + try: if self._signer: try: @@ -1219,33 +1235,36 @@ def close(self): print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr) finally: self._closed = True - + def reserve_size(self) -> int: """Get the size to reserve for signatures from this signer. - + Returns: The size to reserve in bytes - + Raises: C2paError: If there was an error getting the size """ if self._closed or not self._signer: raise C2paError(self._error_messages['closed_error']) - + try: result = _lib.c2pa_signer_reserve_size(self._signer) - + if result < 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to get reserve size") + return result except Exception as e: raise C2paError(self._error_messages['size_error'].format(str(e))) - + @property def closed(self) -> bool: """Check if the signer is closed. - + Returns: bool: True if the signer is closed, False otherwise """ @@ -1256,13 +1275,14 @@ class Builder: def __init__(self, manifest_json: Any): """Initialize a new Builder instance. - + Args: manifest_json: The manifest JSON definition (string or dict) - + Raises: C2paError: If there was an error creating the builder C2paError.Encoding: If the manifest JSON contains invalid UTF-8 characters + C2paError.Json: If the manifest JSON cannot be serialized """ self._builder = None self._error_messages = { @@ -1276,57 +1296,67 @@ def __init__(self, manifest_json: Any): 'ingredient_error': "Error adding ingredient: {}", 'archive_error': "Error writing archive: {}", 'sign_error': "Error during signing: {}", - 'encoding_error': "Invalid UTF-8 characters in manifest: {}" + 'encoding_error': "Invalid UTF-8 characters in manifest: {}", + 'json_error': "Failed to serialize manifest JSON: {}" } - + if not isinstance(manifest_json, str): - manifest_json = json.dumps(manifest_json) - + try: + manifest_json = json.dumps(manifest_json) + except (TypeError, ValueError) as e: + raise C2paError.Json(self._error_messages['json_error'].format(str(e))) + try: json_str = manifest_json.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) - + self._builder = _lib.c2pa_builder_from_json(json_str) - + if not self._builder: - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['builder_error'].format("Unknown error")) @classmethod def from_json(cls, manifest_json: Any) -> 'Builder': """Create a new Builder from a JSON manifest. - + Args: manifest_json: The JSON manifest definition - + Returns: A new Builder instance - + Raises: C2paError: If there was an error creating the builder """ return cls(manifest_json) - + @classmethod def from_archive(cls, stream: Any) -> 'Builder': """Create a new Builder from an archive stream. - + Args: stream: The stream containing the archive (any Python stream-like object) - + Returns: A new Builder instance - + Raises: - C2paError: If there was an error creating the builder + C2paError: If there was an error creating the builder from the archive """ builder = cls({}) stream_obj = Stream(stream) builder._builder = _lib.c2pa_builder_from_archive(stream_obj._stream) - + if not builder._builder: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to create builder from archive") + return builder def __del__(self): @@ -1335,7 +1365,7 @@ def __del__(self): def close(self): """Release the builder resources. - + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. Errors during cleanup are logged but not raised to ensure cleanup completes. Multiple calls to close() are handled gracefully. @@ -1366,163 +1396,178 @@ def set_manifest(self, manifest): manifest = json.dumps(manifest) super().with_json(manifest) return self - + def __enter__(self): return self - + def __exit__(self, exc_type, exc_val, exc_tb): self.close() - + def set_no_embed(self): """Set the no-embed flag. - + When set, the builder will not embed a C2PA manifest store into the asset when signing. This is useful when creating cloud or sidecar manifests. """ if not self._builder: raise C2paError(self._error_messages['closed_error']) _lib.c2pa_builder_set_no_embed(self._builder) - + def set_remote_url(self, remote_url: str): """Set the remote URL. - + When set, the builder will embed a remote URL into the asset when signing. This is useful when creating cloud based Manifests. - + Args: remote_url: The remote URL to set - + Raises: - C2paError: If there was an error setting the URL + C2paError: If there was an error setting the remote URL """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + url_str = remote_url.encode('utf-8') result = _lib.c2pa_builder_set_remote_url(self._builder, url_str) - + if result != 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['url_error'].format("Unknown error")) + def add_resource(self, uri: str, stream: Any): """Add a resource to the builder. - + Args: uri: The URI to identify the resource stream: The stream containing the resource data (any Python stream-like object) - + Raises: C2paError: If there was an error adding the resource """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + uri_str = uri.encode('utf-8') with Stream(stream) as stream_obj: result = _lib.c2pa_builder_add_resource(self._builder, uri_str, stream_obj._stream) - + if result != 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['resource_error'].format("Unknown error")) + def add_ingredient(self, ingredient_json: str, format: str, source: Any): """Add an ingredient to the builder. - + Args: ingredient_json: The JSON ingredient definition format: The MIME type or extension of the ingredient source: The stream containing the ingredient data (any Python stream-like object) - + Raises: C2paError: If there was an error adding the ingredient C2paError.Encoding: If the ingredient JSON contains invalid UTF-8 characters """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + try: ingredient_str = ingredient_json.encode('utf-8') format_str = format.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) - + source_stream = Stream(source) result = _lib.c2pa_builder_add_ingredient_from_stream(self._builder, ingredient_str, format_str, source_stream._stream) - + if result != 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['ingredient_error'].format("Unknown error")) + def add_ingredient_from_stream(self, ingredient_json: str, format: str, source: Any): """Add an ingredient from a stream to the builder. - + Args: ingredient_json: The JSON ingredient definition format: The MIME type or extension of the ingredient source: The stream containing the ingredient data (any Python stream-like object) - + Raises: C2paError: If there was an error adding the ingredient C2paError.Encoding: If the ingredient JSON or format contains invalid UTF-8 characters """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + try: ingredient_str = ingredient_json.encode('utf-8') format_str = format.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) - + with Stream(source) as source_stream: result = _lib.c2pa_builder_add_ingredient_from_stream( self._builder, ingredient_str, format_str, source_stream._stream) - + if result != 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['ingredient_error'].format("Unknown error")) + def to_archive(self, stream: Any): """Write an archive of the builder to a stream. - + Args: stream: The stream to write the archive to (any Python stream-like object) - + Raises: C2paError: If there was an error writing the archive """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + with Stream(stream) as stream_obj: result = _lib.c2pa_builder_to_archive(self._builder, stream_obj._stream) - + if result != 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['archive_error'].format("Unknown error")) + def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Optional[bytes]: """Sign the builder's content and write to a destination stream. - + Args: format: The MIME type or extension of the content source: The source stream (any Python stream-like object) dest: The destination stream (any Python stream-like object) signer: The signer to use - + Returns: A tuple of (size of C2PA data, optional manifest bytes) - + Raises: C2paError: If there was an error during signing """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + # Convert Python streams to Stream objects source_stream = Stream(source) dest_stream = Stream(dest) - + try: format_str = format.encode('utf-8') manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() - + result = _lib.c2pa_builder_sign( self._builder, format_str, @@ -1531,17 +1576,19 @@ def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Op signer._signer, ctypes.byref(manifest_bytes_ptr) ) - + if result < 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + manifest_bytes = None if manifest_bytes_ptr: # Convert the manifest bytes to a Python bytes object size = result manifest_bytes = bytes(manifest_bytes_ptr[:size]) _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) - + return manifest_bytes finally: # Ensure both streams are cleaned up @@ -1550,25 +1597,24 @@ def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Op def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], signer: Signer) -> tuple[int, Optional[bytes]]: """Sign a file and write the signed data to an output file. - + Args: source_path: Path to the source file dest_path: Path to write the signed file to - signer: The signer to use - + Returns: A tuple of (size of C2PA data, optional manifest bytes) - + Raises: C2paError: If there was an error during signing """ if not self._builder: raise C2paError(self._error_messages['closed_error']) - + source_path_str = str(source_path).encode('utf-8') dest_path_str = str(dest_path).encode('utf-8') manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() - + result = _lib.c2pa_builder_sign_file( self._builder, source_path_str, @@ -1576,51 +1622,56 @@ def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], signer._signer, ctypes.byref(manifest_bytes_ptr) ) - + if result < 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + manifest_bytes = None if manifest_bytes_ptr: # Convert the manifest bytes to a Python bytes object size = result manifest_bytes = bytes(manifest_bytes_ptr[:size]) _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) - + return result, manifest_bytes def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]: """Convert a binary C2PA manifest into an embeddable version. - + Args: format: The MIME type or extension of the target format manifest_bytes: The raw manifest bytes - + Returns: A tuple of (size of result bytes, embeddable manifest bytes) - + Raises: C2paError: If there was an error converting the manifest """ format_str = format.encode('utf-8') manifest_array = (ctypes.c_ubyte * len(manifest_bytes))(*manifest_bytes) result_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() - + result = _lib.c2pa_format_embeddable( format_str, manifest_array, len(manifest_bytes), ctypes.byref(result_bytes_ptr) ) - + if result < 0: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to format embeddable manifest") + # Convert the result bytes to a Python bytes object size = result result_bytes = bytes(result_bytes_ptr[:size]) _lib.c2pa_manifest_bytes_free(result_bytes_ptr) - + return size, result_bytes def create_signer( @@ -1630,16 +1681,16 @@ def create_signer( tsa_url: Optional[str] = None ) -> Signer: """Create a signer from a callback function. - + Args: callback: Function that signs data and returns the signature alg: The signing algorithm to use certs: Certificate chain in PEM format tsa_url: Optional RFC 3161 timestamp authority URL - + Returns: A new Signer instance - + Raises: C2paError: If there was an error creating the signer C2paError.Encoding: If the certificate data or TSA URL contains invalid UTF-8 characters @@ -1649,7 +1700,7 @@ def create_signer( tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None except UnicodeError as e: raise C2paError.Encoding(f"Invalid UTF-8 characters in certificate data or TSA URL: {str(e)}") - + signer_ptr = _lib.c2pa_signer_create( None, # context SignerCallback(callback), @@ -1657,29 +1708,37 @@ def create_signer( certs_bytes, tsa_url_bytes ) - + if not signer_ptr: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + # More detailed error message when possible + raise C2paError(error) + raise C2paError("Failed to create signer") + return Signer(signer_ptr) def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer: """Create a signer from signer information. - + Args: signer_info: The signer configuration - + Returns: A new Signer instance - + Raises: C2paError: If there was an error creating the signer """ signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) - + if not signer_ptr: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + # More detailed error message when possible + raise C2paError(error) + raise C2paError("Failed to create signer from info") + return Signer(signer_ptr) # Rename the old create_signer to _create_signer since it's now internal @@ -1687,14 +1746,14 @@ def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer: def ed25519_sign(data: bytes, private_key: str) -> bytes: """Sign data using the Ed25519 algorithm. - + Args: data: The data to sign private_key: The private key in PEM format - + Returns: The signature bytes - + Raises: C2paError: If there was an error signing the data C2paError.Encoding: If the private key contains invalid UTF-8 characters @@ -1704,18 +1763,21 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: key_str = private_key.encode('utf-8') except UnicodeError as e: raise C2paError.Encoding(f"Invalid UTF-8 characters in private key: {str(e)}") - + signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str) - + if not signature_ptr: - _handle_string_result(_lib.c2pa_error()) - + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to sign data with Ed25519") + try: # Ed25519 signatures are always 64 bytes signature = bytes(signature_ptr[:64]) finally: _lib.c2pa_signature_free(signature_ptr) - + return signature __all__ = [ diff --git a/tests/fixtures/A-signed.png b/tests/fixtures/A-signed.png new file mode 100644 index 00000000..ffcddfcc Binary files /dev/null and b/tests/fixtures/A-signed.png differ diff --git a/tests/fixtures/A.png b/tests/fixtures/A.png new file mode 100644 index 00000000..068d866c Binary files /dev/null and b/tests/fixtures/A.png differ diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py index ed509fad..d3c93701 100644 --- a/tests/test_unit_tests.py +++ b/tests/test_unit_tests.py @@ -17,7 +17,7 @@ import unittest from unittest.mock import mock_open, patch -from c2pa import Builder, C2paError as Error, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version #, load_settings_file +from c2pa import Builder, C2paError as Error, C2paError, Reader, C2paSigningAlg as SigningAlg, C2paSignerInfo, Signer, sdk_version #, load_settings_file PROJECT_PATH = os.getcwd() @@ -88,6 +88,32 @@ class TestBuilder(unittest.TestCase): ] } + manifestDefinitionV2 = { + "claim_generator": "python_test", + "claim_generator_info": [{ + "name": "python_test", + "version": "0.0.1", + }], + "claim_version": 2, + "format": "image/jpeg", + "title": "Python Test Image", + "ingredients": [], + "assertions": [ + { 'label': 'stds.schema-org.CreativeWork', + 'data': { + '@context': 'http://schema.org/', + '@type': 'CreativeWork', + 'author': [ + { '@type': 'Person', + 'name': 'Gavin Peacock' + } + ] + }, + 'kind': 'Json' + } + ] + } + # Define a function that signs data with PS256 using a private key #def sign(data: bytes) -> bytes: # key = open("tests/fixtures/ps256.pem","rb").read() @@ -112,6 +138,7 @@ class TestBuilder(unittest.TestCase): def _read_manifest(self, reader): json_data = reader.json() + # print(json_data) self.assertIn("Python Test", json_data) self.assertNotIn("validation_status", json_data) @@ -124,6 +151,54 @@ def test_streams_sign(self): with Reader("image/jpeg", output) as reader: self._read_manifest(reader) + def test_streams_sign_with_ingredient(self): + with open(testPath, "rb") as file, \ + io.BytesIO(bytearray()) as output, \ + Builder(TestBuilder.manifestDefinition) as builder: + + ingredient_json = '{"title": "test-ingredient"}' + with open(os.path.join(PROJECT_PATH, "tests", "fixtures", "A.png"), 'rb') as f: + builder.add_ingredient(ingredient_json, "image/png", f) + + builder.sign(TestBuilder.signer, "image/jpeg", file, output) + output.seek(0) + with Reader("image/jpeg", output) as reader: + self._read_manifest(reader) + + def test_streams_sign_with_ingredient_with_manifest_v2(self): + with open(testPath, "rb") as file, \ + io.BytesIO(bytearray()) as output, \ + Builder(TestBuilder.manifestDefinitionV2) as builder: + + ingredient_json = '{"title": "test-ingredient"}' + # The A-signed ingredient is recent, signed with v2 claims... + # So we need the Builder manifest json to be flagged as v2 too + with open(os.path.join(PROJECT_PATH, "tests", "fixtures", "A-signed.png"), 'rb') as f: + builder.add_ingredient(ingredient_json, "image/png", f) + + builder.sign(TestBuilder.signer, "image/jpeg", file, output) + + output.seek(0) + + with Reader("image/jpeg", output) as reader: + self._read_manifest(reader) + + def test_streams_handle_claims_version_mismatch_during_signing(self): + with open(testPath, "rb") as file, \ + io.BytesIO(bytearray()) as output, \ + Builder(TestBuilder.manifestDefinition) as builder: + + ingredient_json = '{"title": "test-ingredient"}' + # We are using an ingredient with v2 claims, but the manifest in the builder is v1 + # So we expect a signing failure here + with open(os.path.join(PROJECT_PATH, "tests", "fixtures", "A-signed.png"), 'rb') as f: + builder.add_ingredient(ingredient_json, "image/png", f) + + with self.assertRaises(C2paError) as context: + builder.sign(TestBuilder.signer, "image/jpeg", file, output) + + self.assertIn("ingredient version too new", str(context.exception)) + def test_archive_sign(self): with open(testPath, "rb") as file, \ io.BytesIO(bytearray()) as archive: @@ -147,5 +222,81 @@ def test_remote_sign(self): with Reader("image/jpeg", output, manifest_data) as reader: self._read_manifest(reader) +class TestErrorHandling(unittest.TestCase): + def test_create_signer_from_info_error_handling(self): + # Test with invalid signer info (missing required fields) + invalid_signer_info = C2paSignerInfo( + alg=b"es256", + sign_cert=None, # Missing certificate + private_key=None, # Missing private key + ta_url=None + ) + + with self.assertRaises(C2paError) as context: + Signer.from_info(invalid_signer_info) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Missing certificate or private key", error_msg) + + def test_create_signer_from_info_later_error_handling(self): + # load the public keys from a pem file to create valid signer info + data_dir = "tests/fixtures/" + with open(data_dir + "es256_certs.pem", "rb") as cert_file, \ + open(data_dir + "es256_private.key", "rb") as key_file: + certs = cert_file.read() + key = key_file.read() + + # Test with invalid signer info (invalid algorithm) + invalid_signer_info = C2paSignerInfo( + alg=b"invalid-algorithm", + sign_cert=certs, # Missing certificate + private_key=key, # Missing private key + ta_url=None + ) + + with self.assertRaises(C2paError) as context: + Signer.from_info(invalid_signer_info) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Other Invalid signing algorithm", error_msg) + + def test_reader_initialization_error_handling(self): + # Test with non-existent file + with self.assertRaises(C2paError.Io) as context: + Reader("image/jpeg", "non_existent_file.jpg") + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("IO error", error_msg) + + # Test with invalid manifest data + with self.assertRaises(TypeError) as context: + Reader("image/jpeg", io.BytesIO(b"test"), manifest_data="not bytes") + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Invalid manifest data", error_msg) + + # Test with invalid format + with self.assertRaises(C2paError.NotSupported) as context: + Reader("badFormat", io.BytesIO(b"test")) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Unsupported format", error_msg) + + def test_reader_closed_stream_handling(self): + # Test with closed stream + closed_stream = io.BytesIO(b"test") + closed_stream.close() + with self.assertRaises(C2paError) as context: + Reader("image/jpeg", closed_stream) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Io Undefined error", error_msg) + if __name__ == '__main__': unittest.main()