Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -769,7 +769,11 @@ if(DFLASH27B_TESTS)
DFLASH27B_BACKEND_CUDA=1
DFLASH27B_CUDA_MIN_SM=${_dflash_cuda_min_sm})
endif()
target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread)
if(NOT WIN32)
target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread)
else()
target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} ws2_32)
endif()
if(CURL_FOUND)
target_compile_definitions(dflash_server PRIVATE DFLASH_HAS_CURL=1)
target_link_libraries(dflash_server PRIVATE CURL::libcurl)
Expand Down
55 changes: 40 additions & 15 deletions server/src/common/daemon_loop.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,12 @@
#include "sampler.h"

#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <filesystem>
#include <fstream>
#include <iostream>
#include <sstream>
Expand Down Expand Up @@ -65,23 +68,45 @@ ModelBackend::CompressResult ModelBackend::compress(const CompressRequest & req)
if (req.input_ids.empty()) return result;

// Write input IDs to temp file (handle_compress reads from file)
char tmp_path[] = "/tmp/pflash_XXXXXX.bin";
int tmp_fd = mkstemps(tmp_path, 4);
if (tmp_fd < 0) return result;
const size_t to_write = req.input_ids.size() * sizeof(int32_t);
const char *src = reinterpret_cast<const char *>(req.input_ids.data());
size_t remaining = to_write;
while (remaining > 0) {
ssize_t n = ::write(tmp_fd, src, remaining);
if (n <= 0) {
::close(tmp_fd);
::unlink(tmp_path);
return result;
std::string tmp_path;
#if defined(_WIN32)
{
static std::atomic<unsigned long long> ctr{0};
const auto uniq =
std::to_string((unsigned long long)
std::chrono::steady_clock::now().time_since_epoch().count()) +
"_" + std::to_string(ctr++);
std::filesystem::path p =
std::filesystem::temp_directory_path() / ("pflash_" + uniq + ".bin");
tmp_path = p.string();
FILE * f = std::fopen(tmp_path.c_str(), "wb");
if (!f) return result;
const size_t w = std::fwrite(req.input_ids.data(), 1, to_write, f);
std::fclose(f);
if (w != to_write) { std::remove(tmp_path.c_str()); return result; }
}
#else
{
char tmpl[] = "/tmp/pflash_XXXXXX.bin";
int tmp_fd = mkstemps(tmpl, 4);
if (tmp_fd < 0) return result;
tmp_path = tmpl;
const char *src = reinterpret_cast<const char *>(req.input_ids.data());
size_t remaining = to_write;
while (remaining > 0) {
ssize_t n = ::write(tmp_fd, src, remaining);
if (n <= 0) {
::close(tmp_fd);
::unlink(tmp_path.c_str());
return result;
}
src += n;
remaining -= (size_t)n;
}
src += n;
remaining -= (size_t)n;
::close(tmp_fd);
}
::close(tmp_fd);
#endif

// Build collecting DaemonIO
DaemonIO io;
Expand All @@ -98,7 +123,7 @@ ModelBackend::CompressResult ModelBackend::compress(const CompressRequest & req)
if (req.skip_park) cmd += " nopark";

result.ok = handle_compress(cmd, io) && !result.compressed_ids.empty();
::unlink(tmp_path);
std::remove(tmp_path.c_str());
return result;
}

Expand Down
30 changes: 30 additions & 0 deletions server/src/common/io_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,36 @@ static inline bool write_exact_fd(int fd, const void * data, size_t bytes) {
}
return true;
}
#else
#include <io.h>
static inline bool read_exact_fd(int fd, void * data, size_t bytes) {
char * p = (char *)data;
size_t done = 0;
while (done < bytes) {
int n = _read(fd, p + done, (unsigned int)(bytes - done > UINT_MAX ? UINT_MAX : bytes - done));
if (n == 0) return false;
if (n < 0) {
if (errno == EINTR) continue;
return false;
}
done += (size_t)n;
}
return true;
}

static inline bool write_exact_fd(int fd, const void * data, size_t bytes) {
const char * p = (const char *)data;
size_t done = 0;
while (done < bytes) {
int n = _write(fd, p + done, (unsigned int)(bytes - done > UINT_MAX ? UINT_MAX : bytes - done));
if (n < 0) {
if (errno == EINTR) continue;
return false;
}
done += (size_t)n;
}
return true;
}
#endif

