diff --git a/velox/exec/fuzzer/JoinFuzzer.cpp b/velox/exec/fuzzer/JoinFuzzer.cpp index 3e9882ff653d..6d539c2bfbb5 100644 --- a/velox/exec/fuzzer/JoinFuzzer.cpp +++ b/velox/exec/fuzzer/JoinFuzzer.cpp @@ -82,30 +82,27 @@ class JoinFuzzer { void go(); + struct SplitData { + std::vector input; + std::vector splits; + size_t numKeys; + }; + struct PlanWithSplits { core::PlanNodePtr plan; - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - std::unordered_map> - splits; + std::unordered_map splitsMap; core::ExecutionStrategy executionStrategy{ core::ExecutionStrategy::kUngrouped}; int32_t numGroups; explicit PlanWithSplits( const core::PlanNodePtr& _plan, - const core::PlanNodeId& _probeScanId = "", - const core::PlanNodeId& _buildScanId = "", - const std::unordered_map< - core::PlanNodeId, - std::vector>& _splits = {}, + std::unordered_map _splitsMap = {}, core::ExecutionStrategy _executionStrategy = core::ExecutionStrategy::kUngrouped, int32_t _numGroups = 0) : plan(_plan), - probeScanId(_probeScanId), - buildScanId(_buildScanId), - splits(_splits), + splitsMap(_splitsMap), executionStrategy(_executionStrategy), numGroups(_numGroups) {} }; @@ -113,9 +110,13 @@ class JoinFuzzer { struct JoinData { core::JoinType joinType; bool nullAware; + RowTypePtr probeType; + RowTypePtr buildType; std::vector probeKeys; std::vector buildKeys; std::vector buildInput; + std::vector buildSplits; + RowTypePtr outputType; std::vector outputColumns; std::string filter; }; @@ -161,21 +162,6 @@ class JoinFuzzer { kLocalPartition, }; - // Makes the query plan with default settings in JoinFuzzer and value inputs - // for both probe and build sides. - // - // NOTE: 'probeInput' and 'buildInput' could either input rows with lazy - // vectors or flatten ones. - JoinFuzzer::PlanWithSplits makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); - // Constructs a cascading multi-join plan using hash join nodes to join the // input nodes of a specified type. // c @@ -190,75 +176,34 @@ class JoinFuzzer { const std::vector& joinDataList, const InputNodeType inputNodeType = InputNodeType::kValues); - JoinFuzzer::PlanWithSplits makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); - // Constructs a cascading multi-join plan with merge join nodes. JoinFuzzer::PlanWithSplits makeMergeJoinPlan( const std::vector& probeInput, const std::vector& joinDataList); - // Returns a PlanWithSplits for NestedLoopJoin with inputs from Values nodes. - // If withFilter is true, uses the equality filter between probeKeys and - // buildKeys as the join filter. Uses empty join filter otherwise. - JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter); - // Constructs a cascading multi-join plan with nested loop join nodes. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlan( const std::vector& probeInput, const std::vector& joinDataList); - // Makes the default query plan with table scan as inputs for both probe and - // build sides. - JoinFuzzer::PlanWithSplits makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + // Constructs a cascading multi-join plan using hash join nodes to join table + // scan nodes. + JoinFuzzer::PlanWithSplits makeHashJoinPlanWithTableScan( + const std::vector probeSplits, + const std::vector& probeInput, + const std::vector& joinDataList); JoinFuzzer::PlanWithSplits makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector probeSplits, + const std::vector& probeInput, + const std::vector& joinDataList); // Returns a PlanWithSplits for NestedLoopJoin with inputs from TableScan - // nodes. If withFilter is true, uses the equiality filter between probeKeys - // and buildKeys as the join filter. Uses empty join filter otherwise. + // nodes. JoinFuzzer::PlanWithSplits makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter); + const std::vector probeSplits, + const std::vector& probeInput, + const std::vector& joinDataList); void makeAlternativePlans( const core::PlanNodePtr& defaultPlan, @@ -269,10 +214,9 @@ class JoinFuzzer { // Makes the query plan from 'planWithTableScan' with grouped execution mode. // Correspondingly, it replaces the table scan input splits with grouped ones. JoinFuzzer::PlanWithSplits makeGroupedExecutionPlanWithTableScan( + const std::string& tableDir, const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits); + const int32_t numGroups); // Runs one test iteration from query plans generations, executions and result // verifications. @@ -301,15 +245,9 @@ class JoinFuzzer { void addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter); + std::vector joinDataList, + std::vector& altPlans); // Splits the input into groups by partitioning on the join keys. std::vector> splitInputByGroup( @@ -320,9 +258,9 @@ class JoinFuzzer { // Generates the grouped splits. std::vector generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const size_t numKeys, + const std::string& tableName, const std::vector& input); RowVectorPtr execute(const PlanWithSplits& plan, bool injectSpill); @@ -332,16 +270,12 @@ class JoinFuzzer { const std::vector& probeInput, const std::vector& buildInput); - // Generates and executes plans using NestedLoopJoin without filters. The - // result is compared to DuckDB. Returns the result vector of the cross - // product. + // Generates and executes plans using NestedLoopJoin without filters. + // Returns the result vector of the cross product. RowVectorPtr testCrossProduct( const std::string& tableDir, - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, - const std::vector& buildInput); + JoinData joinData); int32_t randInt(int32_t min, int32_t max) { return boost::random::uniform_int_distribution(min, max)(rng_); @@ -577,13 +511,14 @@ RowVectorPtr JoinFuzzer::execute(const PlanWithSplits& plan, bool injectSpill) { << plan.plan->toString(true, true); AssertQueryBuilder builder(plan.plan); - for (const auto& [planNodeId, nodeSplits] : plan.splits) { - builder.splits(planNodeId, nodeSplits); + std::unordered_set scanNodeIds; + for (const auto& [planNodeId, splitData] : plan.splitsMap) { + builder.splits(planNodeId, splitData.splits); + scanNodeIds.insert(planNodeId); } - if (plan.executionStrategy == core::ExecutionStrategy::kGrouped) { builder.executionStrategy(core::ExecutionStrategy::kGrouped); - builder.groupedExecutionLeafNodeIds({plan.probeScanId, plan.buildScanId}); + builder.groupedExecutionLeafNodeIds(scanNodeIds); builder.numSplitGroups(plan.numGroups); builder.numConcurrentSplitGroups(randInt(1, plan.numGroups)); } @@ -794,31 +729,6 @@ std::vector makeSources( return sourceNodes; } -JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlan( - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { - auto planNodeIdGenerator = std::make_shared(); - auto plan = - PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{plan}; -} - JoinFuzzer::PlanWithSplits JoinFuzzer::makeHashJoinPlan( const std::vector& probeInput, const std::vector& joinDataList, @@ -877,53 +787,66 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeHashJoinPlan( return JoinFuzzer::PlanWithSplits{plan.planNode()}; } -JoinFuzzer::PlanWithSplits JoinFuzzer::makeDefaultPlanWithTableScan( - core::JoinType joinType, - bool nullAware, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { +JoinFuzzer::PlanWithSplits JoinFuzzer::makeHashJoinPlanWithTableScan( + const std::vector probeSplits, + const std::vector& probeInput, + const std::vector& joinDataList) { + VELOX_CHECK_GT(joinDataList.size(), 0); + auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - auto plan = PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .hashJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType, - nullAware) - .planNode(); - return PlanWithSplits{ - plan, - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + core::PlanNodeId nodeScanId; + std::unordered_map splitsMap; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(joinDataList[0].probeType) + .capturePlanNodeId(nodeScanId); + splitsMap.emplace( + nodeScanId, + SplitData{probeInput, probeSplits, joinDataList[0].probeKeys.size()}); + for (const JoinData& joinData : joinDataList) { + plan.hashJoin( + joinData.probeKeys, + joinData.buildKeys, + /*build=*/ + PlanBuilder(planNodeIdGenerator) + .tableScan(joinData.buildType) + .capturePlanNodeId(nodeScanId) + .planNode(), + joinData.filter, + joinData.outputColumns, + joinData.joinType, + joinData.nullAware); + splitsMap.emplace( + nodeScanId, + SplitData{ + joinData.buildInput, + joinData.buildSplits, + joinData.buildKeys.size()}); + } + return PlanWithSplits{plan.planNode(), splitsMap}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeGroupedExecutionPlanWithTableScan( + const std::string& tableDir, const JoinFuzzer::PlanWithSplits& planWithTableScan, - int32_t numGroups, - const std::vector& groupedProbeScanSplits, - const std::vector& groupedBuildScanSplits) { + const int32_t numGroups) { + std::unordered_map splitsMap; + for (const auto& [planNodeId, splitData] : planWithTableScan.splitsMap) { + splitsMap.emplace( + planNodeId, + SplitData{ + splitData.input, + /*splits=*/ + generateSplitsWithGroup( + tableDir, + numGroups, + splitData.numKeys, + /*tableName=*/fmt::format("t_{}", planNodeId), + splitData.input), + splitData.numKeys}); + } return PlanWithSplits{ planWithTableScan.plan, - planWithTableScan.probeScanId, - planWithTableScan.buildScanId, - {{planWithTableScan.probeScanId, groupedProbeScanSplits}, - {planWithTableScan.buildScanId, groupedBuildScanSplits}}, + std::move(splitsMap), core::ExecutionStrategy::kGrouped, numGroups}; } @@ -948,10 +871,8 @@ template void addFlippedJoinPlan( const core::PlanNodePtr& plan, std::vector& plans, - const core::PlanNodeId& probeScanId = "", - const core::PlanNodeId& buildScanId = "", - const std::unordered_map>& - splits = {}, + const std::unordered_map& + splitsMap = {}, core::ExecutionStrategy executionStrategy = core::ExecutionStrategy::kUngrouped, int32_t numGroups = 0) { @@ -959,40 +880,10 @@ void addFlippedJoinPlan( VELOX_CHECK_NOT_NULL(joinNode); if (auto flippedPlan = JoinFuzzer::tryFlipJoinSides(*joinNode)) { plans.push_back(JoinFuzzer::PlanWithSplits{ - flippedPlan, - probeScanId, - buildScanId, - splits, - executionStrategy, - numGroups}); + flippedPlan, splitsMap, executionStrategy, numGroups}); } } -JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { - auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .values(buildInput) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode()}; -} - JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( const std::vector& probeInput, const std::vector& joinDataList) { @@ -1019,26 +910,6 @@ JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlan( return JoinFuzzer::PlanWithSplits{plan.planNode()}; } -JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - const std::string& filter) { - auto planNodeIdGenerator = std::make_shared(); - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .values(probeInput) - .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator).values(buildInput).planNode(), - filter, - outputColumns, - joinType) - .planNode()}; -} - JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlan( const std::vector& probeInput, const std::vector& joinDataList) { @@ -1136,35 +1007,24 @@ void JoinFuzzer::shuffleJoinKeys( RowVectorPtr JoinFuzzer::testCrossProduct( const std::string& tableDir, - core::JoinType joinType, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, - const std::vector& buildInput) { + JoinData joinData) { VELOX_CHECK_GT(probeInput.size(), 0); - VELOX_CHECK_GT(buildInput.size(), 0); - - const auto probeType = asRowType(probeInput[0]->type()); - const auto buildType = asRowType(buildInput[0]->type()); - auto outputColumns = - concat(asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) - ->names(); - - auto plan = makeNestedLoopJoinPlan( - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput, - outputColumns, - /*filter=*/""); + VELOX_CHECK_GT(joinData.buildInput.size(), 0); + + joinData.outputColumns = concat( + asRowType(probeInput[0]->type()), + asRowType(joinData.buildInput[0]->type())) + ->names(); + + auto plan = makeNestedLoopJoinPlan(probeInput, {joinData}); const auto expected = execute(plan, /*injectSpill=*/false); // If OOM injection is not enabled verify the results against Reference // query runner. if (!FLAGS_enable_oom_injection) { - if (auto referenceResult = - computeReferenceResults(plan.plan, probeInput, buildInput)) { + if (auto referenceResult = computeReferenceResults( + plan.plan, probeInput, joinData.buildInput)) { VELOX_CHECK( assertEqualResults( referenceResult.value(), plan.plan->outputType(), {expected}), @@ -1177,22 +1037,14 @@ RowVectorPtr JoinFuzzer::testCrossProduct( std::vector altPlans; if (isTableScanSupported(probeInput[0]->type()) && - isTableScanSupported(buildInput[0]->type())) { + isTableScanSupported(joinData.buildInput[0]->type())) { std::vector probeScanSplits = makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); + std::vector buildScanSplits = makeSplits( + joinData.buildInput, fmt::format("{}/build", tableDir), writerPool_); altPlans.push_back(makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - /*filter=*/"")); + probeScanSplits, probeInput, {joinData})); } addFlippedJoinPlan(plan.plan, altPlans); @@ -1241,6 +1093,8 @@ void JoinFuzzer::verify(core::JoinType joinType) { auto probeInput = generateProbeInput(probeKeys, keyTypes); auto buildInput = generateBuildInput(probeInput, probeKeys, buildKeys); + joinData.probeType = asRowType(probeInput[0]->type()); + joinData.buildType = asRowType(buildInput[0]->type()); joinData.buildInput = buildInput; // Flatten inputs. @@ -1268,32 +1122,24 @@ void JoinFuzzer::verify(core::JoinType joinType) { FLAGS_batch_size * FLAGS_num_batches <= 500) { if (vectorFuzzer_.coinToss(0.1)) { stats_.numCrossProduct++; - - auto result = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - probeInput, - buildInput); + auto result = + testCrossProduct(tableScanDir->getPath(), probeInput, joinData); + JoinData crossProductJoinData = joinData; + crossProductJoinData.buildInput = flatBuildInput; auto flatResult = testCrossProduct( - tableScanDir->getPath(), - joinType, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput); + tableScanDir->getPath(), flatProbeInput, crossProductJoinData); assertEqualResults({result}, {flatResult}); } } - auto outputColumns = + RowTypePtr outputType = (core::isLeftSemiProjectJoin(joinType) || core::isLeftSemiFilterJoin(joinType) || core::isAntiJoin(joinType)) - ? asRowType(probeInput[0]->type())->names() + ? asRowType(probeInput[0]->type()) : concat( - asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())) - ->names(); + asRowType(probeInput[0]->type()), asRowType(buildInput[0]->type())); + joinData.outputType = outputType; + std::vector outputColumns = outputType->names(); // Shuffle output columns. std::shuffle(outputColumns.begin(), outputColumns.end(), rng_); @@ -1331,24 +1177,17 @@ void JoinFuzzer::verify(core::JoinType joinType) { stats_.numVerified++; } } - joinData.buildInput = flatBuildInput; + JoinData joinFlatData = joinData; + joinFlatData.buildInput = flatBuildInput; std::vector altPlans; - altPlans.push_back(makeHashJoinPlan(flatProbeInput, {joinData})); + altPlans.push_back(makeHashJoinPlan(flatProbeInput, {joinFlatData})); makeAlternativePlans(defaultPlan.plan, probeInput, {joinData}, altPlans); - makeAlternativePlans(defaultPlan.plan, flatProbeInput, {joinData}, altPlans); + makeAlternativePlans( + defaultPlan.plan, flatProbeInput, {joinFlatData}, altPlans); addPlansWithTableScan( - tableScanDir->getPath(), - joinType, - nullAware, - probeKeys, - buildKeys, - flatProbeInput, - flatBuildInput, - outputColumns, - altPlans, - filter); + tableScanDir->getPath(), flatProbeInput, {joinFlatData}, altPlans); for (auto i = 0; i < altPlans.size(); ++i) { LOG(INFO) << "Testing plan #" << i; @@ -1357,6 +1196,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); + LOG(INFO) << "Result matches with logically equivalent plan."; } else { VELOX_CHECK( FLAGS_enable_oom_injection, "Got unexpected nullptr for results"); @@ -1378,6 +1218,7 @@ void JoinFuzzer::verify(core::JoinType joinType) { VELOX_CHECK( assertEqualResults({expected}, {actual}), "Logically equivalent plans produced different results"); + LOG(INFO) << "Result matches with logically equivalent plan."; } catch (const VeloxException&) { LOG(ERROR) << "Expected\n" << expected->toString(0, expected->size()) << "\nActual\n" @@ -1393,206 +1234,161 @@ void JoinFuzzer::verify(core::JoinType joinType) { } JoinFuzzer::PlanWithSplits JoinFuzzer::makeMergeJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector probeSplits, + const std::vector& probeInput, + const std::vector& joinDataList) { + VELOX_CHECK_GT(joinDataList.size(), 0); + auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .orderBy(probeKeys, false) - .mergeJoin( - probeKeys, - buildKeys, - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .orderBy(buildKeys, false) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + core::PlanNodeId nodeScanId; + std::unordered_map splitsMap; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(joinDataList[0].probeType) + .capturePlanNodeId(nodeScanId) + .orderBy(joinDataList[0].probeKeys, false); + splitsMap.emplace( + nodeScanId, + SplitData{probeInput, probeSplits, joinDataList[0].probeKeys.size()}); + for (const JoinData& joinData : joinDataList) { + plan.mergeJoin( + joinData.probeKeys, + joinData.buildKeys, + /*build=*/ + PlanBuilder(planNodeIdGenerator) + .tableScan(joinData.buildType) + .capturePlanNodeId(nodeScanId) + .orderBy(joinData.buildKeys, false) + .planNode(), + joinData.filter, + joinData.outputColumns, + joinData.joinType); + splitsMap.emplace( + nodeScanId, + SplitData{ + joinData.buildInput, + joinData.buildSplits, + joinData.buildKeys.size()}); + } + return PlanWithSplits{plan.planNode(), splitsMap}; } JoinFuzzer::PlanWithSplits JoinFuzzer::makeNestedLoopJoinPlanWithTableScan( - core::JoinType joinType, - const RowTypePtr& probeType, - const RowTypePtr& buildType, - const std::vector& probeKeys, - const std::vector& buildKeys, - const std::vector& probeSplits, - const std::vector& buildSplits, - const std::vector& outputColumns, - const std::string& filter) { + const std::vector probeSplits, + const std::vector& probeInput, + const std::vector& joinDataList) { + VELOX_CHECK_GT(joinDataList.size(), 0); + auto planNodeIdGenerator = std::make_shared(); - core::PlanNodeId probeScanId; - core::PlanNodeId buildScanId; - - return JoinFuzzer::PlanWithSplits{ - PlanBuilder(planNodeIdGenerator) - .tableScan(probeType) - .capturePlanNodeId(probeScanId) - .nestedLoopJoin( - PlanBuilder(planNodeIdGenerator) - .tableScan(buildType) - .capturePlanNodeId(buildScanId) - .planNode(), - filter, - outputColumns, - joinType) - .planNode(), - probeScanId, - buildScanId, - {{probeScanId, probeSplits}, {buildScanId, buildSplits}}}; + core::PlanNodeId nodeScanId; + std::unordered_map splitsMap; + PlanBuilder plan = PlanBuilder(planNodeIdGenerator) + .tableScan(joinDataList[0].probeType) + .capturePlanNodeId(nodeScanId); + splitsMap.emplace( + nodeScanId, + SplitData{probeInput, probeSplits, joinDataList[0].probeKeys.size()}); + for (const JoinData& joinData : joinDataList) { + plan.nestedLoopJoin( + /*build=*/ + PlanBuilder(planNodeIdGenerator) + .tableScan(joinData.buildType) + .capturePlanNodeId(nodeScanId) + .planNode(), + /*joinCondition=*/joinData.filter.empty() + ? makeJoinFilter(joinData.probeKeys, joinData.buildKeys) + : fmt::format( + "{} AND {}", + makeJoinFilter(joinData.probeKeys, joinData.buildKeys), + joinData.filter), + joinData.outputColumns, + joinData.joinType); + splitsMap.emplace( + nodeScanId, + SplitData{ + joinData.buildInput, + joinData.buildSplits, + joinData.buildKeys.size()}); + } + return PlanWithSplits{plan.planNode(), splitsMap}; } void JoinFuzzer::addPlansWithTableScan( const std::string& tableDir, - core::JoinType joinType, - bool nullAware, - const std::vector& probeKeys, - const std::vector& buildKeys, const std::vector& probeInput, - const std::vector& buildInput, - const std::vector& outputColumns, - std::vector& altPlans, - const std::string& filter) { + std::vector joinDataList, + std::vector& altPlans) { VELOX_CHECK(!tableDir.empty()); - - if (!isTableScanSupported(probeInput[0]->type()) || - !isTableScanSupported(buildInput[0]->type())) { + VELOX_CHECK_GT(joinDataList.size(), 0); + if (!isTableScanSupported(probeInput[0]->type())) { return; } + for (const auto& joinData : joinDataList) { + if (!isTableScanSupported((joinData.buildInput)[0]->type())) { + return; + } + } - std::vector probeScanSplits = + std::vector probeSplits = makeSplits(probeInput, fmt::format("{}/probe", tableDir), writerPool_); - std::vector buildScanSplits = - makeSplits(buildInput, fmt::format("{}/build", tableDir), writerPool_); - - const auto probeType = asRowType(probeInput[0]->type()); - const auto buildType = asRowType(buildInput[0]->type()); + for (int i = 0; i < joinDataList.size(); i++) { + joinDataList[i].buildSplits = makeSplits( + joinDataList[i].buildInput, + /*path=*/fmt::format("{}/build{}", tableDir, i), + writerPool_); + } std::vector plansWithTableScan; - auto defaultPlan = makeDefaultPlanWithTableScan( - joinType, - nullAware, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + auto defaultPlan = + makeHashJoinPlanWithTableScan(probeSplits, probeInput, joinDataList); plansWithTableScan.push_back(defaultPlan); - auto joinNode = - std::dynamic_pointer_cast(defaultPlan.plan); - VELOX_CHECK_NOT_NULL(joinNode); - // Flip join sides. addFlippedJoinPlan( - defaultPlan.plan, - plansWithTableScan, - defaultPlan.probeScanId, - defaultPlan.buildScanId, - defaultPlan.splits); - - const int32_t numGroups = randInt(1, probeScanSplits.size()); - const std::vector groupedProbeScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/true, - probeKeys.size(), - probeInput); - const std::vector groupedBuildScanSplits = - generateSplitsWithGroup( - tableDir, - numGroups, - /*isProbe=*/false, - buildKeys.size(), - buildInput); + defaultPlan.plan, plansWithTableScan, defaultPlan.splitsMap); + + const int32_t numGroups = randInt(1, probeSplits.size()); for (const auto& planWithTableScan : plansWithTableScan) { altPlans.push_back(planWithTableScan); altPlans.push_back(makeGroupedExecutionPlanWithTableScan( - planWithTableScan, - numGroups, - groupedProbeScanSplits, - groupedBuildScanSplits)); + tableDir, planWithTableScan, numGroups)); + } + bool mergeJoinSupported = true; + bool nestedLoopJoinSupported = true; + for (const JoinData& joinData : joinDataList) { + if (!core::MergeJoinNode::isSupported(joinData.joinType)) { + mergeJoinSupported = false; + } + if (!core::NestedLoopJoinNode::isSupported(joinData.joinType)) { + nestedLoopJoinSupported = false; + } } - // Add ungrouped MergeJoin with TableScan. - if (core::MergeJoinNode::isSupported(joinNode->joinType())) { - auto planWithSplits = makeMergeJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - filter); + if (mergeJoinSupported) { + auto planWithSplits = + makeMergeJoinPlanWithTableScan(probeSplits, probeInput, joinDataList); altPlans.push_back(planWithSplits); addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + planWithSplits.plan, altPlans, planWithSplits.splitsMap); } // Add ungrouped NestedLoopJoin with TableScan. - if (core::NestedLoopJoinNode::isSupported(joinNode->joinType())) { - std::string joinCondition = filter.empty() - ? makeJoinFilter(probeKeys, buildKeys) - : fmt::format( - "{} AND {}", makeJoinFilter(probeKeys, buildKeys), filter); + if (nestedLoopJoinSupported) { auto planWithSplits = makeNestedLoopJoinPlanWithTableScan( - joinType, - probeType, - buildType, - probeKeys, - buildKeys, - probeScanSplits, - buildScanSplits, - outputColumns, - joinCondition); + probeSplits, probeInput, joinDataList); altPlans.push_back(planWithSplits); addFlippedJoinPlan( - planWithSplits.plan, - altPlans, - planWithSplits.probeScanId, - planWithSplits.buildScanId, - {{planWithSplits.probeScanId, probeScanSplits}, - {planWithSplits.buildScanId, buildScanSplits}}); + planWithSplits.plan, altPlans, planWithSplits.splitsMap); } } std::vector JoinFuzzer::generateSplitsWithGroup( const std::string& tableDir, - int32_t numGroups, - bool isProbe, - size_t numKeys, + const int32_t numGroups, + const size_t numKeys, + const std::string& tableName, const std::vector& input) { const std::vector> inputVectorsByGroup = splitInputByGroup(numGroups, numKeys, input); @@ -1600,12 +1396,8 @@ std::vector JoinFuzzer::generateSplitsWithGroup( std::vector splitsWithGroup; for (int32_t groupId = 0; groupId < numGroups; ++groupId) { for (auto i = 0; i < inputVectorsByGroup[groupId].size(); ++i) { - const std::string filePath = fmt::format( - "{}/grouped[{}].{}.{}", - tableDir, - groupId, - isProbe ? "probe" : "build", - i); + const std::string filePath = + fmt::format("{}/grouped[{}].{}.{}", tableDir, groupId, tableName, i); writeToFile(filePath, inputVectorsByGroup[groupId][i], writerPool_.get()); splitsWithGroup.emplace_back(makeConnectorSplit(filePath), groupId); }