diff --git a/tools/PccsAdminTool/lib/intelsgx/pcs.py b/tools/PccsAdminTool/lib/intelsgx/pcs.py index 9f1d22455..08f4c27a8 100644 --- a/tools/PccsAdminTool/lib/intelsgx/pcs.py +++ b/tools/PccsAdminTool/lib/intelsgx/pcs.py @@ -4,7 +4,35 @@ import json import binascii from urllib import parse -from OpenSSL import crypto + +from cryptography import x509 +from cryptography.exceptions import InvalidSignature +from cryptography.hazmat.primitives import hashes, serialization +from cryptography.hazmat.primitives.asymmetric import ec + +# Prefer pycryptography for cert verification if new +# enough, but fallback to pyopenssl +try: + # 'verification' module available from >= 42.0.0, but + # the required 'ExtensionPolicy' API is from >= 45.0.0 + from cryptography.x509 import verification + if not hasattr(verification, 'ExtensionPolicy'): + verification = None + else: + crypto = None +except ImportError: + verification = None + +if verification is None: + try: + from OpenSSL import crypto + except ModuleNotFoundError: + # Fallback to spawning 'openssl' binary if + # pyopenssl is not available + crypto = None + import tempfile + import subprocess + from pypac import PACSession from platform import system from lib.intelsgx.credential import Credentials @@ -16,6 +44,9 @@ certEnd= '-----END CERTIFICATE-----' certEndOffset= len(certEnd) +def CN(name): + return name.get_attributes_for_oid(x509.NameOID.COMON_NAME)[0].value + class PCS: BaseUrl= '' ApiVersion= 3 @@ -92,7 +123,7 @@ def init_cert_store(self, pychain): store= crypto.X509Store() for tcert in pychain: - store.add_cert(tcert) + store.add_cert(crypto.X509.from_cryptography(tcert)) return store @@ -100,11 +131,6 @@ def verify_crl_trust(self, pychain_in, pycrl): # Copy our list so we don't modify the original pychain= pychain_in[:] - # PyOpenSSL doesn't have methods for verifying a CRL issuer, - # so we need to translate from it to cryptography. - - crl= pycrl.to_cryptography() - # The chain_pem is our CRL issuer and the CA for the issuer. # Verify that first. @@ -115,31 +141,66 @@ def verify_crl_trust(self, pychain_in, pycrl): # Now verify the CRL signature - signer_key= pycert.get_pubkey().to_cryptography_key() + signer_key= pycert.public_key() - if not crl.is_signature_valid(signer_key): + if not pycrl.is_signature_valid(signer_key): self.error("Could not verify CRL signature") return False # Check the crl issuer - if pycrl.get_issuer() != pycert.get_subject(): + if pycrl.issuer != pycert.subject: self.error("CRL issuer doesn't match issuer chain") return False return True def verify_cert_trust(self, pychain, pycerts): - store= self.init_cert_store(pychain) - - for pycert in pycerts: - store_ctx= crypto.X509StoreContext(store, pycert) - try: - store_ctx.verify_certificate() - except crypto.X509StoreContextError as e: - # Printing or logging the error details - print(e) - return False + if verification is not None: + store= verification.Store(pychain) + + builder= verification.PolicyBuilder().store(store) + builder= builder.extension_policies( + ee_policy=verification.ExtensionPolicy.permit_all(), + ca_policy=verification.ExtensionPolicy.webpki_defaults_ca()) + + verifier= builder.build_client_verifier() + for pycert in pycerts: + try: + verifier.verify(pycert,[]) + except verification.VerificationError as e: + # Printing or logging the error details + print(e) + return False + elif crypto is not None: + store= self.init_cert_store(pychain) + + for pycert in pycerts: + store_ctx= crypto.X509StoreContext( + store, crypto.X509.from_cryptography(pycert)) + try: + store_ctx.verify_certificate() + except crypto.X509StoreContextError as e: + # Printing or logging the error details + print(e) + return False + else: + with tempfile.NamedTemporaryFile("wb") as chainfile: + for cert in pychain: + chainfile.write(cert.public_bytes(serialization.Encoding.PEM)) + chainfile.flush() + + for cert in pycerts: + with tempfile.NamedTemporaryFile("wb") as certfile: + certfile.write(cert.public_bytes(serialization.Encoding.PEM)) + certfile.flush() + + try: + subprocess.check_call(["openssl", "verify", + "-CAfile", chainfile.name, certfile.name], + stdout=subprocess.DEVNULL) + except subprocess.CalledProcessError as e: + return False return True @@ -165,22 +226,21 @@ def verify_signature(self, pycert, signature, msg): sig= bytes([0x30,len(r)+len(s)+4,2,len(r)]) + r + bytes([2,len(s)]) + s try: - crypto.verify(pycert, sig, msg, "sha256") - except crypto.Error as e: + pycert.public_key().verify( + sig, msg, ec.ECDSA(hashes.SHA256())) + except InvalidSignature as e: self.error('Signature verification failed: {:s}'.format(str(e))) return False return True def pem_to_pycert(self, cert_pem): - return crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) + return x509.load_pem_x509_certificate(cert_pem.encode("utf-8")) def pems_to_pycerts(self, certs_pem): pycerts= [] for cert_pem in certs_pem: - pycerts.append( - crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem) - ) + pycerts.append(self.pem_to_pycert(cert_pem)) return pycerts def parse_chain_pem(self, chain_pem): @@ -213,9 +273,9 @@ def sort_pycert_chain(self, chain_in): cert0= chain_in[0] cert1= chain_in[1] - if cert0.get_subject() == cert1.get_issuer(): + if cert0.subject == cert1.issuer: return chain_in - elif cert1.get_subject() == cert0.get_issuer(): + elif cert1.subject == cert0.issuer: chain_in.reverse() return chain_in else: @@ -228,7 +288,7 @@ def sort_pycert_chain(self, chain_in): for i in range(1, len(chain_in)): cert= chain_in[i] pcert= chain_in[i-1] - if cert.get_issuer() != pcert.get_subject(): + if cert.issuer != pcert.subject: sorted= False break @@ -244,10 +304,10 @@ def sort_pycert_chain(self, chain_in): rootidx= -1 for i in range(0, len(chain)): cert= chain[i] - subject= cert.get_subject() - issuer= cert.get_issuer() - cert_subjects[subject.CN]= cert - print("cert: {:s} <- {:s}" . format(subject.CN, issuer.CN)) + subject= cert.subject + issuer= cert.issuer + cert_subjects[CN(subject)]= cert + print("cert: {:s} <- {:s}" . format(CN(subject), CN(issuer))) if subject == issuer: if len(sorted_chain) > 0: @@ -266,8 +326,8 @@ def sort_pycert_chain(self, chain_in): issuer_to= {} for cert in chain: - issuer= cert.get_issuer().CN - subject= cert.get_subject().CN + issuer= CN(cert.issuer) + subject= CN(cert.subject) if issuer in issued_by: self.error('multiple certs issued by same cert in chain') @@ -284,7 +344,7 @@ def sort_pycert_chain(self, chain_in): if len(sorted_chain) > 0: for cert in chain: - issuer= cert.get_issuer().CN + issuer= CN(cert.issuer) if issuer not in issued_by: if len(sorted_chain) > 0: self.error('multiple certs with no issuer') @@ -300,7 +360,7 @@ def sort_pycert_chain(self, chain_in): cert= sorted_chain[0] while len(sorted_chain) < lchain: - issuer_subject= cert.get_subject().der() + issuer_subject= CN(cert.subject) if issuer_subject not in issuer_to: self.error('cert in chain with no issuer') @@ -515,10 +575,10 @@ def get_pck_crl(self, target, dec=None): crl= response.content if self.ApiVersion<3: crl_str= str(crl, dec) - pycrl= crypto.load_crl(crypto.FILETYPE_PEM, crl) + pycrl= x509.load_pem_x509_crl(crl) else: crl_str= binascii.hexlify(crl).decode(dec) - pycrl= crypto.load_crl(crypto.FILETYPE_ASN1, crl) + pycrl= x509.load_der_x509_crl(crl) if not self.verify_crl_trust(pychain, pycrl): self.error("Could not validate certificate using trust chain")