Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
e494935
TaskServerInterface implementation
Jul 9, 2025
2cd9431
Revert "TaskServerInterface implementation"
Jul 9, 2025
cb6cdf6
TaskServerInterface implementation
MarcusHsieh Jul 9, 2025
d0634c9
Merge branch 'MarcusHsieh/task_server' of https://github.com/MarcusHs…
MarcusHsieh Jul 9, 2025
3a0011c
Fix styling issues
MarcusHsieh Jul 9, 2025
4858552
FIX: Chain + File path
MarcusHsieh Jul 15, 2025
4d6a556
FIX: Chord membership + Dependency chain
MarcusHsieh Jul 15, 2025
c1004de
FIX: Working feature_demo
MarcusHsieh Jul 24, 2025
c7949f3
resolve conflicts
bgunnar5 Jul 15, 2025
50743c5
resolve conflicts
bgunnar5 Jul 15, 2025
6daae0f
mocked more items to try to fix broken tests on github
bgunnar5 Jul 16, 2025
dd5d19d
create MerlinWorker and CeleryWorker classes to handle the launching …
bgunnar5 Jul 21, 2025
032405c
implement worker-handler related classes
bgunnar5 Jul 23, 2025
cc327aa
add worker factory class and small cleanup to the rest of the worker …
bgunnar5 Jul 23, 2025
15a3386
remove functions that are now in the new worker files
bgunnar5 Jul 23, 2025
c4cbe13
link the new worker classes to the actual launching of workers
bgunnar5 Jul 23, 2025
d2d324b
add files that we'll need for this refactor
bgunnar5 Jul 15, 2025
9a81f88
fix regex in test
bgunnar5 Jul 23, 2025
caddb71
remove watchdog files and run fix-style
bgunnar5 Jul 23, 2025
547e3c5
fix tests that broke after refactor
bgunnar5 Jul 23, 2025
8b212eb
run fix-style
bgunnar5 Jul 23, 2025
fcc6b8e
update CHANGELOG
bgunnar5 Jul 23, 2025
4d84bd8
run fix-style
bgunnar5 Aug 7, 2025
be71c82
fix worker-related factory classes
bgunnar5 Aug 7, 2025
1a8c79d
+ Python 3.8+ compatibility
MarcusHsieh Aug 7, 2025
534769e
add tests for new workers files
bgunnar5 Aug 7, 2025
272a0a7
change imports for Celery worker and handler in tests
bgunnar5 Aug 7, 2025
4d5efd4
run fix-style
bgunnar5 Aug 7, 2025
799d667
attempt to fix broken unit tests
bgunnar5 Aug 7, 2025
bfe02a2
fix style
bgunnar5 Aug 7, 2025
0a4de9c
fix susbcriptable error in test
bgunnar5 Aug 7, 2025
b15b600
Backend independence for Celery/Kafka \ Not complete
MarcusHsieh Aug 11, 2025
80f4d43
Merge bgunnar5-refactor/worker-handling into MarcusHsieh/tasks_refactor
MarcusHsieh Aug 11, 2025
ef81d45
WIP Backend independence
MarcusHsieh Aug 12, 2025
5a5e732
Backend independence
MarcusHsieh Aug 13, 2025
c9bb9ec
Optimized Signatures Implementation \\ Task execution with universal …
MarcusHsieh Aug 18, 2025
70ba2a6
Universal Task System Integration with Celery
MarcusHsieh Aug 19, 2025
32cb808
Fix: Test imports and broken references
MarcusHsieh Aug 19, 2025
9e5d768
Fix kafka worker test compatibility and error handling
MarcusHsieh Aug 19, 2025
bec8f5e
Complete kafka worker test suite
MarcusHsieh Aug 19, 2025
7420677
Remove docker
MarcusHsieh Aug 19, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,12 @@ docs/build/
.tox/*
.coverage
**/setup_env.sh
app.yaml/
merlin_server/
.merlin/
*.out
*.sbatch
*.yaml

# Jupyter
jupyter/.ipynb_checkpoints
Expand All @@ -63,6 +69,7 @@ jupyter/testDistributedSamples.py
*.db
*.npy
*.log
*.txt

