Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,9 @@ add_library(dflash_common STATIC
src/common/spark_corpus.cpp
src/common/moe_hybrid_ffn_eval.cpp
src/common/moe_hybrid_stream.cpp
src/common/cold_ffn_cpu.cpp
src/common/moe_expert_compute.cpp
src/common/moe_expert_compute_cpu.cpp
src/common/moe_expert_compute_ipc.cpp
src/common/moe_hybrid_swap_manager.cpp
src/qwen35/layer_split_forward.cpp
src/qwen35/qwen35_target_shard_ipc.cpp
Expand Down Expand Up @@ -531,7 +533,7 @@ target_link_libraries(dflash_common
ggml-base
nlohmann_json::nlohmann_json
)
# OpenMP for parallel cold FFN kernel (saturate memory bandwidth).
# OpenMP for parallel MoE expert compute kernel (saturate memory bandwidth).
find_package(OpenMP)
if(OpenMP_CXX_FOUND)
target_link_libraries(dflash_common PRIVATE OpenMP::OpenMP_CXX)
Expand Down
72 changes: 66 additions & 6 deletions server/src/common/backend_ipc.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const char * backend_ipc_mode_name(BackendIpcMode mode) {
case BackendIpcMode::DFlashDraft: return "dflash-draft";
case BackendIpcMode::PFlashCompress: return "pflash-compress";
case BackendIpcMode::Qwen35TargetShard: return "qwen35-target-shard";
case BackendIpcMode::MoeExpertCompute: return "moe-expert-compute";
}
return "unknown";
}
Expand All @@ -46,6 +47,10 @@ bool parse_backend_ipc_mode(const std::string & value, BackendIpcMode & out) {
out = BackendIpcMode::Qwen35TargetShard;
return true;
}
if (value == "moe-expert-compute") {
out = BackendIpcMode::MoeExpertCompute;
return true;
}
return false;
}

Expand Down Expand Up @@ -187,7 +192,26 @@ bool BackendIpcProcess::start(const BackendIpcLaunchConfig & cfg) {
}
int32_t status = -1;
if (!read_exact_fd(stream_fd_, &status, sizeof(status)) || status != 0) {
std::fprintf(stderr, "backend-ipc daemon did not become ready (status=%d)\n", status);
int child_status = 0;
const pid_t exited = ::waitpid(pid_, &child_status, WNOHANG);
if (exited == pid_) {
if (WIFEXITED(child_status)) {
std::fprintf(stderr,
"backend-ipc daemon did not become ready (status=%d, exit=%d)\n",
status, WEXITSTATUS(child_status));
} else if (WIFSIGNALED(child_status)) {
std::fprintf(stderr,
"backend-ipc daemon did not become ready (status=%d, signal=%d)\n",
status, WTERMSIG(child_status));
} else {
std::fprintf(stderr,
"backend-ipc daemon did not become ready (status=%d, child-status=%d)\n",
status, child_status);
}
pid_ = -1;
} else {
std::fprintf(stderr, "backend-ipc daemon did not become ready (status=%d)\n", status);
}
close();
return false;
}
Expand Down Expand Up @@ -246,20 +270,56 @@ std::string BackendIpcProcess::next_path(const char * prefix) {
}

bool BackendIpcProcess::write_shared_payload(const void * data, size_t bytes, uint64_t & seq) {
if (!shared_payload_map_ || bytes > shared_payload_capacity_) return false;
if (bytes > 0 && !data) return false;
BackendIpcPayloadSegment segment{data, bytes};
return write_shared_payload_segments(&segment, 1, seq);
}

