diff --git a/Cargo.lock b/Cargo.lock index 976cd77..83334f4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1144,6 +1144,7 @@ dependencies = [ "mizan-wallet", "redis", "serde", + "serde_json", "sha2 0.11.0", "sqlx", "tokio", diff --git a/README.md b/README.md index dc3a283..c75ce37 100644 --- a/README.md +++ b/README.md @@ -19,13 +19,22 @@ provider/model routing, OpenAI-compatible non-streaming and streaming chat, usage metering, credit ledger updates, Redis runtime limits, and Prometheus gateway metrics. +The long-term contract is: + +- Keep API-facing provider output in an OpenAI-compatible shape. +- Support both `/v1/chat/completions` and `/v1/responses` through the same + normalization path. +- Add non-API provider adapters later (CLI/browser session types) without changing + client contract. + This is not a stable/full release yet. The remaining tracked work before a broader MVP is the RTK-backed CLI proxy baseline and durable request/admin audit log foundations. ## MVP Scope -- OpenAI-compatible gateway for `/v1/chat/completions` and `/v1/models` +- OpenAI-compatible gateway for `/v1/chat/completions`, `/v1/responses`, and + `/v1/models` - Admin-managed upstream connections for API providers and local models - User registration, virtual API keys, model access rules, and usage history - Credit accounting based on input/output token prices per 1M tokens diff --git a/crates/mizan-api/Cargo.toml b/crates/mizan-api/Cargo.toml index 5ae8d94..fb437da 100644 --- a/crates/mizan-api/Cargo.toml +++ b/crates/mizan-api/Cargo.toml @@ -15,6 +15,7 @@ futures-util.workspace = true redis.workspace = true sha2.workspace = true serde.workspace = true +serde_json.workspace = true sqlx.workspace = true tokio.workspace = true tower-http.workspace = true diff --git a/crates/mizan-api/src/billing.rs b/crates/mizan-api/src/billing.rs index 824739c..e7533fc 100644 --- a/crates/mizan-api/src/billing.rs +++ b/crates/mizan-api/src/billing.rs @@ -8,13 +8,19 @@ use 
mizan_metering::UsageChargeInput; use mizan_providers::{ChatMessage, TokenUsage}; use mizan_wallet::{RoutePrice, calculate_usage_charge}; use serde::{Deserialize, Serialize}; +use serde_json::json; use sqlx::{Any, AnyPool, FromRow, Transaction, query, query_as}; +use tracing::warn; use uuid::Uuid; use crate::AppState; use crate::auth::ApiKeyIdentity; +use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload}; use crate::utils::{from_app_error, prepare_sql, unix_timestamp_string}; +const AUDIT_ACTION_CREDIT_GRANT: &str = "credit_granted"; +const AUDIT_ENTITY_USER: &str = "user"; + const DEFAULT_USAGE_LIST_LIMIT: i64 = 100; const MAX_USAGE_LIST_LIMIT: i64 = 500; const CREDIT_GRANT_REASON: &str = "credit_grant"; @@ -163,6 +169,7 @@ pub async fn list_usage_admin( pub async fn grant_credits( State(state): State, Path(user_id): Path, + Extension(identity): Extension, Json(payload): Json, ) -> BillingHttpResult> { if payload.amount_microcredits <= 0 { @@ -240,6 +247,21 @@ pub async fn grant_credits( ))) })?; + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_CREDIT_GRANT.to_owned(), + entity_type: AUDIT_ENTITY_USER.to_owned(), + entity_id: Some(user_id.to_string()), + payload_json: serialize_payload(json!({ + "amount_microcredits": payload.amount_microcredits, + "reason": reason, + })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record credit grant audit"); + } + Ok(Json(GrantResponse { user_id, wallet_id: wallet.id, diff --git a/crates/mizan-api/src/gateway.rs b/crates/mizan-api/src/gateway.rs index 62b7e69..0d40a3b 100644 --- a/crates/mizan-api/src/gateway.rs +++ b/crates/mizan-api/src/gateway.rs @@ -1,6 +1,7 @@ use std::convert::Infallible; use std::time::Instant; +use crate::logging::{RequestLogInput, error_code_from_app_error, record_request_log}; use axum::{ Extension, Json, extract::State, @@ -38,6 +39,43 
@@ use crate::utils::{decrypt_provider_api_key, from_app_error, now_utc_epoch_secon type GatewayHttpResult = Result)>; +const CHAT_COMPLETIONS_PATH: &str = "/v1/chat/completions"; +const RESPONSES_PATH: &str = "/v1/responses"; +const CHAT_COMPLETIONS_MODEL_FIELD: &str = "chat_completion.model"; +const CHAT_COMPLETIONS_STREAM_FIELD: &str = "chat_completion.stream"; +const CHAT_COMPLETIONS_MAX_TOKENS_FIELD: &str = "chat_completion.max_tokens"; +const RESPONSES_MODEL_FIELD: &str = "responses.model"; +const RESPONSES_STREAM_FIELD: &str = "responses.stream"; +const RESPONSES_MAX_TOKENS_FIELD: &str = "responses.max_tokens"; + +#[derive(Clone, Copy, Debug)] +struct GatewayRequestSpec { + path: &'static str, + kind: &'static str, + model_field: &'static str, + stream_field: &'static str, + max_tokens_field: &'static str, + allow_stream: bool, +} + +const CHAT_COMPLETIONS_SPEC: GatewayRequestSpec = GatewayRequestSpec { + path: CHAT_COMPLETIONS_PATH, + kind: "chat_completion", + model_field: CHAT_COMPLETIONS_MODEL_FIELD, + stream_field: CHAT_COMPLETIONS_STREAM_FIELD, + max_tokens_field: CHAT_COMPLETIONS_MAX_TOKENS_FIELD, + allow_stream: true, +}; + +const RESPONSES_SPEC: GatewayRequestSpec = GatewayRequestSpec { + path: RESPONSES_PATH, + kind: "responses", + model_field: RESPONSES_MODEL_FIELD, + stream_field: RESPONSES_STREAM_FIELD, + max_tokens_field: RESPONSES_MAX_TOKENS_FIELD, + allow_stream: false, +}; + #[derive(Debug, Deserialize)] pub struct ChatCompletionsRequest { pub model: String, @@ -121,6 +159,25 @@ pub async fn chat_completions( Extension(identity): Extension, headers: HeaderMap, Json(payload): Json, +) -> GatewayHttpResult { + chat_completions_impl(state, identity, headers, payload, CHAT_COMPLETIONS_SPEC).await +} + +pub async fn responses( + State(state): State, + Extension(identity): Extension, + headers: HeaderMap, + Json(payload): Json, +) -> GatewayHttpResult { + chat_completions_impl(state, identity, headers, payload, RESPONSES_SPEC).await +} + +async fn 
chat_completions_impl( + state: AppState, + identity: ApiKeyIdentity, + headers: HeaderMap, + payload: ChatCompletionsRequest, + spec: GatewayRequestSpec, ) -> GatewayHttpResult { let request_id = parse_request_id_header(&headers, "x-request-id").unwrap_or_else(Uuid::now_v7); let trace_id = parse_request_id_header(&headers, "x-trace-id").unwrap_or(request_id); @@ -132,17 +189,55 @@ pub async fn chat_completions( .api_key_id(identity.api_key_id) .request_id(request_id) .trace_id(trace_id) + .method("POST") + .path(spec.path) .streaming(payload.stream) .build(); if public_model.is_empty() { + let app_error = AppError::invalid_config(spec.model_field, "model is required"); + let status = app_error_status_code(&app_error); + let error_code = error_code_from_app_error(&app_error); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + None, + Some(&error_code), + ) + .await; + return Ok(build_error_response( + &context, + status, + Json(ErrorEnvelope::from(&app_error)), + )); + } + + if !spec.allow_stream && payload.stream { + let app_error = AppError::invalid_config( + spec.stream_field, + "stream is not supported for this endpoint yet", + ); + let status = app_error_status_code(&app_error); + let error_code = error_code_from_app_error(&app_error); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + None, + Some(&error_code), + ) + .await; return Ok(build_error_response( &context, - StatusCode::BAD_REQUEST, - Json(ErrorEnvelope::from(&AppError::invalid_config( - "chat_completion.model", - "model is required", - ))), + status, + Json(ErrorEnvelope::from(&app_error)), )); } @@ -150,6 +245,7 @@ pub async fn chat_completions( &state.database, state.database_backend(), state.config.provider_secret_key.as_deref(), + spec.model_field, public_model, ) .instrument(info_span!( @@ 
-162,7 +258,20 @@ pub async fn chat_completions( { Ok(route) => route, Err(error) => { - let (status, body) = from_app_error(error); + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + None, + Some(&error_code), + ) + .await; + let (_, body) = from_app_error(error); return Ok(build_error_response(&context, status, body)); } }; @@ -175,6 +284,8 @@ pub async fn chat_completions( .trace_id(trace_id) .route(public_model.to_string()) .route_id(route.id) + .method("POST") + .path(spec.path) .provider_id(route.provider_connection_id) .model(route.upstream_model.clone()) .streaming(payload.stream) @@ -187,17 +298,35 @@ pub async fn chat_completions( api_key_id = %context.api_key_id.map_or("unknown".to_owned(), |value| value.to_string()), route = %context.route.clone().unwrap_or_default(), streaming = context.streaming, - "chat completion request", + request_kind = spec.kind, + request_path = spec.path, + "gateway request", ); - let effective_max_tokens = - match resolve_effective_max_tokens(payload.max_tokens, route.max_tokens) { - Ok(max_tokens) => max_tokens, - Err(error) => { - let (status, body) = from_app_error(error); - return Ok(build_error_response(&context, status, body)); - } - }; + let effective_max_tokens = match resolve_effective_max_tokens( + payload.max_tokens, + route.max_tokens, + spec.max_tokens_field, + ) { + Ok(max_tokens) => max_tokens, + Err(error) => { + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + None, + Some(&error_code), + ) + .await; + let (_, body) = from_app_error(error); + return Ok(build_error_response(&context, status, body)); + } + }; let 
upstream_request = ChatRequest { model: route.upstream_model.clone(), @@ -216,7 +345,7 @@ pub async fn chat_completions( }; let route_price = route.route_price(); let provider = OpenAiCompatibleProvider::new( - provider_name, + provider_name.clone(), route.provider_base_url, route.provider_api_key.clone(), ); @@ -237,6 +366,25 @@ pub async fn chat_completions( )) .await { + warn!( + request_id = %request_id, + error = %error, + "credit preflight failed" + ); + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + let latency_ms = request_started_at.elapsed().as_millis() as u64; + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + Some(&provider_name), + Some(&error_code), + ) + .await; let (status, body) = from_app_error(error); if let Err(error) = billing::record_usage( &state.database, @@ -250,7 +398,7 @@ pub async fn chat_completions( model: public_model.to_string(), usage: prompt_only_usage, status_code: status.as_u16(), - latency_ms: request_started_at.elapsed().as_millis() as u64, + latency_ms, route_price, }, ) @@ -294,14 +442,28 @@ pub async fn chat_completions( { Ok(lease) => lease, Err(error) => { - let (status, body) = from_app_error(error); + let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + let latency_ms = request_started_at.elapsed().as_millis() as u64; + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + Some(&provider_name), + Some(&error_code), + ) + .await; + let (_, body) = from_app_error(error); observe_gateway_metrics( &state.metrics, &context, public_model, admission_usage, status, - request_started_at.elapsed().as_millis() as u64, + latency_ms, route_price, ); return Ok(build_error_response(&context, status, body)); @@ -339,9 +501,10 @@ pub async fn 
chat_completions( Err(error) => { let normalized_error = normalize_provider_error(error, &context, public_model.to_string()); + let error_code = error_code_from_app_error(&normalized_error); + let latency_ms = request_started_at.elapsed().as_millis() as u64; let (status, body) = from_app_error(normalized_error); let usage = billing::estimate_usage(&request_messages, ""); - let latency_ms = request_started_at.elapsed().as_millis() as u64; if let Err(error) = billing::record_usage( &state.database, state.database_backend(), @@ -382,6 +545,17 @@ pub async fn chat_completions( latency_ms, route_price, ); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + Some(&provider_name), + Some(error_code.as_str()), + ) + .await; release_limit_lease(billing_context.limit_lease); return Ok(build_error_response(&context, status, body)); } @@ -413,9 +587,10 @@ pub async fn chat_completions( Err(error) => { let normalized_error = normalize_provider_error(error, &context, public_model.to_string()); + let error_code = error_code_from_app_error(&normalized_error); let (status, body) = from_app_error(normalized_error); - let usage = billing::estimate_usage(&request_messages, ""); let latency_ms = request_started_at.elapsed().as_millis() as u64; + let usage = billing::estimate_usage(&request_messages, ""); if let Err(error) = billing::record_usage( &state.database, state.database_backend(), @@ -456,6 +631,17 @@ pub async fn chat_completions( latency_ms, route_price, ); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + Some(&provider_name), + Some(error_code.as_str()), + ) + .await; release_limit_lease(Some(limit_lease)); return Ok(build_error_response(&context, status, body)); } @@ -491,7 +677,9 @@ pub async fn chat_completions( )) .await { - let (status, body) = from_app_error(error); + 
let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); + let (_, body) = from_app_error(error); observe_gateway_metrics( &state.metrics, &context, @@ -501,6 +689,17 @@ pub async fn chat_completions( latency_ms, route_price, ); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + status, + Some(public_model), + Some(&provider_name), + Some(error_code.as_str()), + ) + .await; release_limit_lease(Some(limit_lease)); return Ok(build_error_response(&context, status, body)); } @@ -514,6 +713,17 @@ pub async fn chat_completions( latency_ms, route_price, ); + record_gateway_request_completion( + &state.database, + state.database_backend(), + &context, + &request_started_at, + StatusCode::OK, + Some(public_model), + Some(&provider_name), + None, + ) + .await; let response = json_chat_completion_response( &completion_id, public_model.to_string(), @@ -527,6 +737,53 @@ pub async fn chat_completions( Ok(response) } +#[allow(clippy::too_many_arguments)] +async fn record_gateway_request_completion( + database: &AnyPool, + database_backend: DatabaseBackend, + context: &RequestContext, + request_started_at: &Instant, + status: StatusCode, + route_alias: Option<&str>, + provider_alias: Option<&str>, + error_code: Option<&str>, +) { + let route = context + .route + .clone() + .or_else(|| route_alias.map(|value| value.to_string())); + let provider = context + .provider + .clone() + .or_else(|| provider_alias.map(|value| value.to_string())); + let latency_ms = request_started_at.elapsed().as_millis() as u64; + + let database = database.clone(); + let request_log = RequestLogInput { + request_id: context.request_id, + user_id: context.user_id, + api_key_id: context.api_key_id, + provider_id: context.provider_id, + route_id: context.route_id, + method: context.method.clone().unwrap_or_else(|| "POST".to_owned()), + path: context + .path + .clone() + .unwrap_or_else(|| 
"/v1/chat/completions".to_owned()), + route, + provider, + status_code: status, + latency_ms, + error_code: error_code.map(|value| value.to_string()), + }; + + let _request_log_task = task::spawn(async move { + if let Err(error) = record_request_log(&database, database_backend, &request_log).await { + warn!(error = %error, "failed to persist gateway request log"); + } + }); +} + async fn acquire_runtime_limits( state: &AppState, scopes: Vec, @@ -625,14 +882,12 @@ fn parse_request_id_header(headers: &HeaderMap, header_name: &str) -> Option, route_max_tokens: Option, + field_name: &'static str, ) -> Result, AppError> { match (requested_max_tokens, route_max_tokens) { - (Some(requested), Some(route_limit)) if requested > route_limit => { - Err(AppError::invalid_config( - "chat_completion.max_tokens", - "max_tokens exceeds route limit", - )) - } + (Some(requested), Some(route_limit)) if requested > route_limit => Err( + AppError::invalid_config(field_name, "max_tokens exceeds route limit"), + ), (Some(requested), _) => Ok(Some(requested)), (None, route_limit) => Ok(route_limit), } @@ -845,6 +1100,7 @@ fn build_stream_events( state.route_alias.clone(), ); let status = app_error_status_code(&error); + let error_code = error_code_from_app_error(&error); let usage = state.latest_usage.take().unwrap_or_else(|| { billing::estimate_usage(&state.request_messages, "") }); @@ -874,11 +1130,24 @@ fn build_stream_events( )) .await { + let usage_error_code = error_code_from_app_error(&error); + let usage_status = app_error_status_code(&error); warn!( request_id = %state.context.request_id, error = %error, "failed to persist stream request usage after stream chunk error" ); + let _ = record_gateway_request_completion( + &state.database, + state.database_backend, + &state.context, + &state.request_started_at, + usage_status, + Some(&state.route_alias), + state.context.provider.as_deref(), + Some(usage_error_code.as_str()), + ) + .await; observe_gateway_metrics( &state.metrics, 
&state.context, @@ -906,6 +1175,17 @@ fn build_stream_events( latency_ms, state.route_price, ); + let _ = record_gateway_request_completion( + &state.database, + state.database_backend, + &state.context, + &state.request_started_at, + status, + Some(&state.route_alias), + state.context.provider.as_deref(), + Some(error_code.as_str()), + ) + .await; release_limit_lease(state.limit_lease.take()); Event::default() .event("error") @@ -952,6 +1232,17 @@ fn build_stream_events( error = %error, "failed to persist stream request usage" ); + let _ = record_gateway_request_completion( + &state.database, + state.database_backend, + &state.context, + &state.request_started_at, + app_error_status_code(&error), + Some(&state.route_alias), + state.context.provider.as_deref(), + Some(error_code_from_app_error(&error).as_str()), + ) + .await; observe_gateway_metrics( &state.metrics, &state.context, @@ -979,6 +1270,17 @@ fn build_stream_events( latency_ms, state.route_price, ); + let _ = record_gateway_request_completion( + &state.database, + state.database_backend, + &state.context, + &state.request_started_at, + StatusCode::OK, + Some(&state.route_alias), + state.context.provider.as_deref(), + None, + ) + .await; release_limit_lease(state.limit_lease.take()); Some((Ok(Event::default().data("[DONE]")), state)) } else { @@ -1054,6 +1356,7 @@ async fn resolve_model_route( database: &AnyPool, database_backend: DatabaseBackend, provider_secret_key: Option<&str>, + request_model_field: &'static str, public_model: &str, ) -> Result { #[derive(Debug, FromRow)] @@ -1094,9 +1397,7 @@ async fn resolve_model_route( .fetch_optional(database) .await .map_err(|error| AppError::infrastructure(error.to_string()))? 
- .ok_or_else(|| { - AppError::invalid_config("chat_completion.model", "model not found or disabled") - })?; + .ok_or_else(|| AppError::invalid_config(request_model_field, "model not found or disabled"))?; let route_id = resolved.id; let upstream_model = resolved.upstream_model; @@ -1194,16 +1495,22 @@ mod tests { #[test] fn resolve_effective_max_tokens_uses_route_default_and_rejects_overrides() { assert_eq!( - resolve_effective_max_tokens(None, Some(128)).unwrap(), + resolve_effective_max_tokens(None, Some(128), CHAT_COMPLETIONS_MAX_TOKENS_FIELD) + .unwrap(), Some(128) ); assert_eq!( - resolve_effective_max_tokens(Some(64), Some(128)).unwrap(), + resolve_effective_max_tokens(Some(64), Some(128), CHAT_COMPLETIONS_MAX_TOKENS_FIELD) + .unwrap(), Some(64) ); - assert!(resolve_effective_max_tokens(Some(129), Some(128)).is_err()); + assert!( + resolve_effective_max_tokens(Some(129), Some(128), CHAT_COMPLETIONS_MAX_TOKENS_FIELD) + .is_err() + ); assert_eq!( - resolve_effective_max_tokens(Some(64), None).unwrap(), + resolve_effective_max_tokens(Some(64), None, CHAT_COMPLETIONS_MAX_TOKENS_FIELD) + .unwrap(), Some(64) ); } diff --git a/crates/mizan-api/src/lib.rs b/crates/mizan-api/src/lib.rs index 7103c36..a78f645 100644 --- a/crates/mizan-api/src/lib.rs +++ b/crates/mizan-api/src/lib.rs @@ -19,6 +19,7 @@ use tracing::{info, warn}; mod auth; mod billing; mod gateway; +mod logging; mod metrics; mod providers; mod storage; @@ -157,6 +158,7 @@ pub fn router(state: AppState) -> Router { let api_key_router = Router::new() .route("/v1/ping", get(auth::api_key_ping)) .route("/v1/chat/completions", post(gateway::chat_completions)) + .route("/v1/responses", post(gateway::responses)) .route_layer(from_fn_with_state(state.clone(), auth::api_key_auth)); let public_models_router = Router::new() diff --git a/crates/mizan-api/src/logging.rs b/crates/mizan-api/src/logging.rs new file mode 100644 index 0000000..dd4ceca --- /dev/null +++ b/crates/mizan-api/src/logging.rs @@ -0,0 +1,249 @@ 
+use axum::http::StatusCode; +use mizan_core::{AppError, AppResult, DatabaseBackend}; +use serde::Serialize; +use sqlx::{AnyPool, query}; +use uuid::Uuid; + +use crate::utils::{prepare_sql, unix_timestamp_string}; + +const UUID_TEXT_BUFFER_LEN: usize = uuid::fmt::Urn::LENGTH; + +#[derive(Debug, Clone, Copy)] +struct UuidText { + buf: [u8; UUID_TEXT_BUFFER_LEN], + len: usize, +} + +impl UuidText { + fn new(value: Uuid) -> Self { + let mut buf = Uuid::encode_buffer(); + let len = { + let encoded = value.hyphenated().encode_lower(&mut buf); + encoded.len() + }; + + Self { buf, len } + } + + fn as_str(&self) -> &str { + std::str::from_utf8(&self.buf[..self.len]).expect("uuid text buffer must be valid utf8") + } +} + +#[derive(Debug, Clone)] +pub struct RequestLogInput { + pub request_id: Uuid, + pub user_id: Option, + pub api_key_id: Option, + pub provider_id: Option, + pub route_id: Option, + pub method: String, + pub path: String, + pub route: Option, + pub provider: Option, + pub status_code: StatusCode, + pub latency_ms: u64, + pub error_code: Option, +} + +#[derive(Debug, Clone)] +pub struct AdminAuditInput { + pub actor_user_id: Option, + pub action: String, + pub entity_type: String, + pub entity_id: Option, + pub payload_json: Option, +} + +pub fn error_code_from_app_error(error: &AppError) -> String { + error.public_code().to_string() +} + +pub fn serialize_payload(payload: impl Serialize) -> Option { + serde_json::to_string(&payload).ok() +} + +pub async fn record_request_log( + database: &AnyPool, + database_backend: DatabaseBackend, + input: &RequestLogInput, +) -> AppResult<()> { + let now = unix_timestamp_string(); + let status_code = i64::from(u16::from(input.status_code)); + let latency_ms = i64::try_from(input.latency_ms).map_err(|error| { + AppError::infrastructure(format!("request_log.latency_ms exceeds i64 range: {error}")) + })?; + let id = UuidText::new(Uuid::now_v7()); + let request_id = UuidText::new(input.request_id); + let user_id = 
input.user_id.map(UuidText::new); + let api_key_id = input.api_key_id.map(UuidText::new); + let provider_id = input.provider_id.map(UuidText::new); + let route_id = input.route_id.map(UuidText::new); + + query(&prepare_sql( + database_backend, + "INSERT INTO request_logs ( + id, + request_id, + user_id, + api_key_id, + provider_id, + route_id, + method, + path, + route, + provider, + status_code, + latency_ms, + error_code, + created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)", + )) + .bind(id.as_str()) + .bind(request_id.as_str()) + .bind(user_id.as_ref().map(UuidText::as_str)) + .bind(api_key_id.as_ref().map(UuidText::as_str)) + .bind(provider_id.as_ref().map(UuidText::as_str)) + .bind(route_id.as_ref().map(UuidText::as_str)) + .bind(&input.method) + .bind(&input.path) + .bind(input.route.as_ref()) + .bind(input.provider.as_ref()) + .bind(status_code) + .bind(latency_ms) + .bind(&input.error_code) + .bind(&now) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(format!("cannot insert request log: {error}")))?; + + Ok(()) +} + +pub async fn record_admin_audit( + database: &AnyPool, + database_backend: DatabaseBackend, + input: &AdminAuditInput, +) -> AppResult<()> { + let now = unix_timestamp_string(); + let id = UuidText::new(Uuid::now_v7()); + let actor_user_id = input.actor_user_id.map(UuidText::new); + + query(&prepare_sql( + database_backend, + "INSERT INTO admin_audit_logs ( + id, + actor_user_id, + action, + entity_type, + entity_id, + payload_json, + created_at + ) VALUES (?, ?, ?, ?, ?, ?, ?)", + )) + .bind(id.as_str()) + .bind(actor_user_id.as_ref().map(UuidText::as_str)) + .bind(&input.action) + .bind(&input.entity_type) + .bind(&input.entity_id) + .bind(&input.payload_json) + .bind(&now) + .execute(database) + .await + .map_err(|error| AppError::infrastructure(format!("cannot insert admin audit log: {error}")))?; + + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::storage; + use 
sqlx::query_scalar; + + #[derive(Debug, Serialize)] + struct SamplePayload { + id: u32, + note: String, + } + + #[test] + fn uuid_text_renders_hyphenated_uuid() { + let text = UuidText::new(Uuid::nil()); + + assert_eq!(text.as_str(), "00000000-0000-0000-0000-000000000000"); + } + + async fn test_database() -> AnyPool { + storage::connect_and_migrate("sqlite::memory:", true, 1) + .await + .expect("create memory sqlite") + } + + #[tokio::test] + async fn request_log_can_be_written_with_optional_error_code() { + let database = test_database().await; + let request_id = Uuid::now_v7(); + + record_request_log( + &database, + DatabaseBackend::Sqlite, + &RequestLogInput { + request_id, + user_id: None, + api_key_id: None, + provider_id: None, + route_id: None, + method: "POST".to_owned(), + path: "/v1/chat/completions".to_owned(), + route: Some("mizan/gpt-4o-mini".to_owned()), + provider: Some("openai".to_owned()), + status_code: StatusCode::OK, + latency_ms: 12, + error_code: Some("ok".to_owned()), + }, + ) + .await + .expect("insert request log"); + + let row_count: i64 = query_scalar("SELECT COUNT(*) FROM request_logs") + .fetch_one(&database) + .await + .expect("count request logs"); + + assert_eq!(row_count, 1); + } + + #[tokio::test] + async fn admin_audit_can_be_written_with_redacted_payload() { + let database = test_database().await; + let payload = SamplePayload { + id: 7, + note: "provider=openai".to_owned(), + }; + + let payload_json = serialize_payload(&payload).expect("payload json"); + assert!(payload_json.contains(r#""id":7"#)); + + record_admin_audit( + &database, + DatabaseBackend::Sqlite, + &AdminAuditInput { + actor_user_id: None, + action: "audit.test".to_owned(), + entity_type: "provider".to_owned(), + entity_id: Some("123e4567-e89b-12d3-a456-426614174000".to_owned()), + payload_json: Some(payload_json), + }, + ) + .await + .expect("insert audit log"); + + let row_count: i64 = query_scalar("SELECT COUNT(*) FROM admin_audit_logs") + 
.fetch_one(&database) + .await + .expect("count audit logs"); + + assert_eq!(row_count, 1); + } +} diff --git a/crates/mizan-api/src/providers.rs b/crates/mizan-api/src/providers.rs index 2458d75..f70f00e 100644 --- a/crates/mizan-api/src/providers.rs +++ b/crates/mizan-api/src/providers.rs @@ -1,22 +1,31 @@ use axum::Json; use axum::body::Body; -use axum::extract::{Path, State}; +use axum::extract::{Extension, Path, State}; use axum::http::StatusCode; use axum::middleware::Next; use axum::response::Response; use mizan_core::{AppError, ErrorEnvelope}; use serde::{Deserialize, Serialize}; +use serde_json::json; use sqlx::{query, query_as}; +use tracing::warn; use uuid::Uuid; use crate::AppState; use crate::auth::ApiKeyIdentity; +use crate::logging::{AdminAuditInput, record_admin_audit, serialize_payload}; use crate::utils::{ encrypt_provider_api_key, from_app_error, is_enabled, is_unique_constraint_error, parse_timestamp, prepare_sql, unix_timestamp_string, }; type ProviderHttpResult = Result)>; +const AUDIT_ACTION_CREATE_PROVIDER: &str = "provider_connection_created"; +const AUDIT_ACTION_DELETE_PROVIDER: &str = "provider_connection_deleted"; +const AUDIT_ACTION_CREATE_MODEL_ROUTE: &str = "model_route_created"; +const AUDIT_ACTION_DELETE_MODEL_ROUTE: &str = "model_route_deleted"; +const AUDIT_ENTITY_PROVIDER: &str = "provider_connection"; +const AUDIT_ENTITY_MODEL_ROUTE: &str = "model_route"; #[derive(Debug, Serialize)] pub struct ProviderConnectionResponse { @@ -243,6 +252,7 @@ pub async fn list_provider_connections( pub async fn create_provider_connection( State(state): State, + Extension(identity): Extension, Json(payload): Json, ) -> ProviderHttpResult> { let name = payload.name.trim(); @@ -327,6 +337,24 @@ pub async fn create_provider_connection( )) })?; + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_CREATE_PROVIDER.to_owned(), + entity_type: AUDIT_ENTITY_PROVIDER.to_owned(), + entity_id: Some(id.to_string()), + 
payload_json: serialize_payload(json!({ + "name": name, + "provider_type": provider_type, + "base_url": base_url, + "enabled": enabled, + "api_key_stored": true, + })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record provider connection creation audit"); + } + Ok(Json(ProviderConnectionCreateResponse { id: id.to_string(), name: name.to_string(), @@ -338,6 +366,7 @@ pub async fn create_provider_connection( pub async fn delete_provider_connection( State(state): State, + Extension(identity): Extension, Path(id): Path, ) -> ProviderHttpResult> { let removed = query(&prepare_sql( @@ -358,6 +387,20 @@ pub async fn delete_provider_connection( )); } + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_DELETE_PROVIDER.to_owned(), + entity_type: AUDIT_ENTITY_PROVIDER.to_owned(), + entity_id: Some(id.to_string()), + payload_json: serialize_payload(json!({ + "deleted": true, + })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record provider connection deletion audit"); + } + Ok(Json(ProviderConnectionWithStatus { id: id.to_string(), removed: true, @@ -442,6 +485,7 @@ pub async fn list_model_routes( pub async fn create_model_route( State(state): State, + Extension(identity): Extension, Json(payload): Json, ) -> ProviderHttpResult> { let public_model = payload.public_model.trim(); @@ -558,6 +602,26 @@ pub async fn create_model_route( .await .map_err(|error| from_app_error(map_duplicate_model_error(error.to_string())))?; + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_CREATE_MODEL_ROUTE.to_owned(), + entity_type: AUDIT_ENTITY_MODEL_ROUTE.to_owned(), + entity_id: Some(id.to_string()), + payload_json: serialize_payload(json!({ + "provider_connection_id": provider_connection_id.to_string(), + 
"public_model": public_model, + "upstream_model": upstream_model, + "max_tokens": payload.max_tokens, + "pricing_input_per_1m_tokens": payload.pricing_input_per_1m_tokens.unwrap_or(0), + "pricing_output_per_1m_tokens": payload.pricing_output_per_1m_tokens.unwrap_or(0), + "enabled": enabled, + })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record model route creation audit"); + } + Ok(Json(ModelRouteCreateResponse { id: id.to_string(), provider_connection_id: provider_connection_id.to_string(), @@ -569,6 +633,7 @@ pub async fn create_model_route( pub async fn delete_model_route( State(state): State, + Extension(identity): Extension, Path(id): Path, ) -> ProviderHttpResult> { let removed = query(&prepare_sql( @@ -589,6 +654,20 @@ pub async fn delete_model_route( )); } + let audit = AdminAuditInput { + actor_user_id: Some(identity.user_id), + action: AUDIT_ACTION_DELETE_MODEL_ROUTE.to_owned(), + entity_type: AUDIT_ENTITY_MODEL_ROUTE.to_owned(), + entity_id: Some(id.to_string()), + payload_json: serialize_payload(json!({ + "deleted": true, + })), + }; + if let Err(error) = record_admin_audit(&state.database, state.database_backend(), &audit).await + { + warn!(error = %error, "failed to record model route deletion audit"); + } + Ok(Json(ModelRouteWithStatus { id: id.to_string(), removed: true, diff --git a/crates/mizan-core/src/config.rs b/crates/mizan-core/src/config.rs index daa3c47..3646a55 100644 --- a/crates/mizan-core/src/config.rs +++ b/crates/mizan-core/src/config.rs @@ -20,6 +20,7 @@ pub struct AppConfig { pub admin_seed_password: Option, pub admin_seed_role: String, pub provider_secret_key: Option, + pub log_raw_request_bodies: bool, } impl AppConfig { @@ -118,6 +119,11 @@ impl AppConfig { admin_seed_password, admin_seed_role, provider_secret_key, + log_raw_request_bodies: parse_bool_env( + "MIZAN_LOG_RAW_REQUEST_BODIES", + "false", + |value| 
parse_bool_value("MIZAN_LOG_RAW_REQUEST_BODIES", value), + )?, }) } else { Err(AppError::invalid_config( diff --git a/crates/mizan-core/src/context.rs b/crates/mizan-core/src/context.rs index af3acdd..4548bf7 100644 --- a/crates/mizan-core/src/context.rs +++ b/crates/mizan-core/src/context.rs @@ -11,6 +11,8 @@ pub struct RequestContext { pub provider_id: Option, pub route: Option, pub route_id: Option, + pub method: Option, + pub path: Option, pub model: Option, pub streaming: bool, } @@ -28,6 +30,8 @@ impl RequestContext { provider_id: None, route: None, route_id: None, + method: None, + path: None, model: None, streaming: false, } @@ -90,6 +94,16 @@ impl RequestContextBuilder { self } + pub fn method(mut self, method: impl Into) -> Self { + self.context.method = Some(method.into()); + self + } + + pub fn path(mut self, path: impl Into) -> Self { + self.context.path = Some(path.into()); + self + } + pub fn model(mut self, model: impl Into) -> Self { self.context.model = Some(model.into()); self diff --git a/crates/mizan-core/src/schema.rs b/crates/mizan-core/src/schema.rs index 9212417..9c8d406 100644 --- a/crates/mizan-core/src/schema.rs +++ b/crates/mizan-core/src/schema.rs @@ -133,7 +133,11 @@ pub struct RequestLogRecord { pub route_id: Option, pub method: String, pub path: String, + pub route: Option, + pub provider: Option, pub status_code: i64, + pub latency_ms: i64, + pub error_code: Option, pub created_at: String, } diff --git a/docs/ALPHA_1_READINESS.md b/docs/ALPHA_1_READINESS.md index 5d2555e..7368a5e 100644 --- a/docs/ALPHA_1_READINESS.md +++ b/docs/ALPHA_1_READINESS.md @@ -16,7 +16,8 @@ Included: - Provider connections and model routes. - `GET /v1/models`. - OpenAI-compatible `POST /v1/chat/completions`. -- Non-streaming and streaming responses. +- OpenAI-compatible `POST /v1/responses` for non-stream canonical replies. +- Non-streaming and streaming chat responses. - Usage events, credit ledger, wallet balance, and credit grants. 
- Redis RPM counters and concurrency leases. - Prometheus gateway metrics. @@ -26,8 +27,10 @@ Not included yet: - Stable/full release guarantee. - RTK-backed CLI proxy baseline. -- Durable request log and admin audit log foundations. +- Centralized gateway logging middleware refactor. - Production deployment hardening beyond local smoke validation. +- Non-API provider adapters (for Codex/Gemini CLI/Claude-style auth/login flows) + before the broader MVP cut. ## Required Validation @@ -66,10 +69,20 @@ connection creation, model route creation, credit grant, model listing, non-streaming chat, streaming chat, usage reads, credit reads, and Prometheus metrics scraping. +Manual local validation on 2026-05-30 also confirmed: + +- `scripts/model-sync.sh` can sync the upstream mock model list. +- `scripts/model-sync.sh` now parses JSON with `python3` instead of `jq`. +- The local `/v1/models` response is OpenAI-compatible and exposes the route-backed model list. +- Non-streaming and streaming `POST /v1/chat/completions` work against the route-backed provider. +- Usage and credit reads return after the completed requests. + ## Remaining MVP Work -These issues do not block the backend/API alpha pre-release, but they should -block a broader MVP or stable release: +These issues do not block the backend/API alpha pre-release. Issue #11 still +matters for a broader MVP/stable release, while Issue #51 is a follow-up +maintainability refactor that keeps the gateway boundary cleaner for later +work: - Issue #11: integrate the RTK baseline into `mizan-rtk`. -- Issue #7: add request log and admin audit storage foundations. +- Issue #51: refactor gateway logging flow to a centralized middleware pattern. 
diff --git a/docs/ALPHA_RUNBOOK.md b/docs/ALPHA_RUNBOOK.md index 5336f6d..006b2e9 100644 --- a/docs/ALPHA_RUNBOOK.md +++ b/docs/ALPHA_RUNBOOK.md @@ -12,7 +12,10 @@ Use API endpoints plus scripts: `POST /admin/users/{id}/credits/grant` - User setup: `POST /auth/register`, `POST /auth/login`, `POST /api-keys` - Runtime checks: `GET /v1/models`, `POST /v1/chat/completions`, - `GET /v1/usage`, `GET /v1/credits`, `GET /metrics` + `POST /v1/responses`, `GET /v1/usage`, `GET /v1/credits`, `GET /metrics` +- Model sync helper: `MODEL_SYNC_BASE_URL=... MODEL_SYNC_API_KEY=... scripts/model-sync.sh` + for syncing OpenAI-compatible model ids from an upstream provider. The + helper uses `python3` for JSON parsing, so `jq` is not required. Tradeoff: this is less friendly than a web UI, but it keeps alpha scope small and makes correctness easy to validate in CI-like scripts. @@ -64,8 +67,10 @@ The smoke covers: - model route creation - credit grant - model listing +- model sync helper against the mock upstream - non-streaming chat - streaming chat +- /v1/responses - usage and credit reads - Prometheus metrics scrape @@ -90,5 +95,7 @@ boundary and latest local proof. - `provider secret key` errors mean set `MIZAN_PROVIDER_SECRET_KEY`. - Port conflicts can be avoided with `MIZAN_ALPHA_API_PORT` and `MIZAN_ALPHA_MOCK_PORT`. +- If the first `cargo run -p mizan-api` build is slow, raise + `MIZAN_ALPHA_WAIT_SECONDS` for the smoke run. - If an existing API is already running at `MIZAN_BASE_URL`, the script reuses it and only starts the mock upstream. diff --git a/docs/ARCHITECTURE.md b/docs/ARCHITECTURE.md index 9a51e24..d5b676e 100644 --- a/docs/ARCHITECTURE.md +++ b/docs/ARCHITECTURE.md @@ -40,6 +40,8 @@ ledgering, and admin/user APIs. - Keep request and response normalization in shared types, not ad hoc structs inside route handlers. - Keep model registry data separate from provider transport details. 
+- Keep all provider outputs normalized into one OpenAI-compatible public contract at + the edge (chat completions and responses). These rules matter because the project will likely support many providers and many model aliases. A clean boundary on day one is cheaper than a refactor after @@ -58,8 +60,8 @@ flowchart LR Client["User app or CLI"] --> Gateway["OpenAI-compatible gateway"] Gateway --> Auth["API key auth"] Auth --> Limits["Redis limits and concurrency leases"] - Limits --> Router["Model router"] - Router --> Provider["Upstream provider or local model"] + Limits --> Router["Model router + provider adapter"] + Router --> Provider["Provider adapters: OpenAI API, CLI session, browser session"] Provider --> Meter["Usage meter"] Meter --> Ledger["Credit ledger"] Ledger --> DB["SQLite (default)"] @@ -72,7 +74,8 @@ flowchart LR Responsible for: -- OpenAI-compatible request/response surface. +- OpenAI-compatible request/response surface for `/v1/chat/completions` and + `/v1/responses`. - Streaming proxy. - Request id generation. - API key authentication. @@ -120,8 +123,9 @@ MVP router behavior: 3. Check route and provider are enabled. 4. Acquire Redis limit/concurrency leases. 5. Send request to upstream. -6. Normalize response and stream back to client. -7. Record usage and charge credits. +6. Normalize response into the canonical OpenAI-compatible shape. +7. Stream or return non-stream response to client. +8. Record usage and charge credits. The router should only orchestrate these steps. Provider details, metering logic, and wallet updates should live in their own modules. 
@@ -145,10 +149,26 @@ pub trait ProviderAdapter: Send + Sync { fn name(&self) -> &'static str; async fn chat_completions(&self, req: ChatRequest) -> Result; async fn stream_chat_completions(&self, req: ChatRequest) -> Result; + async fn responses(&self, req: ResponsesRequest) -> Result; async fn models(&self) -> Result>; } ``` +Canonical contract rule: + +- Even when upstream transport uses different auth methods (`api_key`, CLI login, + browser token/session), provider adapters are responsible for mapping to the + same `chat` and `responses` contracts used by clients. + +Adapter categories: + +- `api_key`: OpenAI-compatible HTTP APIs and local models with stable response + shape. +- `subscription_cli`: session or CLI-based connectors (Codex, Gemini CLI, Claude-like + flows) that are normalized by adapter before serialization. +- `browser_session`: controlled browser session connectors with policy and legal + constraints handled in provider registration and runtime guardrails. + Provider connection records should include encrypted secret material. The API never returns raw secrets after creation. diff --git a/docs/ENGINEERING_PRINCIPLES.md b/docs/ENGINEERING_PRINCIPLES.md index d047185..ec3ab34 100644 --- a/docs/ENGINEERING_PRINCIPLES.md +++ b/docs/ENGINEERING_PRINCIPLES.md @@ -12,6 +12,8 @@ models, routes, limits, and accounting rules grows. - Keep limit enforcement hot-path and isolated. - Keep observability centralized, structured, and consistent. - Keep public API responses stable even when upstream providers differ. +- Normalize all upstream responses into one OpenAI-compatible public contract before + returning to clients. ## Recommended Module Boundaries @@ -74,6 +76,8 @@ Rule: - Each provider family should be isolated behind a dedicated module. Adding a new provider should not require rewriting the gateway. +- Non-OpenAI upstream formats should be translated here, then shaped as the + shared `chat.completions` and `responses` contracts at the edge. 
### `mizan-metering` diff --git a/docs/ISSUE_BACKLOG.md b/docs/ISSUE_BACKLOG.md index a6eaa47..a214f0e 100644 --- a/docs/ISSUE_BACKLOG.md +++ b/docs/ISSUE_BACKLOG.md @@ -53,6 +53,10 @@ Progress status (current): Milestone 4 completed with non-streaming chat proxy f 4. Add model route CRUD. 5. Add `/v1/models`. 6. Keep provider-specific logic behind adapter modules. +7. Keep all provider outputs normalized to OpenAI-compatible chat/responses contracts + (tracked in Issue #53). +8. Add non-API provider adapter pattern (`subscription_cli`, session-based) + for Codex/Gemini CLI/Claude-like adapters (planned, Issue #53). ## Gateway diff --git a/docs/PRD.md b/docs/PRD.md index 8dab317..df32f6d 100644 --- a/docs/PRD.md +++ b/docs/PRD.md @@ -29,6 +29,8 @@ clear usage and credit visibility. - Give admins one backend to register upstream AI connections. - Give users virtual API keys instead of raw provider credentials. +- Normalize all outbound provider output to a single OpenAI-compatible gateway + contract for chat and responses (including model routing, errors, and usage metadata). - Track usage by user, API key, provider, route, and model. - Enforce basic limits: concurrency, requests per minute, tokens per minute, and credit balance. @@ -57,6 +59,15 @@ For MVP, prioritize API-key providers and local OpenAI-compatible models first. Treat subscription/CLI connectors as experimental adapters with clear warnings, admin-only credential storage, and per-provider policy flags. +Regardless of upstream transport: + +- Internal adapters must normalize to an OpenAI-compatible surface before results + are returned to clients. +- Both `/v1/chat/completions` and `/v1/responses` should share the same public + shape and usage/credit semantics. +- Authentication modes can differ (API key, CLI session, browser session), but the + exposed response contract should stay consistent. + ## Personas ### Admin @@ -100,8 +111,11 @@ User needs: or model route. 
- As a user, I can register and create a virtual API key. - As a user, I can call `/v1/chat/completions` using that key. +- As a user, I can call `/v1/responses` using that key. - As a user, I can stream responses. - As a user, I can view token usage, credits spent, and remaining credits. +- As the system, any provider path (API-key, CLI session, browser session) emits + OpenAI-compatible responses and normalized errors. - As the system, every completed request creates an immutable usage event and credit ledger entry. @@ -125,9 +139,26 @@ MVP provider types: Later provider types: - `native_provider`: provider-specific request/response transforms. -- `subscription_cli`: adapter for subscription-backed CLI/web accounts. +- `subscription_cli`: adapter for subscription-backed CLI/web accounts (including + Codex, Gemini CLI, or Claude-like flows). - `browser_session`: controlled session/cookie adapter, only if legally safe. +### Canonical Provider Contract + +For every provider family, request handling must normalize upstream output into a +single public shape before the route handler returns it to the client: + +- `POST /v1/chat/completions` and `POST /v1/responses` use the same normalized + OpenAI-compatible envelope. +- Error responses keep the same public schema and contract fields (`error`, `type`, + `code`, request metadata). +- Usage/credit accounting uses the same token semantics across provider families. +- Unsupported or provider-specific payload fields are omitted from public responses. +- Streaming SSE for chat remains OpenAI-compatible (`chat.completion.chunk`). + +Non-API adapters are intentionally phased and should not change the contract +surface above. 
+ ### Model Routes Each route contains: @@ -148,6 +179,7 @@ MVP endpoints: - `GET /healthz` - `GET /v1/models` - `POST /v1/chat/completions` +- `POST /v1/responses` (next phase: OpenAI-compatible canonical response surface) Admin/user API: diff --git a/migrations/0002_request_log_foundations.sql b/migrations/0002_request_log_foundations.sql new file mode 100644 index 0000000..62c25b5 --- /dev/null +++ b/migrations/0002_request_log_foundations.sql @@ -0,0 +1,12 @@ +-- Add request log fields required for issue #7 observability foundations. +ALTER TABLE request_logs + ADD COLUMN latency_ms INTEGER NOT NULL DEFAULT 0; + +ALTER TABLE request_logs + ADD COLUMN route TEXT; + +ALTER TABLE request_logs + ADD COLUMN provider TEXT; + +ALTER TABLE request_logs + ADD COLUMN error_code TEXT; diff --git a/scripts/alpha-smoke.sh b/scripts/alpha-smoke.sh index 628b733..3e276da 100755 --- a/scripts/alpha-smoke.sh +++ b/scripts/alpha-smoke.sh @@ -3,6 +3,7 @@ set -euo pipefail API_PORT="${MIZAN_ALPHA_API_PORT:-18180}" MOCK_PORT="${MIZAN_ALPHA_MOCK_PORT:-18182}" +WAIT_SECONDS="${MIZAN_ALPHA_WAIT_SECONDS:-600}" BASE_URL="${MIZAN_BASE_URL:-http://127.0.0.1:${API_PORT}}" MOCK_URL="${MIZAN_MOCK_BASE_URL:-http://127.0.0.1:${MOCK_PORT}}" ADMIN_EMAIL="${MIZAN_ADMIN_EMAIL:-admin@mizan.local}" @@ -22,7 +23,7 @@ json_field() { wait_for() { local url="$1" - for _ in $(seq 1 60); do + for _ in $(seq 1 "${WAIT_SECONDS}"); do if curl -fsS "$url" >/dev/null 2>&1; then return 0; fi sleep 1 done @@ -84,12 +85,23 @@ curl -fsS -X POST "${BASE_URL}/admin/users/${admin_user_id}/credits/grant" \ -H 'content-type: application/json' \ -d '{"amount_microcredits":1000000,"reason":"alpha_smoke"}' >/dev/null -echo "Checking models, non-stream chat, stream chat, usage, credits, and metrics" +echo "Checking models, non-stream chat, non-stream responses, stream chat, usage, credits, and metrics" curl -fsS "${BASE_URL}/v1/models" -H "authorization: Bearer ${api_key}" >/dev/null +echo "Syncing upstream model list" 
+synced_models="$( + MODEL_SYNC_BASE_URL="${MOCK_URL}/v1" \ + MODEL_SYNC_API_KEY="alpha-smoke" \ + bash scripts/model-sync.sh --format ids +)" +printf '%s\n' "${synced_models}" | grep -q '^mock-gpt$' curl -fsS -X POST "${BASE_URL}/v1/chat/completions" \ -H "authorization: Bearer ${api_key}" \ -H 'content-type: application/json' \ -d '{"model":"alpha-mock","messages":[{"role":"user","content":"hello"}],"max_tokens":32}' >/dev/null +curl -fsS -X POST "${BASE_URL}/v1/responses" \ + -H "authorization: Bearer ${api_key}" \ + -H 'content-type: application/json' \ + -d '{"model":"alpha-mock","messages":[{"role":"user","content":"hello"}],"max_tokens":32}' >/dev/null curl -fsS -N -X POST "${BASE_URL}/v1/chat/completions" \ -H "authorization: Bearer ${api_key}" \ -H 'content-type: application/json' \
diff --git a/scripts/model-sync.sh b/scripts/model-sync.sh
new file mode 100755
index 0000000..4e27993
--- /dev/null
+++ b/scripts/model-sync.sh
@@ -0,0 +1,123 @@
+#!/usr/bin/env bash
+# Fetch the model list from an OpenAI-compatible upstream and print it either
+# as sorted, deduplicated ids or as the raw JSON payload.
+set -euo pipefail
+
+format="ids"
+
+# Print CLI usage to stdout; error paths redirect it to stderr.
+usage() {
+  cat <<'EOF'
+Usage:
+  MODEL_SYNC_BASE_URL=... MODEL_SYNC_API_KEY=... scripts/model-sync.sh [--format ids|json]
+
+Environment:
+  MODEL_SYNC_BASE_URL  OpenAI-compatible base URL, for example https://api.example.com/v1
+  MODEL_SYNC_API_KEY   Bearer token for the endpoint
+  OPENAI_BASE_URL      Fallback base URL if MODEL_SYNC_BASE_URL is not set
+  OPENAI_API_KEY       Fallback API key if MODEL_SYNC_API_KEY is not set
+
+Output:
+  ids   Newline-separated model ids, sorted and deduplicated
+  json  Raw /v1/models JSON payload
+EOF
+}
+
+while [[ $# -gt 0 ]]; do
+  case "$1" in
+    --format)
+      # `shift 2` fails when the value is missing, and under `set -e` that
+      # would end the script without any message, so check explicitly.
+      if [[ $# -lt 2 ]]; then
+        echo "error: --format requires a value (ids or json)" >&2
+        usage >&2
+        exit 2
+      fi
+      format="$2"
+      shift 2
+      ;;
+    --json)
+      format="json"
+      shift
+      ;;
+    -h|--help)
+      usage
+      exit 0
+      ;;
+    *)
+      echo "error: unknown argument: $1" >&2
+      usage >&2
+      exit 2
+      ;;
+  esac
+done
+
+# Explicit MODEL_SYNC_* settings win over the generic OPENAI_* fallbacks.
+base_url="${MODEL_SYNC_BASE_URL:-${OPENAI_BASE_URL:-}}"
+api_key="${MODEL_SYNC_API_KEY:-${OPENAI_API_KEY:-}}"
+
+if [[ -z "${base_url}" || -z "${api_key}" ]]; then
+  echo "error: set MODEL_SYNC_BASE_URL and MODEL_SYNC_API_KEY (or OPENAI_* fallbacks)" >&2
+  usage >&2
+  exit 1
+fi
+
+if [[ "${format}" != "ids" && "${format}" != "json" ]]; then
+  echo "error: unsupported format: ${format}" >&2
+  exit 2
+fi
+
+# Drop one trailing slash so the joined URL has no double slash.
+url="${base_url%/}/models"
+response="$(
+  curl -fsS --retry 2 --retry-delay 1 \
+    -H "Authorization: Bearer ${api_key}" \
+    "${url}"
+)"
+
+# Hand the response to Python on fd 3 instead of an environment variable: a
+# single environment string is size-capped by the kernel (128 KiB on Linux),
+# which large model catalogs exceed. Stdin is already taken by the program text.
+python3 - "$format" "$url" 3<<<"${response}" <<'PY'
+import json
+import os
+import sys
+
+format = sys.argv[1]
+url = sys.argv[2]
+
+try:
+    with os.fdopen(3, encoding="utf-8") as response_file:
+        payload = json.load(response_file)
+except (OSError, ValueError) as error:
+    print(f"error: model list response is not valid JSON: {error}", file=sys.stderr)
+    raise SystemExit(1)
+
+# A JSON array or scalar has no .get(), so check the envelope type first.
+if (
+    not isinstance(payload, dict)
+    or payload.get("object") != "list"
+    or not isinstance(payload.get("data"), list)
+):
+    print("error: invalid model list response", file=sys.stderr)
+    raise SystemExit(1)
+
+models = payload["data"]
+print(f"synced {len(models)} models from {url}", file=sys.stderr)
+
+if format == "json":
+    json.dump(payload, sys.stdout, indent=2)
+    sys.stdout.write("\n")
+else:
+    if format != "ids":
+        print(f"error: unsupported format: {format}", file=sys.stderr)
+        raise SystemExit(2)
+
+    # Keep only non-empty string ids so sorting never compares mixed types.
+    for model_id in sorted({
+        item["id"]
+        for item in models
+        if isinstance(item, dict) and isinstance(item.get("id"), str) and item["id"]
+    }):
+        print(model_id)
+PY