diff --git a/README.md b/README.md index edfa4ec..39cfcea 100644 --- a/README.md +++ b/README.md @@ -113,11 +113,16 @@ SELECT json_add_prefix('{"user": {"name": "Alice"}, "count": 5}', 'data_'); **Note:** This function requires the input to be a JSON object. It will raise an error if given a JSON array or primitive value. -### `json_group_merge(json_expr [ORDER BY ...]) -> json` +### `json_group_merge(json_expr [, treat_null_values] [ORDER BY ...]) -> json` Applies a sequence of JSON patches using [RFC 7396](https://datatracker.ietf.org/doc/html/rfc7396) merge semantics. Inputs can be `JSON` values or `VARCHAR` text that parses as JSON. SQL `NULL` rows are skipped, and the aggregate returns `'{}'::json` when no non-null inputs are provided. -Provide an `ORDER BY` clause to guarantee deterministic results—later rows in the ordered stream overwrite earlier keys, arrays replace wholesale, and `null` removes keys. +Provide an `ORDER BY` clause to guarantee deterministic results—later rows in the ordered stream overwrite earlier keys, arrays replace wholesale, and `null` removes keys. Pass the optional `treat_null_values` argument to override how object members set to JSON `null` are handled: + +- `DELETE NULLS` *(default)* — object members set to `null` delete the key. +- `IGNORE NULLS` — `null` members are dropped before merging, so existing keys stay untouched. Null members in the first patch are also skipped (never seed keys with null values). + +The `treat_null_values` argument must be a constant `VARCHAR` literal (case-insensitive, surrounding whitespace ignored). **Grouped aggregation:** ```sql @@ -145,6 +150,16 @@ FROM config_history WHERE feature = 'search'; ``` +Toggle the null handling mode when sparse streams should ignore deletes: +```sql +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('{"keep":1}'::json, 1), ('{"keep":null}'::json, 2)) AS t(patch, ts); +``` +*Result:* +```json +{"keep":1} +``` + ## Error Handling - `json_flatten()` returns an error for malformed JSON diff --git a/src/json_tools_extension.cpp b/src/json_tools_extension.cpp index 1cc07d8..ca21e17 100644 --- a/src/json_tools_extension.cpp +++ b/src/json_tools_extension.cpp @@ -4,7 +4,9 @@ #include "duckdb.hpp" #include "duckdb/common/allocator.hpp" #include "duckdb/common/exception.hpp" +#include "duckdb/common/string_util.hpp" #include "duckdb/execution/expression_executor_state.hpp" +#include "duckdb/execution/expression_executor.hpp" #include "duckdb/function/scalar_function.hpp" #include "duckdb/function/aggregate_function.hpp" #include "duckdb/function/function_set.hpp" @@ -89,6 +91,34 @@ struct JsonGroupMergeState { idx_t replacements_since_compact; }; +enum class JsonNullTreatment : uint8_t { DELETE_NULLS = 0, IGNORE_NULLS = 1 }; + +struct JsonGroupMergeBindData : public FunctionData { + explicit JsonGroupMergeBindData(JsonNullTreatment treatment_p) : treatment(treatment_p) { + } + + JsonNullTreatment treatment; + + unique_ptr Copy() const override { + return make_uniq(treatment); + } + + bool Equals(const FunctionData &other_p) const override { + const auto &other = other_p.Cast(); + return treatment == other.treatment; + } +}; + +static JsonNullTreatment GetNullTreatment(optional_ptr bind_data) { + if (!bind_data) { + return JsonNullTreatment::DELETE_NULLS; + } + return bind_data->Cast().treatment; +} + +static unique_ptr JsonGroupMergeBind(ClientContext &context, AggregateFunction &function, + vector> &arguments); + static void JsonGroupMergeStateInit(JsonGroupMergeState &state) { state.doc = yyjson_mut_doc_new(nullptr); if (!state.doc) { @@ -119,7 +149,8 @@ constexpr idx_t JSON_GROUP_MERGE_COMPACT_THRESHOLD = 1024; constexpr idx_t MAX_JSON_NESTING_DEPTH = 1000; static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyjson_mut_val *base, yyjson_val *patch, - idx_t depth, idx_t &replacements_since_compact); + idx_t depth, idx_t &replacements_since_compact, + JsonNullTreatment null_treatment); static void JsonGroupMergeCompactState(JsonGroupMergeState &state) { if (!state.doc || !state.doc->root) { @@ -148,13 +179,14 @@ static void JsonGroupMergeMaybeCompact(JsonGroupMergeState &state) { JsonGroupMergeCompactState(state); } -static void JsonGroupMergeApplyPatch(JsonGroupMergeState &state, yyjson_val *patch_root) { +static void JsonGroupMergeApplyPatch(JsonGroupMergeState &state, yyjson_val *patch_root, + JsonNullTreatment null_treatment) { if (!patch_root) { throw InvalidInputException("json_group_merge: invalid JSON payload"); } auto base_root = state.has_input ? state.doc->root : nullptr; - auto merged_root = - JsonGroupMergeApplyPatchInternal(state.doc, base_root, patch_root, 0, state.replacements_since_compact); + auto merged_root = JsonGroupMergeApplyPatchInternal(state.doc, base_root, patch_root, 0, + state.replacements_since_compact, null_treatment); if (!merged_root) { throw InternalException("json_group_merge: failed to merge JSON documents"); } @@ -166,7 +198,8 @@ static void JsonGroupMergeApplyPatch(JsonGroupMergeState &state, yyjson_val *pat } static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyjson_mut_val *base, yyjson_val *patch, - idx_t depth, idx_t &replacements_since_compact) { + idx_t depth, idx_t &replacements_since_compact, + JsonNullTreatment null_treatment) { if (!patch) { return base; } @@ -187,17 +220,26 @@ static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyj } yyjson_mut_val *result = nullptr; - if (base && duckdb_yyjson::yyjson_mut_is_obj(base)) { + bool base_is_object = base && duckdb_yyjson::yyjson_mut_is_obj(base); + if (base_is_object) { result = base; - } else { + } + + auto EnsureResult = [&]() -> yyjson_mut_val * { + if (result) { + return result; + } result = yyjson_mut_obj(doc); if (!result) { throw InternalException("json_group_merge: failed to allocate JSON object"); } - if (base) { + if (base && !base_is_object) { replacements_since_compact++; } - } + return result; + }; + + bool applied_any = false; yyjson_val *patch_key = nullptr; yyjson_obj_iter patch_iter = yyjson_obj_iter_with(patch); @@ -211,29 +253,41 @@ static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyj } if (duckdb_yyjson::yyjson_is_null(patch_val)) { - auto removed = duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len); - if (removed) { - replacements_since_compact++; + if (null_treatment == JsonNullTreatment::DELETE_NULLS) { + if (result) { + auto removed = duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len); + if (removed) { + replacements_since_compact++; + applied_any = true; + } + } } continue; } - auto existing_child = duckdb_yyjson::yyjson_mut_obj_getn(result, key_str, key_len); + auto existing_child = result ? duckdb_yyjson::yyjson_mut_obj_getn(result, key_str, key_len) : nullptr; if (duckdb_yyjson::yyjson_is_obj(patch_val)) { - auto merged_child = - JsonGroupMergeApplyPatchInternal(doc, existing_child, patch_val, depth + 1, replacements_since_compact); + auto merged_child = JsonGroupMergeApplyPatchInternal(doc, existing_child, patch_val, depth + 1, + replacements_since_compact, null_treatment); + // Skip if merged_child is null (can happen with IGNORE_NULLS when patch contains only nulls + // and there's no existing value) + if (!merged_child) { + continue; + } if (!existing_child || merged_child != existing_child) { + auto target_obj = EnsureResult(); if (existing_child) { replacements_since_compact++; - duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len); + duckdb_yyjson::yyjson_mut_obj_remove_keyn(target_obj, key_str, key_len); } auto key_copy = yyjson_mut_strncpy(doc, key_str, key_len); if (!key_copy) { throw InternalException("json_group_merge: failed to allocate key storage"); } - if (!duckdb_yyjson::yyjson_mut_obj_add(result, key_copy, merged_child)) { + if (!duckdb_yyjson::yyjson_mut_obj_add(target_obj, key_copy, merged_child)) { throw InternalException("json_group_merge: failed to append merged object value"); } + applied_any = true; } continue; } @@ -246,16 +300,28 @@ static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyj replacements_since_compact++; duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len); } + auto target_obj = EnsureResult(); auto key_copy = yyjson_mut_strncpy(doc, key_str, key_len); if (!key_copy) { throw InternalException("json_group_merge: failed to allocate key storage"); } - if (!duckdb_yyjson::yyjson_mut_obj_add(result, key_copy, new_child)) { + if (!duckdb_yyjson::yyjson_mut_obj_add(target_obj, key_copy, new_child)) { throw InternalException("json_group_merge: failed to append merged value"); } + applied_any = true; } - return result; + // If nothing was applied, return the base unchanged (unless we're at top level with nothing) + if (!applied_any) { + // Special case: at top level with no base and empty patch, return empty object + if (depth == 0 && !base) { + return EnsureResult(); + } + return base; + } + + // Something was applied, ensure we have a result object to return + return result ? result : EnsureResult(); } class JsonGroupMergeFunction { @@ -274,7 +340,7 @@ class JsonGroupMergeFunction { } template - static void Operation(STATE &state, const INPUT_TYPE &input, AggregateUnaryInput &) { + static void Operation(STATE &state, const INPUT_TYPE &input, AggregateUnaryInput &unary_input) { static_assert(std::is_same::value, "json_group_merge expects string_t input"); yyjson_read_err err; auto doc = yyjson_read_opts(const_cast(input.GetDataUnsafe()), input.GetSize(), JSONCommon::READ_FLAG, @@ -287,7 +353,8 @@ class JsonGroupMergeFunction { if (!patch_root) { throw InvalidInputException("json_group_merge: invalid JSON payload"); } - JsonGroupMergeApplyPatch(state, patch_root); + auto null_treatment = GetNullTreatment(unary_input.input.bind_data); + JsonGroupMergeApplyPatch(state, patch_root, null_treatment); } template @@ -299,11 +366,12 @@ class JsonGroupMergeFunction { } template - static void Combine(const STATE &source, STATE &target, AggregateInputData &) { + static void Combine(const STATE &source, STATE &target, AggregateInputData &aggr_input_data) { if (!source.has_input || !source.doc || !source.doc->root) { return; } - JsonGroupMergeApplyPatch(target, reinterpret_cast(source.doc->root)); + JsonGroupMergeApplyPatch(target, reinterpret_cast(source.doc->root), + GetNullTreatment(aggr_input_data.bind_data)); } template @@ -326,6 +394,53 @@ class JsonGroupMergeFunction { } }; +static string NormalizedNullTreatment(const string &input) { + auto normalized = input; + StringUtil::Trim(normalized); + return StringUtil::Lower(normalized); +} + +static string JsonGroupMergeNullOptionsText() { + return "DELETE NULLS, IGNORE NULLS"; +} + +static unique_ptr JsonGroupMergeBind(ClientContext &context, AggregateFunction &function, + vector> &arguments) { + if (arguments.empty() || arguments.size() > 2) { + throw BinderException( + "json_group_merge expects one JSON argument plus an optional treat_null_values parameter"); + } + auto treatment = JsonNullTreatment::DELETE_NULLS; + if (arguments.size() == 2) { + auto &mode_arg = arguments[1]; + if (mode_arg->return_type.id() == LogicalTypeId::UNKNOWN || mode_arg->HasParameter()) { + throw ParameterNotResolvedException(); + } + if (!mode_arg->IsFoldable()) { + throw BinderException("json_group_merge treat_null_values argument must be constant"); + } + auto mode_value = ExpressionExecutor::EvaluateScalar(context, *mode_arg); + if (mode_value.IsNull()) { + throw InvalidInputException("json_group_merge: treat_null_values must be one of %s", + JsonGroupMergeNullOptionsText().c_str()); + } + if (mode_value.type().id() != LogicalTypeId::VARCHAR) { + throw BinderException("json_group_merge treat_null_values must be a VARCHAR literal"); + } + auto normalized = NormalizedNullTreatment(mode_value.ToString()); + if (normalized == "delete nulls") { + treatment = JsonNullTreatment::DELETE_NULLS; + } else if (normalized == "ignore nulls") { + treatment = JsonNullTreatment::IGNORE_NULLS; + } else { + throw InvalidInputException("json_group_merge: treat_null_values must be one of %s", + JsonGroupMergeNullOptionsText().c_str()); + } + Function::EraseArgument(function, arguments, 1); + } + return make_uniq(treatment); +} + static AggregateFunction CreateJsonGroupMergeAggregate(const LogicalType &input_type) { AggregateFunction function( "json_group_merge", {input_type}, LogicalType::JSON(), AggregateFunction::StateSize, @@ -336,9 +451,17 @@ static AggregateFunction CreateJsonGroupMergeAggregate(const LogicalType &input_ AggregateFunction::UnaryUpdate); function.destructor = AggregateFunction::StateDestroy; function.order_dependent = AggregateOrderDependent::ORDER_DEPENDENT; + function.bind = JsonGroupMergeBind; return function; } +static void AddJsonGroupMergeAggregate(AggregateFunctionSet &set, const LogicalType &input_type) { + auto function = CreateJsonGroupMergeAggregate(input_type); + set.AddFunction(function); + function.arguments.emplace_back(LogicalType::VARCHAR); + set.AddFunction(function); +} + using duckdb_yyjson::yyjson_arr_iter; using duckdb_yyjson::yyjson_arr_iter_next; using duckdb_yyjson::yyjson_arr_iter_with; @@ -611,8 +734,22 @@ static void LoadInternal(JsonToolsLoadContext &ctx) { RegisterScalarFunction(ctx, json_add_prefix_scalar_function); AggregateFunctionSet json_group_merge_set("json_group_merge"); - json_group_merge_set.AddFunction(CreateJsonGroupMergeAggregate(LogicalType::JSON())); - json_group_merge_set.AddFunction(CreateJsonGroupMergeAggregate(LogicalType::VARCHAR)); + AddJsonGroupMergeAggregate(json_group_merge_set, LogicalType::JSON()); + AddJsonGroupMergeAggregate(json_group_merge_set, LogicalType::VARCHAR); + if (json_group_merge_set.Size() != 4) { + throw InternalException("json_group_merge: expected 4 overloads, found %llu", + (long long)json_group_merge_set.Size()); + } + idx_t double_arg_count = 0; + for (idx_t i = 0; i < json_group_merge_set.Size(); i++) { + if (json_group_merge_set.GetFunctionReferenceByOffset(i).arguments.size() == 2) { + double_arg_count++; + } + } + if (double_arg_count != 2) { + throw InternalException("json_group_merge: expected 2 overloads with treat_null_values parameter, found %llu", + (long long)double_arg_count); + } RegisterAggregateFunction(ctx, std::move(json_group_merge_set)); } diff --git a/test/sql/json_group_merge.test b/test/sql/json_group_merge.test index 65c5337..9304e82 100644 --- a/test/sql/json_group_merge.test +++ b/test/sql/json_group_merge.test @@ -57,6 +57,105 @@ FROM (VALUES ('{"keep":1}'::json, 1), ('{"keep":null}'::json, 2)) AS t(patch, ts ---- {} +# ======================================== +# treat_null_values Parameter +# ======================================== + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('{"keep":1}'::json, 1), ('{"keep":null}'::json, 2)) AS t(patch, ts); +---- +{"keep":1} + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('{"keep":1,"drop":null}'::json, 1)) AS t(patch, ts); +---- +{"keep":1} + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('{"outer":{"drop":null}}'::json, 1)) AS t(patch, ts); +---- +{} + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES + ('{"outer":{"drop":10,"keep":5}}'::json, 1), + ('{"outer":{"drop":null,"keep":7}}'::json, 2) +) AS t(patch, ts); +---- +{"outer":{"drop":10,"keep":7}} + +query I +SELECT json_group_merge(patch, ' delete nulls ' ORDER BY ts) +FROM (VALUES ('{"keep":1}'::json, 1), ('{"keep":null}'::json, 2)) AS t(patch, ts); +---- +{} + +statement error +SELECT json_group_merge(patch, 'DROP' ORDER BY ts) +FROM (VALUES ('{"keep":1}'::json, 1)) AS t(patch, ts); +---- +Invalid Input Error: json_group_merge: treat_null_values must be one of DELETE NULLS, IGNORE NULLS + +statement error +SELECT json_group_merge(patch, NULL ORDER BY ts) +FROM (VALUES ('{"keep":1}'::json, 1)) AS t(patch, ts); +---- +Invalid Input Error: json_group_merge: treat_null_values must be one of DELETE NULLS, IGNORE NULLS + +statement error +SELECT json_group_merge(patch, mode ORDER BY patch) +FROM (VALUES ('{"keep":1}'::json, 'IGNORE NULLS')) AS t(patch, mode); +---- +Binder Error: json_group_merge treat_null_values argument must be constant + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('{"a":1}', 1), ('{"a":null}', 2)) AS t(patch, ts); +---- +{"a":1} + +query II +SELECT grp, json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('A', 1, '{"x":1}'::json), ('A', 2, '{"x":null}'::json)) AS t(grp, ts, patch) +GROUP BY grp; +---- +A {"x":1} + +# Edge cases for IGNORE NULLS + +query I +SELECT json_group_merge('{}', 'IGNORE NULLS'); +---- +{} + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES ('{"a":null}'::json, 1), ('{"b":null}'::json, 2)) AS t(patch, ts); +---- +{} + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES + ('{"a":{"b":{"c":{"d":1}}}}'::json, 1), + ('{"a":{"b":{"c":null}}}'::json, 2) +) AS t(patch, ts); +---- +{"a":{"b":{"c":{"d":1}}}} + +query I +SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts) +FROM (VALUES + ('{"a":{"b":{"c":1}}}'::json, 1), + ('{"a":{"b":null}}'::json, 2) +) AS t(patch, ts); +---- +{"a":{"b":{"c":1}}} + # ======================================== # VARCHAR Input Support # ======================================== diff --git a/test/sql/json_group_merge_heavy_docs.test b/test/sql/json_group_merge_heavy_docs.test index 1077a83..0815e26 100644 --- a/test/sql/json_group_merge_heavy_docs.test +++ b/test/sql/json_group_merge_heavy_docs.test @@ -430,3 +430,13 @@ SELECT FROM merged; ---- {"event":"evt_00002","tier":"silver"} + +# IGNORE NULLS should preserve nested values in heavy payloads +query I +SELECT json_group_merge(payload, 'IGNORE NULLS' ORDER BY seq) +FROM (VALUES + (0, '{"levels":{"one":{"two":{"keep":"value","drop":[1,2,3]}}},"metrics":{"baseline":5}}'::json), + (1, '{"levels":{"one":{"two":{"drop":null,"extra":999}}},"metrics":{"baseline":10,"latest":20}}'::json) +) AS t(seq, payload); +---- +{"levels":{"one":{"two":{"keep":"value","drop":[1,2,3],"extra":999}}},"metrics":{"baseline":10,"latest":20}}