Skip to content

#48 - Fix proper shutdown for futures #51

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog], [markdownlint],
and this project adheres to [Semantic Versioning].

## [0.0.8] - 2025-06-20

### Changed in 0.0.8

- Small improvements to Python snippets

## [0.0.7] - 2025-06-03

### Changed in 0.0.7
Expand Down
7 changes: 4 additions & 3 deletions python/deleting/delete_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ def delete_record(engine, record_to_delete):


def futures_del(engine, input_file):
success_recs = 0
error_recs = 0
shutdown = False
success_recs = 0

with open(input_file, "r", encoding="utf-8") as in_file:
with concurrent.futures.ThreadPoolExecutor() as executor:
Expand All @@ -51,14 +52,14 @@ def futures_del(engine, input_file):
mock_logger("WARN", err, futures[f])
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, futures[f])
shutdown = True
raise err
else:
success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
finally:
if record := in_file.readline():
if not shutdown and (record := in_file.readline()):
futures[executor.submit(delete_record, engine, record)] = record

del futures[f]
Expand Down
8 changes: 4 additions & 4 deletions python/deleting/delete_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ def mock_logger(level, error, error_record=None):


def del_records_from_file(engine, input_file):
success_recs = error_recs = 0
error_recs = 0
success_recs = 0

with open(input_file, "r", encoding="utf-8") as file:
with open(input_file, "r", encoding="utf-8") as in_file:

for record_to_delete in file:
for record_to_delete in in_file:
try:
record_dict = json.loads(record_to_delete)
data_source = record_dict.get("DATA_SOURCE", "")
Expand All @@ -37,7 +38,6 @@ def del_records_from_file(engine, input_file):
mock_logger("WARN", err, record_to_delete)
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, record_to_delete)
raise err
else:
success_recs += 1
Expand Down
7 changes: 4 additions & 3 deletions python/deleting/delete_with_info_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ def delete_record(engine, record_to_delete):


def futures_del(engine, input_file, output_file):
success_recs = 0
error_recs = 0
shutdown = False
success_recs = 0

with open(output_file, "w", encoding="utf-8") as out_file:
with open(input_file, "r", encoding="utf-8") as in_file:
Expand All @@ -59,7 +60,7 @@ def futures_del(engine, input_file, output_file):
mock_logger("WARN", err, futures[f])
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, futures[f])
shutdown = True
raise err
else:
out_file.write(f"{result}\n")
Expand All @@ -68,7 +69,7 @@ def futures_del(engine, input_file, output_file):
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
finally:
if record := in_file.readline():
if not shutdown and (record := in_file.readline()):
futures[executor.submit(delete_record, engine, record)] = record

del futures[f]
Expand Down
2 changes: 1 addition & 1 deletion python/information/check_datastore_performance.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
from senzing import SzError
from senzing_core import SzAbstractFactoryCore

SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
INSTANCE_NAME = Path(__file__).stem
SECONDS_TO_RUN = 3
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")

try:
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
Expand Down
2 changes: 1 addition & 1 deletion python/information/get_datastore_info.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from senzing import SzError
from senzing_core import SzAbstractFactoryCore

SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
INSTANCE_NAME = Path(__file__).stem
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")

try:
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
Expand Down
2 changes: 1 addition & 1 deletion python/information/get_license.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from senzing import SzError
from senzing_core import SzAbstractFactoryCore

SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
INSTANCE_NAME = Path(__file__).stem
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")

try:
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
Expand Down
11 changes: 6 additions & 5 deletions python/information/get_stats.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@
from senzing import SzBadInputError, SzError, SzRetryableError, SzUnrecoverableError
from senzing_core import SzAbstractFactoryCore

SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
INPUT_FILE = Path("../../resources/data/load-500.jsonl").resolve()
INSTANCE_NAME = Path(__file__).stem
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")


def mock_logger(level, error, error_record=None):
Expand Down Expand Up @@ -40,13 +40,14 @@ def engine_stats(engine):

def futures_add(engine, input_file):
success_recs = 0
shutdown = False
error_recs = 0

with open(input_file, "r", encoding="utf-8") as file:
with open(input_file, "r", encoding="utf-8") as in_file:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(add_record, engine, record): record
for record in itertools.islice(file, executor._max_workers)
for record in itertools.islice(in_file, executor._max_workers)
}

while futures:
Expand All @@ -61,7 +62,7 @@ def futures_add(engine, input_file):
mock_logger("WARN", err, futures[f])
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, futures[f])
shutdown = True
raise err
else:
success_recs += 1
Expand All @@ -71,7 +72,7 @@ def futures_add(engine, input_file):
if success_recs % 200 == 0:
engine_stats(engine)
finally:
if record := file.readline():
if not shutdown and (record := in_file.readline()):
futures[executor.submit(add_record, engine, record)] = record

del futures[f]
Expand Down
2 changes: 1 addition & 1 deletion python/information/get_version.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
from senzing import SzError
from senzing_core import SzAbstractFactoryCore

SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
INSTANCE_NAME = Path(__file__).stem
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")

try:
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
Expand Down
1 change: 0 additions & 1 deletion python/initialization/abstract_factory_parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@

try:
sz_factory = SzAbstractFactoryCore(**FACTORY_PARAMETERS)
sz_config = sz_factory.create_config()
sz_configmgr = sz_factory.create_configmanager()
sz_diagnostic = sz_factory.create_diagnostic()
sz_engine = sz_factory.create_engine()
Expand Down
1 change: 0 additions & 1 deletion python/initialization/factory_and_engines.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@

try:
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
sz_config = sz_factory.create_config()
sz_configmgr = sz_factory.create_configmanager()
sz_diagnostic = sz_factory.create_diagnostic()
sz_engine = sz_factory.create_engine()
Expand Down
11 changes: 6 additions & 5 deletions python/loading/add_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ def add_record(engine, record_to_add):


