Skip to content

Commit c5c435e

Browse files
authored
#32- Fix thread pool drain if non-terminal error occurs (#33)
1 parent 5c0355f commit c5c435e

File tree

11 files changed

+86
-166
lines changed

11 files changed

+86
-166
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
55
The format is based on [Keep a Changelog], [markdownlint],
66
and this project adheres to [Semantic Versioning].
77

8+
## [1.1.2] - 2025-06-03
9+
10+
### Fixed in 1.1.2
11+
12+
- Fix potential for futures thread pool to drain on non-terminal errors
13+
814
## [1.1.1] - 2024-05-24
915

1016
### Changed in 1.1.1

Python/Tasks/Deleting/DeleteFutures.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
import sys
88
import time
9+
910
from senzing import (
1011
G2BadInputException,
1112
G2Engine,
@@ -63,9 +64,7 @@ def futures_del(engine, input_file):
6364
}
6465

6566
while futures:
66-
done, _ = concurrent.futures.wait(
67-
futures, return_when=concurrent.futures.FIRST_COMPLETED
68-
)
67+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
6968
for f in done:
7069
try:
7170
f.result()
@@ -79,27 +78,20 @@ def futures_del(engine, input_file):
7978
mock_logger("CRITICAL", err, futures[f])
8079
raise
8180
else:
82-
record = in_file.readline()
83-
if record:
84-
futures[executor.submit(del_record, engine, record)] = (
85-
record
86-
)
87-
8881
success_recs += 1
8982
if success_recs % 1000 == 0:
90-
prev_time = record_stats(
91-
success_recs, error_recs, prev_time
92-
)
83+
prev_time = record_stats(success_recs, error_recs, prev_time)
9384

9485
if success_recs % 10000 == 0:
9586
engine_stats(engine)
9687
finally:
88+
record = in_file.readline()
89+
if record:
90+
futures[executor.submit(del_record, engine, record)] = record
91+
9792
del futures[f]
9893

99-
print(
100-
f"Successfully deleted {success_recs:,} records, with"
101-
f" {error_recs:,} errors"
102-
)
94+
print(f"Successfully deleted {success_recs:,} records, with" f" {error_recs:,} errors")
10395

10496

10597
try:

Python/Tasks/Deleting/DeleteWithInfoFutures.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
import sys
88
import time
9+
910
from senzing import (
1011
G2BadInputException,
1112
G2Engine,
@@ -66,9 +67,7 @@ def futures_del(engine, input_file, output_file):
6667
}
6768

6869
while futures:
69-
done, _ = concurrent.futures.wait(
70-
futures, return_when=concurrent.futures.FIRST_COMPLETED
71-
)
70+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
7271
for f in done:
7372
try:
7473
result = f.result()
@@ -82,29 +81,22 @@ def futures_del(engine, input_file, output_file):
8281
mock_logger("CRITICAL", err, futures[f])
8382
raise
8483
else:
85-
record = in_file.readline()
86-
if record:
87-
futures[executor.submit(del_record, engine, record)] = (
88-
record
89-
)
90-
9184
out_file.write(f"{result}\n")
9285

9386
success_recs += 1
9487
if success_recs % 1000 == 0:
95-
prev_time = record_stats(
96-
success_recs, error_recs, prev_time
97-
)
88+
prev_time = record_stats(success_recs, error_recs, prev_time)
9889

9990
if success_recs % 10000 == 0:
10091
engine_stats(engine)
10192
finally:
93+
record = in_file.readline()
94+
if record:
95+
futures[executor.submit(del_record, engine, record)] = record
96+
10297
del futures[f]
10398

104-
print(
105-
f"Successfully deleted {success_recs:,} records, with"
106-
f" {error_recs:,} errors"
107-
)
99+
print(f"Successfully deleted {success_recs:,} records, with" f" {error_recs:,} errors")
108100
print(f"With info responses written to {output_file}")
109101

110102

Python/Tasks/Loading/Add100KFutures.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,7 @@ def futures_add(engine, input_file):
6464
}
6565

