-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathamaConnector.py
208 lines (167 loc) · 7.25 KB
/
amaConnector.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
import psycopg2
import pandas.io.sql as sqlio
from shapely import wkt, wkb
import geopandas
import pandas
import warnings
warnings.filterwarnings('ignore') # setting ignore as a parameter
class amaAccess:
    """Connection settings for the AMA PostgreSQL database plus query helpers.

    The settings are read from a one-row delimited text file that has the
    columns ``host``, ``db``, ``port``, ``user`` and ``password``.
    """

    # Class-level fallback that stays in place when the configuration file
    # could not be read; a successful __init__ replaces it per instance with
    # the pandas DataFrame parsed from the file.
    access = []

    def __init__(self, inputfile, sep=':'):
        """Load the connection settings from ``inputfile``.

        Args:
            inputfile: Path (or file-like object) accepted by
                ``pandas.read_csv``.
            sep: Column separator used in the file.

        A read failure is deliberately not raised: setup instructions are
        printed instead and ``self.access`` keeps the class-level fallback.
        """
        try:
            self.access = pandas.read_csv(inputfile, sep=sep)
        except Exception:
            # Best effort on purpose: guide the user instead of crashing.
            # ``Exception`` (not a bare except) lets Ctrl-C/SystemExit pass.
            print("=========Error======")
            print("You most likely don't have the configuration file configured out.")
            print("Please paste or create a configuration file into location '%s'" % (inputfile))
            print("This is a simple CSV (comma separated) textfile")
            print("make sure it uses the '%s' separator" % sep)
            print("and has the following column names in the first line -in fact, just copy&paste that and fill in the correct values in the second line, or ask the DB guy for a prepped file:")
            print()
            print('paste into %s and modify:' % inputfile)
            print('==================')
            print(sep.join(['host', 'db', 'port', 'user', 'password']))
            print(sep.join(['example.ama.host', 'example_ama_db', '12345', 'your_username', 'super secret!!!']))
            print("==================")

    def _connect(self):
        """Open a new psycopg2 connection from the first row of the settings."""
        settings = self.access
        # Keyword arguments let psycopg2 build and quote the DSN itself, so
        # passwords containing quotes or spaces no longer break the string.
        return psycopg2.connect(
            host=settings['host'][0],
            port=settings['port'][0],
            dbname=settings['db'][0],
            user=settings['user'][0],
            password=settings['password'][0],
        )

    @staticmethod
    def _sql_param(value):
        """Convert one DataFrame cell into a value psycopg2 can bind.

        ``None``, NaN/NaT/NA and the literal texts ``'None'``/``'nan'`` map to
        SQL NULL (the two texts are kept for backward compatibility). Plain
        bool/int/float/str values are passed through, numpy numbers are
        unwrapped, and anything else is sent as its ``str()`` text, which is
        what the previous string-built statements inserted.
        """
        import numpy  # function-scope import; pandas already depends on numpy

        if value is None:
            return None
        text = str(value)
        if text in ('None', 'nan'):
            return None
        if pandas.api.types.is_scalar(value) and pandas.isna(value):
            return None
        if isinstance(value, (numpy.number, numpy.bool_)):
            # psycopg2 cannot adapt every numpy scalar type natively.
            return value.item()
        if isinstance(value, (bool, int, float, str)):
            return value
        return text

    def insertQuery(self, schema, table, df):
        """Insert every row of ``df`` into ``schema.table``.

        Rows that violate a constraint are skipped (``ON CONFLICT DO
        NOTHING``). All rows are committed together; on any error the whole
        batch is rolled back.

        Args:
            schema: Target schema name. Interpolated into the SQL text as-is,
                so it must come from a trusted source.
            table: Target table name. Same caveat as ``schema``.
            df: DataFrame whose column names match the table columns.

        Returns:
            ``None`` on success, ``1`` if a statement failed.
        """
        # Column names are double-quoted; embedded quotes are doubled.
        columns = ','.join('"%s"' % str(name).replace('"', '""') for name in df.columns)
        # Any literal '%' in the identifiers must be escaped because the
        # statement goes through psycopg2's %-style parameter binding.
        head = ("INSERT into %s.%s(%s)" % (schema, table, columns)).replace('%', '%%')
        placeholders = ','.join(['%s'] * len(df.columns))
        statement = "%s values(%s) ON CONFLICT DO NOTHING" % (head, placeholders)

        # itertuples keeps each column's own dtype (iterrows would upcast
        # mixed int/float rows to float).
        rows = [
            tuple(self._sql_param(cell) for cell in row)
            for row in df.itertuples(index=False, name=None)
        ]

        conn = self._connect()
        try:
            with conn.cursor() as cursor:
                try:
                    for params in rows:
                        # Echo the fully bound statement, as before.
                        print(cursor.mogrify(statement, params).decode('utf-8', 'replace'))
                        cursor.execute(statement, params)
                    conn.commit()
                except Exception as error:
                    # Deliberate best effort: report, undo, signal with 1.
                    print("Error: %s" % error)
                    conn.rollback()
                    return 1
        finally:
            # Previously the connection was never closed.
            conn.close()
        return None

    def query(self, query_list='SELECT * FROM event_full', constraint=''):
        """Run ``query_list`` followed by ``constraint`` and return a DataFrame.

        Args:
            query_list: SQL text; must contain the word ``from``.
            constraint: SQL fragment appended after ``query_list`` (separated
                by one space, terminated with ``;``). Both parts are sent
                verbatim, so they must come from a trusted source.

        Returns:
            The result as a pandas DataFrame, or ``None`` (after printing a
            hint) when ``query_list`` contains no ``from``.

        Raises:
            Any connection or SQL error, re-raised after printing a notice.
        """
        try:
            if 'from' not in query_list.lower():
                print('Something wrong with the query. No "select" and/or "from" encountered.')
                return None
            conn = self._connect()
            try:
                sql = '{} {};'.format(query_list.strip(), constraint.strip())
                return sqlio.read_sql_query(sql, conn)
            finally:
                # Close explicitly instead of waiting for garbage collection.
                conn.close()
        except Exception:
            print('Error: Connection error.')
            raise
