flink-user mailing list archives

From Niels Basjes <Ni...@basjes.nl>
Subject Re: Kafka KeyedStream source
Date Wed, 11 Jan 2017 15:11:19 GMT

Ok. I think I get it.

Assume we create an addKeyedSource(...) which will allow us to add a source
that makes some guarantees about the data.
And assume this source returns simply the Kafka partition id as the result
of this 'hash' function.
Then if I have 10 Kafka partitions I could read these records in and
filter the data more efficiently, because the data would not need to
go over the network before this filter.
Afterwards I can scale it up to 'many' tasks for the heavier processing
that follows.
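
To make the idea concrete, here is a minimal sketch in plain Java. It is a
simulation only: it uses no Flink or Kafka API, the class and method names
(PartitionLocalFilterSketch, partition, filterThenShuffle) and the fromBot
flag are hypothetical, and both hash functions are simple stand-ins rather
than what Kafka or Flink actually use.

```java
import java.util.ArrayList;
import java.util.List;

public class PartitionLocalFilterSketch {

    /** One click event; the fields are made up for this sketch. */
    static final class Click {
        final String sessionId;
        final boolean fromBot;

        Click(String sessionId, boolean fromBot) {
            this.sessionId = sessionId;
            this.fromBot = fromBot;
        }
    }

    /** Stand-in for the producer side: the same sessionId always lands in the same partition. */
    static List<List<Click>> partition(List<Click> input, int numPartitions) {
        List<List<Click>> partitions = new ArrayList<>();
        for (int p = 0; p < numPartitions; p++) {
            partitions.add(new ArrayList<>());
        }
        for (Click c : input) {
            partitions.get(Math.floorMod(c.sessionId.hashCode(), numPartitions)).add(c);
        }
        return partitions;
    }

    /**
     * One source task per partition filters its own records locally; only the
     * survivors are shuffled by key to the downstream tasks.
     * Returns how many records each downstream task receives.
     */
    static int[] filterThenShuffle(List<List<Click>> partitions, int downstreamParallelism) {
        int[] received = new int[downstreamParallelism];
        for (List<Click> partition : partitions) {      // one "source task" each
            for (Click c : partition) {
                if (c.fromBot) {
                    continue;                           // dropped before any shuffle
                }
                // A different stand-in hash for the keyed shuffle to the heavier tasks.
                int task = Math.floorMod(c.sessionId.hashCode() * 31 + 17, downstreamParallelism);
                received[task]++;
            }
        }
        return received;
    }

    public static void main(String[] args) {
        // 1000 clicks over 100 sessions; every fourth session is flagged as a bot.
        List<Click> input = new ArrayList<>();
        for (int i = 0; i < 1000; i++) {
            int session = i % 100;
            input.add(new Click("s" + session, session % 4 == 0));
        }
        int shipped = 0;
        for (int n : filterThenShuffle(partition(input, 10), 40)) {
            shipped += n;
        }
        System.out.println("read=" + input.size() + " shipped=" + shipped);
    }
}
```

In this made-up input a quarter of the clicks are flagged as bots, so only
750 of the 1000 records read reach the shuffle to the 40 downstream tasks.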

As a concept: Could that be made to work?


On Mon, Jan 9, 2017 at 9:14 AM, Tzu-Li (Gordon) Tai <tzulitai@apache.org> wrote:

> Hi Niels,
> Thank you for bringing this up. I recall an earlier discussion related to
> this: [1].
> I don’t think this is possible at the moment, mainly because of how the
> API is designed.
> On the other hand, a KeyedStream in Flink is basically just a DataStream
> with a hash partitioner that is used when deciding which instance of the
> following downstream operator an emitted record of the stream is sent to.
> So, even if we have a Kafka source that directly produces a KeyedStream on
> “addSource”, redistribution of data can still happen. I.e., if the
> parallelism of the compute operators right after is different than the
> number of Kafka partitions, redistribution will happen to let the key space
> and state be evenly distributed in Flink.
> This raises the question of whether retaining the original Kafka
> partitioning of records when they are consumed by Flink is actually only
> a special case.
> Flink, as a parallel compute engine, can freely adjust the parallelism of
> its operators regardless of the parallelism of Kafka topics (rescaling
> isn’t actually in yet, but is on the near-future roadmap).
> So, under the general case, the parallelism of a Flink operator may be
> different than the number of Kafka partitions, and therefore redistributing
> must occur.
> For redistribution to not need to take place right after an already
> partitioned Kafka topic, you’d need identical numbers of 1) Kafka
> partitions, 2) Flink source instances consuming the partitions, and 3) the
> parallelism of the keyed computation afterwards. This seems like a very
> specific situation, considering that you’ll be able to rescale Flink
> operators as the data’s key space / volume grows.
> The main observation, I think, is that Flink itself maintains how the key
> space is partitioned within the system, which plays a crucial part in
> rescaling. That’s why by default it doesn’t respect existing partitioning
> of the key space in Kafka (or other external sources). Even if it does so
> at the beginning of a job, partitioning will most likely change as you
> rescale your job / operators (which is a good thing, to be able to adapt).
> Cheers,
> Gordon
> [1] http://apache-flink-mailing-list-archive.1008284.n3.nabble.com/kafka-partition-assignment-td12123.html
> On January 6, 2017 at 1:38:05 AM, Niels Basjes (niels@basjes.nl) wrote:
> Hi,
> In my scenario I have click stream data that I persist in Kafka.
> I use the sessionId as the key to instruct Kafka to put everything with
> the same sessionId into the same Kafka partition. That way I already have
> all events of a visitor in a single Kafka partition in a fixed order.
> When I read this data into Flink I get a generic data stream on top of
> which I have to do a keyBy before my processing can continue. Such a keyBy
> will redistribute the data again to later tasks that can do the actual work.
> Is it possible to create an adapted version of the Kafka source that
> immediately produces a keyed data stream?
> --
> Best regards / Met vriendelijke groeten,
> Niels Basjes

Best regards / Met vriendelijke groeten,

Niels Basjes
