apex-users mailing list archives

From Raja.Aravapalli <Raja.Aravapa...@target.com>
Subject Re: kafka input is processing records in a jumbled order
Date Thu, 09 Jun 2016 21:43:31 GMT

Great.

I will explore. Thanks for your inputs.


Regards,
Raja.

From: Munagala Ramanath <ram@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Thursday, June 9, 2016 at 3:38 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

Any fields within an operator that are not declared "transient" are considered part of the
operator state and are checkpointed; on failure, they are restored from the checkpoint
automatically by the platform.
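A minimal sketch of that rule, using plain java.io serialization only as a stand-in for the platform's checkpointing (Apex has its own checkpoint serializer, so this illustrates the transient/non-transient distinction, not Apex's mechanism). ReorderState and roundTrip are hypothetical names, not Apex APIs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CheckpointDemo {
    // Hypothetical operator state; NOT an Apex operator.
    static class ReorderState implements Serializable {
        private static final long serialVersionUID = 1L;
        // Non-transient: part of the saved state, survives the round trip.
        long lastEmittedTs = -1;
        // Transient: excluded from the saved state; null after a restore.
        transient StringBuilder scratch = new StringBuilder();
    }

    // Serializes and deserializes obj, mimicking a checkpoint followed by a restore.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("checkpoint round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ReorderState state = new ReorderState();
        state.lastEmittedTs = 42;
        state.scratch.append("temporary");
        ReorderState restored = roundTrip(state);
        System.out.println(restored.lastEmittedTs);   // 42
        System.out.println(restored.scratch == null); // true
    }
}
```

Because transient fields come back at their default values, they are usually re-created when the operator starts up (in Apex, typically in setup()), while buffered tuples that must survive a failure belong in ordinary, non-transient fields.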

Ram

On Thu, Jun 9, 2016 at 1:04 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Really a great thought!!

Wondering how the application will handle failures?

Also, what does the phrase “hold them in the operator state” mean? Hold the incoming messages
in some data structure, a map etc.?

-Regards,
Raja.

From: Thomas Weise <thomas.weise@gmail.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Thursday, June 9, 2016 at 2:57 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

You can order the messages by event time as they arrive, hold them in the operator state and
then only emit those in the endWindow callback that are older than <threshold> (note
that you are not simply emitting all, only those that are old enough).
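A minimal sketch of that approach, written as a plain Java class rather than a real Apex operator: process() stands in for an input port's process callback, endWindow() for the operator's endWindow callback, and the returned list for emitting on an output port. One assumption to flag: "older than <threshold>" is measured here against the newest event timestamp seen so far; measuring against wall-clock time is another valid reading. ReorderBuffer and its methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, simplified stand-in for an Apex operator (not the Apex API).
public class ReorderBuffer {
    private final long thresholdMillis;
    // Held tuples sorted by event time; several tuples may share one timestamp.
    private final TreeMap<Long, List<String>> pending = new TreeMap<>();
    // Newest event time seen so far; the reference point for "old enough" (an assumption).
    private long maxEventTs = Long.MIN_VALUE;

    public ReorderBuffer(long thresholdMillis) {
        this.thresholdMillis = thresholdMillis;
    }

    // Per tuple: only buffer it, never emit here.
    public void process(long eventTs, String msg) {
        pending.computeIfAbsent(eventTs, k -> new ArrayList<>()).add(msg);
        maxEventTs = Math.max(maxEventTs, eventTs);
    }

    // End of window: emit, in event-time order, only the tuples at least
    // thresholdMillis older than the newest event seen; keep the rest buffered.
    public List<String> endWindow() {
        List<String> emitted = new ArrayList<>();
        if (pending.isEmpty()) {
            return emitted;
        }
        long cutoff = maxEventTs - thresholdMillis;
        while (!pending.isEmpty() && pending.firstKey() <= cutoff) {
            Map.Entry<Long, List<String>> oldest = pending.pollFirstEntry();
            // In a real operator each tuple would go to an output port's emit().
            emitted.addAll(oldest.getValue());
        }
        return emitted;
    }
}
```

Replaying Raja's example, assuming ts1 = 1000 ms through ts7 = 7000 ms and a 2000 ms threshold: msg2 arrives alone in the first window and is held, since it is not yet 2000 ms older than the newest event. At the end of the second window the newest event is msg7 (7000 ms), so msg1 through msg5 are emitted in order and msg6/msg7 stay buffered. A tuple delayed by more than the threshold would still come out of order, so the threshold has to cover the worst-case lag. Since pending and maxEventTs are ordinary (non-transient) fields, in a real operator they would be part of the checkpointed state.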

Thomas

On Thu, Jun 9, 2016 at 12:22 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

So, you want me to add all the incoming tuples to a Map and complete the processing in
endWindow()??

How would this solve my windowing problem, as shown below?


msg2 ts2
—————— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—————— window ends here

Thanks a lot for your inputs. Your thoughts are valuable!!



Regards,
Raja.

From: Thomas Weise <thomas.weise@gmail.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Thursday, June 9, 2016 at 1:49 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order


There is no need for an extra thread. In fact, tuples should be emitted in the operator thread
only. This can be done in endWindow().

--
sent from mobile

On Jun 9, 2016 11:46 AM, "Sandesh Hegde" <sandesh@datatorrent.com> wrote:

How about something like this:

Store the incoming tuples in the following format:
   TreeMap<TimeStamp, List<Tuples>>

Create a flusher thread that periodically flushes the entries under *firstKey*, taking the
lag into account.


On Thu, Jun 9, 2016 at 11:09 AM Munagala Ramanath <ram@datatorrent.com> wrote:

You'll need to have some limit on how much lag is possible for out-of-order messages.
If that limit is, say, 30s, then you'll need to buffer tuples for double the lag, i.e. 60s.

You can configure the Application Window size suitably to do this.
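The sizing arithmetic as a small sketch. Assumptions to flag: a 500 ms streaming window (to my understanding the Apex default) and Ram's rule of buffering for twice the maximum lag. In Apex the result would be applied through the APPLICATION_WINDOW_COUNT operator attribute, e.g. dag.setAttribute(op, OperatorContext.APPLICATION_WINDOW_COUNT, count); verify the names against your Apex version. applicationWindowCount is a hypothetical helper.

```java
public class AppWindowSizing {
    // Assumed streaming window length; change it if the application overrides the default.
    static final long DEFAULT_STREAMING_WINDOW_MILLIS = 500;

    // Number of streaming windows an application window must span to buffer
    // tuples for twice the maximum expected lag, rounded up.
    static int applicationWindowCount(long maxLagMillis, long streamingWindowMillis) {
        long bufferMillis = 2 * maxLagMillis;
        return (int) ((bufferMillis + streamingWindowMillis - 1) / streamingWindowMillis);
    }

    public static void main(String[] args) {
        int count = applicationWindowCount(30_000, DEFAULT_STREAMING_WINDOW_MILLIS);
        System.out.println(count); // 120
    }
}
```

With a 30s lag limit this gives 60s / 500 ms = 120 streaming windows per application window.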

Ram

On Thu, Jun 9, 2016 at 10:40 AM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

No aggregation, but I need the messages to be played in sequence!!


Ex:

Below is the order in which the msgs should actually come from my Kafka topic:

msg1 ts1
msg2 ts2
msg3 ts3
msg4 ts4
msg5 ts5
msg6 ts6
msg7 ts7


But, due to some network issues, I am seeing the messages in the Kafka topic arrive something like this:

msg2 ts2 ==> msg2 should actually come after msg1, but unfortunately msg2 reaches Kafka before
msg1, losing the sequence!!
msg1 ts1 ==> delayed by a few milliseconds to seconds, so it doesn't arrive on time!!
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7 ==> msg7 arrived in the topic early, before msg6
msg6 ts6 ==> delayed!!


I am losing the order of messages, and the business logic gives correct results only when the msgs
are played in sequence!!

Now, if I define windowing/some buffering, then order by timestamp and play the msgs…

What if the window boundary falls like this:

msg2 ts2
—————— window ends here
msg1 ts1
msg3 ts3
msg4 ts4
msg5 ts5
msg7 ts7
msg6 ts6
—————— window ends here

As you can see, even though I am buffering and then ordering the msgs by timestamp, I still
face the problem of msg2 being processed before msg1!! Which I don’t want.

Did I really understand windowing correctly? Pls correct me if I am wrong!! Thanks for
your thoughts!!


Regards,
Raja.

From: Thomas Weise <thomas.weise@gmail.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Thursday, June 9, 2016 at 10:51 AM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

Apex can do stateful processing; you can define a window in which you reorder the messages.
It will have the same effect on latency as "micro-batching".

Why is the ordering important? What operations do you perform on the data? Aggregation?

Thanks,
Thomas


On Thu, Jun 9, 2016 at 8:23 AM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

My bad… we observed that our source data in the Kafka topics is not really ordered: we are
seeing some messages delayed by a few milliseconds!!

The source can't guarantee ordering because of the network!!

Is there a right way for me to ensure ordering from the consumer standpoint?? Will micro-batching
work for me here? Or does Apex support micro-batching and ordering the messages?



Regards,
Raja

From: Thomas Weise <thomas.weise@gmail.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, June 7, 2016 at 10:59 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Please also confirm how you are using partitioning. If, for example, in your DAG you shuffle
the data received from Kafka in a way that is different from the original partitioning, then
it would be possible that multiple downstream partitions process data that came from a single
Kafka partition concurrently and therefore in a different order.
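A sketch of the difference, modeling only the routing decision; it does not use Apex's partitioning API. Rec, routeByKafkaPartition, routeByKey and downstreamSpread are hypothetical names, and hash-modulo routing is an assumption standing in for whatever stream partitioning the DAG actually uses.

```java
import java.util.List;

public class PartitionRouting {
    // Hypothetical tuple: source Kafka partition, offset within it, and a business key.
    static final class Rec {
        final int kafkaPartition;
        final long offset;
        final String key;

        Rec(int kafkaPartition, long offset, String key) {
            this.kafkaPartition = kafkaPartition;
            this.offset = offset;
            this.key = key;
        }
    }

    // Routes on the source Kafka partition: all records of one Kafka partition go to
    // the same downstream partition, which therefore sees them in their original order.
    static int routeByKafkaPartition(Rec r, int downstreamPartitions) {
        return Math.floorMod(r.kafkaPartition, downstreamPartitions);
    }

    // Routes on a business field: records from ONE Kafka partition may land on
    // different downstream partitions, which then process them concurrently.
    static int routeByKey(Rec r, int downstreamPartitions) {
        return Math.floorMod(r.key.hashCode(), downstreamPartitions);
    }

    // Counts how many downstream partitions receive records from the given Kafka partition.
    static long downstreamSpread(List<Rec> recs, int kafkaPartition, int n, boolean byKafkaPartition) {
        return recs.stream()
                .filter(r -> r.kafkaPartition == kafkaPartition)
                .mapToInt(r -> byKafkaPartition ? routeByKafkaPartition(r, n) : routeByKey(r, n))
                .distinct()
                .count();
    }
}
```

With keys "a", "b" and "c" from Kafka partition 0 and two downstream partitions, routing by business key spreads that Kafka partition across both downstream partitions, while routing by Kafka partition keeps it on one.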

Thomas


On Tue, Jun 7, 2016 at 6:33 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Yes, Devendra.

p1.10 is read before p1.1!!

Sure, I shall check that. Thanks a lot for your response.


Regards,
Raja.

From: Devendra Tagare <devendrat@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, June 7, 2016 at 7:59 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

Hi Raja,

Just to be clear, are you saying that p1.10 is being read before p1.1?

If that's the case, can you use the console consumer that comes packaged with Kafka and verify the
ordering based on timestamps?

Thanks,
Dev



On Tue, Jun 7, 2016 at 5:31 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Thanks a lot Devendra Tagare for the response.

What you said is very clear and understandable. But strangely, I am NOT getting that partition-level
order!! My operator is processing the records in a jumbled order rather than in sequence!
I say this because I generate a timestamp upon tuple receipt and emit that timestamp to my
destination, which clearly shows the records reaching the operator in a shuffled order.

My records arrive milliseconds apart!! Will that be a problem?


Regards,
Raja.

From: Devendra Tagare <devendrat@datatorrent.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, June 7, 2016 at 7:12 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

Hi Raja,

When you apply the ONE_TO_MANY partitioning scheme, one instance of the operator consumes from
many partitions of a Kafka topic.

When you look at the consumed data, all the events coming from a given partition will be
ordered, but there are no ordering guarantees across partitions, since Kafka does not guarantee
that.

E.g.: if 3 partitions of a topic, p1, p2 and p3, with 10 messages each are connected to one physical
partition of the KafkaInputOperator, then the ordering guarantee of p1.1 to p1.10 is honored, i.e.
message 10 of p1 will be consumed only after messages 1 through 9 are consumed. But the operator
could consume messages in an order like p1.1, p2.1, p1.2, p1.3, p3.1, p2.2, ..., which still follows
the guarantees per partition.
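This distinction can be checked mechanically on a consumption log. A minimal sketch, assuming each consumed message is logged as an id of the form <partition>.<sequence>, like p1.10 (a hypothetical logging format borrowed from the example above): any interleaving across partitions is accepted, but a sequence number that goes backwards within one partition is rejected. PerPartitionOrder and orderedPerPartition are hypothetical names.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerPartitionOrder {
    // Returns true if, within every partition, sequence numbers strictly increase
    // in consumption order; interleaving across partitions is allowed.
    static boolean orderedPerPartition(List<String> consumed) {
        Map<String, Integer> lastSeq = new HashMap<>();
        for (String id : consumed) {
            int dot = id.lastIndexOf('.');
            String partition = id.substring(0, dot);              // e.g. "p1"
            int seq = Integer.parseInt(id.substring(dot + 1));    // e.g. 10
            Integer prev = lastSeq.put(partition, seq);
            if (prev != null && prev >= seq) {
                return false; // e.g. p1.10 consumed before p1.1
            }
        }
        return true;
    }
}
```

For the order p1.1, p2.1, p1.2, p1.3, p3.1, p2.2 it returns true; for p1.10 followed by p1.1 it returns false.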

Thanks,
Dev

On Tue, Jun 7, 2016 at 5:00 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Thanks for the response Thomas.

My quick doubt is..

I have around 30 partitions in my Kafka topic, and all of them have messages ordered at the
partition level.

So, when I consume those messages using a single consumer [with the ONE_TO_MANY strategy set],
does the ordering still not work?

My messages in the topic are guaranteed to be ordered at the partition level.

Thanks a lot in advance for your response.


Regards,
Raja.

From: Thomas Weise <thomas.weise@gmail.com>
Reply-To: "users@apex.apache.org" <users@apex.apache.org>
Date: Tuesday, June 7, 2016 at 5:52 PM
To: "users@apex.apache.org" <users@apex.apache.org>
Subject: Re: kafka input is processing records in a jumbled order

Raja,

Are you expecting ordering across multiple Kafka partitions?

All messages from a given Kafka partition are received by the same consumer and thus will
be ordered. However, when messages come from multiple partitions there is no such guarantee.

Thomas


On Tue, Jun 7, 2016 at 3:34 PM, Raja.Aravapalli <Raja.Aravapalli@target.com> wrote:

Hi

I have built a DAG that reads from Kafka and, in the next operators, looks up an HBase
table and updates an HBase table based on some business logic.

Sometimes my operator (custom-written), which does both the HBase lookup and the update,
processes the records it receives from Kafka in a jumbled order, which causes many
records to be ignored during processing!!

I am not using any parallel partitions/instances, and with the KafkaInputOperator I am using only
the ONE_TO_MANY partition strategy.

I am very new to Apex. I expected Apex to guarantee the ordering.

Can someone pls share their knowledge on this issue…?


Thanks a lot in advance…


Regards,
Raja.







