Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 19 additions & 3 deletions SOURCES/include/datasketches/theta/theta_common.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ class ThetaSketchScalarFunctionFactory : public ScalarFunctionFactory {
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes) {
uint8_t logK = readLogK(srvfloaterface);
outputTypes.addVarbinary(quickSelectSketchMinSize(logK));
outputTypes.addLongVarbinary(quickSelectSketchMinSize(logK));
}

virtual void getParameterType(ServerInterface &srvInterface,
Expand All @@ -49,14 +49,14 @@ class ThetaSketchAggregateFunctionFactory : public AggregateFunctionFactory {
const SizedColumnTypes &inputTypes,
SizedColumnTypes &intermediateTypeMetaData) {
uint8_t logK = readLogK(srvInterface);
intermediateTypeMetaData.addVarbinary(quickSelectSketchMinSize(logK));
intermediateTypeMetaData.addLongVarbinary(quickSelectSketchMinSize(logK));
}

virtual void getReturnType(ServerInterface &srvfloaterface,
const SizedColumnTypes &inputTypes,
SizedColumnTypes &outputTypes) {
uint8_t logK = readLogK(srvfloaterface);
outputTypes.addVarbinary(quickSelectSketchMinSize(logK));
outputTypes.addLongVarbinary(quickSelectSketchMinSize(logK));
}

virtual void getParameterType(ServerInterface &srvInterface,
Expand All @@ -83,14 +83,29 @@ class ThetaSketchAggregateFunction : public AggregateFunction {
uint8_t logK;
uint64_t seed;

long countCombine{0};
long countInitAggregate{0};
long countAggregate{0};
long countTerminate{0};

public:
virtual void setup(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) {
this->logK = readLogK(srvInterface);
this->seed = readSeed(srvInterface);
}
virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) {
// Log exported parquet file details to v_monitor.udx_events
std::map<std::string, std::string> details;
details["combine"] = std::to_string(this->countCombine);
details["initAggregate"] = std::to_string(this->countInitAggregate);
details["aggregate"] = std::to_string(this->countAggregate);
details["terminate"] = std::to_string(this->countTerminate);
srvInterface.logEvent(details);
}

virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs) {
try {
this->countInitAggregate++;
auto u = theta_union_custom::builder()
.set_lg_k(logK)
.set_seed(seed)
Expand All @@ -107,6 +122,7 @@ class ThetaSketchAggregateFunction : public AggregateFunction {
BlockWriter &resWriter,
IntermediateAggs &aggs) override {
try {
this->countTerminate++;
const VString &concat = aggs.getStringRef(0);
VString &result = resWriter.getStringRef();
result.copy(&concat);
Expand Down
7 changes: 4 additions & 3 deletions SOURCES/include/datasketches/theta/theta_const.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,10 @@

#define DATASKETCHES_LOG_NOMINAL_VALUE_PARAMETER_NAME "logK"
#define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 12
#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 5
// Vertica supports maximum 65000 bytes in a binary field, hence the limit.
#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 12
#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 4
// Vertica supports maximum 32000000 bytes in a long binary field, hence the limit.
// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html
#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 20
#define DATASKETCHES_SEED_PARAMETER_NAME "seed"
#define DATASKETCHES_SEED_DEFAULT 9001

Expand Down
16 changes: 8 additions & 8 deletions SOURCES/install.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,19 @@ CREATE OR REPLACE LIBRARY DataSketches AS '/home/dbadmin/build/libvertica-datask
CREATE OR REPLACE FUNCTION theta_sketch_get_estimate AS
LANGUAGE 'C++'
NAME 'ThetaSketchGetEstimateFactory' LIBRARY DataSketches;
GRANT EXECUTE ON FUNCTION theta_sketch_get_estimate(VARBINARY) TO PUBLIC;
GRANT EXECUTE ON FUNCTION theta_sketch_get_estimate(LONG VARBINARY) TO PUBLIC;

-- SELECT theta_sketch_union(theta_sketch1, theta_sketch2, ...) FROM ...
CREATE OR REPLACE FUNCTION theta_sketch_union AS
LANGUAGE 'C++'
NAME 'ThetaSketchScalarUnionFactory' LIBRARY DataSketches;
GRANT EXECUTE ON FUNCTION theta_sketch_union(VARBINARY) TO PUBLIC;
GRANT EXECUTE ON FUNCTION theta_sketch_union(LONG VARBINARY) TO PUBLIC;

-- SELECT key, theta_sketch_union_agg(theta_sketch) FROM ... GROUP BY key
CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_union_agg AS
LANGUAGE 'C++'
NAME 'ThetaSketchAggregateUnionFactory' LIBRARY DataSketches;
GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_union_agg(VARBINARY) TO PUBLIC;
GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_union_agg(LONG VARBINARY) TO PUBLIC;

-- SELECT key, theta_sketch_create(binary) FROM ... GROUP BY key
CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_create AS
Expand All @@ -28,23 +28,23 @@ GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(VARCHAR) TO PUBLIC;
-- SELECT key, theta_sketch_create(chars) FROM ... GROUP BY key
CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_create AS
LANGUAGE 'C++'
NAME 'ThetaSketchAggregateCreateVarbinaryFactory' LIBRARY DataSketches;
GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(VARBINARY) TO PUBLIC;
NAME 'ThetaSketchAggregateCreateLongVarbinaryFactory' LIBRARY DataSketches;
GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(LONG VARBINARY) TO PUBLIC;

-- SELECT theta_sketch_intersection(theta_sketch1, theta_sketch2, ...) FROM ...
CREATE OR REPLACE FUNCTION theta_sketch_intersection AS
LANGUAGE 'C++'
NAME 'ThetaSketchScalarIntersectionFactory' LIBRARY DataSketches;
GRANT EXECUTE ON FUNCTION theta_sketch_intersection(VARBINARY) TO PUBLIC;
GRANT EXECUTE ON FUNCTION theta_sketch_intersection(LONG VARBINARY) TO PUBLIC;

-- SELECT key, theta_sketch_intersection_agg(theta_sketch) FROM ... GROUP BY key
CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_intersection_agg AS
LANGUAGE 'C++'
NAME 'ThetaSketchAggregateIntersectionFactory' LIBRARY DataSketches;
GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_intersection_agg(VARBINARY) TO PUBLIC;
GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_intersection_agg(LONG VARBINARY) TO PUBLIC;

-- SELECT theta_sketch_a_not_b(theta_sketch_a, theta_sketch_b) FROM ...
CREATE OR REPLACE FUNCTION theta_sketch_a_not_b AS
LANGUAGE 'C++'
NAME 'ThetaSketchANotBFactory' LIBRARY DataSketches;
GRANT EXECUTE ON FUNCTION theta_sketch_a_not_b(VARBINARY, VARBINARY) TO PUBLIC;
GRANT EXECUTE ON FUNCTION theta_sketch_a_not_b(LONG VARBINARY, LONG VARBINARY) TO PUBLIC;
6 changes: 3 additions & 3 deletions SOURCES/src/datasketches/theta/ANotB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,9 @@ class ThetaSketchANotBFactory : public ThetaSketchScalarFunctionFactory {
virtual void getPrototype(ServerInterface &interface,
ColumnTypes &argTypes,
ColumnTypes &returnType) {
argTypes.addVarbinary();
argTypes.addVarbinary();
returnType.addVarbinary();
argTypes.addLongVarbinary();
argTypes.addLongVarbinary();
returnType.addLongVarbinary();
}
};

Expand Down
12 changes: 6 additions & 6 deletions SOURCES/src/datasketches/theta/AggregateCreate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ class ThetaSketchAggregateCreate : public ThetaSketchAggregateFunction {
class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunctionFactory {
virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) {
argTypes.addVarchar();
returnType.addVarbinary();
returnType.addLongVarbinary();
}

virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) {
Expand All @@ -83,10 +83,10 @@ class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunc
};


class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFunctionFactory {
class ThetaSketchAggregateCreateLongVarbinaryFactory : public ThetaSketchAggregateFunctionFactory {
virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) {
argTypes.addVarbinary();
returnType.addVarbinary();
argTypes.addLongVarbinary();
returnType.addLongVarbinary();
}

virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) {
Expand All @@ -95,7 +95,7 @@ class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFu
};

RegisterFactory(ThetaSketchAggregateCreateVarcharFactory);
RegisterFactory(ThetaSketchAggregateCreateVarbinaryFactory);
RegisterFactory(ThetaSketchAggregateCreateLongVarbinaryFactory);

RegisterLibrary(
"Criteo",// author
Expand All @@ -106,4 +106,4 @@ RegisterLibrary(
"Wrapper around incubator-datasketches-cpp to make it usable in Vertica", // description
"", // licenses required
"" // signature
);
);
6 changes: 3 additions & 3 deletions SOURCES/src/datasketches/theta/AggregateIntersection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,13 +105,13 @@ class ThetaSketchAggregateIntersectionFactory : public ThetaSketchAggregateFunct
const SizedColumnTypes &inputTypes,
SizedColumnTypes &intermediateTypeMetaData) {
uint8_t logK = readLogK(srvInterface);
intermediateTypeMetaData.addVarbinary(quickSelectSketchMinSize(logK));
intermediateTypeMetaData.addLongVarbinary(quickSelectSketchMinSize(logK));
intermediateTypeMetaData.addBool();
}

virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) {
argTypes.addVarbinary();
returnType.addVarbinary();
argTypes.addLongVarbinary();
returnType.addLongVarbinary();
}

virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) {
Expand Down
7 changes: 4 additions & 3 deletions SOURCES/src/datasketches/theta/AggregateUnion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction {
BlockReader &argReader,
IntermediateAggs &aggs) {
try {
this->countAggregate++;
auto u = theta_union_custom::builder()
.set_lg_k(logK)
.set_seed(seed)
Expand Down Expand Up @@ -45,6 +46,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction {
IntermediateAggs &aggs,
MultipleIntermediateAggs &aggsOther) override {
try {
this->countCombine++;
auto u = theta_union_custom::builder()
.set_lg_k(logK)
.set_seed(seed)
Expand Down Expand Up @@ -77,8 +79,8 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction {

class ThetaSketchAggregateUnionFactory : public ThetaSketchAggregateFunctionFactory {
virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) {
argTypes.addVarbinary();
returnType.addVarbinary();
argTypes.addLongVarbinary();
returnType.addLongVarbinary();
}

virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) {
Expand All @@ -87,4 +89,3 @@ class ThetaSketchAggregateUnionFactory : public ThetaSketchAggregateFunctionFact
};

RegisterFactory(ThetaSketchAggregateUnionFactory);

2 changes: 1 addition & 1 deletion SOURCES/src/datasketches/theta/GetEstimate.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class ThetaSketchGetEstimateFactory : public ScalarFunctionFactory {
virtual void getPrototype(ServerInterface &interface,
ColumnTypes &argTypes,
ColumnTypes &returnType) {
argTypes.addVarbinary();
argTypes.addLongVarbinary();
returnType.addFloat();
}

Expand Down
2 changes: 1 addition & 1 deletion SOURCES/src/datasketches/theta/ScalarIntersection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class ThetaSketchScalarIntersectionFactory : public ThetaSketchScalarFunctionFac
ColumnTypes &argTypes,
ColumnTypes &returnType) {
argTypes.addAny();
returnType.addVarbinary();
returnType.addLongVarbinary();
}
};

Expand Down
2 changes: 1 addition & 1 deletion SOURCES/src/datasketches/theta/ScalarUnion.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class ThetaSketchScalarUnionFactory : public ThetaSketchScalarFunctionFactory {
ColumnTypes &argTypes,
ColumnTypes &returnType) {
argTypes.addAny();
returnType.addVarbinary();
returnType.addLongVarbinary();
}
};

Expand Down