Skip to content

Commit c4991d0

Browse files
authored
#48 - Fix proper shutdown for futures (#51)
1 parent 9453fb7 commit c4991d0

22 files changed

+73
-64
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog], [markdownlint],
66
and this project adheres to [Semantic Versioning].
77

8+
## [0.0.8] - 2025-06-20
9+
10+
### Changed in 0.0.8
11+
12+
- Small improvements to Python snippets
13+
814
## [0.0.7] - 2025-06-03
915

1016
### Changed in 0.0.7

python/deleting/delete_futures.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,8 +29,9 @@ def delete_record(engine, record_to_delete):
2929

3030

3131
def futures_del(engine, input_file):
32-
success_recs = 0
3332
error_recs = 0
33+
shutdown = False
34+
success_recs = 0
3435

3536
with open(input_file, "r", encoding="utf-8") as in_file:
3637
with concurrent.futures.ThreadPoolExecutor() as executor:
@@ -51,14 +52,14 @@ def futures_del(engine, input_file):
5152
mock_logger("WARN", err, futures[f])
5253
error_recs += 1
5354
except (SzUnrecoverableError, SzError) as err:
54-
mock_logger("CRITICAL", err, futures[f])
55+
shutdown = True
5556
raise err
5657
else:
5758
success_recs += 1
5859
if success_recs % 100 == 0:
5960
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
6061
finally:
61-
if record := in_file.readline():
62+
if not shutdown and (record := in_file.readline()):
6263
futures[executor.submit(delete_record, engine, record)] = record
6364

6465
del futures[f]

python/deleting/delete_loop.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,12 @@ def mock_logger(level, error, error_record=None):
2020

2121

2222
def del_records_from_file(engine, input_file):
23-
success_recs = error_recs = 0
23+
error_recs = 0
24+
success_recs = 0
2425

25-
with open(input_file, "r", encoding="utf-8") as file:
26+
with open(input_file, "r", encoding="utf-8") as in_file:
2627

27-
for record_to_delete in file:
28+
for record_to_delete in in_file:
2829
try:
2930
record_dict = json.loads(record_to_delete)
3031
data_source = record_dict.get("DATA_SOURCE", "")
@@ -37,7 +38,6 @@ def del_records_from_file(engine, input_file):
3738
mock_logger("WARN", err, record_to_delete)
3839
error_recs += 1
3940
except (SzUnrecoverableError, SzError) as err:
40-
mock_logger("CRITICAL", err, record_to_delete)
4141
raise err
4242
else:
4343
success_recs += 1

python/deleting/delete_with_info_futures.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,8 +36,9 @@ def delete_record(engine, record_to_delete):
3636

3737

3838
def futures_del(engine, input_file, output_file):
39-
success_recs = 0
4039
error_recs = 0
40+
shutdown = False
41+
success_recs = 0
4142

4243
with open(output_file, "w", encoding="utf-8") as out_file:
4344
with open(input_file, "r", encoding="utf-8") as in_file:
@@ -59,7 +60,7 @@ def futures_del(engine, input_file, output_file):
5960
mock_logger("WARN", err, futures[f])
6061
error_recs += 1
6162
except (SzUnrecoverableError, SzError) as err:
62-
mock_logger("CRITICAL", err, futures[f])
63+
shutdown = True
6364
raise err
6465
else:
6566
out_file.write(f"{result}\n")
@@ -68,7 +69,7 @@ def futures_del(engine, input_file, output_file):
6869
if success_recs % 100 == 0:
6970
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
7071
finally:
71-
if record := in_file.readline():
72+
if not shutdown and (record := in_file.readline()):
7273
futures[executor.submit(delete_record, engine, record)] = record
7374

7475
del futures[f]

python/information/check_datastore_performance.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,9 @@
66
from senzing import SzError
77
from senzing_core import SzAbstractFactoryCore
88

9-
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
109
INSTANCE_NAME = Path(__file__).stem
1110
SECONDS_TO_RUN = 3
11+
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
1212

1313
try:
1414
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)

python/information/get_datastore_info.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from senzing import SzError
77
from senzing_core import SzAbstractFactoryCore
88

9-
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
109
INSTANCE_NAME = Path(__file__).stem
10+
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
1111

1212
try:
1313
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)

python/information/get_license.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from senzing import SzError
77
from senzing_core import SzAbstractFactoryCore
88

9-
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
109
INSTANCE_NAME = Path(__file__).stem
10+
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
1111

1212
try:
1313
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)

python/information/get_stats.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,9 @@
1010
from senzing import SzBadInputError, SzError, SzRetryableError, SzUnrecoverableError
1111
from senzing_core import SzAbstractFactoryCore
1212

13-
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
1413
INPUT_FILE = Path("../../resources/data/load-500.jsonl").resolve()
1514
INSTANCE_NAME = Path(__file__).stem
15+
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
1616

1717

1818
def mock_logger(level, error, error_record=None):
@@ -40,13 +40,14 @@ def engine_stats(engine):
4040

