flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Benchao Li <libenc...@gmail.com>
Subject Re: flink-1.10-sql 维表问题
Date Thu, 16 Apr 2020 02:53:46 GMT
https://issues.apache.org/jira/browse/FLINK-16068
https://issues.apache.org/jira/browse/FLINK-16345
上面这两个issue的修改都加到了1.10上了么?如果是的话,那这可能是还有其他的bug。
如果你可以在1.10和或者master分支的最新代码上复现这个问题的话,可以建一个issue来跟踪下这个问题。

111 <xinghalo@163.com> 于2020年4月16日周四 上午10:46写道:

> Hi,
> 基于1.10 源码按照jira里面的PR修改不行么?
> 跟hbase的ddl关系应该不大,就发一个kafka的吧。
>
>
> //代码占位符
> Flink SQL> CREATE TABLE kafka_test1 (
> //代码占位符
> Flink SQL> CREATE TABLE kafka_test1 (
> >   id varchar,
> >   a varchar,
> >   b int,
> >   ts as PROCTIME()
> > ) WITH (
> >   'connector.type' = 'kafka',
> >   'connector.version' = '0.11',
> >   'connector.topic' = 'test',
> >   'connector.properties.zookeeper.connect' = 'localnode2:2181',
> >   'connector.properties.bootstrap.servers' = 'localnode2:9092',
> >   'connector.properties.group.id' = 'testGroup',
> >   'connector.startup-mode' = 'latest-offset',
> >   'format.type' = 'json'
> > )
> > ;
> [INFO] Table has been created.
>
>
> Flink SQL> select a.*,b.* from kafka_test1 a join hbase_test1 FOR
> SYSTEM_TIME AS OF a.ts as b on a.id = b.rowkey;
>
>
> 异常信息:
> //代码占位符
> [ERROR] Could not execute SQL statement. Reason:
> java.lang.AssertionError: Conversion to relational algebra failed to
> preserve datatypes:
> validated type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIMESTAMP(3) NOT
> NULL ts, VARCHAR(2147483647) CHARACTER SET "UTF-16LE" rowkey,
> RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a,
> INTEGER b) f) NOT NULL
> converted type:
> RecordType(VARCHAR(2147483647) CHARACTER SET "UTF-16LE" id,
> VARCHAR(2147483647) CHARACTER SET "UTF-16LE" a, INTEGER b, TIME
> ATTRIBUTE(PROCTIME) NOT NULL ts, VARCHAR(2147483647) CHARACTER SET
> "UTF-16LE" rowkey, RecordType:peek_no_expand(VARCHAR(2147483647) CHARACTER
> SET "UTF-16LE" a, INTEGER b) f) NOT NULL
> rel:
> LogicalProject(id=[$0], a=[$1], b=[$2], ts=[$3], rowkey=[$4], f=[$5])
>   LogicalCorrelate(correlation=[$cor1], joinType=[inner],
> requiredColumns=[{0, 3}])
>     LogicalProject(id=[$0], a=[$1], b=[$2], ts=[PROCTIME()])
>       LogicalTableScan(table=[[tgou, collie, kafka_test1, source:
> [Kafka011TableSource(id, a, b)]]])
>     LogicalFilter(condition=[=($cor1.id, $0)])
>       LogicalSnapshot(period=[$cor1.ts])
>         LogicalTableScan(table=[[tgou, collie, hbase_test1, source:
> [HBaseTableSource[schema=[rowkey, f], projectFields=null]]]])
>
>
> Best,
> Xinghalo



-- 

Benchao Li
School of Electronics Engineering and Computer Science, Peking University
Tel:+86-15650713730
Email: libenchao@gmail.com; libenchao@pku.edu.cn

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message