# IDEs
*.idea
Expand All @@ -71,4 +78,4 @@ jupyter/testDistributedSamples.py
dist/
build/
.DS_Store
.vscode/
.vscode/
8 changes: 8 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,18 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Unit tests for the `spec/` folder
- A page in the docs explaining the `feature_demo` example
- New `MerlinBaseFactory` class to help enable future plugins for backends, monitors, status renderers, etc.
- New worker related classes:
- `MerlinWorker`: base class for defining task server workers
- `CeleryWorker`: implementation of `MerlinWorker` specifically for Celery workers
- `WorkerFactory`: to help determine which task server worker to use
- `MerlinWorkerHandler`: base class for managing launching, stopping, and querying multiple workers
- `CeleryWorkerHandler`: implementation of `MerlinWorkerHandler` specifically for manager Celery workers
- `WorkerHandlerFactory`: to help determine which task server handler to use

### Changed
- Maestro version requirement is now at minimum 1.1.10 for status renderer changes
- The `BackendFactory`, `MonitorFactory`, and `StatusRendererFactory` classes all now inherit from `MerlinBaseFactory`
- Launching workers is now handled through worker classes rather than functions in the `celeryadapter.py` file

## [1.13.0b2]
### Added
Expand Down
9 changes: 9 additions & 0 deletions merlin/adapters/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
##############################################################################
# Copyright (c) Lawrence Livermore National Security, LLC and other Merlin
# Project developers. See top-level LICENSE and COPYRIGHT files for dates and
# other details. No copyright assignment is required to contribute to Merlin.
##############################################################################

"""
Backend-specific adapters for task execution.
"""
179 changes: 179 additions & 0 deletions merlin/adapters/signature_adapters.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
##############################################################################
# Copyright (c) Lawrence Livermore National Security, LLC and other Merlin
# Project developers. See top-level LICENSE and COPYRIGHT files for dates and
# other details. No copyright assignment is required to contribute to Merlin.
##############################################################################

"""
Backend-specific adapters for task execution.
"""

from abc import ABC, abstractmethod
from typing import Any, Dict, List, Optional
from merlin.factories.task_definition import UniversalTaskDefinition


class SignatureAdapter(ABC):
"""Base class for backend-specific signature adapters."""

@abstractmethod
def create_signature(self, task_def: UniversalTaskDefinition) -> Any:
"""Create backend-specific signature from universal definition."""
pass

@abstractmethod
def submit_task(self, signature: Any) -> str:
"""Submit task using backend-specific signature."""
pass

@abstractmethod
def submit_group(self, signatures: List[Any]) -> str:
"""Submit group of tasks."""
pass

@abstractmethod
def submit_chain(self, signatures: List[Any]) -> str:
"""Submit chain of tasks."""
pass

@abstractmethod
def submit_chord(self, parallel_signatures: List[Any], callback_signature: Any) -> str:
"""Submit chord pattern."""
pass


class CelerySignatureAdapter(SignatureAdapter):
"""Adapter for Celery backend."""

def __init__(self, task_registry: Dict[str, Any]):
self.task_registry = task_registry

def create_signature(self, task_def: UniversalTaskDefinition) -> Any:
"""Create Celery signature from universal definition."""

# Get Celery task function
task_func = self._get_task_function(task_def.task_type.value)

# Convert task definition to Celery signature
if task_def.task_type.value == "merlin_step":
return task_func.s(
task_id=task_def.task_id,
script_reference=task_def.script_reference,
config_reference=task_def.config_reference,
workspace_reference=task_def.workspace_reference
).set(
queue=task_def.queue_name,
priority=task_def.priority,
retry=task_def.retry_limit,
time_limit=task_def.timeout_seconds
)
else:
# Handle other task types
return task_func.s(
task_definition=task_def.to_dict()
).set(
queue=task_def.queue_name,
priority=task_def.priority
)

def submit_task(self, signature: Any) -> str:
"""Submit single Celery task."""
result = signature.apply_async()
return result.id

def submit_group(self, signatures: List[Any]) -> str:
"""Submit Celery group."""
from celery import group
job = group(signatures)
result = job.apply_async()
return result.id

def submit_chain(self, signatures: List[Any]) -> str:
"""Submit Celery chain."""
from celery import chain
job = chain(signatures)
result = job.apply_async()
return result.id

def submit_chord(self, parallel_signatures: List[Any], callback_signature: Any) -> str:
"""Submit Celery chord."""
from celery import chord
job = chord(parallel_signatures)(callback_signature)
result = job.apply_async()
return result.id

def _get_task_function(self, task_type: str):
"""Get Celery task function by type."""
return self.task_registry.get(task_type)


class KafkaSignatureAdapter(SignatureAdapter):
"""Adapter for Kafka backend."""

def __init__(self, kafka_producer, topic_manager):
self.producer = kafka_producer
self.topic_manager = topic_manager

