Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 86 additions & 45 deletions tests/c2pa.py
Original file line number Diff line number Diff line change
Expand Up @@ -279,14 +279,11 @@ def _setup_function(func, argtypes, restype=None):
_setup_function(_lib.c2pa_builder_add_resource,
[ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.POINTER(C2paStream)],
ctypes.c_int)
#_setup_function(_lib.c2pa_builder_add_ingredient,
# [ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)],
# ctypes.c_int)

# Set up additional Builder function prototypes
_setup_function(_lib.c2pa_builder_add_ingredient_from_stream,
[ctypes.POINTER(C2paBuilder), ctypes.c_char_p, ctypes.c_char_p, ctypes.POINTER(C2paStream)],
ctypes.c_int)

# Set up additional Builder function prototypes
_setup_function(_lib.c2pa_builder_to_archive,
[ctypes.POINTER(C2paBuilder), ctypes.POINTER(C2paStream)],
ctypes.c_int)
Expand Down Expand Up @@ -917,7 +914,10 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
if not self._reader:
self._own_stream.close()
file.close()
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['reader_error'].format("Unknown error"))

# Store the file to close it later
self._file = file
Expand All @@ -942,16 +942,19 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
raise TypeError(self._error_messages['manifest_error'])
manifest_array = (ctypes.c_ubyte * len(manifest_data))(*manifest_data)
self._reader = _lib.c2pa_reader_from_manifest_data_and_stream(
self._format_str,
self._own_stream._stream,
manifest_array,
self._format_str,
self._own_stream._stream,
manifest_array,
len(manifest_data)
)

if not self._reader:
self._own_stream.close()
file.close()
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['reader_error'].format("Unknown error"))

self._file = file
except Exception as e:
Expand Down Expand Up @@ -980,8 +983,10 @@ def __init__(self, format_or_path: Union[str, Path], stream: Optional[Any] = Non
)

if not self._reader:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['reader_error'].format("Unknown error"))

def __enter__(self):
return self
Expand Down Expand Up @@ -1121,13 +1126,16 @@ def from_info(cls, signer_info: C2paSignerInfo) -> 'Signer':
"""
# Validate signer info before creating
if not signer_info.sign_cert or not signer_info.private_key:
raise C2paError(cls._error_messages['invalid_certs'].format("Missing certificate or private key"))
raise C2paError("Missing certificate or private key")

signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info))

if not signer_ptr:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
# More detailed error message when possible
raise C2paError(error)
raise C2paError("Failed to create signer from configured signer info")

return cls(signer_ptr)

Expand Down Expand Up @@ -1188,8 +1196,10 @@ def wrapped_callback(data: bytes) -> bytes:
)

if not signer_ptr:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError("Failed to create signer")

return cls(signer_ptr)

Expand Down Expand Up @@ -1242,8 +1252,10 @@ def reserve_size(self) -> int:
result = _lib.c2pa_signer_reserve_size(self._signer)

if result < 0:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError("Failed to get reserve size")

return result
except Exception as e:
Expand All @@ -1270,6 +1282,7 @@ def __init__(self, manifest_json: Any):
Raises:
C2paError: If there was an error creating the builder
C2paError.Encoding: If the manifest JSON contains invalid UTF-8 characters
C2paError.Json: If the manifest JSON cannot be serialized
"""
self._builder = None
self._error_messages = {
Expand All @@ -1283,11 +1296,15 @@ def __init__(self, manifest_json: Any):
'ingredient_error': "Error adding ingredient: {}",
'archive_error': "Error writing archive: {}",
'sign_error': "Error during signing: {}",
'encoding_error': "Invalid UTF-8 characters in manifest: {}"
'encoding_error': "Invalid UTF-8 characters in manifest: {}",
'json_error': "Failed to serialize manifest JSON: {}"
}

if not isinstance(manifest_json, str):
manifest_json = json.dumps(manifest_json)
try:
manifest_json = json.dumps(manifest_json)
except (TypeError, ValueError) as e:
raise C2paError.Json(self._error_messages['json_error'].format(str(e)))

try:
json_str = manifest_json.encode('utf-8')
Expand All @@ -1297,8 +1314,10 @@ def __init__(self, manifest_json: Any):
self._builder = _lib.c2pa_builder_from_json(json_str)

if not self._builder:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['builder_error'].format("Unknown error"))

@classmethod
def from_json(cls, manifest_json: Any) -> 'Builder':
Expand Down Expand Up @@ -1326,15 +1345,17 @@ def from_archive(cls, stream: Any) -> 'Builder':
A new Builder instance

Raises:
C2paError: If there was an error creating the builder
C2paError: If there was an error creating the builder from the archive
"""
builder = cls({})
stream_obj = Stream(stream)
builder._builder = _lib.c2pa_builder_from_archive(stream_obj._stream)

if not builder._builder:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError("Failed to create builder from archive")

return builder

Expand Down Expand Up @@ -1402,7 +1423,7 @@ def set_remote_url(self, remote_url: str):
remote_url: The remote URL to set

Raises:
C2paError: If there was an error setting the URL
C2paError: If there was an error setting the remote URL
"""
if not self._builder:
raise C2paError(self._error_messages['closed_error'])
Expand All @@ -1411,8 +1432,10 @@ def set_remote_url(self, remote_url: str):
result = _lib.c2pa_builder_set_remote_url(self._builder, url_str)

if result != 0:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['url_error'].format("Unknown error"))

