Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 23 additions & 7 deletions src/openai/lib/streaming/_deltas.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,20 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
acc_value.extend(delta_value)
continue

# Build a map from logical index -> position in acc_value so we can
# look up existing entries by their "index" field rather than by
# physical list position. Using physical position breaks when a
# single chunk contains multiple entries that share the same logical
# index (e.g. speculative-decoding servers that split the first
# tool_call entry across two objects both carrying index=0).
acc_index_map: dict[int, int] = {}
for pos, entry in enumerate(acc_value):
if is_dict(entry) and isinstance(entry.get("index"), int):
logical_idx = entry["index"]
# keep the first occurrence if there are duplicates already
if logical_idx not in acc_index_map:
acc_index_map[logical_idx] = pos

for delta_entry in delta_value:
if not is_dict(delta_entry):
raise TypeError(f"Unexpected list delta entry is not a dictionary: {delta_entry}")
Expand All @@ -49,15 +63,17 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
if not isinstance(index, int):
raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}")

try:
acc_entry = acc_value[index]
except IndexError:
acc_value.insert(index, delta_entry)
else:
if index in acc_index_map:
# merge into the existing entry at the stored physical position
pos = acc_index_map[index]
acc_entry = acc_value[pos]
if not is_dict(acc_entry):
raise TypeError("not handled yet")

acc_value[index] = accumulate_delta(acc_entry, delta_entry)
acc_value[pos] = accumulate_delta(acc_entry, delta_entry)
else:
# new logical index: append and record its physical position
acc_index_map[index] = len(acc_value)
acc_value.append(delta_entry)
Comment on lines +75 to +76
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Preserve logical index ordering when adding new delta entries

Appending unseen logical indices to the end of acc_value breaks the invariant that list position matches each entry's index, so out-of-order first appearances (e.g., index 1 chunk arrives before index 0) produce misordered arrays. Downstream streaming code indexes these arrays by logical index (for example choice_snapshot.message.tool_calls[tool_call_chunk.index] in src/openai/lib/streaming/chat/_completions.py), which can then read the wrong tool call or raise IndexError for valid streams from providers that interleave indices. The previous insert(index, ...) behavior maintained positional alignment.

Useful? React with 👍 / 👎.


acc[key] = acc_value

Expand Down