Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 119 additions & 11 deletions spoon_toolkits/crypto/evm/signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,39 @@
ENV_PRIVATE_KEY = "EVM_PRIVATE_KEY"
ENV_TURNKEY_SIGN_WITH = "TURNKEY_SIGN_WITH"
ENV_TURNKEY_ADDRESS = "TURNKEY_ADDRESS"
ENV_TURNKEY_API_PRIVATE_KEY = "TURNKEY_API_PRIVATE_KEY"


def _get_turnkey_api_private_key_from_vault() -> Optional[str]:
"""
Get decrypted Turnkey API private key from SecretVault.
If an encrypted key exists in env but not in vault, auto-decrypt it first.
"""
value = _get_from_vault(ENV_TURNKEY_API_PRIVATE_KEY)
if value:
logger.debug(f"Retrieved {ENV_TURNKEY_API_PRIVATE_KEY} from vault (already decrypted)")
return value

env_value = os.getenv(ENV_TURNKEY_API_PRIVATE_KEY)
if env_value:
if _is_encrypted(env_value):
logger.info(f"Found encrypted {ENV_TURNKEY_API_PRIVATE_KEY} in environment, attempting decryption...")
if _auto_decrypt_to_vault(ENV_TURNKEY_API_PRIVATE_KEY):
value = _get_from_vault(ENV_TURNKEY_API_PRIVATE_KEY)
if value:
logger.info(f"Successfully decrypted {ENV_TURNKEY_API_PRIVATE_KEY} and retrieved from vault")
return value

# CRITICAL: If it's encrypted but decryption failed, we must NOT return None
# because that would cause the client to fallback to the encrypted string.
raise SignerError(
f"Failed to decrypt {ENV_TURNKEY_API_PRIVATE_KEY}. "
"The password in SPOON_MASTER_PWD is likely incorrect or missing."
)
else:
logger.debug(f"{ENV_TURNKEY_API_PRIVATE_KEY} in environment is plaintext")
return env_value
return None


def _is_encrypted(value: str) -> bool:
Expand Down Expand Up @@ -111,13 +144,29 @@ def _get_private_key_from_vault() -> Optional[str]:
# Check if already in vault
value = _get_from_vault(ENV_PRIVATE_KEY)
if value:
logger.debug(f"Retrieved {ENV_PRIVATE_KEY} from vault (already decrypted)")
return value

# Try auto-decrypt if encrypted in env
env_value = os.getenv(ENV_PRIVATE_KEY)
if env_value and _is_encrypted(env_value):
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
return _get_from_vault(ENV_PRIVATE_KEY)
if env_value:
if _is_encrypted(env_value):
logger.info(f"Found encrypted {ENV_PRIVATE_KEY} in environment, attempting decryption...")
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
value = _get_from_vault(ENV_PRIVATE_KEY)
if value:
logger.info(f"Successfully decrypted {ENV_PRIVATE_KEY} and retrieved from vault")
return value

# CRITICAL: If it's encrypted but decryption failed, we must NOT return None
# because that would cause the client to fallback to the encrypted string.
raise SignerError(
f"Failed to decrypt {ENV_PRIVATE_KEY}. "
"The password in SPOON_MASTER_PWD is likely incorrect or missing."
)
else:
logger.debug(f"{ENV_PRIVATE_KEY} in environment is plaintext")
return env_value

return None

Expand Down Expand Up @@ -222,7 +271,9 @@ def _get_turnkey_client(self):
if self._turnkey is None:
try:
from spoon_ai.turnkey import Turnkey
self._turnkey = Turnkey()
# Get decrypted API private key from vault
api_private_key = _get_turnkey_api_private_key_from_vault()
self._turnkey = Turnkey(api_private_key=api_private_key)
except Exception as e:
raise SignerError(f"Failed to initialize Turnkey client: {str(e)}")
return self._turnkey
Expand All @@ -233,7 +284,7 @@ async def sign_transaction(self, tx_dict: Dict[str, Any], rpc_url: str) -> str:
from web3 import Web3
import rlp

w3 = Web3(HTTPProvider(rpc_url)) if rpc_url else Web3()
w3 = Web3(HTTPProvider(rpc_url)) if rpc_url else None

# Helper function to convert int to bytes
def int_to_bytes(value: int) -> bytes:
Expand All @@ -245,7 +296,16 @@ def int_to_bytes(value: int) -> bytes:
# Check if it's EIP-1559 (has maxFeePerGas) or legacy (has gasPrice)
if "maxFeePerGas" in tx_dict or "maxPriorityFeePerGas" in tx_dict:
# EIP-1559 transaction (type 2)
chain_id = tx_dict.get("chainId", w3.eth.chain_id if rpc_url else 1)
chain_id = tx_dict.get("chainId")
if chain_id is None:
if w3 and rpc_url:
try:
chain_id = w3.eth.chain_id
except Exception:
chain_id = 1
else:
chain_id = 1

