flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kklou...@apache.org
Subject [4/4] flink git commit: comments
Date Tue, 28 Mar 2017 16:01:19 GMT
comments


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ad21a441
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ad21a441
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ad21a441

Branch: refs/heads/master
Commit: ad21a441434b9ac5886b664871553bf57885e984
Parents: 7fbdc10
Author: kl0u <kkloudas@gmail.com>
Authored: Tue Mar 28 10:46:49 2017 +0200
Committer: kl0u <kkloudas@gmail.com>
Committed: Tue Mar 28 11:08:04 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                                 | 15 +++------------
 .../org/apache/flink/cep/scala/pattern/Pattern.scala | 15 +++++++++++++++
 .../src/main/java/org/apache/flink/cep/nfa/NFA.java  |  6 +++---
 .../cep/pattern/conditions/IterativeCondition.java   |  2 +-
 4 files changed, 22 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 9d4ca91..932ba30 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -146,11 +146,10 @@ start.where(new IterativeCondition<SubEvent>() {
             return false;
         }
         
-        double sum = 0.0;
+        double sum = value.getPrice();
         for (Event event : ctx.getEventsForPattern("middle")) {
             sum += event.getPrice();
         }
-        sum += value.getPrice();
         return Double.compare(sum, 5.0) < 0;
     }
 });
@@ -161,16 +160,8 @@ start.where(new IterativeCondition<SubEvent>() {
 {% highlight scala %}
 start.where(
     (value, ctx) => {
-        var res = value.getName.startsWith("foo")
-        if (res) {
-            var sum = 0.0
-            for (e: Event <- ctx.getEventsForPattern("middle")) {
-                sum += e.getPrice
-            }
-            sum += value.getPrice
-            res = res && sum < 5.0
-        }
-        res
+        lazy val sum = ctx.getEventsForPattern("middle").asScala.map(_.getPrice).sum
+        value.getName.startsWith("foo") && sum + value.getPrice < 5.0
     }
 )
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
index a1db460..07dfc5a 100644
--- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
+++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/pattern/Pattern.scala
@@ -145,6 +145,21 @@ class Pattern[T , F <: T](jPattern: JPattern[T, F]) {
   }
 
   /**
+    * Specifies a filter condition which is ORed with an existing filter function.
+    *
+    * @param filterFun Or filter function
+    * @return The same pattern operator where the new filter condition is set
+    */
+  def or(filterFun: (F, Context[F]) => Boolean): Pattern[T, F] = {
+    val filter = new IterativeCondition[F] {
+      val cleanFilter = cep.scala.cleanClosure(filterFun)
+
+      override def filter(value: F, ctx: Context[F]): Boolean = cleanFilter(value, ctx)
+    }
+    or(filter)
+  }
+
+  /**
     * Specifies a filter condition which has to be fulfilled by an event in order to be matched.
     *
     * @param filterFun Filter condition

http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index cddc1ed..98c1fc9 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -92,7 +92,7 @@ public class NFA<T> implements Serializable {
 	private final NonDuplicatingTypeSerializer<T> nonDuplicatingTypeSerializer;
 
 	/**
-	 * 	Used only for backward compatibility. Buffer used to store the matched events.
+	 * Used only for backwards compatibility. Buffer used to store the matched events.
 	 */
 	private final SharedBuffer<State<T>, T> sharedBuffer = null;
 
@@ -575,7 +575,7 @@ public class NFA<T> implements Serializable {
 				computationState.getVersion());
 
 		// for a given computation state, we cannot have more than one matching patterns.
-		Preconditions.checkArgument(paths.size() <= 1);
+		Preconditions.checkState(paths.size() <= 1);
 
 		TypeSerializer<T> serializer = nonDuplicatingTypeSerializer.getTypeSerializer();
 
@@ -609,7 +609,7 @@ public class NFA<T> implements Serializable {
 			computationState.getVersion());
 
 		// for a given computation state, we cannot have more than one matching patterns.
-		Preconditions.checkArgument(paths.size() <= 1);
+		Preconditions.checkState(paths.size() <= 1);
 
 		List<Map<String, T>> result = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/ad21a441/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
index f225e01..016cdef 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/pattern/conditions/IterativeCondition.java
@@ -88,7 +88,7 @@ public abstract class IterativeCondition<T> implements Function, Serializable {
 
 		/**
 		 * @return An {@link Iterable} over the already accepted elements
-		 * for a given pattern. Elements are iterated in the order the were
+		 * for a given pattern. Elements are iterated in the order they were
 		 * inserted in the pattern.
 		 *
 		 * @param name The name of the pattern.


Mime
View raw message