def create_signature(self, task_def: UniversalTaskDefinition) -> Dict[str, Any]:
"""Create Kafka message from universal definition."""

# Kafka "signature" is just the optimized message
return {
'task_definition': task_def.to_dict(),
'topic': self.topic_manager.get_topic_for_queue(task_def.queue_name),
'partition_key': task_def.group_id or task_def.task_id
}

def submit_task(self, signature: Dict[str, Any]) -> str:
"""Submit task to Kafka topic."""

future = self.producer.send(
signature['topic'],
value=signature['task_definition'],
key=signature['partition_key']
)

result = future.get(timeout=10)
return f"{result.topic}:{result.partition}:{result.offset}"

def submit_group(self, signatures: List[Dict[str, Any]]) -> str:
"""Submit group of tasks to Kafka."""

# For groups, we need to coordinate across tasks
group_id = signatures[0]['task_definition']['group_id']

# Submit all tasks
task_ids = []
for sig in signatures:
result_id = self.submit_task(sig)
task_ids.append(result_id)

# Create coordination message if there's a callback
callback_tasks = [s for s in signatures
if s['task_definition']['coordination_pattern'] == 'chord']

if callback_tasks:
# Submit callback task with dependencies
for callback_sig in callback_tasks:
self.submit_task(callback_sig)

return group_id

def submit_chain(self, signatures: List[Dict[str, Any]]) -> str:
"""Submit chain of tasks to Kafka."""

# Submit all tasks - dependencies are embedded in task definitions
chain_id = signatures[0]['task_definition']['group_id']

for sig in signatures:
self.submit_task(sig)

return chain_id

def submit_chord(self, parallel_signatures: List[Dict[str, Any]],
callback_signature: Dict[str, Any]) -> str:
"""Submit chord pattern to Kafka."""

# Combine parallel and callback signatures
all_signatures = parallel_signatures + [callback_signature]
return self.submit_group(all_signatures)
49 changes: 31 additions & 18 deletions merlin/cli/commands/run_workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,7 @@
from merlin.cli.commands.command_entry_point import CommandEntryPoint
from merlin.cli.utils import get_merlin_spec_with_override
from merlin.config.configfile import initialize_config
from merlin.db_scripts.merlin_db import MerlinDatabase
from merlin.router import launch_workers
from merlin.workers.handlers.handler_factory import worker_handler_factory


LOG = logging.getLogger("merlin")
Expand Down Expand Up @@ -54,12 +53,19 @@ def add_parser(self, subparsers: ArgumentParser):
)
run_workers.set_defaults(func=self.process_command)
run_workers.add_argument("specification", type=str, help="Path to a Merlin YAML spec file")
run_workers.add_argument(
"--backend",
type=str,
choices=["celery", "kafka"],
default=None,
help="Task server backend to use (overrides spec configuration)"
)
run_workers.add_argument(
"--worker-args",
type=str,
dest="worker_args",
default="",
help="celery worker arguments in quotes.",
help="worker arguments in quotes.",
)
run_workers.add_argument(
"--steps",
Expand Down Expand Up @@ -117,21 +123,28 @@ def process_command(self, args: Namespace):
if not args.worker_echo_only:
LOG.info(f"Launching workers from '{filepath}'")

# Initialize the database
merlin_db = MerlinDatabase()

# Create logical worker entries
step_queue_map = spec.get_task_queues()
for worker, steps in spec.get_worker_step_map().items():
worker_queues = {step_queue_map[step] for step in steps}
merlin_db.create("logical_worker", worker, worker_queues)

# Launch the workers
launch_worker_status = launch_workers(
spec, args.worker_steps, args.worker_args, args.disable_logs, args.worker_echo_only
# Get the names of the workers that the user is requesting to start
workers_to_start = spec.get_workers_to_start(args.worker_steps)

# Build a list of MerlinWorker instances
worker_instances = spec.build_worker_list(workers_to_start)

# Launch the workers or echo out the command that will be used to launch the workers
# Use backend override if provided, otherwise use spec configuration
backend_type = args.backend or spec.merlin["resources"]["task_server"]

worker_handler = worker_handler_factory.create(backend_type)

# For unified interface, call launch_workers with spec instead of pre-built instances
result = worker_handler.launch_workers(
spec=spec,
steps=args.worker_steps,
worker_args=args.worker_args,
disable_logs=args.disable_logs,
just_return_command=args.worker_echo_only
)

if args.worker_echo_only:
print(launch_worker_status)
print(result)
else:
LOG.debug(f"celery command: {launch_worker_status}")
LOG.info(result)
Loading
Loading