Skip to content

Commit

Permalink
feat(aggregate): Add complex type to map_union_sum (#12268)
Browse files Browse the repository at this point in the history
Summary:

Adds complex type to Presto function map_union_sum. Addition required some additional surgery in order to make primitives/strings accumulator forward compatible with ComplexType accumulator, namely functions extract/addValues.

Differential Revision: D69204449
  • Loading branch information
peterenescu authored and facebook-github-bot committed Feb 11, 2025
1 parent b71648f commit cfb526c
Show file tree
Hide file tree
Showing 2 changed files with 198 additions and 52 deletions.
200 changes: 148 additions & 52 deletions velox/functions/prestosql/aggregates/MapUnionSumAggregate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "velox/exec/AddressableNonNullValueList.h"
#include "velox/exec/Aggregate.h"
#include "velox/exec/Strings.h"
#include "velox/expression/FunctionSignature.h"
#include "velox/functions/lib/CheckedArithmeticImpl.h"
#include "velox/functions/prestosql/aggregates/AggregateNames.h"
#include "velox/vector/FlatVector.h"

Expand All @@ -32,7 +32,7 @@ struct Accumulator {
AlignedStlAllocator<std::pair<const K, S>, 16>>::Type;
ValuesMap sums;

explicit Accumulator(HashStringAllocator* allocator)
explicit Accumulator(const TypePtr& /*type*/, HashStringAllocator* allocator)
: sums{AlignedStlAllocator<std::pair<const K, S>, 16>(allocator)} {}

size_t size() const {
Expand All @@ -41,18 +41,20 @@ struct Accumulator {

void addValues(
const MapVector* mapVector,
const SimpleVector<K>* mapKeys,
const SimpleVector<S>* mapValues,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row,
HashStringAllocator* allocator) {
auto keys = mapKeys->template as<SimpleVector<K>>();
auto values = mapValues->template as<SimpleVector<S>>();
auto offset = mapVector->offsetAt(row);
auto size = mapVector->sizeAt(row);

for (auto i = 0; i < size; ++i) {
// Ignore null map keys.
if (!mapKeys->isNullAt(offset + i)) {
auto key = mapKeys->valueAt(offset + i);
addValue(key, mapValues, offset + i, mapValues->typeKind());
if (!keys->isNullAt(offset + i)) {
auto key = keys->valueAt(offset + i);
addValue(key, values, offset + i, values->typeKind());
}
}
}
Expand Down Expand Up @@ -94,13 +96,16 @@ struct Accumulator {
}

vector_size_t extractValues(
FlatVector<K>& mapKeys,
FlatVector<S>& mapValues,
VectorPtr& mapKeys,
VectorPtr& mapValues,
vector_size_t offset) {
auto keys = mapKeys->asFlatVector<K>();
auto values = mapValues->asFlatVector<S>();

auto index = offset;
for (const auto& [key, sum] : sums) {
mapKeys.set(index, key);
mapValues.set(index, sum);
keys->set(index, key);
values->set(index, sum);

++index;
}
Expand All @@ -115,26 +120,30 @@ struct StringViewAccumulator {

Strings strings;

explicit StringViewAccumulator(HashStringAllocator* allocator)
: base{allocator} {}
explicit StringViewAccumulator(
const TypePtr& type,
HashStringAllocator* allocator)
: base{type, allocator} {}

size_t size() const {
return base.size();
}

void addValues(
const MapVector* mapVector,
const SimpleVector<StringView>* mapKeys,
const SimpleVector<S>* mapValues,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row,
HashStringAllocator* allocator) {
auto keys = mapKeys->template as<SimpleVector<StringView>>();
auto values = mapValues->template as<SimpleVector<S>>();
auto offset = mapVector->offsetAt(row);
auto size = mapVector->sizeAt(row);

for (auto i = 0; i < size; ++i) {
// Ignore null map keys.
if (!mapKeys->isNullAt(offset + i)) {
auto key = mapKeys->valueAt(offset + i);
if (!keys->isNullAt(offset + i)) {
auto key = keys->valueAt(offset + i);

if (!key.isInline()) {
auto it = base.sums.find(key);
Expand All @@ -145,19 +154,95 @@ struct StringViewAccumulator {
}
}

base.addValue(key, mapValues, offset + i, mapValues->typeKind());
base.addValue(key, values, offset + i, values->typeKind());
}
}
}

vector_size_t extractValues(
FlatVector<StringView>& mapKeys,
FlatVector<S>& mapValues,
VectorPtr& mapKeys,
VectorPtr& mapValues,
vector_size_t offset) {
return base.extractValues(mapKeys, mapValues, offset);
}
};

/// Maintains a map with keys of type array, map or struct.
template <typename V>
struct ComplexTypeAccumulator {
using ValueMap = folly::F14FastMap<
AddressableNonNullValueList::Entry,
int64_t,
AddressableNonNullValueList::Hash,
AddressableNonNullValueList::EqualTo,
AlignedStlAllocator<
std::pair<const AddressableNonNullValueList::Entry, int64_t>,
16>>;

/// A set of pointers to values stored in AddressableNonNullValueList.
ValueMap sums;

/// Stores unique non-null keys.
AddressableNonNullValueList serializedKeys;

ComplexTypeAccumulator(const TypePtr& type, HashStringAllocator* allocator)
: sums{
0,
AddressableNonNullValueList::Hash{},
AddressableNonNullValueList::EqualTo{type},
AlignedStlAllocator<
std::pair<const AddressableNonNullValueList::Entry, int64_t>,
16>(allocator)} {}

void addValues(
const MapVector* mapVector,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row,
HashStringAllocator* allocator) {
auto offset = mapVector->offsetAt(row);
auto size = mapVector->sizeAt(row);
auto values = mapValues->template as<SimpleVector<V>>();

for (auto i = 0; i < size; ++i) {
if (!mapKeys->isNullAt(offset + i)) {
auto entry =
serializedKeys.append(*mapKeys.get(), offset + i, allocator);

auto it = sums.find(entry);
if (it == sums.end()) {
// New entry.
sums[entry] = values->valueAt(offset + i);
} else {
// Existing entry.
sums[entry] += values->valueAt(offset + i);
}
}
}
}

vector_size_t extractValues(
VectorPtr& mapKeys,
VectorPtr& mapValues,
vector_size_t offset) {
auto values = mapValues->asFlatVector<V>();
auto index = offset;

for (const auto& [position, count] : sums) {
AddressableNonNullValueList::read(position, *mapKeys.get(), index);
values->set(index, count);
++index;
}

return sums.size();
}

size_t size() const {
return sums.size();
}
};

// Defines unique accumulators dependent on type.
template <typename K, typename S>
struct AccumulatorTypeTraits {
using AccumulatorType = Accumulator<K, S>;
Expand All @@ -168,6 +253,12 @@ struct AccumulatorTypeTraits<StringView, S> {
using AccumulatorType = StringViewAccumulator<S>;
};

template <typename V>
struct AccumulatorTypeTraits<ComplexType, V> {
using AccumulatorType = ComplexTypeAccumulator<V>;
};

// Defines common aggregator.
template <typename K, typename S>
class MapUnionSumAggregate : public exec::Aggregate {
public:
Expand All @@ -190,12 +281,18 @@ class MapUnionSumAggregate : public exec::Aggregate {
VELOX_CHECK(mapVector);
mapVector->resize(numGroups);

auto mapKeys = mapVector->mapKeys()->as<FlatVector<K>>();
auto mapValues = mapVector->mapValues()->as<FlatVector<S>>();
auto mapKeysPtr = mapVector->mapKeys();
auto mapValuesPtr = mapVector->mapValues();

auto numElements = countElements(groups, numGroups);
mapKeys->resize(numElements);
mapValues->resize(numElements);
mapVector->mapValues()->as<FlatVector<S>>()->resize(numElements);

// ComplexType cannot be resized the same.
if constexpr (!std::is_same_v<K, ComplexType>) {
mapVector->mapKeys()->as<FlatVector<K>>()->resize(numElements);
} else {
mapVector->mapKeys()->resize(numElements);
}

auto rawNulls = mapVector->mutableRawNulls();
vector_size_t offset = 0;
Expand All @@ -208,7 +305,7 @@ class MapUnionSumAggregate : public exec::Aggregate {
clearNull(rawNulls, i);

auto mapSize = value<AccumulatorType>(group)->extractValues(
*mapKeys, *mapValues, offset);
mapKeysPtr, mapValuesPtr, offset);
mapVector->setOffsetAndSize(i, offset, mapSize);
offset += mapSize;
}
Expand All @@ -227,8 +324,8 @@ class MapUnionSumAggregate : public exec::Aggregate {
bool /*mayPushdown*/) override {
decodedMaps_.decode(*args[0], rows);
auto mapVector = decodedMaps_.base()->template as<MapVector>();
auto mapKeys = mapVector->mapKeys()->template as<SimpleVector<K>>();
auto mapValues = mapVector->mapValues()->template as<SimpleVector<S>>();
auto mapKeys = mapVector->mapKeys();
auto mapValues = mapVector->mapValues();

rows.applyToSelected([&](auto row) {
if (!decodedMaps_.isNullAt(row)) {
Expand All @@ -249,8 +346,8 @@ class MapUnionSumAggregate : public exec::Aggregate {
bool /* mayPushdown */) override {
decodedMaps_.decode(*args[0], rows);
auto mapVector = decodedMaps_.base()->template as<MapVector>();
auto mapKeys = mapVector->mapKeys()->template as<SimpleVector<K>>();
auto mapValues = mapVector->mapValues()->template as<SimpleVector<S>>();
auto mapKeys = mapVector->mapKeys();
auto mapValues = mapVector->mapValues();

auto groupMap = value<AccumulatorType>(group);

Expand Down Expand Up @@ -285,7 +382,7 @@ class MapUnionSumAggregate : public exec::Aggregate {
folly::Range<const vector_size_t*> indices) override {
setAllNulls(groups, indices);
for (auto index : indices) {
new (groups[index] + offset_) AccumulatorType{allocator_};
new (groups[index] + offset_) AccumulatorType{resultType_, allocator_};
}
}

Expand All @@ -304,8 +401,8 @@ class MapUnionSumAggregate : public exec::Aggregate {
void addMap(
AccumulatorType& groupMap,
const MapVector* mapVector,
const SimpleVector<K>* mapKeys,
const SimpleVector<S>* mapValues,
const VectorPtr& mapKeys,
const VectorPtr& mapValues,
vector_size_t row) const {
auto decodedRow = decodedMaps_.index(row);
groupMap.addValues(mapVector, mapKeys, mapValues, decodedRow, allocator_);
Expand Down Expand Up @@ -340,7 +437,8 @@ std::unique_ptr<exec::Aggregate> createMapUnionSumAggregate(
case TypeKind::DOUBLE:
return std::make_unique<MapUnionSumAggregate<K, double>>(resultType);
default:
VELOX_UNREACHABLE();
VELOX_UNREACHABLE(
"Unexpected value type {}", mapTypeKindToName(valueKind));
}
}

Expand All @@ -350,15 +448,6 @@ void registerMapUnionSumAggregate(
const std::string& prefix,
bool withCompanionFunctions,
bool overwrite) {
const std::vector<std::string> keyTypes = {
"tinyint",
"smallint",
"integer",
"bigint",
"real",
"double",
"varchar",
"json"};
const std::vector<std::string> valueTypes = {
"tinyint",
"smallint",
Expand All @@ -369,15 +458,14 @@ void registerMapUnionSumAggregate(
};

std::vector<std::shared_ptr<exec::AggregateFunctionSignature>> signatures;
for (auto keyType : keyTypes) {
for (auto valueType : valueTypes) {
auto mapType = fmt::format("map({},{})", keyType, valueType);
signatures.push_back(exec::AggregateFunctionSignatureBuilder()
.returnType(mapType)
.intermediateType(mapType)
.argumentType(mapType)
.build());
}
for (auto valueType : valueTypes) {
signatures.push_back(
exec::AggregateFunctionSignatureBuilder()
.typeVariable("K")
.returnType(fmt::format("map(K,{})", valueType))
.intermediateType(fmt::format("map(K,{})", valueType))
.argumentType(fmt::format("map(K,{})", valueType))
.build());
}

auto name = prefix + kMapUnionSum;
Expand All @@ -395,6 +483,8 @@ void registerMapUnionSumAggregate(
auto& mapType = argTypes[0]->asMap();
auto keyTypeKind = mapType.keyType()->kind();
auto valueTypeKind = mapType.valueType()->kind();
const auto keyType = resultType->childAt(0);

switch (keyTypeKind) {
case TypeKind::TINYINT:
return createMapUnionSumAggregate<int8_t>(
Expand All @@ -416,8 +506,14 @@ void registerMapUnionSumAggregate(
case TypeKind::VARCHAR:
return createMapUnionSumAggregate<StringView>(
valueTypeKind, resultType);
case TypeKind::ARRAY:
case TypeKind::MAP:
case TypeKind::ROW:
return createMapUnionSumAggregate<ComplexType>(
valueTypeKind, resultType);
default:
VELOX_UNREACHABLE();
VELOX_UNREACHABLE(
"Unexpected key type {}", mapTypeKindToName(keyTypeKind));
}
},
withCompanionFunctions,
Expand Down
Loading

0 comments on commit cfb526c

Please sign in to comment.