diff --git a/src/openai/lib/streaming/_deltas.py b/src/openai/lib/streaming/_deltas.py index a5e1317612..43acfcf296 100644 --- a/src/openai/lib/streaming/_deltas.py +++ b/src/openai/lib/streaming/_deltas.py @@ -37,6 +37,20 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> acc_value.extend(delta_value) continue + # Build a map from logical index -> position in acc_value so we can + # look up existing entries by their "index" field rather than by + # physical list position. Using physical position breaks when a + # single chunk contains multiple entries that share the same logical + # index (e.g. speculative-decoding servers that split the first + # tool_call entry across two objects both carrying index=0). + acc_index_map: dict[int, int] = {} + for pos, entry in enumerate(acc_value): + if is_dict(entry) and isinstance(entry.get("index"), int): + logical_idx = entry["index"] + # keep the first occurrence if there are duplicates already + if logical_idx not in acc_index_map: + acc_index_map[logical_idx] = pos + for delta_entry in delta_value: if not is_dict(delta_entry): raise TypeError(f"Unexpected list delta entry is not a dictionary: {delta_entry}") @@ -49,15 +63,17 @@ def accumulate_delta(acc: dict[object, object], delta: dict[object, object]) -> if not isinstance(index, int): raise TypeError(f"Unexpected, list delta entry `index` value is not an integer; {index}") - try: - acc_entry = acc_value[index] - except IndexError: - acc_value.insert(index, delta_entry) - else: + if index in acc_index_map: + # merge into the existing entry at the stored physical position + pos = acc_index_map[index] + acc_entry = acc_value[pos] if not is_dict(acc_entry): raise TypeError("not handled yet") - - acc_value[index] = accumulate_delta(acc_entry, delta_entry) + acc_value[pos] = accumulate_delta(acc_entry, delta_entry) + else: + # new logical index: append and record its physical position + acc_index_map[index] = len(acc_value) + acc_value.append(delta_entry) acc[key] = acc_value