flink-user mailing list archives

From hemant singh <hemant2...@gmail.com>
Subject Re: Flink Kafka connector consume from a single kafka partition
Date Mon, 24 Feb 2020 16:34:07 GMT
Hello Dawid,

Thanks for your response.

I am planning to have data in kafka as below -

partition 1 d1 d2 d1 d2 d1
partition 2 d3 d3 d3 d4 d4
where dx -> message from device x, like d1 -> event from device 1.

As you and Arvid have suggested, I will create a keyed stream based on
device-id and then apply my operations, like windows or CEP. I am using
AscendingTimestampExtractor for timestamps and watermarks. Could you please
let me know if you see any issue arising if, for load balancing, I mix some
devices in one partition but create a keyed stream to work on each device's
stream?
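A small, Flink-free Java sketch of that plan (device ids and reading names are made up): grouping a mixed partition by device-id preserves each device's arrival order, which is why mixing devices in one partition is compatible with a keyed stream per device.

```java
import java.util.*;

public class KeyedOrderDemo {
    // Group a partition's events by device-id, keeping per-key arrival order,
    // the way keyBy(device-id) does on a Flink stream.
    static Map<String, List<String>> keyBy(List<String[]> events) {
        Map<String, List<String>> keyed = new LinkedHashMap<>();
        for (String[] e : events) {
            keyed.computeIfAbsent(e[0], k -> new ArrayList<>()).add(e[1]);
        }
        return keyed;
    }

    public static void main(String[] args) {
        // One partition carrying two interleaved devices, as in the layout
        // above: d1 d2 d1 d2 d1 (reading ids r1, r2, ... are hypothetical).
        List<String[]> partition1 = List.of(
            new String[]{"d1", "r1"}, new String[]{"d2", "r1"},
            new String[]{"d1", "r2"}, new String[]{"d2", "r2"},
            new String[]{"d1", "r3"});
        Map<String, List<String>> keyed = keyBy(partition1);
        System.out.println(keyed.get("d1")); // [r1, r2, r3]
        System.out.println(keyed.get("d2")); // [r1, r2]
    }
}
```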

Thanks,
Hemant



On Mon, Feb 24, 2020 at 4:12 PM Dawid Wysakowicz <dwysakowicz@apache.org>
wrote:

> Hi Hemant,
>
> I think Arvid's previous answer still best addresses your question. (minus
> that I would not use the reinterpretAsKeyedStream as it requires that the
> partitioning is exactly the same as Flink's internal partitioning)
>
> Let me rephrase a few things. First of all I think you're asking about two
> different issues.
>
> 1) ordering guarantees, which affects correctness
>
> 2) data skew, which affects performance
>
> As for 1), as Arvid said, if you work on a keyed stream the order of
> events within a single key will be preserved. Therefore, if all events for
> a given device-id are in a single partition, any operation on a keyed
> stream will preserve that device's order, even if a single task processes
> events from multiple devices. You do not need to subscribe to a single
> partition with a single consumer. You should just make sure you have no
> unkeyed shuffles before grouping, e.g. a map operator with a different
> parallelism than the source. If you keyBy right after the source you
> should be safe.
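> The no-unkeyed-shuffles point can be seen in a small, Flink-free Java
> sketch (event names are made up): a round-robin rebalance splits one
> device's events across subtasks, while hashing by key keeps them together:

```java
import java.util.*;

public class ShuffleOrderDemo {
    // Round-robin redistribution, as a rebalance between a source and a
    // map operator with a different parallelism would do.
    static List<List<String>> roundRobin(List<String> events, int subtasks) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < subtasks; i++) out.add(new ArrayList<>());
        for (int i = 0; i < events.size(); i++) out.get(i % subtasks).add(events.get(i));
        return out;
    }

    // Key-hash redistribution, like keyBy: one key always lands on one subtask.
    static List<List<String>> byKey(List<String> events, int subtasks) {
        List<List<String>> out = new ArrayList<>();
        for (int i = 0; i < subtasks; i++) out.add(new ArrayList<>());
        for (String e : events) {
            out.get(Math.abs(e.split("-")[0].hashCode()) % subtasks).add(e);
        }
        return out;
    }

    public static void main(String[] args) {
        // Events in partition order: d1's first two, then d2's, then d1 again.
        List<String> events = List.of("d1-1", "d1-2", "d2-1", "d2-2", "d1-3");
        // Round-robin: subtask 0 gets [d1-1, d2-1, d1-3], subtask 1 gets
        // [d1-2, d2-2] -> d1 is split, so after merging d1-2 may be observed
        // before d1-1.
        System.out.println(roundRobin(events, 2));
        // Key hash: all of d1 stays on one subtask, in its original order.
        System.out.println(byKey(events, 2));
    }
}
```

> (In a real job the fix is exactly what Dawid describes: keyBy directly
> after the source, with no intermediate operator of different parallelism.)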
>
> It will not preserve order between partitions. For that you need a way to
> reorder the events, e.g. event time.
>
> As for 2), if you have a skewed partition, processing it might require
> more resources. I think that is unavoidable unless you can come up with an
> algorithm that does not require perfect order (e.g. a windowed sum
> aggregate).
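> To illustrate the windowed-sum point with a minimal, Flink-free sketch
> (the readings are hypothetical): a sum is commutative and associative, so
> the window result is identical however the events arrive:

```java
import java.util.*;

public class OrderInsensitiveSum {
    // A windowed sum only needs the set of readings assigned to the window,
    // not their order, so reordering inside the window cannot change it.
    static double windowSum(List<Double> readings) {
        return readings.stream().mapToDouble(Double::doubleValue).sum();
    }

    public static void main(String[] args) {
        List<Double> inOrder = List.of(1.0, 2.0, 3.0, 4.0);
        List<Double> shuffled = List.of(4.0, 1.0, 3.0, 2.0);
        System.out.println(windowSum(inOrder) == windowSum(shuffled)); // true
    }
}
```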
>
> Just for completeness: CEP does reorder events according to their
> timestamps. A window function does not order events, as it does not have
> to; it only assigns events to a specific window, without necessarily
> ordering them.
>
> Hope that helps a bit
>
> Best,
>
> Dawid
> On 21/02/2020 17:15, hemant singh wrote:
>
> Hello Robert,
>
> Thanks for your reply.
> I understand the window function orders the records based on timestamp
> (event time in my case). I am also using Flink CEP to publish alerts to a
> downstream system, something like this:
>
> Pattern<TemperatureWithTimestampEvent, ?> warningPattern =
>     Pattern.<TemperatureWithTimestampEvent>begin("first", skipStrategy)
>         .subtype(TemperatureWithTimestampEvent.class)
>         .where(new IterativeCondition<TemperatureWithTimestampEvent>() {
>             @Override
>             public boolean filter(TemperatureWithTimestampEvent temperatureEvent,
>                                   Context<TemperatureWithTimestampEvent> context) throws Exception {
>                 return temperatureEvent.getTemperature() >= 26.0;
>             }
>         })
>         .times(2, 4).greedy()
>         .within(Time.seconds(20));
>
> Does CEP also order the records by event timestamp internally?
>
> Secondly, in the case shown below, I believe the tasks will not be
> consuming data equally:
>
> [image: IMG-2569.jpg]
>
> a1,a2 -> partition 1
> a3 -> partition 2
> a5,a6 -> partition 3
>
> If the above events get consumed and checkpointed, then a4 from partition
> 2 could be missed. This would be an issue for my use case. Correct me if
> my understanding is wrong.
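> A plain-Java sketch of how checkpointed offsets interact with this
> scenario (assuming the Flink Kafka consumer's documented behaviour of
> snapshotting per-partition offsets into checkpoints and rewinding to them
> on recovery; event names taken from the picture above):

```java
import java.util.*;

public class OffsetRestoreDemo {
    // Consume a partition, checkpoint the offset after the first event,
    // fail, then resume from the checkpointed offset, as the Flink Kafka
    // consumer does with the offsets it snapshots into checkpoints.
    static List<String> consumeWithRecovery(List<String> partition) {
        List<String> processed = new ArrayList<>();
        int offset = 0;
        processed.add(partition.get(offset++)); // a3 is processed...
        int checkpointedOffset = offset;        // ...and offset 1 checkpointed
        offset = checkpointedOffset;            // failure + restore: rewind to snapshot
        while (offset < partition.size()) {
            processed.add(partition.get(offset++)); // a4 is read after recovery
        }
        return processed;
    }

    public static void main(String[] args) {
        // Partition 2 as in the picture: a3 before the checkpoint, a4 after it.
        System.out.println(consumeWithRecovery(List.of("a3", "a4"))); // [a3, a4]
    }
}
```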
>
> Thanks,
> Hemant
>
> On Fri, Feb 21, 2020 at 4:47 PM Robert Metzger <rmetzger@apache.org>
> wrote:
>
>> Hey Hemant,
>>
>> Are you able to reconstruct the ordering of the events, for example based
>> on time or a sequence number?
>> If so, you could create as many Kafka partitions as you need (for proper
>> load distribution), disregarding any ordering at that point.
>> Then you keyBy your stream in Flink and order it within a window operator
>> (or with some custom logic in a process function).
>> Flink is able to handle quite large state using the RocksDB state backend.
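>> Robert's buffer-and-sort suggestion can be sketched without Flink: keep
>> events in a sorted map (standing in for keyed state) and flush everything
>> up to the watermark in timestamp order, as an event-time timer in a
>> process function would (one event per timestamp, for simplicity):

```java
import java.util.*;

public class SortBufferDemo {
    // Drain everything at or below the watermark, in timestamp order, and
    // remove it from the buffer (the "state" of our pretend process function).
    static List<String> flushUpTo(TreeMap<Long, String> buffer, long watermark) {
        SortedMap<Long, String> ready = buffer.headMap(watermark + 1);
        List<String> ordered = new ArrayList<>(ready.values());
        ready.clear(); // clearing the view also removes from the backing map
        return ordered;
    }

    public static void main(String[] args) {
        TreeMap<Long, String> buffer = new TreeMap<>();
        // Out-of-order arrivals for a single key: timestamps 30, 10, 20.
        buffer.put(30L, "c");
        buffer.put(10L, "a");
        buffer.put(20L, "b");
        // Watermark 25: emit what is complete, keep the rest buffered.
        System.out.println(flushUpTo(buffer, 25L)); // [a, b]
        System.out.println(buffer.keySet());        // [30]
    }
}
```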
>>
>> Best,
>> Robert
>>
>>
>> On Wed, Feb 19, 2020 at 6:34 PM hemant singh <hemant2184@gmail.com>
>> wrote:
>>
>>> Hi Arvid,
>>>
>>> Thanks for your response. I think I did not word my question properly.
>>> I wanted to confirm that if the data is distributed across more than one
>>> partition, then ordering cannot be maintained (which is documented).
>>> From your response I understand that if I set the parallelism to the
>>> number of partitions, then each consumer will consume from one partition
>>> and ordering can be maintained.
>>>
>>> However, I have a question: in case my parallelism is less than the
>>> number of partitions, I believe that if I create a keyed stream, ordering
>>> will still be maintained at the operator level for each key. Correct me
>>> if I am wrong.
>>>
>>> Second, one challenge I see with this model: if one source pushes data
>>> at a very high frequency, its partition gets overloaded, and hence the
>>> task which processes it will be overloaded too. However, to maintain
>>> ordering I have no option but to keep that source's data in one
>>> partition.
>>>
>>> Thanks,
>>> Hemant
>>>
>>> On Wed, Feb 19, 2020 at 5:54 PM Arvid Heise <arvid@ververica.com> wrote:
>>>
>>>> Hi Hemant,
>>>>
>>>> Flink passes your configurations to the Kafka consumer, so you could
>>>> check if you can subscribe to only one partition there.
>>>>
>>>> However, I would discourage that approach. I don't see the benefit over
>>>> just subscribing to the topic entirely and having dedicated processing
>>>> for the different devices.
>>>>
>>>> If you are concerned about the order, you shouldn't. Since all events
>>>> of a specific device-id reside in the same source partition, events are
>>>> in-order in Kafka (responsibility of producer, but I'm assuming that
>>>> because of your mail) and thus they are also in order in non-keyed streams
>>>> in Flink. Any keyBy on device-id or composite key involving device-id,
>>>> would also retain the order.
>>>>
>>>> If you have exactly one partition per device-id, you could even go with
>>>> `DataStreamUtils#reinterpretAsKeyedStream` to avoid any shuffling.
>>>>
>>>> Let me know if I misunderstood your use case or if you have further
>>>> questions.
>>>>
>>>> Best,
>>>>
>>>> Arvid
>>>>
>>>> On Wed, Feb 19, 2020 at 8:39 AM hemant singh <hemant2184@gmail.com>
>>>> wrote:
>>>>
>>>>> Hello Flink Users,
>>>>>
>>>>> I have a use case where I am processing metrics from different types of
>>>>> sources (one source will have multiple devices), and for aggregations
>>>>> as well as for building alerts the order of messages is important. To
>>>>> maintain customer data segregation I plan to have a single topic for
>>>>> each customer, with each source streaming data to one kafka partition.
>>>>> To maintain ordering I am planning to push data for a single source
>>>>> type to a single partition. Then I can create a keyed stream so that
>>>>> for each device-id I have a single stream which has ordered data for
>>>>> that device-id.
>>>>>
>>>>> However, with the flink-kafka consumer I don't see that I can read
>>>>> from a specific partition; the flink consumer reads from multiple kafka
>>>>> partitions. So even if I create a keyed stream on source type (and then
>>>>> write to a partition for further processing, like a keyed stream on
>>>>> device-id), I think ordering will not be maintained per source type.
>>>>>
>>>>> The only other option I feel I am left with is a single partition per
>>>>> topic, so that flink can subscribe to the topic and ordering is
>>>>> maintained. The challenge is too many topics (as I have this
>>>>> configuration for multiple customers), which is not advisable for a
>>>>> kafka cluster.
>>>>>
>>>>> Can anyone shed some light on how to handle this use case?
>>>>>
>>>>> Thanks,
>>>>> Hemant
>>>>>
>>>>