def add_resource(self, uri: str, stream: Any):
"""Add a resource to the builder.
Expand All @@ -1432,8 +1455,10 @@ def add_resource(self, uri: str, stream: Any):
result = _lib.c2pa_builder_add_resource(self._builder, uri_str, stream_obj._stream)

if result != 0:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['resource_error'].format("Unknown error"))

def add_ingredient(self, ingredient_json: str, format: str, source: Any):
"""Add an ingredient to the builder.
Expand All @@ -1460,8 +1485,10 @@ def add_ingredient(self, ingredient_json: str, format: str, source: Any):
result = _lib.c2pa_builder_add_ingredient_from_stream(self._builder, ingredient_str, format_str, source_stream._stream)

if result != 0:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['ingredient_error'].format("Unknown error"))

def add_ingredient_from_stream(self, ingredient_json: str, format: str, source: Any):
"""Add an ingredient from a stream to the builder.
Expand Down Expand Up @@ -1489,8 +1516,10 @@ def add_ingredient_from_stream(self, ingredient_json: str, format: str, source:
self._builder, ingredient_str, format_str, source_stream._stream)

if result != 0:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['ingredient_error'].format("Unknown error"))

def to_archive(self, stream: Any):
"""Write an archive of the builder to a stream.
Expand All @@ -1508,8 +1537,10 @@ def to_archive(self, stream: Any):
result = _lib.c2pa_builder_to_archive(self._builder, stream_obj._stream)

if result != 0:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError(self._error_messages['archive_error'].format("Unknown error"))

def sign(self, signer: Signer, format: str, source: Any, dest: Any = None) -> Optional[bytes]:
"""Sign the builder's content and write to a destination stream.
Expand Down Expand Up @@ -1631,8 +1662,10 @@ def format_embeddable(format: str, manifest_bytes: bytes) -> tuple[int, bytes]:
)

if result < 0:
# TODO Handle error better
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError("Failed to format embeddable manifest")

# Convert the result bytes to a Python bytes object
size = result
Expand Down Expand Up @@ -1677,8 +1710,11 @@ def create_signer(
)

if not signer_ptr:
# TODO Handle error better
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
# More detailed error message when possible
raise C2paError(error)
raise C2paError("Failed to create signer")

return Signer(signer_ptr)

Expand All @@ -1697,8 +1733,11 @@ def create_signer_from_info(signer_info: C2paSignerInfo) -> Signer:
signer_ptr = _lib.c2pa_signer_from_info(ctypes.byref(signer_info))

if not signer_ptr:
# TODO Handle error better
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
# More detailed error message when possible
raise C2paError(error)
raise C2paError("Failed to create signer from info")

return Signer(signer_ptr)

Expand Down Expand Up @@ -1728,8 +1767,10 @@ def ed25519_sign(data: bytes, private_key: str) -> bytes:
signature_ptr = _lib.c2pa_ed25519_sign(data_array, len(data), key_str)

if not signature_ptr:
## TODO: Raise error
_handle_string_result(_lib.c2pa_error())
error = _handle_string_result(_lib.c2pa_error())
if error:
raise C2paError(error)
raise C2paError("Failed to sign data with Ed25519")

try:
# Ed25519 signatures are always 64 bytes
Expand Down
76 changes: 76 additions & 0 deletions tests/test_unit_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,5 +222,81 @@ def test_remote_sign(self):
with Reader("image/jpeg", output, manifest_data) as reader:
self._read_manifest(reader)

class TestErrorHandling(unittest.TestCase):
def test_create_signer_from_info_error_handling(self):
# Test with invalid signer info (missing required fields)
invalid_signer_info = C2paSignerInfo(
alg=b"es256",
sign_cert=None, # Missing certificate
private_key=None, # Missing private key
ta_url=None
)

with self.assertRaises(C2paError) as context:
Signer.from_info(invalid_signer_info)

# Verify we get a meaningful error message
error_msg = str(context.exception)
self.assertIn("Missing certificate or private key", error_msg)

def test_create_signer_from_info_later_error_handling(self):
# load the public keys from a pem file to create valid signer info
data_dir = "tests/fixtures/"
with open(data_dir + "es256_certs.pem", "rb") as cert_file, \
open(data_dir + "es256_private.key", "rb") as key_file:
certs = cert_file.read()
key = key_file.read()

# Test with invalid signer info (invalid algorithm)
invalid_signer_info = C2paSignerInfo(
alg=b"invalid-algorithm",
sign_cert=certs, # Missing certificate
private_key=key, # Missing private key
ta_url=None
)

with self.assertRaises(C2paError) as context:
Signer.from_info(invalid_signer_info)

# Verify we get a meaningful error message
error_msg = str(context.exception)
self.assertIn("Other Invalid signing algorithm", error_msg)

def test_reader_initialization_error_handling(self):
# Test with non-existent file
with self.assertRaises(C2paError.Io) as context:
Reader("image/jpeg", "non_existent_file.jpg")

# Verify we get a meaningful error message
error_msg = str(context.exception)
self.assertIn("IO error", error_msg)

# Test with invalid manifest data
with self.assertRaises(TypeError) as context:
Reader("image/jpeg", io.BytesIO(b"test"), manifest_data="not bytes")

# Verify we get a meaningful error message
error_msg = str(context.exception)
self.assertIn("Invalid manifest data", error_msg)

# Test with invalid format
with self.assertRaises(C2paError.NotSupported) as context:
Reader("badFormat", io.BytesIO(b"test"))

# Verify we get a meaningful error message
error_msg = str(context.exception)
self.assertIn("Unsupported format", error_msg)

def test_reader_closed_stream_handling(self):
# Test with closed stream
closed_stream = io.BytesIO(b"test")
closed_stream.close()
with self.assertRaises(C2paError) as context:
Reader("image/jpeg", closed_stream)

# Verify we get a meaningful error message
error_msg = str(context.exception)
self.assertIn("Io Undefined error", error_msg)

if __name__ == '__main__':
unittest.main()
Loading