Skip to content

Commit

Permalink
[Enhancement] Support the conversion from json to struct
Browse files Browse the repository at this point in the history
Signed-off-by: Song Jiacheng <[email protected]>
  • Loading branch information
Jcnessss committed Nov 13, 2024
1 parent 837a053 commit b3f309a
Show file tree
Hide file tree
Showing 8 changed files with 346 additions and 10 deletions.
1 change: 1 addition & 0 deletions be/src/exprs/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ set(EXPR_FILES
cast_expr.cpp
cast_expr_array.cpp
cast_expr_json.cpp
cast_expr_struct.cpp
cast_nested.cpp
column_ref.cpp
placeholder_ref.cpp
Expand Down
19 changes: 19 additions & 0 deletions be/src/exprs/cast_expr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1596,6 +1596,25 @@ Expr* VectorizedCastExprFactory::create_primitive_cast(ObjectPool* pool, const T
}
}

// JSON -> STRUCT: build one "JSON -> field type" cast per struct field and hand them to
// CastJsonToStruct. Returns nullptr as soon as one field type has no cast from JSON.
if (from_type == TYPE_JSON && to_type == TYPE_STRUCT) {
    TypeDescriptor cast_to = TypeDescriptor::from_thrift(node.type);
    // Every field is cast from JSON, so the source type is the same for all iterations.
    TypeDescriptor json_type = TypeDescriptor::create_json_type();

    std::vector<std::unique_ptr<Expr>> field_casts;
    field_casts.reserve(cast_to.children.size());
    for (auto& field_type : cast_to.children) {
        auto field_cast = create_cast_expr(pool, json_type, field_type, allow_throw_exception);
        if (!field_cast.ok()) {
            LOG(WARNING) << "Not support cast from type: " << json_type << ", to type: " << field_type;
            return nullptr;
        }
        // The field cast reads its input through a JSON slot ref child. The slot ref is released
        // into `pool`; the cast only receives the raw pointer.
        auto cast_input = create_slot_ref(json_type);
        field_cast.value()->add_child(cast_input.get());
        pool->add(cast_input.release());
        field_casts.emplace_back(std::move(field_cast.value()));
    }
    return new CastJsonToStruct(node, std::move(field_casts));
}

if (from_type == TYPE_VARCHAR && to_type == TYPE_OBJECT) {
return dispatch_throw_exception<CastVarcharToBitmap>(allow_throw_exception, node);
}
Expand Down
29 changes: 29 additions & 0 deletions be/src/exprs/cast_expr.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "column/vectorized_fwd.h"
#include "exprs/column_ref.h"
#include "exprs/expr.h"
#include "jsonpath.h"
#include "runtime/large_int_value.h"
#include "runtime/types.h"

Expand Down Expand Up @@ -90,6 +91,34 @@ class CastJsonToArray final : public Expr {
TypeDescriptor _cast_to_type_desc;
};

// Cast Json to struct<ANY>
class CastJsonToStruct final : public Expr {
public:
CastJsonToStruct(const TExprNode& node, std::vector<std::unique_ptr<Expr>> field_casts)
: Expr(node), _field_casts(std::move(field_casts)) {
_json_paths.reserve(_type.field_names.size());
for (int j = 0; j < _type.field_names.size(); j++) {
std::string path_string = "$." + _type.field_names[j];
auto res = JsonPath::parse(Slice(path_string));
if (!res.ok()) {
throw std::runtime_error("Failed to parse JSON path: " + path_string);
}
_json_paths.emplace_back(res.value());
}
}

CastJsonToStruct(const CastJsonToStruct& rhs) : Expr(rhs) {}

~CastJsonToStruct() override = default;

StatusOr<ColumnPtr> evaluate_checked(ExprContext* context, Chunk* input_chunk) override;
Expr* clone(ObjectPool* pool) const override { return pool->add(new CastJsonToStruct(*this)); }

private:
std::vector<std::unique_ptr<Expr>> _field_casts;
std::vector<JsonPath> _json_paths;
};

