Skip to content

Commit 9402cd3

Browse files
authored
Made indexer # threads controllable. (#366)
# Changes - Added `num_threads` option to control fast indexer parallelism - Don't use a thread pool for indexing if `num_threads == 1` # Fixes - Fixed `MixedLogReader` handling of Python `pathlib.Path` objects
2 parents b4f5412 + 1a040d2 commit 9402cd3

3 files changed

Lines changed: 27 additions & 12 deletions

File tree

python/fusion_engine_client/analysis/data_loader.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ class DataLoader(object):
161161

162162
logger = logging.getLogger('point_one.fusion_engine.analysis.data_loader')
163163

164-
def __init__(self, path=None, save_index=True, ignore_index=False):
164+
def __init__(self, path=None, save_index=True, ignore_index=False, num_threads: int = None):
165165
"""!
166166
@brief Create a new reader instance.
167167
@@ -170,6 +170,8 @@ def __init__(self, path=None, save_index=True, ignore_index=False):
170170
future. See @ref FileIndex for details.
171171
@param ignore_index If `True`, ignore the existing index file and read from the `.p1log` binary file directly.
172172
If `save_index == True`, this will delete the existing file and create a new one.
173+
@param num_threads The number of parallel threads to spawn during indexing. If `None`, defaults to the number
174+
of available CPUs.
173175
"""
174176
self.reader: MixedLogReader = None
175177

@@ -185,9 +187,9 @@ def __init__(self, path=None, save_index=True, ignore_index=False):
185187

186188
self._generate_index = save_index
187189
if path is not None:
188-
self.open(path, save_index=save_index, ignore_index=ignore_index)
190+
self.open(path, save_index=save_index, ignore_index=ignore_index, num_threads=num_threads)
189191

190-
def open(self, path, save_index=True, ignore_index=False):
192+
def open(self, path, save_index=True, ignore_index=False, num_threads: int = None):
191193
"""!
192194
@brief Open a FusionEngine binary file.
193195
@@ -196,11 +198,13 @@ def open(self, path, save_index=True, ignore_index=False):
196198
future. See @ref FileIndex for details.
197199
@param ignore_index If `True`, ignore the existing index file and read from the `.p1log` binary file directly.
198200
If `save_index == True`, this will delete the existing file and create a new one.
201+
@param num_threads The number of parallel threads to spawn during indexing. If `None`, defaults to the number
202+
of available CPUs.
199203
"""
200204
self.close()
201205

202206
self.reader = MixedLogReader(input_file=path, save_index=save_index, ignore_index=ignore_index,
203-
return_bytes=True, return_message_index=True)
207+
return_bytes=True, return_message_index=True, num_threads=num_threads)
204208

205209
# Read the first message (with P1 time) in the file to set self.t0.
206210
#

python/fusion_engine_client/parsers/fast_indexer.py

Lines changed: 9 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -149,6 +149,9 @@ def fast_generate_index(
149149
150150
@return The loaded or generated @ref FileIndex.
151151
"""
152+
if num_threads is None:
153+
num_threads = cpu_count()
154+
152155
file_size = os.stat(input_path).st_size
153156
_logger.debug(f'File size: {int(file_size/1024/1024)}MB')
154157

@@ -191,9 +194,12 @@ def fast_generate_index(
191194
_logger.debug(f'Reads/thread: {blocks_per_thread}')
192195

193196
# Create a threadpool.
194-
with Pool(num_threads) as p:
195-
# Kick off the threads to process with their args. Then concatenate their returned data.
196-
index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])
197+
if num_threads > 1:
198+
with Pool(num_threads) as p:
199+
# Kick off the threads to process with their args. Then concatenate their returned data.
200+
index_raw = np.concatenate([o for o in p.starmap(_search_blocks_for_fe, args)])
201+
else:
202+
index_raw = _search_blocks_for_fe(*args[0])
197203

198204
# Some messages may encapsulate other complete FE messages. Normally, these
199205
# are ignored. However, if a message straddles one of the processing blocks,

python/fusion_engine_client/parsers/mixed_log_reader.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import copy
44
from datetime import datetime
5+
import io
56
import os
67
import sys
78

@@ -22,7 +23,8 @@ class MixedLogReader(object):
2223
logger = logging.getLogger('point_one.fusion_engine.parsers.mixed_log_reader')
2324

2425
def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool = False,
25-
save_index: bool = True, ignore_index: bool = False, max_bytes: int = None,
26+
save_index: bool = True, ignore_index: bool = False, num_threads: int = None,
27+
max_bytes: int = None,
2628
time_range: TimeRange = None, message_types: Union[Iterable[MessageType], MessageType] = None,
2729
source_ids: Optional[Iterable[int]] = None, return_header: bool = True,
2830
return_payload: bool = True, return_bytes: bool = False, return_offset: bool = False,
@@ -41,6 +43,8 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool =
4143
See @ref FileIndex for details. Ignored if `max_bytes` is specified.
4244
@param ignore_index If `True`, ignore the existing index file and read from the binary file directly. If
4345
`save_index == True`, this will delete the existing file and create a new one.
46+
@param num_threads The number of parallel threads to spawn during indexing. If `None`, defaults to the number
47+
of available CPUs.
4448
@param max_bytes If specified, read up to the maximum number of bytes.
4549
@param time_range An optional @ref TimeRange object specifying desired start and end time bounds of the data to
4650
be read. See @ref TimeRange for more details.
@@ -103,10 +107,10 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool =
103107
self.start_time = datetime.now()
104108

105109
# Open the file to be read.
106-
if isinstance(input_file, str):
107-
self.input_file = open(input_file, 'rb')
108-
else:
110+
if isinstance(input_file, io.IOBase):
109111
self.input_file = input_file
112+
else:
113+
self.input_file = open(input_file, 'rb')
110114

111115
input_path = self.input_file.name
112116
self.file_size_bytes = os.stat(input_path).st_size
@@ -118,7 +122,8 @@ def __init__(self, input_file, warn_on_gaps: bool = False, show_progress: bool =
118122

119123
# Open the companion index file if one exists, otherwise index the file.
120124
self._original_index = fast_indexer.fast_generate_index(input_path, force_reindex=ignore_index,
121-
save_index=save_index, max_bytes=max_bytes)
125+
save_index=save_index, max_bytes=max_bytes,
126+
num_threads=num_threads)
122127
self.next_index_elem = 0
123128
self.index = self._original_index
124129
self.filtered_message_types = False

0 commit comments

Comments
 (0)