flink-user mailing list archives

From "r. r." <rob...@abv.bg>
Subject FlinkKafkaConsumer010 does not start from the next record on startup from offsets in Kafka
Date Tue, 21 Nov 2017 17:36:45 GMT
Hello,
According to https://issues.apache.org/jira/browse/FLINK-4618, "FlinkKafkaConsumer09 should
start from the next record on startup from offsets in Kafka".
Is the same behavior expected of FlinkKafkaConsumer010?
A document in Kafka is failing my job, and on restart (via the restart strategy, or after
stopping and resubmitting the job) I want processing to continue from the next document in the
partition.
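For reference, this is roughly how the consumer is set up; a sketch against the Flink 1.3 API, where the topic name, group id, and broker address below are placeholders for my actual configuration:

```java
// With checkpointing enabled, FlinkKafkaConsumer010 commits offsets to Kafka
// on completed checkpoints, and on restore resumes from the checkpointed
// offsets rather than the committed group offsets.
Properties props = new Properties();
props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder
props.setProperty("group.id", "my-group");                // placeholder

FlinkKafkaConsumer010<String> consumer =
        new FlinkKafkaConsumer010<>("my-topic", new SimpleStringSchema(), props);
// Default start position: the group offsets committed in Kafka, used only
// when there is no checkpoint or savepoint to restore from.
consumer.setStartFromGroupOffsets();

DataStream<String> stream = env.addSource(consumer);
```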
Checkpoints are enabled:

            env.enableCheckpointing(1000);                                 // checkpoint every 1000 ms
            env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000); // >= 1000 ms between checkpoints
            env.getCheckpointConfig().setCheckpointTimeout(60000);         // discard checkpoints after 60 s
            env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);      // one checkpoint in flight at a time

INFO  org.apache.flink.streaming.runtime.tasks.StreamTask - State backend is set to heap memory (checkpoints to filesystem "file:/tmp/checkpoints")
taskmanager_4  | 2017-11-21 17:31:42,873 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Setting restore state in the FlinkKafkaConsumer.
taskmanager_4  | 2017-11-21 17:31:42,875 INFO  org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 4 will commit offsets back to Kafka on completed checkpoints

Also, if another TM (not the one that fails) has successfully read and processed a record
from Kafka, then after the job is cancelled and restarted, that already-processed record is
retrieved and processed again, together with the failing one.
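I understand that records processed since the last completed checkpoint are replayed on restore, so a record that deterministically fails the job would be replayed forever. One pattern I am considering, following the idea in Flink's Kafka connector docs that a DeserializationSchema may return null for a corrupted message so the consumer skips it, is sketched below in plain Java (parseOrNull and the sample inputs are hypothetical, for illustration only):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Self-contained sketch of the "return null to skip" pattern: a parse
// failure yields null instead of an exception, and the consuming loop
// drops nulls so one bad record cannot fail the whole job.
public class SkipCorruptRecords {
    // Try to parse a record; return null on failure instead of throwing.
    static Integer parseOrNull(String raw) {
        try {
            return Integer.valueOf(raw.trim());
        } catch (NumberFormatException e) {
            return null; // corrupted record: signal "skip" to the caller
        }
    }

    static List<Integer> consume(List<String> records) {
        List<Integer> out = new ArrayList<>();
        for (String raw : records) {
            Integer value = parseOrNull(raw);
            if (value == null) {
                continue; // skip the poison record, keep processing
            }
            out.add(value);
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(consume(Arrays.asList("1", "oops", "3"))); // prints [1, 3]
    }
}
```

Would a DeserializationSchema along these lines be the recommended way to get past the failing record?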

flink-1.3.2
kafka_2.12-0.11.0.1

Thanks!
- Robert