// ── Numeric helpers ─────────────────────────────────────────────────
Expand Down
48 changes: 47 additions & 1 deletion server/src/gemma4/gemma4_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,21 +38,57 @@ namespace {
struct Gemma4Mmap {
void * addr = nullptr;
size_t len = 0;
#if defined(_WIN32)
HANDLE hFile = INVALID_HANDLE_VALUE;
HANDLE hMap = nullptr;
#else
int fd = -1;
#endif

bool open_ro(const std::string & path, std::string & err) {
#if defined(_WIN32)
hFile = CreateFileA(path.c_str(), GENERIC_READ, FILE_SHARE_READ,
nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr);
if (hFile == INVALID_HANDLE_VALUE) {
err = "CreateFileA: " + path + ": error " + std::to_string(GetLastError());
return false;
}
LARGE_INTEGER sz;
if (!GetFileSizeEx(hFile, &sz)) {
err = "GetFileSizeEx: error " + std::to_string(GetLastError());
return false;
}
len = (size_t)sz.QuadPart;
hMap = CreateFileMappingA(hFile, nullptr, PAGE_READONLY, 0, 0, nullptr);
if (!hMap) {
err = "CreateFileMappingA: error " + std::to_string(GetLastError());
return false;
}
addr = MapViewOfFile(hMap, FILE_MAP_READ, 0, 0, 0);
if (!addr) {
err = "MapViewOfFile: error " + std::to_string(GetLastError());
return false;
}
#else
fd = ::open(path.c_str(), O_RDONLY);
if (fd < 0) { err = "open: " + path + " " + strerror(errno); return false; }
struct stat st;
if (fstat(fd, &st) < 0) { err = "fstat"; ::close(fd); fd = -1; return false; }
len = (size_t)st.st_size;
addr = ::mmap(nullptr, len, PROT_READ, MAP_PRIVATE, fd, 0);
if (addr == MAP_FAILED) { err = "mmap"; addr = nullptr; ::close(fd); fd = -1; return false; }
#endif
return true;
}
void close_map() {
#if defined(_WIN32)
if (addr) { UnmapViewOfFile(addr); addr = nullptr; }
if (hMap) { CloseHandle(hMap); hMap = nullptr; }
if (hFile != INVALID_HANDLE_VALUE) { CloseHandle(hFile); hFile = INVALID_HANDLE_VALUE; }
#else
if (addr) { ::munmap(addr, len); addr = nullptr; }
if (fd >= 0) { ::close(fd); fd = -1; }
#endif
}
};

Expand Down Expand Up @@ -387,15 +423,25 @@ bool load_gemma4_gguf_partial(const std::string & path,
// Set up CPU embedder (keeps mmap alive)
out.embedder.mmap_addr = mmap.addr;
out.embedder.mmap_len = mmap.len;
#if defined(_WIN32)
out.embedder.mmap_hfile = mmap.hFile;
out.embedder.mmap_hmap = mmap.hMap;
#else
out.embedder.mmap_fd = mmap.fd;
#endif
out.embedder.tok_embd_bytes = (const uint8_t *)mmap.addr + tok_embd_off;
out.embedder.tok_embd_type = tok_embd_type;
out.embedder.n_embd = n_embd;
out.embedder.n_vocab = (int64_t)n_vocab;
out.embedder.row_bytes = tok_embd_sz / (size_t)n_vocab;
// Release mmap ownership to embedder (it will munmap on destruction)
// Release mmap ownership to embedder (it will unmap on destruction)
mmap.addr = nullptr;
#if defined(_WIN32)
mmap.hFile = INVALID_HANDLE_VALUE;
mmap.hMap = nullptr;
#else
mmap.fd = -1;
#endif

// ── Assign tensors to struct ───────────────────────────────────────
out.tok_embd = find_tensor(meta_ctx, "token_embd.weight");
Expand Down
22 changes: 10 additions & 12 deletions server/src/laguna/laguna_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,7 @@
#include <cstring>
#include <fstream>
#include <sstream>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "common/gguf_mmap.h"

namespace dflash::common {

Expand Down Expand Up @@ -1703,14 +1700,15 @@ bool LagunaBackend::build_hybrid_storage_from_file(
gguf_context * gctx = gguf_init_from_file(args_.target_path.c_str(), gip);
if (!gctx) { err = "failed to re-open GGUF for expert loading"; return false; }

int fd = ::open(args_.target_path.c_str(), O_RDONLY);
if (fd < 0) { gguf_free(gctx); err = "open failed for mmap"; return false; }
struct stat st;
if (::fstat(fd, &st) < 0) { ::close(fd); gguf_free(gctx); err = "fstat failed"; return false; }
const size_t file_size = (size_t)st.st_size;
void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
::close(fd);
if (mmap_addr == MAP_FAILED) { gguf_free(gctx); err = "mmap failed"; return false; }
GgufMmap _mf;
std::string _mferr;
if (!_mf.open(args_.target_path, _mferr)) {
gguf_free(gctx);
err = "mmap failed: " + _mferr;
return false;
}
const size_t file_size = _mf.size();
const void * mmap_addr = _mf.data();

const size_t data_start = gguf_get_data_offset(gctx);
const auto * file_bytes = (const uint8_t *)mmap_addr;
Expand Down
8 changes: 8 additions & 0 deletions server/src/qwen35/qwen35_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,13 @@
#include <cstdio>
#include <cstdlib>
#include <cstdint>
#include <cstdio>
#include <cstring>
#if !defined(_WIN32)
#include <fcntl.h>
#include <sys/stat.h>
#include <unistd.h>
#endif

namespace dflash::common {

Expand Down Expand Up @@ -82,6 +85,10 @@ static int dflash_min_tokens_floor() {
}

static FILE * open_dflash_floor_log() {
#if defined(_WIN32)
// Simple append-mode log on Windows (no file size check).
return std::fopen("dflash_floor.log", "a");
#else
static constexpr const char * kPath = "/tmp/dflash_floor.log";
static constexpr off_t kMaxBytes = 1024 * 1024;

Expand Down Expand Up @@ -115,6 +122,7 @@ static FILE * open_dflash_floor_log() {
FILE * out = fdopen(fd, "a");
if (!out) ::close(fd);
return out;
#endif
}
} // namespace

Expand Down
54 changes: 19 additions & 35 deletions server/src/qwen35moe/qwen35moe_backend.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,7 @@
#include <cstdio>
#include <cstdlib>
#include <cstring>
#include <sys/mman.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <unistd.h>
#include "common/gguf_mmap.h"

namespace dflash::common {

Expand Down Expand Up @@ -99,27 +96,15 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights &
}

// Mmap the file
int fd = ::open(cfg_.target_path, O_RDONLY);
if (fd < 0) {
set_last_error("failed to open GGUF file for mmap");
gguf_free(gctx);
return false;
}
struct stat st;
if (::fstat(fd, &st) < 0) {
::close(fd);
set_last_error("fstat failed on GGUF");
gguf_free(gctx);
return false;
}
const size_t file_size = (size_t)st.st_size;
void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
::close(fd);
if (mmap_addr == MAP_FAILED) {
set_last_error("mmap failed on GGUF");
GgufMmap _mf;
std::string _mferr;
if (!_mf.open(cfg_.target_path, _mferr)) {
set_last_error("mmap failed on GGUF: " + _mferr);
gguf_free(gctx);
return false;
}
const size_t file_size = _mf.size();
const void * mmap_addr = _mf.data();

const size_t data_start = gguf_get_data_offset(gctx);
const auto * file_bytes = (const uint8_t *)mmap_addr;
Expand Down Expand Up @@ -155,10 +140,9 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights &
layer_descs[(size_t)il] = make_moe_layer_desc(out.layers[(size_t)il]);
}
int cache_slots = 0;
if (const char * cs = std::getenv("DFLASH_QWEN35MOE_CACHE_SLOTS")) cache_slots = std::max(0, std::atoi(cs));
else if (cache_slots_ >= 0) cache_slots = cache_slots_;
if (!build_moe_hybrid_storage_from_file_with_mmap(hybrid_cfg, backend, placement, layer_descs, layer_file_data, mmap_addr, file_size, *hybrid, &err, cache_slots)) {
::munmap(mmap_addr, file_size);
if (const char * cs = std::getenv("DFLASH_QWEN35MOE_CACHE_SLOTS")) cache_slots = std::max(0, std::atoi(cs));
else if (cache_slots_ >= 0) cache_slots = cache_slots_;
if (!build_moe_hybrid_storage_from_file_with_mmap(hybrid_cfg, backend, placement, layer_descs, layer_file_data, mmap_addr, file_size, *hybrid, &err, cache_slots)) {
gguf_free(gctx);
set_last_error(std::string("qwen35moe hybrid storage build failed: ") + err);
return false;
Expand Down Expand Up @@ -261,14 +245,15 @@ bool Qwen35MoeBackend::rebuild_hybrid_from_placement(const MoeHybridPlacement &
gguf_init_params gip{};
gguf_context * gctx = gguf_init_from_file(cfg_.target_path, gip);
if (!gctx) { err = "gguf reinit failed"; return false; }
int fd = ::open(cfg_.target_path, O_RDONLY);
if (fd < 0) { gguf_free(gctx); err = "open failed"; return false; }
struct stat st;
if (::fstat(fd, &st) < 0) { ::close(fd); gguf_free(gctx); err = "fstat failed"; return false; }
const size_t file_size = (size_t)st.st_size;
void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0);
::close(fd);
if (mmap_addr == MAP_FAILED) { gguf_free(gctx); err = "mmap failed"; return false; }
GgufMmap _mf;
std::string _mferr;
if (!_mf.open(cfg_.target_path, _mferr)) {
gguf_free(gctx);
err = "mmap failed: " + _mferr;
return false;
}
const size_t file_size = _mf.size();
const void * mmap_addr = _mf.data();

const size_t data_start = gguf_get_data_offset(gctx);
const auto * file_bytes = (const uint8_t *)mmap_addr;
Expand Down Expand Up @@ -305,7 +290,6 @@ bool Qwen35MoeBackend::rebuild_hybrid_from_placement(const MoeHybridPlacement &

const bool ok = build_moe_hybrid_storage_from_file(hybrid_cfg, backend, placement, layer_descs,
layer_file_data, *hybrid, &err, cache_slots);
::munmap(mmap_addr, file_size);
gguf_free(gctx);
if (!ok) return false;
out.moe_hybrid = std::move(hybrid);
Expand Down
Loading