Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
152 changes: 138 additions & 14 deletions spoon_toolkits/crypto/evm/signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,39 @@
ENV_PRIVATE_KEY = "EVM_PRIVATE_KEY"
ENV_TURNKEY_SIGN_WITH = "TURNKEY_SIGN_WITH"
ENV_TURNKEY_ADDRESS = "TURNKEY_ADDRESS"
ENV_TURNKEY_API_PRIVATE_KEY = "TURNKEY_API_PRIVATE_KEY"


def _get_turnkey_api_private_key_from_vault() -> Optional[str]:
"""
Get decrypted Turnkey API private key from SecretVault.
If an encrypted key exists in env but not in vault, auto-decrypt it first.
"""
value = _get_from_vault(ENV_TURNKEY_API_PRIVATE_KEY)
if value:
logger.debug(f"Retrieved {ENV_TURNKEY_API_PRIVATE_KEY} from vault (already decrypted)")
return value

env_value = os.getenv(ENV_TURNKEY_API_PRIVATE_KEY)
if env_value:
if _is_encrypted(env_value):
logger.info(f"Found encrypted {ENV_TURNKEY_API_PRIVATE_KEY} in environment, attempting decryption...")
if _auto_decrypt_to_vault(ENV_TURNKEY_API_PRIVATE_KEY):
value = _get_from_vault(ENV_TURNKEY_API_PRIVATE_KEY)
if value:
logger.info(f"Successfully decrypted {ENV_TURNKEY_API_PRIVATE_KEY} and retrieved from vault")
return value

# CRITICAL: If it's encrypted but decryption failed, we must NOT return None
# because that would cause the client to fallback to the encrypted string.
raise SignerError(
f"Failed to decrypt {ENV_TURNKEY_API_PRIVATE_KEY}. "
"The password in SPOON_MASTER_PWD is likely incorrect or missing."
)
else:
logger.debug(f"{ENV_TURNKEY_API_PRIVATE_KEY} in environment is plaintext")
return env_value
return None


def _is_encrypted(value: str) -> bool:
Expand Down Expand Up @@ -111,13 +144,29 @@ def _get_private_key_from_vault() -> Optional[str]:
# Check if already in vault
value = _get_from_vault(ENV_PRIVATE_KEY)
if value:
logger.debug(f"Retrieved {ENV_PRIVATE_KEY} from vault (already decrypted)")
return value

# Try auto-decrypt if encrypted in env
env_value = os.getenv(ENV_PRIVATE_KEY)
if env_value and _is_encrypted(env_value):
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
return _get_from_vault(ENV_PRIVATE_KEY)
if env_value:
if _is_encrypted(env_value):
logger.info(f"Found encrypted {ENV_PRIVATE_KEY} in environment, attempting decryption...")
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
value = _get_from_vault(ENV_PRIVATE_KEY)
if value:
logger.info(f"Successfully decrypted {ENV_PRIVATE_KEY} and retrieved from vault")
return value

# CRITICAL: If it's encrypted but decryption failed, we must NOT return None
# because that would cause the client to fallback to the encrypted string.
raise SignerError(
f"Failed to decrypt {ENV_PRIVATE_KEY}. "
"The password in SPOON_MASTER_PWD is likely incorrect or missing."
)
else:
logger.debug(f"{ENV_PRIVATE_KEY} in environment is plaintext")
return env_value

return None

Expand Down Expand Up @@ -222,7 +271,9 @@ def _get_turnkey_client(self):
if self._turnkey is None:
try:
from spoon_ai.turnkey import Turnkey
self._turnkey = Turnkey()
# Get decrypted API private key from vault
api_private_key = _get_turnkey_api_private_key_from_vault()
self._turnkey = Turnkey(api_private_key=api_private_key)
except Exception as e:
raise SignerError(f"Failed to initialize Turnkey client: {str(e)}")
return self._turnkey
Expand All @@ -233,7 +284,7 @@ async def sign_transaction(self, tx_dict: Dict[str, Any], rpc_url: str) -> str:
from web3 import Web3
import rlp

w3 = Web3(HTTPProvider(rpc_url)) if rpc_url else Web3()
w3 = Web3(HTTPProvider(rpc_url)) if rpc_url else None

# Helper function to convert int to bytes
def int_to_bytes(value: int) -> bytes:
Expand All @@ -245,7 +296,16 @@ def int_to_bytes(value: int) -> bytes:
# Check if it's EIP-1559 (has maxFeePerGas) or legacy (has gasPrice)
if "maxFeePerGas" in tx_dict or "maxPriorityFeePerGas" in tx_dict:
# EIP-1559 transaction (type 2)
chain_id = tx_dict.get("chainId", w3.eth.chain_id if rpc_url else 1)
chain_id = tx_dict.get("chainId")
if chain_id is None:
if w3 and rpc_url:
try:
chain_id = w3.eth.chain_id
except Exception:
chain_id = 1
else:
chain_id = 1

