Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 46 additions & 85 deletions certmanager/checkupdate.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@ def read_bootstrap_properties(key):

# Function to check if certificate is expired
def is_certificate_expired(expiration_date):
# Parse expiration date string
expiration_date = datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z")
# Get current date
current_date = datetime.utcnow()
# Compare expiration date with current date
return current_date > expiration_date

# Function to write expired certificates to a text file
Expand All @@ -31,47 +28,31 @@ def write_to_expired_txt(cert_name):

# Function to format certificate data
def format_certificate(cert_data):
# Replace line breaks with "\\n"
formatted_cert_data = cert_data.replace("\n", "\\n")
return formatted_cert_data
return cert_data.replace("\n", "\\n")

# Function to retrieve certificate data from the database
def retrieve_certificate_data(partner_id, db_host, db_port, db_user, db_password):
try:
# Connect to the PMS database
pms_conn = psycopg2.connect(
host=db_host,
port=db_port,
database="mosip_pms",
user=db_user,
password=db_password
host=db_host, port=db_port, database="mosip_pms",
user=db_user, password=db_password
)
pms_cursor = pms_conn.cursor()

# Query to retrieve the certificate alias
sql_query_cert_alias = f"SELECT certificate_alias FROM pms.partner WHERE id = '{partner_id}';"
pms_cursor.execute(sql_query_cert_alias)
certificate_alias = pms_cursor.fetchone()[0]

# Query to retrieve cert_data using the certificate alias
sql_query_cert_data = f"SELECT cert_data FROM keymgr.partner_cert_store WHERE cert_id = '{certificate_alias}';"

# Connect to the Keymgr database
keymgr_conn = psycopg2.connect(
host=db_host,
port=db_port,
database="mosip_keymgr",
user=db_user,
password=db_password
host=db_host, port=db_port, database="mosip_keymgr",
user=db_user, password=db_password
)
keymgr_cursor = keymgr_conn.cursor()
sql_query_cert_data = f"SELECT cert_data FROM keymgr.partner_cert_store WHERE cert_id = '{certificate_alias}';"
keymgr_cursor.execute(sql_query_cert_data)
cert_data = keymgr_cursor.fetchone()[0]

# Format the certificate data
formatted_cert_data = format_certificate(cert_data)

# Close connections
pms_cursor.close()
pms_conn.close()
keymgr_cursor.close()
Expand All @@ -96,14 +77,13 @@ def authenticate_and_get_token(base_url, client_secret):
"clientId": "mosip-pms-client",
"secretKey": client_secret
},
"requesttime": "", # Generate timestamp in desired format
"requesttime": "",
"version": "string"
}

response = requests.post(auth_url, headers=headers, json=auth_data)
if response.status_code == 200:
token = response.headers.get("authorization")
return token
return response.headers.get("authorization")
else:
print("Authentication failed.")
print("Auth API Response:", response.text)
Expand All @@ -117,7 +97,6 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url):
"Cookie": f"Authorization={token}"
}

# Format certificate data
formatted_cert_data = cert_data.replace("\\n", "\n")

upload_data = {
Expand All @@ -128,7 +107,7 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url):
"partnerDomain": "AUTH",
"partnerId": partner_id
},
"requesttime": "", # Generate timestamp in desired format
"requesttime": "",
"version": "string"
}

Expand Down Expand Up @@ -166,7 +145,6 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url):
if not pre_expiry_days:
missing_env_vars.append('pre-expiry-days')

# If any environment variables are not set, read from bootstrap.properties file
if missing_env_vars:
print(f"Missing environment variables: {', '.join(missing_env_vars)}. Falling back to bootstrap.properties.")
config = ConfigParser()
Expand All @@ -179,67 +157,50 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url):
client_secret = config.get('API', 'mosip_pms_client_secret', fallback=client_secret)
pre_expiry_days = config.get('API', 'pre-expiry-days', fallback=pre_expiry_days)

# Authenticate and get the token
TOKEN = authenticate_and_get_token(base_url, client_secret)

