flink-user-zh mailing list archives

From LakeShen <shenleifight...@gmail.com>
Subject Re: Kafka re-consumes from the beginning when new computation logic is added to Flink
Date Tue, 07 Apr 2020 11:54:31 GMT
Hi 苟刚,

In a Flink job, if checkpointing is enabled, the Kafka offsets are committed
after each checkpoint completes. If it is not enabled, offsets are committed by the
consumer's auto-commit, which is on by default with a 5 s interval.
As for the job starting from the beginning every time, my guess is that the code
sets the consumer to start from the earliest offset, i.e. you are calling setStartFromEarliest [1].

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/connectors/kafka.html#kafka-consumers-start-position-configuration
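
A minimal sketch of how these pieces fit together (my illustration, not from the
thread; the topic name, schema, and props object are placeholders, assuming the
usual Flink 1.6 Kafka connector imports):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // With checkpointing enabled, offsets are committed back to Kafka
    // each time a checkpoint completes.
    env.enableCheckpointing(60000);

    FlinkKafkaConsumer011<String> consumer = new FlinkKafkaConsumer011<>(
            "some-topic",                // placeholder topic
            new SimpleStringSchema(),    // placeholder schema
            props);                      // placeholder Properties (group.id etc.)
    // Default start position: the group's committed offsets; when none exist,
    // the "auto.offset.reset" property decides. An explicit setStartFromEarliest()
    // overrides this and replays the topic from the beginning on every fresh start.
    consumer.setStartFromGroupOffsets();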

Best,
LakeShen

gang.gou <gougang_1991@163.com> wrote on Tue, Apr 7, 2020 at 4:17 PM:

> OK, I'll give it a try and share the result with everyone. Thanks!
>
> On 2020/4/7 3:52 PM, "Evan" <user-zh-return-2826-gougang_1991=163.com@flink.apache.org on behalf of chengyanan1008@foxmail.com> wrote:
>
>     The code in my previous mail seems to have been garbled, so I have reformatted it and am resending it. I suggest that after obtaining the consumer you also call consumer.setStartFromLatest();. This follows the official documentation; I translated it a while ago, see the section on "Kafka Consumers start position configuration": https://www.jianshu.com/p/b753527b91a6
>
>
>
>     /**
>      * @param env
>      * @param topic
>      * @param time  subscription time
>      * @return
>      * @throws IllegalAccessException
>      */
>     public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException {
>         ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
>         Properties props = buildKafkaProps(parameterTool);
>         FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>(
>                 topic,
>                 new MetricSchema(),
>                 props);
>
>         consumer.setStartFromLatest();
>
>         consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
>             @Override
>             public long extractAscendingTimestamp(XlogStreamBasicBean element) {
>                 if (element == null || element.getTimestamp() == null) {
>                     return System.currentTimeMillis();
>                 }
>                 return element.getTimestamp() - 10000;
>             }
>         });
>         return env.addSource(consumer);
>     }
>
>     }
>
>
>
>
>
>     ------------------ Original Message ------------------
>     From: "苟刚" <gougang_1991@163.com>;
>     Sent: Tuesday, April 7, 2020, 11:27 AM
>     To: "user-zh" <user-zh@flink.apache.org>;
>
>     Subject: Kafka re-consumes from the beginning when new computation logic is added to Flink
>
>
>
>     Hello,
>
>     I've run into a problem: when I use Flink for real-time statistics, every time I add a new computation type, the operators start consuming from Kafka's earliest messages, so after every restart it takes a long time to catch up. Is there any way to solve this?
>     My watermark is set on the Kafka consumer; every time a new process step is added downstream, consumption starts over from the beginning.
>
>
>     Flink version: 1.6.3
>
>     Part of the code is as follows:
>
>     public static void main(String[] args) throws Exception {
>         final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
>         StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
>
>         DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
>         // process timing data
>         processTimingData(parameterTool, data);
>         // process front error data
>         processFrontErrorData(parameterTool, data);
>         // process img load error data
>         processImgLoadErrorData(parameterTool, data);
>         env.execute("xlog compute");
>     }
>
>
>
>
>     Kafka connection configuration:
>     public static Properties buildKafkaProps(ParameterTool parameterTool) {
>         Properties props = parameterTool.getProperties();
>         props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
>         props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
>         props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
>         props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
>         props.put("auto.offset.reset", "latest");
>         return props;
>     }
>
>
>
>
>
>
>
>     --
>
>     Best Wishes
>      Galen.K
>
>
>