flink-issues mailing list archives

From "Kostas Kloudas (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-7435) FsStateBackend with incremental backup enable does not work with Keyed CEP
Date Thu, 12 Oct 2017 14:29:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-7435?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16202007#comment-16202007 ]

Kostas Kloudas edited comment on FLINK-7435 at 10/12/17 2:28 PM:
-----------------------------------------------------------------

Hi [~lidaiqing]. I am looking into this issue, and the problem is encountered when FlinkCEP
tries to deserialize the conditions of your pattern ({{NFASerializer.deserializeCondition}}).


Could you please share more details about your job, and ideally some code? For example,
the code of your pattern and the state backend you are using.

Also, as I said in the previous comment, there is definitely a problem with the {{duplicate()}}
method of the serializer. Could you please try this branch https://github.com/kl0u/flink/tree/cep-nfa-serializer-bug
to see if it solves the problem? The main change is that the {{NFASerializer.duplicate()}}
method now becomes:

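{code}
@Override
public TypeSerializer<NFA<T>> duplicate() {
	// Duplicate the nested event serializer as well, so that the new
	// NFASerializer does not share it with the original instance.
	return new NFASerializer<>(eventSerializer.duplicate());
}
{code}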

If you have Flink's source code, you can make this change yourself and try it out.
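
To illustrate why {{duplicate()}} should also duplicate the nested serializer, here is a minimal, self-contained sketch. It does not use Flink classes: {{Serializer}}, {{BufferingSerializer}} and {{WrappingSerializer}} are hypothetical stand-ins for {{TypeSerializer}}, a stateful event serializer and {{NFASerializer}}, and the only assumption is that the nested serializer keeps internal state that must not be shared between users.

```java
public class DuplicateSketch {

    /** Hypothetical, simplified stand-in for Flink's TypeSerializer. */
    interface Serializer<T> {
        Serializer<T> duplicate();
        String write(T value);
    }

    /** Stateful serializer: it reuses an internal buffer, so sharing one instance is unsafe. */
    static final class BufferingSerializer implements Serializer<String> {
        private final StringBuilder buffer = new StringBuilder();

        @Override
        public BufferingSerializer duplicate() {
            // A fresh instance gets its own buffer.
            return new BufferingSerializer();
        }

        @Override
        public String write(String value) {
            buffer.setLength(0);
            buffer.append('<').append(value).append('>');
            return buffer.toString();
        }
    }

    /** Wrapper with the same shape as the fix: duplicate() also duplicates the nested serializer. */
    static final class WrappingSerializer<T> implements Serializer<T> {
        final Serializer<T> nested;

        WrappingSerializer(Serializer<T> nested) {
            this.nested = nested;
        }

        @Override
        public WrappingSerializer<T> duplicate() {
            return new WrappingSerializer<>(nested.duplicate());
        }

        @Override
        public String write(T value) {
            return "nfa" + nested.write(value);
        }
    }

    public static void main(String[] args) {
        WrappingSerializer<String> original =
                new WrappingSerializer<>(new BufferingSerializer());
        WrappingSerializer<String> copy = original.duplicate();

        // The copy and its nested serializer are distinct objects.
        System.out.println(copy != original);
        System.out.println(copy.nested != original.nested);
        System.out.println(copy.write("e1"));
    }
}
```

With this shape, every user of a duplicated serializer works on its own nested instance and its own buffer.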



> FsStateBackend with incremental backup enable does not work with Keyed CEP
> --------------------------------------------------------------------------
>
>                 Key: FLINK-7435
>                 URL: https://issues.apache.org/jira/browse/FLINK-7435
>             Project: Flink
>          Issue Type: Bug
>          Components: CEP
>    Affects Versions: 1.3.1, 1.3.2
>         Environment: AWS EMR YARN, use CEP with pattern start -> next (oneOrMore.Optional.Consecutive) -> next(end). Store it with FsStateBackend with the incremental option enabled.
>            Reporter: daiqing
>            Assignee: Kostas Kloudas
>
> java.lang.RuntimeException: Exception occurred while processing valve output watermark:
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:289)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.findAndOutputNewMinWatermarkAcrossAlignedChannels(StatusWatermarkValve.java:173)
> 	at org.apache.flink.streaming.runtime.streamstatus.StatusWatermarkValve.inputWatermark(StatusWatermarkValve.java:108)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:188)
> 	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
> 	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:263)
> 	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
> 	at java.lang.Thread.run(Thread.java:748)
> Caused by: java.lang.RuntimeException: Could not copy NFA.
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:908)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:852)
> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:279)
> 	at org.apache.flink.runtime.state.heap.CopyOnWriteStateTable.get(CopyOnWriteStateTable.java:296)
> 	at org.apache.flink.runtime.state.heap.HeapValueState.value(HeapValueState.java:55)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.getNFA(AbstractKeyedCEPPatternOperator.java:268)
> 	at org.apache.flink.cep.operator.AbstractKeyedCEPPatternOperator.onEventTime(AbstractKeyedCEPPatternOperator.java:230)
> 	at org.apache.flink.streaming.api.operators.HeapInternalTimerService.advanceWatermark(HeapInternalTimerService.java:275)
> 	at org.apache.flink.streaming.api.operators.InternalTimeServiceManager.advanceWatermark(InternalTimeServiceManager.java:107)
> 	at org.apache.flink.streaming.api.operators.AbstractStreamOperator.processWatermark(AbstractStreamOperator.java:946)
> 	at org.apache.flink.streaming.runtime.io.StreamInputProcessor$ForwardingValveOutputHandler.handleWatermark(StreamInputProcessor.java:286)
> 	... 7 more
> Caused by: java.io.StreamCorruptedException: invalid type code: 00
> 	at java.io.ObjectInputStream.readTypeString(ObjectInputStream.java:1620)
> 	at java.io.ObjectStreamClass.readNonProxy(ObjectStreamClass.java:719)
> 	at java.io.ObjectInputStream.readClassDescriptor(ObjectInputStream.java:882)
> 	at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1815)
> 	at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1713)
> 	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2000)
> 	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
> 	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeCondition(NFA.java:1211)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserializeStates(NFA.java:1169)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.deserialize(NFA.java:957)
> 	at org.apache.flink.cep.nfa.NFA$NFASerializer.copy(NFA.java:903)
> 	... 17 more
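
The innermost cause in the trace above is a {{java.io.StreamCorruptedException}} thrown by {{ObjectInputStream}} while {{deserializeCondition}} reads a condition. The sketch below is not a reproduction of this issue. It only shows, with plain Java serialization, one way this exception class can arise: the reader starts at a byte offset that does not match what the writer produced. {{PriceAbove}} is a hypothetical stand-in for a user-defined condition, and the one-byte shift is an artificial way to misalign the stream.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.StreamCorruptedException;
import java.util.Arrays;

public class ConditionStreamSketch {

    /** Hypothetical stand-in for a user-defined, Java-serializable CEP condition. */
    static final class PriceAbove implements Serializable {
        private static final long serialVersionUID = 1L;
        final double threshold;

        PriceAbove(double threshold) {
            this.threshold = threshold;
        }
    }

    /** Writes the condition with standard Java serialization. */
    static byte[] serialize(Serializable condition) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(condition);
        }
        return bytes.toByteArray();
    }

    /** Reads one object back with standard Java serialization. */
    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /** Returns true if reading the bytes fails with StreamCorruptedException. */
    static boolean isCorrupted(byte[] data) throws IOException, ClassNotFoundException {
        try {
            deserialize(data);
            return false;
        } catch (StreamCorruptedException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        byte[] good = serialize(new PriceAbove(10.0));

        // Reading from the position the writer started at round-trips the condition.
        PriceAbove restored = (PriceAbove) deserialize(good);
        System.out.println(restored.threshold);

        // Dropping the first byte misaligns the reader with the written stream.
        byte[] shifted = Arrays.copyOfRange(good, 1, good.length);
        System.out.println(isCorrupted(shifted));
    }
}
```

In this sketch the failure occurs on the stream header rather than on a type code, so the exception message differs from the one in the trace. The point is only that a reader and writer disagreeing about stream position leads to this exception class.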



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
