Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(aggregate): Add complex type to map_union_sum #12268

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 148 additions & 52 deletions velox/functions/prestosql/aggregates/MapUnionSumAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/Strings.h"
#include "velox/expression/FunctionSignature.h"
#include "velox/functions/lib/CheckedArithmeticImpl.h"
#include "velox/functions/prestosql/aggregates/AggregateNames.h"
#include "velox/vector/FlatVector.h"

Expand All @@ -32,7 +32,7 @@ struct Accumulator {
AlignedStlAllocator<std::pair<const K, S>, 16>>::Type;
ValuesMap sums;

explicit Accumulator(HashStringAllocator* allocator)
explicit Accumulator(const TypePtr& /*type*/, HashStringAllocator* allocator)
: sums{AlignedStlAllocator<std::pair<const K, S>, 16>(allocator)} {}

size_t size() const {
Expand All @@ -41,18 +41,20 @@ struct Accumulator {

void addValues(
const MapVector* mapVector,
const SimpleVector<K>* mapKeys,
const SimpleVector<S>* mapValues,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row,
HashStringAllocator* allocator) {
auto keys = mapKeys->template as<SimpleVector<K>>();
auto values = mapValues->template as<SimpleVector<S>>();
auto offset = mapVector->offsetAt(row);
auto size = mapVector->sizeAt(row);

for (auto i = 0; i < size; ++i) {
// Ignore null map keys.
if (!mapKeys->isNullAt(offset + i)) {
auto key = mapKeys->valueAt(offset + i);
addValue(key, mapValues, offset + i, mapValues->typeKind());
if (!keys->isNullAt(offset + i)) {
auto key = keys->valueAt(offset + i);
addValue(key, values, offset + i, values->typeKind());
}
}
}
Expand Down Expand Up @@ -94,13 +96,16 @@ struct Accumulator {
}

vector_size_t extractValues(
FlatVector<K>& mapKeys,
FlatVector<S>& mapValues,
VectorPtr& mapKeys,
VectorPtr& mapValues,
vector_size_t offset) {
auto keys = mapKeys->asFlatVector<K>();
auto values = mapValues->asFlatVector<S>();

auto index = offset;
for (const auto& [key, sum] : sums) {
mapKeys.set(index, key);
mapValues.set(index, sum);
keys->set(index, key);
values->set(index, sum);

++index;
}
Expand All @@ -115,26 +120,30 @@ struct StringViewAccumulator {

Strings strings;

explicit StringViewAccumulator(HashStringAllocator* allocator)
: base{allocator} {}
explicit StringViewAccumulator(
const TypePtr& type,
HashStringAllocator* allocator)
: base{type, allocator} {}

size_t size() const {
return base.size();
}

void addValues(
const MapVector* mapVector,
const SimpleVector<StringView>* mapKeys,
const SimpleVector<S>* mapValues,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row,
HashStringAllocator* allocator) {
auto keys = mapKeys->template as<SimpleVector<StringView>>();
auto values = mapValues->template as<SimpleVector<S>>();
auto offset = mapVector->offsetAt(row);
auto size = mapVector->sizeAt(row);

for (auto i = 0; i < size; ++i) {
// Ignore null map keys.
if (!mapKeys->isNullAt(offset + i)) {
auto key = mapKeys->valueAt(offset + i);
if (!keys->isNullAt(offset + i)) {
auto key = keys->valueAt(offset + i);

if (!key.isInline()) {
auto it = base.sums.find(key);
Expand All @@ -145,19 +154,95 @@ struct StringViewAccumulator {
}
}

base.addValue(key, mapValues, offset + i, mapValues->typeKind());
base.addValue(key, values, offset + i, values->typeKind());
}
}
}

vector_size_t extractValues(
FlatVector<StringView>& mapKeys,
FlatVector<S>& mapValues,
VectorPtr& mapKeys,
VectorPtr& mapValues,
vector_size_t offset) {
return base.extractValues(mapKeys, mapValues, offset);
}
};

/// Maintains a map with keys of type array, map or struct.
template <typename V>
struct ComplexTypeAccumulator {
using ValueMap = folly::F14FastMap<
AddressableNonNullValueList::Entry,
int64_t,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo,
AlignedStlAllocator<
std::pair<const AddressableNonNullValueList::Entry, int64_t>,
16>>;

/// A set of pointers to values stored in AddressableNonNullValueList.
ValueMap sums;

/// Stores unique non-null keys.
AddressableNonNullValueList serializedKeys;

ComplexTypeAccumulator(const TypePtr& type, HashStringAllocator* allocator)
: sums{
0,
AddressableNonNullValueList::Hash{},
AddressableNonNullValueList::EqualTo{type},
AlignedStlAllocator<
std::pair<const AddressableNonNullValueList::Entry, int64_t>,
16>(allocator)} {}

void addValues(
const MapVector* mapVector,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row,
HashStringAllocator* allocator) {
auto offset = mapVector->offsetAt(row);
auto size = mapVector->sizeAt(row);
auto values = mapValues->template as<SimpleVector<V>>();

for (auto i = 0; i < size; ++i) {
if (!mapKeys->isNullAt(offset + i)) {
auto entry =
serializedKeys.append(*mapKeys.get(), offset + i, allocator);

auto it = sums.find(entry);
if (it == sums.end()) {
// New entry.
sums[entry] = values->valueAt(offset + i);
} else {
// Existing entry.
sums[entry] += values->valueAt(offset + i);
}
}
}
}

vector_size_t extractValues(
VectorPtr& mapKeys,
VectorPtr& mapValues,
vector_size_t offset) {
auto values = mapValues->asFlatVector<V>();
auto index = offset;

for (const auto& [position, count] : sums) {
AddressableNonNullValueList::read(position, *mapKeys.get(), index);
values->set(index, count);
++index;
}

return sums.size();
}

size_t size() const {
return sums.size();
}
};

// Defines unique accumulators dependent on type.
template <typename K, typename S>
struct AccumulatorTypeTraits {
using AccumulatorType = Accumulator<K, S>;
Expand All @@ -168,6 +253,12 @@ struct AccumulatorTypeTraits<StringView, S> {
using AccumulatorType = StringViewAccumulator<S>;
};

template <typename V>
struct AccumulatorTypeTraits<ComplexType, V> {
using AccumulatorType = ComplexTypeAccumulator<V>;
};

// Defines common aggregator.
template <typename K, typename S>
class MapUnionSumAggregate : public exec::Aggregate {
public:
Expand All @@ -190,12 +281,18 @@ class MapUnionSumAggregate : public exec::Aggregate {
VELOX_CHECK(mapVector);
mapVector->resize(numGroups);

auto mapKeys = mapVector->mapKeys()->as<FlatVector<K>>();
auto mapValues = mapVector->mapValues()->as<FlatVector<S>>();
auto mapKeysPtr = mapVector->mapKeys();
auto mapValuesPtr = mapVector->mapValues();

auto numElements = countElements(groups, numGroups);
mapKeys->resize(numElements);
mapValues->resize(numElements);
mapVector->mapValues()->as<FlatVector<S>>()->resize(numElements);

// ComplexType cannot be resized the same.
if constexpr (!std::is_same_v<K, ComplexType>) {
mapVector->mapKeys()->as<FlatVector<K>>()->resize(numElements);
} else {
mapVector->mapKeys()->resize(numElements);
}

auto rawNulls = mapVector->mutableRawNulls();
vector_size_t offset = 0;
Expand All @@ -208,7 +305,7 @@ class MapUnionSumAggregate : public exec::Aggregate {
clearNull(rawNulls, i);

auto mapSize = value<AccumulatorType>(group)->extractValues(
*mapKeys, *mapValues, offset);
mapKeysPtr, mapValuesPtr, offset);
mapVector->setOffsetAndSize(i, offset, mapSize);
offset += mapSize;
}
Expand All @@ -227,8 +324,8 @@ class MapUnionSumAggregate : public exec::Aggregate {
bool /*mayPushdown*/) override {
decodedMaps_.decode(*args[0], rows);
auto mapVector = decodedMaps_.base()->template as<MapVector>();
auto mapKeys = mapVector->mapKeys()->template as<SimpleVector<K>>();
auto mapValues = mapVector->mapValues()->template as<SimpleVector<S>>();
auto mapKeys = mapVector->mapKeys();
auto mapValues = mapVector->mapValues();

rows.applyToSelected([&](auto row) {
if (!decodedMaps_.isNullAt(row)) {
Expand All @@ -249,8 +346,8 @@ class MapUnionSumAggregate : public exec::Aggregate {
bool /* mayPushdown */) override {
decodedMaps_.decode(*args[0], rows);
auto mapVector = decodedMaps_.base()->template as<MapVector>();
auto mapKeys = mapVector->mapKeys()->template as<SimpleVector<K>>();
auto mapValues = mapVector->mapValues()->template as<SimpleVector<S>>();
auto mapKeys = mapVector->mapKeys();
auto mapValues = mapVector->mapValues();

auto groupMap = value<AccumulatorType>(group);

Expand Down Expand Up @@ -285,7 +382,7 @@ class MapUnionSumAggregate : public exec::Aggregate {
folly::Range<const vector_size_t*> indices) override {
setAllNulls(groups, indices);
for (auto index : indices) {
new (groups[index] + offset_) AccumulatorType{allocator_};
new (groups[index] + offset_) AccumulatorType{resultType_, allocator_};
}
}

Expand All @@ -304,8 +401,8 @@ class MapUnionSumAggregate : public exec::Aggregate {
void addMap(
AccumulatorType& groupMap,
const MapVector* mapVector,
const SimpleVector<K>* mapKeys,
const SimpleVector<S>* mapValues,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row) const {
auto decodedRow = decodedMaps_.index(row);
groupMap.addValues(mapVector, mapKeys, mapValues, decodedRow, allocator_);
Expand Down Expand Up @@ -340,7 +437,8 @@ std::unique_ptr<exec::Aggregate> createMapUnionSumAggregate(
case TypeKind::DOUBLE:
return std::make_unique<MapUnionSumAggregate<K, double>>(resultType);
default:
VELOX_UNREACHABLE();
VELOX_UNREACHABLE(
"Unexpected value type {}", mapTypeKindToName(valueKind));
}
}

Expand All @@ -350,15 +448,6 @@ void registerMapUnionSumAggregate(
const std::string& prefix,
bool withCompanionFunctions,
bool overwrite) {
const std::vector<std::string> keyTypes = {
"tinyint",
"smallint",
"integer",
"bigint",
"real",
"double",
"varchar",
"json"};
const std::vector<std::string> valueTypes = {
"tinyint",
"smallint",
Expand All @@ -369,15 +458,14 @@ void registerMapUnionSumAggregate(
};

std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
for (auto keyType : keyTypes) {
for (auto valueType : valueTypes) {
auto mapType = fmt::format("map({},{})", keyType, valueType);
signatures.push_back(exec::AggregateFunctionSignatureBuilder()
.returnType(mapType)
.intermediateType(mapType)
.argumentType(mapType)
.build());
}
for (auto valueType : valueTypes) {
signatures.push_back(
exec::AggregateFunctionSignatureBuilder()
.comparableTypeVariable("K")
.returnType(fmt::format("map(K,{})", valueType))
.intermediateType(fmt::format("map(K,{})", valueType))
.argumentType(fmt::format("map(K,{})", valueType))
.build());
}

auto name = prefix + kMapUnionSum;
Expand All @@ -395,6 +483,8 @@ void registerMapUnionSumAggregate(
auto& mapType = argTypes[0]->asMap();
auto keyTypeKind = mapType.keyType()->kind();
auto valueTypeKind = mapType.valueType()->kind();
const auto keyType = resultType->childAt(0);

switch (keyTypeKind) {
case TypeKind::TINYINT:
return createMapUnionSumAggregate<int8_t>(
Expand All @@ -416,8 +506,14 @@ void registerMapUnionSumAggregate(
case TypeKind::VARCHAR:
return createMapUnionSumAggregate<StringView>(
valueTypeKind, resultType);
case TypeKind::ARRAY:
case TypeKind::MAP:
case TypeKind::ROW:
return createMapUnionSumAggregate<ComplexType>(
valueTypeKind, resultType);
default:
VELOX_UNREACHABLE();
VELOX_UNREACHABLE(
"Unexpected key type {}", mapTypeKindToName(keyTypeKind));
}
},
withCompanionFunctions,
Expand Down
Loading
Loading