# Module-level connection settings read by the module-level query().
# Starts out empty; nothing in this module assigns it, so callers are expected
# to set it themselves, e.g. from the result of getAccess().
access = []


def getAccess(inputfile, sep=';'):
    """Read a delimited settings file and return it as a pandas DataFrame.

    Args:
        inputfile: Path (or file-like object) accepted by ``pandas.read_csv``.
        sep: Column separator used in the file.

    Returns:
        The parsed DataFrame. The module-level ``access`` is not modified.
    """
    return pandas.read_csv(inputfile, sep=sep)
def query(query_list='SELECT * FROM event_full', constraint=''):
    """Run ``query_list`` followed by ``constraint`` and return a DataFrame.

    Connection settings are taken from the first row of the module-level
    ``access`` (columns ``host``, ``db``, ``port``, ``user``, ``password``).

    Args:
        query_list: SQL text; must contain the word ``from``.
        constraint: SQL fragment appended after ``query_list`` (separated by
            one space, terminated with ``;``). Both parts are sent verbatim,
            so they must come from a trusted source.

    Returns:
        The result as a pandas DataFrame, or ``None`` (after printing a hint)
        when ``query_list`` contains no ``from``.

    Raises:
        Any connection or SQL error, re-raised after printing a notice.
    """
    try:
        if 'from' not in query_list.lower():
            print('Something wrong with the query. No "select" and/or "from" encountered.')
            return None
        # Keyword arguments let psycopg2 build and quote the DSN itself, so
        # passwords containing quotes or spaces no longer break the string.
        conn = psycopg2.connect(
            host=access['host'][0],
            port=access['port'][0],
            dbname=access['db'][0],
            user=access['user'][0],
            password=access['password'][0],
        )
        try:
            sql = '{} {};'.format(query_list.strip(), constraint.strip())
            return sqlio.read_sql_query(sql, conn)
        finally:
            # Close explicitly; rebinding the name to None left the socket
            # open until garbage collection.
            conn.close()
    except Exception:
        print('Error: Connection error.')
        raise
# Connect to an existing database
def eventlist(constraint='', geom_only=False):
    """Return events as a GeoDataFrame with a ``geom`` geometry column.

    NOTE: a second ``def eventlist`` further down in this module rebinds the
    name at import time, so this version is only reachable if that later
    definition is removed.

    Args:
        constraint: SQL fragment appended to the SELECT by ``query``.
        geom_only: When true, read from ``event_geom`` instead of
            ``event_full``.

    Returns:
        A GeoDataFrame whose ``geom`` column is parsed from the well-known
        text in column ``pt``, or ``None`` if ``query`` returned nothing.
    """
    sql = 'SELECT * FROM event_geom' if geom_only else 'SELECT * FROM event_full'
    dat = query(sql, constraint)
    # Guard before touching the frame: the old check came after the column
    # access and left the return value unbound when dat was None.
    if dat is None:
        return None
    # use well-known-text geometry, create designated geometry column for geopandas
    dat['geom'] = dat['pt'].apply(wkt.loads)
    return geopandas.GeoDataFrame(dat, geometry='geom')
