Skip to content

Commit b43efb9

Browse files
authored
#42 - Fix potential for futures thread pool to drain on non-terminal … (#46)
* #42 - Fix potential for futures thread pool to drain on non-terminal errors in Python * #42 - Update Changelog
1 parent 5071059 commit b43efb9

20 files changed

+73
-1068
lines changed

CHANGELOG.md

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,31 +5,50 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog], [markdownlint],
66
and this project adheres to [Semantic Versioning].
77

8+
## [0.0.7] - 2025-06-03
9+
10+
### Changed in 0.0.7
11+
12+
- Small improvements to Python snippets
13+
14+
## [0.0.6]
15+
816
### Changed in 0.0.6
917

1018
- Improved Python add_data_sources.py
1119

20+
## [0.0.5]
21+
1222
### Changed in 0.0.5
1323

1424
- Modified configuration examples for new szconfig and szconfigmanager pattern
1525

26+
## [0.0.4]
27+
1628
### Added to 0.0.4
1729

1830
- C# examples
1931
- Updated Java examples to move declarations to the bottom
2032

33+
## [0.0.3]
34+
2135
### Added to 0.0.3
2236

2337
- Java examples
2438

39+
## [0.0.2]
40+
2541
### Changed in 0.0.2
2642

2743
- Modify Python imports to use senzing and senzing_core
2844

29-
### Added to 0.0.1
45+
46+
### Added to 0.0.2
3047

3148
- Couple of new examples
3249

50+
## [0.0.1]
51+
3352
### Added to 0.0.1
3453

3554
- Initial for V4

README.md

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -67,24 +67,24 @@ You may already have installed the Senzing and created a Senzing project by foll
6767

6868
### Configuration
6969

70-
When using a bare metal install, the initialization parameters used by the Senzing Python utilities are maintained within `<project_path>/etc/G2Module.ini`.
70+
When using a bare metal install, the initialization parameters used by the Senzing Python utilities are maintained within `<project_path>/etc/sz_engine_config.ini`.
7171

72-
🤔To convert an existing Senzing project G2Module.ini file to a JSON string use one of the following methods:
72+
🤔To convert an existing Senzing project sz_engine_config.ini file to a JSON string use one of the following methods:
7373

74-
- [g2_module_ini_to_json.py]
74+
- [sz_engine_config_ini_to_json.py]
7575

76-
- Modify the path to your projects G2Module.ini file.
76+
- Modify the path to your projects sz_engine_config.ini file.
7777

7878
- [jc]
7979

8080
- ```console
81-
cat <project_path>/etc/G2Module.ini | jc --ini
81+
cat <project_path>/etc/sz_engine_config.ini.ini | jc --ini
8282
```
8383

8484
- Python one liner
8585

8686
- ```python
87-
python3 -c $'import configparser; ini_file_name = "<project_path>/etc/G2Module.ini";settings = {};cfgp = configparser.ConfigParser();cfgp.optionxform = str;cfgp.read(ini_file_name)\nfor section in cfgp.sections(): settings[section] = dict(cfgp.items(section))\nprint(settings)'
87+
python3 -c $'import configparser; ini_file_name = "<project_path>/etc/sz_engine_config.ini";settings = {};cfgp = configparser.ConfigParser();cfgp.optionxform = str;cfgp.read(ini_file_name)\nfor section in cfgp.sections(): settings[section] = dict(cfgp.items(section))\nprint(settings)'
8888
```
8989

9090
:pencil2: `<project_path>` in the above example should point to your project.
@@ -172,7 +172,7 @@ There are different sized load files within the [data] path that can be used to
172172
[Configuration]: #configuration
173173
[data]: resources/data/
174174
[Docker Usage]: #docker-usage
175-
[g2_module_ini_to_json.py]: python/initialization/g2_module_ini_to_json.py
175+
[sz_engine_config_ini_to_json.py]: python/initialization/sz_engine_config_ini_to_json.py
176176
[Input Load File Sizes]: #input-load-file-sizes
177177
[Items of Note]: #items-of-note
178178
[jc]: https://github.com/kellyjonbrazil/jc

python/deleting/delete_futures.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,13 @@ def futures_del(engine, input_file):
5454
mock_logger("CRITICAL", err, futures[f])
5555
raise err
5656
else:
57-
record = in_file.readline()
58-
if record:
59-
futures[executor.submit(delete_record, engine, record)] = record
60-
6157
success_recs += 1
6258
if success_recs % 100 == 0:
63-
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
59+
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
6460
finally:
61+
if record := in_file.readline():
62+
futures[executor.submit(delete_record, engine, record)] = record
63+
6564
del futures[f]
6665

6766
print(f"\nSuccessfully deleted {success_recs:,} records, with" f" {error_recs:,} errors")

python/deleting/delete_loop.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ def del_records_from_file(engine, input_file):
4343
success_recs += 1
4444

4545
if success_recs % 100 == 0:
46-
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
46+
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
4747

4848
print(f"\nSuccessfully deleted {success_recs:,} records, with {error_recs:,} errors")
4949

