Scheduler refactor [utils]: multiprocessing encoding and messaging #293
Pull Request Overview
This PR introduces comprehensive multiprocessing utilities for the scheduler refactor, focusing on encoding and messaging capabilities to optimize data transfer between processes. The implementation provides configurable serialization strategies, binary encoding options, and high-performance interprocess communication mechanisms.
- Adds robust message encoding system with support for Pydantic models, configurable serialization (dict/sequence), and binary encoding (msgpack/msgspec)
- Implements abstract messaging framework with concrete implementations for queues, manager queues, and pipes
- Provides comprehensive test coverage for all new utility modules including smoke, sanity, and regression scenarios
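The dict versus sequence serialization choice mentioned above can be illustrated with a minimal sketch. It does not use the PR's actual API: `RequestInfo` and the `encode_*`/`decode_*` helpers are hypothetical names, a dataclass stands in for a Pydantic model, and the standard-library `json` module stands in for msgpack/msgspec. The point is only that a sequence layout drops the field names and so tends to produce a smaller payload, at the cost of relying on field order when decoding.

```python
import json
from dataclasses import asdict, astuple, dataclass


@dataclass
class RequestInfo:
    """Hypothetical message type; a stand-in for a Pydantic model."""

    request_id: str
    prompt_tokens: int
    output_tokens: int


def encode_dict(msg: RequestInfo) -> bytes:
    # Dict strategy: field names travel with every message.
    return json.dumps(asdict(msg), separators=(",", ":")).encode("utf-8")


def encode_sequence(msg: RequestInfo) -> bytes:
    # Sequence strategy: only values travel; field order carries the meaning.
    return json.dumps(astuple(msg), separators=(",", ":")).encode("utf-8")


def decode_dict(data: bytes) -> RequestInfo:
    return RequestInfo(**json.loads(data))


def decode_sequence(data: bytes) -> RequestInfo:
    return RequestInfo(*json.loads(data))


message = RequestInfo("req-1", 128, 256)
dict_bytes = encode_dict(message)
seq_bytes = encode_sequence(message)
print(len(dict_bytes), len(seq_bytes))
```

Both encodings round-trip to an equal `RequestInfo`; the sequence form is shorter because the keys are omitted.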
Reviewed Changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
File | Description |
---|---|
tests/unit/utils/test_text.py | Fixes spelling error in test docstring (puncutation → punctuation) |
tests/unit/utils/test_messaging.py | Comprehensive test suite for messaging utilities with multiprocessing scenarios |
tests/unit/utils/test_encoding.py | Complete test coverage for encoding utilities with various data types and serialization strategies |
src/guidellm/utils/messaging.py | Core messaging abstractions and implementations for interprocess communication |
src/guidellm/utils/encoding.py | Message encoding utilities with Pydantic support and configurable serialization/encoding |
src/guidellm/utils/__init__.py | Exports new encoding and messaging utilities from utils module |
Co-authored-by: Copilot <[email protected]> Signed-off-by: Mark Kurtz <[email protected]>
… for the scheduler refactor
Co-authored-by: Copilot <[email protected]> Signed-off-by: Mark Kurtz <[email protected]>
Co-authored-by: Samuel Monson <[email protected]> Signed-off-by: Mark Kurtz <[email protected]>
Looks good. Just one minor comment.
async def get(self, timeout: float | None = None) -> ReceiveMessageT:
    """
    Retrieve message from receive buffer with optional timeout.

    :param timeout: Maximum time to wait for a message
    :return: Decoded message from the receive buffer
    """
    return await asyncio.wait_for(
        self.buffer_receive_queue.async_get(), timeout=timeout
    )
Does this function raise a TimeoutError when it times out? If so, I think it would be beneficial to document that on all of the functions that take a timeout.
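For reference, `asyncio.wait_for` does raise `asyncio.TimeoutError` (an alias of the built-in `TimeoutError` on Python 3.11+) when the timeout elapses. A minimal sketch of how that could be documented and handled is below; `get_with_timeout` and `demo` are hypothetical names, and a plain `asyncio.Queue` stands in for the PR's `buffer_receive_queue`.

```python
import asyncio
from typing import Any, Optional


async def get_with_timeout(
    queue: asyncio.Queue, timeout: Optional[float] = None
) -> Any:
    """
    Retrieve the next item from the queue with an optional timeout.

    :param timeout: Maximum time in seconds to wait; None waits indefinitely
    :return: The next item from the queue
    :raises asyncio.TimeoutError: If no item arrives within ``timeout`` seconds
    """
    return await asyncio.wait_for(queue.get(), timeout=timeout)


async def demo() -> str:
    queue: asyncio.Queue = asyncio.Queue()  # empty, so the wait must time out
    try:
        await get_with_timeout(queue, timeout=0.01)
    except asyncio.TimeoutError:
        return "timed out"
    return "received"


result = asyncio.run(demo())
print(result)
```

Catching `asyncio.TimeoutError` rather than the built-in name keeps the handler working on Python versions before 3.11, where the two are distinct classes.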
Summary
This PR introduces four new utility modules that improve how information is transferred across processes, with the goal of maximizing performance. encoding.py serializes the data into a transferable format and minimizes its size, while messaging.py provides high-performance multiprocessing queues and pipes that work with that encoding.
Details
MessageEncoding.py
InterProcessMessaging, InterProcessMessagingManagerQueue, InterProcessMessagingPipe, InterProcessMessagingQueue
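As a rough illustration of the idea behind the pipe-based variant, the sketch below sends pre-encoded bytes over a `multiprocessing.Pipe`. It is not the implementation of `InterProcessMessagingPipe`: `pickle` is an assumed stand-in for the PR's encoder, the message contents are made up, and both connection ends live in one process here for brevity, whereas in real use each end would belong to a different process.

```python
import pickle
from multiprocessing import Pipe

# Two connected endpoints; normally one is handed to a worker process.
parent_conn, child_conn = Pipe(duplex=True)

# Encode once up front, then move raw bytes so the pipe does no extra pickling.
outgoing = {"request_id": "req-1", "status": "completed"}
payload = pickle.dumps(outgoing, protocol=pickle.HIGHEST_PROTOCOL)
parent_conn.send_bytes(payload)

# poll() bounds the wait so a missing message cannot block forever.
if child_conn.poll(1.0):
    received = pickle.loads(child_conn.recv_bytes())
else:
    received = None

parent_conn.close()
child_conn.close()
print(received)
```

Using `send_bytes`/`recv_bytes` instead of `send`/`recv` leaves the choice of serializer to the caller, which is what allows a custom encoding layer to sit in front of the transport.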
Test Plan
Related Issues
Use of AI
## WRITTEN BY AI ##