Commit 035cf73

Implement full e2e reprocess
1 parent 24e6674 commit 035cf73

1 file changed: +68 -40 lines

detector/detector/detector.py (+68 -40)

@@ -209,11 +209,12 @@ def rebuild_feeds(events: pd.DataFrame) -> int:
     # change is not too heavy
     cnt = 0
     sql = """SELECT test_name, probe_cc, probe_asn, input, time, status
-    FROM blocking_events
-    WHERE test_name = %(test_name)s AND input = %(inp)s
-    AND probe_cc = %(cc)s AND probe_asn = %(asn)s
-    ORDER BY time
+    FROM blocking_events
+    WHERE test_name = %(test_name)s AND input = %(inp)s
+    AND probe_cc = %(cc)s AND probe_asn = %(asn)s
+    ORDER BY time
     """
+    events = events.reset_index()
     unique_tcais = events[TCAI].drop_duplicates()
     update_time = datetime.utcnow()
     for x in unique_tcais.itertuples():
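Note (not part of the diff): the new events.reset_index() call matters because the per-feed loop reads the key columns by name. A minimal sketch of that pattern, under the assumption that TCAI is the ["test_name", "probe_cc", "probe_asn", "input"] column list:

import pandas as pd

TCAI = ["test_name", "probe_cc", "probe_asn", "input"]  # assumed column set

# Events indexed by the TCAI keys, as process_data() may return them
events = pd.DataFrame(
    [("web_connectivity", "IT", 12874, "https://example.com/", "BLOCKED")],
    columns=TCAI + ["status"],
).set_index(TCAI)

events = events.reset_index()  # the keys become ordinary columns again
for x in events[TCAI].drop_duplicates().itertuples():
    # one feed is rebuilt per (test_name, probe_cc, probe_asn, input) tuple
    print(x.test_name, x.probe_cc, x.probe_asn, x.input)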
@@ -224,6 +225,7 @@ def rebuild_feeds(events: pd.DataFrame) -> int:
         write_feed(feed_data, path)
         cnt += len(history)
 
+    log.info(f"[re]created {cnt} feeds")
     return cnt
 
 
@@ -259,8 +261,8 @@ def load_country_name_map(devel: bool) -> dict:
 def create_tables() -> None:
     # Requires admin privileges
     sql = """
-    CREATE TABLE IF NOT EXISTS blocking_status
-    (
+    CREATE TABLE IF NOT EXISTS blocking_status
+    (
         `test_name` String,
         `input` String,
         `probe_cc` String,
@@ -274,27 +276,27 @@ def create_tables() -> None:
         `change` Float32,
         `stability` Float32,
         `update_time` DateTime64(0) MATERIALIZED now64()
-    )
-    ENGINE = ReplacingMergeTree
-    ORDER BY (test_name, input, probe_cc, probe_asn)
-    SETTINGS index_granularity = 4
-    """
+    )
+    ENGINE = ReplacingMergeTree
+    ORDER BY (test_name, input, probe_cc, probe_asn)
+    SETTINGS index_granularity = 4
+    """
     query(sql)
     sql = """
-    CREATE TABLE IF NOT EXISTS blocking_events
-    (
+    CREATE TABLE IF NOT EXISTS blocking_events
+    (
         `test_name` String,
         `input` String,
         `probe_cc` String,
         `probe_asn` Int32,
         `status` String,
         `time` DateTime64(3),
         `detection_time` DateTime64(0) MATERIALIZED now64()
-    )
-    ENGINE = ReplacingMergeTree
-    ORDER BY (test_name, input, probe_cc, probe_asn, time)
-    SETTINGS index_granularity = 4
-    """
+    )
+    ENGINE = ReplacingMergeTree
+    ORDER BY (test_name, input, probe_cc, probe_asn, time)
+    SETTINGS index_granularity = 4
+    """
     query(sql)
     sql = "CREATE USER IF NOT EXISTS detector IDENTIFIED WITH plaintext_password BY 'detector'"
     query(sql)
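Note (not part of the diff): both tables use ReplacingMergeTree, so rows that share the ORDER BY key are collapsed, but only when parts are merged in the background. A minimal read-side sketch, assuming a clickhouse-driver Client logged in as the detector user created above:

from clickhouse_driver import Client

client = Client("localhost", user="detector", password="detector")

# FINAL collapses rows sharing (test_name, input, probe_cc, probe_asn) at read time;
# without it a status row that was just rewritten may still show up twice.
rows = client.execute(
    "SELECT test_name, input, probe_cc, probe_asn, stability FROM blocking_status FINAL"
)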
@@ -342,15 +344,15 @@ def reprocess_inner(
         status, events = process_data(status, new)
         if events is not None and len(events):
             events_tmp.append(events)
-        if collect_hist:
-            status_history_tmp.append(status)
+        if collect_hist:
+            status_history_tmp.append(status)
 
     if events_tmp:
         events = pd.concat(events_tmp)
     else:
         events = None
-    status_history = pd.concat(status_history_tmp) if collect_hist else None
-    return events, status, status_history
+    status_history = pd.concat(status_history_tmp) if collect_hist else None
+    return events, status, status_history
 
 
 @metrics.timer("process_historical_data")
@@ -402,6 +404,7 @@ def process_fresh_data(
     urls = sorted(set(u for urls in services.values() for u in urls))
 
     status = load_blocking_status()
+    metrics.gauge("blocking_status_tblsize", len(status))
 
     gen = gen_input(click, start_date, end_date, interval, urls)
     new = None
@@ -443,8 +446,20 @@ def process_fresh_data(
 
     if events is not None and len(events):
         log.debug(f"Appending {len(events)} events to blocking_events table")
-        sql = "INSERT INTO blocking_events VALUES"
-        click.insert_dataframe(sql, events.reset_index(drop=True))
+        ev = events.reset_index()
+        ev = ev.drop(columns=["old_status"])
+        ev["time"] = end_date  # event detection time
+        log.info(ev)
+        assert ev.columns.values.tolist() == [
+            "test_name",
+            "probe_cc",
+            "probe_asn",
+            "input",
+            "status",
+            "time",
+        ]
+        sql = "INSERT INTO blocking_events (test_name, probe_cc, probe_asn, input, status, time) VALUES"
+        click.insert_dataframe(sql, ev)
 
     log.info("Done")
     return events, status
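Note (not part of the diff): a minimal sketch of the insert performed here, assuming click is a clickhouse-driver Client. insert_dataframe() requires the client to run with use_numpy=True, and naming the columns in the INSERT keeps the statement valid even though blocking_events also has a MATERIALIZED detection_time column:

from datetime import datetime

import pandas as pd
from clickhouse_driver import Client

# use_numpy=True is required by insert_dataframe()
click = Client("localhost", user="detector", password="detector",
               settings={"use_numpy": True})

# One fabricated example row; the real frame comes out of process_data()
ev = pd.DataFrame(
    [("web_connectivity", "IT", 12874, "https://example.com/", "BLOCKED")],
    columns=["test_name", "probe_cc", "probe_asn", "input", "status"],
)
ev["time"] = datetime(2023, 1, 1, 0, 0)  # end of the processed time window

sql = "INSERT INTO blocking_events (test_name, probe_cc, probe_asn, input, status, time) VALUES"
click.insert_dataframe(sql, ev)  # detection_time is filled in by the server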
@@ -673,6 +688,26 @@ def gen():
     return events, status, status_history
 
 
+def process(start, end, interval, services) -> None:
+    events, status = process_fresh_data(start, end, interval, services)
+    log.info(f"Events: {len(events)}")
+    if events is not None and len(events):
+        log.info("Rebuilding feeds")
+        rebuild_feeds(events)
+    # TODO: create an index of available RSS feeds
+
+
+def reprocess(conf, services) -> None:
+    click.execute("TRUNCATE TABLE blocking_status SYNC")
+    click.execute("TRUNCATE TABLE blocking_events SYNC")
+
+    t = conf.start_date
+    while t < conf.end_date:
+        te = t + conf.interval
+        process(t, te, conf.interval, services)
+        t += conf.interval
+
+
 def main():
     global click
     setup()
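Note (not part of the diff): reprocess() wipes both tables and then replays the whole configured date range one interval at a time, each window going through the same process() path as fresh data. A standalone sketch of the window loop, with a stand-in process():

from datetime import datetime, timedelta

def process(start, end, interval, services) -> None:
    # stand-in for the real process(), which runs process_fresh_data() and rebuild_feeds()
    print(f"processing window {start} -> {end}")

start_date = datetime(2023, 1, 1)
end_date = datetime(2023, 1, 2)
interval = timedelta(hours=1)
services = {}

t = start_date
while t < end_date:
    te = t + interval
    process(t, te, interval, services)
    t += interval  # 24 hourly windows in this example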
@@ -692,31 +727,24 @@ def main():
         "Instagram": ["https://www.instagram.com/"],
     }
     if conf.reprocess:
+        # Destructive reprocess
         assert conf.start_date and conf.end_date, "Dates not set"
-        events, status, _ = process_historical_data(
-            conf.start_date, conf.end_date, conf.interval, services
-        )
-        s = status.reset_index()
-        # log.info((s.accessible_perc, s.cnt, s.status))
+        reprocess(conf, services)
         return
+        # assert conf.start_date and conf.end_date, "Dates not set"
+        # events, status, _ = process_historical_data(
+        #     conf.start_date, conf.end_date, conf.interval, services
+        # )
+        # s = status.reset_index()
+        # log.info((s.accessible_perc, s.cnt, s.status))
 
     else:
         # Process fresh data
         if conf.end_date is None:
             # Beginning of current UTC hour
             conf.end_date = datetime(*datetime.utcnow().timetuple()[:4])
         conf.start_date = conf.end_date - conf.interval
-        events, status = process_fresh_data(
-            conf.start_date, conf.end_date, conf.interval, services
-        )
-        log.info(f"Events: {len(events)}")
-        # s = status.reset_index()
-        # log.info((s.accessible_perc, s.cnt, s.status))
-
-        if events is not None and len(events):
-            log.info("Rebuilding feeds")
-            rebuild_feeds(events)
-        # TODO: create an index of available RSS feeds
+        process(conf.start_date, conf.end_date, conf.interval, services)
 
     gen_stats()
     log.info("Done")
