flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Zai Arnold <yzin...@gmail.com>
Subject Re: 回复:fink新增计算逻辑时kafka从头开始追平消费记录
Date Tue, 07 Apr 2020 04:42:52 GMT
看一下flink 对应taskmanager log,里面有任务启动时kafka详细的启动配置信息.

发件人: 苟刚 <gougang_1991@163.com>
答复: "user-zh@flink.apache.org" <user-zh@flink.apache.org>
日期: 2020年4月7日 星期二 下午12:23
收件人: "user-zh@flink.apache.org" <user-zh@flink.apache.org>
抄送: "apache22@163.com" <apache22@163.com>
主题: Re:回复:fink新增计算逻辑时kafka从头开始追平消费记录


附件是两份主要代码









--
Best Wishes
   Galen.K



在 2020-04-07 12:11:07,"酷酷的浑蛋" <apache22@163.com> 写道:

>是不是代码中设置了从头消费,还有可能提交offset到kafka的代码中设置了false?因为你的代码应该不是全的,所以没法具体看

>

>

>| |

>apache22

>|

>|

>apache22@163.com

>|

>签名由网易邮箱大师定制

>在2020年4月7日 12:03,苟刚<gougang_1991@163.com> 写道:

>

>

>

>latest 不是最后消费的位置吗?

>另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。

>

>

>

>

>

>

>

>

>

>

>--

>

>Best Wishes

>Galen.K

>

>

>

>

>

>在 2020-04-07 11:39:03,"sunfulin" <sunfulin0321@163.com> 写道:

>

>

>

>Hi,

>props.put("auto.offset.reset", "latest");

>是加了这个设置导致的吧

>

>

>

>

>

>

>

>

>

>

>

>

>

>

>在 2020-04-07 11:27:53,"苟刚" <gougang_1991@163.com> 写道:

>Hello,

>我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?

>我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。

>

>

>flink版本:1.6.3

>

>部分代码如下:

>

>public static void main(String[] args) throws Exception {

>final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);

>StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

>

>DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);

>// 处理timing数据

>processTimingData(parameterTool, data);

>// 处理front error数据

>processFrontErrorData(parameterTool, data);

>// 处理img error数据

>processImgLoadErrorData(parameterTool, data);

>env.execute("xlog compute");

>}

>

>

>

>

>kafka的连接参数配置:

>public static Properties buildKafkaProps(ParameterTool parameterTool) {

>Properties props = parameterTool.getProperties();

>props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));

>props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));

>props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));

>props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

>props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

>props.put("auto.offset.reset", "latest");

>return props;

>}

>

>

>

>

>

>

>

>--

>

>Best Wishes

>Galen.K

>





Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message