some problem on flink batch sql about insert operation #3

@queyuexzy

Hi, I have recently been writing Flink batch SQL and ran into a problem.
Flink version: 1.10.1
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
BatchTableEnvironment tEnv = BatchTableEnvironment.create(env);
tEnv.registerFunction("explode", new ExplodeFunction());
Table orders = tEnv.from("zhangying480_test");
HiveCatalog catalog = new HiveCatalog("myhive", "test", "", HiveVersionInfo.getVersion());
tEnv.registerCatalog("myhive", catalog);
tEnv.useCatalog("myhive");
Table counts = orders
.groupBy("related_info")
.select("explode(related_info)");
counts.insertInto("zhangying4802_test");
tEnv.execute("test");

Running this code directly fails with the following error:
Exception in thread "main" org.apache.flink.table.api.TableException: Only BatchTableSource and InputFormatTableSource are supported in BatchTableEnvironment.
at org.apache.flink.table.plan.nodes.dataset.BatchTableSourceScan.translateToPlan(BatchTableSourceScan.scala:118)
at org.apache.flink.table.plan.nodes.dataset.DataSetDistinct.translateToPlan(DataSetDistinct.scala:84)
at org.apache.flink.table.plan.nodes.dataset.DataSetCalc.translateToPlan(DataSetCalc.scala:92)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:306)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.translate(BatchTableEnvImpl.scala:281)
at org.apache.flink.table.api.internal.BatchTableEnvImpl.writeToSink(BatchTableEnvImpl.scala:145)
at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:664)
at org.apache.flink.table.api.internal.TableEnvImpl.insertInto(TableEnvImpl.scala:604)
at org.apache.flink.table.api.internal.TableImpl.insertInto(TableImpl.java:411)
at com.jd.search.algo.invoke.LocalHiveTest.main(LocalHiveTest.java:99)

So I switched to the Blink planner in batch mode and ran the following code instead:
EnvironmentSettings settings = EnvironmentSettings.newInstance().useBlinkPlanner().inBatchMode().build();
TableEnvironment tEnv = TableEnvironment.create(settings);
tEnv.registerFunction("explode", new ExplodeFunction());
Table orders = tEnv.from("zhangying480_test");
HiveCatalog catalog = new HiveCatalog("myhive", "test", "", HiveVersionInfo.getVersion());
tEnv.registerCatalog("myhive", catalog);
tEnv.useCatalog("myhive");
Table counts = orders
.groupBy("related_info")
.select("explode(related_info)");
counts.insertInto("zhangying4802_test");
tEnv.execute("test");
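With this version, however, the line tEnv.registerFunction("explode", new ExplodeFunction()); fails. Looking at the source code, the cause is that TableEnvironment.registerFunction only accepts a ScalarFunction, whereas ExplodeFunction extends TableFunction.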
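
To make the mismatch concrete, here is a minimal plain-Java sketch of the overload situation described above. It is a toy model only: every class in it (UserFunction, ScalarLike, TableLike, UnifiedEnv, BridgingEnv) is a hypothetical stand-in and none of them is a real Flink type. It assumes the unified environment exposes a registerFunction overload for scalar functions only, while a bridging environment adds one for table functions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model only: these classes are hypothetical stand-ins, not Flink's real types.
final class RegistrationSketch {
    // Stand-in for a common user-defined-function base type.
    abstract static class UserFunction {}

    // Stand-in for a function that maps one row to one value.
    static class ScalarLike extends UserFunction {}

    // Stand-in for a function that maps one row to many rows.
    static class TableLike extends UserFunction {}

    // Stand-in for a unified environment: it exposes only a scalar overload.
    static class UnifiedEnv {
        final Map<String, UserFunction> functions = new LinkedHashMap<>();

        void registerFunction(String name, ScalarLike fn) {
            functions.put(name, fn);
        }
    }

    // Stand-in for a bridging environment: it adds a table-function overload.
    static class BridgingEnv extends UnifiedEnv {
        void registerFunction(String name, TableLike fn) {
            functions.put(name, fn);
        }
    }

    public static void main(String[] args) {
        UnifiedEnv unified = new UnifiedEnv();
        unified.registerFunction("upper", new ScalarLike());
        // The next line would not compile: UnifiedEnv has no overload taking TableLike.
        // unified.registerFunction("explode", new TableLike());

        BridgingEnv bridging = new BridgingEnv();
        bridging.registerFunction("explode", new TableLike());

        System.out.println(unified.functions.keySet());
        System.out.println(bridging.functions.keySet());
    }
}
```

In this model the failure is a matter of which overloads the static type of tEnv offers, not of the function class itself, which is why the same ExplodeFunction is accepted by one environment type and rejected by the other.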

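Our business requirement is to use a TableFunction, but this problem looks unsolvable to me. We run on a Flink cluster provided by our platform team, and upgrading it is difficult. Is there a way to solve this without upgrading the Flink version?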
