flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject flink git commit: [FLINK-3684] [cep] Add proper watermark emission to CEP operators
Date Thu, 31 Mar 2016 09:47:05 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.0 dae29b425 -> 630e77f22


[FLINK-3684] [cep] Add proper watermark emission to CEP operators

This closes #1842.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/630e77f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/630e77f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/630e77f2

Branch: refs/heads/release-1.0
Commit: 630e77f226e2634a8a47aca9fb4736421851d7be
Parents: dae29b4
Author: Till Rohrmann <trohrmann@apache.org>
Authored: Thu Mar 31 11:35:19 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Thu Mar 31 11:46:47 2016 +0200

----------------------------------------------------------------------
 .../flink/cep/operator/CEPPatternOperator.java  |   2 +
 .../cep/operator/KeyedCEPPatternOperator.java   |   2 +
 .../flink/cep/operator/CEPOperatorTest.java     | 112 +++++++++++++++++++
 3 files changed, 116 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/630e77f2/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
index 153c9c9..7760817 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPPatternOperator.java
@@ -87,6 +87,8 @@ public class CEPPatternOperator<IN> extends AbstractCEPPatternOperator<IN>
{
 
 			processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
 		}
+
+		output.emitWatermark(mark);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/630e77f2/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 5d754ce..5db8ef2 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -163,6 +163,8 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractCEPPatternOperator
 				processEvent(nfa, streamRecord.getValue(), streamRecord.getTimestamp());
 			}
 		}
+
+		output.emitWatermark(mark);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/630e77f2/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
new file mode 100644
index 0000000..589589b
--- /dev/null
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.operator;
+
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.common.typeutils.base.IntSerializer;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.cep.nfa.NFA;
+import org.apache.flink.cep.nfa.compiler.NFACompiler;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.util.TestLogger;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import java.util.Map;
+
+
+public class CEPOperatorTest extends TestLogger {
+
+	@Test
+	public void testCEPOperatorWatermarkForwarding() throws Exception {
+		OneInputStreamOperatorTestHarness<Integer, Map<String, Integer>> harness =
new OneInputStreamOperatorTestHarness<>(
+			new CEPPatternOperator<Integer>(
+				IntSerializer.INSTANCE,
+				false,
+				new DummyNFAFactory<>(IntSerializer.INSTANCE))
+		);
+
+		harness.open();
+
+		Watermark expectedWatermark = new Watermark(42L);
+
+		harness.processWatermark(expectedWatermark);
+
+		Object watermark = harness.getOutput().poll();
+
+		assertTrue(watermark instanceof Watermark);
+		assertEquals(expectedWatermark, watermark);
+
+		harness.close();
+	}
+
+	@Test
+	public void testKeyedCEPOperatorWatermarkForwarding() throws Exception {
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>()
{
+			private static final long serialVersionUID = -4873366487571254798L;
+
+			@Override
+			public Integer getKey(Integer value) throws Exception {
+				return value;
+			}
+		};
+
+		OneInputStreamOperatorTestHarness<Integer, Map<String, Integer>> harness =
new OneInputStreamOperatorTestHarness<>(
+			new KeyedCEPPatternOperator<Integer, Integer>(
+				IntSerializer.INSTANCE,
+				false,
+				keySelector,
+				IntSerializer.INSTANCE,
+			new DummyNFAFactory<>(IntSerializer.INSTANCE))
+		);
+
+		harness.configureForKeyedStream(keySelector, BasicTypeInfo.INT_TYPE_INFO);
+
+		harness.open();
+
+		Watermark expectedWatermark = new Watermark(42L);
+
+		harness.processWatermark(expectedWatermark);
+
+		Object watermark = harness.getOutput().poll();
+
+		assertTrue(watermark instanceof Watermark);
+		assertEquals(expectedWatermark, watermark);
+
+		harness.close();
+	}
+
+	public static class DummyNFAFactory<T> implements NFACompiler.NFAFactory<T>
{
+
+		private static final long serialVersionUID = 1173020762472766713L;
+
+		private final TypeSerializer<T> inputTypeSerializer;
+
+		public DummyNFAFactory(TypeSerializer<T> inputTypeSerializer) {
+			this.inputTypeSerializer = inputTypeSerializer;
+		}
+
+		@Override
+		public NFA<T> createNFA() {
+			return new NFA<>(inputTypeSerializer.duplicate(), 0);
+		}
+	}
+}


Mime
View raw message