diff --git a/SOURCES/include/datasketches/theta/theta_common.hpp b/SOURCES/include/datasketches/theta/theta_common.hpp index fea571a..fb396bf 100644 --- a/SOURCES/include/datasketches/theta/theta_common.hpp +++ b/SOURCES/include/datasketches/theta/theta_common.hpp @@ -23,7 +23,7 @@ class ThetaSketchScalarFunctionFactory : public ScalarFunctionFactory { const SizedColumnTypes &inputTypes, SizedColumnTypes &outputTypes) { uint8_t logK = readLogK(srvfloaterface); - outputTypes.addVarbinary(quickSelectSketchMinSize(logK)); + outputTypes.addLongVarbinary(quickSelectSketchMinSize(logK)); } virtual void getParameterType(ServerInterface &srvInterface, @@ -49,14 +49,14 @@ class ThetaSketchAggregateFunctionFactory : public AggregateFunctionFactory { const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData) { uint8_t logK = readLogK(srvInterface); - intermediateTypeMetaData.addVarbinary(quickSelectSketchMinSize(logK)); + intermediateTypeMetaData.addLongVarbinary(quickSelectSketchMinSize(logK)); } virtual void getReturnType(ServerInterface &srvfloaterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &outputTypes) { uint8_t logK = readLogK(srvfloaterface); - outputTypes.addVarbinary(quickSelectSketchMinSize(logK)); + outputTypes.addLongVarbinary(quickSelectSketchMinSize(logK)); } virtual void getParameterType(ServerInterface &srvInterface, @@ -83,14 +83,29 @@ class ThetaSketchAggregateFunction : public AggregateFunction { uint8_t logK; uint64_t seed; + long countCombine{0}; + long countInitAggregate{0}; + long countAggregate{0}; + long countTerminate{0}; + public: virtual void setup(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) { this->logK = readLogK(srvInterface); this->seed = readSeed(srvInterface); } + virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) { + // Log this instance's combine/initAggregate/aggregate/terminate call counters to v_monitor.udx_events + std::map<std::string, std::string> details; + details["combine"] = 
std::to_string(this->countCombine); + details["initAggregate"] = std::to_string(this->countInitAggregate); + details["aggregate"] = std::to_string(this->countAggregate); + details["terminate"] = std::to_string(this->countTerminate); + srvInterface.logEvent(details); + } virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs) { try { + this->countInitAggregate++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -107,6 +122,7 @@ class ThetaSketchAggregateFunction : public AggregateFunction { BlockWriter &resWriter, IntermediateAggs &aggs) override { try { + this->countTerminate++; const VString &concat = aggs.getStringRef(0); VString &result = resWriter.getStringRef(); result.copy(&concat); diff --git a/SOURCES/include/datasketches/theta/theta_const.hpp b/SOURCES/include/datasketches/theta/theta_const.hpp index 761a9f4..a8cdfe2 100644 --- a/SOURCES/include/datasketches/theta/theta_const.hpp +++ b/SOURCES/include/datasketches/theta/theta_const.hpp @@ -3,9 +3,10 @@ #define DATASKETCHES_LOG_NOMINAL_VALUE_PARAMETER_NAME "logK" #define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 12 -#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 5 -// Vertica supports maximum 65000 bytes in a binary field, hence the limit. -#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 12 +#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 4 +// Vertica supports maximum 32000000 bytes in a long binary field, hence the limit. 
+// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html +#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 20 #define DATASKETCHES_SEED_PARAMETER_NAME "seed" #define DATASKETCHES_SEED_DEFAULT 9001 diff --git a/SOURCES/install.sql b/SOURCES/install.sql index 9c2e569..43720fe 100644 --- a/SOURCES/install.sql +++ b/SOURCES/install.sql @@ -5,19 +5,19 @@ CREATE OR REPLACE LIBRARY DataSketches AS '/home/dbadmin/build/libvertica-datask CREATE OR REPLACE FUNCTION theta_sketch_get_estimate AS LANGUAGE 'C++' NAME 'ThetaSketchGetEstimateFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_get_estimate(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_get_estimate(LONG VARBINARY) TO PUBLIC; -- SELECT theta_sketch_union(theta_sketch1, theta_sketch2, ...) FROM ... CREATE OR REPLACE FUNCTION theta_sketch_union AS LANGUAGE 'C++' NAME 'ThetaSketchScalarUnionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_union(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_union(LONG VARBINARY) TO PUBLIC; -- SELECT key, theta_sketch_union_agg(theta_sketch) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_union_agg AS LANGUAGE 'C++' NAME 'ThetaSketchAggregateUnionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_union_agg(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_union_agg(LONG VARBINARY) TO PUBLIC; -- SELECT key, theta_sketch_create(binary) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_create AS @@ -28,23 +28,23 @@ GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(VARCHAR) TO PUBLIC; -- SELECT key, theta_sketch_create(chars) FROM ... 
GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_create AS LANGUAGE 'C++' - NAME 'ThetaSketchAggregateCreateVarbinaryFactory' LIBRARY DataSketches; -GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(VARBINARY) TO PUBLIC; + NAME 'ThetaSketchAggregateCreateLongVarbinaryFactory' LIBRARY DataSketches; +GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(LONG VARBINARY) TO PUBLIC; -- SELECT theta_sketch_intersection(theta_sketch1, theta_sketch2, ...) FROM ... CREATE OR REPLACE FUNCTION theta_sketch_intersection AS LANGUAGE 'C++' NAME 'ThetaSketchScalarIntersectionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_intersection(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_intersection(LONG VARBINARY) TO PUBLIC; -- SELECT key, theta_sketch_intersection_agg(theta_sketch) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_intersection_agg AS LANGUAGE 'C++' NAME 'ThetaSketchAggregateIntersectionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_intersection_agg(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_intersection_agg(LONG VARBINARY) TO PUBLIC; -- SELECT theta_sketch_a_not_b(theta_sketch_a, theta_sketch_b) FROM ... 
CREATE OR REPLACE FUNCTION theta_sketch_a_not_b AS LANGUAGE 'C++' NAME 'ThetaSketchANotBFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_a_not_b(VARBINARY, VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_a_not_b(LONG VARBINARY, LONG VARBINARY) TO PUBLIC; diff --git a/SOURCES/src/datasketches/theta/ANotB.cpp b/SOURCES/src/datasketches/theta/ANotB.cpp index 01df1e9..6b2b461 100644 --- a/SOURCES/src/datasketches/theta/ANotB.cpp +++ b/SOURCES/src/datasketches/theta/ANotB.cpp @@ -47,9 +47,9 @@ class ThetaSketchANotBFactory : public ThetaSketchScalarFunctionFactory { virtual void getPrototype(ServerInterface &interface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } }; diff --git a/SOURCES/src/datasketches/theta/AggregateCreate.cpp b/SOURCES/src/datasketches/theta/AggregateCreate.cpp index 52515af..4bf5da6 100644 --- a/SOURCES/src/datasketches/theta/AggregateCreate.cpp +++ b/SOURCES/src/datasketches/theta/AggregateCreate.cpp @@ -74,7 +74,7 @@ class ThetaSketchAggregateCreate : public ThetaSketchAggregateFunction { class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addVarchar(); - returnType.addVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { @@ -83,10 +83,10 @@ class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunc }; -class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFunctionFactory { +class ThetaSketchAggregateCreateLongVarbinaryFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes 
&argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { @@ -95,7 +95,7 @@ class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFu }; RegisterFactory(ThetaSketchAggregateCreateVarcharFactory); -RegisterFactory(ThetaSketchAggregateCreateVarbinaryFactory); +RegisterFactory(ThetaSketchAggregateCreateLongVarbinaryFactory); RegisterLibrary( "Criteo",// author @@ -106,4 +106,4 @@ RegisterLibrary( "Wrapper around incubator-datasketches-cpp to make it usable in Vertica", // description "", // licenses required "" // signature -); +); \ No newline at end of file diff --git a/SOURCES/src/datasketches/theta/AggregateIntersection.cpp b/SOURCES/src/datasketches/theta/AggregateIntersection.cpp index 7ce6b48..95d2002 100644 --- a/SOURCES/src/datasketches/theta/AggregateIntersection.cpp +++ b/SOURCES/src/datasketches/theta/AggregateIntersection.cpp @@ -105,13 +105,13 @@ class ThetaSketchAggregateIntersectionFactory : public ThetaSketchAggregateFunct const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData) { uint8_t logK = readLogK(srvInterface); - intermediateTypeMetaData.addVarbinary(quickSelectSketchMinSize(logK)); + intermediateTypeMetaData.addLongVarbinary(quickSelectSketchMinSize(logK)); intermediateTypeMetaData.addBool(); } virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { diff --git a/SOURCES/src/datasketches/theta/AggregateUnion.cpp b/SOURCES/src/datasketches/theta/AggregateUnion.cpp index cdf09b0..678119d 100644 --- a/SOURCES/src/datasketches/theta/AggregateUnion.cpp +++ 
b/SOURCES/src/datasketches/theta/AggregateUnion.cpp @@ -18,6 +18,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { BlockReader &argReader, IntermediateAggs &aggs) { try { + this->countAggregate++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -45,6 +46,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { IntermediateAggs &aggs, MultipleIntermediateAggs &aggsOther) override { try { + this->countCombine++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -77,8 +79,8 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { class ThetaSketchAggregateUnionFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { @@ -87,4 +89,3 @@ class ThetaSketchAggregateUnionFactory : public ThetaSketchAggregateFunctionFact }; RegisterFactory(ThetaSketchAggregateUnionFactory); - diff --git a/SOURCES/src/datasketches/theta/GetEstimate.cpp b/SOURCES/src/datasketches/theta/GetEstimate.cpp index 2688c2f..d114b7c 100644 --- a/SOURCES/src/datasketches/theta/GetEstimate.cpp +++ b/SOURCES/src/datasketches/theta/GetEstimate.cpp @@ -38,7 +38,7 @@ class ThetaSketchGetEstimateFactory : public ScalarFunctionFactory { virtual void getPrototype(ServerInterface &interface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); + argTypes.addLongVarbinary(); returnType.addFloat(); } diff --git a/SOURCES/src/datasketches/theta/ScalarIntersection.cpp b/SOURCES/src/datasketches/theta/ScalarIntersection.cpp index 8076a8a..f9ac09d 100644 --- a/SOURCES/src/datasketches/theta/ScalarIntersection.cpp +++ b/SOURCES/src/datasketches/theta/ScalarIntersection.cpp 
@@ -45,7 +45,7 @@ class ThetaSketchScalarIntersectionFactory : public ThetaSketchScalarFunctionFac ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addAny(); - returnType.addVarbinary(); + returnType.addLongVarbinary(); } }; diff --git a/SOURCES/src/datasketches/theta/ScalarUnion.cpp b/SOURCES/src/datasketches/theta/ScalarUnion.cpp index 23570a4..c8c51ef 100644 --- a/SOURCES/src/datasketches/theta/ScalarUnion.cpp +++ b/SOURCES/src/datasketches/theta/ScalarUnion.cpp @@ -49,7 +49,7 @@ class ThetaSketchScalarUnionFactory : public ThetaSketchScalarFunctionFactory { ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addAny(); - returnType.addVarbinary(); + returnType.addLongVarbinary(); } };