flink-user mailing list archives

From "Y. Sakamoto" <phonypian...@gmail.com>
Subject How to achieve exactly once on node failure using Kafka
Date Mon, 20 Feb 2017 16:53:08 GMT
Hi,
I'm using Flink 1.2.0 and trying to achieve "exactly once" data transfer
from Kafka to Elasticsearch, but without success.
(Scala 2.11, Kafka 0.10, without YARN)

I have two Flink TaskManager nodes. While the job runs with
parallelism 2, I shut down one of them to simulate a node failure.

Using flink-connector-kafka, I wrote the following code:

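    import java.util.Properties;

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    StreamExecutionEnvironment env =
          StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(1000L); // take a checkpoint every 1000 ms
    env.setParallelism(2);

    Properties kafkaProp = new Properties();
    kafkaProp.setProperty("bootstrap.servers", "192.168.97.42:9092");
    // zookeeper.connect is only needed by the Kafka 0.8 consumer;
    // the 0.10 consumer does not use it
    kafkaProp.setProperty("zookeeper.connect", "192.168.97.42:2181");
    kafkaProp.setProperty("group.id", "id");

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer010<>(
          "topic", new SimpleStringSchema(), kafkaProp));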

I found that the map function receives duplicated records.
The data processed after the last checkpoint before the node failure
seems to be duplicated.
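A toy model (plain Java, not Flink code) may help explain the duplicates. Flink's Kafka consumer stores its read offsets in each checkpoint and, after a failure, resumes from the offsets of the last completed checkpoint, so records read after that checkpoint pass through the map function again. Flink's exactly-once guarantee covers its own checkpointed state; side effects such as writes to an external system are not rolled back. The class `ReplayDemo`, the method `run` and its parameters `checkpointEvery` and `failAt` are hypothetical. The second sink assumes each record carries a deterministic key (here just the offset; for real Kafka data this would be something like topic, partition and offset), so a replayed write overwrites the earlier one instead of adding a duplicate.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.BiConsumer;

// Hypothetical toy model of checkpoint-based recovery (not Flink API).
public class ReplayDemo {

    // Feeds `topic` to `sink`, "checkpointing" the read offset every
    // `checkpointEvery` records and simulating one failure at offset `failAt`.
    static void run(List<String> topic, int checkpointEvery, int failAt,
                    BiConsumer<Integer, String> sink) {
        int checkpointedOffset = 0; // offset saved by the last completed checkpoint
        int offset = 0;
        boolean failed = false;
        while (offset < topic.size()) {
            if (!failed && offset == failAt) {
                failed = true;
                offset = checkpointedOffset; // recovery: rewind to the checkpointed offset
                continue;
            }
            // The sink's side effect lives outside checkpointed state,
            // so it is not undone when the offset is rewound.
            sink.accept(offset, topic.get(offset));
            offset++;
            if (offset % checkpointEvery == 0) {
                checkpointedOffset = offset; // checkpoint completes
            }
        }
    }

    public static void main(String[] args) {
        List<String> topic = Arrays.asList("a", "b", "c", "d", "e", "f");

        // Append-only sink: every replayed record is written again.
        List<String> appended = new ArrayList<>();
        run(topic, 2, 3, (off, rec) -> appended.add(rec));
        System.out.println(appended); // [a, b, c, c, d, e, f]

        // Upsert sink keyed by a deterministic ID: replays overwrite.
        Map<Integer, String> upserted = new TreeMap<>();
        run(topic, 2, 3, upserted::put);
        System.out.println(upserted); // {0=a, 1=b, 2=c, 3=d, 4=e, 5=f}
    }
}
```

With a checkpoint at offset 2 and a failure just before offset 3, record `c` was read after the last checkpoint, so the append-only sink writes it twice, while the keyed sink ends up with exactly one entry per record.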

Is there any way to achieve "exactly once" semantics when a node fails?


Thanks.
Yuichiro