nonce = tx_dict.get("nonce", 0)
max_priority_fee_per_gas = tx_dict.get("maxPriorityFeePerGas", 0)
max_fee_per_gas = tx_dict.get("maxFeePerGas", 0)
Expand Down Expand Up @@ -279,7 +339,16 @@ def int_to_bytes(value: int) -> bytes:
else:
# Legacy transaction (type 0) - convert to EIP-1559 format for Turnkey
# Turnkey prefers EIP-1559 format, so we'll convert legacy tx to EIP-1559
chain_id = tx_dict.get("chainId", w3.eth.chain_id if rpc_url else 1)
chain_id = tx_dict.get("chainId")
if chain_id is None:
if w3 and rpc_url:
try:
chain_id = w3.eth.chain_id
except Exception:
chain_id = 1
else:
chain_id = 1

nonce = tx_dict.get("nonce", 0)
gas_price = tx_dict.get("gasPrice", 0)
gas_limit = tx_dict.get("gas", tx_dict.get("gasLimit", 21000))
Expand Down Expand Up @@ -379,7 +448,23 @@ def create_signer(
if signer_type == "auto":
# Check explicit parameters first
if private_key:
signer_type = "local"
# If encrypted, check if we can decrypt before committing to local
if _is_encrypted(private_key):
password = os.getenv("SPOON_MASTER_PWD")
if password:
signer_type = "local"
elif turnkey_sign_with or os.getenv(ENV_TURNKEY_SIGN_WITH):
# Can't decrypt, but Turnkey is available - use Turnkey
logger.warning(
"Encrypted private_key provided but SPOON_MASTER_PWD not set. "
"Falling back to Turnkey signing."
)
signer_type = "turnkey"
else:
# No Turnkey fallback, will fail later with helpful error
signer_type = "local"
else:
signer_type = "local"
elif turnkey_sign_with:
signer_type = "turnkey"
else:
Expand All @@ -395,7 +480,7 @@ def create_signer(
signer_type = "local"

# 3. Turnkey remote signing
elif os.getenv(ENV_TURNKEY_SIGN_WITH):
elif os.getenv(ENV_TURNKEY_SIGN_WITH) or _get_turnkey_api_private_key_from_vault():
signer_type = "turnkey"

else:
Expand All @@ -407,25 +492,64 @@ def create_signer(
)

if signer_type == "local":
# Try sources in priority order: param -> plain env -> vault (auto-decrypt)
key = private_key
# Try sources in priority order: param -> env -> vault (auto-decrypt)
key = None

# 1. Check parameter first
if private_key:
if _is_encrypted(private_key):
logger.info("Found encrypted private_key parameter, decrypting to vault...")
password = os.getenv("SPOON_MASTER_PWD")
if not password:
raise SignerError(
"Found encrypted private key but SPOON_MASTER_PWD is not set. "
"Please export SPOON_MASTER_PWD to decrypt the key."
)
try:
from spoon_ai.wallet.security import decrypt_and_store
from spoon_ai.wallet.vault import get_vault
vault = get_vault()
param_vault_key = f"{ENV_PRIVATE_KEY}_PARAM"
decrypt_and_store(private_key, password, param_vault_key, vault=vault)
key = _get_from_vault(param_vault_key)
if key:
logger.info("Successfully decrypted provided private key and stored in vault.")
except Exception as e:
raise SignerError(
f"Failed to decrypt provided private key: {str(e)}. "
"Check if SPOON_MASTER_PWD is correct."
)
else:
logger.debug("Using plaintext private_key parameter")
key = private_key

# 2. Check environment variable
if not key:
env_key = os.getenv(ENV_PRIVATE_KEY)
if env_key and not _is_encrypted(env_key):
key = env_key
if env_key:
if _is_encrypted(env_key):
logger.info(f"Found encrypted {ENV_PRIVATE_KEY} in environment, will decrypt via vault...")
key = _get_private_key_from_vault()
else:
logger.debug(f"Using plaintext {ENV_PRIVATE_KEY} from environment")
key = env_key

# 3. Check vault (for already decrypted keys or fallback)
if not key:
logger.debug("Checking vault for decrypted private key...")
key = _get_private_key_from_vault()

if not key:
raise ValueError(
f"Private key required for local signing. "
f"Set {ENV_PRIVATE_KEY} or decrypt encrypted key to vault."
f"Set {ENV_PRIVATE_KEY} or provide a private_key parameter."
)

# Ensure private key has 0x prefix
key = key.strip()
if not key.startswith("0x"):
key = "0x" + key
logger.debug("Private key retrieved successfully, creating LocalSigner")
return LocalSigner(key)

elif signer_type == "turnkey":
Expand Down
50 changes: 39 additions & 11 deletions spoon_toolkits/crypto/neo/signers.py
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ def _ensure_initialized(self):
return

try:
from neo3.wallet import Account
from neo3.wallet.account import Account
from neo3.core import types

# Try WIF format first, then hex
Expand All @@ -217,14 +217,20 @@ def _ensure_initialized(self):
key_hex = self._private_key

# Convert hex to bytes and create account
key_bytes = bytes.fromhex(key_hex)
try:
key_bytes = bytes.fromhex(key_hex)
except ValueError as e:
raise SignerError(
f"Invalid hex private key format: {e}. "
f"Expected a valid hexadecimal string (WIF or hex format)."
)
self._account = Account.from_private_key(key_bytes)

self._initialized = True

except ImportError:
raise SignerError(
"neo3 library not installed. Install with: pip install neo3"
"neo-mamba library not installed. Install with: pip install neo-mamba"
)
except Exception as e:
raise SignerError(f"Failed to initialize Neo account: {e}")
Expand Down Expand Up @@ -388,21 +394,37 @@ def _parse_signature(self, result: Dict[str, Any]) -> bytes:
# Check for direct signature
signature_hex = result.get("signature") or result.get("signatureHex")
if signature_hex:
return bytes.fromhex(signature_hex.replace("0x", ""))
if not isinstance(signature_hex, str):
raise SignerError(
f"Invalid signature type: expected str, got {type(signature_hex).__name__}"
)
try:
return bytes.fromhex(signature_hex.replace("0x", ""))
except ValueError as e:
raise SignerError(f"Invalid signature hex format: {e}")

# Reconstruct from r and s
r_hex = result.get("r", "")
s_hex = result.get("s", "")

# Type check for r and s
if not isinstance(r_hex, str) or not isinstance(s_hex, str):
raise SignerError(
f"Invalid r or s type: expected str, got r={type(r_hex).__name__}, s={type(s_hex).__name__}"
)

if not r_hex or not s_hex:
raise SignerError("Turnkey signature missing r or s components")

r_hex = r_hex.replace("0x", "")
s_hex = s_hex.replace("0x", "")

# Pad to 32 bytes each for P-256 curve
r_bytes = bytes.fromhex(r_hex.zfill(64))
s_bytes = bytes.fromhex(s_hex.zfill(64))
try:
r_bytes = bytes.fromhex(r_hex.zfill(64))
s_bytes = bytes.fromhex(s_hex.zfill(64))
except ValueError as e:
raise SignerError(f"Invalid r or s hex format: {e}")

return r_bytes + s_bytes

Expand Down Expand Up @@ -451,14 +473,14 @@ async def get_script_hash(self) -> str:

try:
from neo3.core import types
from neo3.wallet import Account
from neo3.wallet.account import Account

# Convert Neo address to script hash
script_hash = Account.address_to_script_hash(address)
self._cached_script_hash = f"0x{script_hash}"
return self._cached_script_hash
except ImportError:
raise SignerError("neo3 library required for address conversion")
raise SignerError("neo-mamba library required for address conversion. Install with: pip install neo-mamba")
except Exception as e:
raise SignerError(f"Failed to convert address to script hash: {e}")

Expand Down Expand Up @@ -502,7 +524,7 @@ def create_signer(
elif turnkey_sign_with:
signer_type = "turnkey"
else:
# Priority: plain env -> vault -> turnkey
# Priority: plain env -> vault -> auto-decrypt -> turnkey
env_key = os.getenv(ENV_PRIVATE_KEY)

# 1. Plain private key from env (not encrypted)
Expand All @@ -513,6 +535,11 @@ def create_signer(
elif _get_from_vault(ENV_PRIVATE_KEY):
signer_type = "local"

# 2b. Try auto-decrypt if encrypted in env
elif env_key and _is_encrypted(env_key):
if _auto_decrypt_to_vault(ENV_PRIVATE_KEY):
signer_type = "local"

# 3. Turnkey remote signing
elif os.getenv(ENV_TURNKEY_SIGN_WITH):
signer_type = "turnkey"
Expand All @@ -526,14 +553,15 @@ def create_signer(
)

if signer_type == "local":
# Try sources in priority order: param -> plain env -> vault
# Try sources in priority order: param -> plain env -> vault (with auto-decrypt)
key = private_key
if not key:
env_key = os.getenv(ENV_PRIVATE_KEY)
if env_key and not _is_encrypted(env_key):
key = env_key
if not key:
key = _get_from_vault(ENV_PRIVATE_KEY)
# Use _get_private_key_from_vault which handles auto-decryption
key = _get_private_key_from_vault()

if not key:
raise ValueError(
Expand Down
Loading