// cast one ARRAY to another ARRAY.
// For example.
// cast ARRAY<tinyint> to ARRAY<int>
Expand Down
133 changes: 133 additions & 0 deletions be/src/exprs/cast_expr_struct.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
// Copyright 2021-present StarRocks, Inc. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "column/array_column.h"
#include "column/column_builder.h"
#include "column/column_helper.h"
#include "column/column_viewer.h"
#include "column/json_column.h"
#include "exprs/cast_expr.h"
#include "exprs/expr_context.h"
#include "gutil/casts.h"
#include "gutil/strings/split.h"
#include "gutil/strings/strip.h"
#include "gutil/strings/substitute.h"
#include "jsonpath.h"
#include "runtime/memory/memory_resource.h"
#include "types/logical_type.h"
#include "util/slice.h"
#include "velocypack/Iterator.h"

namespace starrocks {

// Appends a NULL to every builder in `json_columns` and appends flag 1 to `null_column`.
// The body is wrapped in do { } while (0) so the expansion is exactly one statement; the caller
// supplies the terminating ';'. This keeps it safe under an unbraced if/else.
#define APPEND_NULL(json_columns, null_column)       \
    do {                                             \
        for (auto& json_column : (json_columns)) {   \
            json_column.append_null();               \
        }                                            \
        (null_column)->append(1);                    \
    } while (0)

// Evaluates the child expression and converts each JSON row into a struct row.
//
// Per row:
//  - NULL input, or a JSON value that is neither an array nor an object: every field gets NULL
//    and the row is flagged 1 in the struct-level null column.
//  - JSON array: elements are assigned to fields by position. Elements beyond the field count
//    are ignored, and fields without a corresponding element get NULL.
//  - JSON object: each field is extracted through its "$.<field name>" path. A field whose
//    path yields nothing gets NULL.
// Each per-field JSON column is then evaluated through the matching entry of `_field_casts`.
//
// The struct-level null column is attached only when the input column is nullable. The result is
// wrapped in a ConstColumn when the input column is constant.
StatusOr<ColumnPtr> CastJsonToStruct::evaluate_checked(ExprContext* context, Chunk* input_chunk) {
    ASSIGN_OR_RETURN(ColumnPtr column, _children[0]->evaluate_checked(context, input_chunk));
    if (column->only_null()) {
        return ColumnHelper::create_const_null_column(column->size());
    }

    ColumnViewer<TYPE_JSON> src(column);
    NullColumn::Ptr null_column = NullColumn::create();

    // 1. Cast Json to json columns: one JSON builder per struct field.
    const size_t field_size = _type.children.size();
    DCHECK_EQ(field_size, _type.field_names.size());
    // One cast per field is required by step 2 below.
    DCHECK_EQ(field_size, _field_casts.size());
    const size_t row_count = src.size();
    std::vector<ColumnBuilder<TYPE_JSON>> json_columns;
    json_columns.reserve(field_size);
    for (size_t i = 0; i < field_size; i++) {
        // Construct in place instead of copying a temporary builder.
        json_columns.emplace_back(row_count);
    }

    for (size_t row = 0; row < row_count; row++) {
        if (src.is_null(row)) {
            APPEND_NULL(json_columns, null_column);
            continue;
        }
        const JsonValue* json_value = src.value(row);
        if (json_value == nullptr) {
            APPEND_NULL(json_columns, null_column);
            continue;
        }

        const auto json_type = json_value->get_type();
        if (json_type == JsonType::JSON_ARRAY) {
            vpack::Slice json_slice = json_value->to_vslice();
            DCHECK(json_slice.isArray());
            size_t index = 0;
            for (const auto& element : vpack::ArrayIterator(json_slice)) {
                if (index >= field_size) {
                    // More array elements than struct fields: the rest is ignored.
                    break;
                }
                JsonValue element_value(element);
                json_columns[index].append(std::move(element_value));
                index++;
            }
            // Fill the other field with null.
            for (; index < field_size; index++) {
                json_columns[index].append_null();
            }
            null_column->append(0);
        } else if (json_type == JsonType::JSON_OBJECT) {
            // For json object, the names of the struct fields must match the json object keys.
            // Otherwise, the value of the field will be NULL.
            // Iterate by field_size so every builder in json_columns receives exactly one value
            // for this row.
            for (size_t field = 0; field < field_size; field++) {
                if (field >= _json_paths.size()) {
                    json_columns[field].append_null();
                    continue;
                }
                vpack::Builder builder;
                vpack::Slice json_slice = JsonPath::extract(json_value, _json_paths[field], &builder);
                if (json_slice.isNone()) {
                    json_columns[field].append_null();
                    continue;
                }
                JsonValue element_value(json_slice);
                json_columns[field].append(std::move(element_value));
            }
            null_column->append(0);
        } else {
            APPEND_NULL(json_columns, null_column);
        }
    }

    // 2. Cast json column to specified column
    Columns casted_fields;
    for (size_t i = 0; i < field_size; i++) {
        ColumnPtr elements = json_columns[i].build_nullable_column();
        if (_field_casts[i] != nullptr) {
            // The field cast reads its input from the first column of the chunk it is given.
            Chunk field_chunk;
            field_chunk.append_column(elements, 0);
            ASSIGN_OR_RETURN(auto casted_field, _field_casts[i]->evaluate_checked(context, &field_chunk));
            casted_field = NullableColumn::wrap_if_necessary(casted_field);
            casted_fields.emplace_back(std::move(casted_field));
        } else {
            // No cast for this field: keep the JSON values as they are.
            casted_fields.emplace_back(NullableColumn::wrap_if_necessary(elements->clone_shared()));
        }
        DCHECK(casted_fields[i]->is_nullable());
    }

    ColumnPtr res = StructColumn::create(std::move(casted_fields), _type.field_names);
    RETURN_IF_ERROR(res->unfold_const_children(_type));
    if (column->is_nullable()) {
        res = NullableColumn::create(res, null_column);
    }

    // Wrap constant column if source column is constant.
    if (column->is_constant()) {
        res = ConstColumn::create(res, column->size());
    }
    return res;
}

} // namespace starrocks
94 changes: 94 additions & 0 deletions be/test/exprs/cast_expr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2380,4 +2380,98 @@ TEST_F(VectorizedCastExprTest, unsupported_test) {
ASSERT_FALSE(Expr::create_vectorized_expr(&pool, cast_expr, &expr3, &runtime_state).ok());
}

