4 changes: 4 additions & 0 deletions examples/llm-api/llm_mgmn_llm_distributed.sh
@@ -35,6 +35,10 @@
# not supported in Slurm mode, you need to download the model and put it in
# the LOCAL_MODEL directory.

# Optionally, set TLLM_SPAWN_EXTRA_MAIN_PROCESS to 0 to disable spawning an
# extra process that offloads the LLM frontend to a separate process. This is
# more stable, but it is not recommended for high-throughput streaming generation.

# Adjust the paths to run
export script=$SOURCE_ROOT/examples/llm-api/quickstart_advanced.py

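For reference, a minimal sketch of how this toggle might be set in the example script; the srun invocation and its flags are placeholders, not part of this diff:

# Hypothetical: keep the LLM frontend in the main process (more stable, but not
# recommended for high-throughput streaming generation).
export TLLM_SPAWN_EXTRA_MAIN_PROCESS=0
srun --mpi=pmix trtllm-llmapi-launch python3 $script  # srun flags are placeholders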
18 changes: 17 additions & 1 deletion tensorrt_llm/executor/utils.py
@@ -28,6 +28,22 @@ class LlmLauncherEnvs(StrEnum):
    # Whether to use periodical responses handler in await_responses
    TLLM_EXECUTOR_PERIODICAL_RESP_IN_AWAIT = "TLLM_EXECUTOR_PERIODICAL_RESP_IN_AWAIT"

    # Whether to spawn an additional process to offload the LLM frontend from
    # the main process; this improves main-process performance. Default is 1.
    TLLM_SPAWN_EXTRA_MAIN_PROCESS = "TLLM_SPAWN_EXTRA_MAIN_PROCESS"

    # TODO: Add other helpers

    @staticmethod
    def should_spawn_extra_main_process() -> bool:
        return os.environ.get(LlmLauncherEnvs.TLLM_SPAWN_EXTRA_MAIN_PROCESS,
                              '1') == '1'

    @staticmethod
    def set_spawn_extra_main_process(value: bool = True):
        os.environ[LlmLauncherEnvs.
                   TLLM_SPAWN_EXTRA_MAIN_PROCESS] = '1' if value else '0'


def get_spawn_proxy_process_ipc_addr_env() -> str | None:
''' Get the IPC address for the spawn proxy process dynamically. '''
@@ -49,7 +65,7 @@ def create_mpi_comm_session(
n_workers: int) -> RemoteMpiCommSessionClient | MpiPoolSession:
assert mpi_rank(
) == 0, f"create_mpi_comm_session must be called by rank 0, but it was called by rank {mpi_rank()}"
if get_spawn_proxy_process_env():
if LlmLauncherEnvs.should_spawn_extra_main_process():
assert get_spawn_proxy_process_ipc_addr_env(
), f"{LlmLauncherEnvs.TLLM_SPAWN_PROXY_PROCESS_IPC_ADDR} is not set."
logger_debug(
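Since the helper treats an unset variable as "1", the frontend offload is on by default; a rough shell equivalent of that check, for illustration only:

# Illustrative sketch of the default handling in should_spawn_extra_main_process.
if [ "${TLLM_SPAWN_EXTRA_MAIN_PROCESS:-1}" = "1" ]; then
    echo "offload the LLM frontend to an extra main process"
else
    echo "run the LLM frontend in the main process"
fi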
188 changes: 111 additions & 77 deletions tensorrt_llm/llmapi/trtllm-llmapi-launch
@@ -2,15 +2,18 @@
set -Eeo pipefail

task_with_command=("$@")

# Whether to spawn an additional process to offload the LLM frontend from the
# main process; this improves main-process performance.
spawn_extra_main_process=${TLLM_SPAWN_EXTRA_MAIN_PROCESS:-1}

native_mpi_rank=$OMPI_COMM_WORLD_RANK
mpi_rank=${SLURM_PROCID:-${OMPI_COMM_WORLD_RANK:-${PMI_RANK:-${PMI_ID:-0}}}}

log_stderr() { echo -e "\033[33m$@\033[0m" >&2; }
log_stderr "mpi_rank: $mpi_rank"

pid=$(ps -o pid= -p $$ | tr -d ' ')

# Tell TRTLLM to spawn an additional process for the Proxy
# Tell TRTLLM to use the MPI Comm Session.
export TLLM_SPAWN_PROXY_PROCESS=1

function mpi_world_size {
@@ -40,90 +43,121 @@ print(port); s.close()')
export TLLM_SPAWN_PROXY_PROCESS_IPC_HMAC_KEY=$(openssl rand -hex 32)
}

# Invoke the RemoteCommSession Server/Client to run the LLM frontend in a
# separate process, while the main process (MPI rank 0) runs the Worker0 task.
# This improves LLM frontend performance, which is critical for streaming
# generation when throughput is high.
function run_with_spawn_extra_main_process {
log_stderr "Rank${mpi_rank} run with spawn extra main process"

if [ -z "$mpi_rank" ] || [ "$mpi_rank" -eq 0 ]; then
log_stderr "Rank${mpi_rank} run ${task_with_command[@]} in background"

export_free_tcp_addr_for_spawn_proxy_process

# MPI doesn't allow spawning a process that shares the MPI environment from
# within an MPI process; otherwise the duplicate MPI_Init in the child process
# causes undefined behavior. Thus we need to clean the MPI environment in the
# parent process before spawning the child process, and restore it later
# before running MPI operations in the parent process.
mpi_blacklist=(
OMPI_ PMIX_ PMI_ SLURM_ MPI_ UCX_
I_MPI_ HYDRA_ KMP_ MPICH_ MV2_ CRAY_
)

(
# Remove MPI-related variables only in the subshell context
for var in $(compgen -e); do
for prefix in "${mpi_blacklist[@]}"; do
if [[ "$var" == "$prefix"* ]]; then
unset "$var"
break
fi
done
done

export tllm_mpi_size=$(mpi_world_size)
log_stderr "tllm_mpi_size: $tllm_mpi_size"
# Turn off "exit on error" so the following lines always run
set +e

export_free_tcp_addr_for_spawn_proxy_process

if [ -z "$mpi_rank" ] || [ "$mpi_rank" -eq 0 ]; then
Review comment: I assume the following parts are directly moved to run_with_spawn_extra_main_process without changes?

log_stderr "Rank${mpi_rank} run ${task_with_command[@]} in background"

# MPI doesn't allow spawning a process that shares the MPI environment from
# within an MPI process; otherwise the duplicate MPI_Init in the child process
# causes undefined behavior. Thus we need to clean the MPI environment in the
# parent process before spawning the child process, and restore it later
# before running MPI operations in the parent process.
mpi_blacklist=(
OMPI_ PMIX_ PMI_ SLURM_ MPI_ UCX_
I_MPI_ HYDRA_ KMP_ MPICH_ MV2_ CRAY_
)

(
# Remove MPI-related variables only in the subshell context
for var in $(compgen -e); do
for prefix in "${mpi_blacklist[@]}"; do
if [[ "$var" == "$prefix"* ]]; then
unset "$var"
break
fi
done
done
# Execute the task with cleaned environment
"${task_with_command[@]}"
task_exit_code=$?
log_stderr "Rank${mpi_rank} Task exit code: $task_exit_code"

# Turn off "exit on error" so the following lines always run
set +e
# Stop the MPI Comm server
python3 -m tensorrt_llm.llmapi.mgmn_leader_node --action stop
mpi_exit_code=$?
log_stderr "Rank${mpi_rank} MPI Comm server exit code: $mpi_exit_code"

# Execute the task with cleaned environment
"${task_with_command[@]}"
task_exit_code=$?
log_stderr "Rank${mpi_rank} Task exit code: $task_exit_code"
# Propagate task exit status
if [ $task_exit_code -ne 0 ]; then
exit $task_exit_code
else
exit $mpi_exit_code
fi
) &

# Stop the MPI Comm server
python3 -m tensorrt_llm.llmapi.mgmn_leader_node --action stop
mpi_exit_code=$?
log_stderr "Rank${mpi_rank} MPI Comm server exit code: $mpi_exit_code"
# Turn off "exit on error" so the following lines always run
set +e

# Propagate task exit status
if [ $task_exit_code -ne 0 ]; then
exit $task_exit_code
# Capture subshell PID
subshell_pid=$!
log_stderr "Rank${mpi_rank} Subshell PID: $subshell_pid"

log_stderr "Rank${mpi_rank} run mgmn leader node with mpi_world_size: $(mpi_world_size) ..."
log_stderr "Rank0 host: $HOSTNAME"
python3 -m tensorrt_llm.llmapi.mgmn_leader_node
mgmn_leader_node_exit_code=$?
log_stderr "Rank${mpi_rank} MGMN leader node exit code: $mgmn_leader_node_exit_code"

# Wait for subshell
wait $subshell_pid
# This is subshell's exit code
subshell_exit_code=$?
log_stderr "Rank${mpi_rank} Subshell exit code: $subshell_exit_code"

# Propagate subshell exit status
if [ $subshell_exit_code -ne 0 ]; then
exit $subshell_exit_code
else
exit $mpi_exit_code
exit $mgmn_leader_node_exit_code
fi
) &

# Turn off "exit on error" so the following lines always run
set +e

# Capture subshell PID
subshell_pid=$!
log_stderr "Rank${mpi_rank} Subshell PID: $subshell_pid"

log_stderr "Rank${mpi_rank} run mgmn leader node with mpi_world_size: $(mpi_world_size) ..."
log_stderr "Rank0 host: $HOSTNAME"
python3 -m tensorrt_llm.llmapi.mgmn_leader_node
mgmn_leader_node_exit_code=$?
log_stderr "Rank${mpi_rank} MGMN leader node exit code: $mgmn_leader_node_exit_code"

# Wait for subshell
wait $subshell_pid
# This is subshell's exit code
subshell_exit_code=$?
log_stderr "Rank${mpi_rank} Subshell exit code: $subshell_exit_code"

# Propagate subshell exit status
if [ $subshell_exit_code -ne 0 ]; then
exit $subshell_exit_code
else
exit $mgmn_leader_node_exit_code
# Turn off "exit on error" so the following lines always run
set +e

log_stderr "Rank${mpi_rank} run mgmn worker node with mpi_world_size: $(mpi_world_size) ..."
python3 -m tensorrt_llm.llmapi.mgmn_worker_node
mgmn_worker_node_exit_code=$?
log_stderr "Rank${mpi_rank} MGMN worker node exit code: $mgmn_worker_node_exit_code"

exit $mgmn_worker_node_exit_code
fi
else
# Turn off "exit on error" so the following lines always run
set +e
}

# Run both the LLM frontend and the Worker0 task in the main process.
# NOTE: this mode is not recommended for high-throughput streaming generation.
function run_without_spawn_extra_main_process {
log_stderr "Rank${mpi_rank} run without spawn extra main process"

if [ -z "$mpi_rank" ] || [ "$mpi_rank" -eq 0 ]; then
"${task_with_command[@]}"
else
python3 -m tensorrt_llm.llmapi.mgmn_worker_node
mgmn_worker_node_exit_code=$?
log_stderr "Rank${mpi_rank} MGMN worker node exit code: $mgmn_worker_node_exit_code"

log_stderr "Rank${mpi_rank} run mgmn worker node with mpi_world_size: $(mpi_world_size) ..."
python3 -m tensorrt_llm.llmapi.mgmn_worker_node
mgmn_worker_node_exit_code=$?
log_stderr "Rank${mpi_rank} MGMN worker node exit code: $mgmn_worker_node_exit_code"
exit $mgmn_worker_node_exit_code
fi
}

exit $mgmn_worker_node_exit_code

# == main logic ==
export tllm_mpi_size=$(mpi_world_size)
log_stderr "tllm_mpi_size: $tllm_mpi_size"

if [ "$spawn_extra_main_process" -eq 1 ]; then
run_with_spawn_extra_main_process
else
run_without_spawn_extra_main_process
fi
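Taken together, a hedged summary of what each rank runs under the two modes (derived from the functions above; the srun usage is a placeholder):

# Default (TLLM_SPAWN_EXTRA_MAIN_PROCESS=1): rank 0 runs the task (LLM frontend)
# in a background subshell with the MPI environment stripped, then runs
# tensorrt_llm.llmapi.mgmn_leader_node in the foreground; other ranks run
# tensorrt_llm.llmapi.mgmn_worker_node.
#
# Opt-out (TLLM_SPAWN_EXTRA_MAIN_PROCESS=0): rank 0 runs the task directly, so
# the frontend and Worker0 share the main process; other ranks still run
# tensorrt_llm.llmapi.mgmn_worker_node.
srun --mpi=pmix trtllm-llmapi-launch python3 $script  # srun flags are placeholders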