Skip to content

Commit 42462de

Browse files
committed
fix(streaming): merge tool_call deltas by logical index, not physical position
Fixes accumulate_delta treating the \"index\" field as a physical list position. Servers using speculative decoding (e.g. vLLM) can emit two entries with the same logical index in a single chunk. The old code stored both and later merged subsequent deltas only into acc_value[0], leaving the second duplicate stranded with an incomplete arguments string. The fix builds a logical-index-to-physical-position map over acc_value and uses it for all lookups, so entries are always matched by their \"index\" field regardless of how many duplicates arrived in earlier chunks.
1 parent 658be64 commit 42462de

1 file changed

Lines changed: 23 additions & 7 deletions

File tree

src/openai/lib/streaming/_deltas.py

Lines changed: 23 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,20 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
3737
acc_value.extend(delta_value)
3838
continue
3939

40+
# Build a map from logical index -> position in acc_value so we can
41+
# look up existing entries by their "index" field rather than by
42+
# physical list position. Using physical position breaks when a
43+
# single chunk contains multiple entries that share the same logical
44+
# index (e.g. speculative-decoding servers that split the first
45+
# tool_call entry across two objects both carrying index=0).
46+
acc_index_map: dict[int, int] = {}
47+
for pos, entry in enumerate(acc_value):
48+
if is_dict(entry) and isinstance(entry.get("index"), int):
49+
logical_idx = entry["index"]
50+
# keep the first occurrence if there are duplicates already
51+
if logical_idx not in acc_index_map:
52+
acc_index_map[logical_idx] = pos
53+
4054
for delta_entry in delta_value:
4155
if not is_dict(delta_entry):
4256
raise TypeError(f"Unexpected list delta entry is not a dictionary: {delta_entry}")
@@ -49,15 +63,17 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) ->
4963
if not isinstance(index, int):
5064
raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}")
5165

52-
try:
53-
acc_entry = acc_value[index]
54-
except IndexError:
55-
acc_value.insert(index, delta_entry)
56-
else:
66+
if index in acc_index_map:
67+
# merge into the existing entry at the stored physical position
68+
pos = acc_index_map[index]
69+
acc_entry = acc_value[pos]
5770
if not is_dict(acc_entry):
5871
raise TypeError("not handled yet")
59-
60-
acc_value[index] = accumulate_delta(acc_entry, delta_entry)
72+
acc_value[pos] = accumulate_delta(acc_entry, delta_entry)
73+
else:
74+
# new logical index: append and record its physical position
75+
acc_index_map[index] = len(acc_value)
76+
acc_value.append(delta_entry)
6177

6278
acc[key] = acc_value
6379

0 commit comments

Comments
 (0)