flink-user mailing list archives

From Eduardo Winpenny Tejedor <eduardo.winpe...@gmail.com>
Subject Re: Exactly-once semantics in Flink Kafka Producer
Date Fri, 02 Aug 2019 10:03:51 GMT
Hi Vasily,

You're probably executing this from your IDE or from a local Flink cluster
without starting your job from a checkpoint.

When you start your Flink job for the second time, you need to specify the
path to the latest checkpoint as an argument; otherwise Flink will start
from scratch.
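
For example, assuming the checkpoint directory from your code below
(hdfs:///checkpoints-data) and a retained checkpoint, the restart would look
roughly like this (job id, checkpoint number and jar name are placeholders):

bin/flink run -s hdfs:///checkpoints-data/<job-id>/chk-<n>/_metadata your-job.jar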

You're probably thinking that's not great, and that ideally Flink should
continue automatically from the last completed checkpoint; actually, that's
what the docs say! Well, that only holds while the job is running in a proper
cluster environment: Flink recovers from checkpoints automatically when part
of the cluster fails (e.g. a TaskManager dies), not when the whole job is
stopped or cancelled. For a full stop you need to specify the checkpoint
manually.
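
Note also that by default Flink deletes checkpoints when a job is cancelled,
so to resume after a full stop you'd want to retain them. A minimal sketch,
assuming the Flink 1.8 CheckpointConfig API:

import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
// Keep completed checkpoints after cancellation so a later run
// can resume from them with "flink run -s <path>":
env.getCheckpointConfig().enableExternalizedCheckpoints(
        CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);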

Hope that helps!


On Fri, 2 Aug 2019, 10:05 Vasily Melnik, <Vasily.Melnik@glowbyteconsulting.com> wrote:

> I'm trying to test Flink 1.8.0 exactly-once semantics with Kafka Source
> and Sink:
>
>    1. Run the Flink app, simply transferring messages from one topic to
>    another with parallelism=1 and a checkpoint interval of 20 seconds
>    2. Generate messages with incrementing integer numbers using a Python
>    script, one every 2 seconds
>    3. Read the output topic with a console consumer in read_committed
>    isolation level (exact command shown after this list)
>    4. Manually kill TaskManager
>
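> The consumer invocation for step 3 looks like this (the broker address is a
> placeholder):
>
> kafka-console-consumer.sh --bootstrap-server <broker:9092> --topic test \
>     --isolation-level read_committed
>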
> I expect to see monotonically increasing integers in the output topic
> regardless of the TaskManager being killed and recovering.
>
> But actually I see something unexpected in the console-consumer output:
>
> 32
> 33
> 34
> 35
> 36
> 37
> 38
> 39
> 40
> -- TaskManager Killed
> 32
> 34
> 35
> 36
> 40
> 41
> 46
> 31
> 33
> 37
> 38
> 39
> 42
> 43
> 44
> 45
>
> Looks like all messages between checkpoints were replayed in the output
> topic. I also expected to see results in the output topic only after
> checkpointing, i.e. every 20 seconds, but messages appeared in the output
> immediately as they were sent to the input.
> Is this the correct behaviour, or am I doing something wrong?
>
> Kafka version is 1.0.1 from Cloudera parcels. I tested the Kafka transactional
> producer and a read_committed console consumer with custom code, and it worked
> perfectly well, reading messages only after commitTransaction on the producer.
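>
> (For reference, a minimal sketch of that standalone test; the broker address
> and topic name are placeholders:)
>
> import java.util.Properties;
> import org.apache.kafka.clients.producer.KafkaProducer;
> import org.apache.kafka.clients.producer.ProducerConfig;
> import org.apache.kafka.clients.producer.ProducerRecord;
>
> Properties props = new Properties();
> props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder
> props.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "test-tx");
> props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
>         "org.apache.kafka.common.serialization.StringSerializer");
> props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
>         "org.apache.kafka.common.serialization.StringSerializer");
>
> KafkaProducer<String, String> producer = new KafkaProducer<>(props);
> producer.initTransactions();
> producer.beginTransaction();
> producer.send(new ProducerRecord<>("test", "42"));
> // The read_committed consumer only sees the record after this commit:
> producer.commitTransaction();
> producer.close();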
>
> My Flink code:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
> env.getConfig().setAutoWatermarkInterval(1000);
> env.enableCheckpointing(20000, CheckpointingMode.EXACTLY_ONCE);
> env.setStateBackend(new RocksDBStateBackend("hdfs:///checkpoints-data"));
>
> Properties producerProperty = new Properties();
> producerProperty.setProperty("bootstrap.servers", ...);
> producerProperty.setProperty("zookeeper.connect", ...);
> producerProperty.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "10000");
> producerProperty.setProperty(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transaction");
> producerProperty.setProperty(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true");
>
> Properties consumerProperty = new Properties();
> consumerProperty.setProperty("bootstrap.servers", ...);
> consumerProperty.setProperty("zookeeper.connect", ...);
> consumerProperty.setProperty("group.id", "test2");
>
> FlinkKafkaConsumer<String> consumer1 = new FlinkKafkaConsumer<String>(
>         "stringTopic1", new ComplexStringSchema(), consumerProperty);
> consumer1.assignTimestampsAndWatermarks(new PeriodicAssigner());
>
> FlinkKafkaProducer<String> producer1 = new FlinkKafkaProducer<String>(
>         "test", new KeyedSerializationSchemaWrapper(new SimpleStringSchema()),
>         producerProperty, FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
> producer1.ignoreFailuresAfterTransactionTimeout();
>
> DataStreamSource<String> s1 = env.addSource(consumer1);
> s1.addSink(producer1);
> env.execute("Test");
