diff --git a/tests/c2pa.py b/tests/c2pa.py index c5a114d5..270ef070 100644 --- a/tests/c2pa.py +++ b/tests/c2pa.py @@ -279,14 +279,11 @@ def _setup_function(func, argtypes, restype=None): _setup_function(_lib.c2pa_builder_add_resource, [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.POINTER(C2paStream)], ctypes.c_int) -#_setup_function(_lib.c2pa_builder_add_ingredient, -# [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)], -# ctypes.c_int) - -# Set up additional Builder function prototypes _setup_function(_lib.c2pa_builder_add_ingredient_from_stream, [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)], ctypes.c_int) + +# Set up additional Builder function prototypes _setup_function(_lib.c2pa_builder_to_archive, [ctypes.POINTER(C2paBuilder), ctypes.POINTER(C2paStream)], ctypes.c_int) @@ -917,7 +914,10 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non if not self._reader: self._own_stream.close() file.close() - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['reader_error'].format("Unknown error")) # Store the file to close it later self._file = file @@ -942,16 +942,19 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non raise TypeError(self._error_messages['manifest_error']) manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data) self._reader = _lib.c2pa_reader_from_manifest_data_and_stream( - self._format_str, - self._own_stream._stream, - manifest_array, + self._format_str, + self._own_stream._stream, + manifest_array, len(manifest_data) ) if not self._reader: self._own_stream.close() file.close() - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['reader_error'].format("Unknown error")) self._file = file except Exception as e: @@ -980,8 +983,10 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non ) if not self._reader: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['reader_error'].format("Unknown error")) def __enter__(self): return self @@ -1121,13 +1126,16 @@ def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer': """ # Validate signer info before creating if not signer_info.sign_cert or not signer_info.private_key: - raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate or private key")) + raise C2paError("Missing certificate or private key") signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) if not signer_ptr: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + # More detailed error message when possible + raise C2paError(error) + raise C2paError("Failed to create signer from configured signer info") return cls(signer_ptr) @@ -1188,8 +1196,10 @@ def wrapped_callback(data: bytes) -> bytes: ) if not signer_ptr: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to create signer") return cls(signer_ptr) @@ -1242,8 +1252,10 @@ def reserve_size(self) -> int: result = _lib.c2pa_signer_reserve_size(self._signer) if result < 0: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to get reserve size") return result except Exception as e: @@ -1270,6 +1282,7 @@ def __init__(self, manifest_json: Any): Raises: C2paError: If there was an error creating the builder C2paError.Encoding: If the manifest JSON contains invalid UTF-8 characters + C2paError.Json: If the manifest JSON cannot be serialized """ self._builder = None self._error_messages = { @@ -1283,11 +1296,15 @@ def __init__(self, manifest_json: Any): 'ingredient_error': "Error adding ingredient: {}", 'archive_error': "Error writing archive: {}", 'sign_error': "Error during signing: {}", - 'encoding_error': "Invalid UTF-8 characters in manifest: {}" + 'encoding_error': "Invalid UTF-8 characters in manifest: {}", + 'json_error': "Failed to serialize manifest JSON: {}" } if not isinstance(manifest_json, str): - manifest_json = json.dumps(manifest_json) + try: + manifest_json = json.dumps(manifest_json) + except (TypeError, ValueError) as e: + raise C2paError.Json(self._error_messages['json_error'].format(str(e))) try: json_str = manifest_json.encode('utf-8') @@ -1297,8 +1314,10 @@ def __init__(self, manifest_json: Any): self._builder = _lib.c2pa_builder_from_json(json_str) if not self._builder: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['builder_error'].format("Unknown error")) @classmethod def from_json(cls, manifest_json: Any) -> 'Builder': @@ -1326,15 +1345,17 @@ def from_archive(cls, stream: Any) -> 'Builder': A new Builder instance Raises: - C2paError: If there was an error creating the builder + C2paError: If there was an error creating the builder from the archive """ builder = cls({}) stream_obj = Stream(stream) builder._builder = _lib.c2pa_builder_from_archive(stream_obj._stream) if not builder._builder: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to create builder from archive") return builder @@ -1402,7 +1423,7 @@ def set_remote_url(self, remote_url: str): remote_url: The remote URL to set Raises: - C2paError: If there was an error setting the URL + C2paError: If there was an error setting the remote URL """ if not self._builder: raise C2paError(self._error_messages['closed_error']) @@ -1411,8 +1432,10 @@ def set_remote_url(self, remote_url: str): result = _lib.c2pa_builder_set_remote_url(self._builder, url_str) if result != 0: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['url_error'].format("Unknown error")) def add_resource(self, uri: str, stream: Any): """Add a resource to the builder. @@ -1432,8 +1455,10 @@ def add_resource(self, uri: str, stream: Any): result = _lib.c2pa_builder_add_resource(self._builder, uri_str, stream_obj._stream) if result != 0: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['resource_error'].format("Unknown error")) def add_ingredient(self, ingredient_json: str, format: str, source: Any): """Add an ingredient to the builder. @@ -1460,8 +1485,10 @@ def add_ingredient(self, ingredient_json: str, format: str, source: Any): result = _lib.c2pa_builder_add_ingredient_from_stream(self._builder, ingredient_str, format_str, source_stream._stream) if result != 0: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['ingredient_error'].format("Unknown error")) def add_ingredient_from_stream(self, ingredient_json: str, format: str, source: Any): """Add an ingredient from a stream to the builder. @@ -1489,8 +1516,10 @@ def add_ingredient_from_stream(self, ingredient_json: str, format: str, source: self._builder, ingredient_str, format_str, source_stream._stream) if result != 0: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['ingredient_error'].format("Unknown error")) def to_archive(self, stream: Any): """Write an archive of the builder to a stream. @@ -1508,8 +1537,10 @@ def to_archive(self, stream: Any): result = _lib.c2pa_builder_to_archive(self._builder, stream_obj._stream) if result != 0: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError(self._error_messages['archive_error'].format("Unknown error")) def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Optional[bytes]: """Sign the builder's content and write to a destination stream. @@ -1631,8 +1662,10 @@ def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]: ) if result < 0: - # TODO Handle error better - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to format embeddable manifest") # Convert the result bytes to a Python bytes object size = result @@ -1677,8 +1710,11 @@ def create_signer( ) if not signer_ptr: - # TODO Handle error better - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + # More detailed error message when possible + raise C2paError(error) + raise C2paError("Failed to create signer") return Signer(signer_ptr) @@ -1697,8 +1733,11 @@ def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer: signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info)) if not signer_ptr: - # TODO Handle error better - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + # More detailed error message when possible + raise C2paError(error) + raise C2paError("Failed to create signer from info") return Signer(signer_ptr) @@ -1728,8 +1767,10 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes: signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str) if not signature_ptr: - ## TODO: Raise error - _handle_string_result(_lib.c2pa_error()) + error = _handle_string_result(_lib.c2pa_error()) + if error: + raise C2paError(error) + raise C2paError("Failed to sign data with Ed25519") try: # Ed25519 signatures are always 64 bytes diff --git a/tests/test_unit_tests.py b/tests/test_unit_tests.py index 65e743ce..d3c93701 100644 --- a/tests/test_unit_tests.py +++ b/tests/test_unit_tests.py @@ -222,5 +222,81 @@ def test_remote_sign(self): with Reader("image/jpeg", output, manifest_data) as reader: self._read_manifest(reader) +class TestErrorHandling(unittest.TestCase): + def test_create_signer_from_info_error_handling(self): + # Test with invalid signer info (missing required fields) + invalid_signer_info = C2paSignerInfo( + alg=b"es256", + sign_cert=None, # Missing certificate + private_key=None, # Missing private key + ta_url=None + ) + + with self.assertRaises(C2paError) as context: + Signer.from_info(invalid_signer_info) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Missing certificate or private key", error_msg) + + def test_create_signer_from_info_later_error_handling(self): + # load the public keys from a pem file to create valid signer info + data_dir = "tests/fixtures/" + with open(data_dir + "es256_certs.pem", "rb") as cert_file, \ + open(data_dir + "es256_private.key", "rb") as key_file: + certs = cert_file.read() + key = key_file.read() + + # Test with invalid signer info (invalid algorithm) + invalid_signer_info = C2paSignerInfo( + alg=b"invalid-algorithm", + sign_cert=certs, # Missing certificate + private_key=key, # Missing private key + ta_url=None + ) + + with self.assertRaises(C2paError) as context: + Signer.from_info(invalid_signer_info) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Other Invalid signing algorithm", error_msg) + + def test_reader_initialization_error_handling(self): + # Test with non-existent file + with self.assertRaises(C2paError.Io) as context: + Reader("image/jpeg", "non_existent_file.jpg") + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("IO error", error_msg) + + # Test with invalid manifest data + with self.assertRaises(TypeError) as context: + Reader("image/jpeg", io.BytesIO(b"test"), manifest_data="not bytes") + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Invalid manifest data", error_msg) + + # Test with invalid format + with self.assertRaises(C2paError.NotSupported) as context: + Reader("badFormat", io.BytesIO(b"test")) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Unsupported format", error_msg) + + def test_reader_closed_stream_handling(self): + # Test with closed stream + closed_stream = io.BytesIO(b"test") + closed_stream.close() + with self.assertRaises(C2paError) as context: + Reader("image/jpeg", closed_stream) + + # Verify we get a meaningful error message + error_msg = str(context.exception) + self.assertIn("Io Undefined error", error_msg) + if __name__ == '__main__': unittest.main()