diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index fc000590..59f70316 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -769,7 +769,11 @@ if(DFLASH27B_TESTS) DFLASH27B_BACKEND_CUDA=1 DFLASH27B_CUDA_MIN_SM=${_dflash_cuda_min_sm}) endif() - target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread) + if(NOT WIN32) + target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} pthread) + else() + target_link_libraries(dflash_server PRIVATE dflash_common ggml ${DFLASH27B_GGML_BACKEND_TARGET} ws2_32) + endif() if(CURL_FOUND) target_compile_definitions(dflash_server PRIVATE DFLASH_HAS_CURL=1) target_link_libraries(dflash_server PRIVATE CURL::libcurl) diff --git a/server/src/common/daemon_loop.cpp b/server/src/common/daemon_loop.cpp index 19aad462..1abcf0c5 100644 --- a/server/src/common/daemon_loop.cpp +++ b/server/src/common/daemon_loop.cpp @@ -8,9 +8,12 @@ #include "sampler.h" #include +#include +#include #include #include #include +#include #include #include #include @@ -65,23 +68,45 @@ ModelBackend::CompressResult ModelBackend::compress(const CompressRequest & req) if (req.input_ids.empty()) return result; // Write input IDs to temp file (handle_compress reads from file) - char tmp_path[] = "/tmp/pflash_XXXXXX.bin"; - int tmp_fd = mkstemps(tmp_path, 4); - if (tmp_fd < 0) return result; const size_t to_write = req.input_ids.size() * sizeof(int32_t); - const char *src = reinterpret_cast(req.input_ids.data()); - size_t remaining = to_write; - while (remaining > 0) { - ssize_t n = ::write(tmp_fd, src, remaining); - if (n <= 0) { - ::close(tmp_fd); - ::unlink(tmp_path); - return result; + std::string tmp_path; +#if defined(_WIN32) + { + static std::atomic ctr{0}; + const auto uniq = + std::to_string((unsigned long long) + std::chrono::steady_clock::now().time_since_epoch().count()) + + "_" + std::to_string(ctr++); + std::filesystem::path p = + 
std::filesystem::temp_directory_path() / ("pflash_" + uniq + ".bin"); + tmp_path = p.string(); + FILE * f = std::fopen(tmp_path.c_str(), "wb"); + if (!f) return result; + const size_t w = std::fwrite(req.input_ids.data(), 1, to_write, f); + std::fclose(f); + if (w != to_write) { std::remove(tmp_path.c_str()); return result; } + } +#else + { + char tmpl[] = "/tmp/pflash_XXXXXX.bin"; + int tmp_fd = mkstemps(tmpl, 4); + if (tmp_fd < 0) return result; + tmp_path = tmpl; + const char *src = reinterpret_cast(req.input_ids.data()); + size_t remaining = to_write; + while (remaining > 0) { + ssize_t n = ::write(tmp_fd, src, remaining); + if (n <= 0) { + ::close(tmp_fd); + ::unlink(tmp_path.c_str()); + return result; + } + src += n; + remaining -= (size_t)n; } - src += n; - remaining -= (size_t)n; + ::close(tmp_fd); } - ::close(tmp_fd); +#endif // Build collecting DaemonIO DaemonIO io; @@ -98,7 +123,7 @@ ModelBackend::CompressResult ModelBackend::compress(const CompressRequest & req) if (req.skip_park) cmd += " nopark"; result.ok = handle_compress(cmd, io) && !result.compressed_ids.empty(); - ::unlink(tmp_path); + std::remove(tmp_path.c_str()); return result; } diff --git a/server/src/common/io_utils.h b/server/src/common/io_utils.h index cf0c4a46..a4818b4f 100644 --- a/server/src/common/io_utils.h +++ b/server/src/common/io_utils.h @@ -123,6 +123,36 @@ static inline bool write_exact_fd(int fd, const void * data, size_t bytes) { } return true; } +#else +#include +static inline bool read_exact_fd(int fd, void * data, size_t bytes) { + char * p = (char *)data; + size_t done = 0; + while (done < bytes) { + int n = _read(fd, p + done, (unsigned int)(bytes - done > UINT_MAX ? 
UINT_MAX : bytes - done)); + if (n == 0) return false; + if (n < 0) { + if (errno == EINTR) continue; + return false; + } + done += (size_t)n; + } + return true; +} + +static inline bool write_exact_fd(int fd, const void * data, size_t bytes) { + const char * p = (const char *)data; + size_t done = 0; + while (done < bytes) { + int n = _write(fd, p + done, (unsigned int)(bytes - done > (size_t)INT_MAX ? (size_t)INT_MAX : bytes - done)); /* _write returns int: cap each request at INT_MAX */ + if (n <= 0) { /* 0 bytes written means no progress: fail instead of spinning */ + if (n < 0 && errno == EINTR) continue; + return false; + } + done += (size_t)n; + } + return true; +} #endif // ── Numeric helpers ───────────────────────────────────────────────── diff --git a/server/src/gemma4/gemma4_loader.cpp b/server/src/gemma4/gemma4_loader.cpp index 00be4c8a..642e642e 100644 --- a/server/src/gemma4/gemma4_loader.cpp +++ b/server/src/gemma4/gemma4_loader.cpp @@ -38,9 +38,38 @@ namespace { struct Gemma4Mmap { void * addr = nullptr; size_t len = 0; +#if defined(_WIN32) + HANDLE hFile = INVALID_HANDLE_VALUE; + HANDLE hMap = nullptr; +#else int fd = -1; +#endif bool open_ro(const std::string & path, std::string & err) { +#if defined(_WIN32) + hFile = CreateFileA(path.c_str(), GENERIC_READ, FILE_SHARE_READ, + nullptr, OPEN_EXISTING, FILE_ATTRIBUTE_NORMAL, nullptr); + if (hFile == INVALID_HANDLE_VALUE) { + err = "CreateFileA: " + path + ": error " + std::to_string(GetLastError()); + return false; + } + LARGE_INTEGER sz; + if (!GetFileSizeEx(hFile, &sz)) { + err = "GetFileSizeEx: error " + std::to_string(GetLastError()); + close_map(); return false; /* release hFile, mirroring the POSIX branch's close(fd) on failure */ + } + len = (size_t)sz.QuadPart; + hMap = CreateFileMappingA(hFile, nullptr, PAGE_READONLY, 0, 0, nullptr); + if (!hMap) { + err = "CreateFileMappingA: error " + std::to_string(GetLastError()); + close_map(); return false; /* err is captured first: CloseHandle may overwrite GetLastError() */ + } + addr = MapViewOfFile(hMap, FILE_MAP_READ, 0, 0, 0); + if (!addr) { + err = "MapViewOfFile: error " + std::to_string(GetLastError()); + close_map(); return false; /* releases both hMap and hFile */ + } +#else fd = ::open(path.c_str(), O_RDONLY); if (fd < 0) { err = "open: " + path + " " + strerror(errno); return false; }
struct stat st; @@ -48,11 +77,18 @@ struct Gemma4Mmap { len = (size_t)st.st_size; addr = ::mmap(nullptr, len, PROT_READ, MAP_PRIVATE, fd, 0); if (addr == MAP_FAILED) { err = "mmap"; addr = nullptr; ::close(fd); fd = -1; return false; } +#endif return true; } void close_map() { +#if defined(_WIN32) + if (addr) { UnmapViewOfFile(addr); addr = nullptr; } + if (hMap) { CloseHandle(hMap); hMap = nullptr; } + if (hFile != INVALID_HANDLE_VALUE) { CloseHandle(hFile); hFile = INVALID_HANDLE_VALUE; } +#else if (addr) { ::munmap(addr, len); addr = nullptr; } if (fd >= 0) { ::close(fd); fd = -1; } +#endif } }; @@ -387,15 +423,25 @@ bool load_gemma4_gguf_partial(const std::string & path, // Set up CPU embedder (keeps mmap alive) out.embedder.mmap_addr = mmap.addr; out.embedder.mmap_len = mmap.len; +#if defined(_WIN32) + out.embedder.mmap_hfile = mmap.hFile; + out.embedder.mmap_hmap = mmap.hMap; +#else out.embedder.mmap_fd = mmap.fd; +#endif out.embedder.tok_embd_bytes = (const uint8_t *)mmap.addr + tok_embd_off; out.embedder.tok_embd_type = tok_embd_type; out.embedder.n_embd = n_embd; out.embedder.n_vocab = (int64_t)n_vocab; out.embedder.row_bytes = tok_embd_sz / (size_t)n_vocab; - // Release mmap ownership to embedder (it will munmap on destruction) + // Release mmap ownership to embedder (it will unmap on destruction) mmap.addr = nullptr; +#if defined(_WIN32) + mmap.hFile = INVALID_HANDLE_VALUE; + mmap.hMap = nullptr; +#else mmap.fd = -1; +#endif // ── Assign tensors to struct ─────────────────────────────────────── out.tok_embd = find_tensor(meta_ctx, "token_embd.weight"); diff --git a/server/src/laguna/laguna_backend.cpp b/server/src/laguna/laguna_backend.cpp index ab75ef5a..c7e63e5f 100644 --- a/server/src/laguna/laguna_backend.cpp +++ b/server/src/laguna/laguna_backend.cpp @@ -31,10 +31,7 @@ #include #include #include -#include -#include -#include -#include +#include "common/gguf_mmap.h" namespace dflash::common { @@ -1703,14 +1700,15 @@ bool 
LagunaBackend::build_hybrid_storage_from_file( gguf_context * gctx = gguf_init_from_file(args_.target_path.c_str(), gip); if (!gctx) { err = "failed to re-open GGUF for expert loading"; return false; } - int fd = ::open(args_.target_path.c_str(), O_RDONLY); - if (fd < 0) { gguf_free(gctx); err = "open failed for mmap"; return false; } - struct stat st; - if (::fstat(fd, &st) < 0) { ::close(fd); gguf_free(gctx); err = "fstat failed"; return false; } - const size_t file_size = (size_t)st.st_size; - void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0); - ::close(fd); - if (mmap_addr == MAP_FAILED) { gguf_free(gctx); err = "mmap failed"; return false; } + GgufMmap _mf; + std::string _mferr; + if (!_mf.open(args_.target_path, _mferr)) { + gguf_free(gctx); + err = "mmap failed: " + _mferr; + return false; + } + const size_t file_size = _mf.size(); + const void * mmap_addr = _mf.data(); const size_t data_start = gguf_get_data_offset(gctx); const auto * file_bytes = (const uint8_t *)mmap_addr; diff --git a/server/src/qwen35/qwen35_backend.cpp b/server/src/qwen35/qwen35_backend.cpp index 0e6e74bc..70d671ed 100644 --- a/server/src/qwen35/qwen35_backend.cpp +++ b/server/src/qwen35/qwen35_backend.cpp @@ -21,10 +21,13 @@ #include #include #include +#include #include +#if !defined(_WIN32) #include #include #include +#endif namespace dflash::common { @@ -82,6 +85,10 @@ static int dflash_min_tokens_floor() { } static FILE * open_dflash_floor_log() { +#if defined(_WIN32) + // Simple append-mode log on Windows (no file size check). 
+ return std::fopen("dflash_floor.log", "a"); +#else static constexpr const char * kPath = "/tmp/dflash_floor.log"; static constexpr off_t kMaxBytes = 1024 * 1024; @@ -115,6 +122,7 @@ static FILE * open_dflash_floor_log() { FILE * out = fdopen(fd, "a"); if (!out) ::close(fd); return out; +#endif } } // namespace diff --git a/server/src/qwen35moe/qwen35moe_backend.cpp b/server/src/qwen35moe/qwen35moe_backend.cpp index 6455eac5..cfd73f66 100644 --- a/server/src/qwen35moe/qwen35moe_backend.cpp +++ b/server/src/qwen35moe/qwen35moe_backend.cpp @@ -19,10 +19,7 @@ #include #include #include -#include -#include -#include -#include +#include "common/gguf_mmap.h" namespace dflash::common { @@ -99,27 +96,15 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights & } // Mmap the file - int fd = ::open(cfg_.target_path, O_RDONLY); - if (fd < 0) { - set_last_error("failed to open GGUF file for mmap"); - gguf_free(gctx); - return false; - } - struct stat st; - if (::fstat(fd, &st) < 0) { - ::close(fd); - set_last_error("fstat failed on GGUF"); - gguf_free(gctx); - return false; - } - const size_t file_size = (size_t)st.st_size; - void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0); - ::close(fd); - if (mmap_addr == MAP_FAILED) { - set_last_error("mmap failed on GGUF"); + GgufMmap _mf; + std::string _mferr; + if (!_mf.open(cfg_.target_path, _mferr)) { + set_last_error("mmap failed on GGUF: " + _mferr); gguf_free(gctx); return false; } + const size_t file_size = _mf.size(); + const void * mmap_addr = _mf.data(); const size_t data_start = gguf_get_data_offset(gctx); const auto * file_bytes = (const uint8_t *)mmap_addr; @@ -155,10 +140,9 @@ bool Qwen35MoeBackend::load_target_model(ggml_backend_t backend, TargetWeights & layer_descs[(size_t)il] = make_moe_layer_desc(out.layers[(size_t)il]); } int cache_slots = 0; - if (const char * cs = std::getenv("DFLASH_QWEN35MOE_CACHE_SLOTS")) cache_slots = std::max(0, std::atoi(cs)); - else if 
(cache_slots_ >= 0) cache_slots = cache_slots_; - if (!build_moe_hybrid_storage_from_file_with_mmap(hybrid_cfg, backend, placement, layer_descs, layer_file_data, mmap_addr, file_size, *hybrid, &err, cache_slots)) { - ::munmap(mmap_addr, file_size); + if (const char * cs = std::getenv("DFLASH_QWEN35MOE_CACHE_SLOTS")) cache_slots = std::max(0, std::atoi(cs)); + else if (cache_slots_ >= 0) cache_slots = cache_slots_; + if (!build_moe_hybrid_storage_from_file_with_mmap(hybrid_cfg, backend, placement, layer_descs, layer_file_data, mmap_addr, file_size, *hybrid, &err, cache_slots)) { gguf_free(gctx); set_last_error(std::string("qwen35moe hybrid storage build failed: ") + err); return false; @@ -261,14 +245,15 @@ bool Qwen35MoeBackend::rebuild_hybrid_from_placement(const MoeHybridPlacement & gguf_init_params gip{}; gguf_context * gctx = gguf_init_from_file(cfg_.target_path, gip); if (!gctx) { err = "gguf reinit failed"; return false; } - int fd = ::open(cfg_.target_path, O_RDONLY); - if (fd < 0) { gguf_free(gctx); err = "open failed"; return false; } - struct stat st; - if (::fstat(fd, &st) < 0) { ::close(fd); gguf_free(gctx); err = "fstat failed"; return false; } - const size_t file_size = (size_t)st.st_size; - void * mmap_addr = ::mmap(nullptr, file_size, PROT_READ, MAP_PRIVATE, fd, 0); - ::close(fd); - if (mmap_addr == MAP_FAILED) { gguf_free(gctx); err = "mmap failed"; return false; } + GgufMmap _mf; + std::string _mferr; + if (!_mf.open(cfg_.target_path, _mferr)) { + gguf_free(gctx); + err = "mmap failed: " + _mferr; + return false; + } + const size_t file_size = _mf.size(); + const void * mmap_addr = _mf.data(); const size_t data_start = gguf_get_data_offset(gctx); const auto * file_bytes = (const uint8_t *)mmap_addr; @@ -305,7 +290,6 @@ bool Qwen35MoeBackend::rebuild_hybrid_from_placement(const MoeHybridPlacement & const bool ok = build_moe_hybrid_storage_from_file(hybrid_cfg, backend, placement, layer_descs, layer_file_data, *hybrid, &err, cache_slots); - 
::munmap(mmap_addr, file_size); gguf_free(gctx); if (!ok) return false; out.moe_hybrid = std::move(hybrid); diff --git a/server/src/server/disk_prefix_cache.cpp b/server/src/server/disk_prefix_cache.cpp index e2316c9d..ca320240 100644 --- a/server/src/server/disk_prefix_cache.cpp +++ b/server/src/server/disk_prefix_cache.cpp @@ -12,12 +12,13 @@ #include #include #include -#include -#include -#include +#include +#include namespace dflash::common { +namespace fs = std::filesystem; + // ─── Inline SHA-1 (same as prefix_cache.cpp) ──────────────────────────── static void sha1_hash(const void * data, size_t len, uint8_t out[20]) { @@ -88,14 +89,10 @@ static std::string hex(const uint8_t * data, int len) { } static bool mkdir_p(const std::string & path) { - struct stat st{}; - if (stat(path.c_str(), &st) == 0) return S_ISDIR(st.st_mode); - // Try to create parent first. - size_t slash = path.rfind('/'); - if (slash != std::string::npos && slash > 0) { - mkdir_p(path.substr(0, slash)); - } - return mkdir(path.c_str(), 0755) == 0 || errno == EEXIST; + std::error_code ec; + if (fs::is_directory(path, ec)) return true; + fs::create_directories(path, ec); + return fs::is_directory(path, ec); } static uint64_t now_unix() { @@ -213,16 +210,13 @@ void DiskPrefixCache::scan_directory() { if (layout_dir_.empty()) return; - DIR * dir = opendir(layout_dir_.c_str()); - if (!dir) return; - - struct dirent * ent; - while ((ent = readdir(dir)) != nullptr) { - const char * name = ent->d_name; - size_t nlen = std::strlen(name); - if (nlen < 36 || std::strcmp(name + nlen - 4, ".dkv") != 0) continue; + std::error_code ec; + for (const auto & de : fs::directory_iterator(layout_dir_, ec)) { + const std::string name = de.path().filename().string(); + size_t nlen = name.size(); + if (nlen < 36 || name.compare(nlen - 4, 4, ".dkv") != 0) continue; - std::string path = layout_dir_ + "/" + name; + std::string path = de.path().string(); FILE * f = std::fopen(path.c_str(), "rb"); if (!f) continue; @@ 
-241,15 +235,13 @@ void DiskPrefixCache::scan_directory() { entry.cur_pos = hdr.cur_pos; entry.last_used = hdr.last_used; - struct stat st{}; - if (stat(path.c_str(), &st) == 0) { - entry.file_size = (uint64_t)st.st_size; - } + std::error_code fec; + auto fsz = fs::file_size(path, fec); + if (!fec) entry.file_size = (uint64_t)fsz; total_bytes_ += entry.file_size; entries_.push_back(std::move(entry)); } - closedir(dir); std::fprintf(stderr, "[disk-cache] scanned %zu files, %.1f MB\n", entries_.size(), (double)total_bytes_ / (1024.0 * 1024.0)); @@ -259,27 +251,23 @@ void DiskPrefixCache::scan_directory() { void DiskPrefixCache::try_learn_from_disk() { // Scan cache_dir for subdirectories (each is a layout fingerprint). - DIR * dir = opendir(config_.cache_dir.c_str()); - if (!dir) return; - - struct dirent * ent; - while ((ent = readdir(dir)) != nullptr) { - if (ent->d_name[0] == '.') continue; - std::string subdir = config_.cache_dir + "/" + ent->d_name; - struct stat st{}; - if (stat(subdir.c_str(), &st) != 0 || !S_ISDIR(st.st_mode)) continue; + std::error_code ec; + for (const auto & de : fs::directory_iterator(config_.cache_dir, ec)) { + const std::string base = de.path().filename().string(); + if (!base.empty() && base[0] == '.') continue; + std::error_code dec; + if (!de.is_directory(dec)) continue; + const std::string subdir = de.path().string(); // Check if this subdir has any .dkv files. - DIR * sub = opendir(subdir.c_str()); - if (!sub) continue; - - struct dirent * sent; - while ((sent = readdir(sub)) != nullptr) { - size_t nlen = std::strlen(sent->d_name); - if (nlen < 4 || std::strcmp(sent->d_name + nlen - 4, ".dkv") != 0) continue; + std::error_code sec; + for (const auto & se : fs::directory_iterator(subdir, sec)) { + const std::string sname = se.path().filename().string(); + size_t nlen = sname.size(); + if (nlen < 4 || sname.compare(nlen - 4, 4, ".dkv") != 0) continue; // Read the header to get the layout_id. 
- std::string fpath = subdir + "/" + sent->d_name; + std::string fpath = se.path().string(); FILE * f = std::fopen(fpath.c_str(), "rb"); if (!f) continue; @@ -290,16 +278,12 @@ void DiskPrefixCache::try_learn_from_disk() { layout_from_disk_ = true; // unverified — must be confirmed by learn_layout() layout_dir_ = subdir; std::fclose(f); - closedir(sub); - closedir(dir); scan_directory(); return; } std::fclose(f); } - closedir(sub); } - closedir(dir); } // ─── Lookup ───────────────────────────────────────────────────────────── @@ -423,8 +407,9 @@ bool DiskPrefixCache::save(int slot, const std::vector & prompt_ids) { entry.cur_pos = (uint32_t)ref.cur_pos; entry.last_used = now_unix(); entry.created_at = entry.last_used; - struct stat st{}; - if (stat(path.c_str(), &st) == 0) entry.file_size = (uint64_t)st.st_size; + std::error_code fec; + auto fsz = fs::file_size(path, fec); + if (!fec) entry.file_size = (uint64_t)fsz; total_bytes_ += entry.file_size; entries_.push_back(std::move(entry)); diff --git a/server/src/server/http_server.cpp b/server/src/server/http_server.cpp index 3602e7d4..3c250980 100644 --- a/server/src/server/http_server.cpp +++ b/server/src/server/http_server.cpp @@ -3,6 +3,19 @@ // Core infrastructure: socket listen/accept, client threads, HTTP parsing, // job queue, worker thread with SSE streaming and disconnect detection. +// On Windows, winsock2.h must be included BEFORE windows.h (which comes +// transitively via internal.h → http_server.h). Classic MSVC ordering. 
+#if defined(_WIN32) +#if !defined(NOMINMAX) +#define NOMINMAX +#endif +#if !defined(WIN32_LEAN_AND_MEAN) +#define WIN32_LEAN_AND_MEAN +#endif +#include +#include +#endif + #include "http_server.h" #include "sse_emitter.h" #include "prompt_normalize.h" @@ -22,15 +35,49 @@ #include #include +#if defined(_WIN32) +#include +#include +typedef long ssize_t; +#define MSG_NOSIGNAL 0 +#define MSG_DONTWAIT 0 +#define SHUT_RDWR SD_BOTH +#define socklen_t int +#define poll(fds,nfds,timeout) WSAPoll(fds,nfds,timeout) +#define SOCK_FD(fd) ((SOCKET)(fd)) +// Replace fcntl(F_GETFL) / fcntl(F_SETFL, O_NONBLOCK) with ioctlsocket +static inline int sock_get_flags(int fd) { (void)fd; return 0; /* stub */ } +static inline void sock_set_nonblock(int fd) { u_long m = 1; ioctlsocket(SOCK_FD(fd), FIONBIO, &m); } +static inline void sock_set_block(int fd) { u_long m = 0; ioctlsocket(SOCK_FD(fd), FIONBIO, &m); } +static inline void socket_close(int fd) { closesocket(SOCK_FD(fd)); } +#define SETSOCKOPT_CAST (const char *) +static inline const char* sock_strerror() { + static thread_local char buf[64]; + // Reports only the numeric WSAGetLastError() code (no FormatMessage lookup). + snprintf(buf, sizeof(buf), "WSA error %d", WSAGetLastError()); + return buf; +} +#else +#include +#define SOCK_FD(fd) (fd) +#define SETSOCKOPT_CAST /* empty on POSIX */ +#include #include -#include #include #include #include #include #include +#include #include #include +// POSIX socket shims, defined after the system headers so fcntl/close/strerror are declared; the O_NONBLOCK toggles preserve the other file status flags. +static inline int sock_get_flags(int fd) { return fcntl(fd, F_GETFL, 0); } +static inline void sock_set_nonblock(int fd) { int fl = fcntl(fd, F_GETFL, 0); if (fl >= 0) fcntl(fd, F_SETFL, fl | O_NONBLOCK); } +static inline void sock_set_block(int fd) { int fl = fcntl(fd, F_GETFL, 0); if (fl >= 0) fcntl(fd, F_SETFL, fl & ~O_NONBLOCK); } +static inline void socket_close(int fd) { ::close(fd); } +static inline const char* sock_strerror() { return strerror(errno); } +#endif namespace dflash::common { @@ -773,15 +817,28 @@ std::string HttpServer::resolve_status_html() { struct stat st; if (::stat(path.c_str(), &st) == 0) return path; } - // 2. share/ relative to /proc/self/exe (build dir or installed prefix) + // 2.
share/ relative to exe path (build dir or installed prefix) + { + std::string exe_dir; +#if defined(_WIN32) + char exe_buf[MAX_PATH] = {}; + DWORD n = GetModuleFileNameA(nullptr, exe_buf, sizeof(exe_buf)); + if (n > 0 && n < sizeof(exe_buf)) { + exe_dir = std::string(exe_buf, n); + auto slash = exe_dir.find_last_of("/\\"); + if (slash != std::string::npos) exe_dir = exe_dir.substr(0, slash); + } +#else char exe_buf[1024] = {}; ssize_t len = ::readlink("/proc/self/exe", exe_buf, sizeof(exe_buf) - 1); if (len > 0) { exe_buf[len] = '\0'; - std::string exe_dir(exe_buf); + exe_dir = exe_buf; auto slash = exe_dir.rfind('/'); - if (slash != std::string::npos) { - exe_dir = exe_dir.substr(0, slash); + if (slash != std::string::npos) exe_dir = exe_dir.substr(0, slash); + } +#endif + if (!exe_dir.empty()) { // 2a. /share/status.html (build directory layout) { std::string path = exe_dir + "/share/status.html"; @@ -815,10 +872,10 @@ static bool sse_try_send(int fd, const void * data, size_t len) { deadline - std::chrono::steady_clock::now()).count(); if (remaining <= 0) return false; - struct pollfd pfd = {fd, POLLOUT, 0}; + struct pollfd pfd = {SOCK_FD(fd), POLLOUT, 0}; int ret; do { - ret = poll(&pfd, 1, static_cast(std::min(remaining, (long)50))); + ret = poll(&pfd, 1, (int)(remaining < 50 ? 
remaining : 50)); } while (ret < 0 && errno == EINTR); if (ret < 0 || (pfd.revents & (POLLERR | POLLHUP | POLLNVAL))) return false; if (ret == 0) continue; @@ -844,7 +901,7 @@ void HttpServer::broadcast_status() { } } for (int fd : dead) { - ::close(fd); + socket_close(fd); sse_fds_.erase(std::remove(sse_fds_.begin(), sse_fds_.end(), fd), sse_fds_.end()); } @@ -867,7 +924,7 @@ void HttpServer::broadcast_token(const std::string & text) { } } for (int fd : dead) { - ::close(fd); + socket_close(fd); sse_fds_.erase(std::remove(sse_fds_.begin(), sse_fds_.end(), fd), sse_fds_.end()); } @@ -888,7 +945,7 @@ void HttpServer::sse_heartbeat() { } } for (int fd : dead) { - ::close(fd); + socket_close(fd); sse_fds_.erase(std::remove(sse_fds_.begin(), sse_fds_.end(), fd), sse_fds_.end()); } @@ -906,7 +963,7 @@ void HttpServer::shutdown() { stopping_.store(true); queue_cv_.notify_all(); if (listen_fd_ >= 0) { - ::close(listen_fd_); + socket_close(listen_fd_); listen_fd_ = -1; } if (worker_thread_.joinable()) { @@ -916,7 +973,7 @@ void HttpServer::shutdown() { // Close SSE client connections. { std::lock_guard lk(sse_mu_); - for (int fd : sse_fds_) ::close(fd); + for (int fd : sse_fds_) socket_close(fd); sse_fds_.clear(); } @@ -950,40 +1007,42 @@ void HttpServer::shutdown() { } int HttpServer::run() { +#if !defined(_WIN32) // Ignore SIGPIPE so send() returns EPIPE instead of killing the process. signal(SIGPIPE, SIG_IGN); +#endif // Create listen socket. 
listen_fd_ = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd_ < 0) { - std::fprintf(stderr, "[server] socket() failed: %s\n", strerror(errno)); + std::fprintf(stderr, "[server] socket() failed: %s\n", sock_strerror()); return 1; } int yes = 1; - setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + setsockopt(SOCK_FD(listen_fd_), SOL_SOCKET, SO_REUSEADDR, SETSOCKOPT_CAST &yes, sizeof(yes)); struct sockaddr_in sa{}; sa.sin_family = AF_INET; sa.sin_port = htons((uint16_t)config_.port); if (inet_pton(AF_INET, config_.host.c_str(), &sa.sin_addr) != 1) { std::fprintf(stderr, "[server] invalid host address: %s\n", config_.host.c_str()); - ::close(listen_fd_); + socket_close(listen_fd_); listen_fd_ = -1; return 1; } if (bind(listen_fd_, (struct sockaddr *)&sa, sizeof(sa)) < 0) { std::fprintf(stderr, "[server] bind(%s:%d) failed: %s\n", - config_.host.c_str(), config_.port, strerror(errno)); - ::close(listen_fd_); + config_.host.c_str(), config_.port, sock_strerror()); + socket_close(listen_fd_); listen_fd_ = -1; return 1; } if (listen(listen_fd_, 128) < 0) { - std::fprintf(stderr, "[server] listen() failed: %s\n", strerror(errno)); - ::close(listen_fd_); + std::fprintf(stderr, "[server] listen() failed: %s\n", sock_strerror()); + socket_close(listen_fd_); listen_fd_ = -1; return 1; } @@ -992,10 +1051,12 @@ int HttpServer::run() { // timeout. This guarantees the loop exits on SIGTERM/SIGINT regardless of // which thread the signal handler runs on (it only sets the atomic flag). 
{ - int fl = fcntl(listen_fd_, F_GETFL, 0); - if (fl < 0 || fcntl(listen_fd_, F_SETFL, fl | O_NONBLOCK) < 0) { - std::fprintf(stderr, "[server] fcntl(O_NONBLOCK) failed: %s\n", strerror(errno)); - ::close(listen_fd_); + int fl = sock_get_flags(listen_fd_); + if (fl < 0) { /* Windows: ioctlsocket sets non-block directly */ } + sock_set_nonblock(listen_fd_); + if (false) { + std::fprintf(stderr, "[server] fcntl(O_NONBLOCK) failed: %s\n", "n/a"); + socket_close(listen_fd_); listen_fd_ = -1; return 1; } @@ -1009,12 +1070,12 @@ int HttpServer::run() { // Accept loop. while (!stopping_.load()) { - struct pollfd pfd{listen_fd_, POLLIN, 0}; + struct pollfd pfd{SOCK_FD(listen_fd_), POLLIN, 0}; int pr = poll(&pfd, 1, 200 /* ms */); if (pr <= 0) { // 0 = timeout (re-check stopping_); <0 with EINTR = signal. Both loop. if (pr < 0 && errno != EINTR) { - std::fprintf(stderr, "[server] poll() error: %s\n", strerror(errno)); + std::fprintf(stderr, "[server] poll() error: %s\n", sock_strerror()); } continue; } @@ -1025,13 +1086,13 @@ int HttpServer::run() { if (client_fd < 0) { if (stopping_.load()) break; if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) continue; - std::fprintf(stderr, "[server] accept() error: %s\n", strerror(errno)); + std::fprintf(stderr, "[server] accept() error: %s\n", sock_strerror()); continue; } // Disable Nagle for low-latency SSE streaming. int flag = 1; - setsockopt(client_fd, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag)); + setsockopt(SOCK_FD(client_fd), IPPROTO_TCP, TCP_NODELAY, SETSOCKOPT_CAST &flag, sizeof(flag)); // Spawn client thread (detached — client_main owns the fd). active_clients_.fetch_add(1); @@ -1090,21 +1151,21 @@ void HttpServer::handle_client(int fd) { HttpRequest hr; if (!read_http_request(fd, hr)) { send_error(fd, 400, "bad HTTP request"); - ::close(fd); + socket_close(fd); return; } // CORS preflight. 
if (hr.method == "OPTIONS") { send_response(fd, 204, "", ""); - ::close(fd); + socket_close(fd); return; } // Health check. if (hr.method == "GET" && (hr.path == "/health" || hr.path == "/")) { send_response(fd, 200, "application/json", "{\"status\":\"ok\"}\n"); - ::close(fd); + socket_close(fd); return; } @@ -1112,7 +1173,7 @@ void HttpServer::handle_client(int fd) { if (hr.method == "GET" && hr.path == "/props") { json body = build_props_body(config_, prefix_cache_, tool_memory_); send_response(fd, 200, "application/json", body.dump() + "\n"); - ::close(fd); + socket_close(fd); return; } @@ -1121,19 +1182,19 @@ void HttpServer::handle_client(int fd) { if (status_html_path_.empty()) { send_error(fd, 404, "status.html not found. Set DFLASH_SHARE_DIR or place it in share/status.html"); - ::close(fd); + socket_close(fd); return; } std::ifstream ifs(status_html_path_); if (!ifs.is_open()) { send_error(fd, 500, "failed to open status.html"); - ::close(fd); + socket_close(fd); return; } std::ostringstream oss; oss << ifs.rdbuf(); send_response(fd, 200, "text/html; charset=utf-8", oss.str()); - ::close(fd); + socket_close(fd); return; } @@ -1141,7 +1202,7 @@ void HttpServer::handle_client(int fd) { if (hr.method == "GET" && hr.path == "/status/json") { send_response(fd, 200, "application/json", status_.to_json().dump(-1, ' ', false, json::error_handler_t::replace) + "\n"); - ::close(fd); + socket_close(fd); return; } @@ -1156,7 +1217,7 @@ void HttpServer::handle_client(int fd) { "Access-Control-Allow-Origin: *\r\n" "\r\n"; if (!send_all(fd, headers, std::strlen(headers))) { - ::close(fd); + socket_close(fd); return; } // Send initial state immediately. 
@@ -1217,7 +1278,7 @@ void HttpServer::handle_client(int fd) { })} }; send_response(fd, 200, "application/json", codex_models.dump() + "\n"); - ::close(fd); + socket_close(fd); return; } json models = { @@ -1232,7 +1293,7 @@ void HttpServer::handle_client(int fd) { })} }; send_response(fd, 200, "application/json", models.dump() + "\n"); - ::close(fd); + socket_close(fd); return; } @@ -1240,7 +1301,7 @@ void HttpServer::handle_client(int fd) { if (!route_request(fd, hr)) { send_error(fd, 404, "unknown endpoint"); } - ::close(fd); + socket_close(fd); } bool HttpServer::route_request(int fd, const HttpRequest & hr) { @@ -1462,7 +1523,7 @@ bool HttpServer::route_request(int fd, const HttpRequest & hr) { // thinking.budget_tokens (if set) wins over reasoning.effort. // Either is clamped to think_max_tokens. if (request_budget_tokens >= 0) { - int eff = std::min(request_budget_tokens, config_.think_max_tokens); + int eff = (std::min)(request_budget_tokens, config_.think_max_tokens); if (request_budget_tokens > config_.think_max_tokens) { std::fprintf(stderr, "[server] thinking.budget_tokens=%d clamped to " @@ -1477,9 +1538,9 @@ bool HttpServer::route_request(int fd, const HttpRequest & hr) { // exceed default_max_tokens (e.g. Qwen3.6 max=81408 with // default=32768) — clients that want that full budget must pass // an explicit max_tokens. Otherwise we narrow silently to fit. - const int max_output_phase1_room = std::max(0, + const int max_output_phase1_room = (std::max)(0, req.max_output - config_.hard_limit_reply_budget); - int eff = std::min(effort_phase1_cap, max_output_phase1_room); + int eff = (std::min)(effort_phase1_cap, max_output_phase1_room); if (effort_phase1_cap > max_output_phase1_room) { // Info-level: this is normal when clients use a tier name but // don't pass an explicit max_tokens. Not a warning. 
@@ -1494,7 +1555,7 @@ bool HttpServer::route_request(int fd, const HttpRequest & hr) { } // Reply budget: if (request_reply_budget >= 0) { - int eff = std::min(request_reply_budget, config_.hard_limit_reply_budget); + int eff = (std::min)(request_reply_budget, config_.hard_limit_reply_budget); if (request_reply_budget > config_.hard_limit_reply_budget) { std::fprintf(stderr, "[server] thinking.reply_budget=%d clamped to " @@ -1587,8 +1648,8 @@ bool HttpServer::route_request(int fd, const HttpRequest & hr) { req.model.c_str()); // Set socket non-blocking for send() stall detection during streaming. - int flags = fcntl(fd, F_GETFL, 0); - if (flags >= 0) fcntl(fd, F_SETFL, flags | O_NONBLOCK); + int flags = sock_get_flags(fd); + if (flags >= 0) sock_set_nonblock(fd); // Enqueue job and wait for worker. ServerJob job; @@ -1621,7 +1682,7 @@ void HttpServer::worker_loop() { std::string prompt_excerpt; if (!req.prompt_tokens.empty()) { // Decode first ~40 tokens as a prompt excerpt (cheap, bounded). - const int excerpt_len = std::min((int)req.prompt_tokens.size(), 40); + const int excerpt_len = (std::min)((int)req.prompt_tokens.size(), 40); std::vector excerpt_toks(req.prompt_tokens.begin(), req.prompt_tokens.begin() + excerpt_len); prompt_excerpt = tokenizer_.decode(excerpt_toks); @@ -1822,7 +1883,7 @@ void HttpServer::worker_loop() { } } } - float survival = (float)query_kept / std::max(1, (int)query_ids.size()); + float survival = (float)query_kept / (std::max)(1, (int)query_ids.size()); std::fprintf(stderr, "[pflash] query survival: %d/%d (%.0f%%)\n", query_kept, (int)query_ids.size(), survival * 100.0f); if (survival < 0.80f && (int)query_ids.size() < 1000) { @@ -1939,7 +2000,7 @@ void HttpServer::worker_loop() { ? req.per_req_reply_budget : config_.hard_limit_reply_budget; const int n_gen_cap = budget_active - ? std::min(effective_think_ceiling + eff_reply_for_n_gen, req.max_output) + ? 
(std::min)(effective_think_ceiling + eff_reply_for_n_gen, req.max_output) : req.max_output; GenerateRequest gen_req; @@ -2345,7 +2406,7 @@ void HttpServer::worker_loop() { // prefills the delta beyond the cached prefix, so dividing the full // prompt size by delta time would be wrong. const int prefill_tokens = using_restore - ? std::max(0, (int)effective_prompt.size() - prefix_len) + ? (std::max)(0, (int)effective_prompt.size() - prefix_len) : (int)effective_prompt.size(); perf.prefill_tok_s = (result.prefill_s > 0.0) ? (double)prefill_tokens / result.prefill_s : 0.0; @@ -2626,8 +2687,8 @@ void HttpServer::worker_loop() { resp = {{"text", emitter.accumulated_text()}}; } // Set socket back to blocking for the final send. - int flags = fcntl(fd, F_GETFL, 0); - if (flags >= 0) fcntl(fd, F_SETFL, flags & ~O_NONBLOCK); + int flags = sock_get_flags(fd); + if (flags >= 0) sock_set_block(fd); send_response(fd, 200, "application/json", resp.dump() + "\n"); } @@ -2641,7 +2702,7 @@ void HttpServer::worker_loop() { const double elapsed_s = std::chrono::duration(done_at - started_at).count(); const int result_tokens = (int)result.tokens.size(); - const int out_tokens = std::max(completion_tokens, result_tokens); + const int out_tokens = (std::max)(completion_tokens, result_tokens); const double tok_s = elapsed_s > 0.0 ? out_tokens / elapsed_s : 0.0; const double decode_tok_s = result.decode_s > 0.0 ? out_tokens / result.decode_s : 0.0; @@ -2807,7 +2868,7 @@ bool HttpServer::send_all(int fd, const void * data, size_t len) { deadline - std::chrono::steady_clock::now()).count(); if (remaining <= 0) return false; // stall timeout - struct pollfd pfd = {fd, POLLOUT, 0}; + struct pollfd pfd = {SOCK_FD(fd), POLLOUT, 0}; int timeout = remaining > 50 ? 
50 : (int)remaining; int ret; do { diff --git a/server/src/server/http_server.h b/server/src/server/http_server.h index 2220b098..36287c1a 100644 --- a/server/src/server/http_server.h +++ b/server/src/server/http_server.h @@ -34,7 +34,9 @@ #include #include #include +#if !defined(_WIN32) #include +#endif #include #include diff --git a/server/src/server/model_card.cpp b/server/src/server/model_card.cpp index fe3376cc..b1ff82c1 100644 --- a/server/src/server/model_card.cpp +++ b/server/src/server/model_card.cpp @@ -10,16 +10,26 @@ #include #include #include +#include #include #include #include +#if defined(_WIN32) +#if !defined(NOMINMAX) +#define NOMINMAX +#endif +#include +#else #include #include +#endif namespace dflash::common { using json = nlohmann::json; +namespace fs = std::filesystem; + // ── Helpers ───────────────────────────────────────────────────────────── std::string normalize_model_card_stem(const std::string & general_name) { @@ -42,11 +52,20 @@ std::string normalize_model_card_stem(const std::string & general_name) { } static bool file_exists(const std::string & path) { - struct stat st{}; - return ::stat(path.c_str(), &st) == 0 && S_ISREG(st.st_mode); + std::error_code ec; + return fs::is_regular_file(path, ec); } static std::string self_bin_dir() { +#if defined(_WIN32) + char buf[MAX_PATH]; + DWORD n = GetModuleFileNameA(nullptr, buf, sizeof(buf)); + if (n == 0 || n >= sizeof(buf)) return {}; + std::string path(buf, n); + auto slash = path.find_last_of("/\\"); + if (slash == std::string::npos) return {}; + return path.substr(0, slash); +#else char buf[4096]; ssize_t n = ::readlink("/proc/self/exe", buf, sizeof(buf) - 1); if (n <= 0) return {}; @@ -55,6 +74,7 @@ static std::string self_bin_dir() { auto slash = path.find_last_of('/'); if (slash == std::string::npos) return {}; return path.substr(0, slash); +#endif } // Find share/model_cards/ directory. 
Search order (spec §1 implementation note): @@ -82,8 +102,8 @@ static std::string find_model_cards_dir(const std::string & repo_root_hint) { } for (const auto & c : candidates) { - struct stat st{}; - if (::stat(c.c_str(), &st) == 0 && S_ISDIR(st.st_mode)) { + std::error_code ec; + if (fs::is_directory(c, ec)) { std::fprintf(stderr, "[model_card] using cards dir: %s\n", c.c_str()); return c; } @@ -330,10 +350,10 @@ ModelCard resolve_model_card(const std::string & gguf_path, // Derive think_max_tokens and missing tier values. if (card.hard_limit_reply_budget < 0) card.hard_limit_reply_budget = 0; - card.think_max_tokens = std::max(0, card.max_tokens - card.hard_limit_reply_budget); + card.think_max_tokens = (std::max)(0, card.max_tokens - card.hard_limit_reply_budget); int complex_think_max = card.complex_problem_max_tokens > 0 - ? std::max(0, card.complex_problem_max_tokens - card.hard_limit_reply_budget) + ? (std::max)(0, card.complex_problem_max_tokens - card.hard_limit_reply_budget) : card.think_max_tokens; // For each tier not explicitly set, fill via §3.3 formula. diff --git a/server/src/server/server_main.cpp b/server/src/server/server_main.cpp index 9d75dddb..f4580538 100644 --- a/server/src/server/server_main.cpp +++ b/server/src/server/server_main.cpp @@ -33,6 +33,11 @@ #include #include +#ifdef _WIN32 +#define setenv(name, value, overwrite) _putenv_s(name, value) +#define unsetenv(name) _putenv_s(name, "") +#endif + using namespace dflash::common; // Global server pointer for signal handling. diff --git a/server/test/test_dflash.cpp b/server/test/test_dflash.cpp index b6d4f0be..25f88522 100644 --- a/server/test/test_dflash.cpp +++ b/server/test/test_dflash.cpp @@ -269,6 +269,7 @@ using dflash::common::free_qwen35_layer_split_shards; // ─── Speculative decode — generic loop in common/, qwen35 layer-split adapter. 
#include "qwen35_layer_split_dflash_target.h" #include "common/dflash_spec_decode.h" +#include "common/gguf_mmap.h" using dflash::common::is_eos_tok; // ─── Layer-split daemon — extracted to src/qwen35/layer_split_daemon.{h,cpp} ─ @@ -1619,19 +1620,14 @@ int main(int argc, char ** argv) { if (!gctx) { std::fprintf(stderr, "[time-breakdown] failed to re-open GGUF for hybrid\n"); } else { - int fd = ::open(target_path, O_RDONLY); - struct stat st_buf; - bool mmap_ok = (fd >= 0 && ::fstat(fd, &st_buf) == 0); - void * mmap_addr = mmap_ok - ? ::mmap(nullptr, (size_t)st_buf.st_size, PROT_READ, MAP_PRIVATE, fd, 0) - : MAP_FAILED; - if (fd >= 0) ::close(fd); - - if (mmap_addr == MAP_FAILED) { - std::fprintf(stderr, "[time-breakdown] mmap failed for hybrid\n"); + dflash::common::GgufMmap _mf; + std::string _mferr; + if (!_mf.open(target_path, _mferr)) { + std::fprintf(stderr, "[time-breakdown] mmap failed for hybrid: %s\n", _mferr.c_str()); gguf_free(gctx); } else { - const size_t file_size = (size_t)st_buf.st_size; + const size_t file_size = _mf.size(); + const void * mmap_addr = _mf.data(); const size_t data_start = gguf_get_data_offset(gctx); const auto * file_bytes = (const uint8_t *)mmap_addr; @@ -1973,7 +1969,6 @@ int main(int argc, char ** argv) { } } - ::munmap(mmap_addr, file_size); gguf_free(gctx); } }