flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Parallelism, registerEventTimeTimer and watermark problem
Date Fri, 20 Oct 2017 11:59:45 GMT
Hi Fritz,

If the watermark is not advancing, this usually means that one of the input partitions (if you're
using Kafka) is not carrying data. In that case, the affected watermark/timestamp assigner instance
has no data on which to base an updated watermark, and because a downstream operator's watermark
is the minimum across its inputs, event time stalls there. For such use cases I recently implemented
a special watermark/timestamp assigner that notices when a stream is idle and then artificially
advances the watermark. The code for this is available here: https://github.com/aljoscha/flink/commit/6e4419e550caa0e5b162bc0d2ccc43f6b0b3860f
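
To make the idea concrete, here is a minimal sketch in plain Scala. It does not use Flink and is not the code from the commit above. Every name in it (IdleAwareWatermark, operatorWatermark, the parameters) is hypothetical. It also assumes that event timestamps roughly follow wall-clock time, so that an idle partition can fall back to "now minus the allowed out-of-orderness".

```scala
// Hypothetical sketch: plain Scala, no Flink dependency, not the linked commit's code.
object IdleWatermarkSketch {

  /** Tracks the watermark of one input partition. All times are in milliseconds. */
  final class IdleAwareWatermark(maxOutOfOrdernessMs: Long, idleTimeoutMs: Long, startMs: Long) {
    private var maxTimestampMs = Long.MinValue // highest event timestamp seen so far
    private var lastActivityMs = startMs       // processing time of the last element
    private var watermarkMs = Long.MinValue    // last emitted watermark, never decreases

    /** Record an element with the given event time, observed at processing time nowMs. */
    def onElement(eventTimeMs: Long, nowMs: Long): Unit = {
      maxTimestampMs = math.max(maxTimestampMs, eventTimeMs)
      lastActivityMs = nowMs
    }

    /** Called periodically; returns the current watermark of this partition. */
    def currentWatermark(nowMs: Long): Long = {
      // Normal case: trail the highest event timestamp (guarded against overflow).
      val fromData =
        if (maxTimestampMs == Long.MinValue) Long.MinValue
        else maxTimestampMs - maxOutOfOrdernessMs
      // Idle case (assumption of this sketch): trail the processing-time clock instead.
      val fromClock =
        if (nowMs - lastActivityMs >= idleTimeoutMs) nowMs - maxOutOfOrdernessMs
        else Long.MinValue
      watermarkMs = math.max(watermarkMs, math.max(fromData, fromClock))
      watermarkMs
    }
  }

  /** A downstream operator's watermark is the minimum over its (non-empty) inputs. */
  def operatorWatermark(partitionWatermarks: Seq[Long]): Long = partitionWatermarks.min

  def main(args: Array[String]): Unit = {
    val busy = new IdleAwareWatermark(1000L, 5000L, 0L)
    val idle = new IdleAwareWatermark(1000L, 5000L, 0L)
    busy.onElement(10000L, 10000L) // only one partition ever receives data
    println(operatorWatermark(Seq(busy.currentWatermark(10000L), idle.currentWatermark(10000L))))
  }
}
```

With an idle timeout that is never reached, the empty partition stays at Long.MinValue and the minimum never moves, which is the "timer never fires" symptom. With the timeout in place, the idle partition catches up and the combined watermark can advance.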

Does this apply to your case?

Best,
Aljoscha

P.S. The exception is thrown because using timers/state is only allowed in an operation that
directly follows a keyBy().
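
As an illustration of that ordering, here is a small sketch. It assumes the Flink 1.3-era Scala DataStream API that was current for this thread. Flow, LastSeen and KeyedOrderSketch are hypothetical stand-ins, not the classes from your job. Timestamps and watermarks are assigned on the non-keyed stream, and the function that uses state and timers comes directly after keyBy().

```scala
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.util.Collector

// Hypothetical event type standing in for the flow records in this thread.
case class Flow(key: String, timestampMs: Long)

// Hypothetical function: keeps per-key state and registers an event-time timer.
class LastSeen extends ProcessFunction[Flow, String] {
  // Keyed state; only resolvable when this function runs directly on a keyed stream.
  private lazy val lastSeen: ValueState[java.lang.Long] = getRuntimeContext.getState(
    new ValueStateDescriptor[java.lang.Long]("lastSeen", classOf[java.lang.Long]))

  override def processElement(flow: Flow,
                              ctx: ProcessFunction[Flow, String]#Context,
                              out: Collector[String]): Unit = {
    lastSeen.update(flow.timestampMs)
    // Fires once the watermark passes this timestamp.
    ctx.timerService().registerEventTimeTimer(flow.timestampMs + 1000L)
  }

  override def onTimer(timestamp: Long,
                       ctx: ProcessFunction[Flow, String]#OnTimerContext,
                       out: Collector[String]): Unit =
    out.collect(s"timer fired at $timestamp, last element at ${lastSeen.value()}")
}

object KeyedOrderSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    env.fromElements(Flow("a", 1000L), Flow("b", 2000L), Flow("a", 5000L))
      // 1. Timestamps and watermarks on the non-keyed stream.
      .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor[Flow](Time.seconds(1)) {
          override def extractTimestamp(flow: Flow): Long = flow.timestampMs
        })
      // 2. keyBy() immediately before the operation that uses state and timers.
      .keyBy(_.key)
      .process(new LastSeen)
      .print()

    env.execute("keyed-order-sketch")
  }
}
```

Calling assignTimestampsAndWatermarks() after keyBy() returns a plain, non-keyed stream again, so a process() that follows it no longer has access to keyed state or timers.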

> On 18. Oct 2017, at 05:23, Fritz Budiyanto <fbudiyan@icloud.com> wrote:
> 
> Sorry, I forgot to paste the exception that was thrown:
> 
> 10/17/2017 20:21:30	dropDetection -> (aggFlowDropDetectPrintln -> Sink: Unnamed, aggFlowDropDetectPrintln -> Sink: Unnamed, Sink: kafkaSink)(3/4) switched to CANCELED
> 20:21:30,244 INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph        - Job Aggregate flows (313a46d5fd23e4c2d0d00d0033950b6d) switched from state FAILING to FAILED.
> java.lang.NullPointerException: Keyed state can only be used on a 'keyed stream', i.e., after a 'keyBy()' operation.
> 	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:75)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.checkPreconditionsAndGetKeyedStateStore(StreamingRuntimeContext.java:151)
> 	at org.apache.flink.streaming.api.operators.StreamingRuntimeContext.getState(StreamingRuntimeContext.java:115)
> 	at FlowContractStitcherProcess.endState$lzycompute(FlowContractResolver.scala:30)
> 	at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
> 	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
> 	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
> 	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
> 	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:745)
> 
> 
> --
> Fritz
> 
> 
>> On Oct 17, 2017, at 7:55 PM, Fritz Budiyanto <fbudiyan@icloud.com> wrote:
>> 
>> Hi All,
>> 
>> If I have high parallelism and use processFunction to registerEventTimeTimer, the timer never gets fired.
>> After debugging, I found out the watermark isn't updated because I have keyBy right after assignTimestampsAndWatermarks.
>> And if I set assignTimestampsAndWatermarks right after the keyBy, an exception is thrown.
>> 
>> val contractFlow = enrichedFlow
>>     .keyBy(f => f.fiveTupleKey)
>>     .assignTimestampsAndWatermarks(new AggFlowTimestampAssigner) <<<<<
>>     .process(new FlowContractStitcherProcess)
>>     .name("contractStitcher")
>> 
>> at FlowContractStitcherProcess.endState(FlowContractResolver.scala:30)
>> 	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:96)
>> 	at FlowContractStitcherProcess.processElement(FlowContractResolver.scala:17)
>> 	at org.apache.flink.streaming.api.operators.ProcessOperator.processElement(ProcessOperator.java:66)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:528)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:503)
>> 	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:483)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:891)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:869)
>> 	at org.apache.flink.streaming.runtime.operators.TimestampsAndPeriodicWatermarksOperator.processElement(TimestampsAndPeriodicWatermarksOperator.java:68)
>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:206)
>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> 	at java.lang.Thread.run(Thread.java:745)
>> 
>> 
>> Any idea how to solve my problem? How do I update the watermark after keyBy?
>> 
>> Would I hit scaling issues with a large number of timers if I used registerProcessingTimeTimer instead? I'm using event time throughout the pipeline; would mixing processing-time timers with event time cause problems down the line?
>> 
>> --
>> Fritz
> 

