flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [23/34] incubator-flink git commit: [streaming] Windowing helper API cleanup
Date Fri, 05 Dec 2014 17:26:28 GMT
[streaming] Windowing helper API cleanup


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/d6dc4349
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/d6dc4349
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/d6dc4349

Branch: refs/heads/master
Commit: d6dc4349c3244f460bbc1227125bf80a3f4be4c7
Parents: 996f02c
Author: Gyula Fora <gyfora@apache.org>
Authored: Tue Nov 25 18:37:25 2014 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Fri Dec 5 16:45:09 2014 +0100

----------------------------------------------------------------------
 .../streaming/api/windowing/helper/Count.java   | 20 +++++-
 .../streaming/api/windowing/helper/Delta.java   |  2 +-
 .../streaming/api/windowing/helper/Time.java    | 70 +++++++++++---------
 .../apache/flink/streaming/api/PrintTest.java   | 44 +-----------
 4 files changed, 58 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6dc4349/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
index 6e045f6..73f7daf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Count.java
@@ -56,11 +56,29 @@ public class Count implements WindowingHelper {
 		return new CountTriggerPolicy(count, startValue);
 	}
 
+	/**
+	 * Sets the number of elements deleted at each eviction (i.e. when the number
+	 * of elements exceeds the window size). By default the elements get deleted
+	 * one by one (deleteOnEviction = 1).
+	 * 
+	 * @param deleteOnEviction
+	 *            The number of elements deleted at each eviction
+	 * @return Helper representing the count based policy
+	 * 
+	 */
 	public Count withDelete(int deleteOnEviction) {
 		this.deleteOnEviction = deleteOnEviction;
 		return this;
 	}
 
