-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpostgres_import_data.py
More file actions
114 lines (101 loc) · 3.36 KB
/
postgres_import_data.py
File metadata and controls
114 lines (101 loc) · 3.36 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
import os
from urllib.parse import quote

import pandas as pd
import psycopg2
from psycopg2.extras import execute_values
from dotenv import load_dotenv

load_dotenv()

# Build the connection string from environment variables.
# SUPABASE_PWD is required; the rest fall back to the previous hard-coded
# defaults so existing .env files keep working unchanged.
supabase_pwd = os.getenv("SUPABASE_PWD")
supabase_host = os.getenv("SUPABASE_HOST", "aws-1-us-east-1.pooler.supabase.com")
supabase_port = os.getenv("SUPABASE_PORT", "5432")
# The project-scoped user was previously baked into the DSN string; it is now
# overridable via SUPABASE_USER while defaulting to the original value.
supabase_user = os.getenv("SUPABASE_USER", "postgres.wtiopnzppsyjxecrogik")
if not supabase_pwd:
    raise ValueError("SUPABASE_PWD not set in .env")
# URL-encode the password: characters such as '@', ':' or '/' in a raw
# password would otherwise corrupt the postgresql:// URL.
db_url = (
    f"postgresql://{supabase_user}:{quote(supabase_pwd, safe='')}"
    f"@{supabase_host}:{supabase_port}/postgres"
)
# Open the connection to the Supabase-hosted PostgreSQL instance and obtain
# a cursor; any failure here is fatal, so report it and stop the script.
try:
    conn = psycopg2.connect(db_url)
    cursor = conn.cursor()
    print("Connected to Supabase PostgreSQL")
except Exception as e:
    print(f"Failed to connect: {e}")
    raise SystemExit(1)
# Pull the parsed job postings from disk; a missing or unreadable file is
# fatal for an import script, so abort rather than continue with no data.
try:
    df = pd.read_csv("data/parsed_jobs.csv")
except Exception as e:
    print(f"Failed to load CSV: {e}")
    raise SystemExit(1)
else:
    print(f"Loaded {len(df)} rows from CSV")
# DDL for the destination table. IF NOT EXISTS keeps the script re-runnable
# against a database where the table already exists.
create_table_sql = """
CREATE TABLE IF NOT EXISTS "job-market-stream" (
    job_id TEXT PRIMARY KEY,
    job_title TEXT,
    job_description TEXT,
    company_name TEXT,
    location TEXT,
    job_function TEXT,
    skills TEXT,
    degree_requirement TEXT,
    time_posted_parsed TIMESTAMP WITH TIME ZONE,
    application_link TEXT,
    num_applicants_int INTEGER,
    work_mode TEXT,
    scraped_at TIMESTAMP WITH TIME ZONE,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
"""
try:
    cursor.execute(create_table_sql)
    conn.commit()
    print("Created jobs table")
except Exception as e:
    # DDL failure leaves nothing to import into — close and bail out.
    print(f"Failed to create table: {e}")
    conn.close()
    raise SystemExit(1)
# --- prepare data for insertion -------------------------------------------
# Tuple order below must match the column list of the INSERT statement.

def _clean_text(value):
    # Missing values become SQL NULL. The previous str(...) coercion turned
    # pandas NaN into the literal string "nan", polluting the text columns.
    return None if pd.isna(value) else str(value)

def _clean_timestamp(value):
    # Parse via pandas; missing values map to NULL.
    return pd.to_datetime(value) if pd.notna(value) else None

def _clean_int(value):
    # Applicant counts are nullable integers in the schema.
    return int(value) if pd.notna(value) else None

records = []
for _, row in df.iterrows():
    records.append((
        # job_id is the PRIMARY KEY and must not be NULL, so it keeps the
        # original unconditional str() coercion.
        str(row.get("job_id", "")),
        _clean_text(row.get("job_title")),
        _clean_text(row.get("job_description")),
        _clean_text(row.get("company_name")),
        _clean_text(row.get("location")),
        _clean_text(row.get("job_function")),
        _clean_text(row.get("skills")),
        _clean_text(row.get("degree_requirement")),
        _clean_timestamp(row.get("time_posted_parsed")),
        _clean_text(row.get("application_link")),
        _clean_int(row.get("num_applicants_int")),
        _clean_text(row.get("work_mode")),
        _clean_timestamp(row.get("scraped_at")),
    ))
# Bulk-insert with execute_values: one round trip per page of 1000 rows
# instead of one INSERT per record. ON CONFLICT skips job_ids already stored,
# so re-running the import is idempotent.
insert_sql = """
INSERT INTO "job-market-stream" (job_id, job_title, job_description, company_name, location, job_function,
                                 skills, degree_requirement, time_posted_parsed, application_link,
                                 num_applicants_int, work_mode, scraped_at)
VALUES %s
ON CONFLICT (job_id) DO NOTHING;
"""
try:
    execute_values(cursor, insert_sql, records, page_size=1000)
    conn.commit()
    print(f"Inserted {len(records)} job records into PostgreSQL")
except Exception as e:
    # A failed batch is fatal — close the connection before exiting.
    print(f"Failed to insert data: {e}")
    conn.close()
    raise SystemExit(1)
# NOTE: no extra index on job_id is needed — PostgreSQL automatically creates
# a unique index to enforce the PRIMARY KEY constraint, so the previous
# "CREATE INDEX IF NOT EXISTS idx_job_id ..." step duplicated that index and
# has been removed.
# Release the cursor and connection now that the import is finished.
cursor.close()
conn.close()
print("Import complete")