Commit a40f090

review of task path section
1 parent eac1367 commit a40f090


taskpath.rst

Lines changed: 60 additions & 31 deletions
@@ -159,81 +159,110 @@

... and then adds a callback onto the executor future to run when the task completes.

That callback will fire later as the result comes back. This style of callback is used in a few places to drive state changes asynchronously.
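For example, here is a minimal sketch (plain ``concurrent.futures``, not Parsl's actual code) of how a done-callback on a future behaves:

.. code-block:: python

   from concurrent.futures import Future

   def handle_done(fut):
       # Runs in whichever thread completes the future.
       print("task finished with result:", fut.result())

   executor_future = Future()
   executor_future.add_done_callback(handle_done)   # registered now...
   executor_future.set_result(8)                    # ...and fires here, printing 8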

.. index:: Globus Compute, ZMQ, ZeroMQ

HighThroughputExecutor.submit()
===============================

``executor.submit()`` above sends the task to the executor I configured, which in this example is an instance of the ``HighThroughputExecutor``. This is the point at which the task would instead go to Work Queue or one of the other executors if the configuration were different. I'll cover plugin points like this in more depth in `plugins`.

The High Throughput Executor consists of a bunch of threads and processes distributed across the various nodes you want to execute tasks on.

Inside the user workflow process, the High Throughput Executor ``submit`` method (`parsl/executors/high_throughput/executor.py, line 632 onwards <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/executor.py#L634>`_) packages the task up for execution and sends it on to the :dfn:`interchange` process:

.. code-block:: python
   :lineno-start: 666

   fut = Future()
   fut.parsl_executor_task_id = task_id
   self.tasks[task_id] = fut

   try:
       fn_buf = pack_res_spec_apply_message(func, args, kwargs,
                                            resource_specification=resource_specification,
                                            buffer_threshold=1024 * 1024)
   except TypeError:
       raise SerializationError(func.__name__)

   msg = {"task_id": task_id, "buffer": fn_buf}

   # Post task to the outgoing queue
   self.outgoing_q.put(msg)

   # Return the future
   return fut

The steps here are:

* make the executor future
* map it to the task ID so results handling can find it later
* serialize the task definition (that same triple of function, args and keyword args, along with any resource specification) into a byte stream ``fn_buf`` that is easier to send over the network (see `pickle` later)
* construct a message for the interchange, pairing the task ID with that byte stream
* send that message on the outgoing queue to the interchange
* return the (empty) executor future back to the DFK

Another piece of code will handle getting results back into that executor future later on.

All of the different processes involved in the High Throughput Executor communicate using `ZeroMQ <https://zeromq.org/>`_ (ZMQ). I won't talk about that in much depth, but it's a messaging layer that (in the High Throughput Executor) delivers messages over TCP/IP. The ``outgoing_q`` above is a ZMQ queue for submitting tasks to the interchange.
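
As a rough illustration of that style of messaging, here is a minimal ``pyzmq`` sketch (the PUSH/PULL socket types and the port number are illustrative assumptions, not Parsl's actual wire protocol):

.. code-block:: python

   import zmq

   context = zmq.Context()

   # Interchange side: bind a socket and wait for task messages.
   incoming = context.socket(zmq.PULL)
   incoming.bind("tcp://127.0.0.1:9000")      # hypothetical port

   # Submit side: connect and post a task message, like outgoing_q.put(msg) above.
   outgoing_q = context.socket(zmq.PUSH)
   outgoing_q.connect("tcp://127.0.0.1:9000")
   outgoing_q.send_pyobj({"task_id": 0, "buffer": b"...serialized task..."})

   print(incoming.recv_pyobj())               # blocks until the message arrives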

.. index:: interchange
   High Throughput Executor; interchange

The Interchange
===============

The interchange (defined in `parsl/executors/high_throughput/interchange.py <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/interchange.py>`_) runs alongside the user workflow process on the submitting node. It matches up tasks with available workers: it has a queue of tasks, and a queue of process worker pool managers that are ready for work.

Whenever it can match a new task (arriving on the outgoing task queue) with a process worker pool that is ready for work, it sends the task onwards to that worker pool. Otherwise, a queue of either ready tasks or ready workers builds up in the interchange.

The matching process so far has been fairly arbitrary but we have been doing some research on better ways to match workers and tasks - I'll talk a little about that later `when talking about scaling in <blocks>`.
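
In outline, the matching behaviour looks something like this (a simplified sketch with invented names, not the interchange's real data structures):

.. code-block:: python

   from collections import deque

   task_queue = deque()        # tasks waiting for a worker pool
   ready_managers = deque()    # worker pool managers waiting for work

   def try_match():
       # Match while both a task and a ready manager are available;
       # otherwise one of the two queues simply grows.
       while task_queue and ready_managers:
           task = task_queue.popleft()
           manager = ready_managers.popleft()
           print(f"dispatching {task} to {manager}")   # really: a ZMQ send to that pool

   def on_task_arrival(task):
       task_queue.append(task)
       try_match()

   def on_manager_ready(manager):
       ready_managers.append(manager)
       try_match()

   # Example: a manager registers before any task arrives, then a task turns up.
   on_manager_ready("manager-0")
   on_task_arrival("task-0")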

The interchange has two ZMQ connections per worker pool (one for sending tasks, one for receiving results), and when this task is matched, its definition is sent onwards over the relevant per-pool connection.

.. index:: worker pool, pilot jobs
   High Throughput Executor; process worker pool

The Process Worker Pool
=======================

On each worker node on our HPC system, a copy of the process worker pool will be running - `blocks` will talk about how that comes about. In this example workflow, the local system is the only worker node, so there will only be one worker pool. But in a 1000-node run, there would usually be 1000 worker pools, one running on each of those nodes (although other configurations are possible).

These worker pools connect back to the interchange using two network connections each (ZMQ over TCP) - so the interchange process needs 2 file descriptors per node. This is a common limitation to the "number of nodes" scalability of Parsl (see `issue #3022 <https://github.com/Parsl/parsl/issues/3022>`_ for a proposal to use one network connection per worker pool).
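
To get a feel for the numbers, here is a small sketch (the two-connections-per-pool figure comes from the paragraph above; the ``resource`` module is Unix-only):

.. code-block:: python

   import resource

   nodes = 1000                    # one worker pool per node
   fds_per_pool = 2                # two ZMQ-over-TCP connections each
   needed = nodes * fds_per_pool   # 2000 fds just for worker pool connections

   soft, hard = resource.getrlimit(resource.RLIMIT_NOFILE)
   print(f"need roughly {needed} fds; current soft limit is {soft}")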

The source code for the process worker pool lives in `parsl/executors/high_throughput/process_worker_pool.py <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/process_worker_pool.py>`_.

The worker pool consists of a few closely linked processes:

* The manager process which interfaces to the interchange (this is why you'll see a jumble of references to managers or worker pools in the code: the manager is the externally facing interface to the worker pool)

* Several worker processes - each worker runs one task at a time. There are a bunch of configuration parameters and heuristics to decide how many workers to run - this happens near the start of the process worker pool process at `parsl/executors/high_throughput/process_worker_pool.py line 210 <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/process_worker_pool.py#L210>`_. There is one worker per simultaneous task, so usually one per core or one per node (depending on application preference) - see the sketch after this list.
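
The sketch below gives the flavour of that sizing decision (loosely modelled on the executor's ``max_workers`` and ``cores_per_worker`` options; the real heuristics in ``process_worker_pool.py`` consider more factors):

.. code-block:: python

   import multiprocessing

   # Illustrative configuration values, loosely modelled on HTEX options.
   max_workers = float('inf')    # user-configured cap on workers per node
   cores_per_worker = 1.0        # how many cores each worker should get

   available_cores = multiprocessing.cpu_count()
   workers = min(max_workers, int(available_cores / cores_per_worker))
   print(f"starting {workers} workers on this node")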

The task arrives at the manager, and the manager dispatches it to a free worker. It is possible there isn't a free worker, because of the `pre-fetch feature <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/executor.py#L113>`_, which can help in high throughput situations. In that case, the task will have to wait in another queue - ready to start execution when a worker becomes free, without any more network activity.

The worker then deserialises the byte package that was originally serialized all the way back in the user submit process, giving Python objects for the function to run, the positional arguments and the keyword arguments.

At this point, the worker process can invoke the function with those arguments: the worker pool's ``execute_task`` method handles that at `line 593 <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/process_worker_pool.py#L593>`_.
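
As an illustration of that deserialize-and-invoke step, here is a simplified sketch using plain ``pickle`` in place of Parsl's own serialization helpers:

.. code-block:: python

   import pickle

   def add(a, b):
       return a + b

   # On the submit side, the invocation was packed into a byte buffer (simplified).
   fn_buf = pickle.dumps((add, (5, 3), {}))

   # On the worker, unpack the byte package back into Python objects...
   func, args, kwargs = pickle.loads(fn_buf)

   # ...and invoke the original function.
   print(func(*args, **kwargs))   # 8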

Now the original function has run - but in a worker that could have been on a different node.

The function execution is probably going to end in one of two ways: a result or an exception. (Actually there is a common third way, which is that it kills the unix-level worker process - for example by using far too much memory, by a library segfault, or by the batch job containing the worker pool reaching the end of its run time. That is handled, but I'm ignoring it here.)
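
In outline, that capture-and-report step looks like this (a sketch; the real worker uses Parsl's serialization and remote-exception wrappers rather than plain ``pickle``):

.. code-block:: python

   import pickle
   import traceback

   def run_task(func, args, kwargs):
       # Capture either the return value or the exception, so that whichever
       # happened can be shipped back towards the submit side.
       try:
           return pickle.dumps({"type": "result", "value": func(*args, **kwargs)})
       except Exception as e:
           return pickle.dumps({"type": "exception",
                                "value": e,
                                "traceback": traceback.format_exc()})

   print(pickle.loads(run_task(lambda x: 1 / x, (0,), {}))["type"])   # exception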

This result needs to be set on the ``AppFuture`` back in the user workflow process. It flows back over network connections that parallel the submitting side: first back to the interchange, and then to the piece of the High Throughput Executor running inside the submit process.

This final part of the High Throughput Executor is less symmetrical: the user workflow script is not necessarily waiting for any results at this point, so the High Throughput Executor runs a second thread to process results, the :dfn:`result queue thread` implemented by `htex._result_queue_worker <https://github.com/Parsl/parsl/blob/3f2bf1865eea16cc44d6b7f8938a1ae1781c61fd/parsl/executors/high_throughput/executor.py#L438>`_. This listens for new results and sets the corresponding executor future.
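
In outline, that thread behaves something like this (a sketch with a plain ``queue.Queue`` standing in for the ZMQ results connection, and invented message fields):

.. code-block:: python

   import queue
   import threading
   from concurrent.futures import Future

   tasks = {}                     # task_id -> executor future, filled in by submit()
   result_queue = queue.Queue()   # stands in for the ZMQ results connection

   def result_queue_worker():
       # Loop, matching each incoming result message to its executor future.
       while True:
           msg = result_queue.get()
           if msg is None:                 # shutdown sentinel
               break
           fut = tasks[msg["task_id"]]
           if "exception" in msg:
               fut.set_exception(msg["exception"])
           else:
               fut.set_result(msg["result"])

   threading.Thread(target=result_queue_worker, daemon=True).start()

   # Example: the submit side made a future, and a result later arrives for it.
   tasks[0] = Future()
   result_queue.put({"task_id": 0, "result": 8})
   print(tasks[0].result())       # 8
   result_queue.put(None)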

Once the executor future is set, that causes the ``handle_exec_done`` callback in the Data Flow Kernel to run. Some interesting task handling might happen here (see `elaborating` - things like retry handling), but in this example nothing interesting happens and the DFK sets the ``AppFuture`` result.

Setting the ``AppFuture`` result wakes up the main thread, which is sitting blocked in the ``.result()`` call in the final line of the workflow:

.. code-block:: python

   print(add(5, 3).result())

... and the result can be printed.

So now we're at the end of our simple workflow, and we pass out of the parsl context manager. That causes parsl to do various bits of shutdown, and then the user workflow process falls off the bottom of the script and ends.

.. todo:: label the various TaskRecord state transitions (there are only a few relevant here) throughout this doc - it will play nicely with the monitoring DB chapter later, so they are reflected not only in the log but also in the monitoring database.
