flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From trohrm...@apache.org
Subject [1/2] flink git commit: [FLINK-3319] [cep] Add or function to CEP's pattern api
Date Fri, 19 Aug 2016 13:49:59 GMT
Repository: flink
Updated Branches:
  refs/heads/master 7dbcffb90 -> f0fef6f44


[FLINK-3319] [cep] Add or function to CEP's pattern api

This closes #2171.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/266c76b5
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/266c76b5
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/266c76b5

Branch: refs/heads/master
Commit: 266c76b55e410965d7a5332acd3b2a0dfd3b24bb
Parents: 7dbcffb
Author: Bob Thorman <rt2357@att.com>
Authored: Fri Aug 19 15:23:03 2016 +0200
Committer: Till Rohrmann <trohrmann@apache.org>
Committed: Fri Aug 19 15:26:02 2016 +0200

----------------------------------------------------------------------
 .../flink/cep/pattern/OrFilterFunction.java     | 52 ++++++++++++++++
 .../org/apache/flink/cep/pattern/Pattern.java   | 18 ++++++
 .../java/org/apache/flink/cep/CEPITCase.java    | 65 ++++++++++++++++++++
 .../apache/flink/cep/pattern/PatternTest.java   | 42 +++++++++++++
 4 files changed, 177 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
new file mode 100644
index 0000000..c42ecb1
--- /dev/null
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/OrFilterFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.cep.pattern;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+
+/**
+ * A filter function which combines two filter functions with a logical or. Thus, the filter
+ * function only returns true, iff at least one of the filter functions holds true.
+ *
+ * @param <T> Type of the element to filter
+ */
+public class OrFilterFunction<T> implements FilterFunction<T> {
+	private static final long serialVersionUID = -2109562093871155005L;
+
+	private final FilterFunction<T> left;
+	private final FilterFunction<T> right;
+
+	public OrFilterFunction(final FilterFunction<T> left, final FilterFunction<T> right) {
+		this.left = left;
+		this.right = right;
+	}
+
+	@Override
+	public boolean filter(T value) throws Exception {
+		return left.filter(value) || right.filter(value);
+	}
+
+	public FilterFunction<T> getLeft() {
+		return left;
+	}
+
+	public FilterFunction<T> getRight() {
+		return right;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
index 696518e..14aed5d 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/Pattern.java
@@ -93,6 +93,24 @@ public class Pattern<T, F extends T> {
 	}
 
 	/**
+	 * Specifies a filter condition if fulfilled by an event will match.
+	 *
+	 * @param newFilterFunction Filter condition
+	 * @return The same pattern operator where the new filter condition is set
+	 */
+	public Pattern<T, F> or(FilterFunction<F> newFilterFunction) {
+		ClosureCleaner.clean(newFilterFunction, true);
+
+		if (this.filterFunction == null) {
+			this.filterFunction = newFilterFunction;
+		} else {
+			this.filterFunction = new OrFilterFunction<>(this.filterFunction, newFilterFunction);
+		}
+
+		return this;
+	}
+
+	/**
 	 * Applies a subtype constraint on the current pattern operator. This means that an event has
 	 * to be of the given subtype in order to be matched.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 29044d8..0f1f845 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -504,7 +504,72 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 		expected = "Left(1.0)\nRight(2.0,2.0,2.0)";
 
 		env.execute();
+	}
+
+	/**
+	 * Checks that a certain event sequence is recognized with an OR filter
+	 * @throws Exception
+	 */
+	@Test
+	public void testSimpleOrFilterPatternCEP() throws Exception {
+		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+
+		DataStream<Event> input = env.fromElements(
+			new Event(1, "start", 1.0),
+			new Event(2, "middle", 2.0),
+			new Event(3, "end", 3.0),
+			new Event(4, "start", 4.0),
+			new Event(5, "middle", 5.0),
+			new Event(6, "end", 6.0)
+		);
+
+		Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
+			.where(new FilterFunction<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("start");
+				}
+			})
+			.followedBy("middle")
+			.where(new FilterFunction<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getPrice() == 2.0;
+				}
+			})
+			.or(new FilterFunction<Event>() {
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getPrice() == 5.0;
+				}
+			})
+			.followedBy("end").where(new FilterFunction<Event>() {
+
+				@Override
+				public boolean filter(Event value) throws Exception {
+					return value.getName().equals("end");
+				}
+			});
+
+		DataStream<String> result = CEP.pattern(input, pattern).select(new PatternSelectFunction<Event, String>() {
 
+			@Override
+			public String select(Map<String, Event> pattern) {
+				StringBuilder builder = new StringBuilder();
 
+				builder.append(pattern.get("start").getId()).append(",")
+					.append(pattern.get("middle").getId()).append(",")
+					.append(pattern.get("end").getId());
+
+				return builder.toString();
+			}
+		});
+
+		result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+
+		// expected sequence of matching event ids
+		expected = "1,5,6\n1,2,3\n4,5,6\n1,2,6";
+
+		env.execute();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/266c76b5/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
index 2edf005..98c3f5a 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/pattern/PatternTest.java
@@ -142,4 +142,46 @@ public class PatternTest extends TestLogger {
 		assertEquals(previous.getName(), "subevent");
 		assertEquals(previous2.getName(), "start");
 	}
+
+	@Test
+	public void testPatternWithOrFilter() {
+		Pattern<Event, Event> pattern = Pattern.<Event>begin("start").where(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 3518061453394250543L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return false;
+			}
+		}).or(new FilterFunction<Event>() {
+			private static final long serialVersionUID = 947463545810023841L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return false;
+			}
+		}).next("or").or(new FilterFunction<Event>() {
+			private static final long serialVersionUID = -2775487887505922250L;
+
+			@Override
+			public boolean filter(Event value) throws Exception {
+				return false;
+			}
+		}).followedBy("end");
+
+		Pattern<Event, ?> previous;
+		Pattern<Event, ?> previous2;
+
+		assertNotNull(previous = pattern.getPrevious());
+		assertNotNull(previous2 = previous.getPrevious());
+		assertNull(previous2.getPrevious());
+
+		assertTrue(pattern instanceof FollowedByPattern);
+		assertFalse(previous.getFilterFunction() instanceof OrFilterFunction);
+		assertTrue(previous2.getFilterFunction() instanceof OrFilterFunction);
+
+		assertEquals(pattern.getName(), "end");
+		assertEquals(previous.getName(), "or");
+		assertEquals(previous2.getName(), "start");
+	}
+
 }


Mime
View raw message