def eventlist(constraint='', geom_only=False):
    """Return all ``event_full`` rows as a GeoDataFrame.

    This definition replaces the earlier ``eventlist`` in this module.

    Args:
        constraint: SQL fragment appended to the SELECT by ``query``.
        geom_only: Accepted for call compatibility but not used here; the
            query always reads ``event_full``.

    Returns:
        A GeoDataFrame whose ``geom`` column is parsed from the well-known
        text in column ``pt``, or ``None`` if ``query`` returned nothing.
    """
    dat = query('SELECT * FROM event_full', constraint)
    # Guard before touching the frame: the old check came after the column
    # access and left the return value unbound when dat was None.
    if dat is None:
        return None
    # use well-known-text geometry, create designated geometry column for geopandas
    dat['geom'] = dat['pt'].apply(wkt.loads)
    return geopandas.GeoDataFrame(dat, geometry='geom')
def designlist(constraint='', geom_only=False):
    """Return ``event_full2`` rows that have a 3D polygon, as a GeoDataFrame.

    Args:
        constraint: SQL fragment appended by ``query`` after the built-in
            ``where not (geom_rel_event_poly3d is NULL)`` clause, so it has
            to continue that clause (e.g. start with ``AND``) rather than
            open a new ``WHERE``.
        geom_only: Accepted for call compatibility but not used.

    Returns:
        A GeoDataFrame whose ``geom`` column is decoded from the hex-encoded
        WKB in ``geom_rel_event_poly3d``, or ``None`` if ``query`` returned
        nothing.
    """
    sql = 'SELECT * FROM event_full2 where not (geom_rel_event_poly3d is NULL)'
    dat = query(sql, constraint)
    # Guard before touching the frame: the old check came after the column
    # access and left the return value unbound when dat was None.
    if dat is None:
        return None
    # decode hex WKB, create designated geometry column for geopandas
    dat['geom'] = dat['geom_rel_event_poly3d'].apply(wkb.loads, hex=True)
    return geopandas.GeoDataFrame(dat, geometry='geom')
# Connect to an existing database
def pathlist(constraint='', filePath=None):
    """Return ``event_full2`` rows with the path line as geometry.

    Args:
        constraint: SQL fragment appended to the SELECT by ``query``.
        filePath: Optional CSV destination. When omitted, a module-level
            ``filePath`` is used if one has been set; if there is none, no
            file is written.

    Returns:
        A GeoDataFrame whose ``geom`` column is decoded from the hex-encoded
        WKB in ``path_ln``, or ``None`` if ``query`` returned nothing.
    """
    dat = query('SELECT * FROM event_full2', constraint)
    # Guard before touching the frame: the old check came after the column
    # access and left the return value unbound when dat was None.
    if dat is None:
        return None
    # decode hex WKB, create designated geometry column for geopandas
    dat['geom'] = dat['path_ln'].apply(wkb.loads, hex=True)
    geodat = geopandas.GeoDataFrame(dat, geometry='geom')

    # The export used an undefined name ``filePath`` (NameError on every
    # call). It is now an optional parameter; a module global of the same
    # name is still honoured for callers that set one.
    target = filePath if filePath is not None else globals().get('filePath')
    if target is not None:
        # The former float_format='%.' is not a valid format spec and is
        # dropped in favour of the pandas default.
        geodat.to_csv(target, sep=',')
    return geodat
def event_minimum(constraint='', use3d=True):
    """Return only the event point and path line columns of ``event_full``.

    Args:
        constraint: SQL fragment appended to the SELECT by ``query``.
        use3d: When true, select ``pt3d``/``path_ln3d``; otherwise
            ``pt``/``path_ln``.

    Returns:
        A GeoDataFrame whose ``geom`` column is parsed from the well-known
        text of the selected point column, or ``None`` if ``query`` returned
        nothing.
    """
    if use3d:
        query_list = 'SELECT pt3d, path_ln3d FROM event_full'
        geom_col = 'pt3d'
    else:
        query_list = 'SELECT pt, path_ln FROM event_full'
        geom_col = 'pt'
    dat = query(query_list, constraint)
    # Guard before touching the frame: the old check came after the column
    # access and left the return value unbound when dat was None.
    if dat is None:
        return None
    dat['geom'] = dat[geom_col].apply(wkt.loads)
    return geopandas.GeoDataFrame(dat, geometry='geom')