flink-user mailing list archives

From Kostas Kloudas <k.klou...@data-artisans.com>
Subject Re: FsStateBackend with incremental backup enable does not work with Keyed CEP
Date Sat, 12 Aug 2017 08:20:34 GMT
Hi Daiqing,

I think Stefan is right and this will be fixed in the upcoming release.
Could you open a JIRA for it with the Exception that you posted here?

Thanks,
Kostas

> On Aug 12, 2017, at 10:05 AM, Stefan Richter <s.richter@data-artisans.com> wrote:
> 
> Hi,
> 
> From a quick look, I would say this is likely a problem with the NFASerializer:
> this class seems to be stateful, but its 'duplicate()' method simply returns 'this'.
> This means that code which relies on duplicating serializers to shield against
> concurrent access can break, because multiple threads can work on the same internal
> serializer state and corrupt it. I will take a deeper look on Monday.
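
The duplication issue described above can be illustrated with a minimal plain-Java sketch. The class names SharedScratchSerializer and IsolatedScratchSerializer and the scratch buffer are hypothetical and chosen only for illustration; they are not Flink classes and do not reflect the actual internals of NFASerializer. The sketch only shows why a stateful object that returns 'this' from duplicate() gives callers no isolation, and what an isolating duplicate() might look like.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for a stateful serializer; NOT Flink's NFASerializer.
class SharedScratchSerializer {
    // Mutable internal state: a scratch buffer reused by every serialize() call.
    // (Sketch only: values longer than 1024 bytes would overflow it.)
    final ByteBuffer scratch = ByteBuffer.allocate(1024);

    byte[] serialize(String value) {
        // Not thread-safe: two threads interleaving here could corrupt 'scratch'.
        scratch.clear();
        scratch.put(value.getBytes(StandardCharsets.UTF_8));
        scratch.flip();
        byte[] out = new byte[scratch.remaining()];
        scratch.get(out);
        return out;
    }

    // Problematic pattern: the "duplicate" is the same object, so callers
    // that duplicate once per thread still share a single scratch buffer.
    SharedScratchSerializer duplicate() {
        return this;
    }
}

// Same serializer, but duplicate() hands out an independent instance
// with its own scratch buffer.
class IsolatedScratchSerializer extends SharedScratchSerializer {
    @Override
    IsolatedScratchSerializer duplicate() {
        return new IsolatedScratchSerializer();
    }
}
```

With the first class, a caller that dutifully calls duplicate() for each thread still ends up with every thread writing into the same buffer. With the second, each duplicate owns its state, so no synchronization is needed between the copies.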
> 
> Best,
> Stefan
> 
>> On 11.08.2017 at 20:55, Daiqing Li <lidaiqing1993@gmail.com> wrote:
>> 
>> Hi,
>> 
>> I am running Flink 1.3.1 on EMR, but I am getting this exception after running for a while.
>> 
>> 
>> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
>> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
>> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
>> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
>> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
>> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
>> 	at java.lang.Thread.run(Thread.java:748)
>> Caused by: java.lang.RuntimeException: Could not copy NFA.
>> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
>> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
>> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
>> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
>> 	at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
>> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
>> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
>> 	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
>> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
>> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
>> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
>> 	... 7 more
>> Caused by: java.io.StreamCorruptedException: invalid type code: 00
>> 	at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
>> 	at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
>> 	at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
>> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
>> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
>> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
>> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
>> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
>> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
>> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
>> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
>> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
>> 	... 17 more
> 

