flink-user-zh mailing list archives

From "Evan" <chengyanan1...@foxmail.com>
Subject 回复: 回复:fink新增计算逻辑时kafka从头开始追平消费记录
Date Tue, 07 Apr 2020 07:10:29 GMT
&nbsp; &nbsp;苟刚你好,刚才看了你的kafka消费代码,建议你在获取consumer后,增加一行如下代码
“consumer.setStartFromLatest();”然后再测试一下。


/**
 * @param env
 * @param topic
 * @param time  subscription start time
 * @return
 * @throws IllegalAccessException
 */
public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException {
    ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
    Properties props = buildKafkaProps(parameterTool);
    FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>(
            topic,
            new MetricSchema(),
            props);

    consumer.setStartFromLatest();

    consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
        @Override
        public long extractAscendingTimestamp(XlogStreamBasicBean element) {
            if (element == null || element.getTimestamp() == null) {
                return System.currentTimeMillis();
            }
            return element.getTimestamp() - 10000;
        }
    });
    return env.addSource(consumer);
}
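For context, setStartFromLatest() is one of several startup modes on the Flink Kafka consumer. A minimal sketch of the options, based on the Flink 1.6 connector API (verify the method names against your exact version), choosing exactly one:

    // Startup-position options on FlinkKafkaConsumer011 (the last call
    // made before the job starts is the one that takes effect):
    consumer.setStartFromGroupOffsets();   // default: resume from offsets committed
                                           // for group.id, falling back to the
                                           // "auto.offset.reset" property if none exist
    consumer.setStartFromEarliest();       // ignore committed offsets, start from earliest
    consumer.setStartFromLatest();         // ignore committed offsets, start from latest
    consumer.setStartFromTimestamp(time);  // start from records at or after the given
                                           // epoch-millisecond timestamp

Note that these settings are ignored when the job is restored from a savepoint or checkpoint; in that case the offsets stored in the state take precedence.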





------------------&nbsp;原始邮件&nbsp;------------------
发件人:&nbsp;"gang.gou"<gougang_1991@163.com&gt;;
发送时间:&nbsp;2020年4月7日(星期二) 下午2:32
收件人:&nbsp;"user-zh"<user-zh@flink.apache.org&gt;;

主题:&nbsp;Re: 回复:fink新增计算逻辑时kafka从头开始追平消费记录



My Kafka version is 0.11 and my Flink version is 1.6.3. I did not explicitly set offset auto-commit, but according to the official Kafka documentation the default should be true.
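A side note on auto-commit semantics: Flink only honors "enable.auto.commit" when checkpointing is disabled; with checkpointing enabled, the consumer instead commits offsets back to Kafka when a checkpoint completes. A minimal sketch, assuming the Flink 1.6 API:

    // A minimal sketch, assuming Flink 1.6.x: with checkpointing enabled, offsets
    // are committed on checkpoint completion rather than via Kafka auto-commit.
    env.enableCheckpointing(60_000);               // checkpoint every 60 seconds
    consumer.setCommitOffsetsOnCheckpoints(true);  // the default; shown for clarity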




On 2020/4/7 at 2:10 PM, "Evan" <user-zh-return-2816-gougang_1991=163.com@flink.apache.org on behalf of chengyanan1008@foxmail.com> wrote:

&nbsp;&nbsp;&nbsp; 苟刚你好:请问你使用的kafka是什么版本的,还有就是有没有设置offset自动提交
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; ------------------&amp;nbsp;原始邮件&amp;nbsp;------------------
&nbsp;&nbsp;&nbsp; 发件人:&amp;nbsp;"苟刚"<gougang_1991@163.com&amp;gt;;
&nbsp;&nbsp;&nbsp; 发送时间:&amp;nbsp;2020年4月7日(星期二) 中午11:27
&nbsp;&nbsp;&nbsp; 收件人:&amp;nbsp;"user-zh"<user-zh@flink.apache.org&amp;gt;;
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 主题:&amp;nbsp;fink新增计算逻辑时kafka从头开始追平消费记录
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; Hello,
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 我遇到一个问题,我用flink做实时统计的时候,每次新增一种计算类型,算子就会从kafka的最早的消息开始消费,导致我每次重启后都需要花费好长的时间去追平记录,请问有什么办法解决吗?
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; 我的wartermark是设置在kafka的consumer上的,下面的每新增一个process的时候都会从头开始消费。
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; flink版本:1.6.3
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 部分代码如下:
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; public static void main(String[] args) throws Exception {
&nbsp;&nbsp;&nbsp; final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; StreamExecutionEnvironment
env = ExecutionEnvUtil.prepare(parameterTool);
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; DataStreamSource<XlogStreamBasicBean&amp;gt;
data = KafkaTools.buildSource(env);
&nbsp;&nbsp;&nbsp; // 处理timing数据
&nbsp;&nbsp;&nbsp; processTimingData(parameterTool, data);
&nbsp;&nbsp;&nbsp; // 处理front error数据
&nbsp;&nbsp;&nbsp; processFrontErrorData(parameterTool, data);
&nbsp;&nbsp;&nbsp; // 处理img error数据
&nbsp;&nbsp;&nbsp; processImgLoadErrorData(parameterTool, data);
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; env.execute("xlog
compute");
&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; kafka的连接参数配置:
&nbsp;&nbsp;&nbsp; public static Properties buildKafkaProps(ParameterTool parameterTool)
{
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; Properties props
= parameterTool.getProperties();
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; props.put("bootstrap.servers",
parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; props.put("zookeeper.connect",
parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; props.put("group.id",
parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp;&amp;nbsp; props.put("auto.offset.reset",
"latest");
&nbsp;&nbsp;&nbsp; return props;
&nbsp;&nbsp;&nbsp; }
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; --
&nbsp;&nbsp;&nbsp; 
&nbsp;&nbsp;&nbsp; Best Wishes
&nbsp;&nbsp;&nbsp; &amp;nbsp;&amp;nbsp; Galen.K