kafka-dev mailing list archives

From Michal Borowiecki <michal.borowie...@openbet.com>
Subject Re: [DISCUSS] KIP-138: Change punctuate semantics
Date Fri, 05 May 2017 11:04:35 GMT
> I shall move all alternatives other than the main proposal into the 
> Rejected Alternatives section and if I hear any objections, I'll move 
> those back up and we'll discuss further.
Done.


Still looking forward to any comments, especially about the recently 
proposed ability to cancel punctuation schedules. I think it is well in 
keeping with the spirit of making complex things possible (such as the 
hybrid semantics).


In the absence of further comments I shall call for a vote in the next 
few days.


Thanks,

Michał


On 04/05/17 09:41, Michal Borowiecki wrote:
>
> Continuing in this direction, I've updated the main proposal to 
> incorporate the Cancellable return type for ProcessorContext.schedule 
> and the guidance on how to implement "hybrid" punctuation with the two 
> proposed PunctuationTypes.
>
> I look forward to more comments on whether the Cancellable return type 
> is an agreeable solution, and on its precise definition.
>
> I shall move all alternatives other than the main proposal into the 
> Rejected Alternatives section and if I hear any objections, I'll move 
> those back up and we'll discuss further.
>
>
> Looking forward to all comments and suggestions.
>
>
> Thanks,
>
> Michal
>
>
> On 01/05/17 18:23, Michal Borowiecki wrote:
>>
>> Hi all,
>>
>> As promised, here is my take on how one could implement the 
>> previously discussed hybrid semantics using the two PunctuationType 
>> callbacks (one for STREAM_TIME and one for SYSTEM_TIME).
>>
>> However, there's a twist.
>>
>> Since calling context.schedule() currently adds a new 
>> PunctuationSchedule rather than overwriting the previous one, a slight 
>> change would be required:
>>
>> a) either that PunctuationSchedules are cancellable
>>
>> b) or that calling schedule() overwrites (cancels) the previous one 
>> with the given PunctuationType (but that's not how it works currently)
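To make option b) concrete, here is a minimal, Kafka-free sketch of "overwrite" semantics: a registry that keeps at most one schedule per punctuation type, so that scheduling again cancels the previous schedule of that type. The enum only mirrors the STREAM_TIME/SYSTEM_TIME names proposed in this thread; `ScheduleRegistry` and `Schedule` are hypothetical illustration classes, not Kafka Streams API.

```java
import java.util.EnumMap;
import java.util.Map;

// Mirrors the two punctuation types proposed in this thread (hypothetical, not the Kafka API).
enum PunctuationType { STREAM_TIME, SYSTEM_TIME }

// Hypothetical schedule: an interval plus a flag telling whether it is still active.
final class Schedule {
    final long intervalMs;
    boolean cancelled = false;

    Schedule(long intervalMs) {
        this.intervalMs = intervalMs;
    }
}

// Option b): at most one schedule per type; scheduling again cancels the previous one.
final class ScheduleRegistry {
    private final Map<PunctuationType, Schedule> schedules = new EnumMap<>(PunctuationType.class);

    // Registers a new schedule for the type, cancelling any previous one of the same type.
    Schedule schedule(PunctuationType type, long intervalMs) {
        Schedule next = new Schedule(intervalMs);
        Schedule previous = schedules.put(type, next);
        if (previous != null) {
            previous.cancelled = true; // the overwritten schedule stops firing
        }
        return next;
    }

    // The currently active schedule for the type, or null if none was registered.
    Schedule current(PunctuationType type) {
        return schedules.get(type);
    }
}
```

Under this reading, the explicit cancel() calls of approach a) would be implied by the second schedule() call; the trade-off is that a processor could no longer hold two independent schedules of the same type.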
>>
>>
>> Below is an example assuming approach a) is implemented by having 
>> schedule return Cancellable instead of void.
>>
>> ProcessorContext context;
>> long streamTimeInterval = ...;
>> long systemTimeUpperBound = ...; // e.g. systemTimeUpperBound = streamTimeInterval + some tolerance
>> Cancellable streamTimeSchedule;
>> Cancellable systemTimeSchedule;
>> long lastStreamTimePunctuation = -1;
>>
>> public void init(ProcessorContext context) {
>>     this.context = context;
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>             streamTimeInterval, this::streamTimePunctuate);
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>             systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void streamTimePunctuate(long streamTime) {
>>     periodicBusiness(streamTime);
>>     // stream time is advancing: push the system-time fallback further out
>>     systemTimeSchedule.cancel();
>>     systemTimeSchedule = context.schedule(PunctuationType.SYSTEM_TIME,
>>             systemTimeUpperBound, this::systemTimePunctuate);
>> }
>>
>> public void systemTimePunctuate(long systemTime) {
>>     periodicBusiness(context.timestamp());
>>     // stream time has stalled: restart the stream-time schedule
>>     streamTimeSchedule.cancel();
>>     streamTimeSchedule = context.schedule(PunctuationType.STREAM_TIME,
>>             streamTimeInterval, this::streamTimePunctuate);
>> }
>>
>> public void periodicBusiness(long streamTime) {
>>     // guard against streamTime == -1, easy enough.
>>     // if you need system time instead, just use System.currentTimeMillis()
>>
>>     // do something businessy here
>> }
>>
>> Here Cancellable is an interface containing either just a single 
>> void cancel() method, or also a boolean isCancelled() method, like the 
>> one here <http://doc.akka.io/japi/akka/2.5.0/akka/actor/Cancellable.html>.
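A minimal sketch of the second variant (cancel() plus isCancelled()), together with a toy scheduler showing how a cancelled schedule stops firing. `SimpleCancellable` and `ToyScheduler` are hypothetical names; the scheduler receives the current time as a parameter instead of reading a real clock, and fires each schedule at most once per advanceTo() call. The flag is an AtomicBoolean so that cancel() could safely be called from another thread; the toy scheduler itself is single-threaded.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.LongConsumer;

// The interface shape described above: cancel() plus the optional isCancelled().
interface Cancellable {
    void cancel();
    boolean isCancelled();
}

// Hypothetical implementation backed by an AtomicBoolean flag.
final class SimpleCancellable implements Cancellable {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    @Override
    public void cancel() {
        cancelled.set(true);
    }

    @Override
    public boolean isCancelled() {
        return cancelled.get();
    }
}

// Hypothetical toy scheduler: every schedule() call adds a new entry (as
// context.schedule() does today) and returns a handle that can cancel it.
final class ToyScheduler {
    private static final class Entry {
        final long intervalMs;
        final LongConsumer callback;
        final SimpleCancellable handle = new SimpleCancellable();
        long nextTime;

        Entry(long startTime, long intervalMs, LongConsumer callback) {
            this.intervalMs = intervalMs;
            this.callback = callback;
            this.nextTime = startTime + intervalMs;
        }
    }

    private final List<Entry> entries = new ArrayList<>();

    Cancellable schedule(long now, long intervalMs, LongConsumer callback) {
        Entry e = new Entry(now, intervalMs, callback);
        entries.add(e);
        return e.handle;
    }

    // Fires each due, non-cancelled entry once, then drops cancelled entries.
    void advanceTo(long time) {
        for (Entry e : new ArrayList<>(entries)) { // snapshot: callbacks may (re)schedule
            if (!e.handle.isCancelled() && time >= e.nextTime) {
                e.callback.accept(time);
                e.nextTime += e.intervalMs;
            }
        }
        entries.removeIf(e -> e.handle.isCancelled());
    }
}
```

Because advanceTo() iterates over a snapshot of the entries, a callback may cancel and re-schedule other entries, which is the pattern the hybrid example above relies on.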
>>
>>
>> Please make your opinions known on whether we should proceed in this 
>> direction or leave "hybrid" considerations out of scope.
>>
>> Looking forward to hearing your thoughts.
>>
>> Thanks,
>> Michal
>>
>> On 30/04/17 20:07, Michal Borowiecki wrote:
>>>
>>> Hi Matthias,
>>>
>>> I'd like to start moving the discarded ideas into the Rejected 
>>> Alternatives section. Before I do, I want to tidy them up and ensure 
>>> each has been given proper treatment.
>>>
>>> To that end, let me go back to one of your earlier comments about the 
>>> original suggestion (A), to put that to bed.
>>>
>>>
>>> On 04/04/17 06:44, Matthias J. Sax wrote:
>>>> (A) You argue that users can still "punctuate" on event-time via
>>>> process(), but I am not sure if this is possible. Note that users only
>>>> get record timestamps via context.timestamp(). Thus, users would need to
>>>> track the time progress per partition (based on the partitions they
>>>> observe via context.partition()). (This alone puts a huge burden on the
>>>> user.) However, users are not notified at startup which
>>>> partitions are assigned, and users are not notified when partitions get
>>>> revoked. Because this information is not available, it's not possible to
>>>> "manually advance" stream-time, and thus event-time punctuation within
>>>> process() seems not to be possible -- or do you see a way to get it
>>>> done? And even if so, it might still be too clumsy to use.
>>> I might have missed something, but I'm guessing your worry about 
>>> users having to track time progress /per partition/ comes from 
>>> what stream-time does currently.
>>> But I'm not sure that those semantics of stream-time are ideal for 
>>> users of punctuate.
>>> That is, if stream-time punctuate didn't exist and users had to use 
>>> process(), would they actually want to use the current semantics of 
>>> stream time?
>>>
>>> As a reminder, stream time, in all its glory, is approximately the 
>>> following (not exactly: while trying to be absolutely precise here I 
>>> spotted KAFKA-5144 <https://issues.apache.org/jira/browse/KAFKA-5144>, 
>>> but I think this approximation suffices to illustrate the point):
>>>
>>> a minimum across all input partitions of (
>>>    if (msgs never received by partition) -1;
>>>    else {
>>>       a non-descending-minimum of (the per-batch minimum msg timestamp)
>>>    }
>>> )
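Purely to make the approximation above concrete, a minimal sketch in Java. `StreamTimeApprox` is a hypothetical helper, not Kafka Streams code, and it encodes one reading of the formula: each partition's time is the running maximum of its per-batch minimum timestamps (so it never goes backwards), a partition that has received nothing contributes -1, and stream time is the minimum across partitions. As noted above, the real implementation differs in details (see KAFKA-5144).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper illustrating the stream-time approximation above.
final class StreamTimeApprox {
    // Per-partition time; partitions are identified by an int for simplicity.
    private final Map<Integer, Long> partitionTime = new HashMap<>();

    StreamTimeApprox(int... partitions) {
        for (int p : partitions) {
            partitionTime.put(p, -1L); // -1: no message received yet
        }
    }

    // A batch arrived on a partition: the partition's time becomes the batch's
    // minimum timestamp, but it never moves backwards (non-descending).
    void onBatch(int partition, long... timestamps) {
        if (timestamps.length == 0) {
            return; // an empty batch carries no time information
        }
        long batchMin = Long.MAX_VALUE;
        for (long t : timestamps) {
            batchMin = Math.min(batchMin, t);
        }
        partitionTime.merge(partition, batchMin, Math::max);
    }

    // Stream time: the minimum across all input partitions.
    long streamTime() {
        long min = Long.MAX_VALUE;
        for (long t : partitionTime.values()) {
            min = Math.min(min, t);
        }
        return min;
    }
}
```

Even this simplified version needs the full set of assigned partitions up front, which is exactly the information Matthias points out is not available to users inside process().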
>>>
>>> Would that really be clear enough to the users of punctuate? Do they 
>>> care for such a convoluted notion of time? I see how this can be 
>>> useful for StreamTask to pick the next partition to take a record 
>>> from, but for punctuate?
>>> If users had to implement punctuation with process(), is that what 
>>> they would have chosen as their notion of time?
>>> I'd argue not.
>>>
>>> None of the processors implementing the rich windowing/join 
>>> operations in the DSL use punctuate.
>>> Take KStreamKStreamJoinProcessor as an example: in its 
>>> process() method it simply uses context().timestamp(), which, since 
>>> it's called from process(), returns, per the javadoc:
>>> If it is triggered while processing a record streamed from the 
>>> source processor, timestamp is defined as the timestamp of the 
>>> current input record;
>>> So they don't use that convoluted formula for stream-time. Instead, 
>>> they only care about the timestamp of the current record. I think 
>>> that having users track just that wouldn't be that much of a burden. 
>>> I don't think they need to care about which partitions got assigned 
>>> or not. And StreamTask would still be picking records first from the 
>>> partition having the lowest timestamp to try to "synchronize" the 
>>> streams as it does now.
>>>
>>> What users would have to do in their Processor implementations is 
>>> something along the lines of:
>>>
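>>> ProcessorContext ctx; // assigned in init()
>>> long lastPunctuationTime = -1; // -1: no record seen yet
>>> long interval = <some-number>; // millis
>>>
>>> @Override
>>> public void process(K key, V value) {
>>>     long timestamp = ctx.timestamp();
>>>     if (lastPunctuationTime < 0) {
>>>         // first record: start from its timestamp rather than 0, otherwise
>>>         // the loop below would fire once per interval since the epoch
>>>         lastPunctuationTime = timestamp;
>>>     }
>>>     while (timestamp >= lastPunctuationTime + interval) {
>>>         punctuate(timestamp);
>>>         // I'm not sure of the merit of this vs lastPunctuationTime = timestamp;
>>>         // but that's what PunctuationQueue does currently
>>>         lastPunctuationTime += interval;
>>>     }
>>>     // do some other business logic here
>>> }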
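A Kafka-free sketch of the same loop, so its behaviour can be checked in isolation. `RecordTimePunctuator` is a hypothetical class: `onRecord` stands in for process() and receives the timestamp that ctx.timestamp() would return, and punctuation timestamps are collected in a list instead of calling punctuate(). It assumes non-negative record timestamps and that the first record's timestamp starts the schedule (starting from 0 with epoch-millisecond timestamps would fire once per interval since the epoch). Like the loop above, it advances by whole intervals, so a gap spanning several intervals fires several punctuations carrying the same record timestamp.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Processor that punctuates on record timestamps.
final class RecordTimePunctuator {
    private final long intervalMs;
    private long lastPunctuationTime = -1; // -1: no record seen yet
    final List<Long> punctuations = new ArrayList<>();

    RecordTimePunctuator(long intervalMs) {
        this.intervalMs = intervalMs;
    }

    // Plays the role of process(): recordTimestamp is what ctx.timestamp() would return.
    void onRecord(long recordTimestamp) {
        if (lastPunctuationTime < 0) {
            lastPunctuationTime = recordTimestamp; // first record starts the schedule
        }
        while (recordTimestamp >= lastPunctuationTime + intervalMs) {
            punctuations.add(recordTimestamp); // would be punctuate(recordTimestamp)
            lastPunctuationTime += intervalMs; // advance by whole intervals
        }
        // other business logic would go here
    }
}
```

An out-of-order record with an older timestamp simply fires nothing, since the loop condition only looks forward from the last punctuation.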
>>>
>>> Looking forward to your thoughts.
>>>
>>> Cheers,
>>> Michal
>>>
>>> -- 
>>> Signature
>>> <http://www.openbet.com/> 	Michal Borowiecki
>>> Senior Software Engineer L4
>>> 	T: 	+44 208 742 1600
>>>
>>> 	
>>> 	+44 203 249 8448
>>>
>>> 	
>>> 	
>>> 	E: 	michal.borowiecki@openbet.com
>>> 	W: 	www.openbet.com <http://www.openbet.com/>
>>>
>>> 	
>>> 	OpenBet Ltd
>>>
>>> 	Chiswick Park Building 9
>>>
>>> 	566 Chiswick High Rd
>>>
>>> 	London
>>>
>>> 	W4 5XT
>>>
>>> 	UK
>>>
>>> 	
>>> <https://www.openbet.com/email_promo>
>>>
>>> This message is confidential and intended only for the addressee. If 
>>> you have received this message in error, please immediately notify 
>>> the postmaster@openbet.com <mailto:postmaster@openbet.com> and 
>>> delete it from your system as well as any copies. The content of 
>>> e-mails as well as traffic data may be monitored by OpenBet for 
>>> employment and security purposes. To protect the environment please 
>>> do not print this e-mail unless necessary. OpenBet Ltd. Registered 
>>> Office: Chiswick Park Building 9, 566 Chiswick High Road, London, W4 
>>> 5XT, United Kingdom. A company registered in England and Wales. 
>>> Registered no. 3134634. VAT no. GB927523612
>>>
>>
>>
>
>
