flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [2/2] flink git commit: [FLINK-3682] [cep] Assign processing timestamp in CEP operators
Date Wed, 30 Mar 2016 17:20:28 GMT
[FLINK-3682] [cep] Assign processing timestamp in CEP operators

This PR fixes the problem that the CEP operators did not assign the wall clock time
as the timestamp to incoming StreamRecords if the TimeCharacteristic was set to
ProcessingTime. Processing an element with a Long.MIN_VALUE timestamp can lead to an
underflow in the NFA when a positive window length is subtracted from the timestamp.
A sanity check has been added that throws an exception to notify the user when this
underflow occurs.

This closes #1841.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/dae29b42
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/dae29b42
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/dae29b42

Branch: refs/heads/release-1.0
Commit: dae29b4258c5042beaeead314444ea8245ed16f1
Parents: a96e1a6
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Wed Mar 30 15:27:21 2016 +0200
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Mar 30 19:19:52 2016 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  7 ++++++
 .../operator/AbstractCEPPatternOperator.java    |  2 +-
 .../java/org/apache/flink/cep/CEPITCase.java    | 24 ++++++++++++++++++++
 3 files changed, 32 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/dae29b42/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index 4e4157c..47fc7df 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -155,6 +155,13 @@ public class NFA<T> implements Serializable {
 			if(windowTime > 0) {
 				long pruningTimestamp = timestamp - windowTime;
 
+				// sanity check to guard against underflows
+				if (pruningTimestamp >= timestamp) {
+					throw new IllegalStateException("Detected an underflow in the pruning timestamp. This indicates that" +
+						" either the window length is too long (" + windowTime + ") or that the timestamp has not been" +
+						" set correctly (e.g. Long.MIN_VALUE).");
+				}
+
 				// remove all elements which are expired with respect to the window length
 				sharedBuffer.prune(pruningTimestamp);
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/dae29b42/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
index a943f0d..6b087e3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractCEPPatternOperator.java
@@ -66,7 +66,7 @@ public abstract class AbstractCEPPatternOperator<IN>
 		if (isProcessingTime) {
 			// there can be no out of order elements in processing time
 			NFA<IN> nfa = getNFA();
-			processEvent(nfa, element.getValue(), element.getTimestamp());
+			processEvent(nfa, element.getValue(), System.currentTimeMillis());
 		} else {
 			PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/dae29b42/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 40c3c86..0a287a2 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -29,6 +29,7 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.junit.After;
@@ -398,4 +399,27 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
 		env.execute();
 	}
+
+	@Test
+	public void testProcessingTimeWithWindow() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(1);
+
+		DataStream<Integer> input = env.fromElements(1, 2);
+
+		Pattern<Integer, ?> pattern = Pattern.<Integer>begin("start").followedBy("end").within(Time.days(1));
+
+		DataStream<Integer> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Integer, Integer>() {
+			@Override
+			public Integer select(Map<String, Integer> pattern) throws Exception {
+				return pattern.get("start") + pattern.get("end");
+			}
+		});
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		expected = "3";
+
+		env.execute();
+	}
 }


Mime
View raw message