-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathkeycloak_admin_api.py
More file actions
130 lines (114 loc) · 5.77 KB
/
keycloak_admin_api.py
File metadata and controls
130 lines (114 loc) · 5.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
import logging
import urllib.parse
import urllib.error
import http.client
import json
import time
import auth
class KeycloakAdminAPIException(Exception):
    """Exception carrying a message plus a numeric error code.

    Attributes:
        error_code: code supplied by whoever raises the exception; it is 0
            when no code is given.
    """

    def __init__(self, message: str, error_code: int = 0):
        # Record the code first, then let the base class store the message
        # (it ends up in ``args`` and is what ``str(exc)`` shows).
        self.error_code = error_code
        super().__init__(message)
#API SPEC: https://www.keycloak.org/docs-api/22.0.5/rest-api/index.html#_users
class KeycloakAdminAPIClient:
    """Small client for the ``users`` endpoints of the Keycloak Admin REST API.

    Every call opens a fresh HTTPS connection to the host/port of ``apiURL``,
    sends a bearer token obtained from ``authClient.get_token()`` and closes
    the connection again. Request paths are built by plain concatenation of
    the path component of ``apiURL`` and the relative endpoint path, so
    ``apiURL`` is expected to end with ``/``.
    """

    def __init__(self, authClient: "auth.AuthClient", apiURL: str):
        """Store the token provider and the parsed base URL.

        Args:
            authClient: object exposing ``get_token()`` returning the access token.
            apiURL: base URL of the admin API (e.g. ``https://host/admin/realms/x/``).

        Raises:
            ValueError: if no hostname can be parsed out of ``apiURL``.
        """
        self.apiURL = urllib.parse.urlparse(apiURL)
        if self.apiURL.hostname is None:
            raise ValueError('Wrong apiUrl.')
        self.authClient = authClient

    def _get_connection(self):
        """Return a new (not yet connected) HTTPS connection to the API host."""
        # Re-checked here because ``apiURL`` is a public attribute and may
        # have been reassigned after construction.
        if self.apiURL.hostname is None:
            raise ValueError('Wrong apiUrl.')
        return http.client.HTTPSConnection(self.apiURL.hostname, self.apiURL.port)

    def _get_headers(self):
        """Return the base request headers (authorization only)."""
        return {'Authorization': 'bearer ' + self.authClient.get_token()}

    @staticmethod
    def _user_path(userId):
        """Return the relative endpoint path of one user, with the id percent-encoded."""
        # safe='' also encodes '/', so an id can never add path or query segments.
        return "users/" + urllib.parse.quote(str(userId), safe='')

    def _request(self, method, path, expectedStatus, content=None, contentType=None):
        """Send one request and return the raw response body.

        Shared implementation of the GET/PUT/POST helpers.

        Args:
            method: HTTP method name.
            path: endpoint path relative to the base URL path.
            expectedStatus: the only HTTP status code treated as success.
            content: request body, passed to ``http.client`` unchanged.
            contentType: value for the ``Content-Type`` header, if any.

        Returns:
            The response body as bytes.

        Raises:
            KeycloakAdminAPIException: if the status differs from
                ``expectedStatus``; ``error_code`` holds the HTTP status.
        """
        headers = self._get_headers()
        if contentType is not None:
            headers['Content-Type'] = contentType
        connection = self._get_connection()
        try:
            connection.request(method, self.apiURL.path + path, content, headers)
            res = connection.getresponse()
            httpStatusCode = res.status
            reason = res.reason
            # Read the whole body before closing so it is available to the
            # caller (GET) or for diagnostics (errors).
            body = res.read()
        finally:
            connection.close()
        if httpStatusCode != expectedStatus:
            logging.root.error('KeycloakAdminAPI error. Code: %d %s', httpStatusCode, reason)
            logging.root.debug('KeycloakAdminAPI error response body: %r', body)
            raise KeycloakAdminAPIException('Internal server error: KeycloakAdminAPI call failed.', httpStatusCode)
        logging.root.debug('KeycloakAdminAPI call success.')
        return body

    def _GET_JSON(self, path):
        """GET ``path`` and return the decoded JSON body; success status is 200."""
        # The empty-string body is kept from the original implementation so
        # the request sent on the wire stays the same.
        return json.loads(self._request("GET", path, 200, content=""))

    def _PUT_JSON(self, path, content):
        """PUT the JSON text ``content`` to ``path``; success status is 204."""
        self._request("PUT", path, 204, content=content, contentType='application/json')

    def _POST_JSON(self, path, content):
        """POST the JSON text ``content`` to ``path``; success status is 201."""
        self._request("POST", path, 201, content=content, contentType='application/json')

    def getUserId(self, username):
        """Return the id of the user whose username is exactly ``username``.

        Returns:
            The user id, or ``None`` when the search returns no user.

        Raises:
            KeycloakAdminAPIException: if the call fails or the response does
                not have the expected shape (more than one match, a different
                username, missing fields).
        """
        logging.root.debug('Getting user ID from KeycloakAdminAPI...')
        response = self._GET_JSON(
            "users?exact=true&briefRepresentation=true&username=" + urllib.parse.quote_plus(username))
        try:
            if len(response) == 0:
                return None
            if len(response) != 1:
                raise ValueError("Unexpected response, username not unique")
            user = response[0]
            if user["username"] != username:
                raise ValueError("Unexpected response, username not match")
            return user["id"]
        except (LookupError, TypeError, ValueError) as e:
            # Covers every way a decoded JSON value can fail the checks above.
            logging.root.error('KeycloakAdminAPI response unexpected: %s', response)
            raise KeycloakAdminAPIException('Internal server error: KeycloakAdminAPI response unexpected.') from e

    def getUserEmail(self, userId):
        """Return the ``email`` field of the user ``userId``.

        Raises:
            KeyError: if the user representation has no ``email`` field.
            KeycloakAdminAPIException: if the call fails.
        """
        logging.root.debug('Getting user from KeycloakAdminAPI...')
        user = self._GET_JSON(self._user_path(userId))
        return user["email"]

    def getUserAttribute(self, userId, attributeName):
        """Return the first value of a user attribute.

        Returns:
            The first value stored under ``attributeName``, or ``None`` when
            the user has no attributes, lacks that attribute, or the
            attribute holds an empty list of values.
        """
        logging.root.debug('Getting user from KeycloakAdminAPI...')
        user = self._GET_JSON(self._user_path(userId))
        values = (user.get("attributes") or {}).get(attributeName)
        # A repeated attribute has several items; only the first is returned.
        return values[0] if values else None

    def setUserAttribute(self, userId, attributeName, attributeValue):
        """Set the first value of a user attribute and save the user.

        The whole user representation is fetched, modified and sent back.
        When the attribute already has several values only the first one is
        replaced; when it is missing or empty it becomes ``[attributeValue]``.
        """
        logging.root.debug('Getting user from KeycloakAdminAPI...')
        path = self._user_path(userId)
        user = self._GET_JSON(path)
        attributes = user.get("attributes")
        if attributes is None:
            attributes = user["attributes"] = {}
        values = attributes.get(attributeName)
        if values:
            values[0] = attributeValue
        else:
            # Missing attribute or empty value list: start a fresh list
            # instead of indexing into nothing.
            attributes[attributeName] = [attributeValue]
        logging.root.debug('Setting user attribute with KeycloakAdminAPI...')
        self._PUT_JSON(path, json.dumps(user))

    def createSpecialUser(self, username, email, firstName, lastName):
        """Create an enabled user with a verified email and default attributes.

        Raises:
            KeycloakAdminAPIException: if the API does not answer 201.
        """
        logging.root.debug('Creating user with KeycloakAdminAPI...')
        user = {
            "requiredActions": [],
            "emailVerified": True,
            "username": username,
            "email": email,
            "firstName": firstName,
            "lastName": lastName,
            "attributes": {
                "companyOrOrganization": "",
                "projects": "",
                "eucaimNegotiationID": "",
                "additionalComments": "",
                "confirmation": ["no"],
            },
            "groups": [],
            "enabled": True,
        }
        self._POST_JSON("users", json.dumps(user))