flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From zhisheng <zhisheng2...@gmail.com>
Subject Re: 请教Flink SQL watermark遇到未来时间的处理问题
Date Thu, 01 Aug 2019 10:49:36 GMT
感谢你

郑 仲尼 <zhengzhongni@outlook.com> 于 2019年7月31日周三 下午4:09写道:

> hi,智笙:
>
>
>     感谢提供解决思路,目前我这边还尝试了几种可行的方案:
>
> 1.在kafka反序列化的时候,判断kafka中日期字段的值,如果超过当前时间太多,则丢弃,或者重置为当前时间(重置其实可能导致正常数据丢失)。
>
>
> 2.自定义一个watermark,当时间大于当前时间太多的时候,不更新当前的watermark,这样在watermark达到这条未来时间的时间点后,也会将这条数据纳入窗口计算,这种其实是比较理想的。但是这种没有完全的测试,感觉数据会一直存放在内存中,不知道会不会引起其他问题。
>
> 在编写自定义watermark的时候,发现只能使用scala写,使用java实现的话,没有数据输出,debug看java
> 实现的代码生成的watermark也正确。因为对scala调用java不熟,不打算深究了。
>
>
>     下面是关键的实现代码,希望对有类似问题的人有所帮助:
>
> 构建Schema时指定rowtime:
>
> Rowtime rowtime = new Rowtime() .timestampsFromField(rowTimeField)
> .watermarksFromStrategy(new
> BoundedOutOfOrderTimestamps(delay,futrueTimeLimit));
>
>
> 下面是scala版本的自定义watermark生成策略,仿的Flink自带的BoundedOutOfOrderTimestamps:
>
>
> import org.apache.flink.streaming.api.watermark.Watermark
>
> import
> org.apache.flink.table.sources.wmstrategies.PeriodicWatermarkAssigner
>
>
> final class BoundedOutOfOrderTimestamps(val delay: Long, val
> futrueTimeLimit: Long) extends PeriodicWatermarkAssigner {
>
>   var maxTimestamp: Long = Long.MinValue + delay
>
>   override def nextTimestamp(timestamp: Long): Unit = {
>
>     //需考虑时区,28800000为8小时,按需修改
>
>     var currentTimeLimit: Long = System.currentTimeMillis() + 28800000 +
> futrueTimeLimit;
>
>     if (timestamp > currentTimeLimit) {
>
>     //这里不更新maxTimestamp即不更新返回的watermark
>
>       println("未来时间:timestamp=" + timestamp + ",maxTimestamp=" +
> maxTimestamp + ",scalacurrentTimeLimit=" + currentTimeLimit);
>
>     } else {
>
>       if (timestamp > maxTimestamp) {
>
>         maxTimestamp = timestamp
>
>       }
>
>     }
>
>   }
>
>   override def getWatermark: Watermark = new Watermark(maxTimestamp -
> delay)
>
>   override def equals(other: Any): Boolean = other match {
>
>     case that: BoundedOutOfOrderTimestamps =>
>
>       delay == that.delay
>
>     case _ => false
>
>   }
>
>   override def hashCode(): Int = {
>
>     delay.hashCode()
>
>   }
>
> }
>
>  原始邮件
> 发件人: zhisheng<zhisheng2018@gmail.com>
> 收件人: user-zh<user-zh@flink.apache.org>
> 发送时间: 2019年7月24日(周三) 23:45
> 主题: Re: 请教Flink SQL watermark遇到未来时间的处理问题
>
>
> hi,仲尼:
>   通常这种时间超前的数据是由于你机器的时间有问题(未对齐),然后采集上来的数据使用的那个时间可能就会比当前时间超前了(大了),你可以有下面解决方法:
>
> 1、在 Flink 从 Kafka 中消费数据后就进行 filter
> 部分这种数据(可以获取到时间后和当前时间相比一下,如果超前或者超前多久就把这条数据丢掉,之前我自己项目也有遇到过这种数据问题,设置的超前
5
> 分钟以上的数据就丢失),就不让进入后面生成水印,这样就不会导致因为水印过大而导致你后面的问题
> 2、在生成水印的地方做判断,如果采集上来的数据的时间远大于当前时间(比如超过
5
> 分钟)那么就不把当前水印的时间设置为数据的时间,而是用当前系统的时间代替
>
>
>
>
>
> Best!
> From zhisheng
>
> 郑 仲尼 <zhengzhongni@outlook.com<mailto:zhengzhongni@outlook.com>>
> 于2019年7月24日周三 下午3:44写道:
>
> > 各位Flink社区大佬,
> > 您好!
> > 我使用Flink SQL (Flink
> >
> 1.8.0)进行一些聚合计算,消费的是Kafka数据,使用的是EventTime,但是有时候,偶然会出现rowtime字段来了一条未来时间的数据(可能是上送的数据时区导致),这样Watermark会直接推到了未来某个时间点,导致这笔错误数据到达后的数据,到未来时间点之间的数据会被丢弃。
> >
> > 这个问题根本确实是业务方面的问题,但是我们还是希望有一些方案应对这种异常情况。
> >
> > 目前,我们这边处理的方法是:
> >
> 1.在进入聚合任务之前进行过滤操作,新增一个过滤的任务(使用的是ProcessTime),将这条错误的数据直接丢弃(或者输出到其他topic),将结果发送中间的kafka
> > topic,聚合任务再消费中间的kafka topic。
> >
> > 我想请教的是:
> > 1.不知各位是否有遇到过同样的问题,有没有更好的处理方式
? 新加一个任务虽然能够暂时解决,但是可能会导致延迟增加,也增加了出错的几率。
> > 2.不知是否有方法在一个任务中完成下面的两步操作:
> > 1) tEnv.registerTable(operatorTable,tEnv.sqlQuery(select * from
> > KafkaSource where $field1 >$value));// 这一步来一条处理一条,进行数据的过滤
> > 2)select sum(field2) from operatorTable  group by TUMBLE(rowtime,INTERVAL
> > '5' SECOND),field2 //这一步使用rowtime聚合输出
> > 这种方法目前存在的问题是:在定义KafkaSource
> >
> 时,需要指定rowtime(构建kafka连接器的时候需要指定),一旦有错误数据进来,还没有执行到第2)步,watermark貌似就已经受到了影响。
> >
> > 我连接Kafka的代码大概如下:
> >
> > tEnv.connect( new Kafka()
> >
> >          .topic(topic)
> >
> >          .version(version)
> >
> >          .startFromLatest()
> >
> >          .properties(prop))
> >
> >         .withFormat(new Json()
> >
> >          .failOnMissingField(false)
> >
> >          .deriveSchema())
> >
> >         .withSchema(new Schema()
> >
> >          .schema(tableSchema.getTableSchema())
> >
> >          .rowtime(new Rowtime()
> >
> >          .timestampsFromField(rowTimeField)
> >
> >          .watermarksPeriodicBounded(delay)))
> >
> >         .inAppendMode()
> >
> >         .registerTableSink("KafkaSource");
> >
> > 以上,期待您帮忙解答,非常感谢~~
> >
> > 祝
> > 顺利
> > —————————
> > Johnny Zheng
> >
> >
>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message