Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 21 additions & 1 deletion Packages/OsaurusCore/Managers/Model/ModelManager.swift
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,27 @@ final class ModelManager: NSObject, ObservableObject {

// Pull the OsaurusAI HF org listing once on launch so newly published
// models surface in the Recommended tab without requiring a code push.
Task { [weak self] in await self?.loadOsaurusAIOrgModels() }
//
// The unit-test runner constructs `ModelManager()` repeatedly to drive
// `applyOsaurusOrgFetch` directly. If the launch-time HF fetch races
// with those test calls, whichever finishes last wins and the merge
// result is non-deterministic — that's the regression class behind
// `ModelManagerSuggestedTests/applyOsaurusOrgFetch_*` flaking in CI.
// Skip the background fetch under XCTest; production launches still
// get it because `XCTestConfigurationFilePath` is only set inside
// a test host.
if !Self.isRunningInTestEnvironment {
Task { [weak self] in await self?.loadOsaurusAIOrgModels() }
}
}

/// True when the current process was launched by xctest. Used to gate
/// network-touching launch-time side effects so tests can drive the
/// affected code paths deterministically.
nonisolated private static var isRunningInTestEnvironment: Bool {
ProcessInfo.processInfo.environment["XCTestConfigurationFilePath"] != nil
|| ProcessInfo.processInfo.environment["XCTestBundlePath"] != nil
|| ProcessInfo.processInfo.environment["XCTestSessionIdentifier"] != nil
}

// MARK: - Public Methods
Expand Down
59 changes: 38 additions & 21 deletions Packages/OsaurusCore/Services/Chat/ChatEngine.swift
Original file line number Diff line number Diff line change
Expand Up @@ -204,15 +204,21 @@ actor ChatEngine: Sendable, ChatEngineProtocol {
var finishReason: InferenceLog.FinishReason = .stop
var errorMsg: String? = nil
var toolInvocation: (name: String, args: String)? = nil
var lastDeltaTime = startTime
#if DEBUG
var lastDeltaTime = startTime
#endif

print("[Osaurus][Stream] Starting stream wrapper for model: \(model)")
#if DEBUG
print("[Osaurus][Stream] Starting stream wrapper for model: \(model)")
#endif

do {
for try await delta in inner {
// Check for task cancellation to allow early termination
if Task.isCancelled {
print("[Osaurus][Stream] Task cancelled after \(deltaCount) deltas")
#if DEBUG
print("[Osaurus][Stream] Task cancelled after \(deltaCount) deltas")
#endif
continuation.finish()
return
}
Expand All @@ -224,43 +230,54 @@ actor ChatEngine: Sendable, ChatEngineProtocol {
}

deltaCount += 1
let now = Date()
let timeSinceStart = now.timeIntervalSince(startTime)
let timeSinceLastDelta = now.timeIntervalSince(lastDeltaTime)
lastDeltaTime = now

// Log every 50th delta or if there's a long gap (potential freeze indicator)
if deltaCount % 50 == 1 || timeSinceLastDelta > 2.0 {
print(
"[Osaurus][Stream] Delta #\(deltaCount): +\(String(format: "%.2f", timeSinceStart))s total, gap=\(String(format: "%.3f", timeSinceLastDelta))s, len=\(delta.count)"
)
}

#if DEBUG
let now = Date()
let timeSinceStart = now.timeIntervalSince(startTime)
let timeSinceLastDelta = now.timeIntervalSince(lastDeltaTime)
lastDeltaTime = now

// Log every 50th delta or if there's a long gap (potential freeze indicator)
if deltaCount % 50 == 1 || timeSinceLastDelta > 2.0 {
print(
"[Osaurus][Stream] Delta #\(deltaCount): +\(String(format: "%.2f", timeSinceStart))s total, gap=\(String(format: "%.3f", timeSinceLastDelta))s, len=\(delta.count)"
)
}
#endif

// Estimate tokens: each delta chunk is roughly proportional to tokens
// More accurate: count whitespace-separated words, or use tokenizer
outputTokenCount += max(1, delta.count / 4)
continuation.yield(delta)
}

let totalTime = Date().timeIntervalSince(startTime)
print(
"[Osaurus][Stream] Stream completed: \(deltaCount) deltas in \(String(format: "%.2f", totalTime))s"
)
#if DEBUG
let totalTime = Date().timeIntervalSince(startTime)
print(
"[Osaurus][Stream] Stream completed: \(deltaCount) deltas in \(String(format: "%.2f", totalTime))s"
)
#endif

continuation.finish()
} catch let inv as ServiceToolInvocation {
print("[Osaurus][Stream] Tool invocation: \(inv.toolName)")
#if DEBUG
print("[Osaurus][Stream] Tool invocation: \(inv.toolName)")
#endif
toolInvocation = (inv.toolName, inv.jsonArguments)
finishReason = .toolCalls
continuation.finish(throwing: inv)
} catch {
// Check if this is a CancellationError (expected when consumer stops)
if Task.isCancelled || error is CancellationError {
print("[Osaurus][Stream] Stream cancelled after \(deltaCount) deltas")
#if DEBUG
print("[Osaurus][Stream] Stream cancelled after \(deltaCount) deltas")
#endif
continuation.finish()
return
}
print("[Osaurus][Stream] Stream error after \(deltaCount) deltas: \(error.localizedDescription)")
#if DEBUG
print("[Osaurus][Stream] Stream error after \(deltaCount) deltas: \(error.localizedDescription)")
#endif
finishReason = .error
errorMsg = error.localizedDescription
continuation.finish(throwing: error)
Expand Down
22 changes: 12 additions & 10 deletions Packages/OsaurusCore/Services/ModelRuntime/StreamAccumulator.swift
Original file line number Diff line number Diff line change
Expand Up @@ -246,17 +246,19 @@ struct StreamAccumulator: AsyncSequence, Sendable {

// Log info events and surface generation stats downstream.
if let info = event.info {
print(
String(
format: "[MLX] prompt: %d tokens %.1f tok/s (%.2fs) | gen: %d tokens %.1f tok/s (%.2fs)",
info.promptTokenCount,
info.promptTokensPerSecond,
info.promptTime,
info.generationTokenCount,
info.tokensPerSecond,
info.generateTime
#if DEBUG
print(
String(
format: "[MLX] prompt: %d tokens %.1f tok/s (%.2fs) | gen: %d tokens %.1f tok/s (%.2fs)",
info.promptTokenCount,
info.promptTokensPerSecond,
info.promptTime,
info.generationTokenCount,
info.tokensPerSecond,
info.generateTime
)
)
)
#endif
// Emit GPU-accurate stats as a signpost event so they appear in
// Instruments and can be captured by `log stream --type signpost`.
accumSignposter.emitEvent(
Expand Down
10 changes: 6 additions & 4 deletions Packages/OsaurusCore/Views/Chat/ChatView.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1144,10 +1144,12 @@ final class ChatSession: ObservableObject {
}
}

let totalTime = Date().timeIntervalSince(streamStartTime)
print(
"[Osaurus][UI] Stream consumption completed: \(uiDeltaCount) deltas in \(String(format: "%.2f", totalTime))s, final contentLen=\(assistantTurn.contentLength)"
)
#if DEBUG
let totalTime = Date().timeIntervalSince(streamStartTime)
print(
"[Osaurus][UI] Stream consumption completed: \(uiDeltaCount) deltas in \(String(format: "%.2f", totalTime))s, final contentLen=\(assistantTurn.contentLength)"
)
#endif

break // finished normally
} catch let inv as ServiceToolInvocation {
Expand Down
Loading