From 8136971a8dcd50b789181ac0d78004f528095eef Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Mon, 12 Oct 2020 10:54:12 -0700 Subject: [PATCH 1/8] Support for sketches longer than 65000 bytes. --- SOURCES/include/datasketches/theta/theta_common.hpp | 6 +++--- SOURCES/src/datasketches/theta/ANotB.cpp | 6 +++--- SOURCES/src/datasketches/theta/AggregateCreate.cpp | 6 +++--- SOURCES/src/datasketches/theta/AggregateIntersection.cpp | 6 +++--- SOURCES/src/datasketches/theta/AggregateUnion.cpp | 5 ++--- SOURCES/src/datasketches/theta/GetEstimate.cpp | 2 +- SOURCES/src/datasketches/theta/ScalarIntersection.cpp | 2 +- SOURCES/src/datasketches/theta/ScalarUnion.cpp | 2 +- 8 files changed, 17 insertions(+), 18 deletions(-) diff --git a/SOURCES/include/datasketches/theta/theta_common.hpp b/SOURCES/include/datasketches/theta/theta_common.hpp index fea571a..541cb6f 100644 --- a/SOURCES/include/datasketches/theta/theta_common.hpp +++ b/SOURCES/include/datasketches/theta/theta_common.hpp @@ -23,7 +23,7 @@ class ThetaSketchScalarFunctionFactory : public ScalarFunctionFactory { const SizedColumnTypes &inputTypes, SizedColumnTypes &outputTypes) { uint8_t logK = readLogK(srvfloaterface); - outputTypes.addVarbinary(quickSelectSketchMinSize(logK)); + outputTypes.addLongVarbinary(quickSelectSketchMinSize(logK)); } virtual void getParameterType(ServerInterface &srvInterface, @@ -49,14 +49,14 @@ class ThetaSketchAggregateFunctionFactory : public AggregateFunctionFactory { const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData) { uint8_t logK = readLogK(srvInterface); - intermediateTypeMetaData.addVarbinary(quickSelectSketchMinSize(logK)); + intermediateTypeMetaData.addLongVarbinary(quickSelectSketchMinSize(logK)); } virtual void getReturnType(ServerInterface &srvfloaterface, const SizedColumnTypes &inputTypes, SizedColumnTypes &outputTypes) { uint8_t logK = readLogK(srvfloaterface); - outputTypes.addVarbinary(quickSelectSketchMinSize(logK)); + 
outputTypes.addLongVarbinary(quickSelectSketchMinSize(logK)); } virtual void getParameterType(ServerInterface &srvInterface, diff --git a/SOURCES/src/datasketches/theta/ANotB.cpp b/SOURCES/src/datasketches/theta/ANotB.cpp index 01df1e9..6b2b461 100644 --- a/SOURCES/src/datasketches/theta/ANotB.cpp +++ b/SOURCES/src/datasketches/theta/ANotB.cpp @@ -47,9 +47,9 @@ class ThetaSketchANotBFactory : public ThetaSketchScalarFunctionFactory { virtual void getPrototype(ServerInterface &interface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } }; diff --git a/SOURCES/src/datasketches/theta/AggregateCreate.cpp b/SOURCES/src/datasketches/theta/AggregateCreate.cpp index e514eaf..4619217 100644 --- a/SOURCES/src/datasketches/theta/AggregateCreate.cpp +++ b/SOURCES/src/datasketches/theta/AggregateCreate.cpp @@ -74,7 +74,7 @@ class ThetaSketchAggregateCreate : public ThetaSketchAggregateFunction { class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addVarchar(); - returnType.addVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { @@ -85,8 +85,8 @@ class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunc class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { diff --git 
a/SOURCES/src/datasketches/theta/AggregateIntersection.cpp b/SOURCES/src/datasketches/theta/AggregateIntersection.cpp index 7ce6b48..95d2002 100644 --- a/SOURCES/src/datasketches/theta/AggregateIntersection.cpp +++ b/SOURCES/src/datasketches/theta/AggregateIntersection.cpp @@ -105,13 +105,13 @@ class ThetaSketchAggregateIntersectionFactory : public ThetaSketchAggregateFunct const SizedColumnTypes &inputTypes, SizedColumnTypes &intermediateTypeMetaData) { uint8_t logK = readLogK(srvInterface); - intermediateTypeMetaData.addVarbinary(quickSelectSketchMinSize(logK)); + intermediateTypeMetaData.addLongVarbinary(quickSelectSketchMinSize(logK)); intermediateTypeMetaData.addBool(); } virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { diff --git a/SOURCES/src/datasketches/theta/AggregateUnion.cpp b/SOURCES/src/datasketches/theta/AggregateUnion.cpp index cdf09b0..63276f3 100644 --- a/SOURCES/src/datasketches/theta/AggregateUnion.cpp +++ b/SOURCES/src/datasketches/theta/AggregateUnion.cpp @@ -77,8 +77,8 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { class ThetaSketchAggregateUnionFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); - returnType.addVarbinary(); + argTypes.addLongVarbinary(); + returnType.addLongVarbinary(); } virtual AggregateFunction *createAggregateFunction(ServerInterface &srvfloaterface) { @@ -87,4 +87,3 @@ class ThetaSketchAggregateUnionFactory : public ThetaSketchAggregateFunctionFact }; RegisterFactory(ThetaSketchAggregateUnionFactory); - diff --git a/SOURCES/src/datasketches/theta/GetEstimate.cpp 
b/SOURCES/src/datasketches/theta/GetEstimate.cpp index 2688c2f..d114b7c 100644 --- a/SOURCES/src/datasketches/theta/GetEstimate.cpp +++ b/SOURCES/src/datasketches/theta/GetEstimate.cpp @@ -38,7 +38,7 @@ class ThetaSketchGetEstimateFactory : public ScalarFunctionFactory { virtual void getPrototype(ServerInterface &interface, ColumnTypes &argTypes, ColumnTypes &returnType) { - argTypes.addVarbinary(); + argTypes.addLongVarbinary(); returnType.addFloat(); } diff --git a/SOURCES/src/datasketches/theta/ScalarIntersection.cpp b/SOURCES/src/datasketches/theta/ScalarIntersection.cpp index 8076a8a..f9ac09d 100644 --- a/SOURCES/src/datasketches/theta/ScalarIntersection.cpp +++ b/SOURCES/src/datasketches/theta/ScalarIntersection.cpp @@ -45,7 +45,7 @@ class ThetaSketchScalarIntersectionFactory : public ThetaSketchScalarFunctionFac ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addAny(); - returnType.addVarbinary(); + returnType.addLongVarbinary(); } }; diff --git a/SOURCES/src/datasketches/theta/ScalarUnion.cpp b/SOURCES/src/datasketches/theta/ScalarUnion.cpp index 23570a4..c8c51ef 100644 --- a/SOURCES/src/datasketches/theta/ScalarUnion.cpp +++ b/SOURCES/src/datasketches/theta/ScalarUnion.cpp @@ -49,7 +49,7 @@ class ThetaSketchScalarUnionFactory : public ThetaSketchScalarFunctionFactory { ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addAny(); - returnType.addVarbinary(); + returnType.addLongVarbinary(); } }; From df9e8ce27064d5233908b9f3d996df9a5ca6d689 Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Mon, 12 Oct 2020 11:36:21 -0700 Subject: [PATCH 2/8] More changes to Long Varbinary --- SOURCES/install.sql | 16 ++++++++-------- .../src/datasketches/theta/AggregateCreate.cpp | 5 ++--- 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/SOURCES/install.sql b/SOURCES/install.sql index 9c2e569..43720fe 100644 --- a/SOURCES/install.sql +++ b/SOURCES/install.sql @@ -5,19 +5,19 @@ CREATE OR REPLACE LIBRARY DataSketches AS 
'/home/dbadmin/build/libvertica-datask CREATE OR REPLACE FUNCTION theta_sketch_get_estimate AS LANGUAGE 'C++' NAME 'ThetaSketchGetEstimateFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_get_estimate(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_get_estimate(LONG VARBINARY) TO PUBLIC; -- SELECT theta_sketch_union(theta_sketch1, theta_sketch2, ...) FROM ... CREATE OR REPLACE FUNCTION theta_sketch_union AS LANGUAGE 'C++' NAME 'ThetaSketchScalarUnionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_union(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_union(LONG VARBINARY) TO PUBLIC; -- SELECT key, theta_sketch_union_agg(theta_sketch) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_union_agg AS LANGUAGE 'C++' NAME 'ThetaSketchAggregateUnionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_union_agg(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_union_agg(LONG VARBINARY) TO PUBLIC; -- SELECT key, theta_sketch_create(binary) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_create AS @@ -28,23 +28,23 @@ GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(VARCHAR) TO PUBLIC; -- SELECT key, theta_sketch_create(chars) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_create AS LANGUAGE 'C++' - NAME 'ThetaSketchAggregateCreateVarbinaryFactory' LIBRARY DataSketches; -GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(VARBINARY) TO PUBLIC; + NAME 'ThetaSketchAggregateCreateLongVarbinaryFactory' LIBRARY DataSketches; +GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_create(LONG VARBINARY) TO PUBLIC; -- SELECT theta_sketch_intersection(theta_sketch1, theta_sketch2, ...) FROM ... 
CREATE OR REPLACE FUNCTION theta_sketch_intersection AS LANGUAGE 'C++' NAME 'ThetaSketchScalarIntersectionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_intersection(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_intersection(LONG VARBINARY) TO PUBLIC; -- SELECT key, theta_sketch_intersection_agg(theta_sketch) FROM ... GROUP BY key CREATE OR REPLACE AGGREGATE FUNCTION theta_sketch_intersection_agg AS LANGUAGE 'C++' NAME 'ThetaSketchAggregateIntersectionFactory' LIBRARY DataSketches; -GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_intersection_agg(VARBINARY) TO PUBLIC; +GRANT EXECUTE ON AGGREGATE FUNCTION theta_sketch_intersection_agg(LONG VARBINARY) TO PUBLIC; -- SELECT theta_sketch_a_not_b(theta_sketch_a, theta_sketch_b) FROM ... CREATE OR REPLACE FUNCTION theta_sketch_a_not_b AS LANGUAGE 'C++' NAME 'ThetaSketchANotBFactory' LIBRARY DataSketches; -GRANT EXECUTE ON FUNCTION theta_sketch_a_not_b(VARBINARY, VARBINARY) TO PUBLIC; +GRANT EXECUTE ON FUNCTION theta_sketch_a_not_b(LONG VARBINARY, LONG VARBINARY) TO PUBLIC; diff --git a/SOURCES/src/datasketches/theta/AggregateCreate.cpp b/SOURCES/src/datasketches/theta/AggregateCreate.cpp index 4619217..2beb945 100644 --- a/SOURCES/src/datasketches/theta/AggregateCreate.cpp +++ b/SOURCES/src/datasketches/theta/AggregateCreate.cpp @@ -83,7 +83,7 @@ class ThetaSketchAggregateCreateVarcharFactory : public ThetaSketchAggregateFunc }; -class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFunctionFactory { +class ThetaSketchAggregateCreateLongVarbinaryFactory : public ThetaSketchAggregateFunctionFactory { virtual void getPrototype(ServerInterface &srvfloaterface, ColumnTypes &argTypes, ColumnTypes &returnType) { argTypes.addLongVarbinary(); returnType.addLongVarbinary(); @@ -95,5 +95,4 @@ class ThetaSketchAggregateCreateVarbinaryFactory : public ThetaSketchAggregateFu }; RegisterFactory(ThetaSketchAggregateCreateVarcharFactory); 
-RegisterFactory(ThetaSketchAggregateCreateVarbinaryFactory); - +RegisterFactory(ThetaSketchAggregateCreateLongVarbinaryFactory); From e7f266700f74e7fe342d3dd1c91416110446b127 Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Wed, 21 Oct 2020 06:26:00 -0700 Subject: [PATCH 3/8] Update Min/Max of logK to LONG VARBINARY range. --- SOURCES/include/datasketches/theta/theta_const.hpp | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/SOURCES/include/datasketches/theta/theta_const.hpp b/SOURCES/include/datasketches/theta/theta_const.hpp index 761a9f4..664bb61 100644 --- a/SOURCES/include/datasketches/theta/theta_const.hpp +++ b/SOURCES/include/datasketches/theta/theta_const.hpp @@ -2,10 +2,11 @@ #define VERTICA_UDFS_THETA_CONST_H #define DATASKETCHES_LOG_NOMINAL_VALUE_PARAMETER_NAME "logK" -#define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 12 -#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 5 -// Vertica supports maximum 65000 bytes in a binary field, hence the limit. -#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 12 +// Vertica supports maximum 32000000 bytes in a long binary field, hence the limit. +// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html +#define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 24 +#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 4 +#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX #define DATASKETCHES_SEED_PARAMETER_NAME "seed" #define DATASKETCHES_SEED_DEFAULT 9001 From c03146a408ee888d78b08da5331601688c133edc Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Wed, 21 Oct 2020 07:00:36 -0700 Subject: [PATCH 4/8] Revert "Update Min/Max of logK to LONG VARBINARY range." This reverts commit e7f266700f74e7fe342d3dd1c91416110446b127. 
--- SOURCES/include/datasketches/theta/theta_const.hpp | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/SOURCES/include/datasketches/theta/theta_const.hpp b/SOURCES/include/datasketches/theta/theta_const.hpp index 664bb61..761a9f4 100644 --- a/SOURCES/include/datasketches/theta/theta_const.hpp +++ b/SOURCES/include/datasketches/theta/theta_const.hpp @@ -2,11 +2,10 @@ #define VERTICA_UDFS_THETA_CONST_H #define DATASKETCHES_LOG_NOMINAL_VALUE_PARAMETER_NAME "logK" -// Vertica supports maximum 32000000 bytes in a long binary field, hence the limit. -// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html -#define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 24 -#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 4 -#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX +#define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 12 +#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 5 +// Vertica supports maximum 65000 bytes in a binary field, hence the limit. +#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 12 #define DATASKETCHES_SEED_PARAMETER_NAME "seed" #define DATASKETCHES_SEED_DEFAULT 9001 From 5f917cb3f7e8486c48a8ae30e898ca7e953746bc Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Wed, 21 Oct 2020 07:13:30 -0700 Subject: [PATCH 5/8] Update Min/Max of logK to LONG VARBINARY range. --- SOURCES/include/datasketches/theta/theta_const.hpp | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/SOURCES/include/datasketches/theta/theta_const.hpp b/SOURCES/include/datasketches/theta/theta_const.hpp index 761a9f4..acc0b60 100644 --- a/SOURCES/include/datasketches/theta/theta_const.hpp +++ b/SOURCES/include/datasketches/theta/theta_const.hpp @@ -3,9 +3,10 @@ #define DATASKETCHES_LOG_NOMINAL_VALUE_PARAMETER_NAME "logK" #define DATASKETCHES_LOG_NOMINAL_VALUE_DEFAULT 12 -#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 5 -// Vertica supports maximum 65000 bytes in a binary field, hence the limit. 
-#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 12 +#define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 4 +// Vertica supports maximum 32000000 bytes in a long binary field, hence the limit. +// https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html +#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 24 #define DATASKETCHES_SEED_PARAMETER_NAME "seed" #define DATASKETCHES_SEED_DEFAULT 9001 From dfb559e97218612e5215d6fcc9efc03e36e1b1d6 Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Mon, 26 Oct 2020 18:22:32 -0700 Subject: [PATCH 6/8] Limit logK to 20, to avoid sketch sizes over 32,000,000 bytes --- SOURCES/include/datasketches/theta/theta_const.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/SOURCES/include/datasketches/theta/theta_const.hpp b/SOURCES/include/datasketches/theta/theta_const.hpp index acc0b60..a8cdfe2 100644 --- a/SOURCES/include/datasketches/theta/theta_const.hpp +++ b/SOURCES/include/datasketches/theta/theta_const.hpp @@ -6,7 +6,7 @@ #define DATASKETCHES_LOG_NOMINAL_VALUE_MIN 4 // Vertica supports maximum 32000000 bytes in a long binary field, hence the limit. // https://datasketches.apache.org/docs/Theta/ThetaErrorTable.html -#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 24 +#define DATASKETCHES_LOG_NOMINAL_VALUE_MAX 20 #define DATASKETCHES_SEED_PARAMETER_NAME "seed" #define DATASKETCHES_SEED_DEFAULT 9001 From 716053b082f88837735fb7b1977b103793d7bcf4 Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Mon, 26 Oct 2020 22:49:56 -0700 Subject: [PATCH 7/8] Log counters to UDX_EVENTS. 
--- .../datasketches/theta/theta_common.hpp | 18 ++++++++++++++++++ .../src/datasketches/theta/AggregateUnion.cpp | 2 ++ 2 files changed, 20 insertions(+) diff --git a/SOURCES/include/datasketches/theta/theta_common.hpp b/SOURCES/include/datasketches/theta/theta_common.hpp index 541cb6f..23d9dfa 100644 --- a/SOURCES/include/datasketches/theta/theta_common.hpp +++ b/SOURCES/include/datasketches/theta/theta_common.hpp @@ -83,6 +83,11 @@ class ThetaSketchAggregateFunction : public AggregateFunction { uint8_t logK; uint64_t seed; + long countCombine{0}; + long countInitAggregate{0}; + long countAggregate{0}; + long countTerminate{0}; + public: virtual void setup(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) { this->logK = readLogK(srvInterface); @@ -91,6 +96,7 @@ class ThetaSketchAggregateFunction : public AggregateFunction { virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs) { try { + countInitAggregate++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -107,9 +113,18 @@ class ThetaSketchAggregateFunction : public AggregateFunction { BlockWriter &resWriter, IntermediateAggs &aggs) override { try { + countTerminate++; const VString &concat = aggs.getStringRef(0); VString &result = resWriter.getStringRef(); result.copy(&concat); + + // Log exported parquet file details to v_monitor.udx_events + std::map details; + details["combine"] = std::to_string(countCombine); + details["initAggregate"] = std::to_string(countInitAggregate); + details["aggregate"] = std::to_string(countAggregate); + details["terminate"] = std::to_string(countTerminate); + srvInterface.logEvent(details); } catch (exception &e) { // Standard exception. Quit. 
vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what()); @@ -117,4 +132,7 @@ class ThetaSketchAggregateFunction : public AggregateFunction { } }; +virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) { +} + #endif //COM_CRITEO_MOAB_DATASKETCHES_VERTICA_H diff --git a/SOURCES/src/datasketches/theta/AggregateUnion.cpp b/SOURCES/src/datasketches/theta/AggregateUnion.cpp index 63276f3..c9a46c9 100644 --- a/SOURCES/src/datasketches/theta/AggregateUnion.cpp +++ b/SOURCES/src/datasketches/theta/AggregateUnion.cpp @@ -18,6 +18,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { BlockReader &argReader, IntermediateAggs &aggs) { try { + countAggregate++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -45,6 +46,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { IntermediateAggs &aggs, MultipleIntermediateAggs &aggsOther) override { try { + countCombine++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) From eb6000419d9324474789b5c838a9f7596c242239 Mon Sep 17 00:00:00 2001 From: eyurman14 Date: Mon, 26 Oct 2020 22:57:41 -0700 Subject: [PATCH 8/8] Fix log counters. 
--- .../datasketches/theta/theta_common.hpp | 24 +++++++++---------- .../src/datasketches/theta/AggregateUnion.cpp | 4 ++-- 2 files changed, 13 insertions(+), 15 deletions(-) diff --git a/SOURCES/include/datasketches/theta/theta_common.hpp b/SOURCES/include/datasketches/theta/theta_common.hpp index 23d9dfa..fb396bf 100644 --- a/SOURCES/include/datasketches/theta/theta_common.hpp +++ b/SOURCES/include/datasketches/theta/theta_common.hpp @@ -93,10 +93,19 @@ class ThetaSketchAggregateFunction : public AggregateFunction { this->logK = readLogK(srvInterface); this->seed = readSeed(srvInterface); } + virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) { + // Log exported parquet file details to v_monitor.udx_events + std::map details; + details["combine"] = std::to_string(this->countCombine); + details["initAggregate"] = std::to_string(this->countInitAggregate); + details["aggregate"] = std::to_string(this->countAggregate); + details["terminate"] = std::to_string(this->countTerminate); + srvInterface.logEvent(details); + } virtual void initAggregate(ServerInterface &srvInterface, IntermediateAggs &aggs) { try { - countInitAggregate++; + this->countInitAggregate++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -113,18 +122,10 @@ class ThetaSketchAggregateFunction : public AggregateFunction { BlockWriter &resWriter, IntermediateAggs &aggs) override { try { - countTerminate++; + this->countTerminate++; const VString &concat = aggs.getStringRef(0); VString &result = resWriter.getStringRef(); result.copy(&concat); - - // Log exported parquet file details to v_monitor.udx_events - std::map details; - details["combine"] = std::to_string(countCombine); - details["initAggregate"] = std::to_string(countInitAggregate); - details["aggregate"] = std::to_string(countAggregate); - details["terminate"] = std::to_string(countTerminate); - srvInterface.logEvent(details); } catch (exception &e) { // Standard exception. Quit. 
vt_report_error(0, "Exception while computing aggregate output: [%s]", e.what()); @@ -132,7 +133,4 @@ class ThetaSketchAggregateFunction : public AggregateFunction { } }; -virtual void destroy(ServerInterface &srvInterface, const SizedColumnTypes &argTypes) { -} - #endif //COM_CRITEO_MOAB_DATASKETCHES_VERTICA_H diff --git a/SOURCES/src/datasketches/theta/AggregateUnion.cpp b/SOURCES/src/datasketches/theta/AggregateUnion.cpp index c9a46c9..678119d 100644 --- a/SOURCES/src/datasketches/theta/AggregateUnion.cpp +++ b/SOURCES/src/datasketches/theta/AggregateUnion.cpp @@ -18,7 +18,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { BlockReader &argReader, IntermediateAggs &aggs) { try { - countAggregate++; + this->countAggregate++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed) @@ -46,7 +46,7 @@ class ThetaSketchAggregateUnion : public ThetaSketchAggregateFunction { IntermediateAggs &aggs, MultipleIntermediateAggs &aggsOther) override { try { - countCombine++; + this->countCombine++; auto u = theta_union_custom::builder() .set_lg_k(logK) .set_seed(seed)