Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 11 additions & 1 deletion rtp_llm/config/engine_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -331,8 +331,17 @@ def update_worker_addrs(
return
worker_addrs = []
worker_grpc_addrs = []
all_worker_grpc_addrs = [""] * parallelism_config.world_size
local_rank = parallelism_config.local_rank
for member in world_info.members:
member_grpc_addr = f"{member.ip}:{member.rpc_server_port}"
if 0 <= member.world_rank < len(all_worker_grpc_addrs):
all_worker_grpc_addrs[member.world_rank] = member_grpc_addr
else:
logging.warning(
f"world_info member world_rank {member.world_rank} out of range "
f"for world_size {parallelism_config.world_size}"
)
if (
int(
(member.world_rank / parallelism_config.tp_size)
Expand All @@ -343,14 +352,15 @@ def update_worker_addrs(
worker_addrs.append(
f"{member.ip}:{member.cache_store_listen_port}:{member.cache_store_rdma_listen_port}"
)
worker_grpc_addrs.append(f"{member.ip}:{member.rpc_server_port}")
worker_grpc_addrs.append(member_grpc_addr)
logging.info(
f"append member for pd sep "
f"{member.ip}:{member.rpc_server_port}, {member.cache_store_listen_port}, "
f"{member.cache_store_rdma_listen_port} to local rank {local_rank}, world rank {member.world_rank}"
)
runtime_config.worker_grpc_addrs = worker_grpc_addrs
runtime_config.worker_addrs = worker_addrs
runtime_config.all_worker_grpc_addrs = all_worker_grpc_addrs


def setup_pd_sep_config(
Expand Down
8 changes: 8 additions & 0 deletions rtp_llm/config/exceptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,16 @@ class ExceptionType(IntEnum):

# master error
MASTER_NO_AVAILABLE_WORKER = 8400
MASTER_NO_PREFILL_WORKER = 8402
MASTER_NO_DECODE_WORKER = 8403
MASTER_NO_PDFUSION_WORKER = 8404
MASTER_NO_VIT_WORKER = 8405
MASTER_INVALID_REQUEST = 8406

# route error
ROUTER_QUEUE_FULL = 8502
ROUTER_QUEUE_TIMEOUT = 8503
ROUTER_REQUEST_CANCELLED = 8504
ROUTE_ERROR = 8500

# multimodal error
Expand Down
14 changes: 7 additions & 7 deletions rtp_llm/config/generate_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ class RoleAddr(BaseModel):
@field_validator("role", mode="before")
@classmethod
def validate_role(cls, v):
"""Convert string to RoleType enum for deserialization."""
if isinstance(v, str):
return getattr(RoleType, v)
"""Convert proto enum (int) to RoleType enum for deserialization."""
if isinstance(v, int):
return RoleType(v)
elif isinstance(v, RoleType):
return v
elif isinstance(v, str):
return getattr(RoleType, v.upper())
else:
raise ValueError(
f"RoleType must be a string or RoleType enum, got {type(v)}"
f"RoleType must be an int, str, or RoleType enum, got {type(v)}"
)

@field_serializer("role")
Expand Down Expand Up @@ -174,9 +176,7 @@ class GenerateConfig(BaseModel):
enable_memory_cache: bool = True

enable_remote_cache: bool = True
# 是否强制相同 request_id 的 stream 在一批中调度
force_batch: bool = False
batch_group_timeout: Optional[int] = None # ms
group_timeout: Optional[int] = None # ms

unique_key: str = ""

Expand Down
8 changes: 7 additions & 1 deletion rtp_llm/config/py_config_modules.py
Original file line number Diff line number Diff line change
Expand Up @@ -391,15 +391,21 @@ def __init__(self):
self.master_queue_reject_threshold: int = 100000
self.master_default_timeout_ms: int = 3600000
self.master_max_connect_pool_size: int = 100000
self.master_connector_limit_per_host: int = 0
# Session total timeout in seconds. If < 0: auto (3600 when queue mode, 0.5 otherwise).
self.master_session_timeout_s: float = -1
# When True, disable domain fallback routing when master is unavailable or not configured.
# Requests will fail with ROUTE_ERROR instead of falling back to VipServer domain routing.
self.disable_domain_fallback: bool = False

def to_string(self):
return (
f"master_queue_reject_threshold: {self.master_queue_reject_threshold}\n"
f"master_default_timeout_ms: {self.master_default_timeout_ms}\n"
f"master_max_connect_pool_size: {self.master_max_connect_pool_size}\n"
f"master_session_timeout_s: {self.master_session_timeout_s}"
f"master_connector_limit_per_host: {self.master_connector_limit_per_host}\n"
f"master_session_timeout_s: {self.master_session_timeout_s}\n"
f"disable_domain_fallback: {self.disable_domain_fallback}"
)


Expand Down
15 changes: 14 additions & 1 deletion rtp_llm/config/server_config_setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,23 @@ def fetch_model_files_to_local(py_env_configs: PyEnvConfigs):
)


def get_cuda_device_id_for_local_rank(local_rank: int) -> int:
"""Map logical local rank to CUDA device id.

RTP_LLM_LOCAL_DEVICE_OFFSET is only intended for local multi-part smoke tests
that simulate multiple nodes in separate server processes on one host.
"""
return local_rank + int(os.environ.get("RTP_LLM_LOCAL_DEVICE_OFFSET", "0"))


def setup_cuda_device_and_accl_env(local_rank: int) -> None:
"""Apply CUDA device and ACCL env side effects (same as ParallelInfo.from_params)."""
cuda_device_id = get_cuda_device_id_for_local_rank(local_rank)
if torch.cuda.is_available():
torch.cuda.set_device(local_rank)
torch.cuda.set_device(cuda_device_id)
logging.info(
"local rank %s mapped to cuda device %s", local_rank, cuda_device_id
)

if os.environ.get("ACCL_SELECT_PATH") == "1":
select_port = str(local_rank % 2)
Expand Down
2 changes: 1 addition & 1 deletion rtp_llm/cpp/api_server/InferenceService.cc
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ void InferenceService::inferResponse(int64_t
auto input = fillGenerateInput(request_id, req.input_texts[i], req.input_urls[i], req.generate_configs[i]);
inputs.push_back(input);
}
auto ori_streams = engine_->batchEnqueue(inputs);
auto ori_streams = engine_->enqueueMultiple(inputs);
std::vector<std::shared_ptr<GenerateStreamWrapper>> streams;
streams.reserve(ori_streams.size());
for (size_t idx = 0; idx < ori_streams.size(); ++idx) {
Expand Down
2 changes: 1 addition & 1 deletion rtp_llm/cpp/api_server/test/InferenceServiceTest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ TEST_F(InferenceServiceTest, InferResponseSuccess) {
auto mock_stream = CreateMockGenerateStream();
auto stream = std::dynamic_pointer_cast<GenerateStream>(mock_stream);
std::vector<GenerateStreamPtr> streams({stream});
EXPECT_CALL(*mock_engine_, batchEnqueue(Matcher<const std::vector<std::shared_ptr<GenerateInput>>&>(_)))
EXPECT_CALL(*mock_engine_, enqueueMultiple(Matcher<const std::vector<std::shared_ptr<GenerateInput>>&>(_)))
.WillOnce(Return(streams));

// stream
Expand Down
2 changes: 1 addition & 1 deletion rtp_llm/cpp/api_server/test/mock/MockEngineBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ class MockEngineBase: public EngineBase {
public:
MOCK_METHOD1(enqueue, std::shared_ptr<GenerateStream>(const std::shared_ptr<GenerateInput>&));
MOCK_METHOD1(enqueue, void(std::shared_ptr<GenerateStream>&));
MOCK_METHOD1(batchEnqueue,
MOCK_METHOD1(enqueueMultiple,
std::vector<GenerateStreamPtr>(const std::vector<std::shared_ptr<GenerateInput>>& inputs));
MOCK_METHOD0(stop, absl::Status());
MOCK_METHOD2(preRun, absl::StatusOr<GenerateStreamPtr>(const std::shared_ptr<GenerateInput>&, preRunMode));
Expand Down
15 changes: 14 additions & 1 deletion rtp_llm/cpp/config/ConfigModules.cc
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,13 @@ std::string RuntimeConfig::to_string() const {
if (i < worker_addrs.size() - 1)
oss << ", ";
}
oss << "]\n"
<< "all_worker_grpc_addrs: [";
for (size_t i = 0; i < all_worker_grpc_addrs.size(); ++i) {
oss << all_worker_grpc_addrs[i];
if (i < all_worker_grpc_addrs.size() - 1)
oss << ", ";
}
oss << "]\n"
<< "specify_gpu_arch: " << specify_gpu_arch;
return oss.str();
Expand Down Expand Up @@ -588,7 +595,13 @@ std::string PDSepConfig::to_string() const {
<< "load_cache_timeout_ms: " << load_cache_timeout_ms << "\n"
<< "max_rpc_timeout_ms: " << max_rpc_timeout_ms << "\n"
<< "worker_port_offset: " << worker_port_offset << "\n"
<< "decode_entrance: " << decode_entrance;
<< "decode_entrance: " << decode_entrance << "\n"
<< "batch_dispatch_timeout_ms: " << batch_dispatch_timeout_ms << "\n"
<< "batch_prepare_timeout_ms: " << batch_prepare_timeout_ms << "\n"
<< "batch_load_timeout_ms: " << batch_load_timeout_ms << "\n"
<< "prefill_enqueue_pool_size: " << prefill_enqueue_pool_size << "\n"
<< "prefill_worker_lambda_pool_size: " << prefill_worker_lambda_pool_size << "\n"
<< "prefill_slot_pool_size: " << prefill_slot_pool_size;
return oss.str();
}

Expand Down
38 changes: 25 additions & 13 deletions rtp_llm/cpp/config/ConfigModules.h
Original file line number Diff line number Diff line change
Expand Up @@ -175,17 +175,17 @@ struct KVCacheConfig {
bool enable_device_cache = true;
bool enable_memory_cache = false;
// When true, memory-cache H2D/D2H may use split-KV SM scatter/gather (CUDA) when layout is eligible.
bool enable_memory_cache_sm_copy = false;
bool enable_remote_cache = false;
bool write_cache_sync = false;
bool enable_tiered_memory_cache = false;
bool enable_gpu_prefix_tree = true;
bool enable_prefix_tree_memory_cache = true;
bool enable_legacy_memory_connector_fallback = true;
int64_t prefix_tree_memory_state_swa_pool_ratio = 0;
bool enable_memory_cache_sm_copy = false;
bool enable_remote_cache = false;
bool write_cache_sync = false;
bool enable_tiered_memory_cache = false;
bool enable_gpu_prefix_tree = true;
bool enable_prefix_tree_memory_cache = true;
bool enable_legacy_memory_connector_fallback = true;
int64_t prefix_tree_memory_state_swa_pool_ratio = 0;
bool enable_dsv4_state_block_independent_eviction = false;
int64_t device_cache_min_free_blocks = 0;
int load_cache_retry_times = 1; // Maximum retry attempts for load cache transfer failures
int64_t device_cache_min_free_blocks = 0;
int load_cache_retry_times = 1; // Maximum retry attempts for load cache transfer failures

// DSV4 fixed-allocation pool block count. 0 means the fixed regions
// (INDEXER_STATE / CSA_STATE / HCA_STATE / SWA_KV) use the normal
Expand Down Expand Up @@ -371,9 +371,9 @@ struct BatchDecodeSchedulerConfig {
};

struct FIFOSchedulerConfig {
int64_t max_context_batch_size = 1;
int64_t max_batch_tokens_size = 0;
bool cp_force_single_prefill = true;
int64_t max_context_batch_size = 1;
int64_t max_batch_tokens_size = 0;
bool cp_force_single_prefill = true;
int64_t max_inited_kv_cache_streams = 0;
std::string to_string() const;
};
Expand Down Expand Up @@ -405,6 +405,7 @@ struct RuntimeConfig {
std::string model_name = "";
std::vector<std::string> worker_grpc_addrs;
std::vector<std::string> worker_addrs;
std::vector<std::string> all_worker_grpc_addrs;

// Fields merged from PyDeviceResourceConfig
std::string specify_gpu_arch = "";
Expand Down Expand Up @@ -433,6 +434,17 @@ struct PDSepConfig {
int64_t max_rpc_timeout_ms = 2 * 3600 * 1000; // 2h default
int64_t worker_port_offset = 0;
bool decode_entrance = false;
int64_t batch_dispatch_timeout_ms = 60000; // 60s, cross-DP dispatch
int64_t batch_prepare_timeout_ms = 10000; // 10s, prepareAllocateResource
int64_t batch_load_timeout_ms = 10000; // 10s, remoteLoadCacheStart

// ========== Prefill Thread Pool Configuration ==========
// enqueue pool size (L1 DP dispatch, fast ms-level). 0 = use formula default.
int64_t prefill_enqueue_pool_size = 0;
// worker lambda pool size (heavy EnqueueGroup coordination, I/O-bound). 0 = use formula default.
int64_t prefill_worker_lambda_pool_size = 0;
// slot pool size (L2 Prepare + L3 Load + L4 Finish). 0 = use formula default.
int64_t prefill_slot_pool_size = 0;

std::string to_string() const;
};
Expand Down
13 changes: 11 additions & 2 deletions rtp_llm/cpp/disaggregate/cache_store/LoadContext.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,8 @@ void SyncContext::updateResult(bool succes
autil::TimeUtility::currentTimeInMilliSeconds() - start_time_ms_);
}

if (++done_layer_cnt_ == expect_layer_cnt_) {
++done_layer_cnt_;
if (done_layer_cnt_ == expect_layer_cnt_ || !success) {
cond_.notify_all();
}
}
Expand All @@ -81,6 +82,14 @@ void SyncContext::waitDone() {
return;
}

if (error_info_.hasError()) {
RTP_LLM_LOG_INFO("load context wait done on early error: %s, done %d/%d layers",
error_info_.ToString().c_str(),
done_layer_cnt_.load(),
expect_layer_cnt_);
return;
}

if (autil::TimeUtility::currentTimeInMilliSeconds() >= deadline_ms_) {
auto error_code = ErrorCode::CACHE_STORE_LOAD_BUFFER_TIMEOUT;
error_info_ = ErrorInfo(error_code, ErrorCodeToString(error_code));
Expand All @@ -97,7 +106,7 @@ void SyncContext::waitDone() {

// sync wait, safe to use this
if (cond_.wait_for(lock, std::chrono::milliseconds(once_time_ms), [this] {
return done_layer_cnt_ == expect_layer_cnt_;
return done_layer_cnt_ == expect_layer_cnt_ || error_info_.hasError();
})) {
return;
}
Expand Down
33 changes: 33 additions & 0 deletions rtp_llm/cpp/distribute/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,27 @@ cc_library(
copts = copts(),
)

cc_library(
name = "rpc_cpu_tp_broadcaster_hdr",
hdrs = ["RpcCpuTpBroadcaster.h"],
deps = [
"//rtp_llm/cpp/model_rpc:broadcast_manager",
"//rtp_llm/cpp/model_rpc/proto:model_rpc_service_cc_proto",
],
copts = copts(),
)

cc_library(
name = "rpc_cpu_tp_broadcaster",
srcs = ["RpcCpuTpBroadcaster.cc"],
hdrs = ["RpcCpuTpBroadcaster.h"],
deps = [
":rpc_cpu_tp_broadcaster_hdr",
"//rtp_llm/cpp/utils:core_utils",
],
copts = copts(),
)

cc_test(
name = "cpu_tp_broadcaster_test",
srcs = ["test/CpuTpBroadcasterTest.cc"],
Expand All @@ -28,3 +49,15 @@ cc_test(
copts = copts(),
timeout = "short",
)

cc_test(
name = "rpc_cpu_tp_broadcaster_test",
srcs = ["test/RpcCpuTpBroadcasterTest.cc"],
deps = [
":rpc_cpu_tp_broadcaster",
"@com_google_googletest//:gtest",
"@com_google_googletest//:gtest_main",
],
copts = copts(),
timeout = "short",
)
Loading