nonce = tx_dict.get("nonce", 0)
max_priority_fee_per_gas = tx_dict.get("maxPriorityFeePerGas", 0)
max_fee_per_gas = tx_dict.get("maxFeePerGas", 0)
Expand Down Expand Up @@ -279,7 +339,16 @@ def int_to_bytes(value: int) -> bytes:
else:
# Legacy transaction (type 0) - convert to EIP-1559 format for Turnkey
# Turnkey prefers EIP-1559 format, so we'll convert legacy tx to EIP-1559
chain_id = tx_dict.get("chainId", w3.eth.chain_id if rpc_url else 1)
chain_id = tx_dict.get("chainId")
if chain_id is None:
if w3 and rpc_url:
try:
chain_id = w3.eth.chain_id
except Exception:
chain_id = 1
else:
chain_id = 1

nonce = tx_dict.get("nonce", 0)
gas_price = tx_dict.get("gasPrice", 0)
gas_limit = tx_dict.get("gas", tx_dict.get("gasLimit", 21000))
Expand Down Expand Up @@ -395,7 +464,7 @@ def create_signer(
signer_type = "local"

# 3. Turnkey remote signing
elif os.getenv(ENV_TURNKEY_SIGN_WITH):
elif os.getenv(ENV_TURNKEY_SIGN_WITH) or _get_turnkey_api_private_key_from_vault():
signer_type = "turnkey"

else:
Expand All @@ -407,25 +476,64 @@ def create_signer(
)

if signer_type == "local":
# Try sources in priority order: param -> plain env -> vault (auto-decrypt)
# Try sources in priority order: param -> env -> vault (auto-decrypt)
key = None

# 1. Check parameter first
if private_key:
if _is_encrypted(private_key):
logger.info("Found encrypted private_key parameter, decrypting to vault...")
password = os.getenv("SPOON_MASTER_PWD")
if not password:
raise SignerError(
"Found encrypted private key but SPOON_MASTER_PWD is not set. "
"Please export SPOON_MASTER_PWD to decrypt the key."
)
try:
from spoon_ai.wallet.security import decrypt_and_store
from spoon_ai.wallet.vault import get_vault
vault = get_vault()
param_vault_key = f"{ENV_PRIVATE_KEY}_PARAM"
decrypt_and_store(private_key, password, param_vault_key, vault=vault)
key = _get_from_vault(param_vault_key)
if key:
logger.info("Successfully decrypted provided private key and stored in vault.")
except Exception as e:
raise SignerError(
f"Failed to decrypt provided private key: {str(e)}. "
"Check if SPOON_MASTER_PWD is correct."
)
else:
logger.debug("Using plaintext private_key parameter")
key = private_key
Comment on lines +506 to 508

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve decrypted private_key parameter

When private_key is encrypted, the code successfully decrypts it into key, but then key = private_key unconditionally overwrites that decrypted value with the original encrypted string. In the encrypted-parameter scenario (with SPOON_MASTER_PWD set), the LocalSigner will still receive an encrypted value, leading to invalid key errors or signing failures. The fix is to only set key = private_key for the plaintext case, not after the decryption branch.

Useful? React with 👍 / 👎.


# 2. Check environment variable
if not key:
env_key = os.getenv(ENV_PRIVATE_KEY)
if env_key and not _is_encrypted(env_key):
if env_key:
if _is_encrypted(env_key):
logger.info(f"Found encrypted {ENV_PRIVATE_KEY} in environment, will decrypt via vault...")
key = _get_private_key_from_vault()
else:
logger.debug(f"Using plaintext {ENV_PRIVATE_KEY} from environment")
key = env_key
Comment on lines +515 to 519

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not overwrite decrypted env key with encrypted value

In the env-key branch, the code calls _get_private_key_from_vault() for encrypted EVM_PRIVATE_KEY but then unconditionally assigns key = env_key afterward. That overwrites the decrypted vault value with the original encrypted string, so local signing still attempts to use the encrypted value and fails. This only happens when EVM_PRIVATE_KEY is encrypted and is a regression from the intended auto-decrypt flow.

Useful? React with 👍 / 👎.


# 3. Check vault (for already decrypted keys or fallback)
if not key:
logger.debug("Checking vault for decrypted private key...")
key = _get_private_key_from_vault()

if not key:
raise ValueError(
f"Private key required for local signing. "
f"Set {ENV_PRIVATE_KEY} or decrypt encrypted key to vault."
f"Set {ENV_PRIVATE_KEY} or provide a private_key parameter."
)

# Ensure private key has 0x prefix
key = key.strip()
if not key.startswith("0x"):
key = "0x" + key
logger.debug("Private key retrieved successfully, creating LocalSigner")
return LocalSigner(key)

elif signer_type == "turnkey":
Expand Down
50 changes: 39 additions & 11 deletions spoon_toolkits/crypto/neo/signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def _ensure_initialized(self):
return

try:
from neo3.wallet import Account
from neo3.wallet.account import Account
from neo3.core import types

# Try WIF format first, then hex
Expand All @@ -217,14 +217,20 @@ def _ensure_initialized(self):
key_hex = self._private_key

# Convert hex to bytes and create account
key_bytes = bytes.fromhex(key_hex)
try:
key_bytes = bytes.fromhex(key_hex)
except ValueError as e:
raise SignerError(
f"Invalid hex private key format: {e}. "
f"Expected a valid hexadecimal string (WIF or hex format)."
)
self._account = Account.from_private_key(key_bytes)

self._initialized = True

except ImportError:
raise SignerError(
"neo3 library not installed. Install with: pip install neo3"
"neo-mamba library not installed. Install with: pip install neo-mamba"
)
except Exception as e:
raise SignerError(f"Failed to initialize Neo account: {e}")
Expand Down Expand Up @@ -388,21 +394,37 @@ def _parse_signature(self, result: Dict[str, Any]) -> bytes:
# Check for direct signature
signature_hex = result.get("signature") or result.get("signatureHex")
if signature_hex:
return bytes.fromhex(signature_hex.replace("0x", ""))
if not isinstance(signature_hex, str):
raise SignerError(
f"Invalid signature type: expected str, got {type(signature_hex).__name__}"
)
try:
return bytes.fromhex(signature_hex.replace("0x", ""))
except ValueError as e:
raise SignerError(f"Invalid signature hex format: {e}")

# Reconstruct from r and s
r_hex = result.get("r", "")
s_hex = result.get("s", "")

# Type check for r and s
if not isinstance(r_hex, str) or not isinstance(s_hex, str):
raise SignerError(
f"Invalid r or s type: expected str, got r={type(r_hex).__name__}, s={type(s_hex).__name__}"
)

if not r_hex or not s_hex:
raise SignerError("Turnkey signature missing r or s components")

r_hex = r_hex.replace("0x", "")
s_hex = s_hex.replace("0x", "")

# Pad to 32 bytes each for P-256 curve
r_bytes = bytes.fromhex(r_hex.zfill(64))
s_bytes = bytes.fromhex(s_hex.zfill(64))
try:
r_bytes = bytes.fromhex(r_hex.zfill(64))
s_bytes = bytes.fromhex(s_hex.zfill(64))
except ValueError as e:
raise SignerError(f"Invalid r or s hex format: {e}")

return r_bytes + s_bytes

Expand Down Expand Up @@ -451,14 +473,14 @@ async def get_script_hash(self) -> str:

try:
from neo3.core import types
from neo3.wallet import Account
from neo3.wallet.account import Account

# Convert Neo address to script hash
script_hash = Account.address_to_script_hash(address)
self._cached_script_hash = f"0x{script_hash}"
return self._cached_script_hash
except ImportError:
raise SignerError("neo3 library required for address conversion")
raise SignerError("neo-mamba library required for address conversion. Install with: pip install neo-mamba")
except Exception as e:
raise SignerError(f"Failed to convert address to script hash: {e}")

Expand Down Expand Up @@ -502,7 +524,7 @@ def create_signer(
elif turnkey_sign_with:
signer_type = "turnkey"
else:
# Priority: plain env -> vault -> turnkey
# Priority: plain env -> vault -> auto-decrypt -> turnkey
env_key = os.getenv(ENV_PRIVATE_KEY)

# 1. Plain private key from env (not encrypted)
Expand All @@ -513,6 +535,11 @@ def create_signer(
elif _get_from_vault(ENV_PRIVATE_KEY):
signer_type = "local"

# 2b. Try auto-decrypt if encrypted in env
elif env_key and _is_encrypted(env_key):
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
signer_type = "local"

# 3. Turnkey remote signing
elif os.getenv(ENV_TURNKEY_SIGN_WITH):
signer_type = "turnkey"
Expand All @@ -526,14 +553,15 @@ def create_signer(
)

if signer_type == "local":
# Try sources in priority order: param -> plain env -> vault
# Try sources in priority order: param -> plain env -> vault (with auto-decrypt)
key = private_key
if not key:
env_key = os.getenv(ENV_PRIVATE_KEY)
if env_key and not _is_encrypted(env_key):
key = env_key
if not key:
key = _get_from_vault(ENV_PRIVATE_KEY)
# Use _get_private_key_from_vault which handles auto-decryption
key = _get_private_key_from_vault()

if not key:
raise ValueError(
Expand Down