+	/**
+	 * Sets the initial value of the counter. 0 by default
+	 * 
+	 * @param startValue
+	 *            Starting value of the window counter
+	 * @return Helper representing the count based policy
+	 * 
+	 */
 	public Count startingAt(int startValue) {
 		this.startValue = startValue;
 		return this;
@@ -72,7 +90,7 @@ public class Count implements WindowingHelper {
 	 * 
 	 * @param count
 	 *            the number of elements to count before trigger/evict
-	 * @return An helper representing the policy
+	 * @return Helper representing the count based policy
 	 */
 	public static Count of(int count) {
 		return new Count(count);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6dc4349/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
index 89c6c4a..ce75fc5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Delta.java
@@ -75,7 +75,7 @@ public class Delta<DATA> implements WindowingHelper<DATA> {
 	 *            delta.
 	 * @param threshold
 	 *            The threshold used by the delta function.
-	 * @return a delta helper representing a delta count or eviction policy
+	 * @return Helper representing a delta trigger or eviction policy
 	 */
 	public static <DATA> Delta<DATA> of(DeltaFunction<DATA> deltaFunction,
DATA initVal,
 			double threshold) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6dc4349/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
index 86ad952..908d0cf 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/helper/Time.java
@@ -21,10 +21,9 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.streaming.api.invokable.util.DefaultTimeStamp;
 import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 
 /**
@@ -38,27 +37,30 @@ import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
  */
 public class Time<DATA> implements WindowingHelper<DATA> {
 
-	private int timeVal;
+	private long length;
 	private TimeUnit granularity;
 	private TimeStamp<DATA> timeStamp;
 	private long delay;
 
 	/**
 	 * Creates an helper representing a trigger which triggers every given
-	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 * length or an eviction which evicts all elements older than length.
 	 * 
-	 * @param timeVal
+	 * @param length
 	 *            The number of time units
 	 * @param timeUnit
 	 *            The unit of time such as minute oder millisecond. Note that
 	 *            the smallest possible granularity is milliseconds. Any smaller
 	 *            time unit might cause an error at runtime due to conversion
 	 *            problems.
+	 * @param timeStamp
+	 *            The user defined timestamp that will be used to extract time
+	 *            information from the incoming elements
 	 */
-	private Time(int timeVal, TimeUnit timeUnit) {
-		this.timeVal = timeVal;
+	private Time(long length, TimeUnit timeUnit, TimeStamp<DATA> timeStamp) {
+		this.length = length;
 		this.granularity = timeUnit;
-		this.timeStamp = new DefaultTimeStamp<DATA>();
+		this.timeStamp = timeStamp;
 		this.delay = 0;
 	}
 
@@ -74,47 +76,49 @@ public class Time<DATA> implements WindowingHelper<DATA> {
 
 	/**
 	 * Creates an helper representing a trigger which triggers every given
-	 * timeVal or an eviction which evicts all elements older than timeVal.
+	 * length or an eviction which evicts all elements older than length.
 	 * 
-	 * @param timeVal
+	 * @param length
 	 *            The number of time units
-	 * @param granularity
+	 * @param timeUnit
 	 *            The unit of time such as minute oder millisecond. Note that
 	 *            the smallest possible granularity is milliseconds. Any smaller
 	 *            time unit might cause an error at runtime due to conversion
 	 *            problems.
-	 * @return an helper representing a trigger which triggers every given
-	 *         timeVal or an eviction which evicts all elements older than
-	 *         timeVal.
+	 * @return Helper representing the time based trigger and eviction policy
 	 */
-	public static <DATA> Time<DATA> of(int timeVal, TimeUnit granularity) {
-		return new Time<DATA>(timeVal, granularity);
+	public static <DATA> Time<DATA> of(long length, TimeUnit timeUnit) {
+		return new Time<DATA>(length, timeUnit, new DefaultTimeStamp<DATA>());
 	}
 
-	@SuppressWarnings("unchecked")
-	public <R> Time<R> withTimeStamp(TimeStamp<R> timeStamp) {
-		this.timeStamp = (TimeStamp<DATA>) timeStamp;
-		return (Time<R>) this;
+	/**
+	 * Creates a helper representing a trigger which triggers every given
+	 * length or an eviction which evicts all elements older than length.
+	 * 
+	 * @param length
+	 *            The number of time units
+	 * @param timeStamp
+	 *            The user defined timestamp that will be used to extract time
+	 *            information from the incoming elements
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
+	public static <DATA> Time<DATA> of(long length, TimeStamp<DATA> timeStamp)
{
+		return new Time<DATA>(length, TimeUnit.MILLISECONDS, timeStamp);
 	}
 
+	/**
+	 * Sets the delay for the first processed window.
+	 * 
+	 * @param delay
+	 *            The number of time units before the first processed window.
+	 * @return Helper representing the time based trigger and eviction policy
+	 */
 	public Time<DATA> withDelay(long delay) {
 		this.delay = delay;
 		return this;
 	}
 
 	private long granularityInMillis() {
-		return this.granularity.toMillis(this.timeVal);
-	}
-
-	public static class NullExtractor<T> implements Extractor<Long, T> {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public T extract(Long in) {
-			return null;
-		}
-
+		return this.granularity.toMillis(this.length);
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d6dc4349/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index f7ba82e..dd8652e 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -18,21 +18,11 @@
 package org.apache.flink.streaming.api;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.api.invokable.util.TimeStamp;
-import org.apache.flink.streaming.api.windowing.extractor.Extractor;
-import org.apache.flink.streaming.api.windowing.helper.Count;
-import org.apache.flink.streaming.api.windowing.helper.Time;
-import org.apache.flink.util.Collector;
 import org.junit.Test;
 
 public class PrintTest implements Serializable {
@@ -60,39 +50,7 @@ public class PrintTest implements Serializable {
 	@Test
 	public void test() throws Exception {
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-
-		List<Tuple2<String, Integer>> input = new ArrayList<Tuple2<String, Integer>>();
-
-		env.fromElements(1, 2, 3, 4, 5, 6, 7, 8, 9)
-				.window(Time.of(2, TimeUnit.MILLISECONDS).withTimeStamp(new TimeStamp<Integer>()
{
-
-					/**
-					 * 
-					 */
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public long getTimestamp(Integer value) {
-
-						return value;
-					}
-
-					@Override
-					public long getStartTime() {
-						return 1;
-					}
-				})).every(Count.of(2)).reduceGroup(new GroupReduceFunction<Integer, String>() {
-
-					@Override
-					public void reduce(Iterable<Integer> values, Collector<String> out)
-							throws Exception {
-						String o = "|";
-						for (Integer v : values) {
-							o = o + v + "|";
-						}
-						out.collect(o);
-					}
-				}).print();
+		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
 		env.executeTest(MEMORYSIZE);
 
 	}


Mime
View raw message