-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathdata_validator_module.py
More file actions
329 lines (256 loc) · 9.66 KB
/
data_validator_module.py
File metadata and controls
329 lines (256 loc) · 9.66 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
"""
Data Validator Module
Validates and cleans extracted data to ensure integrity
"""
import math
import re
from collections import defaultdict
class DataValidator:
    """
    Validate, clean and de-duplicate extracted tables, charts and text.

    The validator works on plain dictionaries/lists. Input structures passed
    to ``validate_and_clean`` are never modified; cleaned shallow copies are
    returned instead.
    """

    # Anything that is neither a word character nor whitespace is stripped
    # from header names.
    _HEADER_NOISE = re.compile(r'[^\w\s]')

    # How many leading rows take part in a table's duplicate fingerprint.
    _FINGERPRINT_ROWS = 2

    def __init__(self):
        # Fingerprints of the tables/charts already accepted for the
        # document that is currently being validated.
        self.seen_hashes = set()

    def validate_and_clean(self, data):
        """
        Main validation and cleaning method

        Args:
            data: Extracted data dictionary with a 'metadata' entry and a
                'pages' list. Each page needs 'page_number' and may carry
                'tables', 'charts' and 'text_data' lists.

        Returns:
            Cleaned and validated data (a new dictionary)

        Raises:
            KeyError: if 'metadata', 'pages' or a page's 'page_number'
                is missing.
        """
        # Duplicate detection is scoped to one document: without this reset
        # a reused validator would drop everything it saw in an earlier call.
        self.seen_hashes = set()

        validated_data = {
            'metadata': data['metadata'],
            'pages': [],
        }
        for page in data['pages']:
            validated_data['pages'].append({
                'page_number': page['page_number'],
                'tables': self._validate_tables(page.get('tables', [])),
                'charts': self._validate_charts(page.get('charts', [])),
                'text_data': self._validate_text_data(page.get('text_data', [])),
            })
        return validated_data

    def _validate_tables(self, tables):
        """
        Clean every non-empty table and drop tables already seen

        Args:
            tables: List of table dictionaries ('headers', 'rows', 'data')

        Returns:
            List of cleaned table copies, duplicates removed
        """
        validated_tables = []
        for source in tables:
            # Skip empty tables
            if not source.get('data') and not source.get('rows'):
                continue

            # Work on a shallow copy so the caller's table stays untouched.
            table = dict(source)
            if 'headers' in table:
                table['headers'] = self._clean_headers(table['headers'])
            if 'rows' in table:
                table['rows'] = self._clean_rows(table['rows'])
            if 'data' in table:
                table['data'] = self._clean_dict_data(
                    table['data'],
                    table.get('headers', []),
                )

            # Check for duplicates
            table_hash = self._hash_table(table)
            if table_hash in self.seen_hashes:
                continue
            self.seen_hashes.add(table_hash)
            validated_tables.append(table)
        return validated_tables

    def _normalize_header(self, header):
        """
        Return a header as plain text: punctuation removed, whitespace collapsed
        """
        if header is None:
            return ""
        # Strip punctuation first, then collapse whitespace; the opposite
        # order leaves stray spaces behind ("Revenue ($)" -> "Revenue ").
        stripped = self._HEADER_NOISE.sub('', str(header))
        return ' '.join(stripped.split())

    def _clean_headers(self, headers):
        """
        Clean and standardize headers

        Empty headers become ``Column_<position>`` and repeated names get a
        numeric suffix so that every returned header is unique.

        Args:
            headers: Iterable of raw header values (None entries allowed)

        Returns:
            List of unique, cleaned header strings
        """
        cleaned = []
        seen = set()
        for position, raw in enumerate(headers or [], start=1):
            base = self._normalize_header(raw) or f"Column_{position}"

            # Handle duplicates
            header = base
            counter = 1
            while header in seen:
                header = f"{base}_{counter}"
                counter += 1

            seen.add(header)
            cleaned.append(header)
        return cleaned

    def _clean_rows(self, rows):
        """
        Clean row data

        Args:
            rows: List of rows, each an iterable of cell values

        Returns:
            List of cleaned rows; rows whose cells are all blank are dropped
        """
        cleaned_rows = []
        for row in rows or []:
            cleaned_row = [self._clean_cell_value(cell) for cell in row]
            # Skip completely empty rows
            if any(cleaned_row):
                cleaned_rows.append(cleaned_row)
        return cleaned_rows

    def _clean_dict_data(self, data, headers):
        """
        Clean dictionary format data

        Keys are whitespace-normalized. When a key only matches one of the
        cleaned ``headers`` after header normalization, it is renamed to that
        header so rows stay aligned with the table's headers. Duplicate
        dictionary rows are dropped; list/tuple rows are cleaned cell by cell.

        Args:
            data: List of row dictionaries (list/tuple rows are tolerated)
            headers: Cleaned header names of the table (may be empty)

        Returns:
            List of cleaned rows
        """
        known_headers = set(headers or [])
        cleaned_data = []
        seen_rows = set()

        for row in data or []:
            if not isinstance(row, dict):
                if isinstance(row, (list, tuple)):
                    row = [self._clean_cell_value(cell) for cell in row]
                cleaned_data.append(row)
                continue

            collapsed_keys = [' '.join(str(key).split()) for key in row]
            # Names already used in this row; a remapped key must never
            # overwrite another column.
            taken = set(collapsed_keys)

            cleaned_row = {}
            for clean_key, (key, value) in zip(collapsed_keys, row.items()):
                if clean_key not in known_headers:
                    normalized = self._normalize_header(key)
                    if normalized in known_headers and normalized not in taken:
                        clean_key = normalized
                        taken.add(normalized)
                cleaned_row[clean_key] = self._clean_cell_value(value)

            # Check for duplicate rows
            row_hash = str(sorted(cleaned_row.items()))
            if row_hash not in seen_rows:
                seen_rows.add(row_hash)
                cleaned_data.append(cleaned_row)
        return cleaned_data

    def _clean_cell_value(self, value):
        """
        Clean individual cell value

        Args:
            value: Raw cell value of any type

        Returns:
            Cleaned string. None and empty strings/containers give "";
            numeric zero is kept as "0".
        """
        # Only None and empty sized values are blank; a numeric 0 is data.
        if value is None or (hasattr(value, '__len__') and len(value) == 0):
            return ""

        # Convert to string and remove extra whitespace
        text = ' '.join(str(value).split())

        # Remove common OCR artifacts
        text = text.replace('|', 'I')       # Pipe to I
        text = text.replace('\u041e', '0')  # Cyrillic capital O to zero

        if not self._is_number(text):
            return text

        # Clean number formatting
        text = text.replace(',', '')
        try:
            # Integers are parsed exactly; going through float would
            # corrupt values above 2**53.
            return str(int(text))
        except ValueError:
            pass
        try:
            number = float(text)
        except ValueError:
            # e.g. digits separated by spaces: leave the text as it is
            return text
        if number.is_integer():
            return str(int(number))
        return str(number)

    def _is_number(self, value):
        """
        Check if value is a number

        Commas and spaces are ignored. Non-finite spellings such as "nan"
        or "inf" are not treated as numbers.
        """
        # Remove commas and spaces
        cleaned = str(value).replace(',', '').replace(' ', '')
        try:
            return math.isfinite(float(cleaned))
        except ValueError:
            return False

    def _hash_table(self, table):
        """
        Create hash for table to detect duplicates

        The fingerprint covers the headers plus the first few entries of
        both 'rows' and 'data', so tables that only carry dictionary data
        are told apart as well.
        """
        limit = self._FINGERPRINT_ROWS
        fingerprint = (
            'table',
            table.get('headers', []),
            (table.get('rows') or [])[:limit],
            (table.get('data') or [])[:limit],
        )
        return hash(repr(fingerprint))

    def _validate_charts(self, charts):
        """
        Validate chart data

        Args:
            charts: List of chart dictionaries with a 'data' list of
                item dictionaries

        Returns:
            List of cleaned chart copies, duplicates removed
        """
        validated_charts = []
        for source in charts:
            # Skip empty charts
            if not source.get('data'):
                continue

            # Clean chart data (only labels are text; other values stay as is)
            chart = dict(source)
            chart['data'] = [
                {
                    key: self._clean_cell_value(value) if key == 'label' else value
                    for key, value in item.items()
                }
                for item in source['data']
            ]

            # Check for duplicates. The data is part of the fingerprint:
            # type and coordinates alone would merge distinct charts
            # whenever no coordinates are available.
            fingerprint = (
                'chart',
                chart.get('type'),
                chart.get('coordinates', {}),
                chart['data'],
            )
            chart_hash = hash(repr(fingerprint))
            if chart_hash in self.seen_hashes:
                continue
            self.seen_hashes.add(chart_hash)
            validated_charts.append(chart)
        return validated_charts

    def _validate_text_data(self, text_data):
        """
        Validate text data

        Args:
            text_data: List of dictionaries with a 'text' entry

        Returns:
            List of item copies with cleaned text; blank, one-character and
            repeated texts are dropped
        """
        validated_text = []
        seen_texts = set()
        for source in text_data:
            # Clean text (a missing 'text' entry counts as blank)
            text = self._clean_cell_value(source.get('text'))

            # Skip empty or very short text, and duplicates
            if len(text) < 2 or text in seen_texts:
                continue

            seen_texts.add(text)
            item = dict(source)
            item['text'] = text
            validated_text.append(item)
        return validated_text

    def validate_column_integrity(self, table):
        """
        Ensure column headers match data columns

        Every dictionary row in 'data' is rebuilt with exactly the table's
        headers as keys; missing values become "". Rows that are not
        dictionaries are kept unchanged.

        Args:
            table: Table dictionary; updated in place

        Returns:
            The same table object
        """
        if 'headers' not in table or 'data' not in table:
            return table

        headers = table['headers']
        table['data'] = [
            {header: row.get(header, "") for header in headers}
            if isinstance(row, dict) else row
            for row in table['data']
        ]
        return table

    def detect_merged_cells(self, table):
        """
        Detect merged cells in tables

        A run of two or more equal, non-empty values in one column is
        reported as a potential merged cell. Rows too short to reach a
        column are ignored for that column.

        Args:
            table: Table dictionary with a 'rows' list; updated in place

        Returns:
            The same table object, with a 'merged_cells' list added when at
            least one run was found. Each entry holds 'column',
            'start_row', 'end_row' (inclusive) and 'value'.
        """
        rows = table.get('rows')
        if not rows:
            return table

        merged_info = []

        def flush(column, start, end, value):
            # Only runs that span more than one row count as merges.
            if start is not None and end > start:
                merged_info.append({
                    'column': column,
                    'start_row': start,
                    'end_row': end,
                    'value': value,
                })

        for col_idx in range(max(len(row) for row in rows)):
            run_start = run_end = run_value = None
            for row_idx, row in enumerate(rows):
                if col_idx >= len(row):
                    continue
                current_value = row[col_idx]
                if run_start is not None and current_value and current_value == run_value:
                    run_end = row_idx
                    continue
                flush(col_idx, run_start, run_end, run_value)
                run_start = run_end = row_idx
                run_value = current_value
            # The last run ends with the table, not with a differing value.
            flush(col_idx, run_start, run_end, run_value)

        if merged_info:
            table['merged_cells'] = merged_info
        return table