// Builds a thrift type descriptor for a struct: a leading STRUCT node that carries the field
// names, followed by one scalar node per entry of `field_types` (precision, scale and len all
// set to 0).
TTypeDesc gen_struct_type_desc(const std::vector<TPrimitiveType::type> field_types,
                               const std::vector<std::string> field_names) {
    // STRUCT node: only the name is set on each struct field.
    std::vector<TStructField> struct_fields;
    for (const auto& name : field_names) {
        TStructField struct_field;
        struct_field.__set_name(name);
        struct_fields.push_back(struct_field);
    }
    TTypeNode struct_node;
    struct_node.type = TTypeNodeType::STRUCT;
    struct_node.__set_struct_fields(struct_fields);

    std::vector<TTypeNode> nodes;
    nodes.push_back(struct_node);

    // Scalar nodes, in the order of `field_types`.
    for (const auto& primitive : field_types) {
        TScalarType scalar;
        scalar.__set_type(primitive);
        scalar.__set_precision(0);
        scalar.__set_scale(0);
        scalar.__set_len(0);

        TTypeNode scalar_node;
        scalar_node.__set_scalar_type(scalar);
        nodes.push_back(scalar_node);
    }

    TTypeDesc desc;
    desc.__set_types(nodes);
    return desc;
}

// Test helper: casts the JSON text `str` to a struct with the given field types and names, and
// returns the debug string of the single result row.
//
// Sentinel return values:
//  - "UNSUPPORTED CAST": the factory did not produce a cast expression.
//  - "INVALID JSON": `str` could not be parsed.
//  - "EMPTY": the result column does not hold exactly one row.
//
// `cast_expr` is modified: child_type is set to JSON and `type` is overwritten (first with the
// struct type, then with the child type used by the mock input column).
static std::string cast_json_to_struct(TExprNode& cast_expr, std::vector<LogicalType> element_types,
                                       std::vector<std::string> field_names, const std::string& str) {
    cast_expr.child_type = to_thrift(TYPE_JSON);
    std::vector<TPrimitiveType::type> field_types;
    field_types.reserve(element_types.size());
    for (const auto& element_type : element_types) {
        field_types.emplace_back(to_thrift(element_type));
    }
    cast_expr.type = gen_struct_type_desc(field_types, field_names);
    ObjectPool pool;
    std::unique_ptr<Expr> expr(VectorizedCastExprFactory::from_thrift(&pool, cast_expr));
    if (expr == nullptr) {
        // Report a failed expectation instead of dereferencing a null expression below.
        return "UNSUPPORTED CAST";
    }

    auto json = JsonValue::parse(str);
    if (!json.ok()) {
        return "INVALID JSON";
    }
    // The mock child column is described by the child (JSON) type.
    cast_expr.type = gen_type_desc(cast_expr.child_type);
    MockVectorizedExpr<TYPE_JSON> col1(cast_expr, 1, &json.value());
    expr->_children.push_back(&col1);

    ColumnPtr ptr = expr->evaluate(nullptr, nullptr);
    if (ptr->size() != 1) {
        return "EMPTY";
    }
    return ptr->debug_item(0);
}

