flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Parallel execution but keep order of kafka messages
Date Mon, 17 Apr 2017 11:42:17 GMT
Hi Benjamin,

In your case, the tumbling window subtasks would each have 3 input streams, 1 for each of
the 3 FlinkKafkaConsumer operator subtasks.

I thought that each subtask of the window would get only elements from one partition and therefore
the watermarks would be calculated independently per stream. 
This is a misunderstanding. After the keyBy, the window subtasks could get input from any
of the consumer subtasks, and would therefore need to wait for broadcasted watermarks from
all of them. It just happens to be that in your case, each consumer subtasks will only produce
records with exactly one key.

Moving back a bit to your original setup: it seems like what you want to achieve is a simple
window on each partition independently, and then produce the window output to a new partition.
In your original setup where each topic and its corresponding output topic each has 1 partition,
I’d actually just have separate jobs for each topic-to-topic pipeline, instead of bundling
them into one job. Was there any specific reason for bundling them together?

On 17 April 2017 at 5:04:26 PM, Benjamin Reißaus (benjamin.reissaus@gmail.com) wrote:


So I have been rearranging my architecture to where I only have one input and one output topic,
each with 3 partitions and in my flink job I have one consumer and one producer running with
parallelism of 3. To run in parallel, I extract the partition from the metadata information
per kafka message and keyBy that very partition. The code sample is at the bottom. 

Now it seems though, that my tumbling window of 1 second that I run on all partitions and
that I use to calculate statistics only gives output on one partition. The reason seems to
be that the timestamps of partition A and B are 2 hours ahead of partition C. In the documentation
I read that the event time of an operator following a keyBy (my tumbling window) is the minimum
of its input streams’ event times. 

But is that even the case for me? Does my tumbling window have multiple input streams? I thought
that each subtask of the window would get only elements from one partition and therefore the
watermarks would be calculated independently per stream. 

I would appreciate any input! Again, my goal is to run the same queries on independent kafka

Best regards,

import org.apache.flink.api.scala._
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.windowing.time.Time
import org.hpi.esb.flink.datamodel.{SimpleRecord, Statistics}

class StatisticsQuery(windowSize: Int)
  extends Query[(String, SimpleRecord), (String, Statistics)] {

  override def execute(stream: DataStream[(String, SimpleRecord)]): DataStream[(String, Statistics)]
= {
      .fold(("", new Statistics())) { (acc, value) => Statistics.fold(acc, value) }

2017-04-14 19:22 GMT+02:00 Benjamin Reißaus <benjamin.reissaus@gmail.com>:
Hi everybody,


I have the following flink/kafka setup:


I have 3 kafka “input” topics and 3 “output” topics with each 1 partition (only 1
partition because the order of the messages is important). I also have 1 master and 2 flink
slave nodes with a total of 16 task slots.

In my flink program I have created 3 consumers - each for one of the input topics.

On each of the datastreams I run a query that generates statistics over a window of 1 second
and I write the result to the corresponding output topic. You can find the execution plan
with parallelism set to 2 attached.


This setup with parallelism=2 sometimes seems to give me the wrong order of the statistics
results. I assume it is because of the rebalancing before the last map which leads to a race
condition when writing to kafka.


If I set parallelism to 1 no rebalancing will be done but only one task slot is used.


This has led me to the following questions:


Why is only 1 task slot used with my 3 pipelines when parallelism is set to 1? As far as I
understand, the parallelism refers to the number of parallel instances a task can be split
into. Therefore, I would assume that I could still run multiple different tasks (e.g. different
maps or window functions on different streams) in different task slots, right?


And to come back to my requirement: Is it not possible to run all 3 pipelines in parallel
and still keep the order of the messages and results?


I also asked these questions on stackoverflow. And it seems that I have similar trouble understanding
the terms “task slot”, “subtasks” etc. like Flavio mentioning in this flink mail


Thank you and I would appreciate any input!


Best regards,


View raw message