flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Charoes <char...@gmail.com>
Subject Re: Flink 消费数据延迟问题
Date Fri, 17 May 2019 00:35:28 GMT
hi chai, 你好
 参考文档:
https://ci.apache.org/projects/flink/flink-docs-stable/dev/table/streaming/joins.html
 普通的join如下, 如果join不到,数据会一直保存在state里,这会有一些内存泄漏隐患

SELECT * FROM OrdersINNER JOIN ProductON Orders.productId = Product.id

可以尝试time window join, 如果可以估计两个stream到达的先后间隔,
可以估计一个间隔时间,比如10秒,超过10秒还没有join到,就会被flink从state中移除。

SELECT *FROM
  Orders o,
  Shipments sWHERE o.id = s.orderId AND
      o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

或者使用ConnectedStream, 参考ConnectedStream.process方法,
可以实现一个CoProcessFunction, 这样可以逐条处理两个stream里的elements,
可以自己控制合适需要从State中移除


On Thu, May 16, 2019 at 3:47 PM chai <chaiyuan@didachuxing.com> wrote:

> 场景:kafka 流表和cassandra维度表join,生成宽表
>
> 问题描述:
> kafka 数据(流表A)和cassandra 数据(维度表B)都是通过mysql binglog生成的。
> 在mysql 中业务库中,表A和表B是同步生成的两条数据,可以根据id关联。
> 在flink sql是用过程中发现,一个case。表A中最近一条数据id为1,此时表B中id为1的数据还没到,导致join不上。
>
> 个人的疑问:
> 1.flink sql中有没有延迟读取kafka 流表A(短时间,例如2秒),这样表A和B就能join上了。
> 2.是否有其它更适合当前场景的方式?
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message