Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
160 changes: 156 additions & 4 deletions src/server/http_server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,13 @@
#include <httplib.h>
#include <nlohmann/json.hpp>

#include <algorithm>
#include <cctype>
#include <cstdio>
#include <cstdlib>
#include <ctime>
#include <exception>
#include <limits>
#include <mutex>
#include <stdexcept>
#include <string>
Expand All @@ -34,6 +38,16 @@ namespace jllm {
// stomp on the KV cache pool.
static std::mutex g_engine_mutex;

struct AgentRequestHints {
bool present = false;
std::string session_id;
int priority = 0;
int osl = 0;
bool speculative_prefill = false;
bool cache_ephemeral = false;
int cache_ttl_seconds = 0;
};

// ── /health response ─────────────────────────────────────────────────────

static json build_health(const Engine& engine) {
Expand Down Expand Up @@ -150,6 +164,115 @@ struct ThinkSplit {
}
};

static bool json_get_int(const json& obj, const char* key, int* out) {
if (!out || !obj.is_object() || !obj.contains(key)) return false;
const auto& v = obj.at(key);
if (!v.is_number_integer() && !v.is_number_unsigned()) return false;
long long raw = v.get<long long>();
raw = std::max<long long>(std::numeric_limits<int>::min(),
std::min<long long>(std::numeric_limits<int>::max(), raw));
*out = (int)raw;
return true;
}

static std::string sanitize_session_id(const std::string& raw) {
std::string out;
out.reserve(std::min<size_t>(raw.size(), 64));
bool last_was_us = false;
for (unsigned char c : raw) {
if (std::isalnum(c) || c == '_' || c == '-') {
out.push_back((char)c);
last_was_us = false;
} else if (!last_was_us) {
out.push_back('_');
last_was_us = true;
}
if (out.size() == 64) break;
}
while (!out.empty() && out.front() == '_') out.erase(out.begin());
while (!out.empty() && out.back() == '_') out.pop_back();
return out;
}

static int parse_ttl_seconds(const json& ttl) {
if (ttl.is_number_integer() || ttl.is_number_unsigned()) {
long long n = ttl.get<long long>();
if (n < 0) return 0;
return (int)std::min<long long>(n, std::numeric_limits<int>::max());
}
if (!ttl.is_string()) return 0;
std::string s = ttl.get<std::string>();
if (s.empty()) return 0;
char unit = s.back();
int multiplier = 1;
if (unit == 's' || unit == 'S') {
multiplier = 1;
s.pop_back();
} else if (unit == 'm' || unit == 'M') {
multiplier = 60;
s.pop_back();
} else if (unit == 'h' || unit == 'H') {
multiplier = 3600;
s.pop_back();
}
char* end = nullptr;
long long n = std::strtoll(s.c_str(), &end, 10);
if (end == s.c_str() || n < 0) return 0;
long long seconds = n * multiplier;
return (int)std::min<long long>(seconds, std::numeric_limits<int>::max());
}

static AgentRequestHints parse_agent_request_hints(const json& body) {
AgentRequestHints out;
if (!body.is_object()) return out;

if (body.contains("conversation_id") && body["conversation_id"].is_string()) {
out.session_id = sanitize_session_id(body["conversation_id"].get<std::string>());
out.present = out.present || !out.session_id.empty();
}

const json* nvext = nullptr;
if (body.contains("nvext") && body["nvext"].is_object()) {
nvext = &body["nvext"];
}
const json* agent_hints = nullptr;
if (nvext && nvext->contains("agent_hints") &&
(*nvext)["agent_hints"].is_object()) {
agent_hints = &(*nvext)["agent_hints"];
}
if (agent_hints) {
out.present = true;
if (agent_hints->contains("session_id") &&
(*agent_hints)["session_id"].is_string()) {
std::string sid =
sanitize_session_id((*agent_hints)["session_id"].get<std::string>());
if (!sid.empty()) out.session_id = sid;
}
(void)json_get_int(*agent_hints, "priority", &out.priority);
if (!json_get_int(*agent_hints, "osl", &out.osl)) {
(void)json_get_int(*agent_hints, "output_sequence_length", &out.osl);
}
if (agent_hints->contains("speculative_prefill") &&
(*agent_hints)["speculative_prefill"].is_boolean()) {
out.speculative_prefill =
(*agent_hints)["speculative_prefill"].get<bool>();
}
}

if (nvext && nvext->contains("cache_control") &&
(*nvext)["cache_control"].is_object()) {
const json& cc = (*nvext)["cache_control"];
out.present = true;
out.cache_ephemeral =
cc.value("type", std::string()) == "ephemeral";
if (cc.contains("ttl")) {
out.cache_ttl_seconds = parse_ttl_seconds(cc["ttl"]);
}
}

return out;
}

// ── Chat completion (non-streaming) ──────────────────────────────────────

static json build_completion(Engine& engine, const std::string& prompt,
Expand Down Expand Up @@ -199,6 +322,25 @@ static json build_completion(Engine& engine, const std::string& prompt,
};
}

static void attach_agent_hint_report(json& completion,
const AgentRequestHints& hints) {
if (!hints.present) return;
json report = json::object();
if (!hints.session_id.empty()) report["session_id"] = hints.session_id;
if (hints.priority != 0) report["priority"] = hints.priority;
if (hints.osl > 0) report["osl"] = hints.osl;
if (hints.speculative_prefill) {
report["speculative_prefill"] = true;
}
if (hints.cache_ephemeral || hints.cache_ttl_seconds > 0) {
report["cache_control"] = {
{"type", hints.cache_ephemeral ? "ephemeral" : "default"},
{"ttl_seconds", hints.cache_ttl_seconds},
};
}
completion["jetson"]["agent_hints"] = report;
}

// Reply with `{ "error": { "message": ..., "type": ... } }` (OpenAI shape).
static void send_error(httplib::Response& res, int code,
const std::string& msg, const std::string& type) {
Expand Down Expand Up @@ -381,7 +523,11 @@ void run_server(Engine& engine, int port, bool default_kv_int8) {
// the wrong format and the model emits garbage tokens.
params.kv_int8 = body.value("kv_int8", default_kv_int8);

AgentRequestHints request_hints = parse_agent_request_hints(body);
std::string conv_id = body.value("conversation_id", "");
if (conv_id.empty() && !request_hints.session_id.empty()) {
conv_id = request_hints.session_id;
}
if (!conv_id.empty()) {
if (validate_conversation_id(conv_id)) {
params.conversation_id = conv_id;
Expand All @@ -396,10 +542,15 @@ void run_server(Engine& engine, int port, bool default_kv_int8) {
const std::string prompt = format_qwen_chat(messages, think);
fprintf(stderr,
"[http] chat request body_bytes=%zu messages=%zu "
"prompt_bytes=%zu stream=%d max_tokens=%d think=%d\n",
"prompt_bytes=%zu stream=%d max_tokens=%d think=%d "
"session=%s priority=%d osl=%d cache_ttl_s=%d\n",
req.body.size(), messages.size(), prompt.size(),
body.value("stream", false) ? 1 : 0,
params.max_tokens, think ? 1 : 0);
params.max_tokens, think ? 1 : 0,
params.conversation_id.empty() ? "-" : params.conversation_id.c_str(),
request_hints.priority,
request_hints.osl,
request_hints.cache_ttl_seconds);

if (body.value("stream", false)) {
// SSE path — mutex is taken inside the chunked-content
Expand All @@ -412,8 +563,9 @@ void run_server(Engine& engine, int port, bool default_kv_int8) {

try {
std::lock_guard<std::mutex> lk(g_engine_mutex);
res.set_content(build_completion(engine, prompt, params).dump(),
"application/json");
json completion = build_completion(engine, prompt, params);
attach_agent_hint_report(completion, request_hints);
res.set_content(completion.dump(), "application/json");
} catch (const std::length_error& e) {
fprintf(stderr, "[http] chat rejected: %s\n", e.what());
send_error(res, 400, e.what(), "invalid_request_error");
Expand Down