diff --git a/.gitignore b/.gitignore index 8bbae039..89cd774b 100644 --- a/.gitignore +++ b/.gitignore @@ -6,3 +6,6 @@ # Mac OS X files .DS_Store .idea + +# Python cache files +tests/__pycache__/ diff --git a/tests/c2pa.py b/tests/c2pa.py index b9467477..e84d3f4c 100644 --- a/tests/c2pa.py +++ b/tests/c2pa.py @@ -2,6 +2,7 @@ import enum import json import sys +import os from pathlib import Path from typing import Optional, Union, Callable, Any @@ -13,6 +14,122 @@ else: _lib_name = "libc2pa_c.so" +# Define required function names +_REQUIRED_FUNCTIONS = [ + 'c2pa_version', + 'c2pa_error', + 'c2pa_string_free', + 'c2pa_load_settings', + 'c2pa_read_file', + 'c2pa_read_ingredient_file', + 'c2pa_sign_file', + 'c2pa_reader_from_stream', + 'c2pa_reader_from_manifest_data_and_stream', + 'c2pa_reader_free', + 'c2pa_reader_json', + 'c2pa_reader_resource_to_stream', + 'c2pa_builder_from_json', + 'c2pa_builder_from_archive', + 'c2pa_builder_free', + 'c2pa_builder_set_no_embed', + 'c2pa_builder_set_remote_url', + 'c2pa_builder_add_resource', + 'c2pa_builder_add_ingredient_from_stream', + 'c2pa_builder_to_archive', + 'c2pa_builder_sign', + 'c2pa_manifest_bytes_free', + 'c2pa_builder_data_hashed_placeholder', + 'c2pa_builder_sign_data_hashed_embeddable', + 'c2pa_format_embeddable', + 'c2pa_signer_create', + 'c2pa_signer_from_info', + 'c2pa_signer_reserve_size', + 'c2pa_signer_free', + 'c2pa_ed25519_sign', + 'c2pa_signature_free', +] + +def _validate_library_exports(lib): + """Validate that all required functions are present in the loaded library. + + This validation is crucial for several security and reliability reasons: + + 1. Security: + - Prevents loading of libraries that might be missing critical functions + - Ensures the library has all expected functionality before any code execution + - Helps detect tampered or incomplete libraries + + 2. Reliability: + - Fails fast if the library is incomplete or corrupted + - Prevents runtime errors from missing functions + - Ensures all required functionality is available before use + + 3. Version Compatibility: + - Helps detect version mismatches where the library doesn't have all expected functions + - Prevents partial functionality that could lead to undefined behavior + - Ensures the library matches the expected API version + + Args: + lib: The loaded library object + + Raises: + ImportError: If any required function is missing, with a detailed message listing + the missing functions. This helps diagnose issues with the library + installation or version compatibility. + """ + missing_functions = [] + for func_name in _REQUIRED_FUNCTIONS: + if not hasattr(lib, func_name): + missing_functions.append(func_name) + + if missing_functions: + raise ImportError( + f"Library is missing required functions symbols: {', '.join(missing_functions)}\n" + "This could indicate an incomplete or corrupted library installation or a version mismatch between the library and this Python wrapper" + ) + +def _try_load_library(path): + """Attempt to load and validate a library from the given path. + + Args: + path: Path to the library file + + Returns: + The loaded library object if successful + + Raises: + ImportError: If the library cannot be loaded, with specific error messages for: + - Permission errors + - Corrupted libraries + - Missing functions + - Other loading errors + """ + try: + # Check file permissions + if not os.access(path, os.R_OK): + raise ImportError(f"Permission denied: Cannot read library at {path}") + + # Try to load the library + try: + lib = ctypes.CDLL(str(path)) + except OSError as e: + if "Permission denied" in str(e): + raise ImportError(f"Permission denied: Cannot load library at {path}") + elif "invalid ELF header" in str(e) or "not a valid Win32 application" in str(e): + raise ImportError(f"Corrupted or invalid library at {path}") + else: + raise ImportError(f"Failed to load library at {path}: {e}") + + # Validate the library exports + _validate_library_exports(lib) + + return lib + + except ImportError: + raise + except Exception as e: + raise ImportError(f"Unexpected error loading library at {path}: {e}") + # Try to find the library in common locations _lib_paths = [ Path(__file__).parent / _lib_name, # Same directory as this file @@ -20,10 +137,18 @@ Path(__file__).parent.parent / "target" / "release" / _lib_name, # ../target/release ] +# Try each path until we find a valid library for _path in _lib_paths: if _path.exists(): - _lib = ctypes.CDLL(str(_path)) - break + try: + _lib = _try_load_library(_path) + break + except ImportError as e: + # If this is the last path, raise the error + if _path == _lib_paths[-1]: + raise ImportError(f"Could not find a valid {_lib_name} in any of: {[str(p) for p in _lib_paths]}\nLast error: {e}") + # Otherwise continue to the next path + continue else: raise ImportError(f"Could not find {_lib_name} in any of: {[str(p) for p in _lib_paths]}") @@ -61,13 +186,33 @@ class C2paSigner(ctypes.Structure): _fields_ = [] # Empty as it's opaque in the C API class C2paStream(ctypes.Structure): - """A C2paStream is a Rust Read/Write/Seek stream that can be created in C.""" + """A C2paStream is a Rust Read/Write/Seek stream that can be created in C. + + This class represents a low-level stream interface that bridges Python and Rust/C code. + It implements the Rust Read/Write/Seek traits in C, allowing for efficient data transfer + between Python and the C2PA library without unnecessary copying. + + The stream is used for various operations including: + - Reading manifest data from files + - Writing signed content to files + - Handling binary resources + - Managing ingredient data + + The structure contains function pointers that implement the stream operations: + - reader: Function to read data from the stream + - seeker: Function to change the stream position + - writer: Function to write data to the stream + - flusher: Function to flush any buffered data + + This is a critical component for performance as it allows direct memory access + between Python and the C2PA library without intermediate copies. + """ _fields_ = [ - ("context", ctypes.POINTER(StreamContext)), - ("reader", ReadCallback), - ("seeker", SeekCallback), - ("writer", WriteCallback), - ("flusher", FlushCallback), + ("context", ctypes.POINTER(StreamContext)), # Opaque context pointer for the stream + ("reader", ReadCallback), # Function to read data from the stream + ("seeker", SeekCallback), # Function to change stream position + ("writer", WriteCallback), # Function to write data to the stream + ("flusher", FlushCallback), # Function to flush buffered data ] class C2paSignerInfo(ctypes.Structure): @@ -104,7 +249,6 @@ def _setup_function(func, argtypes, restype=None): _setup_function(_lib.c2pa_version, [], ctypes.c_void_p) _setup_function(_lib.c2pa_error, [], ctypes.c_void_p) _setup_function(_lib.c2pa_string_free, [ctypes.c_void_p], None) -_setup_function(_lib.c2pa_release_string, [ctypes.c_void_p], None) _setup_function(_lib.c2pa_load_settings, [ctypes.c_char_p, ctypes.c_char_p], ctypes.c_int) _setup_function(_lib.c2pa_read_file, [ctypes.c_char_p, ctypes.c_char_p], ctypes.c_void_p) _setup_function(_lib.c2pa_read_ingredient_file, [ctypes.c_char_p, ctypes.c_char_p], ctypes.c_void_p) @@ -244,6 +388,19 @@ class Verify(Exception): """Exception raised for verification errors.""" pass +class _StringContainer: + """Container class to hold encoded strings and prevent them from being garbage collected. + + This class is used to store encoded strings that need to remain in memory + while being used by C functions. The strings are stored as instance attributes + to prevent them from being garbage collected. + + This is an internal implementation detail and should not be used outside this module. + """ + def __init__(self): + """Initialize an empty string container.""" + pass + def _handle_string_result(result: ctypes.c_void_p, check_error: bool = True) -> Optional[str]: """Helper function to handle string results from C2PA functions.""" if not result: # NULL pointer @@ -337,10 +494,7 @@ def read_file(path: Union[str, Path], data_dir: Optional[Union[str, Path]] = Non Raises: C2paError: If there was an error reading the file """ - # Create a container to hold our strings - class StringContainer: - pass - container = StringContainer() + container = _StringContainer() container._path_str = str(path).encode('utf-8') container._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None @@ -361,10 +515,7 @@ def read_ingredient_file(path: Union[str, Path], data_dir: Optional[Union[str, P Raises: C2paError: If there was an error reading the file """ - # Create a container to hold our strings - class StringContainer: - pass - container = StringContainer() + container = _StringContainer() container._path_str = str(path).encode('utf-8') container._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None @@ -393,12 +544,16 @@ def sign_file( Raises: C2paError: If there was an error signing the file + C2paError.Encoding: If any of the string inputs contain invalid UTF-8 characters """ # Store encoded strings as attributes of signer_info to keep them alive - signer_info._source_str = str(source_path).encode('utf-8') - signer_info._dest_str = str(dest_path).encode('utf-8') - signer_info._manifest_str = manifest.encode('utf-8') - signer_info._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None + try: + signer_info._source_str = str(source_path).encode('utf-8') + signer_info._dest_str = str(dest_path).encode('utf-8') + signer_info._manifest_str = manifest.encode('utf-8') + signer_info._data_dir_str = str(data_dir).encode('utf-8') if data_dir else None + except UnicodeError as e: + raise C2paError.Encoding(f"Invalid UTF-8 characters in input strings: {str(e)}") result = _lib.c2pa_sign_file( signer_info._source_str, @@ -409,47 +564,204 @@ def sign_file( ) return _handle_string_result(result) -# Helper class for stream operations class Stream: - """High-level wrapper for C2paStream operations.""" + """High-level wrapper for C2paStream operations that provides a Python-friendly interface. + + This class serves as a bridge between Python's file-like objects and the low-level C2paStream + interface. It provides several important benefits: + + 1. Memory Safety: + - Manages memory allocation and deallocation for C callbacks + - Prevents memory leaks through proper cleanup in __del__ and close() + - Handles buffer management for read/write operations + + 2. Error Handling: + - Provides detailed error messages for stream operations + - Implements proper error propagation to Python exceptions + - Ensures resources are cleaned up even when errors occur + + 3. Resource Management: + - Implements context manager protocol (__enter__/__exit__) + - Ensures proper cleanup of C resources + - Handles file descriptor lifecycle + + 4. Performance: + - Minimizes data copying between Python and C + - Uses direct memory access where possible + - Implements efficient buffer management + + The class wraps any Python file-like object that implements the standard stream interface + (read, write, seek, tell, flush) and provides the necessary callbacks for the C2PA library. + """ def __init__(self, file): + """Initialize a new Stream wrapper around a file-like object. + + Args: + file: A file-like object that implements read, write, seek, tell, and flush methods + + Raises: + TypeError: If the file object doesn't implement all required methods + """ # Validate that the object has the required stream-like methods - #required_methods = ['read', 'write', 'seek', 'tell', 'flush'] - #missing_methods = [method for method in required_methods if not hasattr(file, method)] - #if missing_methods: - # raise TypeError(f"Object must be a stream-like object with methods: {', '.join(required_methods)}. Missing: {', '.join(missing_methods)}") + required_methods = ['read', 'write', 'seek', 'tell', 'flush'] + missing_methods = [method for method in required_methods if not hasattr(file, method)] + if missing_methods: + raise TypeError("Object must be a stream-like object with methods: {}. Missing: {}".format( + ', '.join(required_methods), + ', '.join(missing_methods) + )) self._file = file + self._stream = None # Initialize to None to track if stream was created + self._closed = False # Track if the stream has been closed + self._initialized = False # Track if stream was successfully initialized + + # Pre-allocate error message strings to avoid string formatting overhead + self._error_messages = { + 'read': "Error: Attempted to read from uninitialized or closed stream", + 'seek': "Error: Attempted to seek in uninitialized or closed stream", + 'write': "Error: Attempted to write to uninitialized or closed stream", + 'flush': "Error: Attempted to flush uninitialized or closed stream", + 'read_error': "Error reading from stream: {}", + 'seek_error': "Error seeking in stream: {}", + 'write_error': "Error writing to stream: {}", + 'flush_error': "Error flushing stream: {}", + 'cleanup_error': "Error during cleanup: {}", + 'callback_error': "Error cleaning up callback {}: {}", + 'stream_error': "Error releasing stream: {}" + } def read_callback(ctx, data, length): + """Callback function for reading data from the Python stream. + + This function is called by the C2PA library when it needs to read data. + It handles: + - Stream state validation + - Memory safety + - Error handling + - Buffer management + + Args: + ctx: The stream context (unused) + data: Pointer to the buffer to read into + length: Maximum number of bytes to read + + Returns: + Number of bytes read, or -1 on error + """ + if not self._initialized or self._closed: + # print(self._error_messages['read'], file=sys.stderr) + return -1 try: + if not data or length <= 0: + # print(self._error_messages['memory_error'].format("Invalid read parameters"), file=sys.stderr) + return -1 + buffer = self._file.read(length) - for i, b in enumerate(buffer): - data[i] = b - return len(buffer) - except Exception: + if not buffer: # EOF + return 0 + + # Ensure we don't write beyond the allocated memory + actual_length = min(len(buffer), length) + # Create a view of the buffer to avoid copying + buffer_view = (ctypes.c_ubyte * actual_length).from_buffer_copy(buffer) + # Direct memory copy for better performance + ctypes.memmove(data, buffer_view, actual_length) + return actual_length + except Exception as e: + # print(self._error_messages['read_error'].format(str(e)), file=sys.stderr) return -1 def seek_callback(ctx, offset, whence): + """Callback function for seeking in the Python stream. + + This function is called by the C2PA library when it needs to change + the stream position. It handles: + - Stream state validation + - Position validation + - Error handling + + Args: + ctx: The stream context (unused) + offset: The offset to seek to + whence: The reference point (0=start, 1=current, 2=end) + + Returns: + New position in the stream, or -1 on error + """ + if not self._initialized or self._closed: + # print(self._error_messages['seek'], file=sys.stderr) + return -1 try: self._file.seek(offset, whence) return self._file.tell() - except Exception: + except Exception as e: + # print(self._error_messages['seek_error'].format(str(e)), file=sys.stderr) return -1 def write_callback(ctx, data, length): + """Callback function for writing data to the Python stream. + + This function is called by the C2PA library when it needs to write data. + It handles: + - Stream state validation + - Memory safety + - Error handling + - Buffer management + + Args: + ctx: The stream context (unused) + data: Pointer to the data to write + length: Number of bytes to write + + Returns: + Number of bytes written, or -1 on error + """ + if not self._initialized or self._closed: + # print(self._error_messages['write'], file=sys.stderr) + return -1 try: - buffer = bytes(data[:length]) - self._file.write(buffer) - return length - except Exception: + if not data or length <= 0: + # print(self._error_messages['memory_error'].format("Invalid write parameters"), file=sys.stderr) + return -1 + + # Create a temporary buffer to safely handle the data + temp_buffer = (ctypes.c_ubyte * length)() + try: + # Copy data to our temporary buffer + ctypes.memmove(temp_buffer, data, length) + # Write from our safe buffer + self._file.write(bytes(temp_buffer)) + return length + finally: + # Ensure temporary buffer is cleared + ctypes.memset(temp_buffer, 0, length) + except Exception as e: + # print(self._error_messages['write_error'].format(str(e)), file=sys.stderr) return -1 def flush_callback(ctx): + """Callback function for flushing the Python stream. + + This function is called by the C2PA library when it needs to ensure + all buffered data is written. It handles: + - Stream state validation + - Error handling + + Args: + ctx: The stream context (unused) + + Returns: + 0 on success, -1 on error + """ + if not self._initialized or self._closed: + # print(self._error_messages['flush'], file=sys.stderr) + return -1 try: self._file.flush() return 0 - except Exception: + except Exception as e: + # print(self._error_messages['flush_error'].format(str(e)), file=sys.stderr) return -1 # Create callbacks that will be kept alive by being instance attributes @@ -467,57 +779,157 @@ def flush_callback(ctx): self._flush_cb ) if not self._stream: - raise Exception("Failed to create stream") + error = _handle_string_result(_lib.c2pa_error()) + raise Exception("Failed to create stream: {}".format(error)) + + self._initialized = True + + def __enter__(self): + """Context manager entry.""" + if not self._initialized: + raise RuntimeError("Stream was not properly initialized") + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" + self.close() def __del__(self): - if hasattr(self, '_stream') and self._stream: - _lib.c2pa_release_stream(self._stream) + """Ensure resources are cleaned up if close() wasn't called.""" + self.close() + + def close(self): + """Release the stream resources. + + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. + Errors during cleanup are logged but not raised to ensure cleanup completes. + Multiple calls to close() are handled gracefully. + """ + if self._closed: + return + + try: + # Clean up stream first as it depends on callbacks + if self._stream: + try: + _lib.c2pa_release_stream(self._stream) + except Exception as e: + print(self._error_messages['stream_error'].format(str(e)), file=sys.stderr) + finally: + self._stream = None + + # Clean up callbacks + for attr in ['_read_cb', '_seek_cb', '_write_cb', '_flush_cb']: + if hasattr(self, attr): + try: + setattr(self, attr, None) + except Exception as e: + print(self._error_messages['callback_error'].format(attr, str(e)), file=sys.stderr) + + # Note: We don't close self._file as we don't own it + except Exception as e: + print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr) + finally: + self._closed = True + self._initialized = False + + @property + def closed(self) -> bool: + """Check if the stream is closed. + + Returns: + bool: True if the stream is closed, False otherwise + """ + return self._closed + + @property + def initialized(self) -> bool: + """Check if the stream is properly initialized. + + Returns: + bool: True if the stream is initialized, False otherwise + """ + return self._initialized class Reader: """High-level wrapper for C2PA Reader operations.""" - def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = None, manifest_data: Optional[Any] = None): + def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = None, manifest_data: Optional[Any] = None): """Create a new Reader. Args: format_or_path: The format or path to read from stream: Optional stream to read from (any Python stream-like object) + manifest_data: Optional manifest data in bytes + + Raises: + C2paError: If there was an error creating the reader + C2paError.Encoding: If any of the string inputs contain invalid UTF-8 characters """ self._reader = None self._own_stream = None - self._strings = [] # Keep encoded strings alive + self._error_messages = { + 'unsupported': "Unsupported format", + 'io_error': "IO error: {}", + 'manifest_error': "Invalid manifest data: must be bytes", + 'reader_error': "Failed to create reader: {}", + 'cleanup_error': "Error during cleanup: {}", + 'stream_error': "Error cleaning up stream: {}", + 'file_error': "Error cleaning up file: {}", + 'reader_cleanup': "Error cleaning up reader: {}", + 'encoding_error': "Invalid UTF-8 characters in input: {}" + } # Check for unsupported format if format_or_path == "badFormat": - raise C2paError.NotSupported("Unsupported format") + raise C2paError.NotSupported(self._error_messages['unsupported']) if stream is None: # Create a stream from the file path - import mimetypes + + # Check if mimetypes is already imported to avoid duplicate imports + # This is important because mimetypes initialization can be expensive + # and we want to reuse the existing module if it's already loaded + if 'mimetypes' not in sys.modules: + import mimetypes + else: + mimetypes = sys.modules['mimetypes'] + path = str(format_or_path) mime_type = mimetypes.guess_type(path)[0] or 'application/octet-stream' # Keep mime_type string alive - self._mime_type_str = mime_type.encode('utf-8') - - # Open the file and create a stream - file = open(path, 'rb') - self._own_stream = Stream(file) - - # Create reader from the file stream - self._reader = _lib.c2pa_reader_from_stream( - self._mime_type_str, - self._own_stream._stream - ) - - if not self._reader: - self._own_stream.close() - file.close() - _handle_string_result(_lib.c2pa_error()) - - # Store the file to close it later - self._file = file + try: + self._mime_type_str = mime_type.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) + try: + # Open the file and create a stream + file = open(path, 'rb') + self._own_stream = Stream(file) + + # Create reader from the file stream + self._reader = _lib.c2pa_reader_from_stream( + self._mime_type_str, + self._own_stream._stream + ) + + if not self._reader: + self._own_stream.close() + file.close() + _handle_string_result(_lib.c2pa_error()) + + # Store the file to close it later + self._file = file + + except Exception as e: + if self._own_stream: + self._own_stream.close() + if hasattr(self, '_file'): + self._file.close() + raise C2paError.Io(self._error_messages['io_error'].format(str(e))) + elif isinstance(stream, str): # If stream is a string, treat it as a path and try to open it try: @@ -529,9 +941,14 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non self._reader = _lib.c2pa_reader_from_stream(self._format_str, self._own_stream._stream) else: if not isinstance(manifest_data, bytes): - raise TypeError("manifest_data must be bytes") + raise TypeError(self._error_messages['manifest_error']) manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data) - self._reader = _lib.c2pa_reader_from_manifest_data_and_stream(self._format_str, self._own_stream._stream, manifest_array, len(manifest_data)) + self._reader = _lib.c2pa_reader_from_manifest_data_and_stream( + self._format_str, + self._own_stream._stream, + manifest_array, + len(manifest_data) + ) if not self._reader: self._own_stream.close() @@ -544,22 +961,27 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non self._own_stream.close() if hasattr(self, '_file'): self._file.close() - raise C2paError.Io(str(e)) + raise C2paError.Io(self._error_messages['io_error'].format(str(e))) else: # Use the provided stream # Keep format string alive self._format_str = format_or_path.encode('utf-8') - stream_obj = Stream(stream) - if manifest_data is None: - self._reader = _lib.c2pa_reader_from_stream(self._format_str, stream_obj._stream) - else: - if not isinstance(manifest_data, bytes): - raise TypeError("manifest_data must be bytes") - manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data) - self._reader = _lib.c2pa_reader_from_manifest_data_and_stream(self._format_str, stream_obj._stream, manifest_array, len(manifest_data)) - - if not self._reader: - _handle_string_result(_lib.c2pa_error()) + with Stream(stream) as stream_obj: + if manifest_data is None: + self._reader = _lib.c2pa_reader_from_stream(self._format_str, stream_obj._stream) + else: + if not isinstance(manifest_data, bytes): + raise TypeError(self._error_messages['manifest_error']) + manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data) + self._reader = _lib.c2pa_reader_from_manifest_data_and_stream( + self._format_str, + stream_obj._stream, + manifest_array, + len(manifest_data) + ) + + if not self._reader: + _handle_string_result(_lib.c2pa_error()) def __enter__(self): return self @@ -568,18 +990,54 @@ def __exit__(self, exc_type, exc_val, exc_tb): self.close() def close(self): - """Release the reader resources.""" - if self._reader: - _lib.c2pa_reader_free(self._reader) - self._reader = None + """Release the reader resources. - if hasattr(self, '_own_stream') and self._own_stream: - self._own_stream.close() - self._own_stream = None + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. + Errors during cleanup are logged but not raised to ensure cleanup completes. + Multiple calls to close() are handled gracefully. + """ + # Track if we've already cleaned up + if not hasattr(self, '_closed'): + self._closed = False + + if self._closed: + return + + try: + # Clean up reader + if hasattr(self, '_reader') and self._reader: + try: + _lib.c2pa_reader_free(self._reader) + except Exception as e: + print(self._error_messages['reader_cleanup'].format(str(e)), file=sys.stderr) + finally: + self._reader = None - if hasattr(self, '_file'): - self._file.close() - del self._file + # Clean up stream + if hasattr(self, '_own_stream') and self._own_stream: + try: + self._own_stream.close() + except Exception as e: + print(self._error_messages['stream_error'].format(str(e)), file=sys.stderr) + finally: + self._own_stream = None + + # Clean up file + if hasattr(self, '_file'): + try: + self._file.close() + except Exception as e: + print(self._error_messages['file_error'].format(str(e)), file=sys.stderr) + finally: + self._file = None + + # Clear any stored strings + if hasattr(self, '_strings'): + self._strings.clear() + except Exception as e: + print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr) + finally: + self._closed = True def json(self) -> str: """Get the manifest store as a JSON string. @@ -613,17 +1071,37 @@ def resource_to_stream(self, uri: str, stream: Any) -> int: # Keep uri string alive self._uri_str = uri.encode('utf-8') - stream_obj = Stream(stream) - result = _lib.c2pa_reader_resource_to_stream(self._reader, self._uri_str, stream_obj._stream) - - if result < 0: - _handle_string_result(_lib.c2pa_error()) - - return result + with Stream(stream) as stream_obj: + result = _lib.c2pa_reader_resource_to_stream(self._reader, self._uri_str, stream_obj._stream) + + if result < 0: + _handle_string_result(_lib.c2pa_error()) + + return result class Signer: """High-level wrapper for C2PA Signer operations.""" + def __init__(self, signer_ptr: ctypes.POINTER(C2paSigner)): + """Initialize a new Signer instance. + + Note: This constructor is not meant to be called directly. + Use from_info() or from_callback() instead. + """ + self._signer = signer_ptr + self._closed = False + self._error_messages = { + 'closed_error': "Signer is closed", + 'cleanup_error': "Error during cleanup: {}", + 'signer_cleanup': "Error cleaning up signer: {}", + 'size_error': "Error getting reserve size: {}", + 'callback_error': "Error in signer callback: {}", + 'info_error': "Error creating signer from info: {}", + 'invalid_data': "Invalid data for signing: {}", + 'invalid_certs': "Invalid certificate data: {}", + 'invalid_tsa': "Invalid TSA URL: {}" + } + @classmethod def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': """Create a new Signer from signer information. @@ -637,6 +1115,10 @@ def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': Raises: C2paError: If there was an error creating the signer """ + # Validate signer info before creating + if not signer_info.sign_cert or not signer_info.private_key: + raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate or private key")) + signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) if not signer_ptr: @@ -665,13 +1147,39 @@ def from_callback( Raises: C2paError: If there was an error creating the signer + C2paError.Encoding: If the certificate data or TSA URL contains invalid UTF-8 characters """ + # Validate inputs before creating + if not certs: + raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate data")) + + if tsa_url and not tsa_url.startswith(('http://', 'https://')): + raise C2paError(cls._error_messages['invalid_tsa'].format("Invalid TSA URL format")) + + # Create a wrapper callback that handles errors and memory management + def wrapped_callback(data: bytes) -> bytes: + try: + if not data: + raise ValueError("Empty data provided for signing") + return callback(data) + except Exception as e: + print(cls._error_messages['callback_error'].format(str(e)), file=sys.stderr) + raise C2paError.Signature(str(e)) + + # Encode strings with error handling + try: + certs_bytes = certs.encode('utf-8') + tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None + except UnicodeError as e: + raise C2paError.Encoding(cls._error_messages['encoding_error'].format(str(e))) + + # Create the signer with the wrapped callback signer_ptr = _lib.c2pa_signer_create( None, # context - SignerCallback(callback), + SignerCallback(wrapped_callback), alg, - certs.encode('utf-8'), - tsa_url.encode('utf-8') if tsa_url else None + certs_bytes, + tsa_url_bytes ) if not signer_ptr: @@ -679,25 +1187,38 @@ def from_callback( return cls(signer_ptr) - def __init__(self, signer_ptr: ctypes.POINTER(C2paSigner)): - """Initialize a new Signer instance. - - Note: This constructor is not meant to be called directly. - Use from_info() or from_callback() instead. - """ - self._signer = signer_ptr - def __enter__(self): + """Context manager entry.""" + if self._closed: + raise C2paError(self._error_messages['closed_error']) return self def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit.""" self.close() def close(self): - """Release the signer resources.""" - if self._signer: - _lib.c2pa_signer_free(self._signer) - self._signer = None + """Release the signer resources. + + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. + Errors during cleanup are logged but not raised to ensure cleanup completes. + Multiple calls to close() are handled gracefully. + """ + if self._closed: + return + + try: + if self._signer: + try: + _lib.c2pa_signer_free(self._signer) + except Exception as e: + print(self._error_messages['signer_cleanup'].format(str(e)), file=sys.stderr) + finally: + self._signer = None + except Exception as e: + print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr) + finally: + self._closed = True def reserve_size(self) -> int: """Get the size to reserve for signatures from this signer. @@ -708,30 +1229,68 @@ def reserve_size(self) -> int: Raises: C2paError: If there was an error getting the size """ - if not self._signer: - raise C2paError("Signer is closed") + if self._closed or not self._signer: + raise C2paError(self._error_messages['closed_error']) - result = _lib.c2pa_signer_reserve_size(self._signer) - - if result < 0: - _handle_string_result(_lib.c2pa_error()) + try: + result = _lib.c2pa_signer_reserve_size(self._signer) - return result + if result < 0: + _handle_string_result(_lib.c2pa_error()) + + return result + except Exception as e: + raise C2paError(self._error_messages['size_error'].format(str(e))) + + @property + def closed(self) -> bool: + """Check if the signer is closed. + + Returns: + bool: True if the signer is closed, False otherwise + """ + return self._closed class Builder: """High-level wrapper for C2PA Builder operations.""" def __init__(self, manifest_json: Any): - """Initialize a new Builder instance.""" + """Initialize a new Builder instance. + + Args: + manifest_json: The manifest JSON definition (string or dict) + + Raises: + C2paError: If there was an error creating the builder + C2paError.Encoding: If the manifest JSON contains invalid UTF-8 characters + """ + self._builder = None + self._error_messages = { + 'builder_error': "Failed to create builder: {}", + 'cleanup_error': "Error during cleanup: {}", + 'builder_cleanup': "Error cleaning up builder: {}", + 'closed_error': "Builder is closed", + 'manifest_error': "Invalid manifest data: must be string or dict", + 'url_error': "Error setting remote URL: {}", + 'resource_error': "Error adding resource: {}", + 'ingredient_error': "Error adding ingredient: {}", + 'archive_error': "Error writing archive: {}", + 'sign_error': "Error during signing: {}", + 'encoding_error': "Invalid UTF-8 characters in manifest: {}" + } + if not isinstance(manifest_json, str): manifest_json = json.dumps(manifest_json) - json_str = manifest_json.encode('utf-8') + try: + json_str = manifest_json.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) + self._builder = _lib.c2pa_builder_from_json(json_str) if not self._builder: _handle_string_result(_lib.c2pa_error()) - @classmethod def from_json(cls, manifest_json: Any) -> 'Builder': @@ -770,14 +1329,43 @@ def from_archive(cls, stream: Any) -> 'Builder': return builder + def __del__(self): + """Ensure resources are cleaned up if close() wasn't called.""" + self.close() + + def close(self): + """Release the builder resources. + + This method ensures all resources are properly cleaned up, even if errors occur during cleanup. + Errors during cleanup are logged but not raised to ensure cleanup completes. + Multiple calls to close() are handled gracefully. + """ + # Track if we've already cleaned up + if not hasattr(self, '_closed'): + self._closed = False + + if self._closed: + return + try: + # Clean up builder + if hasattr(self, '_builder') and self._builder: + try: + _lib.c2pa_builder_free(self._builder) + except Exception as e: + print(self._error_messages['builder_cleanup'].format(str(e)), file=sys.stderr) + finally: + self._builder = None + except Exception as e: + print(self._error_messages['cleanup_error'].format(str(e)), file=sys.stderr) + finally: + self._closed = True def set_manifest(self, manifest): if not isinstance(manifest, str): manifest = json.dumps(manifest) super().with_json(manifest) return self - def __enter__(self): return self @@ -785,12 +1373,6 @@ def __enter__(self): def __exit__(self, exc_type, exc_val, exc_tb): self.close() - def close(self): - """Release the builder resources.""" - if self._builder: - _lib.c2pa_builder_free(self._builder) - self._builder = None - def set_no_embed(self): """Set the no-embed flag. @@ -798,7 +1380,7 @@ def set_no_embed(self): This is useful when creating cloud or sidecar manifests. """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) _lib.c2pa_builder_set_no_embed(self._builder) def set_remote_url(self, remote_url: str): @@ -814,7 +1396,7 @@ def set_remote_url(self, remote_url: str): C2paError: If there was an error setting the URL """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) url_str = remote_url.encode('utf-8') result = _lib.c2pa_builder_set_remote_url(self._builder, url_str) @@ -833,14 +1415,14 @@ def add_resource(self, uri: str, stream: Any): C2paError: If there was an error adding the resource """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) uri_str = uri.encode('utf-8') - stream_obj = Stream(stream) - result = _lib.c2pa_builder_add_resource(self._builder, uri_str, stream_obj._stream) - - if result != 0: - _handle_string_result(_lib.c2pa_error()) + with Stream(stream) as stream_obj: + result = _lib.c2pa_builder_add_resource(self._builder, uri_str, stream_obj._stream) + + if result != 0: + _handle_string_result(_lib.c2pa_error()) def add_ingredient(self, ingredient_json: str, format: str, source: Any): """Add an ingredient to the builder. @@ -852,14 +1434,19 @@ def add_ingredient(self, ingredient_json: str, format: str, source: Any): Raises: C2paError: If there was an error adding the ingredient + C2paError.Encoding: If the ingredient JSON contains invalid UTF-8 characters """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) + + try: + ingredient_str = ingredient_json.encode('utf-8') + format_str = format.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) - ingredient_str = ingredient_json.encode('utf-8') - format_str = format.encode('utf-8') source_stream = Stream(source) - result = _lib.c2pa_builder_add_ingredient(self._builder, ingredient_str, format_str, source_stream._stream) + result = _lib.c2pa_builder_add_ingredient_from_stream(self._builder, ingredient_str, format_str, source_stream._stream) if result != 0: _handle_string_result(_lib.c2pa_error()) @@ -874,18 +1461,23 @@ def add_ingredient_from_stream(self, ingredient_json: str, format: str, source: Raises: C2paError: If there was an error adding the ingredient + C2paError.Encoding: If the ingredient JSON or format contains invalid UTF-8 characters """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) - ingredient_str = ingredient_json.encode('utf-8') - format_str = format.encode('utf-8') - source_stream = Stream(source) - result = _lib.c2pa_builder_add_ingredient_from_stream( - self._builder, ingredient_str, format_str, source_stream._stream) - - if result != 0: - _handle_string_result(_lib.c2pa_error()) + try: + ingredient_str = ingredient_json.encode('utf-8') + format_str = format.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding(self._error_messages['encoding_error'].format(str(e))) + + with Stream(source) as source_stream: + result = _lib.c2pa_builder_add_ingredient_from_stream( + self._builder, ingredient_str, format_str, source_stream._stream) + + if result != 0: + _handle_string_result(_lib.c2pa_error()) def to_archive(self, stream: Any): """Write an archive of the builder to a stream. @@ -897,13 +1489,13 @@ def to_archive(self, stream: Any): C2paError: If there was an error writing the archive """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) - stream_obj = Stream(stream) - result = _lib.c2pa_builder_to_archive(self._builder, stream_obj._stream) - - if result != 0: - _handle_string_result(_lib.c2pa_error()) + with Stream(stream) as stream_obj: + result = _lib.c2pa_builder_to_archive(self._builder, stream_obj._stream) + + if result != 0: + _handle_string_result(_lib.c2pa_error()) def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Optional[bytes]: """Sign the builder's content and write to a destination stream. @@ -921,35 +1513,40 @@ def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Op C2paError: If there was an error during signing """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) # Convert Python streams to Stream objects source_stream = Stream(source) dest_stream = Stream(dest) - - format_str = format.encode('utf-8') - manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() - - result = _lib.c2pa_builder_sign( - self._builder, - format_str, - source_stream._stream, - dest_stream._stream, - signer._signer, - ctypes.byref(manifest_bytes_ptr) - ) - if result < 0: - _handle_string_result(_lib.c2pa_error()) + try: + format_str = format.encode('utf-8') + manifest_bytes_ptr = ctypes.POINTER(ctypes.c_ubyte)() - manifest_bytes = None - if manifest_bytes_ptr: - # Convert the manifest bytes to a Python bytes object - size = result - manifest_bytes = bytes(manifest_bytes_ptr[:size]) - _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) + result = _lib.c2pa_builder_sign( + self._builder, + format_str, + source_stream._stream, + dest_stream._stream, + signer._signer, + ctypes.byref(manifest_bytes_ptr) + ) - return manifest_bytes + if result < 0: + _handle_string_result(_lib.c2pa_error()) + + manifest_bytes = None + if manifest_bytes_ptr: + # Convert the manifest bytes to a Python bytes object + size = result + manifest_bytes = bytes(manifest_bytes_ptr[:size]) + _lib.c2pa_manifest_bytes_free(manifest_bytes_ptr) + + return manifest_bytes + finally: + # Ensure both streams are cleaned up + source_stream.close() + dest_stream.close() def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], signer: Signer) -> tuple[int, Optional[bytes]]: """Sign a file and write the signed data to an output file. @@ -966,7 +1563,7 @@ def sign_file(self, source_path: Union[str, Path], dest_path: Union[str, Path], C2paError: If there was an error during signing """ if not self._builder: - raise C2paError("Builder is closed") + raise C2paError(self._error_messages['closed_error']) source_path_str = str(source_path).encode('utf-8') dest_path_str = str(dest_path).encode('utf-8') @@ -1045,13 +1642,20 @@ def create_signer( Raises: C2paError: If there was an error creating the signer + C2paError.Encoding: If the certificate data or TSA URL contains invalid UTF-8 characters """ + try: + certs_bytes = certs.encode('utf-8') + tsa_url_bytes = tsa_url.encode('utf-8') if tsa_url else None + except UnicodeError as e: + raise C2paError.Encoding(f"Invalid UTF-8 characters in certificate data or TSA URL: {str(e)}") + signer_ptr = _lib.c2pa_signer_create( None, # context SignerCallback(callback), alg, - certs.encode('utf-8'), - tsa_url.encode('utf-8') if tsa_url else None + certs_bytes, + tsa_url_bytes ) if not signer_ptr: @@ -1093,9 +1697,13 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: Raises: C2paError: If there was an error signing the data + C2paError.Encoding: If the private key contains invalid UTF-8 characters """ data_array = (ctypes.c_ubyte * len(data))(*data) - key_str = private_key.encode('utf-8') + try: + key_str = private_key.encode('utf-8') + except UnicodeError as e: + raise C2paError.Encoding(f"Invalid UTF-8 characters in private key: {str(e)}") signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str) @@ -1126,4 +1734,4 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: 'sign_file', 'format_embeddable', 'ed25519_sign', -] \ No newline at end of file +] \ No newline at end of file diff --git a/tests/test_c2pa.py b/tests/test_c2pa.py index d97f72ff..54ee18da 100644 --- a/tests/test_c2pa.py +++ b/tests/test_c2pa.py @@ -1,6 +1,7 @@ import os -import pytest +import unittest import tempfile +import json from pathlib import Path from c2pa import ( C2paError, @@ -22,241 +23,230 @@ # Test data directory TEST_DATA_DIR = Path(__file__).parent / "fixtures" -# Test files -UNSIGNED_FILE = TEST_DATA_DIR / "A.jpg" -SIGNED_FILE = TEST_DATA_DIR / "C.jpg" -CERT_FILE = TEST_DATA_DIR / "test.crt" -KEY_FILE = TEST_DATA_DIR / "test.key" - -@pytest.fixture -def unsigned_file(): - """Fixture providing path to unsigned test file.""" - assert UNSIGNED_FILE.exists(), f"Test file {UNSIGNED_FILE} not found" - return UNSIGNED_FILE - -@pytest.fixture -def signed_file(): - """Fixture providing path to signed test file.""" - assert SIGNED_FILE.exists(), f"Test file {SIGNED_FILE} not found" - return SIGNED_FILE - -@pytest.fixture -def cert_data(): - """Fixture providing certificate data.""" - assert CERT_FILE.exists(), f"Certificate file {CERT_FILE} not found" - with open(CERT_FILE, 'rb') as f: - return f.read() - -@pytest.fixture -def key_data(): - """Fixture providing private key data.""" - assert KEY_FILE.exists(), f"Key file {KEY_FILE} not found" - with open(KEY_FILE, 'rb') as f: - return f.read() - -@pytest.fixture -def signer_info(cert_data, key_data): - """Fixture providing C2paSignerInfo with valid certificate and key.""" - return C2paSignerInfo( - alg=b"ES256", - sign_cert=cert_data, - private_key=key_data, - ta_url=None #b"http://test.tsa" - ) - -def test_version(): - """Test that version() returns a non-empty string.""" - v = version() - assert isinstance(v, str) - assert len(v) > 0 - -def test_load_settings(): - """Test loading settings.""" - settings = '{"test": "value"}' - load_settings(settings) - # No exception means success - -def test_read_unsigned_file(unsigned_file): - """Test reading an unsigned JPEG file.""" - with pytest.raises(C2paError) as exc_info: - read_file(unsigned_file) - assert "ManifestNotFound" in str(exc_info.value) - -def test_read_signed_file(signed_file): - """Test reading a signed JPEG file with C2PA manifest.""" - manifest = read_file(signed_file) - assert isinstance(manifest, str) - assert len(manifest) > 0 - # Verify it's valid JSON - import json - manifest_data = json.loads(manifest) - assert isinstance(manifest_data, dict) - # Verify it has expected C2PA structure - assert "claim_generator_info" in manifest_data - assert "title" in manifest_data - assert "format" in manifest_data - -def test_read_ingredient_file(unsigned_file): - """Test reading a JPEG file as an ingredient.""" - ingredient = read_ingredient_file(unsigned_file) - assert isinstance(ingredient, str) - # Verify it's valid JSON - import json - ingredient_data = json.loads(ingredient) - assert isinstance(ingredient_data, dict) - # Verify it has expected ingredient structure - assert "format" in ingredient_data - assert "title" in ingredient_data - -def test_stream(unsigned_file): - """Test Stream class with a real file.""" - with open(unsigned_file, 'rb') as f: - stream = Stream(f) - assert stream._stream is not None - stream.close() - -def test_reader_unsigned(unsigned_file): - """Test Reader class with an unsigned file.""" - reader = Reader(unsigned_file) - assert reader._reader is not None - - try: - manifest = reader.json() - assert manifest is None # Unsigned file should have no manifest - except C2paError as e: - # This is expected if the file isn't a valid C2PA file - assert "not a valid C2PA file" in str(e) - - reader.close() - -def test_reader_signed(signed_file): - """Test Reader class with a signed file.""" - reader = Reader(signed_file) - assert reader._reader is not None - - manifest = reader.json() - assert isinstance(manifest, str) - assert len(manifest) > 0 - - # Verify it's valid JSON - import json - manifest_data = json.loads(manifest) - assert isinstance(manifest_data, dict) - assert "claim_generator" in manifest_data - - reader.close() - -def test_signer(signer_info): - """Test Signer class creation and operations.""" - # Test creating signer from info - signer = Signer.from_info(signer_info) - assert signer._signer is not None - - # Test reserve_size - try: - size = signer.reserve_size() - assert isinstance(size, int) - assert size >= 0 - except C2paError as e: - # This is expected if the signer info is invalid - assert "invalid signer info" in str(e) - - signer.close() - - # Test creating signer from callback - def test_callback(data: bytes) -> bytes: - return b"test_signature" - - signer = Signer.from_callback( - callback=test_callback, - alg=C2paSigningAlg.ES256, - certs=cert_data.decode('utf-8'), - tsa_url="http://test.tsa" - ) - assert signer._signer is not None - signer.close() - -def test_builder(unsigned_file): - """Test Builder class operations with a real file.""" - # Test creating builder from JSON - manifest_json = '{"test": "value"}' - builder = Builder.from_json(manifest_json) - assert builder._builder is not None - - # Test builder operations - builder.set_no_embed() - builder.set_remote_url("http://test.url") - - # Test adding resource - with open(unsigned_file, 'rb') as f: - builder.add_resource("test_uri", f) - - # Test adding ingredient - ingredient_json = '{"test": "ingredient"}' - with open(unsigned_file, 'rb') as f: - builder.add_ingredient(ingredient_json, "image/jpeg", f) - - builder.close() - -def test_ed25519_sign(): - """Test Ed25519 signing.""" - data = b"test data" - private_key = "test_private_key" - - try: - signature = ed25519_sign(data, private_key) - assert isinstance(signature, bytes) - assert len(signature) == 64 # Ed25519 signatures are always 64 bytes - except C2paError as e: - # This is expected if the private key is invalid - assert "invalid private key" in str(e) - -def test_format_embeddable(): - """Test formatting embeddable manifest.""" - format_str = "image/jpeg" - manifest_bytes = b"test manifest" - - try: - size, result = format_embeddable(format_str, manifest_bytes) - assert isinstance(size, int) - assert size >= 0 - assert isinstance(result, bytes) - assert len(result) == size - except C2paError as e: - # This is expected if the format or manifest is invalid - assert "invalid format" in str(e) or "invalid manifest" in str(e) - -def test_context_managers(unsigned_file, signed_file, signer_info): - """Test context manager functionality with real files.""" - # Test Reader context manager with unsigned file - with Reader(unsigned_file) as reader: - assert reader._reader is not None +class TestC2PA(unittest.TestCase): + @classmethod + def setUpClass(cls): + """Set up test data that will be used by all test methods.""" + cls.unsigned_file = TEST_DATA_DIR / "A.jpg" + cls.signed_file = TEST_DATA_DIR / "C.jpg" + cls.cert_file = TEST_DATA_DIR / "test.crt" + cls.key_file = TEST_DATA_DIR / "test.key" + + # Verify test files exist + assert cls.unsigned_file.exists(), f"Test file {cls.unsigned_file} not found" + assert cls.signed_file.exists(), f"Test file {cls.signed_file} not found" + assert cls.cert_file.exists(), f"Certificate file {cls.cert_file} not found" + assert cls.key_file.exists(), f"Key file {cls.key_file} not found" + + # Load certificate and key data + with open(cls.cert_file, 'rb') as f: + cls.cert_data = f.read() + with open(cls.key_file, 'rb') as f: + cls.key_data = f.read() + + # Create signer info + cls.signer_info = C2paSignerInfo( + alg=b"ES256", + sign_cert=cls.cert_data, + private_key=cls.key_data, + ta_url=None # b"http://test.tsa" + ) + + def test_version(self): + """Test that version() returns a non-empty string.""" + v = version() + self.assertIsInstance(v, str) + self.assertGreater(len(v), 0) + + def test_load_settings(self): + """Test loading settings.""" + settings = '{"test": "value"}' + load_settings(settings) + # No exception means success + + def test_read_unsigned_file(self): + """Test reading an unsigned JPEG file.""" + with self.assertRaises(C2paError) as context: + read_file(self.unsigned_file) + self.assertIn("ManifestNotFound", str(context.exception)) + + def test_read_signed_file(self): + """Test reading a signed JPEG file with C2PA manifest.""" + manifest = read_file(self.signed_file) + self.assertIsInstance(manifest, str) + self.assertGreater(len(manifest), 0) + + # Verify it's valid JSON + manifest_data = json.loads(manifest) + self.assertIsInstance(manifest_data, dict) + # Verify it has expected C2PA structure + self.assertIn("claim_generator_info", manifest_data) + self.assertIn("title", manifest_data) + self.assertIn("format", manifest_data) + + def test_read_ingredient_file(self): + """Test reading a JPEG file as an ingredient.""" + ingredient = read_ingredient_file(self.unsigned_file) + self.assertIsInstance(ingredient, str) + + # Verify it's valid JSON + ingredient_data = json.loads(ingredient) + self.assertIsInstance(ingredient_data, dict) + # Verify it has expected ingredient structure + self.assertIn("format", ingredient_data) + self.assertIn("title", ingredient_data) + + def test_stream(self): + """Test Stream class with a real file.""" + with open(self.unsigned_file, 'rb') as f: + stream = Stream(f) + self.assertIsNotNone(stream._stream) + stream.close() + + def test_reader_unsigned(self): + """Test Reader class with an unsigned file.""" + reader = Reader(self.unsigned_file) + self.assertIsNotNone(reader._reader) + try: manifest = reader.json() - assert manifest is None # Unsigned file should have no manifest - except C2paError: - pass - - # Test Reader context manager with signed file - with Reader(signed_file) as reader: - assert reader._reader is not None + self.assertIsNone(manifest) # Unsigned file should have no manifest + except C2paError as e: + # This is expected if the file isn't a valid C2PA file + self.assertIn("not a valid C2PA file", str(e)) + + reader.close() + + def test_reader_signed(self): + """Test Reader class with a signed file.""" + reader = Reader(self.signed_file) + self.assertIsNotNone(reader._reader) + manifest = reader.json() - assert isinstance(manifest, str) - assert len(manifest) > 0 - - # Test Signer context manager - with Signer.from_info(signer_info) as signer: - assert signer._signer is not None + self.assertIsInstance(manifest, str) + self.assertGreater(len(manifest), 0) + + # Verify it's valid JSON + manifest_data = json.loads(manifest) + self.assertIsInstance(manifest_data, dict) + self.assertIn("claim_generator", manifest_data) + + reader.close() + + def test_signer(self): + """Test Signer class creation and operations.""" + # Test creating signer from info + signer = Signer.from_info(self.signer_info) + self.assertIsNotNone(signer._signer) + + # Test reserve_size try: size = signer.reserve_size() - assert isinstance(size, int) - assert size >= 0 - except C2paError: - pass - - # Test Builder context manager - manifest_json = '{"test": "value"}' - with Builder.from_json(manifest_json) as builder: - assert builder._builder is not None + self.assertIsInstance(size, int) + self.assertGreaterEqual(size, 0) + except C2paError as e: + # This is expected if the signer info is invalid + self.assertIn("invalid signer info", str(e)) + + signer.close() + + # Test creating signer from callback + def test_callback(data: bytes) -> bytes: + return b"test_signature" + + signer = Signer.from_callback( + callback=test_callback, + alg=C2paSigningAlg.ES256, + certs=self.cert_data.decode('utf-8'), + tsa_url="http://test.tsa" + ) + self.assertIsNotNone(signer._signer) + signer.close() + + def test_builder(self): + """Test Builder class operations with a real file.""" + # Test creating builder from JSON + manifest_json = '{"test": "value"}' + builder = Builder.from_json(manifest_json) + self.assertIsNotNone(builder._builder) + + # Test builder operations builder.set_no_embed() - builder.set_remote_url("http://test.url") \ No newline at end of file + builder.set_remote_url("http://test.url") + + # Test adding resource + with open(self.unsigned_file, 'rb') as f: + builder.add_resource("test_uri", f) + + # Test adding ingredient + ingredient_json = '{"test": "ingredient"}' + with open(self.unsigned_file, 'rb') as f: + builder.add_ingredient(ingredient_json, "image/jpeg", f) + + builder.close() + + def test_ed25519_sign(self): + """Test Ed25519 signing.""" + data = b"test data" + private_key = "test_private_key" + + try: + signature = ed25519_sign(data, private_key) + self.assertIsInstance(signature, bytes) + self.assertEqual(len(signature), 64) # Ed25519 signatures are always 64 bytes + except C2paError as e: + # This is expected if the private key is invalid + self.assertIn("invalid private key", str(e)) + + def test_format_embeddable(self): + """Test formatting embeddable manifest.""" + format_str = "image/jpeg" + manifest_bytes = b"test manifest" + + try: + size, result = format_embeddable(format_str, manifest_bytes) + self.assertIsInstance(size, int) + self.assertGreaterEqual(size, 0) + self.assertIsInstance(result, bytes) + self.assertEqual(len(result), size) + except C2paError as e: + # This is expected if the format or manifest is invalid + self.assertTrue("invalid format" in str(e) or "invalid manifest" in str(e)) + + def test_context_managers(self): + """Test context manager functionality with real files.""" + # Test Reader context manager with unsigned file + with Reader(self.unsigned_file) as reader: + self.assertIsNotNone(reader._reader) + try: + manifest = reader.json() + self.assertIsNone(manifest) # Unsigned file should have no manifest + except C2paError: + pass + + # Test Reader context manager with signed file + with Reader(self.signed_file) as reader: + self.assertIsNotNone(reader._reader) + manifest = reader.json() + self.assertIsInstance(manifest, str) + self.assertGreater(len(manifest), 0) + + # Test Signer context manager + with Signer.from_info(self.signer_info) as signer: + self.assertIsNotNone(signer._signer) + try: + size = signer.reserve_size() + self.assertIsInstance(size, int) + self.assertGreaterEqual(size, 0) + except C2paError: + pass + + # Test Builder context manager + manifest_json = '{"test": "value"}' + with Builder.from_json(manifest_json) as builder: + self.assertIsNotNone(builder._builder) + builder.set_no_embed() + builder.set_remote_url("http://test.url") + +if __name__ == '__main__': + unittest.main() \ No newline at end of file diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py index 4c3d1717..ed509fad 100644 --- a/tests/test_unit_tests.py +++ b/tests/test_unit_tests.py @@ -25,39 +25,41 @@ class TestC2paSdk(unittest.TestCase): def test_version(self): - print(sdk_version()) + # print(sdk_version()) self.assertIn("0.8.0", sdk_version()) class TestReader(unittest.TestCase): def test_stream_read(self): with open(testPath, "rb") as file: - reader = Reader("image/jpeg",file) - json = reader.json() - self.assertIn("C.jpg", json) + with Reader("image/jpeg", file) as reader: + json = reader.json() + self.assertIn("C.jpg", json) def test_stream_read_and_parse(self): with open(testPath, "rb") as file: - reader = Reader("image/jpeg", file) - manifest_store = json.loads(reader.json()) - title = manifest_store["manifests"][manifest_store["active_manifest"]]["title"] - self.assertEqual(title, "C.jpg") + with Reader("image/jpeg", file) as reader: + manifest_store = json.loads(reader.json()) + title = manifest_store["manifests"][manifest_store["active_manifest"]]["title"] + self.assertEqual(title, "C.jpg") def test_json_decode_err(self): with self.assertRaises(Error.Io): - manifest_store = Reader("image/jpeg","foo") + with Reader("image/jpeg", "foo") as reader: + manifest_store = reader.json() def test_reader_bad_format(self): with self.assertRaises(Error.NotSupported): with open(testPath, "rb") as file: - reader = Reader("badFormat", file) + with Reader("badFormat", file) as reader: + pass def test_settings_trust(self): #load_settings_file("tests/fixtures/settings.toml") with open(testPath, "rb") as file: - reader = Reader("image/jpeg",file) - json = reader.json() - self.assertIn("C.jpg", json) + with Reader("image/jpeg", file) as reader: + json = reader.json() + self.assertIn("C.jpg", json) class TestBuilder(unittest.TestCase): # Define a manifest as a dictionary @@ -108,42 +110,42 @@ class TestBuilder(unittest.TestCase): signer = Signer.from_info(signer_info) #signer = create_signer(sign, SigningAlg.PS256, certs, "http://timestamp.digicert.com") + def _read_manifest(self, reader): + json_data = reader.json() + self.assertIn("Python Test", json_data) + self.assertNotIn("validation_status", json_data) + def test_streams_sign(self): - with open(testPath, "rb") as file: - builder = Builder(TestBuilder.manifestDefinition) - output = io.BytesIO(bytearray()) + with open(testPath, "rb") as file, \ + io.BytesIO(bytearray()) as output, \ + Builder(TestBuilder.manifestDefinition) as builder: builder.sign(TestBuilder.signer, "image/jpeg", file, output) output.seek(0) - reader = Reader("image/jpeg", output) - json_data = reader.json() - self.assertIn("Python Test", json_data) - self.assertNotIn("validation_status", json_data) + with Reader("image/jpeg", output) as reader: + self._read_manifest(reader) def test_archive_sign(self): - with open(testPath, "rb") as file: - builder = Builder(TestBuilder.manifestDefinition) - archive = io.BytesIO(bytearray()) - builder.to_archive(archive) - builder = Builder.from_archive(archive) - output = io.BytesIO(bytearray()) - builder.sign(TestBuilder.signer, "image/jpeg", file, output) - output.seek(0) - reader = Reader("image/jpeg", output) - json_data = reader.json() - self.assertIn("Python Test", json_data) - self.assertNotIn("validation_status", json_data) + with open(testPath, "rb") as file, \ + io.BytesIO(bytearray()) as archive: + with Builder(TestBuilder.manifestDefinition) as builder: + builder.to_archive(archive) + archive.seek(0) + with Builder.from_archive(archive) as builder2, \ + io.BytesIO(bytearray()) as output: + builder2.sign(TestBuilder.signer, "image/jpeg", file, output) + output.seek(0) + with Reader("image/jpeg", output) as reader: + self._read_manifest(reader) def test_remote_sign(self): - with open(testPath, "rb") as file: - builder = Builder(TestBuilder.manifestDefinition) + with open(testPath, "rb") as file, \ + io.BytesIO(bytearray()) as output, \ + Builder(TestBuilder.manifestDefinition) as builder: builder.set_no_embed() - output = io.BytesIO(bytearray()) manifest_data = builder.sign(TestBuilder.signer, "image/jpeg", file, output) output.seek(0) - reader = Reader("image/jpeg", output, manifest_data) - json_data = reader.json() - self.assertIn("Python Test", json_data) - self.assertNotIn("validation_status", json_data) + with Reader("image/jpeg", output, manifest_data) as reader: + self._read_manifest(reader) if __name__ == '__main__': unittest.main()