python/deleting/delete_with_info_futures.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -62,16 +62,15 @@ def futures_del(engine, input_file, output_file):
6262
mock_logger("CRITICAL", err, futures[f])
6363
raise err
6464
else:
65-
record = in_file.readline()
66-
if record:
67-
futures[executor.submit(delete_record, engine, record)] = record
68-
6965
out_file.write(f"{result}\n")
7066

7167
success_recs += 1
7268
if success_recs % 100 == 0:
73-
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
69+
print(f"Processed {success_recs:,} deletes, with {error_recs:,} errors", flush=True)
7470
finally:
71+
if record := in_file.readline():
72+
futures[executor.submit(delete_record, engine, record)] = record
73+
7574
del futures[f]
7675

7776
print(f"\nSuccessfully deleted {success_recs:,} records, with" f" {error_recs:,} errors")

python/information/get_stats.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -64,17 +64,16 @@ def futures_add(engine, input_file):
6464
mock_logger("CRITICAL", err, futures[f])
6565
raise err
6666
else:
67-
record = file.readline()
68-
if record:
69-
futures[executor.submit(add_record, engine, record)] = record
70-
7167
success_recs += 1
7268
if success_recs % 100 == 0:
7369
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
7470

7571
if success_recs % 200 == 0:
7672
engine_stats(engine)
7773
finally:
74+
if record := file.readline():
75+
futures[executor.submit(add_record, engine, record)] = record
76+
7877
del futures[f]
7978

8079
print(f"\nSuccessfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")

python/initialization/README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,10 @@
88
- Priming the Senzing engine before use loads resource intensive assets upfront. Without priming the first SDK call to the engine will appear slower than usual as it causes these assets to be loaded
99
- **factory_and_engines.py**
1010
- Basic example of how to create an abstract Senzing factory and each of the available engines
11-
- **g2_module_ini_to_json.py**
11+
- **sz_engine_config_ini_to_json.py**
1212
- The snippets herein utilize the `SENZING_ENGINE_CONFIGURATION_JSON` environment variable for Senzing abstract factory creation
13-
- If you are familiar with working with a Senzing project you may be aware the same configuration data is held in the G2Module.ini file
14-
- Example to convert G2Module.ini to a JSON string for use with `SENZING_ENGINE_CONFIGURATION_JSON`
13+
- If you are familiar with working with a Senzing project you may be aware the same configuration data is held in the sz_engine_config.ini file
14+
- Example to convert sz_engine_config.ini to a JSON string for use with `SENZING_ENGINE_CONFIGURATION_JSON`
1515
- **purge_repository.py**
1616
- **WARNING** This script will remove all data from a Senzing repository, use with caution! **WARNING**
1717
- It will prompt first, still use with caution!

python/initialization/g2_module_ini_to_json.py renamed to python/initialization/sz_engine_config_ini_to_json.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import configparser
44
from pathlib import Path
55

6-
INI_FILE = Path("../../resources/g2module/G2Module.ini").resolve()
6+
INI_FILE = Path("../../resources/engine_config/sz_engine_config.ini").resolve()
77
settings = {}
88

99
cfgp = configparser.ConfigParser()

python/loading/add_futures.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,14 +54,13 @@ def futures_add(engine, input_file):
5454
mock_logger("CRITICAL", err, futures[f])
5555
raise err
5656
else:
57-
record = file.readline()
58-
if record:
59-
futures[executor.submit(add_record, engine, record)] = record
60-
6157
success_recs += 1
6258
if success_recs % 100 == 0:
6359
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
6460
finally:
61+
if record := file.readline():
62+
futures[executor.submit(add_record, engine, record)] = record
63+
6564
del futures[f]
6665

6766
print(f"\nSuccessfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")

python/loading/add_queue.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,14 @@ def consumer(engine, queue):
5656
mock_logger("CRITICAL", err, futures[f])
5757
raise err
5858
else:
59-
if not queue.empty():
60-
record = queue.get()
61-
futures[executor.submit(add_record, engine, record)] = record
62-
6359
success_recs += 1
6460
if success_recs % 100 == 0:
6561
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
6662
finally:
63+
if not queue.empty():
64+
record = queue.get()
65+
futures[executor.submit(add_record, engine, record)] = record
66+
6767
del futures[f]
6868

6969
print(f"\nSuccessfully loaded {success_recs:,} records, with {error_recs:,} errors")
@@ -80,6 +80,5 @@ def consumer(engine, queue):
8080
consumer_proc.start()
8181
producer_proc.join()
8282
consumer_proc.join()
83-
8483
except SzError as err:
8584
mock_logger("CRITICAL", err)

python/loading/add_with_info_futures.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,6 @@ def futures_add(engine, input_file, output_file):
7272
mock_logger("CRITICAL", err, futures[f])
7373
raise err
7474
else:
75-
record = in_file.readline()
76-
if record:
77-
futures[executor.submit(add_record, engine, record)] = record
78-
7975
out_file.write(f"{result}\n")
8076