4141
def futures_add(engine, input_file):
4242
success_recs = 0
43+
shutdown = False
4344
error_recs = 0
4445

45-
with open(input_file, "r", encoding="utf-8") as file:
46+
with open(input_file, "r", encoding="utf-8") as in_file:
4647
with concurrent.futures.ThreadPoolExecutor() as executor:
4748
futures = {
4849
executor.submit(add_record, engine, record): record
49-
for record in itertools.islice(file, executor._max_workers)
50+
for record in itertools.islice(in_file, executor._max_workers)
5051
}
5152

5253
while futures:
@@ -61,7 +62,7 @@ def futures_add(engine, input_file):
6162
mock_logger("WARN", err, futures[f])
6263
error_recs += 1
6364
except (SzUnrecoverableError, SzError) as err:
64-
mock_logger("CRITICAL", err, futures[f])
65+
shutdown = True
6566
raise err
6667
else:
6768
success_recs += 1
@@ -71,7 +72,7 @@ def futures_add(engine, input_file):
7172
if success_recs % 200 == 0:
7273
engine_stats(engine)
7374
finally:
74-
if record := file.readline():
75+
if not shutdown and (record := in_file.readline()):
7576
futures[executor.submit(add_record, engine, record)] = record
7677

7778
del futures[f]

python/information/get_version.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@
66
from senzing import SzError
77
from senzing_core import SzAbstractFactoryCore
88

9-
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
109
INSTANCE_NAME = Path(__file__).stem
10+
SETTINGS = os.getenv("SENZING_ENGINE_CONFIGURATION_JSON", "{}")
1111

1212
try:
1313
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)

python/initialization/abstract_factory_parameters.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919

2020
try:
2121
sz_factory = SzAbstractFactoryCore(**FACTORY_PARAMETERS)
22-
sz_config = sz_factory.create_config()
2322
sz_configmgr = sz_factory.create_configmanager()
2423
sz_diagnostic = sz_factory.create_diagnostic()
2524
sz_engine = sz_factory.create_engine()

python/initialization/factory_and_engines.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@
1111

1212
try:
1313
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
14-
sz_config = sz_factory.create_config()
1514
sz_configmgr = sz_factory.create_configmanager()
1615
sz_diagnostic = sz_factory.create_diagnostic()
1716
sz_engine = sz_factory.create_engine()

python/loading/add_futures.py

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ def add_record(engine, record_to_add):
2929

3030

3131
def futures_add(engine, input_file):
32-
success_recs = 0
3332
error_recs = 0
33+
shutdown = False
34+
success_recs = 0
3435

35-
with open(input_file, "r", encoding="utf-8") as file:
36+
with open(input_file, "r", encoding="utf-8") as in_file:
3637
with concurrent.futures.ThreadPoolExecutor() as executor:
3738
futures = {
3839
executor.submit(add_record, engine, record): record
39-
for record in itertools.islice(file, executor._max_workers)
40+
for record in itertools.islice(in_file, executor._max_workers)
4041
}
4142

4243
while futures:
@@ -51,14 +52,14 @@ def futures_add(engine, input_file):
5152
mock_logger("WARN", err, futures[f])
5253
error_recs += 1
5354
except (SzUnrecoverableError, SzError) as err:
54-
mock_logger("CRITICAL", err, futures[f])
55+
shutdown = True
5556
raise err
5657
else:
5758
success_recs += 1
5859
if success_recs % 100 == 0:
5960
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
6061
finally:
61-
if record := file.readline():
62+
if not shutdown and (record := in_file.readline()):
6263
futures[executor.submit(add_record, engine, record)] = record
6364

6465
del futures[f]

python/loading/add_queue.py

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,14 +29,15 @@ def add_record(engine, record_to_add):
2929

3030

3131
def producer(input_file, queue):
32-
with open(input_file, "r", encoding="utf-8") as file:
33-
for record in file:
32+
with open(input_file, "r", encoding="utf-8") as in_file:
33+
for record in in_file:
3434
queue.put(record, block=True)
3535

3636

3737
def consumer(engine, queue):
38-
success_recs = 0
3938
error_recs = 0
39+
shutdown = False
40+
success_recs = 0
4041

4142
with concurrent.futures.ThreadPoolExecutor() as executor:
4243
futures = {executor.submit(add_record, engine, queue.get()): _ for _ in range(executor._max_workers)}
@@ -53,14 +54,14 @@ def consumer(engine, queue):
5354
mock_logger("WARN", err, futures[f])
5455
error_recs += 1
5556
except (SzUnrecoverableError, SzError) as err:
56-
mock_logger("CRITICAL", err, futures[f])
57+
shutdown = True
5758
raise err
5859
else:
5960
success_recs += 1
6061
if success_recs % 100 == 0:
6162
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
6263
finally:
63-
if not queue.empty():
64+
if not shutdown and not queue.empty():
6465
record = queue.get()
6566
futures[executor.submit(add_record, engine, record)] = record
6667

