flink-user mailing list archives

From "Tzu-Li (Gordon) Tai" <tzuli...@apache.org>
Subject Re: Kafka KeyedStream source
Date Sun, 15 Jan 2017 20:48:43 GMT
Hi Niels,

If it’s only for simple data filtering that does not depend on the key, a simple “flatMap”
or “filter” directly after the source can be chained to the source instances.
The filter then runs in the same thread as the one fetching data from a Kafka partition,
so there are no excessive network transfers for this simple filtering.
You can read more about operator chaining here: https://ci.apache.org/projects/flink/flink-docs-release-1.2/concepts/runtime.html#tasks-and-operator-chains

So, to sum up: you have a FlinkKafkaConsumer as the source, do a filter transformation
right after, and then a keyBy followed by your heavy-processing, key-wise computations.
Does that make sense for what you have in mind?
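
To illustrate the point about chaining, here is a minimal plain-Java model of the record flow. It does not use the Flink API; in a real job the same shape would be addSource(...), then filter(...), then keyBy(...). The Event type, the class and method names, and the modulo routing are all assumptions made for this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Plain-Java model (not Flink API): each inner list stands for the records
// one source instance reads from one Kafka partition.
class EarlyFilterSketch {

    // Hypothetical record type: a session id (the key) plus a payload.
    static final class Event {
        final String sessionId;
        final String payload;

        Event(String sessionId, String payload) {
            this.sessionId = sessionId;
            this.payload = payload;
        }
    }

    // Applies the filter inside each source instance, then routes the
    // surviving records by key hash. Returns one list per downstream instance.
    static List<List<Event>> filterThenShuffle(
            List<List<Event>> partitions, Predicate<Event> keep, int downstreamParallelism) {
        List<List<Event>> downstream = new ArrayList<>();
        for (int i = 0; i < downstreamParallelism; i++) {
            downstream.add(new ArrayList<>());
        }
        for (List<Event> partition : partitions) {
            for (Event e : partition) {
                if (!keep.test(e)) {
                    continue; // dropped locally, so it is never sent downstream
                }
                // Stand-in for keyBy: pick the downstream instance from the key hash.
                int target = Math.floorMod(e.sessionId.hashCode(), downstreamParallelism);
                downstream.get(target).add(e);
            }
        }
        return downstream;
    }

    // Number of records that were sent to the downstream instances.
    static int totalRecords(List<List<Event>> lists) {
        int n = 0;
        for (List<Event> l : lists) {
            n += l.size();
        }
        return n;
    }
}
```

Only records that pass the predicate are counted by totalRecords, which models the saving: the discarded records never cross the boundary between the source and the keyed operators.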


On January 11, 2017 at 4:11:26 PM, Niels Basjes (niels@basjes.nl) wrote:


Ok. I think I get it.

Assume we create an addKeyedSource(...) which will allow us to add a source that makes some
guarantees about the data.
And assume this source simply returns the Kafka partition id as the result of this 'hash'.
Then if I have 10 Kafka partitions I would read these records in and I could filter the data
more efficiently because the data would not need to go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing that follows.

As a concept: Could that be made to work?


On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:
Hi Niels,

Thank you for bringing this up. I recall some previous discussion related to this: [1].

I don’t think this is possible at the moment, mainly because of how the API is designed.

On the other hand, a KeyedStream in Flink is basically just a DataStream with a hash partitioner
that is used when deciding which instance of the following downstream operator an emitted
record of the stream is sent to.
So, even if we have a Kafka source that directly produces a KeyedStream on “addSource”,
redistribution of data can still happen. I.e., if the parallelism of the compute operators
right after is different than the number of Kafka partitions, redistribution will happen to
let the key space and state be evenly distributed in Flink.
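
As a rough illustration of why redistribution still happens, the sketch below puts two independent key-to-index mappings side by side. Both hash functions are placeholders invented for this example; neither is the function Kafka or Flink actually uses, and the class and method names are hypothetical.

```java
// Simplified model: neither method reproduces the hash functions that
// Kafka or Flink actually use. Both are placeholders for illustration.
class PartitioningSketch {

    // Placeholder for the producer-side mapping of a record key to a Kafka partition.
    static int kafkaPartition(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Placeholder for the hash partitioner behind a keyed stream. It applies
    // its own mixing step, so it is independent of the Kafka-side mapping.
    static int flinkSubtask(String key, int parallelism) {
        int mixed = 31 * key.hashCode() + 7; // arbitrary mixing, an assumption of this sketch
        return Math.floorMod(mixed, parallelism);
    }
}
```

With 4 Kafka partitions and a keyed operator of parallelism 3, the keys "a", "e" and "i" all fall into the same Kafka partition under this model, yet they are assigned to three different downstream subtasks. The source instance reading that partition therefore has to send records to other instances.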

This leads to the argument that we probably need to think about whether retaining the original
partitioning of records in Kafka when consumed by Flink is actually only a special case.
Flink, as a parallel compute engine, can freely adjust the parallelism of its operators regardless
of the parallelism of Kafka topics (rescaling isn’t actually in yet, but is on the near-future
roadmap).

So, under the general case, the parallelism of a Flink operator may be different than the
number of Kafka partitions, and therefore redistributing must occur.
For redistribution to not need to take place right after an already partitioned Kafka topic,
you’d need identical numbers of 1) Kafka partitions, 2) Flink source instances consuming
the partitions, and 3) the parallelism of the keyed computation afterwards. This seems like
a very specific situation, considering that you’ll be able to rescale Flink operators as
the data’s key space / volume grows.

The main observation, I think, is that Flink itself maintains how the key space is partitioned
within the system, which plays a crucial part in rescaling. That’s why by default it doesn’t
respect existing partitioning of the key space in Kafka (or other external sources). Even
if it initially does at the beginning of a job, partitioning will most likely change as you
rescale your job / operators (which is a good thing, to be able to adapt).
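
To make the rescaling point concrete, here is a small sketch that counts how many keys change owner when the parallelism changes. It uses a naive modulo as a stand-in partitioner, which is an assumption of this example and not Flink's actual key assignment; the class and method names are hypothetical.

```java
// Naive stand-in for a hash partitioner, used only to show that the
// key-to-subtask mapping depends on the operator parallelism.
class RescaleSketch {

    // Owning subtask for a key hash under the given parallelism.
    static int subtaskFor(int keyHash, int parallelism) {
        return Math.floorMod(keyHash, parallelism);
    }

    // Counts the keys (modeled as the hashes 0..numKeys-1) whose owning
    // subtask differs between the old and the new parallelism.
    static int keysThatMove(int numKeys, int oldParallelism, int newParallelism) {
        int moved = 0;
        for (int k = 0; k < numKeys; k++) {
            if (subtaskFor(k, oldParallelism) != subtaskFor(k, newParallelism)) {
                moved++;
            }
        }
        return moved;
    }
}
```

Under this model no key moves while the parallelism stays the same, but going from 4 to 6 subtasks reassigns most of the keys. Whatever partitioning matched Kafka at job start would not survive that change.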


[1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html

On January 6, 2017 at 1:38:05 AM, Niels Basjes (niels@basjes.nl) wrote:


In my scenario I have click stream data that I persist in Kafka.
I use the sessionId as the key to instruct Kafka to put everything with the same sessionId
into the same Kafka partition. That way I already have all events of a visitor in a single
Kafka partition in a fixed order.

When I read this data into Flink I get a generic data stream on top of which I have to do a
keyBy before my processing can continue. Such a keyBy will redistribute the data again to
later tasks that can do the actual work.

Is it possible to create an adapted version of the Kafka source that immediately produces
a keyed data stream?

Best regards / Met vriendelijke groeten,

Niels Basjes