8177
success_recs += 1
@@ -85,6 +81,9 @@ def futures_add(engine, input_file, output_file):
8581
if success_recs % 200 == 0:
8682
engine_stats(engine)
8783
finally:
84+
if record := in_file.readline():
85+
futures[executor.submit(add_record, engine, record)] = record
86+
8887
del futures[f]
8988

9089
print(f"\nSuccessfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")

python/redo/add_with_redo.py

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -59,10 +59,9 @@ def process_redo(engine):
5959

6060
print("\nStarting to process redo records...")
6161

62-
while 1:
62+
while True:
6363
try:
64-
response = engine.get_redo_record()
65-
if not response:
64+
if not (response := engine.get_redo_record()):
6665
break
6766
engine.process_redo_record(response)
6867

@@ -87,9 +86,8 @@ def process_redo(engine):
8786
sz_engine = sz_factory.create_engine()
8887
for load_file in INPUT_FILES:
8988
add_records_from_file(sz_engine, load_file)
90-
redo_count = sz_engine.count_redo_records()
9189

92-
if redo_count:
90+
if sz_engine.count_redo_records():
9391
process_redo(sz_engine)
9492
else:
9593
print("\nNo redo records to process")

python/redo/redo_continuous.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,11 +22,9 @@ def process_redo(engine):
2222
success_recs = 0
2323
error_recs = 0
2424

25-
while 1:
25+
while True:
2626
try:
27-
response = engine.get_redo_record()
28-
29-
if not response:
27+
if not (response := engine.get_redo_record()):
3028
print(
3129
"No redo records to process, pausing for 30 seconds. Total"
3230
f" processed {success_recs:,} . (CTRL-C to exit)..."

python/redo/redo_continuous_futures.py

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,7 @@ def get_redo_record(engine):
3030
def prime_redo_records(engine, quantity):
3131
redo_records = []
3232
for _ in range(quantity):
33-
redo_record = get_redo_record(engine)
34-
if redo_record:
33+
if redo_record := get_redo_record(engine):
3534
redo_records.append(redo_record)
3635
return redo_records
3736

@@ -61,7 +60,7 @@ def futures_redo(engine):
6160
redo_paused = False
6261

6362
with concurrent.futures.ThreadPoolExecutor() as executor:
64-
while 1:
63+
while True:
6564
futures = {
6665
executor.submit(process_redo_record, engine, record): record
6766
for record in prime_redo_records(engine, executor._max_workers)
@@ -71,7 +70,7 @@ def futures_redo(engine):
7170
else:
7271
break
7372

74-
while 1:
73+
while True:
7574
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
7675
for f in done:
7776
try:
@@ -86,25 +85,23 @@ def futures_redo(engine):
8685
mock_logger("CRITICAL", err, futures[f])
8786
raise err
8887
else:
89-
record = get_redo_record(engine)
90-
if record:
91-
futures[executor.submit(process_redo_record, engine, record)] = record
92-
else:
93-
redo_paused = True
94-
9588
success_recs += 1
9689
if success_recs % 100 == 0:
9790
print(f"Processed {success_recs:,} redo records, with" f" {error_recs:,} errors")
9891
finally:
92+
if record := get_redo_record(engine):
93+
futures[executor.submit(process_redo_record, engine, record)] = record
94+
else:
95+
redo_paused = True
96+
9997
del futures[f]
10098

10199
if redo_paused:
102100
while not redo_count(engine):
103101
redo_pause(success_recs)
104102
redo_paused = False
105103
while len(futures) < executor._max_workers:
106-
record = get_redo_record(engine)
107-
if record:
104+
if record := get_redo_record(engine):
108105
futures[executor.submit(process_redo_record, engine, record)] = record
109106

110107

python/redo/redo_with_info_continuous.py

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,8 @@ def process_redo(engine, output_file):
4242

4343
with open(output_file, "w", encoding="utf-8") as out_file:
4444
try:
45-
while 1:
46-
redo_record = engine.get_redo_record()
47-
48-
if not redo_record:
45+
while True:
46+
if not (redo_record := engine.get_redo_record()):
4947
redo_pause(success_recs)
5048
continue
5149

python/searching/search_futures.py

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,17 +51,16 @@ def futures_search(engine, input_file):
5151
mock_logger("CRITICAL", err, futures[f])
5252
raise err
5353
else:
54-
record = in_file.readline()
55-
if record:
56-
futures[executor.submit(search_record, engine, record)] = record
57-
5854
success_recs += 1
5955
if success_recs % 100 == 0:
60-
print(f"Processed {success_recs:,} adds, with {error_recs:,} errors", flush=True)
56+
print(f"Processed {success_recs:,} searches, with {error_recs:,} errors", flush=True)
6157

6258
print(f"\n------ Searched: {futures[f]}", flush=True)
6359
print(f"\n{result}", flush=True)
6460
finally:
61+
if record := in_file.readline():
62+
futures[executor.submit(search_record, engine, record)] = record
63+
6564
del futures[f]
6665

6766
print(f"\nSuccessfully searched {success_recs:,} records, with" f" {error_recs:,} errors")

0 commit comments

Comments
 (0)