def futures_add(engine, input_file):
success_recs = 0
error_recs = 0
shutdown = False
success_recs = 0

with open(input_file, "r", encoding="utf-8") as file:
with open(input_file, "r", encoding="utf-8") as in_file:
with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {
executor.submit(add_record, engine, record): record
for record in itertools.islice(file, executor._max_workers)
for record in itertools.islice(in_file, executor._max_workers)
}

while futures:
Expand All @@ -51,14 +52,14 @@ def futures_add(engine, input_file):
mock_logger("WARN", err, futures[f])
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, futures[f])
shutdown = True
raise err
else:
success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
finally:
if record := file.readline():
if not shutdown and (record := in_file.readline()):
futures[executor.submit(add_record, engine, record)] = record

del futures[f]
Expand Down
13 changes: 7 additions & 6 deletions python/loading/add_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,15 @@ def add_record(engine, record_to_add):


def producer(input_file, queue):
with open(input_file, "r", encoding="utf-8") as file:
for record in file:
with open(input_file, "r", encoding="utf-8") as in_file:
for record in in_file:
queue.put(record, block=True)


def consumer(engine, queue):
success_recs = 0
error_recs = 0
shutdown = False
success_recs = 0

with concurrent.futures.ThreadPoolExecutor() as executor:
futures = {executor.submit(add_record, engine, queue.get()): _ for _ in range(executor._max_workers)}
Expand All @@ -53,14 +54,14 @@ def consumer(engine, queue):
mock_logger("WARN", err, futures[f])
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, futures[f])
shutdown = True
raise err
else:
success_recs += 1
if success_recs % 100 == 0:
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
finally:
if not queue.empty():
if not shutdown and not queue.empty():
record = queue.get()
futures[executor.submit(add_record, engine, record)] = record

Expand All @@ -73,7 +74,7 @@ def consumer(engine, queue):
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
sz_engine = sz_factory.create_engine()

input_queue = Queue(maxsize=200) # type: ignore
input_queue = Queue(maxsize=200)
producer_proc = Process(target=producer, args=(INPUT_FILE, input_queue))
producer_proc.start()
consumer_proc = Process(target=consumer, args=(sz_engine, input_queue))
Expand Down
7 changes: 3 additions & 4 deletions python/loading/add_records_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,11 @@ def mock_logger(level, error, error_record=None):


def add_records_from_file(engine, input_file):
success_recs = 0
error_recs = 0
success_recs = 0

with open(input_file, "r", encoding="utf-8") as file:
for record_to_add in file:
with open(input_file, "r", encoding="utf-8") as in_file:
for record_to_add in in_file:
try:
record_dict = json.loads(record_to_add)
data_source = record_dict.get("DATA_SOURCE", "")
Expand All @@ -37,7 +37,6 @@ def add_records_from_file(engine, input_file):
mock_logger("WARN", err, record_to_add)
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, record_to_add)
raise err
else:
success_recs += 1
Expand Down
5 changes: 2 additions & 3 deletions python/loading/add_truthset_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ def add_records_from_file(engine, input_file):
success_recs = 0
error_recs = 0

with open(input_file, "r", encoding="utf-8") as file:
with open(input_file, "r", encoding="utf-8") as in_file:
print(f"\nAdding records from {input_file}")

for record_to_add in file:
for record_to_add in in_file:
try:
record_dict = json.loads(record_to_add)
data_source = record_dict.get("DATA_SOURCE", "")
Expand All @@ -43,7 +43,6 @@ def add_records_from_file(engine, input_file):
mock_logger("WARN", err, record_to_add)
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, record_to_add)
raise err
else:
success_recs += 1
Expand Down
7 changes: 4 additions & 3 deletions python/loading/add_with_info_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ def engine_stats(engine):


def futures_add(engine, input_file, output_file):
success_recs = 0
error_recs = 0
shutdown = False
success_recs = 0

with open(output_file, "w", encoding="utf-8") as out_file:
with open(input_file, "r", encoding="utf-8") as in_file:
Expand All @@ -69,7 +70,7 @@ def futures_add(engine, input_file, output_file):
mock_logger("WARN", err, futures[f])
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, futures[f])
shutdown = True
raise err
else:
out_file.write(f"{result}\n")
Expand All @@ -81,7 +82,7 @@ def futures_add(engine, input_file, output_file):
if success_recs % 200 == 0:
engine_stats(engine)
finally:
if record := in_file.readline():
if not shutdown and (record := in_file.readline()):
futures[executor.submit(add_record, engine, record)] = record

del futures[f]
Expand Down
9 changes: 4 additions & 5 deletions python/redo/add_with_redo.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,12 @@ def mock_logger(level, error, error_record=None):


def add_records_from_file(engine, input_file):
success_recs = 0
error_recs = 0
success_recs = 0

with open(input_file, "r", encoding="utf-8") as file:
with open(input_file, "r", encoding="utf-8") as in_file:

for record_to_add in file:
for record_to_add in in_file:
try:
record_dict = json.loads(record_to_add)
data_source = record_dict.get("DATA_SOURCE", None)
Expand All @@ -42,7 +42,6 @@ def add_records_from_file(engine, input_file):
mock_logger("WARN", err, record_to_add)
error_recs += 1
except (SzUnrecoverableError, SzError) as err:
mock_logger("CRITICAL", err, record_to_add)
raise err
else:
success_recs += 1
Expand All @@ -54,8 +53,8 @@ def add_records_from_file(engine, input_file):


def process_redo(engine):
success_recs = 0
error_recs = 0
success_recs = 0

print("\nStarting to process redo records...")

Expand Down
Loading
Loading