# Check if token is obtained successfully
if TOKEN:
# Read pre-expiry days from bootstrap.properties
PRE_EXPIRY_DAYS = pre_expiry_days
# **NEW MODIFICATION: Fetch Partner IDs from ENV or partner.properties**
partner_ids_env = os.environ.get("PARTNER_IDS_ENV")

# PARTNER_IDS read from partner.properties
if partner_ids_env:
partner_ids = partner_ids_env.split(',')
else:
print("PARTNER_IDS_ENV not found. Falling back to partner.properties.")
with open('partner.properties', 'r') as file:
for line in file:
if line.startswith('PARTNER_ID'):
partner_ids = line.strip().split('=')[1].split(',')
for PARTNER_ID in partner_ids:
print(f"\nProcessing partner ID: {PARTNER_ID.strip()}")
# Request certificate information
try:
req = Request(f"https://{base_url}/v1/partnermanager/partners/{PARTNER_ID.strip()}/certificate",
headers={
"Content-Type": "application/json",
"Cookie": f"Authorization={TOKEN}"
},
method="GET")
response = urlopen(req)
response_data = json.loads(response.read().decode('utf-8'))
CERTIFICATE_DATA = response_data.get('response', {}).get('certificateData')
print(CERTIFICATE_DATA)
# Run openssl command to print certificate details
openssl_command = f"echo '{CERTIFICATE_DATA}' | openssl x509 -noout -enddate"
expiration_date = os.popen(openssl_command).read().split('=')[1].strip()
print("Certificate expiration date:", expiration_date)
# Check if certificate is expired or pre-expiry
if is_certificate_expired(expiration_date) or \
(datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z") - datetime.utcnow()) <= timedelta(days=int(PRE_EXPIRY_DAYS)):
write_to_expired_txt(PARTNER_ID.strip())
except HTTPError as e:
print(f"Error occurred while fetching certificate information for {PARTNER_ID}: {e}")
continue

if not CERTIFICATE_DATA:
print(f"No data available for {PARTNER_ID} in keymanager.")
continue

# Check if expired.txt exists before trying to read from it
if os.path.exists("expired.txt"):
with open("expired.txt", "r") as file:
expired_partner_ids = [line.strip() for line in file if line.strip()]
else:
expired_partner_ids = []
break
else:
partner_ids = []

# Authenticate and get the token
TOKEN = authenticate_and_get_token(base_url, client_secret)

if TOKEN:
PRE_EXPIRY_DAYS = int(pre_expiry_days)

for PARTNER_ID in partner_ids:
PARTNER_ID = PARTNER_ID.strip()
print(f"\nProcessing partner ID: {PARTNER_ID}")

try:
req = Request(f"https://{base_url}/v1/partnermanager/partners/{PARTNER_ID}/certificate",
headers={"Content-Type": "application/json", "Cookie": f"Authorization={TOKEN}"},
method="GET")
response = urlopen(req)
response_data = json.loads(response.read().decode('utf-8'))
CERTIFICATE_DATA = response_data.get('response', {}).get('certificateData')

openssl_command = f"echo '{CERTIFICATE_DATA}' | openssl x509 -noout -enddate"
expiration_date = os.popen(openssl_command).read().split('=')[1].strip()

# Check if any certificates were found to be expired
if not expired_partner_ids:
print("None of the certs have expired.")
exit(0)
if is_certificate_expired(expiration_date) or \
(datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z") - datetime.utcnow()) <= timedelta(days=PRE_EXPIRY_DAYS):
write_to_expired_txt(PARTNER_ID)

for partner_id in expired_partner_ids:
print(f"Certificate renewal started for Partner ID: {partner_id}")
cert_data = retrieve_certificate_data(partner_id, postgres_host, postgres_port, postgres_user, postgres_password)
if cert_data is not None:
upload_certificate_with_token(TOKEN, cert_data, partner_id, base_url)
except HTTPError as e:
print(f"Error fetching certificate for {PARTNER_ID}: {e}")
continue

print("Certificate check and renewal process completed.")
else:
print("Failed while trying to get auth-token")
print("Failed while trying to get auth-token.")