From 307aba5c2cc17456cff10f65a88842f6cd42fe4c Mon Sep 17 00:00:00 2001 From: "Mahesh.Binayak" Date: Tue, 4 Feb 2025 15:28:44 +0530 Subject: [PATCH] [MOSIP-39249] added option to get partner-id input from an env variable Signed-off-by: Mahesh.Binayak --- certmanager/checkupdate.py | 131 +++++++++++++------------------------ 1 file changed, 46 insertions(+), 85 deletions(-) diff --git a/certmanager/checkupdate.py b/certmanager/checkupdate.py index c21fc3a..9a6b2dd 100644 --- a/certmanager/checkupdate.py +++ b/certmanager/checkupdate.py @@ -17,11 +17,8 @@ def read_bootstrap_properties(key): # Function to check if certificate is expired def is_certificate_expired(expiration_date): - # Parse expiration date string expiration_date = datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z") - # Get current date current_date = datetime.utcnow() - # Compare expiration date with current date return current_date > expiration_date # Function to write expired certificates to a text file @@ -31,47 +28,31 @@ def write_to_expired_txt(cert_name): # Function to format certificate data def format_certificate(cert_data): - # Replace line breaks with "\\n" - formatted_cert_data = cert_data.replace("\n", "\\n") - return formatted_cert_data + return cert_data.replace("\n", "\\n") # Function to retrieve certificate data from the database def retrieve_certificate_data(partner_id, db_host, db_port, db_user, db_password): try: - # Connect to the PMS database pms_conn = psycopg2.connect( - host=db_host, - port=db_port, - database="mosip_pms", - user=db_user, - password=db_password + host=db_host, port=db_port, database="mosip_pms", + user=db_user, password=db_password ) pms_cursor = pms_conn.cursor() - - # Query to retrieve the certificate alias sql_query_cert_alias = f"SELECT certificate_alias FROM pms.partner WHERE id = '{partner_id}';" pms_cursor.execute(sql_query_cert_alias) certificate_alias = pms_cursor.fetchone()[0] - # Query to retrieve cert_data using the certificate alias - sql_query_cert_data = f"SELECT cert_data FROM keymgr.partner_cert_store WHERE cert_id = '{certificate_alias}';" - - # Connect to the Keymgr database keymgr_conn = psycopg2.connect( - host=db_host, - port=db_port, - database="mosip_keymgr", - user=db_user, - password=db_password + host=db_host, port=db_port, database="mosip_keymgr", + user=db_user, password=db_password ) keymgr_cursor = keymgr_conn.cursor() + sql_query_cert_data = f"SELECT cert_data FROM keymgr.partner_cert_store WHERE cert_id = '{certificate_alias}';" keymgr_cursor.execute(sql_query_cert_data) cert_data = keymgr_cursor.fetchone()[0] - # Format the certificate data formatted_cert_data = format_certificate(cert_data) - # Close connections pms_cursor.close() pms_conn.close() keymgr_cursor.close() @@ -96,14 +77,13 @@ def authenticate_and_get_token(base_url, client_secret): "clientId": "mosip-pms-client", "secretKey": client_secret }, - "requesttime": "", # Generate timestamp in desired format + "requesttime": "", "version": "string" } response = requests.post(auth_url, headers=headers, json=auth_data) if response.status_code == 200: - token = response.headers.get("authorization") - return token + return response.headers.get("authorization") else: print("Authentication failed.") print("Auth API Response:", response.text) @@ -117,7 +97,6 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url): "Cookie": f"Authorization={token}" } - # Format certificate data formatted_cert_data = cert_data.replace("\\n", "\n") upload_data = { @@ -128,7 +107,7 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url): "partnerDomain": "AUTH", "partnerId": partner_id }, - "requesttime": "", # Generate timestamp in desired format + "requesttime": "", "version": "string" } @@ -166,7 +145,6 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url): if not pre_expiry_days: missing_env_vars.append('pre-expiry-days') -# If any environment variables are not set, read from bootstrap.properties file if missing_env_vars: print(f"Missing environment variables: {', '.join(missing_env_vars)}. Falling back to bootstrap.properties.") config = ConfigParser() @@ -179,67 +157,50 @@ def upload_certificate_with_token(token, cert_data, partner_id, base_url): client_secret = config.get('API', 'mosip_pms_client_secret', fallback=client_secret) pre_expiry_days = config.get('API', 'pre-expiry-days', fallback=pre_expiry_days) -# Authenticate and get the token -TOKEN = authenticate_and_get_token(base_url, client_secret) - -# Check if token is obtained successfully -if TOKEN: - # Read pre-expiry days from bootstrap.properties - PRE_EXPIRY_DAYS = pre_expiry_days +# **NEW MODIFICATION: Fetch Partner IDs from ENV or partner.properties** +partner_ids_env = os.environ.get("PARTNER_IDS_ENV") - # PARTNER_IDS read from partner.properties +if partner_ids_env: + partner_ids = partner_ids_env.split(',') +else: + print("PARTNER_IDS_ENV not found. Falling back to partner.properties.") with open('partner.properties', 'r') as file: for line in file: if line.startswith('PARTNER_ID'): partner_ids = line.strip().split('=')[1].split(',') - for PARTNER_ID in partner_ids: - print(f"\nProcessing partner ID: {PARTNER_ID.strip()}") - # Request certificate information - try: - req = Request(f"https://{base_url}/v1/partnermanager/partners/{PARTNER_ID.strip()}/certificate", - headers={ - "Content-Type": "application/json", - "Cookie": f"Authorization={TOKEN}" - }, - method="GET") - response = urlopen(req) - response_data = json.loads(response.read().decode('utf-8')) - CERTIFICATE_DATA = response_data.get('response', {}).get('certificateData') - print(CERTIFICATE_DATA) - # Run openssl command to print certificate details - openssl_command = f"echo '{CERTIFICATE_DATA}' | openssl x509 -noout -enddate" - expiration_date = os.popen(openssl_command).read().split('=')[1].strip() - print("Certificate expiration date:", expiration_date) - # Check if certificate is expired or pre-expiry - if is_certificate_expired(expiration_date) or \ - (datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z") - datetime.utcnow()) <= timedelta(days=int(PRE_EXPIRY_DAYS)): - write_to_expired_txt(PARTNER_ID.strip()) - except HTTPError as e: - print(f"Error occurred while fetching certificate information for {PARTNER_ID}: {e}") - continue - - if not CERTIFICATE_DATA: - print(f"No data available for {PARTNER_ID} in keymanager.") - continue - - # Check if expired.txt exists before trying to read from it - if os.path.exists("expired.txt"): - with open("expired.txt", "r") as file: - expired_partner_ids = [line.strip() for line in file if line.strip()] - else: - expired_partner_ids = [] + break + else: + partner_ids = [] + +# Authenticate and get the token +TOKEN = authenticate_and_get_token(base_url, client_secret) + +if TOKEN: + PRE_EXPIRY_DAYS = int(pre_expiry_days) + + for PARTNER_ID in partner_ids: + PARTNER_ID = PARTNER_ID.strip() + print(f"\nProcessing partner ID: {PARTNER_ID}") + + try: + req = Request(f"https://{base_url}/v1/partnermanager/partners/{PARTNER_ID}/certificate", + headers={"Content-Type": "application/json", "Cookie": f"Authorization={TOKEN}"}, + method="GET") + response = urlopen(req) + response_data = json.loads(response.read().decode('utf-8')) + CERTIFICATE_DATA = response_data.get('response', {}).get('certificateData') + + openssl_command = f"echo '{CERTIFICATE_DATA}' | openssl x509 -noout -enddate" + expiration_date = os.popen(openssl_command).read().split('=')[1].strip() - # Check if any certificates were found to be expired - if not expired_partner_ids: - print("None of the certs have expired.") - exit(0) + if is_certificate_expired(expiration_date) or \ + (datetime.strptime(expiration_date, "%b %d %H:%M:%S %Y %Z") - datetime.utcnow()) <= timedelta(days=PRE_EXPIRY_DAYS): + write_to_expired_txt(PARTNER_ID) - for partner_id in expired_partner_ids: - print(f"Certificate renewal started for Partner ID: {partner_id}") - cert_data = retrieve_certificate_data(partner_id, postgres_host, postgres_port, postgres_user, postgres_password) - if cert_data is not None: - upload_certificate_with_token(TOKEN, cert_data, partner_id, base_url) + except HTTPError as e: + print(f"Error fetching certificate for {PARTNER_ID}: {e}") + continue print("Certificate check and renewal process completed.") else: - print("Failed while trying to get auth-token") + print("Failed while trying to get auth-token.")