diff --git a/spoon_toolkits/crypto/evm/signers.py b/spoon_toolkits/crypto/evm/signers.py index ad46b6d..38a13ff 100644 --- a/spoon_toolkits/crypto/evm/signers.py +++ b/spoon_toolkits/crypto/evm/signers.py @@ -26,6 +26,39 @@ ENV_PRIVATE_KEY = "EVM_PRIVATE_KEY" ENV_TURNKEY_SIGN_WITH = "TURNKEY_SIGN_WITH" ENV_TURNKEY_ADDRESS = "TURNKEY_ADDRESS" +ENV_TURNKEY_API_PRIVATE_KEY = "TURNKEY_API_PRIVATE_KEY" + + +def _get_turnkey_api_private_key_from_vault() -> Optional[str]: + """ + Get decrypted Turnkey API private key from SecretVault. + If an encrypted key exists in env but not in vault, auto-decrypt it first. + """ + value = _get_from_vault(ENV_TURNKEY_API_PRIVATE_KEY) + if value: + logger.debug(f"Retrieved {ENV_TURNKEY_API_PRIVATE_KEY} from vault (already decrypted)") + return value + + env_value = os.getenv(ENV_TURNKEY_API_PRIVATE_KEY) + if env_value: + if _is_encrypted(env_value): + logger.info(f"Found encrypted {ENV_TURNKEY_API_PRIVATE_KEY} in environment, attempting decryption...") + if _auto_decrypt_to_vault(ENV_TURNKEY_API_PRIVATE_KEY): + value = _get_from_vault(ENV_TURNKEY_API_PRIVATE_KEY) + if value: + logger.info(f"Successfully decrypted {ENV_TURNKEY_API_PRIVATE_KEY} and retrieved from vault") + return value + + # CRITICAL: If it's encrypted but decryption failed, we must NOT return None + # because that would cause the client to fallback to the encrypted string. + raise SignerError( + f"Failed to decrypt {ENV_TURNKEY_API_PRIVATE_KEY}. " + "The password in SPOON_MASTER_PWD is likely incorrect or missing." + ) + else: + logger.debug(f"{ENV_TURNKEY_API_PRIVATE_KEY} in environment is plaintext") + return env_value + return None def _is_encrypted(value: str) -> bool: @@ -111,13 +144,29 @@ def _get_private_key_from_vault() -> Optional[str]: # Check if already in vault value = _get_from_vault(ENV_PRIVATE_KEY) if value: + logger.debug(f"Retrieved {ENV_PRIVATE_KEY} from vault (already decrypted)") return value # Try auto-decrypt if encrypted in env env_value = os.getenv(ENV_PRIVATE_KEY) - if env_value and _is_encrypted(env_value): - if _auto_decrypt_to_vault(ENV_PRIVATE_KEY): - return _get_from_vault(ENV_PRIVATE_KEY) + if env_value: + if _is_encrypted(env_value): + logger.info(f"Found encrypted {ENV_PRIVATE_KEY} in environment, attempting decryption...") + if _auto_decrypt_to_vault(ENV_PRIVATE_KEY): + value = _get_from_vault(ENV_PRIVATE_KEY) + if value: + logger.info(f"Successfully decrypted {ENV_PRIVATE_KEY} and retrieved from vault") + return value + + # CRITICAL: If it's encrypted but decryption failed, we must NOT return None + # because that would cause the client to fallback to the encrypted string. + raise SignerError( + f"Failed to decrypt {ENV_PRIVATE_KEY}. " + "The password in SPOON_MASTER_PWD is likely incorrect or missing." + ) + else: + logger.debug(f"{ENV_PRIVATE_KEY} in environment is plaintext") + return env_value return None @@ -222,7 +271,9 @@ def _get_turnkey_client(self): if self._turnkey is None: try: from spoon_ai.turnkey import Turnkey - self._turnkey = Turnkey() + # Get decrypted API private key from vault + api_private_key = _get_turnkey_api_private_key_from_vault() + self._turnkey = Turnkey(api_private_key=api_private_key) except Exception as e: raise SignerError(f"Failed to initialize Turnkey client: {str(e)}") return self._turnkey @@ -233,7 +284,7 @@ async def sign_transaction(self, tx_dict: Dict[str, Any], rpc_url: str) -> str: from web3 import Web3 import rlp - w3 = Web3(HTTPProvider(rpc_url)) if rpc_url else Web3() + w3 = Web3(HTTPProvider(rpc_url)) if rpc_url else None # Helper function to convert int to bytes def int_to_bytes(value: int) -> bytes: @@ -245,7 +296,16 @@ def int_to_bytes(value: int) -> bytes: # Check if it's EIP-1559 (has maxFeePerGas) or legacy (has gasPrice) if "maxFeePerGas" in tx_dict or "maxPriorityFeePerGas" in tx_dict: # EIP-1559 transaction (type 2) - chain_id = tx_dict.get("chainId", w3.eth.chain_id if rpc_url else 1) + chain_id = tx_dict.get("chainId") + if chain_id is None: + if w3 and rpc_url: + try: + chain_id = w3.eth.chain_id + except Exception: + chain_id = 1 + else: + chain_id = 1 + nonce = tx_dict.get("nonce", 0) max_priority_fee_per_gas = tx_dict.get("maxPriorityFeePerGas", 0) max_fee_per_gas = tx_dict.get("maxFeePerGas", 0) @@ -279,7 +339,16 @@ def int_to_bytes(value: int) -> bytes: else: # Legacy transaction (type 0) - convert to EIP-1559 format for Turnkey # Turnkey prefers EIP-1559 format, so we'll convert legacy tx to EIP-1559 - chain_id = tx_dict.get("chainId", w3.eth.chain_id if rpc_url else 1) + chain_id = tx_dict.get("chainId") + if chain_id is None: + if w3 and rpc_url: + try: + chain_id = w3.eth.chain_id + except Exception: + chain_id = 1 + else: + chain_id = 1 + nonce = tx_dict.get("nonce", 0) gas_price = tx_dict.get("gasPrice", 0) gas_limit = tx_dict.get("gas", tx_dict.get("gasLimit", 21000)) @@ -395,7 +464,7 @@ def create_signer( signer_type = "local" # 3. Turnkey remote signing - elif os.getenv(ENV_TURNKEY_SIGN_WITH): + elif os.getenv(ENV_TURNKEY_SIGN_WITH) or _get_turnkey_api_private_key_from_vault(): signer_type = "turnkey" else: @@ -407,25 +476,64 @@ def create_signer( ) if signer_type == "local": - # Try sources in priority order: param -> plain env -> vault (auto-decrypt) + # Try sources in priority order: param -> env -> vault (auto-decrypt) + key = None + + # 1. Check parameter first + if private_key: + if _is_encrypted(private_key): + logger.info("Found encrypted private_key parameter, decrypting to vault...") + password = os.getenv("SPOON_MASTER_PWD") + if not password: + raise SignerError( + "Found encrypted private key but SPOON_MASTER_PWD is not set. " + "Please export SPOON_MASTER_PWD to decrypt the key." + ) + try: + from spoon_ai.wallet.security import decrypt_and_store + from spoon_ai.wallet.vault import get_vault + vault = get_vault() + param_vault_key = f"{ENV_PRIVATE_KEY}_PARAM" + decrypt_and_store(private_key, password, param_vault_key, vault=vault) + key = _get_from_vault(param_vault_key) + if key: + logger.info("Successfully decrypted provided private key and stored in vault.") + except Exception as e: + raise SignerError( + f"Failed to decrypt provided private key: {str(e)}. " + "Check if SPOON_MASTER_PWD is correct." + ) + else: + logger.debug("Using plaintext private_key parameter") key = private_key + + # 2. Check environment variable if not key: env_key = os.getenv(ENV_PRIVATE_KEY) - if env_key and not _is_encrypted(env_key): + if env_key: + if _is_encrypted(env_key): + logger.info(f"Found encrypted {ENV_PRIVATE_KEY} in environment, will decrypt via vault...") + key = _get_private_key_from_vault() + else: + logger.debug(f"Using plaintext {ENV_PRIVATE_KEY} from environment") key = env_key + + # 3. Check vault (for already decrypted keys or fallback) if not key: + logger.debug("Checking vault for decrypted private key...") key = _get_private_key_from_vault() if not key: raise ValueError( f"Private key required for local signing. " - f"Set {ENV_PRIVATE_KEY} or decrypt encrypted key to vault." + f"Set {ENV_PRIVATE_KEY} or provide a private_key parameter." ) # Ensure private key has 0x prefix key = key.strip() if not key.startswith("0x"): key = "0x" + key + logger.debug("Private key retrieved successfully, creating LocalSigner") return LocalSigner(key) elif signer_type == "turnkey": diff --git a/spoon_toolkits/crypto/neo/signers.py b/spoon_toolkits/crypto/neo/signers.py index 6735101..38ae0e7 100644 --- a/spoon_toolkits/crypto/neo/signers.py +++ b/spoon_toolkits/crypto/neo/signers.py @@ -203,7 +203,7 @@ def _ensure_initialized(self): return try: - from neo3.wallet import Account + from neo3.wallet.account import Account from neo3.core import types # Try WIF format first, then hex @@ -217,14 +217,20 @@ def _ensure_initialized(self): key_hex = self._private_key # Convert hex to bytes and create account - key_bytes = bytes.fromhex(key_hex) + try: + key_bytes = bytes.fromhex(key_hex) + except ValueError as e: + raise SignerError( + f"Invalid hex private key format: {e}. " + f"Expected a valid hexadecimal string (WIF or hex format)." + ) self._account = Account.from_private_key(key_bytes) self._initialized = True except ImportError: raise SignerError( - "neo3 library not installed. Install with: pip install neo3" + "neo-mamba library not installed. Install with: pip install neo-mamba" ) except Exception as e: raise SignerError(f"Failed to initialize Neo account: {e}") @@ -388,12 +394,25 @@ def _parse_signature(self, result: Dict[str, Any]) -> bytes: # Check for direct signature signature_hex = result.get("signature") or result.get("signatureHex") if signature_hex: - return bytes.fromhex(signature_hex.replace("0x", "")) + if not isinstance(signature_hex, str): + raise SignerError( + f"Invalid signature type: expected str, got {type(signature_hex).__name__}" + ) + try: + return bytes.fromhex(signature_hex.replace("0x", "")) + except ValueError as e: + raise SignerError(f"Invalid signature hex format: {e}") # Reconstruct from r and s r_hex = result.get("r", "") s_hex = result.get("s", "") + # Type check for r and s + if not isinstance(r_hex, str) or not isinstance(s_hex, str): + raise SignerError( + f"Invalid r or s type: expected str, got r={type(r_hex).__name__}, s={type(s_hex).__name__}" + ) + if not r_hex or not s_hex: raise SignerError("Turnkey signature missing r or s components") @@ -401,8 +420,11 @@ def _parse_signature(self, result: Dict[str, Any]) -> bytes: s_hex = s_hex.replace("0x", "") # Pad to 32 bytes each for P-256 curve - r_bytes = bytes.fromhex(r_hex.zfill(64)) - s_bytes = bytes.fromhex(s_hex.zfill(64)) + try: + r_bytes = bytes.fromhex(r_hex.zfill(64)) + s_bytes = bytes.fromhex(s_hex.zfill(64)) + except ValueError as e: + raise SignerError(f"Invalid r or s hex format: {e}") return r_bytes + s_bytes @@ -451,14 +473,14 @@ async def get_script_hash(self) -> str: try: from neo3.core import types - from neo3.wallet import Account + from neo3.wallet.account import Account # Convert Neo address to script hash script_hash = Account.address_to_script_hash(address) self._cached_script_hash = f"0x{script_hash}" return self._cached_script_hash except ImportError: - raise SignerError("neo3 library required for address conversion") + raise SignerError("neo-mamba library required for address conversion. Install with: pip install neo-mamba") except Exception as e: raise SignerError(f"Failed to convert address to script hash: {e}") @@ -502,7 +524,7 @@ def create_signer( elif turnkey_sign_with: signer_type = "turnkey" else: - # Priority: plain env -> vault -> turnkey + # Priority: plain env -> vault -> auto-decrypt -> turnkey env_key = os.getenv(ENV_PRIVATE_KEY) # 1. Plain private key from env (not encrypted) @@ -513,6 +535,11 @@ def create_signer( elif _get_from_vault(ENV_PRIVATE_KEY): signer_type = "local" + # 2b. Try auto-decrypt if encrypted in env + elif env_key and _is_encrypted(env_key): + if _auto_decrypt_to_vault(ENV_PRIVATE_KEY): + signer_type = "local" + # 3. Turnkey remote signing elif os.getenv(ENV_TURNKEY_SIGN_WITH): signer_type = "turnkey" @@ -526,14 +553,15 @@ def create_signer( ) if signer_type == "local": - # Try sources in priority order: param -> plain env -> vault + # Try sources in priority order: param -> plain env -> vault (with auto-decrypt) key = private_key if not key: env_key = os.getenv(ENV_PRIVATE_KEY) if env_key and not _is_encrypted(env_key): key = env_key if not key: - key = _get_from_vault(ENV_PRIVATE_KEY) + # Use _get_private_key_from_vault which handles auto-decryption + key = _get_private_key_from_vault() if not key: raise ValueError(