diff --git a/CHANGELOG.md b/CHANGELOG.md index 521f577..f7b03de 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog], [markdownlint], and this project adheres to [Semantic Versioning]. +## [0.0.8] - 2025-06-20 + +### Changed in 0.0.8 + +- Small improvements to Python snippets + ## [0.0.7] - 2025-06-03 ### Changed in 0.0.7 diff --git a/python/deleting/delete_futures.py b/python/deleting/delete_futures.py index deeb2f0..564f3ea 100755 --- a/python/deleting/delete_futures.py +++ b/python/deleting/delete_futures.py @@ -29,8 +29,9 @@ def delete_record(engine, record_to_delete): def futures_del(engine, input_file): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 with open(input_file, "r", encoding="utf-8") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: @@ -51,14 +52,14 @@ def futures_del(engine, input_file): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: success_recs += 1 if success_recs % 100 == 0: print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True) finally: - if record := in_file.readline(): + if not shutdown and (record := in_file.readline()): futures[executor.submit(delete_record, engine, record)] = record del futures[f] diff --git a/python/deleting/delete_loop.py b/python/deleting/delete_loop.py index 1a1fe29..b256c69 100755 --- a/python/deleting/delete_loop.py +++ b/python/deleting/delete_loop.py @@ -20,11 +20,12 @@ def mock_logger(level, error, error_record=None): def del_records_from_file(engine, input_file): - success_recs = error_recs = 0 + error_recs = 0 + success_recs = 0 - with open(input_file, "r", encoding="utf-8") as file: + with open(input_file, "r", encoding="utf-8") as in_file: - for record_to_delete in file: + for record_to_delete in in_file: try: record_dict = json.loads(record_to_delete) data_source = record_dict.get("DATA_SOURCE", "") @@ -37,7 +38,6 @@ def del_records_from_file(engine, input_file): mock_logger("WARN", err, record_to_delete) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, record_to_delete) raise err else: success_recs += 1 diff --git a/python/deleting/delete_with_info_futures.py b/python/deleting/delete_with_info_futures.py index 31c1582..7fc3ebc 100755 --- a/python/deleting/delete_with_info_futures.py +++ b/python/deleting/delete_with_info_futures.py @@ -36,8 +36,9 @@ def delete_record(engine, record_to_delete): def futures_del(engine, input_file, output_file): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 with open(output_file, "w", encoding="utf-8") as out_file: with open(input_file, "r", encoding="utf-8") as in_file: @@ -59,7 +60,7 @@ def futures_del(engine, input_file, output_file): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: out_file.write(f"{result}\n") @@ -68,7 +69,7 @@ def futures_del(engine, input_file, output_file): if success_recs % 100 == 0: print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True) finally: - if record := in_file.readline(): + if not shutdown and (record := in_file.readline()): futures[executor.submit(delete_record, engine, record)] = record del futures[f] diff --git a/python/information/check_datastore_performance.py b/python/information/check_datastore_performance.py index 1eff8bf..04400b3 100755 --- a/python/information/check_datastore_performance.py +++ b/python/information/check_datastore_performance.py @@ -6,9 +6,9 @@ from senzing import SzError from senzing_core import SzAbstractFactoryCore -SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") INSTANCE_NAME = Path(__file__).stem SECONDS_TO_RUN = 3 +SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") try: sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) diff --git a/python/information/get_datastore_info.py b/python/information/get_datastore_info.py index 0c8ce24..f09e718 100755 --- a/python/information/get_datastore_info.py +++ b/python/information/get_datastore_info.py @@ -6,8 +6,8 @@ from senzing import SzError from senzing_core import SzAbstractFactoryCore -SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") INSTANCE_NAME = Path(__file__).stem +SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") try: sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) diff --git a/python/information/get_license.py b/python/information/get_license.py index 4ab8b2c..51e3da1 100755 --- a/python/information/get_license.py +++ b/python/information/get_license.py @@ -6,8 +6,8 @@ from senzing import SzError from senzing_core import SzAbstractFactoryCore -SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") INSTANCE_NAME = Path(__file__).stem +SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") try: sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) diff --git a/python/information/get_stats.py b/python/information/get_stats.py index dacbbcd..ab2f430 100755 --- a/python/information/get_stats.py +++ b/python/information/get_stats.py @@ -10,9 +10,9 @@ from senzing import SzBadInputError, SzError, SzRetryableError, SzUnrecoverableError from senzing_core import SzAbstractFactoryCore -SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") INPUT_FILE = Path("../../resources/data/load-500.jsonl").resolve() INSTANCE_NAME = Path(__file__).stem +SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") def mock_logger(level, error, error_record=None): @@ -40,13 +40,14 @@ def engine_stats(engine): def futures_add(engine, input_file): success_recs = 0 + shutdown = False error_recs = 0 - with open(input_file, "r", encoding="utf-8") as file: + with open(input_file, "r", encoding="utf-8") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(add_record, engine, record): record - for record in itertools.islice(file, executor._max_workers) + for record in itertools.islice(in_file, executor._max_workers) } while futures: @@ -61,7 +62,7 @@ def futures_add(engine, input_file): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: success_recs += 1 @@ -71,7 +72,7 @@ def futures_add(engine, input_file): if success_recs % 200 == 0: engine_stats(engine) finally: - if record := file.readline(): + if not shutdown and (record := in_file.readline()): futures[executor.submit(add_record, engine, record)] = record del futures[f] diff --git a/python/information/get_version.py b/python/information/get_version.py index 75ea7ab..68a8550 100755 --- a/python/information/get_version.py +++ b/python/information/get_version.py @@ -6,8 +6,8 @@ from senzing import SzError from senzing_core import SzAbstractFactoryCore -SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") INSTANCE_NAME = Path(__file__).stem +SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") try: sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) diff --git a/python/initialization/abstract_factory_parameters.py b/python/initialization/abstract_factory_parameters.py index 10661a0..87351b8 100755 --- a/python/initialization/abstract_factory_parameters.py +++ b/python/initialization/abstract_factory_parameters.py @@ -19,7 +19,6 @@ try: sz_factory = SzAbstractFactoryCore(**FACTORY_PARAMETERS) - sz_config = sz_factory.create_config() sz_configmgr = sz_factory.create_configmanager() sz_diagnostic = sz_factory.create_diagnostic() sz_engine = sz_factory.create_engine() diff --git a/python/initialization/factory_and_engines.py b/python/initialization/factory_and_engines.py index 9e8ac8d..7012cd4 100755 --- a/python/initialization/factory_and_engines.py +++ b/python/initialization/factory_and_engines.py @@ -11,7 +11,6 @@ try: sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) - sz_config = sz_factory.create_config() sz_configmgr = sz_factory.create_configmanager() sz_diagnostic = sz_factory.create_diagnostic() sz_engine = sz_factory.create_engine() diff --git a/python/loading/add_futures.py b/python/loading/add_futures.py index 9c209ae..ce9af9d 100755 --- a/python/loading/add_futures.py +++ b/python/loading/add_futures.py @@ -29,14 +29,15 @@ def add_record(engine, record_to_add): def futures_add(engine, input_file): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 - with open(input_file, "r", encoding="utf-8") as file: + with open(input_file, "r", encoding="utf-8") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: futures = { executor.submit(add_record, engine, record): record - for record in itertools.islice(file, executor._max_workers) + for record in itertools.islice(in_file, executor._max_workers) } while futures: @@ -51,14 +52,14 @@ def futures_add(engine, input_file): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: success_recs += 1 if success_recs % 100 == 0: print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True) finally: - if record := file.readline(): + if not shutdown and (record := in_file.readline()): futures[executor.submit(add_record, engine, record)] = record del futures[f] diff --git a/python/loading/add_queue.py b/python/loading/add_queue.py index b50a157..91f4890 100755 --- a/python/loading/add_queue.py +++ b/python/loading/add_queue.py @@ -29,14 +29,15 @@ def add_record(engine, record_to_add): def producer(input_file, queue): - with open(input_file, "r", encoding="utf-8") as file: - for record in file: + with open(input_file, "r", encoding="utf-8") as in_file: + for record in in_file: queue.put(record, block=True) def consumer(engine, queue): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 with concurrent.futures.ThreadPoolExecutor() as executor: futures = {executor.submit(add_record, engine, queue.get()): _ for _ in range(executor._max_workers)} @@ -53,14 +54,14 @@ def consumer(engine, queue): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: success_recs += 1 if success_recs % 100 == 0: print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True) finally: - if not queue.empty(): + if not shutdown and not queue.empty(): record = queue.get() futures[executor.submit(add_record, engine, record)] = record @@ -73,7 +74,7 @@ def consumer(engine, queue): sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) sz_engine = sz_factory.create_engine() - input_queue = Queue(maxsize=200) # type: ignore + input_queue = Queue(maxsize=200) producer_proc = Process(target=producer, args=(INPUT_FILE, input_queue)) producer_proc.start() consumer_proc = Process(target=consumer, args=(sz_engine, input_queue)) diff --git a/python/loading/add_records_loop.py b/python/loading/add_records_loop.py index 9b08617..3907dfe 100755 --- a/python/loading/add_records_loop.py +++ b/python/loading/add_records_loop.py @@ -20,11 +20,11 @@ def mock_logger(level, error, error_record=None): def add_records_from_file(engine, input_file): - success_recs = 0 error_recs = 0 + success_recs = 0 - with open(input_file, "r", encoding="utf-8") as file: - for record_to_add in file: + with open(input_file, "r", encoding="utf-8") as in_file: + for record_to_add in in_file: try: record_dict = json.loads(record_to_add) data_source = record_dict.get("DATA_SOURCE", "") @@ -37,7 +37,6 @@ def add_records_from_file(engine, input_file): mock_logger("WARN", err, record_to_add) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, record_to_add) raise err else: success_recs += 1 diff --git a/python/loading/add_truthset_loop.py b/python/loading/add_truthset_loop.py index 9671ff7..4730f8b 100755 --- a/python/loading/add_truthset_loop.py +++ b/python/loading/add_truthset_loop.py @@ -27,10 +27,10 @@ def add_records_from_file(engine, input_file): success_recs = 0 error_recs = 0 - with open(input_file, "r", encoding="utf-8") as file: + with open(input_file, "r", encoding="utf-8") as in_file: print(f"\nAdding records from {input_file}") - for record_to_add in file: + for record_to_add in in_file: try: record_dict = json.loads(record_to_add) data_source = record_dict.get("DATA_SOURCE", "") @@ -43,7 +43,6 @@ def add_records_from_file(engine, input_file): mock_logger("WARN", err, record_to_add) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, record_to_add) raise err else: success_recs += 1 diff --git a/python/loading/add_with_info_futures.py b/python/loading/add_with_info_futures.py index 27ee1b5..9d6e5bb 100755 --- a/python/loading/add_with_info_futures.py +++ b/python/loading/add_with_info_futures.py @@ -46,8 +46,9 @@ def engine_stats(engine): def futures_add(engine, input_file, output_file): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 with open(output_file, "w", encoding="utf-8") as out_file: with open(input_file, "r", encoding="utf-8") as in_file: @@ -69,7 +70,7 @@ def futures_add(engine, input_file, output_file): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: out_file.write(f"{result}\n") @@ -81,7 +82,7 @@ def futures_add(engine, input_file, output_file): if success_recs % 200 == 0: engine_stats(engine) finally: - if record := in_file.readline(): + if not shutdown and (record := in_file.readline()): futures[executor.submit(add_record, engine, record)] = record del futures[f] diff --git a/python/redo/add_with_redo.py b/python/redo/add_with_redo.py index a8266ca..d567397 100755 --- a/python/redo/add_with_redo.py +++ b/python/redo/add_with_redo.py @@ -24,12 +24,12 @@ def mock_logger(level, error, error_record=None): def add_records_from_file(engine, input_file): - success_recs = 0 error_recs = 0 + success_recs = 0 - with open(input_file, "r", encoding="utf-8") as file: + with open(input_file, "r", encoding="utf-8") as in_file: - for record_to_add in file: + for record_to_add in in_file: try: record_dict = json.loads(record_to_add) data_source = record_dict.get("DATA_SOURCE", None) @@ -42,7 +42,6 @@ def add_records_from_file(engine, input_file): mock_logger("WARN", err, record_to_add) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, record_to_add) raise err else: success_recs += 1 @@ -54,8 +53,8 @@ def add_records_from_file(engine, input_file): def process_redo(engine): - success_recs = 0 error_recs = 0 + success_recs = 0 print("\nStarting to process redo records...") diff --git a/python/redo/redo_continuous.py b/python/redo/redo_continuous.py index f8cc729..e4a325b 100755 --- a/python/redo/redo_continuous.py +++ b/python/redo/redo_continuous.py @@ -19,8 +19,8 @@ def mock_logger(level, error, error_record=None): def process_redo(engine): - success_recs = 0 error_recs = 0 + success_recs = 0 while True: try: @@ -44,7 +44,6 @@ def process_redo(engine): mock_logger("WARN", err) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err) raise err diff --git a/python/redo/redo_continuous_futures.py b/python/redo/redo_continuous_futures.py index 55a56cb..06c33c8 100755 --- a/python/redo/redo_continuous_futures.py +++ b/python/redo/redo_continuous_futures.py @@ -55,8 +55,9 @@ def redo_pause(success): def futures_redo(engine): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 redo_paused = False with concurrent.futures.ThreadPoolExecutor() as executor: @@ -82,14 +83,14 @@ def futures_redo(engine): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: success_recs += 1 if success_recs % 100 == 0: print(f"Processed {success_recs:,} redo records, with" f" {error_recs:,} errors") finally: - if record := get_redo_record(engine): + if not shutdown and (record := get_redo_record(engine)): futures[executor.submit(process_redo_record, engine, record)] = record else: redo_paused = True diff --git a/python/redo/redo_with_info_continuous.py b/python/redo/redo_with_info_continuous.py index c54f3d5..f62514d 100755 --- a/python/redo/redo_with_info_continuous.py +++ b/python/redo/redo_with_info_continuous.py @@ -20,7 +20,7 @@ SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") -def signal_handler(signum, frame): +def responses_message(signum, frame): print(f"\nWith info responses written to {OUTPUT_FILE}") sys.exit() @@ -37,12 +37,13 @@ def redo_pause(success): def process_redo(engine, output_file): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 with open(output_file, "w", encoding="utf-8") as out_file: try: - while True: + while not shutdown: if not (redo_record := engine.get_redo_record()): redo_pause(success_recs) continue @@ -60,11 +61,11 @@ def process_redo(engine, output_file): mock_logger("WARN", err, redo_record) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, redo_record) + shutdown = True raise err -signal.signal(signal.SIGINT, signal_handler) +signal.signal(signal.SIGINT, responses_message) try: sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False) @@ -72,3 +73,4 @@ def process_redo(engine, output_file): process_redo(sz_engine, OUTPUT_FILE) except SzError as err: mock_logger("CRITICAL", err) + responses_message() diff --git a/python/searching/search_futures.py b/python/searching/search_futures.py index 30f864f..bfe5d43 100755 --- a/python/searching/search_futures.py +++ b/python/searching/search_futures.py @@ -26,8 +26,9 @@ def search_record(engine, record_to_search): def futures_search(engine, input_file): - success_recs = 0 error_recs = 0 + shutdown = False + success_recs = 0 with open(input_file, "r", encoding="utf-8") as in_file: with concurrent.futures.ThreadPoolExecutor() as executor: @@ -48,7 +49,7 @@ def futures_search(engine, input_file): mock_logger("WARN", err, futures[f]) error_recs += 1 except (SzUnrecoverableError, SzError) as err: - mock_logger("CRITICAL", err, futures[f]) + shutdown = True raise err else: success_recs += 1 @@ -58,7 +59,7 @@ def futures_search(engine, input_file): print(f"\n------ Searched: {futures[f]}", flush=True) print(f"\n{result}", flush=True) finally: - if record := in_file.readline(): + if not shutdown and (record := in_file.readline()): futures[executor.submit(search_record, engine, record)] = record del futures[f] diff --git a/python/searching/search_records.py b/python/searching/search_records.py index 85ea58e..b956dc0 100755 --- a/python/searching/search_records.py +++ b/python/searching/search_records.py @@ -9,9 +9,7 @@ from senzing_core import SzAbstractFactoryCore INSTANCE_NAME = Path(__file__).stem -SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") - -search_records = [ +SEARCH_RECORDS = [ { "NAME_FULL": "Susan Moony", "DATE_OF_BIRTH": "15/6/1998", @@ -28,6 +26,7 @@ "ADDR_FULL": "787 Rotary Drive Rotorville FL 78720", }, ] +SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}") def mock_logger(level, error, error_record=None): @@ -37,7 +36,7 @@ def mock_logger(level, error, error_record=None): def searcher(engine): - for search_record in search_records: + for search_record in SEARCH_RECORDS: try: record_str = json.dumps(search_record) response = engine.search_by_attributes(record_str)