From 9c700de12e071397dc875d417ccbb219f402faad Mon Sep 17 00:00:00 2001 From: Ant Date: Tue, 3 Jun 2025 12:12:15 -0400 Subject: [PATCH] #32- Fix thread pool drain if non-terminal error occurs --- CHANGELOG.md | 6 ++++ Python/Tasks/Deleting/DeleteFutures.py | 24 +++++--------- .../Tasks/Deleting/DeleteWithInfoFutures.py | 24 +++++--------- Python/Tasks/Loading/Add100KFutures.py | 23 ++++--------- Python/Tasks/Loading/Add10KFutures.py | 23 ++++--------- Python/Tasks/Loading/Add50KFutures.py | 24 +++++--------- Python/Tasks/Loading/Add50KWithInfoFutures.py | 24 +++++--------- Python/Tasks/Redo/RedoContinuousFutures.py | 33 +++++++------------ .../Replacing/Replace5KWithInfoFutures.py | 24 +++++--------- Python/Tasks/Replacing/Replace5kFutures.py | 24 +++++--------- Python/Tasks/Searching/Search5kFutures.py | 23 ++++--------- 11 files changed, 86 insertions(+), 166 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index c118fdd..45399db 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog], [markdownlint], and this project adheres to [Semantic Versioning]. +## [1.1.2] - 2025-06-03 + +### Fixed in 1.1.2 + +- Fix potential for futures thread pool to drain on non-terminal errors + ## [1.1.1] - 2024-05-24 ### Changed in 1.1.1 diff --git a/Python/Tasks/Deleting/DeleteFutures.py b/Python/Tasks/Deleting/DeleteFutures.py index eabdb74..8c5a8b5 100755 --- a/Python/Tasks/Deleting/DeleteFutures.py +++ b/Python/Tasks/Deleting/DeleteFutures.py @@ -6,6 +6,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -63,9 +64,7 @@ def futures_del(engine, input_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: f.result() @@ -79,27 +78,20 @@ def futures_del(engine, input_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = in_file.readline() - if record: - futures[executor.submit(del_record, engine, record)] = ( - record - ) - success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) finally: + record = in_file.readline() + if record: + futures[executor.submit(del_record, engine, record)] = record + del futures[f] - print( - f"Successfully deleted {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully deleted {success_recs:,} records, with" f" {error_recs:,} errors") try: diff --git a/Python/Tasks/Deleting/DeleteWithInfoFutures.py b/Python/Tasks/Deleting/DeleteWithInfoFutures.py index 1f1f6bc..935ac29 100755 --- a/Python/Tasks/Deleting/DeleteWithInfoFutures.py +++ b/Python/Tasks/Deleting/DeleteWithInfoFutures.py @@ -6,6 +6,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -66,9 +67,7 @@ def futures_del(engine, input_file, output_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: result = f.result() @@ -82,29 +81,22 @@ def futures_del(engine, input_file, output_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = in_file.readline() - if record: - futures[executor.submit(del_record, engine, record)] = ( - record - ) - out_file.write(f"{result}\n") success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) finally: + record = in_file.readline() + if record: + futures[executor.submit(del_record, engine, record)] = record + del futures[f] - print( - f"Successfully deleted {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully deleted {success_recs:,} records, with" f" {error_recs:,} errors") print(f"With info responses written to {output_file}") diff --git a/Python/Tasks/Loading/Add100KFutures.py b/Python/Tasks/Loading/Add100KFutures.py index 9a9babc..c665c0a 100755 --- a/Python/Tasks/Loading/Add100KFutures.py +++ b/Python/Tasks/Loading/Add100KFutures.py @@ -64,9 +64,7 @@ def futures_add(engine, input_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: f.result() @@ -80,27 +78,20 @@ def futures_add(engine, input_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = file.readline() - if record: - futures[executor.submit(add_record, engine, record)] = ( - record - ) - success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) finally: + record = file.readline() + if record: + futures[executor.submit(add_record, engine, record)] = record + del futures[f] - print( - f"Successfully loaded {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors") try: diff --git a/Python/Tasks/Loading/Add10KFutures.py b/Python/Tasks/Loading/Add10KFutures.py index 248e7c0..8310937 100755 --- a/Python/Tasks/Loading/Add10KFutures.py +++ b/Python/Tasks/Loading/Add10KFutures.py @@ -64,9 +64,7 @@ def futures_add(engine, input_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: f.result() @@ -80,27 +78,20 @@ def futures_add(engine, input_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = file.readline() - if record: - futures[executor.submit(add_record, engine, record)] = ( - record - ) - success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) finally: + record = file.readline() + if record: + futures[executor.submit(add_record, engine, record)] = record + del futures[f] - print( - f"Successfully loaded {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors") try: diff --git a/Python/Tasks/Loading/Add50KFutures.py b/Python/Tasks/Loading/Add50KFutures.py index 2488cdb..ac7ed2b 100755 --- a/Python/Tasks/Loading/Add50KFutures.py +++ b/Python/Tasks/Loading/Add50KFutures.py @@ -6,6 +6,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -63,9 +64,7 @@ def futures_add(engine, input_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: f.result() @@ -79,27 +78,20 @@ def futures_add(engine, input_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = file.readline() - if record: - futures[executor.submit(add_record, engine, record)] = ( - record - ) - success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) finally: + record = file.readline() + if record: + futures[executor.submit(add_record, engine, record)] = record + del futures[f] - print( - f"Successfully loaded {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors") try: diff --git a/Python/Tasks/Loading/Add50KWithInfoFutures.py b/Python/Tasks/Loading/Add50KWithInfoFutures.py index d157f01..3cf435b 100755 --- a/Python/Tasks/Loading/Add50KWithInfoFutures.py +++ b/Python/Tasks/Loading/Add50KWithInfoFutures.py @@ -6,6 +6,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -66,9 +67,7 @@ def futures_add(engine, input_file, output_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: result = f.result() @@ -82,29 +81,22 @@ def futures_add(engine, input_file, output_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = in_file.readline() - if record: - futures[executor.submit(add_record, engine, record)] = ( - record - ) - out_file.write(f"{result}\n") success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) finally: + record = in_file.readline() + if record: + futures[executor.submit(add_record, engine, record)] = record + del futures[f] - print( - f"Successfully loaded {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors") print(f"With info responses written to {output_file}") diff --git a/Python/Tasks/Redo/RedoContinuousFutures.py b/Python/Tasks/Redo/RedoContinuousFutures.py index 6cef3b2..85a1c9b 100755 --- a/Python/Tasks/Redo/RedoContinuousFutures.py +++ b/Python/Tasks/Redo/RedoContinuousFutures.py @@ -4,6 +4,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -71,10 +72,7 @@ def redo_count(engine): def redo_pause(success): - print( - "No redo records to process, pausing for 30 seconds. Total processed:" - f" {success:,} (CTRL-C to exit)..." - ) + print("No redo records to process, pausing for 30 seconds. Total processed:" f" {success:,} (CTRL-C to exit)...") time.sleep(30) @@ -94,9 +92,7 @@ def futures_redo(engine): break while True: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: _ = f.result() @@ -110,24 +106,19 @@ def futures_redo(engine): mock_logger("CRITICAL", err, futures[f]) raise else: - record = get_redo_record(engine) - if record: - futures[ - executor.submit(process_redo_record, engine, record) - ] = record - else: - redo_paused = True - success_recs += 1 if success_recs % 100 == 0: - print( - f"Processed {success_recs:,} redo records, with" - f" {error_recs:,} errors" - ) + print(f"Processed {success_recs:,} redo records, with" f" {error_recs:,} errors") if success_recs % 1000 == 0: engine_stats(engine) finally: + record = get_redo_record(engine) + if record: + futures[executor.submit(process_redo_record, engine, record)] = record + else: + redo_paused = True + del futures[f] if redo_paused: @@ -137,9 +128,7 @@ def futures_redo(engine): while len(futures) < executor._max_workers: record = get_redo_record(engine) if record: - futures[ - executor.submit(process_redo_record, engine, record) - ] = record + futures[executor.submit(process_redo_record, engine, record)] = record try: diff --git a/Python/Tasks/Replacing/Replace5KWithInfoFutures.py b/Python/Tasks/Replacing/Replace5KWithInfoFutures.py index 5565fae..ccb7f28 100755 --- a/Python/Tasks/Replacing/Replace5KWithInfoFutures.py +++ b/Python/Tasks/Replacing/Replace5KWithInfoFutures.py @@ -6,6 +6,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -66,9 +67,7 @@ def futures_replace(engine, input_file, output_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: result = f.result() @@ -82,29 +81,22 @@ def futures_replace(engine, input_file, output_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = in_file.readline() - if record: - futures[ - executor.submit(replace_record, engine, record) - ] = record - out_file.write(f"{result}\n") success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 5000 == 0: engine_stats(engine) finally: + record = in_file.readline() + if record: + futures[executor.submit(replace_record, engine, record)] = record + del futures[f] - print( - f"Successfully replaced {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully replaced {success_recs:,} records, with" f" {error_recs:,} errors") print(f"With info responses written to {output_file}") diff --git a/Python/Tasks/Replacing/Replace5kFutures.py b/Python/Tasks/Replacing/Replace5kFutures.py index 5a65674..3baa6d6 100755 --- a/Python/Tasks/Replacing/Replace5kFutures.py +++ b/Python/Tasks/Replacing/Replace5kFutures.py @@ -6,6 +6,7 @@ import os import sys import time + from senzing import ( G2BadInputException, G2Engine, @@ -63,9 +64,7 @@ def futures_replace(engine, input_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: f.result() @@ -79,27 +78,20 @@ def futures_replace(engine, input_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = in_file.readline() - if record: - futures[executor.submit(replace_record, engine, record)] = ( - record - ) - success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 2500 == 0: engine_stats(engine) finally: + record = in_file.readline() + if record: + futures[executor.submit(replace_record, engine, record)] = record + del futures[f] - print( - f"Successfully replaced {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"Successfully replaced {success_recs:,} records, with" f" {error_recs:,} errors") try: diff --git a/Python/Tasks/Searching/Search5kFutures.py b/Python/Tasks/Searching/Search5kFutures.py index d30b9fe..a8ac3d6 100755 --- a/Python/Tasks/Searching/Search5kFutures.py +++ b/Python/Tasks/Searching/Search5kFutures.py @@ -81,9 +81,7 @@ def futures_search(engine, input_file, output_file): } while futures: - done, _ = concurrent.futures.wait( - futures, return_when=concurrent.futures.FIRST_COMPLETED - ) + done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED) for f in done: try: result = f.result() @@ -97,29 +95,22 @@ def futures_search(engine, input_file, output_file): mock_logger("CRITICAL", err, futures[f]) raise else: - record = in_file.readline() - if record: - futures[ - executor.submit(search_record, engine, record) - ] = record - success_recs += 1 if success_recs % 1000 == 0: - prev_time = record_stats( - success_recs, error_recs, prev_time - ) + prev_time = record_stats(success_recs, error_recs, prev_time) if success_recs % 10000 == 0: engine_stats(engine) search_results(result, futures[f], out_file) finally: + record = in_file.readline() + if record: + futures[executor.submit(search_record, engine, record)] = record + del futures[f] - print( - f"\nSuccessfully searched {success_recs:,} records, with" - f" {error_recs:,} errors" - ) + print(f"\nSuccessfully searched {success_recs:,} records, with" f" {error_recs:,} errors") print(f"Search results are located in: {output_file}")