flink-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Xiaojun Jin (JIRA)" <j...@apache.org>
Subject [jira] [Created] (FLINK-5947) NullPointerException in ContinuousProcessingTimeTrigger.clear()
Date Thu, 02 Mar 2017 02:06:45 GMT
Xiaojun Jin created FLINK-5947:
----------------------------------

             Summary: NullPointerException in ContinuousProcessingTimeTrigger.clear()
                 Key: FLINK-5947
                 URL: https://issues.apache.org/jira/browse/FLINK-5947
             Project: Flink
          Issue Type: Bug
          Components: DataStream API
    Affects Versions: 1.2.0
            Reporter: Xiaojun Jin
            Priority: Critical


The fireTimestamp may  be null when deleting processing timer in the ContinuousProcessingTimerTrigger.
Exception stack is as follows:
{quote}
Caused by: java.lang.NullPointerException
	at org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger.clear(ContinuousProcessingTimeTrigger.java:89)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.clear(WindowOperator.java:761)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:348)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$2.merge(WindowOperator.java:336)
	at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:210)
	at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator.processElement(WindowOperator.java:336)
	at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:208)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:70)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:265)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:668)
	at java.lang.Thread.run(Thread.java:745)

{quote}

The patch is as follows:
{code}
@@ -86,9 +86,10 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends
Trigger<O
        @Override
        public void clear(W window, TriggerContext ctx) throws Exception {
                ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
-               long timestamp = fireTimestamp.get();
-               ctx.deleteProcessingTimeTimer(timestamp);
-               fireTimestamp.clear();
+               if (fireTimestamp.get() != null) {
+                       ctx.deleteProcessingTimeTimer(fireTimestamp.get());
+                       fireTimestamp.clear();
+               }
        }
{code}




--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message