@@ -73,7 +74,7 @@ def consumer(engine, queue):
7374
sz_factory = SzAbstractFactoryCore(INSTANCE_NAME, SETTINGS, verbose_logging=False)
7475
sz_engine = sz_factory.create_engine()
7576

76-
input_queue = Queue(maxsize=200) # type: ignore
77+
input_queue = Queue(maxsize=200)
7778
producer_proc = Process(target=producer, args=(INPUT_FILE, input_queue))
7879
producer_proc.start()
7980
consumer_proc = Process(target=consumer, args=(sz_engine, input_queue))

python/loading/add_records_loop.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,11 @@ def mock_logger(level, error, error_record=None):
2020

2121

2222
def add_records_from_file(engine, input_file):
23-
success_recs = 0
2423
error_recs = 0
24+
success_recs = 0
2525

26-
with open(input_file, "r", encoding="utf-8") as file:
27-
for record_to_add in file:
26+
with open(input_file, "r", encoding="utf-8") as in_file:
27+
for record_to_add in in_file:
2828
try:
2929
record_dict = json.loads(record_to_add)
3030
data_source = record_dict.get("DATA_SOURCE", "")
@@ -37,7 +37,6 @@ def add_records_from_file(engine, input_file):
3737
mock_logger("WARN", err, record_to_add)
3838
error_recs += 1
3939
except (SzUnrecoverableError, SzError) as err:
40-
mock_logger("CRITICAL", err, record_to_add)
4140
raise err
4241
else:
4342
success_recs += 1

python/loading/add_truthset_loop.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,10 @@ def add_records_from_file(engine, input_file):
2727
success_recs = 0
2828
error_recs = 0
2929

30-
with open(input_file, "r", encoding="utf-8") as file:
30+
with open(input_file, "r", encoding="utf-8") as in_file:
3131
print(f"\nAdding records from {input_file}")
3232

33-
for record_to_add in file:
33+
for record_to_add in in_file:
3434
try:
3535
record_dict = json.loads(record_to_add)
3636
data_source = record_dict.get("DATA_SOURCE", "")
@@ -43,7 +43,6 @@ def add_records_from_file(engine, input_file):
4343
mock_logger("WARN", err, record_to_add)
4444
error_recs += 1
4545
except (SzUnrecoverableError, SzError) as err:
46-
mock_logger("CRITICAL", err, record_to_add)
4746
raise err
4847
else:
4948
success_recs += 1

python/loading/add_with_info_futures.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -46,8 +46,9 @@ def engine_stats(engine):
4646

4747

4848
def futures_add(engine, input_file, output_file):
49-
success_recs = 0
5049
error_recs = 0
50+
shutdown = False
51+
success_recs = 0
5152

5253
with open(output_file, "w", encoding="utf-8") as out_file:
5354
with open(input_file, "r", encoding="utf-8") as in_file:
@@ -69,7 +70,7 @@ def futures_add(engine, input_file, output_file):
6970
mock_logger("WARN", err, futures[f])
7071
error_recs += 1
7172
except (SzUnrecoverableError, SzError) as err:
72-
mock_logger("CRITICAL", err, futures[f])
73+
shutdown = True
7374
raise err
7475
else:
7576
out_file.write(f"{result}\n")
@@ -81,7 +82,7 @@ def futures_add(engine, input_file, output_file):
8182
if success_recs % 200 == 0:
8283
engine_stats(engine)
8384
finally:
84-
if record := in_file.readline():
85+
if not shutdown and (record := in_file.readline()):
8586
futures[executor.submit(add_record, engine, record)] = record
8687

8788
del futures[f]

python/redo/add_with_redo.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -24,12 +24,12 @@ def mock_logger(level, error, error_record=None):
2424

2525

2626
def add_records_from_file(engine, input_file):
27-
success_recs = 0
2827
error_recs = 0
28+
success_recs = 0
2929

30-
with open(input_file, "r", encoding="utf-8") as file:
30+
with open(input_file, "r", encoding="utf-8") as in_file:
3131

32-
for record_to_add in file:
32+
for record_to_add in in_file:
3333
try:
3434
record_dict = json.loads(record_to_add)
3535
data_source = record_dict.get("DATA_SOURCE", None)
@@ -42,7 +42,6 @@ def add_records_from_file(engine, input_file):
4242
mock_logger("WARN", err, record_to_add)
4343
error_recs += 1
4444
except (SzUnrecoverableError, SzError) as err:
45-
mock_logger("CRITICAL", err, record_to_add)
4645
raise err
4746
else:
4847
success_recs += 1
@@ -54,8 +53,8 @@ def add_records_from_file(engine, input_file):
5453

5554

5655
def process_redo(engine):
57-
success_recs = 0
5856
error_recs = 0
57+
success_recs = 0
5958

6059
print("\nStarting to process redo records...")
6160

0 commit comments

Comments
 (0)