// Casts JSON arrays (matched by position) and JSON objects (matched by field name) to structs
// and checks the debug string of the resulting row.
TEST_F(VectorizedCastExprTest, json_to_struct) {
    TExprNode cast_expr;
    cast_expr.node_type = TExprNodeType::CAST_EXPR;
    cast_expr.opcode = TExprOpcode::CAST;
    cast_expr.num_children = 2;
    cast_expr.__isset.opcode = true;
    cast_expr.__isset.child_type = true;

    // Shorthand that binds the shared expression node.
    auto cast = [&cast_expr](std::vector<LogicalType> types, std::vector<std::string> names,
                             const std::string& json) {
        return cast_json_to_struct(cast_expr, types, names, json);
    };

    const std::vector<LogicalType> three_ints{TYPE_INT, TYPE_INT, TYPE_INT};
    const std::vector<LogicalType> two_varchars{TYPE_VARCHAR, TYPE_VARCHAR};
    const std::vector<std::string> cols_1_to_3{"col1", "col2", "col3"};
    const std::vector<std::string> cols_1_to_2{"col1", "col2"};
    const std::string star_number_object = R"({"star": "rocks", "number": 1})";

    // Arrays of integers, including an empty array and non-array input.
    EXPECT_EQ("{col1:1,col2:2,col3:3}", cast(three_ints, cols_1_to_3, "[1,2,3]"));
    EXPECT_EQ("{col1:1,col2:2,col3:3}", cast(three_ints, cols_1_to_3, "[1, 2, 3]"));
    EXPECT_EQ("{col1:NULL,col2:NULL,col3:NULL}", cast(three_ints, cols_1_to_3, "[]"));
    EXPECT_EQ("{col1:NULL}", cast({TYPE_INT}, {"col1"}, ""));
    EXPECT_EQ("{col1:NULL}", cast({TYPE_INT}, {"col1"}, "a"));
    EXPECT_EQ("{col1:NULL,col2:NULL}", cast({TYPE_INT, TYPE_INT}, cols_1_to_2, R"(["a","b"])"));

    // Arrays of doubles.
    EXPECT_EQ("{col1:1.1,col2:2.2,col3:3.3}",
              cast({TYPE_DOUBLE, TYPE_DOUBLE, TYPE_DOUBLE}, cols_1_to_3, "[1.1,2.2,3.3]"));

    // Arrays cast to varchar fields.
    EXPECT_EQ("{col1:'a',col2:'b'}", cast(two_varchars, cols_1_to_2, R"(["a","b"])"));
    EXPECT_EQ("{col1:'a',col2:' b'}", cast(two_varchars, cols_1_to_2, R"(["a", " b"])"));
    EXPECT_EQ("{col1:'1',col2:'2'}", cast(two_varchars, cols_1_to_2, R"([1, 2])"));

    // Objects: fields are matched by name, in struct field order.
    EXPECT_EQ("{star:'rocks',number:1}", cast({TYPE_VARCHAR, TYPE_INT}, {"star", "number"}, star_number_object));
    EXPECT_EQ("{number:1,star:'rocks'}", cast({TYPE_INT, TYPE_VARCHAR}, {"number", "star"}, star_number_object));
    EXPECT_EQ("{number:1,not_found:NULL}",
              cast({TYPE_INT, TYPE_VARCHAR}, {"number", "not_found"}, star_number_object));
}

} // namespace starrocks
17 changes: 7 additions & 10 deletions fe/fe-core/src/main/java/com/starrocks/catalog/Type.java
Original file line number Diff line number Diff line change
Expand Up @@ -1147,8 +1147,13 @@ public static boolean canCastTo(Type from, Type to) {
return true;
} else if (from.isStringType() && to.isArrayType()) {
return true;
} else if (from.isJsonType() && to.isArrayScalar()) {
// now we only support cast json to one dimensional array
} else if (from.isJsonType() && to.isArrayType()) {
ArrayType array = (ArrayType) to;
if (array.getItemType().isScalarType() || array.getItemType().isStructType()) {
return true;
}
return false;
} else if (from.isJsonType() && to.isStructType()) {
return true;
} else if (from.isBoolean() && to.isComplexType()) {
// for mock nest type with NULL value, the cast must return NULL
Expand All @@ -1159,14 +1164,6 @@ public static boolean canCastTo(Type from, Type to) {
}
}