6666
while futures:
67-
done, _ = concurrent.futures.wait(
68-
futures, return_when=concurrent.futures.FIRST_COMPLETED
69-
)
67+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
7068
for f in done:
7169
try:
7270
f.result()
@@ -80,27 +78,20 @@ def futures_add(engine, input_file):
8078
mock_logger("CRITICAL", err, futures[f])
8179
raise
8280
else:
83-
record = file.readline()
84-
if record:
85-
futures[executor.submit(add_record, engine, record)] = (
86-
record
87-
)
88-
8981
success_recs += 1
9082
if success_recs % 1000 == 0:
91-
prev_time = record_stats(
92-
success_recs, error_recs, prev_time
93-
)
83+
prev_time = record_stats(success_recs, error_recs, prev_time)
9484

9585
if success_recs % 10000 == 0:
9686
engine_stats(engine)
9787
finally:
88+
record = file.readline()
89+
if record:
90+
futures[executor.submit(add_record, engine, record)] = record
91+
9892
del futures[f]
9993

100-
print(
101-
f"Successfully loaded {success_recs:,} records, with"
102-
f" {error_recs:,} errors"
103-
)
94+
print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
10495

10596

10697
try:

Python/Tasks/Loading/Add10KFutures.py

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -64,9 +64,7 @@ def futures_add(engine, input_file):
6464
}
6565

6666
while futures:
67-
done, _ = concurrent.futures.wait(
68-
futures, return_when=concurrent.futures.FIRST_COMPLETED
69-
)
67+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
7068
for f in done:
7169
try:
7270
f.result()
@@ -80,27 +78,20 @@ def futures_add(engine, input_file):
8078
mock_logger("CRITICAL", err, futures[f])
8179
raise
8280
else:
83-
record = file.readline()
84-
if record:
85-
futures[executor.submit(add_record, engine, record)] = (
86-
record
87-
)
88-
8981
success_recs += 1
9082
if success_recs % 1000 == 0:
91-
prev_time = record_stats(
92-
success_recs, error_recs, prev_time
93-
)
83+
prev_time = record_stats(success_recs, error_recs, prev_time)
9484

9585
if success_recs % 10000 == 0:
9686
engine_stats(engine)
9787
finally:
88+
record = file.readline()
89+
if record:
90+
futures[executor.submit(add_record, engine, record)] = record
91+
9892
del futures[f]
9993

100-
print(
101-
f"Successfully loaded {success_recs:,} records, with"
102-
f" {error_recs:,} errors"
103-
)
94+
print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
10495

10596

10697
try:

Python/Tasks/Loading/Add50KFutures.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
import sys
88
import time
9+
910
from senzing import (
1011
G2BadInputException,
1112
G2Engine,
@@ -63,9 +64,7 @@ def futures_add(engine, input_file):
6364
}
6465

6566
while futures:
66-
done, _ = concurrent.futures.wait(
67-
futures, return_when=concurrent.futures.FIRST_COMPLETED
68-
)
67+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
6968
for f in done:
7069
try:
7170
f.result()
@@ -79,27 +78,20 @@ def futures_add(engine, input_file):
7978
mock_logger("CRITICAL", err, futures[f])
8079
raise
8180
else:
82-
record = file.readline()
83-
if record:
84-
futures[executor.submit(add_record, engine, record)] = (
85-
record
86-
)
87-
8881
success_recs += 1
8982
if success_recs % 1000 == 0:
90-
prev_time = record_stats(
91-
success_recs, error_recs, prev_time
92-
)
83+
prev_time = record_stats(success_recs, error_recs, prev_time)
9384

9485
if success_recs % 10000 == 0:
9586
engine_stats(engine)
9687
finally:
88+
record = file.readline()
89+
if record:
90+
futures[executor.submit(add_record, engine, record)] = record
91+
9792
del futures[f]
9893

99-
print(
100-
f"Successfully loaded {success_recs:,} records, with"
101-
f" {error_recs:,} errors"
102-
)
94+
print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
10395

10496

10597
try:

