kafka-dev mailing list archives

From Michal Borowiecki <michal.borowie...@openbet.com>
Subject Re: [DISCUSS] KIP-138: Change punctuate semantics
Date Mon, 01 May 2017 17:23:46 GMT
Hi all,

As promised, here is my take on how one could implement the previously
discussed hybrid semantics using the two PunctuationType callbacks (one
for STREAM_TIME and one for SYSTEM_TIME).

However, there's a twist.

Since calling context.schedule() currently adds a new
PunctuationSchedule rather than overwriting the previous one, a slight
change would be required:

a) either PunctuationSchedules become cancellable,

b) or calling schedule() overwrites (cancels) the previous schedule
with the given PunctuationType (but that's not how it works currently).
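For comparison, option b) could look roughly like the framework-free sketch below, in which the scheduler keeps at most one schedule per PunctuationType and a second schedule() call for the same type simply replaces the first. The `OverwritingScheduler` class, its `advance()` method and the use of `LongConsumer` as the callback type are hypothetical illustrations only, not Kafka Streams API.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.function.LongConsumer;

// Hypothetical stand-in for the enum discussed in this thread.
enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

// Hypothetical scheduler illustrating option b): at most one schedule per type.
final class OverwritingScheduler {
    private static final class Schedule {
        final long interval;
        final LongConsumer callback;
        long nextTime = -1; // -1 until anchored to the first observed time

        Schedule(long interval, LongConsumer callback) {
            this.interval = interval;
            this.callback = callback;
        }
    }

    private final Map<PunctuationType, Schedule> schedules = new EnumMap<>(PunctuationType.class);

    // Map.put replaces any existing entry for the type: the "overwrite" semantics.
    void schedule(PunctuationType type, long interval, LongConsumer callback) {
        schedules.put(type, new Schedule(interval, callback));
    }

    int scheduleCount() {
        return schedules.size();
    }

    // Called when the clock for `type` moves to `now`; fires at most once per call.
    void advance(PunctuationType type, long now) {
        Schedule s = schedules.get(type);
        if (s == null) {
            return;
        }
        if (s.nextTime < 0) {
            s.nextTime = now + s.interval; // first observation anchors the schedule
            return;
        }
        if (now >= s.nextTime) {
            s.nextTime += s.interval;      // advance first, the callback may reschedule
            s.callback.accept(now);
        }
    }
}
```

With this design a processor cannot hold two independent schedules of the same type, which is the trade-off option b) implies.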


Below is an example assuming approach a) is implemented by having 
schedule return Cancellable instead of void.

// Assumes the proposed change: schedule(type, interval, callback) returns a Cancellable.
ProcessorContext context;
long streamTimeInterval = ...;
long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
Cancellable streamTimeSchedule;
Cancellable systemTimeSchedule;
long lastStreamTimePunctuation = -1;

public void init(ProcessorContext context) {
    this.context = context;
    streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
            streamTimeInterval, this::streamTimePunctuate);
    systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
            systemTimeUpperBound, this::systemTimePunctuate);
}

// Stream time advanced far enough: do the work, then push the
// system-time fallback back by re-creating its schedule.
public void streamTimePunctuate(long streamTime) {
    periodicBusiness(streamTime);
    systemTimeSchedule.cancel();
    systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
            systemTimeUpperBound, this::systemTimePunctuate);
}

// Fallback: no stream-time punctuation within systemTimeUpperBound, so do
// the work anyway and re-create the stream-time schedule.
public void systemTimePunctuate(long systemTime) {
    periodicBusiness(context.timestamp());
    streamTimeSchedule.cancel();
    streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
            streamTimeInterval, this::streamTimePunctuate);
}

public void periodicBusiness(long streamTime) {
    // guard against streamTime == -1, easy enough.
    // if you need system time instead, just use System.currentTimeMillis()

    // do something businessy here
}

Here Cancellable is an interface containing either just a single void
cancel() method, or also a boolean isCancelled() method, like this one:
<http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
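To illustrate option a) outside Kafka Streams, here is a minimal sketch assuming an in-memory queue driven by an externally supplied clock: schedule() returns a Cancellable handle with the two methods above, and cancelled entries are skipped and then dropped instead of fired. `CancellableQueue`, its `start` parameter and its `mayPunctuate()` method are hypothetical names for this sketch only, not the actual PunctuationQueue code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.LongConsumer;

// The handle returned by schedule(); mirrors the two methods discussed above.
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

// Hypothetical queue of periodic schedules driven by an externally supplied time.
final class CancellableQueue {
    private static final class Entry implements Cancellable {
        final long interval;
        final LongConsumer callback;
        long nextTime;
        boolean cancelled;

        Entry(long start, long interval, LongConsumer callback) {
            this.interval = interval;
            this.callback = callback;
            this.nextTime = start + interval; // first punctuation due one interval after start
        }

        @Override
        public void cancel() {
            cancelled = true;
        }

        @Override
        public boolean isCancelled() {
            return cancelled;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    Cancellable schedule(long start, long interval, LongConsumer callback) {
        Entry e = new Entry(start, interval, callback);
        entries.add(e);
        return e;
    }

    // Fires each due, non-cancelled entry once, then drops cancelled entries.
    void mayPunctuate(long now) {
        // Iterate over a snapshot so callbacks can cancel or schedule entries safely.
        for (Entry e : new ArrayList<>(entries)) {
            if (!e.cancelled && now >= e.nextTime) {
                e.nextTime += e.interval;
                e.callback.accept(now);
            }
        }
        entries.removeIf(Entry::isCancelled);
    }

    int size() {
        return entries.size();
    }
}
```

The hybrid example above maps onto this directly: when streamTimePunctuate() runs, it cancels the system-time handle and schedules a fresh one from the current system time, so the old deadline never fires.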


Please let us know your opinions on whether we should proceed in this
direction or leave "hybrid" considerations out of scope.

Looking forward to hearing your thoughts.

Thanks,
Michal

On 30/04/17 20:07, Michal Borowiecki wrote:
>
> Hi Matthias,
>
> I'd like to start moving the discarded ideas into the Rejected
> Alternatives section. Before I do, I want to tidy them up and ensure
> each of them has been given proper treatment.
>
> To that end let me go back to one of your earlier comments about the 
> original suggestion (A) to put that to bed.
>
>
> On 04/04/17 06:44, Matthias J. Sax wrote:
>> (A) You argue that users can still "punctuate" on event-time via
>> process(), but I am not sure if this is possible. Note that users only
>> get record timestamps via context.timestamp(). Thus, users would need to
>> track the time progress per partition (based on the partitions they
>> observe via context.partition()). (This alone puts a huge burden on the
>> user.) However, users are not notified at startup what
>> partitions are assigned, and users are not notified when partitions get
>> revoked. Because this information is not available, it's not possible to
>> "manually advance" stream-time, and thus event-time punctuation within
>> process() seems not to be possible -- or do you see a way to get it
>> done? And even if, it might still be too clumsy to use.
> I might have missed something, but I'm guessing your worry about users
> having to track time progress /per partition/ comes from what
> stream-time does currently.
> But I'm not sure those semantics of stream-time are ideal for
> users of punctuate.
> That is, if stream-time punctuate didn't exist and users had to use
> process(), would they actually want to use the current semantics of
> stream time?
>
> As a reminder, stream time in all its glory is roughly the following
> (not exactly: while trying to be absolutely precise here I spotted
> KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144>, but I
> think this approximation suffices to illustrate the point):
>
> a minimum across all input partitions of (
>    if(msgs never received by partition) -1;
>    else {
>       a non-descending-minimum of ( the per-batch minimum msg timestamp)
>    }
> )
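The approximate formula above can be written out as a small sketch. It assumes each partition's time starts at -1, then becomes the minimum timestamp of each incoming batch but never decreases, and that stream time is the minimum over all partitions. `StreamTimeSketch` and `onBatch()` are hypothetical, and like the formula this is only an approximation of what StreamTask really does.

```java
import java.util.HashMap;
import java.util.Map;

// Approximation of stream time as summarised above; not StreamTask's actual code.
final class StreamTimeSketch {
    // Partition -> its time; -1 means "no messages received yet".
    private final Map<String, Long> partitionTime = new HashMap<>();

    StreamTimeSketch(Iterable<String> partitions) {
        for (String p : partitions) {
            partitionTime.put(p, -1L);
        }
    }

    // A batch arrived for `partition`: take its minimum timestamp, but never let
    // the partition's time move backwards (the "non-descending" part).
    void onBatch(String partition, long[] timestamps) {
        if (timestamps.length == 0) {
            return;
        }
        long batchMin = Long.MAX_VALUE;
        for (long ts : timestamps) {
            batchMin = Math.min(batchMin, ts);
        }
        long previous = partitionTime.getOrDefault(partition, -1L);
        partitionTime.put(partition, Math.max(previous, batchMin));
    }

    // Minimum across all partitions, so a single silent partition keeps it at -1.
    long streamTime() {
        if (partitionTime.isEmpty()) {
            return -1L;
        }
        long min = Long.MAX_VALUE;
        for (long t : partitionTime.values()) {
            min = Math.min(min, t);
        }
        return min;
    }
}
```

Even in this simplified form, one partition that has never delivered a record holds stream time at -1, and a late batch with old timestamps does not move it at all.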
>
> Would that really be clear enough to the users of punctuate? Do they
> care for such a convoluted notion of time? I can see how it is
> useful for StreamTask when picking the next partition to take a record
> from, but for punctuate?
> If users had to implement punctuation with process(), is that what 
> they would have chosen as their notion of time?
> I'd argue not.
>
> None of the processors implementing the rich windowing/join operations 
> in the DSL use punctuate.
> Let's take the KStreamKStreamJoinProcessor as an example: in its
> process() method it simply uses context().timestamp(), which, since
> it's called from process(), simply returns, per the javadoc:
> "If it is triggered while processing a record streamed from the source
> processor, timestamp is defined as the timestamp of the current input
> record;"
> So they don't use that convoluted formula for stream-time. Instead,
> they only care about the timestamp of the current record. I think
> having users track just that wouldn't be much of a burden. I
> don't think they need to care about which partitions got assigned or
> not. And StreamTask would still pick records first from the
> partition with the lowest timestamp to try to "synchronize" the
> streams, as it does now.
>
> What users would have to do in their Processor implementations is
> something along the lines of:
>
> long lastPunctuationTime = 0;
> long interval = <some-number>; // millis
>
> @Override
> public void process(K key, V value) {
>     while (ctx.timestamp() >= lastPunctuationTime + interval) {
>         punctuate(ctx.timestamp());
>         // I'm not sure of the merit of this vs
>         // lastPunctuationTime = ctx.timestamp(); but that's what
>         // PunctuationQueue does currently
>         lastPunctuationTime += interval;
>     }
>     // do some other business logic here
> }
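A self-contained sketch of that loop, with the record timestamp passed in directly instead of read from ctx.timestamp() so that no Kafka dependency is needed. One assumption differs from the snippet: the last punctuation time is anchored on the first record rather than starting at 0, because starting from 0 with epoch-millisecond timestamps would make the while loop fire once per elapsed interval since 1970 on the very first record. `ProcessTimePunctuator` and `onRecord()` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Punctuation driven purely by the timestamps of records seen in process().
final class ProcessTimePunctuator {
    private final long interval;          // punctuation interval in millis
    private boolean anchored = false;     // false until the first record is seen
    private long lastPunctuationTime;
    final List<Long> punctuations = new ArrayList<>(); // times passed to punctuate()

    ProcessTimePunctuator(long interval) {
        if (interval <= 0) {
            throw new IllegalArgumentException("interval must be positive");
        }
        this.interval = interval;
    }

    // Called once per record with that record's timestamp (ctx.timestamp() above).
    void onRecord(long timestamp) {
        if (!anchored) {
            lastPunctuationTime = timestamp; // assumption: anchor on the first record
            anchored = true;
        }
        // One punctuation per full interval the record's timestamp has passed;
        // out-of-order (older) records never trigger one.
        while (timestamp >= lastPunctuationTime + interval) {
            punctuate(timestamp);
            lastPunctuationTime += interval; // fixed grid, as in the snippet above
        }
        // ... other business logic would go here
    }

    private void punctuate(long timestamp) {
        punctuations.add(timestamp);
    }
}
```

Advancing `lastPunctuationTime` by `interval` keeps punctuations on a fixed grid; setting it to the record's timestamp instead would let the grid drift with the data, which is the trade-off the comment in the snippet wonders about.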
>
> Looking forward to your thoughts.
>
> Cheers,
> Michal
>
> -- 
> Michal Borowiecki
> Senior Software Engineer L4
> T: +44 208 742 1600
>    +44 203 249 8448
> E: michal.borowiecki@openbet.com
> W: www.openbet.com <http://www.openbet.com/>
>
> OpenBet Ltd
> Chiswick Park Building 9
> 566 Chiswick High Rd
> London
> W4 5XT
> UK
>
> This message is confidential and intended only for the addressee. If 
> you have received this message in error, please immediately notify the 
> postmaster@openbet.com <mailto:postmaster@openbet.com> and delete it 
> from your system as well as any copies. The content of e-mails as well 
> as traffic data may be monitored by OpenBet for employment and 
> security purposes. To protect the environment please do not print this 
> e-mail unless necessary. OpenBet Ltd. Registered Office: Chiswick Park 
> Building 9, 566 Chiswick High Road, London, W4 5XT, United Kingdom. A 
> company registered in England and Wales. Registered no. 3134634. VAT 
> no. GB927523612
>
