Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -113,11 +113,16 @@ SELECT json_add_prefix('{"user": {"name": "Alice"}, "count": 5}', 'data_');

**Note:** This function requires the input to be a JSON object. It will raise an error if given a JSON array or primitive value.

### `json_group_merge(json_expr [ORDER BY ...]) -> json`
### `json_group_merge(json_expr [, treat_null_values] [ORDER BY ...]) -> json`

Applies a sequence of JSON patches using [RFC 7396](https://datatracker.ietf.org/doc/html/rfc7396) merge semantics. Inputs can be `JSON` values or `VARCHAR` text that parses as JSON. SQL `NULL` rows are skipped, and the aggregate returns `'{}'::json` when no non-null inputs are provided.

Provide an `ORDER BY` clause to guarantee deterministic results—later rows in the ordered stream overwrite earlier keys, arrays replace wholesale, and `null` removes keys.
Provide an `ORDER BY` clause to guarantee deterministic results—later rows in the ordered stream overwrite earlier keys, arrays replace wholesale, and `null` removes keys. Pass the optional `treat_null_values` argument to override how object members set to JSON `null` are handled:

- `DELETE NULLS` *(default)* — object members set to `null` delete the key.
- `IGNORE NULLS` — `null` members are dropped before merging, so existing keys stay untouched. `null` members in the first patch are skipped as well, so keys are never seeded with `null` values.

The `treat_null_values` argument must be a constant `VARCHAR` literal (case-insensitive, surrounding whitespace ignored).

**Grouped aggregation:**
```sql
Expand Down Expand Up @@ -145,6 +150,16 @@ FROM config_history
WHERE feature = 'search';
```

Toggle the null handling mode when sparse streams should ignore deletes:
```sql
SELECT json_group_merge(patch, 'IGNORE NULLS' ORDER BY ts)
FROM (VALUES ('{"keep":1}'::json, 1), ('{"keep":null}'::json, 2)) AS t(patch, ts);
```
*Result:*
```json
{"keep":1}
```

## Error Handling

- `json_flatten()` returns an error for malformed JSON
Expand Down
187 changes: 162 additions & 25 deletions src/json_tools_extension.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
#include "duckdb.hpp"
#include "duckdb/common/allocator.hpp"
#include "duckdb/common/exception.hpp"
#include "duckdb/common/string_util.hpp"
#include "duckdb/execution/expression_executor_state.hpp"
#include "duckdb/execution/expression_executor.hpp"
#include "duckdb/function/scalar_function.hpp"
#include "duckdb/function/aggregate_function.hpp"
#include "duckdb/function/function_set.hpp"
Expand Down Expand Up @@ -89,6 +91,34 @@ struct JsonGroupMergeState {
idx_t replacements_since_compact;
};

// How JSON `null` members in an incoming patch are treated during the merge:
// DELETE_NULLS follows RFC 7396 (a null member removes the key from the base),
// while IGNORE_NULLS drops null members before merging so existing keys stay
// untouched. uint8_t keeps the bind data footprint minimal.
enum class JsonNullTreatment : uint8_t { DELETE_NULLS = 0, IGNORE_NULLS = 1 };

// Bind-time state for json_group_merge: carries the null-handling mode that
// the optional treat_null_values argument selected, so Update/Combine can
// look it up through the aggregate's bind data.
struct JsonGroupMergeBindData : public FunctionData {
	// The selected null-handling strategy (defaults to DELETE_NULLS upstream).
	JsonNullTreatment treatment;

	explicit JsonGroupMergeBindData(JsonNullTreatment treatment_p) : treatment(treatment_p) {
	}

	// Two bind data instances are interchangeable iff they chose the same mode.
	bool Equals(const FunctionData &other_p) const override {
		return other_p.Cast<JsonGroupMergeBindData>().treatment == treatment;
	}

	unique_ptr<FunctionData> Copy() const override {
		return make_uniq<JsonGroupMergeBindData>(treatment);
	}
};

// Resolve the configured null-handling mode from the aggregate's bind data.
// Aggregates bound without the optional argument may carry no bind data at
// all, in which case the RFC 7396 default (DELETE_NULLS) applies.
static JsonNullTreatment GetNullTreatment(optional_ptr<FunctionData> bind_data) {
	return bind_data ? bind_data->Cast<JsonGroupMergeBindData>().treatment : JsonNullTreatment::DELETE_NULLS;
}

static unique_ptr<FunctionData> JsonGroupMergeBind(ClientContext &context, AggregateFunction &function,
vector<unique_ptr<Expression>> &arguments);

static void JsonGroupMergeStateInit(JsonGroupMergeState &state) {
state.doc = yyjson_mut_doc_new(nullptr);
if (!state.doc) {
Expand Down Expand Up @@ -119,7 +149,8 @@ constexpr idx_t JSON_GROUP_MERGE_COMPACT_THRESHOLD = 1024;
constexpr idx_t MAX_JSON_NESTING_DEPTH = 1000;

static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyjson_mut_val *base, yyjson_val *patch,
idx_t depth, idx_t &replacements_since_compact);
idx_t depth, idx_t &replacements_since_compact,
JsonNullTreatment null_treatment);

static void JsonGroupMergeCompactState(JsonGroupMergeState &state) {
if (!state.doc || !state.doc->root) {
Expand Down Expand Up @@ -148,13 +179,14 @@ static void JsonGroupMergeMaybeCompact(JsonGroupMergeState &state) {
JsonGroupMergeCompactState(state);
}

static void JsonGroupMergeApplyPatch(JsonGroupMergeState &state, yyjson_val *patch_root) {
static void JsonGroupMergeApplyPatch(JsonGroupMergeState &state, yyjson_val *patch_root,
JsonNullTreatment null_treatment) {
if (!patch_root) {
throw InvalidInputException("json_group_merge: invalid JSON payload");
}
auto base_root = state.has_input ? state.doc->root : nullptr;
auto merged_root =
JsonGroupMergeApplyPatchInternal(state.doc, base_root, patch_root, 0, state.replacements_since_compact);
auto merged_root = JsonGroupMergeApplyPatchInternal(state.doc, base_root, patch_root, 0,
state.replacements_since_compact, null_treatment);
if (!merged_root) {
throw InternalException("json_group_merge: failed to merge JSON documents");
}
Expand All @@ -166,7 +198,8 @@ static void JsonGroupMergeApplyPatch(JsonGroupMergeState &state, yyjson_val *pat
}

static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyjson_mut_val *base, yyjson_val *patch,
idx_t depth, idx_t &replacements_since_compact) {
idx_t depth, idx_t &replacements_since_compact,
JsonNullTreatment null_treatment) {
if (!patch) {
return base;
}
Expand All @@ -187,17 +220,26 @@ static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyj
}

yyjson_mut_val *result = nullptr;
if (base && duckdb_yyjson::yyjson_mut_is_obj(base)) {
bool base_is_object = base && duckdb_yyjson::yyjson_mut_is_obj(base);
if (base_is_object) {
result = base;
} else {
}

auto EnsureResult = [&]() -> yyjson_mut_val * {
if (result) {
return result;
}
result = yyjson_mut_obj(doc);
if (!result) {
throw InternalException("json_group_merge: failed to allocate JSON object");
}
if (base) {
if (base && !base_is_object) {
replacements_since_compact++;
}
}
return result;
};

bool applied_any = false;

yyjson_val *patch_key = nullptr;
yyjson_obj_iter patch_iter = yyjson_obj_iter_with(patch);
Expand All @@ -211,29 +253,41 @@ static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyj
}

if (duckdb_yyjson::yyjson_is_null(patch_val)) {
auto removed = duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len);
if (removed) {
replacements_since_compact++;
if (null_treatment == JsonNullTreatment::DELETE_NULLS) {
if (result) {
auto removed = duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len);
if (removed) {
replacements_since_compact++;
applied_any = true;
}
}
}
continue;
}

auto existing_child = duckdb_yyjson::yyjson_mut_obj_getn(result, key_str, key_len);
auto existing_child = result ? duckdb_yyjson::yyjson_mut_obj_getn(result, key_str, key_len) : nullptr;
if (duckdb_yyjson::yyjson_is_obj(patch_val)) {
auto merged_child =
JsonGroupMergeApplyPatchInternal(doc, existing_child, patch_val, depth + 1, replacements_since_compact);
auto merged_child = JsonGroupMergeApplyPatchInternal(doc, existing_child, patch_val, depth + 1,
replacements_since_compact, null_treatment);
// Skip if merged_child is null (can happen with IGNORE_NULLS when patch contains only nulls
// and there's no existing value)
if (!merged_child) {
continue;
}
if (!existing_child || merged_child != existing_child) {
auto target_obj = EnsureResult();
if (existing_child) {
replacements_since_compact++;
duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len);
duckdb_yyjson::yyjson_mut_obj_remove_keyn(target_obj, key_str, key_len);
}
auto key_copy = yyjson_mut_strncpy(doc, key_str, key_len);
if (!key_copy) {
throw InternalException("json_group_merge: failed to allocate key storage");
}
if (!duckdb_yyjson::yyjson_mut_obj_add(result, key_copy, merged_child)) {
if (!duckdb_yyjson::yyjson_mut_obj_add(target_obj, key_copy, merged_child)) {
throw InternalException("json_group_merge: failed to append merged object value");
}
applied_any = true;
}
continue;
}
Expand All @@ -246,16 +300,28 @@ static yyjson_mut_val *JsonGroupMergeApplyPatchInternal(yyjson_mut_doc *doc, yyj
replacements_since_compact++;
duckdb_yyjson::yyjson_mut_obj_remove_keyn(result, key_str, key_len);
}
auto target_obj = EnsureResult();
auto key_copy = yyjson_mut_strncpy(doc, key_str, key_len);
if (!key_copy) {
throw InternalException("json_group_merge: failed to allocate key storage");
}
if (!duckdb_yyjson::yyjson_mut_obj_add(result, key_copy, new_child)) {
if (!duckdb_yyjson::yyjson_mut_obj_add(target_obj, key_copy, new_child)) {
throw InternalException("json_group_merge: failed to append merged value");
}
applied_any = true;
}

return result;
// If nothing was applied, return the base unchanged (unless we're at top level with nothing)
if (!applied_any) {
// Special case: at top level with no base and empty patch, return empty object
if (depth == 0 && !base) {
return EnsureResult();
}
return base;
}

// Something was applied, ensure we have a result object to return
return result ? result : EnsureResult();
}

class JsonGroupMergeFunction {
Expand All @@ -274,7 +340,7 @@ class JsonGroupMergeFunction {
}

template <class INPUT_TYPE, class STATE, class OP>
static void Operation(STATE &state, const INPUT_TYPE &input, AggregateUnaryInput &) {
static void Operation(STATE &state, const INPUT_TYPE &input, AggregateUnaryInput &unary_input) {
static_assert(std::is_same<INPUT_TYPE, string_t>::value, "json_group_merge expects string_t input");
yyjson_read_err err;
auto doc = yyjson_read_opts(const_cast<char *>(input.GetDataUnsafe()), input.GetSize(), JSONCommon::READ_FLAG,
Expand All @@ -287,7 +353,8 @@ class JsonGroupMergeFunction {
if (!patch_root) {
throw InvalidInputException("json_group_merge: invalid JSON payload");
}
JsonGroupMergeApplyPatch(state, patch_root);
auto null_treatment = GetNullTreatment(unary_input.input.bind_data);
JsonGroupMergeApplyPatch(state, patch_root, null_treatment);
}

template <class INPUT_TYPE, class STATE, class OP>
Expand All @@ -299,11 +366,12 @@ class JsonGroupMergeFunction {
}

template <class STATE, class OP>
static void Combine(const STATE &source, STATE &target, AggregateInputData &) {
static void Combine(const STATE &source, STATE &target, AggregateInputData &aggr_input_data) {
if (!source.has_input || !source.doc || !source.doc->root) {
return;
}
JsonGroupMergeApplyPatch(target, reinterpret_cast<yyjson_val *>(source.doc->root));
JsonGroupMergeApplyPatch(target, reinterpret_cast<yyjson_val *>(source.doc->root),
GetNullTreatment(aggr_input_data.bind_data));
}

template <class RESULT_TYPE, class STATE>
Expand All @@ -326,6 +394,53 @@ class JsonGroupMergeFunction {
}
};

// Canonicalize a user-supplied treat_null_values string for comparison:
// surrounding whitespace is stripped and the result is lower-cased, so
// matching is case-insensitive as documented.
static string NormalizedNullTreatment(const string &input) {
	string canonical(input);
	StringUtil::Trim(canonical);
	return StringUtil::Lower(canonical);
}

// The accepted treat_null_values spellings, formatted for error messages.
static string JsonGroupMergeNullOptionsText() {
	return string("DELETE NULLS, IGNORE NULLS");
}

// Bind callback for json_group_merge. Validates the optional second
// treat_null_values argument — which must be a constant VARCHAR literal
// naming one of the supported modes — and then erases it from the argument
// list so the executing aggregate only ever sees the JSON input column.
// The chosen mode is returned as JsonGroupMergeBindData.
static unique_ptr<FunctionData> JsonGroupMergeBind(ClientContext &context, AggregateFunction &function,
                                                   vector<unique_ptr<Expression>> &arguments) {
	if (arguments.empty() || arguments.size() > 2) {
		throw BinderException(
		    "json_group_merge expects one JSON argument plus an optional treat_null_values parameter");
	}
	// Default matches RFC 7396 merge-patch semantics: null members delete keys.
	auto treatment = JsonNullTreatment::DELETE_NULLS;
	if (arguments.size() == 2) {
		auto &mode_arg = arguments[1];
		// Prepared-statement parameters are not resolved yet — defer binding
		// rather than rejecting the query outright.
		if (mode_arg->return_type.id() == LogicalTypeId::UNKNOWN || mode_arg->HasParameter()) {
			throw ParameterNotResolvedException();
		}
		// The mode must be a compile-time constant so it can be folded here.
		if (!mode_arg->IsFoldable()) {
			throw BinderException("json_group_merge treat_null_values argument must be constant");
		}
		auto mode_value = ExpressionExecutor::EvaluateScalar(context, *mode_arg);
		// NULL is rejected before the type check so the user sees the list of
		// valid modes instead of a type error.
		if (mode_value.IsNull()) {
			throw InvalidInputException("json_group_merge: treat_null_values must be one of %s",
			                            JsonGroupMergeNullOptionsText().c_str());
		}
		if (mode_value.type().id() != LogicalTypeId::VARCHAR) {
			throw BinderException("json_group_merge treat_null_values must be a VARCHAR literal");
		}
		// Case-insensitive comparison with surrounding whitespace ignored.
		auto normalized = NormalizedNullTreatment(mode_value.ToString());
		if (normalized == "delete nulls") {
			treatment = JsonNullTreatment::DELETE_NULLS;
		} else if (normalized == "ignore nulls") {
			treatment = JsonNullTreatment::IGNORE_NULLS;
		} else {
			throw InvalidInputException("json_group_merge: treat_null_values must be one of %s",
			                            JsonGroupMergeNullOptionsText().c_str());
		}
		// Drop the constant argument so execution receives only the JSON column.
		Function::EraseArgument(function, arguments, 1);
	}
	return make_uniq<JsonGroupMergeBindData>(treatment);
}

static AggregateFunction CreateJsonGroupMergeAggregate(const LogicalType &input_type) {
AggregateFunction function(
"json_group_merge", {input_type}, LogicalType::JSON(), AggregateFunction::StateSize<JsonGroupMergeState>,
Expand All @@ -336,9 +451,17 @@ static AggregateFunction CreateJsonGroupMergeAggregate(const LogicalType &input_
AggregateFunction::UnaryUpdate<JsonGroupMergeState, string_t, JsonGroupMergeFunction>);
function.destructor = AggregateFunction::StateDestroy<JsonGroupMergeState, JsonGroupMergeFunction>;
function.order_dependent = AggregateOrderDependent::ORDER_DEPENDENT;
function.bind = JsonGroupMergeBind;
return function;
}

// Register both overloads of json_group_merge for a given input type: the
// plain unary aggregate, and the two-argument variant that accepts the
// constant treat_null_values VARCHAR (validated and erased during bind).
static void AddJsonGroupMergeAggregate(AggregateFunctionSet &set, const LogicalType &input_type) {
	auto unary_overload = CreateJsonGroupMergeAggregate(input_type);
	set.AddFunction(unary_overload);
	auto binary_overload = CreateJsonGroupMergeAggregate(input_type);
	binary_overload.arguments.emplace_back(LogicalType::VARCHAR);
	set.AddFunction(binary_overload);
}

using duckdb_yyjson::yyjson_arr_iter;
using duckdb_yyjson::yyjson_arr_iter_next;
using duckdb_yyjson::yyjson_arr_iter_with;
Expand Down Expand Up @@ -611,8 +734,22 @@ static void LoadInternal(JsonToolsLoadContext &ctx) {
RegisterScalarFunction(ctx, json_add_prefix_scalar_function);

AggregateFunctionSet json_group_merge_set("json_group_merge");
json_group_merge_set.AddFunction(CreateJsonGroupMergeAggregate(LogicalType::JSON()));
json_group_merge_set.AddFunction(CreateJsonGroupMergeAggregate(LogicalType::VARCHAR));
AddJsonGroupMergeAggregate(json_group_merge_set, LogicalType::JSON());
AddJsonGroupMergeAggregate(json_group_merge_set, LogicalType::VARCHAR);
if (json_group_merge_set.Size() != 4) {
throw InternalException("json_group_merge: expected 4 overloads, found %llu",
(long long)json_group_merge_set.Size());
}
idx_t double_arg_count = 0;
for (idx_t i = 0; i < json_group_merge_set.Size(); i++) {
if (json_group_merge_set.GetFunctionReferenceByOffset(i).arguments.size() == 2) {
double_arg_count++;
}
}
if (double_arg_count != 2) {
throw InternalException("json_group_merge: expected 2 overloads with treat_null_values parameter, found %llu",
(long long)double_arg_count);
}
RegisterAggregateFunction(ctx, std::move(json_group_merge_set));
}

Expand Down
Loading
Loading