flink-user-zh mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From 苟刚 <gougang_1...@163.com>
Subject Re:Re:fink新增计算逻辑时kafka从头开始追平消费记录
Date Tue, 07 Apr 2020 04:03:14 GMT



latest 不是最后消费的位置吗?
另外我一直不明白的是,如果我不新增新的算子,从savepoint启动是没有问题的。不会从头开始消费,之后新增算子后才会出现这个情况。










--

Best Wishes
   Galen.K





在 2020-04-07 11:39:03,"sunfulin" <sunfulin0321@163.com> 写道:
>
>
>
>Hi,
>props.put("auto.offset.reset", "latest");
>是加了这个设置导致的吧
>
>
>
>
>
>
>
>
>
>
>
>
>
>
>在 2020-04-07 11:27:53,"苟刚" <gougang_1991@163.com> 写道:
>>Hello,
>>    我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
>>    我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
>>
>>
>>flink版本:1.6.3
>>
>>部分代码如下:
>>
>>public static void main(String[] args) throws Exception {
>>final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
>>    StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>>
>>    DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>>// 处理timing数据
>>processTimingData(parameterTool, data);
>>// 处理front error数据
>>processFrontErrorData(parameterTool, data);
>>// 处理img error数据
>>processImgLoadErrorData(parameterTool, data);
>>    env.execute("xlog compute");
>>}
>>
>>
>>
>>
>>kafka的连接参数配置:
>>public static Properties buildKafkaProps(ParameterTool parameterTool) {
>>    Properties props = parameterTool.getProperties();
>>    props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
>>    props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>>    props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
>>    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>>    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>>    props.put("auto.offset.reset", "latest");
>>return props;
>>}
>>
>>
>>
>>
>>
>>
>>
>>--
>>
>>Best Wishes
>>   Galen.K
>>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message