/**
 * Returns true when this type is an array type whose item type is a scalar type.
 */
public boolean isArrayScalar() {
    return isArrayType() && ((ArrayType) this).getItemType().isScalarType();
}

/**
* Return type t such that values from both t1 and t2 can be assigned to t without an
* explicit cast. If strict, does not consider conversions that would result in loss
Expand Down
49 changes: 49 additions & 0 deletions test/sql/test_cast/R/test_cast_json_to_struct
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
-- name: test_cast_json_to_struct
select cast(PARSE_JSON('[1,2,3]') as struct<col1 int, col2 int, col3 int>);
-- result:
{"col1":1,"col2":2,"col3":3}
-- !result
select cast(PARSE_JSON('[1,2,3]') as struct<col1 int, col2 int>);
-- result:
{"col1":1,"col2":2}
-- !result
select cast(PARSE_JSON('[1,2,3]') as struct<col1 int, col2 int, col3 int, col4 int>);
-- result:
{"col1":1,"col2":2,"col3":3,"col4":null}
-- !result
select cast(PARSE_JSON('[1, 2, 3, "a"]') as struct<col1 int, col2 int, col3 int, col4 int>);
-- result:
{"col1":1,"col2":2,"col3":3,"col4":null}
-- !result
select cast(PARSE_JSON('[1.1, 2.2, 3.3]') as struct<col1 double, col2 double, col3 double>);
-- result:
{"col1":1.1,"col2":2.2,"col3":3.3}
-- !result
select cast(PARSE_JSON('[1.1, 2.2, 3.3]') as struct<col1 double, col2 double, col3 int>);
-- result:
{"col1":1.1,"col2":2.2,"col3":3}
-- !result
select cast(PARSE_JSON('{"star": "rocks", "number": 1}') as struct<number int, star varchar>);
-- result:
{"number":1,"star":"rocks"}
-- !result
select cast(PARSE_JSON('{"star": "rocks", "number": 1}') as struct<number int, not_found varchar>);
-- result:
{"number":1,"not_found":null}
-- !result
select cast(PARSE_JSON('{"star": "rocks", "number": [1, 2, 3]}') as struct<number array<int>, not_found varchar>);
-- result:
{"number":[1,2,3],"not_found":null}
-- !result
select cast(PARSE_JSON('[1, [{"star": "rocks"}, {"star": "rocks"}]]') as struct<col1 int, col2 array<json>>);
-- result:
{"col1":1,"col2":['{"star": "rocks"}','{"star": "rocks"}']}
-- !result
select cast(PARSE_JSON('{"star" : "rocks", "length": 5, "numbers": [1, 4, 7], "nest": [1, 2, 3]}') as struct<star varchar(10), length int, numbers array<int>, nest struct<col1 int, col2 int, col3 int>>);
-- result:
{"star":"rocks","length":5,"numbers":[1,4,7],"nest":{"col1":1,"col2":2,"col3":3}}
-- !result
select cast(PARSE_JSON('[{"star" : "rocks", "length": 5, "numbers": [1, 4, 7], "nest": [1, 2, 3]}, {"star" : "rockses", "length": 33, "numbers": [2, 5, 9], "nest": [3, 6, 9]}]') as array<struct<star varchar(10), length int, numbers array<int>, nest struct<col1 int, col2 int, col3 int>>>);
-- result:
[{"star":"rocks","length":5,"numbers":[1,4,7],"nest":{"col1":1,"col2":2,"col3":3}},{"star":"rockses","length":33,"numbers":[2,5,9],"nest":{"col1":3,"col2":6,"col3":9}}]
-- !result
Loading

0 comments on commit b3f309a

Please sign in to comment.