flink-dev mailing list archives

From Radu Tudoran <radu.tudo...@huawei.com>
Subject RE: [DISCUSS] Timely User Functions and Watermarks
Date Tue, 27 Sep 2016 09:41:57 GMT
Hi Aljoscha,

My 2 cents: it is worth keeping access to the watermarks. I think having the option to
customize their handling is a strong point of Flink.

Regarding the solution you proposed for timers on two-input operators ("would fire if the
watermark from both inputs advances sufficiently far."), I would propose having the option to
set a strategy for the timer. We could have:
- EagerTrigger - fires when the watermark of either input has advanced sufficiently far
- LeftEagerTrigger - fires when the watermark of the left input has advanced sufficiently far
- RightEagerTrigger - fires when the watermark of the right input has advanced sufficiently far
- SyncTrigger - fires when the watermarks of both inputs have advanced sufficiently far


We could also cover custom handling of the watermarks here, under a CustomTrigger strategy
implemented as an operator that the user provides.
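
The four built-in strategies listed above could be sketched roughly as follows. This is only
an illustration: the enum, its constant names, and shouldFire are hypothetical and not part of
Flink, and the sketch assumes a timer is due once a watermark is greater than or equal to the
timer's timestamp.

```java
/** Hypothetical timer strategies for a two-input operator (not a Flink API). */
enum TimerStrategy {
    EAGER,        // either input's watermark has reached the timer
    LEFT_EAGER,   // the left input's watermark has reached the timer
    RIGHT_EAGER,  // the right input's watermark has reached the timer
    SYNC;         // both inputs' watermarks have reached the timer

    /**
     * Decides whether a timer should fire, given the latest watermark of each input.
     * Assumption: "advanced sufficiently far" means watermark >= timer timestamp.
     */
    boolean shouldFire(long timerTimestamp, long leftWatermark, long rightWatermark) {
        switch (this) {
            case EAGER:
                return Math.max(leftWatermark, rightWatermark) >= timerTimestamp;
            case LEFT_EAGER:
                return leftWatermark >= timerTimestamp;
            case RIGHT_EAGER:
                return rightWatermark >= timerTimestamp;
            case SYNC:
                return Math.min(leftWatermark, rightWatermark) >= timerTimestamp;
            default:
                throw new AssertionError("unknown strategy: " + this);
        }
    }
}
```

With a timer at 10, a left watermark of 12 and a right watermark of 5, EAGER and LEFT_EAGER
would fire, while RIGHT_EAGER and SYNC would still wait.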


-----Original Message-----
From: Aljoscha Krettek [mailto:aljoscha@apache.org] 
Sent: Tuesday, September 27, 2016 11:28 AM
To: Dev
Subject: [DISCUSS] Timely User Functions and Watermarks

Hi Folks,
I'm in the process of implementing
https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a problem with
deciding how watermarks should be treated for operators that have more than one input.

The problem is deciding when to fire event-time timers. For one-input operators it's pretty
straightforward: fire once the watermark surpasses a given timer. For two-input operators
we allow the operator implementer to observe the watermarks from the two inputs separately,
to react to them, and also to decide which watermark to forward. With this it becomes hard
to figure out when to fire timers.
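
For reference, the one-input case described above can be sketched as a small timer queue. The
class and method names are hypothetical and this is not the Flink timer service; it also
assumes a timer fires once the watermark is greater than or equal to its timestamp.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

/** Hypothetical event-time timer queue for a one-input operator (not a Flink API). */
class SingleInputTimers {
    // Pending timer timestamps, smallest first.
    private final PriorityQueue<Long> timers = new PriorityQueue<>();
    private long currentWatermark = Long.MIN_VALUE;

    void registerTimer(long timestamp) {
        timers.add(timestamp);
    }

    /**
     * Advances the watermark (it never moves backwards) and returns the timestamps
     * of all timers that fire as a result, in ascending order.
     */
    List<Long> advanceWatermark(long watermark) {
        currentWatermark = Math.max(currentWatermark, watermark);
        List<Long> fired = new ArrayList<>();
        while (!timers.isEmpty() && timers.peek() <= currentWatermark) {
            fired.add(timers.poll());
        }
        return fired;
    }
}
```

With two inputs there is no single watermark to pass to advanceWatermark, which is exactly
where the question of when to fire comes from.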

My thinking is that we should not allow operators to observe the watermark anymore but route
it past the operator and deal with watermarks and timers outside of the operator. A timer
for operators with more than one input (TwoInputOperator) would fire if the watermark from
both inputs advances sufficiently far.

Alternatively, we could still let operators observe watermarks but grab the watermark before
it enters the operator and still deal with timers in the same way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for timers of operators
with more than one input?

Cheers,
Aljoscha

P.S. One reason why I want to deal with watermarks outside of operators is that otherwise
every operator has to implement the functionality to update the current watermark in the timer
service, i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // *<--- that's the thing I don't want*
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two-input operators, which right now also merge the
watermarks from the two inputs themselves.
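
As an illustration of what such merging involves, here is a minimal sketch. It assumes the
merged watermark is the minimum of the two input watermarks and is only forwarded when that
minimum advances; the message does not spell out the merging rule, and the class and method
names are hypothetical.

```java
/** Hypothetical helper that merges the watermarks of two inputs (not a Flink API). */
class TwoInputWatermarkMerger {
    private long leftWatermark = Long.MIN_VALUE;
    private long rightWatermark = Long.MIN_VALUE;
    private long combinedWatermark = Long.MIN_VALUE;

    /** Records a left-input watermark; returns true if the combined watermark advanced. */
    boolean onLeftWatermark(long watermark) {
        leftWatermark = Math.max(leftWatermark, watermark);
        return updateCombined();
    }

    /** Records a right-input watermark; returns true if the combined watermark advanced. */
    boolean onRightWatermark(long watermark) {
        rightWatermark = Math.max(rightWatermark, watermark);
        return updateCombined();
    }

    long getCombinedWatermark() {
        return combinedWatermark;
    }

    // Assumption: the combined watermark is the minimum over both inputs.
    private boolean updateCombined() {
        long newMin = Math.min(leftWatermark, rightWatermark);
        if (newMin > combinedWatermark) {
            combinedWatermark = newMin;
            return true;
        }
        return false;
    }
}
```

Whenever one of the two methods returns true, the caller would both update the timer service
and forward the combined watermark downstream, so every two-input operator would need to
repeat this bookkeeping if it stayed inside the operator.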