flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "lucas.wu"<lucas...@xiaoying.com>
Subject Re: rowtime 的类型序列化问题
Date Fri, 20 Mar 2020 06:35:29 GMT
是的 使用的是blink planner。因为我基于flink的基础上又做了一些简单的开发,所以sinkTable的schmea我是先读取了Select
* from source_table,
然后把它注册成了一个临时表,然后把这个临时表的schema赋给sinktable,sinkTable同时也继承了RetractStreamTableSink[Row]。
这是他的一个operator连接图
Source: KafkaTableSource(SeqNo, Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp,
Offset) - SourceConversion(table=[default_catalog.default_database.source_table, source: [KafkaTableSource(SeqNo,
Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset)]], fields=[SeqNo,
Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset]) - Calc(select=[SeqNo,
Type, Table, ServerId, Database, OldData, GTID, Data, Timestamp, Offset, from_unixtime((Data.FuiUpdateTime
/ 1000)) AS FuiUpdateTimeSec, (from_unixtime((Data.FuiUpdateTime / 1000)) TO_TIMESTAMP _UTF-16LE'yyyy-MM-dd
HH:mm:ss') AS event_ts]) - WatermarkAssigner(rowtime=[event_ts], watermark=[(event_ts - 60000:INTERVAL
SECOND)]) - Calc(select=[event_ts]) - SinkConversionToTuple2 - Sink: ConsoleTableSink(event_ts)


目前从报错信息看,可能是SinkConversionToTuple2这个operator有点问题。
这个算子的 inTypeInfo是BaseRow(event_ts: TIMESTAMP(3) *ROWTIME*)
outTypeInfo是Java Tuple2Boolean, Row(event_ts: TimeIndicatorTypeInfo(rowtime)) 这两种对TimeIndicatorTypeInfo序列化方式是不一样的。
一个使用BaseRowSerializer会将TimeIndicatorTypeInfo的序列化方式设置成SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer。
所以我猜测是这里出现了问题。

原始邮件
发件人:Jark Wuimjark@gmail.com
收件人:user-zhuser-zh@flink.apache.org
发送时间:2020年3月20日(周五) 14:21
主题:Re: rowtime 的类型序列化问题


Hi, 请问使用的是 blink planner 么?可以把 sinkTable 的定义也发一下吗?
Best, Jark On Fri, 20 Mar 2020 at 11:40, lucas.wu lucas.wu@xiaoying.com wrote:  Hi all:
 建表语句  create table `source_table`(  `SeqNo` varchar,  `Type` varchar,  `Table` varchar,
 `ServerId` varchar,  `Database` varchar,  `OldData` varchar,  `GTID` varchar,  `Offset` varchar,
 `event_ts` as  to_timestamp(from_unixtime(Data.`FuiUpdateTime`),'yyyy-MM-ddHH:mm:ss'),  WATERMARK
FOR event_ts AS event_ts - interval '60' second  ) with(…)    查询语句  insert into
sinkTable from Select * from source_table;     报错信息:  java.lang.ClassCastException:
java.sql.Timestamp cannot be cast to  java.lang.Long at  org.apache.flink.api.common.typeutils.base.LongSerializer.copy(LongSerializer.java:32)
 at  org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:93)
 at  org.apache.flink.api.java.typeutils.runtime.RowSerializer.copy(RowSerializer.java:44)
 at  org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:115)
 at  org.apache.flink.api.java.typeutils.runtime.TupleSerializer.copy(TupleSerializer.java:37)
 at  org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:639)
 at  org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:616)
 at  org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:596)
 at  org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:730)
 at  org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:708)
 at SinkConversion$51.processElement(Unknown Source)  ……     最后查看代码,发现对于rowtime,在BaseRowTypeInfo下会是使用SqlTimestampSerializer,而在RowTypeInfo会使用LongSerializer,上下游使用serializer不一样,上游使用SqlTimestampSerializer下游使用LongSerializer就会报错。
 请问这个问题可以避免吗?
Mime
  • Unnamed multipart/alternative (inline, 8-Bit, 0 bytes)
View raw message