Python/Tasks/Loading/Add50KWithInfoFutures.py

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@
66
import os
77
import sys
88
import time
9+
910
from senzing import (
1011
G2BadInputException,
1112
G2Engine,
@@ -66,9 +67,7 @@ def futures_add(engine, input_file, output_file):
6667
}
6768

6869
while futures:
69-
done, _ = concurrent.futures.wait(
70-
futures, return_when=concurrent.futures.FIRST_COMPLETED
71-
)
70+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
7271
for f in done:
7372
try:
7473
result = f.result()
@@ -82,29 +81,22 @@ def futures_add(engine, input_file, output_file):
8281
mock_logger("CRITICAL", err, futures[f])
8382
raise
8483
else:
85-
record = in_file.readline()
86-
if record:
87-
futures[executor.submit(add_record, engine, record)] = (
88-
record
89-
)
90-
9184
out_file.write(f"{result}\n")
9285

9386
success_recs += 1
9487
if success_recs % 1000 == 0:
95-
prev_time = record_stats(
96-
success_recs, error_recs, prev_time
97-
)
88+
prev_time = record_stats(success_recs, error_recs, prev_time)
9889

9990
if success_recs % 10000 == 0:
10091
engine_stats(engine)
10192
finally:
93+
record = in_file.readline()
94+
if record:
95+
futures[executor.submit(add_record, engine, record)] = record
96+
10297
del futures[f]
10398

104-
print(
105-
f"Successfully loaded {success_recs:,} records, with"
106-
f" {error_recs:,} errors"
107-
)
99+
print(f"Successfully loaded {success_recs:,} records, with" f" {error_recs:,} errors")
108100
print(f"With info responses written to {output_file}")
109101

110102

Python/Tasks/Redo/RedoContinuousFutures.py

Lines changed: 11 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import os
55
import sys
66
import time
7+
78
from senzing import (
89
G2BadInputException,
910
G2Engine,
@@ -71,10 +72,7 @@ def redo_count(engine):
7172

7273

7374
def redo_pause(success):
74-
print(
75-
"No redo records to process, pausing for 30 seconds. Total processed:"
76-
f" {success:,} (CTRL-C to exit)..."
77-
)
75+
print("No redo records to process, pausing for 30 seconds. Total processed:" f" {success:,} (CTRL-C to exit)...")
7876
time.sleep(30)
7977

8078

@@ -94,9 +92,7 @@ def futures_redo(engine):
9492
break
9593

9694
while True:
97-
done, _ = concurrent.futures.wait(
98-
futures, return_when=concurrent.futures.FIRST_COMPLETED
99-
)
95+
done, _ = concurrent.futures.wait(futures, return_when=concurrent.futures.FIRST_COMPLETED)
10096
for f in done:
10197
try:
10298
_ = f.result()
@@ -110,24 +106,19 @@ def futures_redo(engine):
110106
mock_logger("CRITICAL", err, futures[f])
111107
raise
112108
else:
113-
record = get_redo_record(engine)
114-
if record:
115-
futures[
116-
executor.submit(process_redo_record, engine, record)
117-
] = record
118-
else:
119-
redo_paused = True
120-
121109
success_recs += 1
122110
if success_recs % 100 == 0:
123-
print(
124-
f"Processed {success_recs:,} redo records, with"
125-
f" {error_recs:,} errors"
126-
)
111+
print(f"Processed {success_recs:,} redo records, with" f" {error_recs:,} errors")
127112

128113
if success_recs % 1000 == 0:
129114
engine_stats(engine)
130115
finally:
116+
record = get_redo_record(engine)
117+
if record:
118+
futures[executor.submit(process_redo_record, engine, record)] = record
119+
else:
120+
redo_paused = True
121+
131122
del futures[f]
132123

133124
if redo_paused:
@@ -137,9 +128,7 @@ def futures_redo(engine):
137128
while len(futures) < executor._max_workers:
138129
record = get_redo_record(engine)
139130
if record:
140-
futures[
141-
executor.submit(process_redo_record, engine, record)
142-
] = record
131+
futures[executor.submit(process_redo_record, engine, record)] = record
143132

144133

145134
try:

0 commit comments

Comments
 (0)