bool BackendIpcProcess::write_shared_payload_segments(
const BackendIpcPayloadSegment * segments,
size_t n_segments,
uint64_t & seq) {
if (!shared_payload_map_ || (!segments && n_segments > 0)) return false;
size_t bytes = 0;
for (size_t i = 0; i < n_segments; ++i) {
if (segments[i].bytes > 0 && !segments[i].data) return false;
if (!backend_ipc_checked_add_size(bytes, segments[i].bytes, bytes)) {
return false;
}
}
if (bytes > shared_payload_capacity_) return false;
auto * header = static_cast<BackendIpcSharedPayloadHeader *>(shared_payload_map_);
void * payload = static_cast<void *>(
auto * payload = static_cast<char *>(
static_cast<char *>(shared_payload_map_) + backend_ipc_shared_payload_header_bytes());
if (bytes > 0) {
std::memcpy(payload, data, bytes);
size_t off = 0;
for (size_t i = 0; i < n_segments; ++i) {
if (segments[i].bytes > 0) {
std::memcpy(payload + off, segments[i].data, segments[i].bytes);
off += segments[i].bytes;
}
}
seq = ++shared_payload_seq_;
header->bytes = (uint64_t)bytes;
header->sequence = seq;
return true;
}

bool BackendIpcProcess::read_shared_payload(void * data, size_t bytes, uint64_t seq) const {
if (!shared_payload_map_ || bytes > shared_payload_capacity_) return false;
if (bytes > 0 && !data) return false;
const auto * header =
static_cast<const BackendIpcSharedPayloadHeader *>(shared_payload_map_);
if (header->sequence != seq || header->bytes != (uint64_t)bytes) {
return false;
}
const void * payload = static_cast<const void *>(
static_cast<const char *>(shared_payload_map_) +
backend_ipc_shared_payload_header_bytes());
if (bytes > 0) {
std::memcpy(data, payload, bytes);
}
return true;
}

#if !defined(_WIN32)
bool BackendIpcProcess::init_shared_payload(size_t bytes) {
if (bytes == 0) return false;
Expand Down
10 changes: 10 additions & 0 deletions server/src/common/backend_ipc.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ enum class BackendIpcMode {
DFlashDraft,
PFlashCompress,
Qwen35TargetShard,
MoeExpertCompute,
};

const char * backend_ipc_mode_name(BackendIpcMode mode);
Expand Down Expand Up @@ -79,6 +80,11 @@ struct BackendIpcLaunchConfig {
size_t shared_payload_bytes = 0;
};

struct BackendIpcPayloadSegment {
const void * data = nullptr;
size_t bytes = 0;
};

class BackendIpcProcess {
public:
BackendIpcProcess() = default;
Expand All @@ -102,6 +108,10 @@ class BackendIpcProcess {

std::string next_path(const char * prefix);
bool write_shared_payload(const void * data, size_t bytes, uint64_t & seq);
bool write_shared_payload_segments(const BackendIpcPayloadSegment * segments,
size_t n_segments,
uint64_t & seq);
bool read_shared_payload(void * data, size_t bytes, uint64_t seq) const;

private:
#if !defined(_WIN32)
Expand Down
59 changes: 0 additions & 59 deletions server/src/common/cold_ffn_compute.h

This file was deleted.

167 changes: 167 additions & 0 deletions server/src/common/moe_expert_compute.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
#include "moe_expert_compute.h"

#include <algorithm>
#include <cerrno>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <limits>

namespace dflash::common {

namespace {

uint64_t hash_u64(uint64_t h, uint64_t v) {
h ^= v + 0x9e3779b97f4a7c15ULL + (h << 6) + (h >> 2);
return h;
}

const char * nonempty_env(const char * name) {
const char * raw = std::getenv(name);
return raw && *raw ? raw : nullptr;
}

int parse_nonnegative_env(const char * name, int fallback) {
const char * raw = nonempty_env(name);
if (!raw) return fallback;
errno = 0;
char * end = nullptr;
const long value = std::strtol(raw, &end, 10);
if (errno == ERANGE || end == raw || *end != '\0' ||
value < 0 || value > std::numeric_limits<int>::max()) {
return fallback;
}
return (int)value;
}

} // namespace

uint64_t moe_expert_placement_fingerprint(const MoeHybridStorage & hybrid,
int n_layer,
int n_expert,
int n_expert_used) {
uint64_t h = 1469598103934665603ULL;
h = hash_u64(h, (uint64_t)n_layer);
h = hash_u64(h, (uint64_t)n_expert);
h = hash_u64(h, (uint64_t)n_expert_used);
h = hash_u64(h, (uint64_t)hybrid.placement.total_hot);
for (size_t il = 0; il < hybrid.placement.hot_expert_ids.size(); ++il) {
h = hash_u64(h, (uint64_t)il);
for (int32_t expert : hybrid.placement.hot_expert_ids[il]) {
h = hash_u64(h, (uint64_t)(uint32_t)expert);
}
}
return h;
}

std::vector<MoeExpertLayer> make_moe_expert_layers(
const MoeHybridStorage & hybrid,
const std::vector<MoeLayerDesc> & layer_descs) {
std::vector<MoeExpertLayer> layers(hybrid.layers.size());
for (size_t il = 0; il < hybrid.layers.size(); ++il) {
const auto & storage = hybrid.layers[il];
const MoeLayerDesc * desc =
il < layer_descs.size() ? &layer_descs[il] : nullptr;
auto & cl = layers[il];
cl.layer_idx = (int)il;
cl.cold_global_by_local = storage.cold_expert_ids;
cl.fused_gate_up = (storage.gate_up_cold != nullptr);
if (cl.fused_gate_up) {
cl.gate_up_data =
storage.gate_up_cold ? storage.gate_up_cold->data : nullptr;
cl.gate_up_stride =
storage.gate_up_cold ? storage.gate_up_cold->nb[2] : 0;
cl.gate_up_type =
storage.gate_up_cold ? storage.gate_up_cold->type : GGML_TYPE_Q4_K;
cl.gate_up_scale = desc ? desc->ffn_gate_up_exps_s : 1.0f;
} else {
cl.gate_data = storage.gate_cold ? storage.gate_cold->data : nullptr;
cl.up_data = storage.up_cold ? storage.up_cold->data : nullptr;
cl.gate_stride = storage.gate_cold ? storage.gate_cold->nb[2] : 0;
cl.up_stride = storage.up_cold ? storage.up_cold->nb[2] : 0;
cl.gate_type =
storage.gate_cold ? storage.gate_cold->type : GGML_TYPE_Q4_K;
cl.up_type =
storage.up_cold ? storage.up_cold->type : GGML_TYPE_Q4_K;
cl.gate_scale = desc ? desc->ffn_gate_exps_s : 1.0f;
cl.up_scale = desc ? desc->ffn_up_exps_s : 1.0f;
}
cl.down_data = storage.down_cold ? storage.down_cold->data : nullptr;
cl.down_stride = storage.down_cold ? storage.down_cold->nb[2] : 0;
cl.down_type =
storage.down_cold ? storage.down_cold->type : GGML_TYPE_Q4_K;
cl.down_scale = desc ? desc->ffn_down_exps_s : 1.0f;
}
return layers;
}

void MoeExpertComputeRuntime::reset() {
compute.reset();
layers.clear();
target_path.clear();
placement_fingerprint = 0;
}

bool ensure_moe_expert_compute_runtime(
MoeExpertComputeRuntime & runtime,
const MoeExpertComputeRuntimeConfig & cfg,
const MoeHybridStorage & hybrid,
const std::vector<MoeLayerDesc> & layer_descs,
std::string * err) {
if (!cfg.enabled) {
runtime.reset();
return true;
}
if (cfg.n_layer <= 0 || cfg.n_expert <= 0 || cfg.n_expert_used <= 0 ||
cfg.n_embd <= 0 || cfg.n_ff_exp <= 0) {
if (err) *err = "invalid MoE expert compute runtime config";
runtime.reset();
return false;
}

const uint64_t fingerprint =
moe_expert_placement_fingerprint(hybrid, cfg.n_layer, cfg.n_expert,
cfg.n_expert_used);
const bool can_reuse =
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
runtime.compute &&
runtime.compute->healthy() &&
runtime.target_path == cfg.target_path &&
runtime.placement_fingerprint == fingerprint;
if (!can_reuse) {
runtime.compute.reset();
runtime.target_path.clear();
runtime.placement_fingerprint = 0;
}

if (!runtime.compute) {
if (const char * ipc_bin = nonempty_env("DFLASH_MOE_EXPERT_COMPUTE_IPC_BIN")) {
const char * work_dir = nonempty_env("DFLASH_MOE_EXPERT_COMPUTE_IPC_WORK_DIR");
const int remote_gpu =
parse_nonnegative_env("DFLASH_MOE_EXPERT_COMPUTE_IPC_GPU", 0);
const bool required =
parse_nonnegative_env("DFLASH_MOE_EXPERT_COMPUTE_IPC_REQUIRED", 0) != 0;
MoeExpertComputeIpcStartResult remote = make_moe_expert_compute_ipc(
ipc_bin, cfg.target_path, remote_gpu, hybrid.placement,
cfg.n_embd, cfg.n_ff_exp, cfg.n_expert_used,
work_dir ? work_dir : "", required);
if (required && !remote.started_remote) {
if (err) *err = "remote MoE expert compute IPC is required but did not start";
std::fprintf(stderr, "%s %s\n", cfg.log_prefix ? cfg.log_prefix : "[moe-expert-compute]",
err ? err->c_str() : "remote IPC did not start");
runtime.reset();
return false;
}
runtime.compute = std::move(remote.compute);
}
if (!runtime.compute) {
runtime.compute = make_cpu_moe_expert_compute(cfg.n_ff_exp);
}
}

runtime.layers = make_moe_expert_layers(hybrid, layer_descs);
runtime.target_path = cfg.target_path;
runtime.placement_fingerprint = fingerprint;
return true;
}

} // namespace dflash::common
Loading
Loading