|
| 1 | +import sys |
| 2 | +import psycopg2 |
| 3 | +import json |
| 4 | +from psycopg2.extras import Json |
# Debugging helper: process_business_attributes() does not call it.
# Used to find nested json keys.
def id_generator(dict_var):
    """Yield the leaf keys of a nested mapping, descending into JSON-like strings.

    A string value is normalised (``True``/``False`` lower-cased and every
    single quote turned into a double quote) and handed to ``json.loads``.
    When a value is a dict -- either originally or after parsing -- its keys
    are walked recursively and the parent key itself is not yielded.

    Args:
        dict_var: Mapping whose values may be dicts, strings holding a
            dict-like literal, or any other object.

    Yields:
        Every key whose (possibly parsed) value is not a dict, in iteration
        order.
    """
    for k, v in dict_var.items():
        # Only strings are candidates for parsing. Non-string values must be
        # left untouched so that real dicts are still recursed into below.
        if isinstance(v, str):
            candidate = (
                v.replace("True", "true")
                .replace("False", "false")
                # JSON requires double quotes around keys and strings.
                .replace("'", '"')
            )
            try:
                v = json.loads(candidate)
                print(f'Parsed: {k}: {v}')
            except ValueError:
                # json.JSONDecodeError is a ValueError; keep the raw string.
                print(f'Cannot parse {k}: {v}')
        if isinstance(v, dict):
            yield from id_generator(v)
        else:
            yield k
def process_business_attributes(host='172.17.0.6', user='postgres', dbname='canary_db'):
    """Decode stringified JSON in ``business_attribute`` and build a flattened table.

    Everything runs in a single transaction:

    1. Read ``business_id`` and ``attribute`` for every row of
       ``business_attribute``.
    2. For each row, parse attribute values that are strings containing a
       dict-like literal and write the row back if anything was parsed.
    3. Create ``business_attribute_temp`` (nested keys flattened to dotted
       names), the ``create_jsonb_flat_view`` function, the view it
       generates, and finally ``business_attributes_exploded`` with
       ``business_id`` as primary key.

    The DDL uses plain ``CREATE TABLE``, so the statement fails if the
    tables already exist from an earlier run.

    Args:
        host: PostgreSQL host to connect to.
        user: Database user.
        dbname: Database name.

    Raises:
        psycopg2.Error: On any database failure; the transaction is rolled
            back and the connection is closed before the error propagates.
    """
    # Local import: psycopg2 is already a dependency of this file, and this
    # keeps the function usable without touching the module import block.
    from psycopg2.extras import execute_batch

    conn = psycopg2.connect(host=host, user=user, dbname=dbname)
    try:
        # `with conn` commits on success and rolls back on an exception, but
        # does not close the connection -- hence the surrounding try/finally.
        with conn, conn.cursor() as cur:
            cur.execute("SELECT business_id, attribute FROM business_attribute")
            ops = []
            for business_id, attributes in cur.fetchall():
                # A NULL (or otherwise non-object) column has nothing to decode.
                if not isinstance(attributes, dict):
                    continue
                if _decode_embedded_json(attributes):
                    ops.append({"business_id": business_id, "attributes": Json(attributes)})

            print(f'Starting to update {len(ops)} rows in business_attribute table...')
            # The original executemany() call was reported to take around
            # 50 minutes; execute_batch sends the statements in groups to
            # reduce server round trips.
            execute_batch(
                cur,
                "UPDATE business_attribute SET attribute = %(attributes)s WHERE business_id = %(business_id)s",
                ops,
            )
            print(f'Updated {len(ops)} rows in business_attributes')

            # This query takes around 13 seconds.
            # It must be executed WITHOUT a parameter argument: the text
            # contains literal '%' placeholders for PostgreSQL's format().
            cur.execute(
                """
                CREATE TABLE business_attribute_temp AS (with recursive flat(business_id, key, value) as
                (
                    SELECT business_id, key, value FROM business_attribute, jsonb_each(attribute)
                    UNION
                    SELECT f.business_id, concat(f.key, '.', j.key), j.value FROM flat f, jsonb_each(f.value) j WHERE jsonb_typeof(f.value) = 'object'
                )
                SELECT business_id, jsonb_object_agg(key, value) as data from flat WHERE jsonb_typeof(value) <> 'object' GROUP BY business_id);
                create or replace function create_jsonb_flat_view
                    (table_name text, regular_columns text, json_column text)
                    returns text language plpgsql as $$
                declare
                    cols text;
                begin
                    execute format ($ex$
                        select string_agg(format('%2$s->>%%1$L "%%1$s"', key), ', ')
                        from (
                            select distinct key
                            from %1$s, jsonb_each(%2$s)
                            order by 1
                        ) s;
                        $ex$, table_name, json_column)
                    into cols;
                    execute format($ex$
                        drop view if exists %1$s_view;
                        create view %1$s_view as
                        select %2$s, %3$s from %1$s
                        $ex$, table_name, regular_columns, cols);
                    return cols;
                end $$;
                SELECT create_jsonb_flat_view('business_attribute_temp', 'business_id', 'data');
                CREATE TABLE business_attributes_exploded AS (SELECT * FROM business_attribute_temp_view);
                ALTER TABLE business_attributes_exploded ADD PRIMARY KEY (business_id);
                """
            )
    finally:
        conn.close()


def _decode_embedded_json(attributes):
    """Parse dict-like string values of *attributes* in place; return True if any changed."""
    changed = False
    # Iterate over a snapshot because values are reassigned during the loop.
    for key, value in list(attributes.items()):
        if not (isinstance(value, str) and '{' in value and '}' in value):
            continue
        # Replace True and False with true and false, the standard JSON
        # notations, and turn every single quote into a double quote: the
        # source data quotes keys with single quotes, which JSON rejects.
        normalised = value.replace("True", "true").replace("False", "false").replace("'", '"')
        try:
            attributes[key] = json.loads(normalised)
        except ValueError:
            # Not valid JSON even after normalisation; keep the raw string.
            continue
        changed = True
    return changed

# No __main__ guard: the migration runs whenever this file is executed or imported.
process_business_attributes()
0 commit comments