-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathsync_perms.py
More file actions
175 lines (151 loc) · 7.72 KB
/
sync_perms.py
File metadata and controls
175 lines (151 loc) · 7.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# sync_perms.py
#
# Baseline script to sync permissions of tables, volumes, schemas and catalogs between two workspaces.
#
# NOTE: This script must be run in the PRIMARY workspace.
#
# This script will attempt to sync the permissions of all catalog, schema and table securables between a source and
# target workspace. All securables should already exist in the target workspace, i.e., this script assumes a metadata
# sync has already been performed. It will also only update permissions on objects that exist in the primary; i.e., if
# an object exists in the secondary but not the primary, no updates will be applied.
#
# Params that must be specified below:
# -source_host: the hostname of the primary workspace.
# -source_pat: an access token for the primary workspace; must be an ADMIN user.
# -target_host: the hostname of the secondary workspace.
# -target_pat: an access token for the secondary workspace; must be an ADMIN user.
# -catalogs_to_copy: a list of the catalogs to be replicated between workspaces.
# -num_exec: the number of threads to spawn in the ThreadPoolExecutor.
from itertools import repeat
from databricks.sdk.service import catalog
from databricks.sdk import WorkspaceClient
from concurrent.futures import ThreadPoolExecutor
from databricks.sdk.errors.platform import NotFound
from common import (target_pat, target_host,
source_pat, source_host,
catalogs_to_copy, num_exec)
# helper function to update object grants between source and target WS
def sync_grants(w_src, w_tgt, obj_name, obj_type):
# get source and target grants
source_grants = w_src.grants.get_effective(obj_type, obj_name)
# if the object does not exist in the secondary workspace, we cannot fetch it
try:
target_grants = w_tgt.grants.get_effective(obj_type, obj_name)
except NotFound:
return {"name": obj_name, "status": "NotFound"}
# get list of all distinct users with grants on the object
user_list = {u.principal for u in source_grants.privilege_assignments}.union(
{u.principal for u in target_grants.privilege_assignments})
# create PermissionsChange object for each user where a change exists
change_list = []
for u in user_list:
# get the source/target privileges; these may not exist in one or the other environment
try:
source_privs = [x.privilege for x in
[p.privileges for p in source_grants.privilege_assignments if p.principal == u][0]
if x.privilege is not None]
except IndexError:
source_privs = []
try:
target_privs = [x.privilege for x in
[p.privileges for p in target_grants.privilege_assignments if p.principal == u][0]
if x.privilege is not None]
except IndexError:
target_privs = []
add_perms = list(set(source_privs) - set(target_privs))
rem_perms = list(set(target_privs) - set(source_privs))
# for the change list based on which types of changes exist
if add_perms and rem_perms:
change_list.append(catalog.PermissionsChange(
add=add_perms,
remove=rem_perms,
principal=u))
elif add_perms:
change_list.append(catalog.PermissionsChange(
add=add_perms,
principal=u))
elif rem_perms:
change_list.append(catalog.PermissionsChange(
remove=rem_perms,
principal=u))
# if any grants changed, update the object in target
if change_list:
w_tgt.grants.update(full_name=obj_name,
securable_type=obj_type,
changes=change_list)
return {"name": obj_name, "status": "SUCCESS"}
else:
return {"name": obj_name, "status": None}
# create the WorkspaceClients for source and target workspaces
w_source = WorkspaceClient(host=source_host, token=source_pat)
w_target = WorkspaceClient(host=target_host, token=target_pat)
# get all tables in the source ws
table_info = spark.sql("SELECT * FROM system.information_schema.tables")
volume_info = spark.sql("SELECT * FROM system.information_schema.volumes")
# iterate through catalogs
for cat in catalogs_to_copy:
filtered_tables = table_info.filter(
(table_info.table_catalog == cat) &
(table_info.table_schema != "information_schema")).collect()
filtered_volumes = volume_info.filter(volume_info.volume_catalog == cat).collect()
# sync the catalog grants first
res = sync_grants(w_source, w_target, cat, catalog.SecurableType.CATALOG)
if res["status"] == "SUCCESS":
print(f"Synced grants for catalog {cat}.")
elif res["status"] == "NotFound":
print(f"ERROR: catalog {cat} does not exist in target workspace. Sync metadata and re-run.")
else:
print(f"No changes to sync for catalog {cat}.")
# get list of fully qualified schemas and tables
schemas = {f"{cat}.{schema}" for schema in [row['table_schema'] for row in filtered_tables]}
table_names = [f"{cat}.{schema}.{table}" for schema, table in
zip([row['table_schema'] for row in filtered_tables],
[row['table_name'] for row in filtered_tables])]
volume_names = [f"{cat}.{schema}.{table}" for schema, table in
zip([row['volume_schema'] for row in filtered_volumes],
[row['volume_name'] for row in filtered_volumes])]
# update schema grants in parallel
with ThreadPoolExecutor(max_workers=num_exec) as executor:
threads = executor.map(sync_grants,
repeat(w_source),
repeat(w_target),
schemas,
repeat(catalog.SecurableType.SCHEMA))
for thread in threads:
name = thread["name"]
if thread["status"] == "SUCCESS":
print(f"Synced grants for schema {name}.")
elif thread["status"] == "NotFound":
print(f"ERROR: schema {name} does not exist in target workspace. Sync metadata and re-run.")
else:
print(f"No changes to sync for schema {name}.")
# update table grants in parallel
with ThreadPoolExecutor(max_workers=num_exec) as executor:
threads = executor.map(sync_grants,
repeat(w_source),
repeat(w_target),
table_names,
repeat(catalog.SecurableType.TABLE))
for thread in threads:
name = thread["name"]
if thread["status"] == "SUCCESS":
print(f"Synced grants for table {name}.")
elif thread["status"] == "NotFound":
print(f"ERROR: table {name} does not exist in target workspace. Sync metadata and re-run.")
else:
print(f"No changes to sync for table {name}.")
# update volume grants in parallel
with ThreadPoolExecutor(max_workers=num_exec) as executor:
threads = executor.map(sync_grants,
repeat(w_source),
repeat(w_target),
volume_names,
repeat(catalog.SecurableType.VOLUME))
for thread in threads:
name = thread["name"]
if thread["status"] == "SUCCESS":
print(f"Synced grants for volume {name}.")
elif thread["status"] == "NotFound":
print(f"ERROR: volume {name} does not exist in target workspace. Sync volumes and re-run.")
else:
print(f"No changes to sync for volume {name}.")