-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathauth.py
More file actions
68 lines (60 loc) · 2.75 KB
/
auth.py
File metadata and controls
68 lines (60 loc) · 2.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
from datetime import datetime, timezone
import logging
import urllib.parse
import urllib.error
import http.client
import json
import jwt
class LoginException(Exception):
    """Error raised when a login request to the auth service does not succeed."""
class AuthClient:
    """Obtain and cache an access token from an OIDC token endpoint.

    The client logs in either as a service account (``client_credentials``
    grant) or as a user (``password`` grant) and keeps the returned access
    token in memory until shortly before its ``exp`` claim is reached.
    """

    # Seconds before the token's "exp" at which it is already treated as expired.
    TOKEN_EXPIRATION_LEEWAY = 60  # seconds

    def __init__(self, oidc_url: str, client_id: str, login_as_service_account: bool,
                 client_secret: str = "", username: str = "", password: str = ""):
        """Store the login settings; no request is sent until a token is needed.

        Args:
            oidc_url: URL the login POST request is sent to.
            client_id: Value sent as ``client_id``.
            login_as_service_account: When true, use the ``client_credentials``
                grant; otherwise use the ``password`` grant.
            client_secret: Value sent as ``client_secret``. Always sent for a
                service account; sent for a user login only when non-empty.
            username: Value sent as ``username`` for a user login.
            password: Value sent as ``password`` for a user login.
        """
        self._oidc_url = oidc_url
        self._client_id = client_id
        self._login_as_service_account = login_as_service_account
        self._client_secret = client_secret
        self._username = username
        self._password = password
        self._token = None  # cached access token, or None when a login is needed
        self._exp = None    # cached "exp" claim of the token, decoded lazily

    def _login(self) -> str:
        """Request a new access token from the auth service.

        Returns:
            The ``access_token`` field of the JSON response.

        Raises:
            ValueError: If ``oidc_url`` has no hostname.
            LoginException: If the service answers with a status other than 200.

        Errors from the connection, from JSON parsing, or from a missing
        ``access_token`` key propagate unchanged.
        """
        logging.root.debug("Logging into the auth service...")
        auth = urllib.parse.urlparse(self._oidc_url)
        if auth.hostname is None:
            raise ValueError('Wrong oidc_url.')

        # An empty path is not a valid request target, and the query string
        # (if any) is part of the endpoint address.
        target = auth.path or "/"
        if auth.query:
            target += "?" + auth.query

        if self._login_as_service_account:
            payload = {
                'client_id': self._client_id,
                'grant_type': 'client_credentials',
                'client_secret': self._client_secret,
            }
        else:
            payload = {
                'client_id': self._client_id,
                'grant_type': 'password',
                'username': self._username,
                'password': self._password,
            }
            if self._client_secret != "":
                payload["client_secret"] = self._client_secret
        body = urllib.parse.urlencode(payload)
        headers = {'Content-Type': 'application/x-www-form-urlencoded'}

        # The connection is always made over HTTPS, whatever the URL scheme says.
        connection = http.client.HTTPSConnection(auth.hostname, auth.port)
        try:
            connection.request("POST", target, body, headers)
            res = connection.getresponse()
            status = res.status
            reason = res.reason
            # The whole response is read before the connection is closed.
            msg = res.read()
        finally:
            connection.close()

        if status != 200:
            logging.root.error('Auth login error. Code: %d %s', status, reason)
            raise LoginException('Internal server error: Auth login failed.')

        logging.root.debug('Auth login success.')
        response = json.loads(msg)
        return response['access_token']

    def get_token(self) -> str:
        """Return a usable access token, logging in again when necessary.

        A cached token is reused until the current UTC time passes its ``exp``
        claim minus ``TOKEN_EXPIRATION_LEEWAY`` seconds; after that a new login
        is performed.
        """
        if self._token is not None:
            if self._exp is None:
                # The signature is deliberately not verified: the token is only
                # inspected here to read its expiration time.
                claims = jwt.decode(self._token, options={'verify_signature': False})
                self._exp = int(claims["exp"])
            now = datetime.now(tz=timezone.utc).timestamp()
            if now > (self._exp - self.TOKEN_EXPIRATION_LEEWAY):
                # Token expired, or too little time left: drop it.
                self._token = None
                self._exp = None
        if self._token is None:
            self._token = self._login()
        return self._token