Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 99 additions & 39 deletions tools/PccsAdminTool/lib/intelsgx/pcs.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,35 @@
import json
import binascii
from urllib import parse
from OpenSSL import crypto

from cryptography import x509
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec

# Prefer pycryptography for cert verification if new
# enough, but fallback to pyopenssl
try:
# 'verification' module available from >= 42.0.0, but
# the required 'ExtensionPolicy' API is from >= 45.0.0
from cryptography.x509 import verification
if not hasattr(verification, 'ExtensionPolicy'):
verification = None
else:
crypto = None
except ImportError:
verification = None

if verification is None:
try:
from OpenSSL import crypto
except ModuleNotFoundError:
# Fallback to spawning 'openssl' binary if
# pyopenssl is not available
crypto = None
import tempfile
import subprocess

from pypac import PACSession
from platform import system
from lib.intelsgx.credential import Credentials
Expand All @@ -16,6 +44,9 @@
certEnd= '-----END CERTIFICATE-----'
certEndOffset= len(certEnd)

def CN(name):
return name.get_attributes_for_oid(x509.NameOID.COMON_NAME)[0].value

class PCS:
BaseUrl= ''
ApiVersion= 3
Expand Down Expand Up @@ -92,19 +123,14 @@ def init_cert_store(self, pychain):
store= crypto.X509Store()

for tcert in pychain:
store.add_cert(tcert)
store.add_cert(crypto.X509.from_cryptography(tcert))

return store

def verify_crl_trust(self, pychain_in, pycrl):
# Copy our list so we don't modify the original
pychain= pychain_in[:]

# PyOpenSSL doesn't have methods for verifying a CRL issuer,
# so we need to translate from it to cryptography.

crl= pycrl.to_cryptography()

# The chain_pem is our CRL issuer and the CA for the issuer.
# Verify that first.

Expand All @@ -115,31 +141,66 @@ def verify_crl_trust(self, pychain_in, pycrl):

# Now verify the CRL signature

signer_key= pycert.get_pubkey().to_cryptography_key()
signer_key= pycert.public_key()

if not crl.is_signature_valid(signer_key):
if not pycrl.is_signature_valid(signer_key):
self.error("Could not verify CRL signature")
return False

# Check the crl issuer

if pycrl.get_issuer() != pycert.get_subject():
if pycrl.issuer != pycert.subject:
self.error("CRL issuer doesn't match issuer chain")
return False

return True

def verify_cert_trust(self, pychain, pycerts):
store= self.init_cert_store(pychain)

for pycert in pycerts:
store_ctx= crypto.X509StoreContext(store, pycert)
try:
store_ctx.verify_certificate()
except crypto.X509StoreContextError as e:
# Printing or logging the error details
print(e)
return False
if verification is not None:
store= verification.Store(pychain)

builder= verification.PolicyBuilder().store(store)
builder= builder.extension_policies(
ee_policy=verification.ExtensionPolicy.permit_all(),
ca_policy=verification.ExtensionPolicy.webpki_defaults_ca())

verifier= builder.build_client_verifier()
for pycert in pycerts:
try:
verifier.verify(pycert,[])
except verification.VerificationError as e:
# Printing or logging the error details
print(e)
return False
elif crypto is not None:
store= self.init_cert_store(pychain)

for pycert in pycerts:
store_ctx= crypto.X509StoreContext(
store, crypto.X509.from_cryptography(pycert))
try:
store_ctx.verify_certificate()
except crypto.X509StoreContextError as e:
# Printing or logging the error details
print(e)
return False
else:
with tempfile.NamedTemporaryFile("wb") as chainfile:
for cert in pychain:
chainfile.write(cert.public_bytes(serialization.Encoding.PEM))
chainfile.flush()

for cert in pycerts:
with tempfile.NamedTemporaryFile("wb") as certfile:
certfile.write(cert.public_bytes(serialization.Encoding.PEM))
certfile.flush()

try:
subprocess.check_call(["openssl", "verify",
"-CAfile", chainfile.name, certfile.name],
stdout=subprocess.DEVNULL)
except subprocess.CalledProcessError as e:
return False

