diff --git a/.changeset/fast-joins-redesign.md b/.changeset/fast-joins-redesign.md new file mode 100644 index 000000000..ef9e20e35 --- /dev/null +++ b/.changeset/fast-joins-redesign.md @@ -0,0 +1,5 @@ +--- +"@tanstack/db-ivm": patch +--- + +Redesign of the join operators with direct algorithms for major performance improvements, replacing composition-based joins (inner+anti) with an implementation using mass tracking. Delivers significant performance gains while maintaining full correctness for all join types (inner, left, right, full, anti). diff --git a/packages/db-ivm/src/indexes.ts b/packages/db-ivm/src/indexes.ts index 3c52614eb..7dcb349ba 100644 --- a/packages/db-ivm/src/indexes.ts +++ b/packages/db-ivm/src/indexes.ts @@ -150,11 +150,30 @@ export class Index { * hash to identify identical values, storing them in a third level value map. */ #inner: IndexMap + #consolidatedMultiplicity: Map = new Map() // sum of multiplicities per key constructor() { this.#inner = new Map() } + /** + * Create an Index from multiple MultiSet messages. + * @param messages - Array of MultiSet messages to build the index from. + * @returns A new Index containing all the data from the messages. + */ + static fromMultiSets(messages: Array>): Index { + const index = new Index() + + for (const message of messages) { + for (const [item, multiplicity] of message.getInner()) { + const [key, value] = item + index.addValue(key, [value, multiplicity]) + } + } + + return index + } + /** * This method returns a string representation of the index. * @param indent - Whether to indent the string representation. @@ -184,6 +203,32 @@ export class Index { return this.#inner.has(key) } + /** + * Check if a key has presence (non-zero consolidated multiplicity). + * @param key - The key to check. + * @returns True if the key has non-zero consolidated multiplicity, false otherwise. 
+ */ + hasPresence(key: TKey): boolean { + return (this.#consolidatedMultiplicity.get(key) ?? 0) !== 0 + } + + /** + * Get the consolidated multiplicity (sum of multiplicities) for a key. + * @param key - The key to get the consolidated multiplicity for. + * @returns The consolidated multiplicity for the key, or 0 when the key is not tracked. + */ + getConsolidatedMultiplicity(key: TKey): number { + return this.#consolidatedMultiplicity.get(key) ?? 0 + } + + /** + * Get all keys that have presence (non-zero consolidated multiplicity). + * @returns An iterable of keys with non-zero consolidated multiplicity (zero sums are deleted by addValue). + */ + getPresenceKeys(): Iterable { + return this.#consolidatedMultiplicity.keys() + } + /** * This method returns all values for a given key. * @param key - The key to get the values for. @@ -257,6 +302,15 @@ export class Index { // If the multiplicity is 0, do nothing if (multiplicity === 0) return + // Update consolidated multiplicity tracking; entries that sum to 0 are removed so the map only holds present keys + const newConsolidatedMultiplicity = + (this.#consolidatedMultiplicity.get(key) ?? 0) + multiplicity + if (newConsolidatedMultiplicity === 0) { + this.#consolidatedMultiplicity.delete(key) + } else { + this.#consolidatedMultiplicity.set(key, newConsolidatedMultiplicity) + } + const mapOrSingleValue = this.#inner.get(key) if (mapOrSingleValue === undefined) { diff --git a/packages/db-ivm/src/multiset.ts b/packages/db-ivm/src/multiset.ts index 1e793345e..44ba297ed 100644 --- a/packages/db-ivm/src/multiset.ts +++ b/packages/db-ivm/src/multiset.ts @@ -209,6 +209,17 @@ export class MultiSet { chunkedArrayPush(this.#inner, otherArray) } + /** + * Append a single item with the given multiplicity; a multiplicity of 0 is ignored. + * @param item - The item to add. + * @param multiplicity - The multiplicity to record for the item. 
+ */ + add(item: T, multiplicity: number): void { + if (multiplicity !== 0) { + this.#inner.push([item, multiplicity]) + } + } + getInner(): MultiSetArray { return this.#inner } diff --git a/packages/db-ivm/src/operators/join.ts b/packages/db-ivm/src/operators/join.ts index 96e9ba962..36f710b2f 100644 --- a/packages/db-ivm/src/operators/join.ts +++ b/packages/db-ivm/src/operators/join.ts @@ -1,10 +1,57 @@ +/** + * # Direct Join 
Algorithms for Incremental View Maintenance + * + * High-performance join operations implementing all join types (inner, left, right, full, anti) + * with minimal state and optimized performance. + * + * ## Algorithm + * + * For each tick, the algorithm processes incoming changes (deltas) and emits join results: + * + * 1. **Build deltas**: Create delta indexes from input messages using `Index.fromMultiSets()` + * 2. **Inner results**: Emit `ΔA⋈B_old + A_old⋈ΔB + ΔA⋈ΔB` (matched pairs) + * 3. **Outer results**: For unmatched rows, emit null-extended tuples: + * - New unmatched rows from deltas (when the opposite side has no presence for that key after this tick) + * - Presence transitions: when key goes `0→>0` (retract nulls) or `>0→0` (emit nulls) + * 4. **Update state**: Append deltas to indexes (consolidated multiplicity tracking is automatic) + * + * **Consolidated multiplicity tracking** enables O(1) presence checks instead of scanning index buckets. + * + * ## State + * + * **Indexes** store the actual data: + * - `indexA: Index` - all left-side rows accumulated over time + * - `indexB: Index` - all right-side rows accumulated over time + * + * **Consolidated multiplicity tracking** (built into Index): + * - Each Index maintains sum of multiplicities per key internally + * - Provides O(1) presence checks: `index.hasPresence(key)` and `index.getConsolidatedMultiplicity(key)` + * - Avoids scanning entire index buckets just to check if key has any rows + * + * ## Join Types + * + * - **Inner**: Standard delta terms only + * - **Outer**: Inner results + null-extended unmatched rows with transition handling + * - **Anti**: Unmatched rows only (no inner results) + * + * ## Key Optimizations + * + * - **No temp copying**: Uses `(A⊎ΔA)⋈ΔB = A⋈ΔB ⊎ ΔA⋈ΔB` distributive property + * - **Early-out checks**: Skip phases when no deltas present + * - **Zero-entry pruning**: Keep maps compact, O(distinct keys) memory + * - **Final presence logic**: Avoid emit→retract churn within same tick + * + * ## Correctness + * 
- **Ordering**: Pre-append snapshots for emissions, post-emit state updates + * - **Presence**: Key matched iff mass ≠ 0, transitions trigger null handling + * - **Bag semantics**: Proper multiplicity handling including negatives + */ + import { BinaryOperator, DifferenceStreamWriter } from "../graph.js" import { StreamBuilder } from "../d2.js" import { MultiSet } from "../multiset.js" import { Index } from "../indexes.js" -import { negate } from "./negate.js" -import { map } from "./map.js" -import { concat } from "./concat.js" import type { DifferenceStreamReader } from "../graph.js" import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js" @@ -14,66 +61,192 @@ import type { IStreamBuilder, KeyValue, PipedOperator } from "../types.js" export type JoinType = `inner` | `left` | `right` | `full` | `anti` /** - * Operator that joins two input streams + * Operator that joins two input streams using direct join algorithms */ export class JoinOperator extends BinaryOperator< - [K, V1] | [K, V2] | [K, [V1, V2]] + [K, V1] | [K, V2] | [K, [V1, V2]] | [K, [V1 | null, V2 | null]] > { #indexA = new Index() #indexB = new Index() + #mode: JoinType constructor( id: number, inputA: DifferenceStreamReader<[K, V1]>, inputB: DifferenceStreamReader<[K, V2]>, - output: DifferenceStreamWriter<[K, [V1, V2]]> + output: DifferenceStreamWriter, + mode: JoinType = `inner` ) { super(id, inputA, inputB, output) + this.#mode = mode } run(): void { - const deltaA = new Index() - const deltaB = new Index() + // Build deltas from input messages + const deltaA = Index.fromMultiSets( + this.inputAMessages() as Array> + ) + const deltaB = Index.fromMultiSets( + this.inputBMessages() as Array> + ) - // Process input A - process ALL messages, not just the first one - const messagesA = this.inputAMessages() - for (const message of messagesA) { - const multiSetMessage = message as unknown as MultiSet<[K, V1]> - for (const [item, multiplicity] of multiSetMessage.getInner()) { - const 
[key, value] = item - deltaA.addValue(key, [value, multiplicity]) - } - } + // Early-out if nothing changed + if (deltaA.size === 0 && deltaB.size === 0) return - // Process input B - process ALL messages, not just the first one - const messagesB = this.inputBMessages() - for (const message of messagesB) { - const multiSetMessage = message as unknown as MultiSet<[K, V2]> - for (const [item, multiplicity] of multiSetMessage.getInner()) { - const [key, value] = item - deltaB.addValue(key, [value, multiplicity]) - } + const results = new MultiSet() + + // Emit inner results (all modes except anti) + if (this.#mode !== `anti`) { + this.emitInnerResults(deltaA, deltaB, results) } - // Process results - const results = new MultiSet<[K, [V1, V2]]>() + // Emit left outer/anti results + if ( + this.#mode === `left` || + this.#mode === `full` || + this.#mode === `anti` + ) { + this.emitLeftOuterResults(deltaA, deltaB, results) + } - // Join deltaA with existing indexB - results.extend(deltaA.join(this.#indexB)) + // Emit right outer results + if (this.#mode === `right` || this.#mode === `full`) { + this.emitRightOuterResults(deltaA, deltaB, results) + } - // Append deltaA to indexA + // Update state and send results + // IMPORTANT: All emissions use pre-append snapshots of indexA/indexB. + // Now append ALL deltas to indices - this happens unconditionally for every key, + // regardless of whether presence flipped. Consolidated multiplicity tracking is automatic. 
this.#indexA.append(deltaA) - - // Join existing indexA with deltaB - results.extend(this.#indexA.join(deltaB)) + this.#indexB.append(deltaB) // Send results if (results.getInner().length > 0) { this.output.sendData(results) } + } - // Append deltaB to indexB - this.#indexB.append(deltaB) + /** Adds the matched-pair delta terms (ΔA⋈B_old, A_old⋈ΔB, ΔA⋈ΔB) to results; must run before run() appends the deltas. */ + private emitInnerResults( + deltaA: Index, + deltaB: Index, + results: MultiSet + ): void { + // Emit the three standard delta terms: ΔA⋈B_old, A_old⋈ΔB, ΔA⋈ΔB + if (deltaA.size > 0) results.extend(deltaA.join(this.#indexB)) + if (deltaB.size > 0) results.extend(this.#indexA.join(deltaB)) + if (deltaA.size > 0 && deltaB.size > 0) results.extend(deltaA.join(deltaB)) + } + + /** Adds null-extended left rows ([value, null]): unmatched rows from deltaA, plus retract/emit for stored left rows whose key flips presence on the right. */ + private emitLeftOuterResults( + deltaA: Index, + deltaB: Index, + results: MultiSet + ): void { + // Emit unmatched left rows from deltaA + if (deltaA.size > 0) { + for (const [key, valueIterator] of deltaA.entriesIterators()) { + const currentMultiplicityB = + this.#indexB.getConsolidatedMultiplicity(key) + const deltaMultiplicityB = deltaB.getConsolidatedMultiplicity(key) + const finalMultiplicityB = currentMultiplicityB + deltaMultiplicityB + + if (finalMultiplicityB === 0) { + for (const [value, multiplicity] of valueIterator) { + if (multiplicity !== 0) { + results.add([key, [value, null]], multiplicity) + } + } + } + } + } + + // Handle presence transitions from right side changes + if (deltaB.size > 0) { + for (const key of deltaB.getPresenceKeys()) { + const before = this.#indexB.getConsolidatedMultiplicity(key) + const deltaMult = deltaB.getConsolidatedMultiplicity(key) + if (deltaMult === 0) continue + const after = before + deltaMult + + // Skip transition handling if presence doesn't flip (both zero or both non-zero) + // Note: Index updates happen later regardless - we're only skipping null-extension emissions here + if ((before === 0) === (after === 0)) continue + + // Determine the type of transition: + // - 0 → non-zero: Right becomes non-empty, left rows transition from unmatched to matched + 
// → RETRACT previously emitted null-extended rows (emit with negative multiplicity) + // - non-zero → 0: Right becomes empty, left rows transition from matched to unmatched + // → EMIT new null-extended rows (emit with positive multiplicity) + const transitioningToMatched = before === 0 + + for (const [value, multiplicity] of this.#indexA.getIterator(key)) { + if (multiplicity !== 0) { + results.add( + [key, [value, null]], + transitioningToMatched ? -multiplicity : +multiplicity + ) + } + } + } + } + } + + /** Adds null-extended right rows ([null, value]): unmatched rows from deltaB, plus retract/emit for stored right rows whose key flips presence on the left. */ + private emitRightOuterResults( + deltaA: Index, + deltaB: Index, + results: MultiSet + ): void { + // Emit unmatched right rows from deltaB + if (deltaB.size > 0) { + for (const [key, valueIterator] of deltaB.entriesIterators()) { + const currentMultiplicityA = + this.#indexA.getConsolidatedMultiplicity(key) + const deltaMultiplicityA = deltaA.getConsolidatedMultiplicity(key) + const finalMultiplicityA = currentMultiplicityA + deltaMultiplicityA + + if (finalMultiplicityA === 0) { + for (const [value, multiplicity] of valueIterator) { + if (multiplicity !== 0) { + results.add([key, [null, value]], multiplicity) + } + } + } + } + } + + // Handle presence transitions from left side changes + if (deltaA.size > 0) { + for (const key of deltaA.getPresenceKeys()) { + const before = this.#indexA.getConsolidatedMultiplicity(key) + const deltaMult = deltaA.getConsolidatedMultiplicity(key) + if (deltaMult === 0) continue + const after = before + deltaMult + + // Skip transition handling if presence doesn't flip (both zero or both non-zero) + // Note: Index updates happen later regardless - we're only skipping null-extension emissions here + if ((before === 0) === (after === 0)) continue + + // Determine the type of transition: + // - 0 → non-zero: Left becomes non-empty, right rows transition from unmatched to matched + // → RETRACT previously emitted null-extended rows (emit with negative multiplicity) + // - non-zero → 0: Left becomes empty, right rows transition from 
matched to unmatched + // → EMIT new null-extended rows (emit with positive multiplicity) + const transitioningToMatched = before === 0 + + for (const [value, multiplicity] of this.#indexB.getIterator(key)) { + if (multiplicity !== 0) { + results.add( + [key, [null, value]], + transitioningToMatched ? -multiplicity : +multiplicity + ) + } + } + } + } } } @@ -91,39 +264,34 @@ export function join< other: IStreamBuilder>, type: JoinType = `inner` ): PipedOperator> { - switch (type) { - case `inner`: - return innerJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `anti`: - return antiJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `left`: - return leftJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `right`: - return rightJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - case `full`: - return fullJoin(other) as unknown as PipedOperator< - T, - KeyValue - > - default: - throw new Error(`Join type ${type} is invalid`) + // Fail fast on unknown join types: a cast can bypass JoinType, and JoinOperator.run() would then emit inner results only + if (![`inner`, `left`, `right`, `full`, `anti`].includes(type)) { + throw new Error(`Join type ${type} is invalid`) + } + return ( + stream: IStreamBuilder + ): IStreamBuilder> => { + if (stream.graph !== other.graph) { + throw new Error(`Cannot join streams from different graphs`) + } + const output = new StreamBuilder>( + stream.graph, + new DifferenceStreamWriter>() + ) + const operator = new JoinOperator( + stream.graph.getNextOperatorId(), + stream.connectReader() as DifferenceStreamReader>, + other.connectReader(), + output.writer, + type + ) + stream.graph.addOperator(operator) + return output } } /** - * Joins two input streams + * Joins two input streams (inner join) * @param other - The other stream to join with */ export function innerJoin< @@ -134,27 +302,14 @@ export function innerJoin< >( other: IStreamBuilder> ): PipedOperator> { - return (stream: IStreamBuilder): IStreamBuilder> => { - if (stream.graph !== other.graph) { - throw new Error(`Cannot join streams from different graphs`) - } - const output = new StreamBuilder>( - stream.graph, - new DifferenceStreamWriter>() - ) - const 
operator = new JoinOperator( - stream.graph.getNextOperatorId(), - stream.connectReader() as DifferenceStreamReader>, - other.connectReader(), - output.writer - ) - stream.graph.addOperator(operator) - return output - } + return join(other, `inner`) as unknown as PipedOperator< + T, + KeyValue + > } /** - * Joins two input streams + * Joins two input streams (anti join) * @param other - The other stream to join with */ export function antiJoin< @@ -165,24 +320,14 @@ export function antiJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const matchedLeft = stream.pipe( - innerJoin(other), - map(([key, [valueLeft, _valueRight]]) => [key, valueLeft]) - ) - const anti = stream.pipe( - concat(matchedLeft.pipe(negate())), - // @ts-ignore TODO: fix this - map(([key, value]) => [key, [value, null]]) - ) - return anti as IStreamBuilder> - } + return join(other, `anti`) as unknown as PipedOperator< + T, + KeyValue + > } /** - * Joins two input streams + * Joins two input streams (left join) * @param other - The other stream to join with */ export function leftJoin< @@ -193,21 +338,14 @@ export function leftJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const left = stream - const right = other - const inner = left.pipe(innerJoin(right)) - const anti = left.pipe(antiJoin(right)) - return inner.pipe(concat(anti)) as IStreamBuilder< - KeyValue - > - } + return join(other, `left`) as unknown as PipedOperator< + T, + KeyValue + > } /** - * Joins two input streams + * Joins two input streams (right join) * @param other - The other stream to join with */ export function rightJoin< @@ -218,24 +356,14 @@ export function rightJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const left = stream as IStreamBuilder> - const right = other - const inner = left.pipe(innerJoin(right)) - const 
anti = right.pipe( - antiJoin(left), - map(([key, [a, b]]) => [key, [b, a]]) - ) - return inner.pipe(concat(anti)) as IStreamBuilder< - KeyValue - > - } + return join(other, `right`) as unknown as PipedOperator< + T, + KeyValue + > } /** - * Joins two input streams + * Joins two input streams (full join) * @param other - The other stream to join with */ export function fullJoin< @@ -246,19 +374,8 @@ export function fullJoin< >( other: IStreamBuilder> ): PipedOperator> { - return ( - stream: IStreamBuilder - ): IStreamBuilder> => { - const left = stream as IStreamBuilder> - const right = other - const inner = left.pipe(innerJoin(right)) - const antiLeft = left.pipe(antiJoin(right)) - const antiRight = right.pipe( - antiJoin(left), - map(([key, [a, b]]) => [key, [b, a]]) - ) - return inner.pipe(concat(antiLeft), concat(antiRight)) as IStreamBuilder< - KeyValue - > - } + return join(other, `full`) as unknown as PipedOperator< + T, + KeyValue + > } diff --git a/packages/db/src/query/compiler/joins.ts b/packages/db/src/query/compiler/joins.ts index 1beb93728..418783983 100644 --- a/packages/db/src/query/compiler/joins.ts +++ b/packages/db/src/query/compiler/joins.ts @@ -1,10 +1,4 @@ -import { - consolidate, - filter, - join as joinOperator, - map, - tap, -} from "@tanstack/db-ivm" +import { filter, join as joinOperator, map, tap } from "@tanstack/db-ivm" import { CollectionInputNotFoundError, InvalidJoinCondition, @@ -290,7 +284,6 @@ function processJoin( return mainPipeline.pipe( joinOperator(joinedPipeline, joinClause.type as JoinType), - consolidate(), processJoinResults(joinClause.type) ) }