flink-user-zh mailing list archives

From "gang.gou" <gougang_1...@163.com>
Subject Re: Re: Kafka catches up from the beginning when new computation logic is added to Flink
Date Tue, 07 Apr 2020 08:17:25 GMT
OK, I'll give it a try and report back to everyone once I have results. Thanks!

On 2020/4/7 at 3:52 PM, "Evan" <user-zh-return-2826-gougang_1991=163.com@flink.apache.org
on behalf of chengyanan1008@foxmail.com> wrote:

    The code in my previous mail seems to have been garbled, so I've adjusted my settings and am resending it. I'd suggest calling consumer.setStartFromLatest(); after you obtain the consumer. This setting follows the official documentation, which I translated a while ago; see the section on [starting Kafka Consumers from a specific position]: https://www.jianshu.com/p/b753527b91a6
    
    
    
    /**
     * @param env   execution environment
     * @param topic Kafka topic to subscribe to
     * @param time  subscription time
     * @return a stream sourced from the given Kafka topic
     * @throws IllegalAccessException
     */
    public static DataStreamSource<XlogStreamBasicBean> buildSource(StreamExecutionEnvironment env, String topic, Long time) throws IllegalAccessException {
        ParameterTool parameterTool = (ParameterTool) env.getConfig().getGlobalJobParameters();
        Properties props = buildKafkaProps(parameterTool);
        FlinkKafkaConsumer011<XlogStreamBasicBean> consumer = new FlinkKafkaConsumer011<>(
                topic,
                new MetricSchema(),
                props);

        // Start from the latest records instead of replaying the topic.
        consumer.setStartFromLatest();

        consumer.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<XlogStreamBasicBean>() {
            @Override
            public long extractAscendingTimestamp(XlogStreamBasicBean element) {
                if (element == null || element.getTimestamp() == null) {
                    return System.currentTimeMillis();
                }
                // Shift event time back by 10 s.
                return element.getTimestamp() - 10000;
            }
        });
        return env.addSource(consumer);
    }

    }
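
    If the intent after a restart is to continue from where the consumer group left off, rather than to skip ahead to the newest records, the startup mode can be left at group offsets instead. A minimal sketch against the consumer above (setStartFromGroupOffsets() is the consumer's default mode and falls back to the auto.offset.reset property when the group has no committed offsets yet):

        // Sketch: resume from the offsets committed for group.id; when none
        // exist yet, fall back to auto.offset.reset ("latest" in this setup).
        consumer.setStartFromGroupOffsets();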
    
    
    
    
    
    ------------------ Original message ------------------
    From: "苟刚" <gougang_1991@163.com>;
    Sent: Tuesday, April 7, 2020, 11:27 AM
    To: "user-zh" <user-zh@flink.apache.org>;

    Subject: Kafka catches up from the beginning when new computation logic is added to Flink
    
    
    
    Hello,
     I've run into a problem: when I use Flink for real-time statistics, every time I add a new computation type the operators start consuming from the earliest Kafka messages, so after each restart it takes a long time to catch up. Is there any way to solve this?
     My watermark is set on the Kafka consumer, and every time a new process is added downstream, consumption starts from the beginning again.
    
    
    Flink version: 1.6.3
    
    Partial code:
    
    public static void main(String[] args) throws Exception {
        final ParameterTool parameterTool = ExecutionEnvUtil.createParameterTool(args);
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);

        DataStreamSource<XlogStreamBasicBean> data = KafkaTools.buildSource(env);
        // process timing data
        processTimingData(parameterTool, data);
        // process front error data
        processFrontErrorData(parameterTool, data);
        // process img error data
        processImgLoadErrorData(parameterTool, data);
        env.execute("xlog compute");
    }
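
    Group offsets only help if they are actually committed, though. Without checkpointing, the Flink Kafka consumer relies at best on the Kafka client's own periodic auto-commit; with checkpointing enabled, offsets are stored in Flink state and committed on each completed checkpoint, so a job restored from a checkpoint or savepoint resumes exactly where it stopped. A minimal sketch against the main method above (the 60-second interval is an illustrative value):

        // Sketch: enable checkpointing so Kafka offsets are snapshotted into
        // Flink state and committed back to Kafka on completed checkpoints.
        StreamExecutionEnvironment env = ExecutionEnvUtil.prepare(parameterTool);
        env.enableCheckpointing(60000); // checkpoint every 60 s (illustrative)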
    
    
    
    
    Kafka connection properties:
    public static Properties buildKafkaProps(ParameterTool parameterTool) {
        Properties props = parameterTool.getProperties();
        props.put("bootstrap.servers", parameterTool.get(KAFKA_BROKERS, DEFAULT_KAFKA_BROKERS));
        props.put("zookeeper.connect", parameterTool.get(KAFKA_ZOOKEEPER_CONNECT, DEFAULT_KAFKA_ZOOKEEPER_CONNECT));
        props.put("group.id", parameterTool.get(KAFKA_GROUP_ID, DEFAULT_KAFKA_GROUP_ID));
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");
        return props;
    }
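
    Two details about these properties when used with FlinkKafkaConsumer011: zookeeper.connect is only read by the legacy 0.8 consumer and should be ignored here, and the key/value deserializer entries are overridden internally, since decoding is done by the DeserializationSchema handed to the consumer (MetricSchema above). A trimmed sketch of what the 0.11 consumer actually consults (broker address and group id are placeholder values):

        Properties props = new Properties();
        props.put("bootstrap.servers", "broker1:9092"); // placeholder address
        props.put("group.id", "xlog-consumer");         // placeholder group id
        props.put("auto.offset.reset", "latest");       // consulted only when the
                                                        // group has no committed offsets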
    
    
    
    
    
    
    
    --
    
    Best Wishes
     Galen.K