return True

Expand All @@ -165,22 +226,21 @@ def verify_signature(self, pycert, signature, msg):
sig= bytes([0x30,len(r)+len(s)+4,2,len(r)]) + r + bytes([2,len(s)]) + s

try:
crypto.verify(pycert, sig, msg, "sha256")
except crypto.Error as e:
pycert.public_key().verify(
sig, msg, ec.ECDSA(hashes.SHA256()))
except InvalidSignature as e:
self.error('Signature verification failed: {:s}'.format(str(e)))
return False

return True

def pem_to_pycert(self, cert_pem):
return crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
return x509.load_pem_x509_certificate(cert_pem.encode("utf-8"))

def pems_to_pycerts(self, certs_pem):
pycerts= []
for cert_pem in certs_pem:
pycerts.append(
crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem)
)
pycerts.append(self.pem_to_pycert(cert_pem))
return pycerts

def parse_chain_pem(self, chain_pem):
Expand Down Expand Up @@ -213,9 +273,9 @@ def sort_pycert_chain(self, chain_in):
cert0= chain_in[0]
cert1= chain_in[1]

if cert0.get_subject() == cert1.get_issuer():
if cert0.subject == cert1.issuer:
return chain_in
elif cert1.get_subject() == cert0.get_issuer():
elif cert1.subject == cert0.issuer:
chain_in.reverse()
return chain_in
else:
Expand All @@ -228,7 +288,7 @@ def sort_pycert_chain(self, chain_in):
for i in range(1, len(chain_in)):
cert= chain_in[i]
pcert= chain_in[i-1]
if cert.get_issuer() != pcert.get_subject():
if cert.issuer != pcert.subject:
sorted= False
break

Expand All @@ -244,10 +304,10 @@ def sort_pycert_chain(self, chain_in):
rootidx= -1
for i in range(0, len(chain)):
cert= chain[i]
subject= cert.get_subject()
issuer= cert.get_issuer()
cert_subjects[subject.CN]= cert
print("cert: {:s} <- {:s}" . format(subject.CN, issuer.CN))
subject= cert.subject
issuer= cert.issuer
cert_subjects[CN(subject)]= cert
print("cert: {:s} <- {:s}" . format(CN(subject), CN(issuer)))

if subject == issuer:
if len(sorted_chain) > 0:
Expand All @@ -266,8 +326,8 @@ def sort_pycert_chain(self, chain_in):
issuer_to= {}

for cert in chain:
issuer= cert.get_issuer().CN
subject= cert.get_subject().CN
issuer= CN(cert.issuer)
subject= CN(cert.subject)

if issuer in issued_by:
self.error('multiple certs issued by same cert in chain')
Expand All @@ -284,7 +344,7 @@ def sort_pycert_chain(self, chain_in):

if len(sorted_chain) > 0:
for cert in chain:
issuer= cert.get_issuer().CN
issuer= CN(cert.issuer)
if issuer not in issued_by:
if len(sorted_chain) > 0:
self.error('multiple certs with no issuer')
Expand All @@ -300,7 +360,7 @@ def sort_pycert_chain(self, chain_in):
cert= sorted_chain[0]

while len(sorted_chain) < lchain:
issuer_subject= cert.get_subject().der()
issuer_subject= CN(cert.subject)

if issuer_subject not in issuer_to:
self.error('cert in chain with no issuer')
Expand Down Expand Up @@ -515,10 +575,10 @@ def get_pck_crl(self, target, dec=None):
crl= response.content
if self.ApiVersion<3:
crl_str= str(crl, dec)
pycrl= crypto.load_crl(crypto.FILETYPE_PEM, crl)
pycrl= x509.load_pem_x509_crl(crl)
else:
crl_str= binascii.hexlify(crl).decode(dec)
pycrl= crypto.load_crl(crypto.FILETYPE_ASN1, crl)
pycrl= x509.load_der_x509_crl(crl)

if not self.verify_crl_trust(pychain, pycrl):
self.error("Could not validate certificate using trust chain")
Expand Down