flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gyf...@apache.org
Subject [4/6] flink git commit: [streaming] Discretizer sharing added with further window optimzations for better chaining
Date Thu, 19 Feb 2015 16:41:25 GMT
[streaming] Discretizer sharing added with further window optimzations for better chaining


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ca7f36e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ca7f36e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ca7f36e1

Branch: refs/heads/master
Commit: ca7f36e1ee5791cdbda7efc5a01e329abfdcc5c2
Parents: e5528a2
Author: Gyula Fora <gyfora@apache.org>
Authored: Thu Feb 19 14:11:25 2015 +0100
Committer: Gyula Fora <gyfora@apache.org>
Committed: Thu Feb 19 17:03:00 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/streaming/api/StreamGraph.java | 26 +++++-
 .../api/StreamingJobGraphGenerator.java         | 86 +++++++++++++++++++-
 .../api/datastream/DiscretizedStream.java       | 16 +++-
 .../windowing/GroupedStreamDiscretizer.java     |  5 ++
 .../operator/windowing/StreamDiscretizer.java   |  6 ++
 .../windowing/policy/CountEvictionPolicy.java   |  5 ++
 .../windowing/policy/CountTriggerPolicy.java    |  5 ++
 .../api/windowing/policy/DeltaPolicy.java       |  5 ++
 .../api/windowing/policy/PunctuationPolicy.java |  6 ++
 .../windowing/policy/TimeEvictionPolicy.java    |  6 ++
 .../api/windowing/policy/TimeTriggerPolicy.java |  6 ++
 .../policy/TumblingEvictionPolicy.java          |  5 ++
 .../windowing/WindowIntegrationTest.java        | 70 ++++++++++++++--
 13 files changed, 231 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
index 4800c96..c87abb5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamGraph.java
@@ -27,6 +27,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Set;
 
 import org.apache.flink.api.common.ExecutionConfig;
@@ -256,9 +257,10 @@ public class StreamGraph extends StreamingPlan {
 
 		addVertex(vertexName, CoStreamVertex.class, taskInvokableObject, operatorName, parallelism);
 
-		addTypeSerializers(vertexName, new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
-				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig), new StreamRecordSerializer<OUT>(
-						outTypeInfo, executionConfig), null);
+		addTypeSerializers(vertexName,
+				new StreamRecordSerializer<IN1>(in1TypeInfo, executionConfig),
+				new StreamRecordSerializer<IN2>(in2TypeInfo, executionConfig),
+				new StreamRecordSerializer<OUT>(outTypeInfo, executionConfig), null);
 
 		if (LOG.isDebugEnabled()) {
 			LOG.debug("CO-TASK: {}", vertexName);
@@ -319,6 +321,17 @@ public class StreamGraph extends StreamingPlan {
 		selectedNames.get(upStreamVertexName).add(outputNames);
 	}
 
+	public void removeEdge(String upStream, String downStream) {
+		int inputIndex = getInEdges(downStream).indexOf(upStream);
+		inEdgeLists.get(downStream).remove(inputIndex);
+
+		int outputIndex = getOutEdges(upStream).indexOf(downStream);
+		outEdgeLists.get(upStream).remove(outputIndex);
+		outEdgeTypes.get(upStream).remove(outputIndex);
+		selectedNames.get(upStream).remove(outputIndex);
+		outputPartitioners.get(upStream).remove(outputIndex);
+	}
+
 	private void addTypeSerializers(String vertexName, StreamRecordSerializer<?> in1,
 			StreamRecordSerializer<?> in2, StreamRecordSerializer<?> out1,
 			StreamRecordSerializer<?> out2) {
@@ -404,7 +417,8 @@ public class StreamGraph extends StreamingPlan {
 	}
 
 	public <OUT> void setOutType(String id, TypeInformation<OUT> outType) {
-		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
executionConfig);
+		StreamRecordSerializer<OUT> serializer = new StreamRecordSerializer<OUT>(outType,
+				executionConfig);
 		typeSerializersOut1.put(id, serializer);
 	}
 
@@ -480,6 +494,10 @@ public class StreamGraph extends StreamingPlan {
 		this.chaining = chaining;
 	}
 
+	public Set<Entry<String, StreamInvokable<?, ?>>> getInvokables() {
+		return invokableObjects.entrySet();
+	}
+
 	public Collection<String> getSources() {
 		return sources;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
index d57bfc0..f3edb78 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamingJobGraphGenerator.java
@@ -23,6 +23,8 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -36,6 +38,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
 import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.invokable.StreamInvokable.ChainingStrategy;
+import org.apache.flink.streaming.api.invokable.operator.windowing.StreamDiscretizer;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationHead;
 import org.apache.flink.streaming.api.streamvertex.StreamIterationTail;
 import org.apache.flink.streaming.partitioner.StreamPartitioner;
@@ -71,20 +74,97 @@ public class StreamingJobGraphGenerator {
 
 	public JobGraph createJobGraph(String jobName) {
 		jobGraph = new JobGraph(jobName);
+
 		// Turn lazy scheduling off
 		jobGraph.setScheduleMode(ScheduleMode.ALL);
 
 		init();
 
-		for (String sourceName : streamGraph.getSources()) {
-			createChain(sourceName, sourceName);
-		}
+		applyWindowOptimizations();
+
+		setChaining();
 
 		setSlotSharing();
 
 		return jobGraph;
 	}
 
+	private void applyWindowOptimizations() {
+		Set<Entry<String, StreamInvokable<?, ?>>> invokables = streamGraph.getInvokables();
+		List<Tuple2<String, StreamDiscretizer<?>>> discretizers = new ArrayList<Tuple2<String,
StreamDiscretizer<?>>>();
+
+		// Get the discretizers
+		for (Entry<String, StreamInvokable<?, ?>> entry : invokables) {
+			if (entry.getValue() instanceof StreamDiscretizer) {
+				discretizers.add(new Tuple2<String, StreamDiscretizer<?>>(entry.getKey(),
+						(StreamDiscretizer<?>) entry.getValue()));
+			}
+		}
+
+		// Share common discrtizers
+		setDiscretizerReuse(discretizers);
+
+	}
+
+	private void setDiscretizerReuse(List<Tuple2<String, StreamDiscretizer<?>>>
discretizers) {
+		List<Tuple2<StreamDiscretizer<?>, List<String>>> matchingDiscretizers
= new ArrayList<Tuple2<StreamDiscretizer<?>, List<String>>>();
+
+		for (Tuple2<String, StreamDiscretizer<?>> discretizer : discretizers) {
+			boolean inMatching = false;
+			for (Tuple2<StreamDiscretizer<?>, List<String>> matching : matchingDiscretizers)
{
+				Set<String> discretizerInEdges = new HashSet<String>(
+						streamGraph.getInEdges(discretizer.f0));
+				Set<String> matchingInEdges = new HashSet<String>(
+						streamGraph.getInEdges(matching.f1.get(0)));
+
+				if (discretizer.f1.equals(matching.f0)
+						&& discretizerInEdges.equals(matchingInEdges)) {
+					matching.f1.add(discretizer.f0);
+					inMatching = true;
+					break;
+				}
+			}
+			if (!inMatching) {
+				List<String> matchingNames = new ArrayList<String>();
+				matchingNames.add(discretizer.f0);
+				matchingDiscretizers.add(new Tuple2<StreamDiscretizer<?>, List<String>>(
+						discretizer.f1, matchingNames));
+			}
+		}
+
+		for (Tuple2<StreamDiscretizer<?>, List<String>> matching : matchingDiscretizers)
{
+			List<String> matchList = matching.f1;
+			if (matchList.size() > 1) {
+				String first = matchList.get(0);
+				for (int i = 1; i < matchList.size(); i++) {
+					replaceDiscretizer(matchList.get(i), first);
+				}
+			}
+		}
+	}
+
+	private void replaceDiscretizer(String toReplace, String replaceWith) {
+		// Convert to array to create a copy
+		List<String> outEdges = new ArrayList<String>(streamGraph.getOutEdges(toReplace));
+
+		int numOutputs = outEdges.size();
+
+		// Reconnect outputs
+		for (int i = 0; i < numOutputs; i++) {
+			String outName = outEdges.get(i);
+
+			streamGraph.setEdge(replaceWith, outName,
+					streamGraph.getOutPartitioner(toReplace, outName), 0, new ArrayList<String>());
+			streamGraph.removeEdge(toReplace, outName);
+		}
+	}
+
+	private void setChaining() {
+		for (String sourceName : streamGraph.getSources()) {
+			createChain(sourceName, sourceName);
+		}
+	}
+
 	private List<Tuple2<String, String>> createChain(String startNode, String current)
{
 
 		if (!builtNodes.contains(startNode)) {

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
index bc9f5b9..0c84d0a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DiscretizedStream.java
@@ -73,7 +73,7 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT>
{
 				WindowTransformation.REDUCEWINDOW, "Window Reduce", getType(),
 				new WindowReducer<OUT>(reduceFunction)).merge();
 
-		if (!isGrouped()) {
+		if (!isGrouped() && out.discretizedStream.invokable instanceof WindowMerger) {
 			return out.transform(WindowTransformation.REDUCEWINDOW, "Window Reduce", out.getType(),
 					new WindowReducer<OUT>(discretizedStream.clean(reduceFunction)));
 		} else {
@@ -122,7 +122,9 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT>
{
 			out.groupByKey = null;
 
 			return out;
-		} else if (transformation == WindowTransformation.MAPWINDOW) {
+		} else if (transformation != WindowTransformation.MAPWINDOW
+				&& parallelism != discretizedStream.getExecutionEnvironment()
+						.getDegreeOfParallelism()) {
 			return transform(transformation, "Window partitioner", getType(),
 					new WindowPartitioner<OUT>(parallelism)).setParallelism(parallelism);
 		} else {
@@ -137,8 +139,14 @@ public class DiscretizedStream<OUT> extends WindowedDataStream<OUT>
{
 	private DiscretizedStream<OUT> merge() {
 		TypeInformation<StreamWindow<OUT>> type = discretizedStream.getType();
 
-		return wrap(discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger",
-				type, new WindowMerger<OUT>()));
+		// Only merge partitioned streams
+		if (discretizedStream.invokable instanceof WindowPartitioner) {
+			return wrap(discretizedStream.groupBy(new WindowKey<OUT>()).transform("Window Merger",
+					type, new WindowMerger<OUT>()));
+		} else {
+			return this;
+		}
+
 	}
 
 	@SuppressWarnings("rawtypes")

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index ec01e28..5e21a31 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -125,4 +125,9 @@ public class GroupedStreamDiscretizer<IN> extends StreamDiscretizer<IN>
{
 		}
 	}
 
+	@Override
+	public String toString() {
+		return "GroupedDiscretizer(Key: " + keySelector.getClass().getSimpleName() + ", Trigger:
"
+				+ triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString() + ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index fb65283..104196e 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -213,4 +213,10 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN,
WindowEvent<IN>>
 			}
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "Discretizer(Trigger: " + triggerPolicy.toString() + ", Eviction: " + evictionPolicy.toString()
+				+ ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
index b4bdd43..3ede27b 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountEvictionPolicy.java
@@ -135,4 +135,9 @@ public class CountEvictionPolicy<IN> implements CloneableEvictionPolicy<IN>
{
 			}
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "CountPolicy(" + maxElements + ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
index c814f5d..6d8149a 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/CountTriggerPolicy.java
@@ -101,4 +101,9 @@ public class CountTriggerPolicy<IN> implements CloneableTriggerPolicy<IN>
{
 			}
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "CountPolicy(" + max + ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
index 169196f..0811e05 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/DeltaPolicy.java
@@ -128,4 +128,9 @@ public class DeltaPolicy<DATA> implements CloneableTriggerPolicy<DATA>,
 			}
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "DeltaPolicy(" + threshold + ", " + deltaFuntion.getClass().getSimpleName() + ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
index 4b06aee..e37c443 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/PunctuationPolicy.java
@@ -135,4 +135,10 @@ public class PunctuationPolicy<IN, DATA> implements CloneableTriggerPolicy<IN>,
 			}
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "PunctuationPolicy(" + punctuation + extractor != null ? ", "
+				+ extractor.getClass().getSimpleName() : "" + ")";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
index de715af..982b6d5 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeEvictionPolicy.java
@@ -150,4 +150,10 @@ public class TimeEvictionPolicy<DATA> implements ActiveEvictionPolicy<DATA>,
 		}
 	}
 
+	@Override
+	public String toString() {
+		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+				+ ")";
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
index 84eccf5..7065582 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TimeTriggerPolicy.java
@@ -213,4 +213,10 @@ public class TimeTriggerPolicy<DATA> implements ActiveTriggerPolicy<DATA>,
 		}
 	}
 
+	@Override
+	public String toString() {
+		return "TimePolicy(" + granularity + ", " + timestampWrapper.getClass().getSimpleName()
+				+ ")";
+	}
+
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
index 8d9f028..08c49e9 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/windowing/policy/TumblingEvictionPolicy.java
@@ -96,4 +96,9 @@ public class TumblingEvictionPolicy<DATA> implements CloneableEvictionPolicy<DAT
 			return true;
 		}
 	}
+
+	@Override
+	public String toString() {
+		return "TumblingPolicy";
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ca7f36e1/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
index 893510b..f111898 100755
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowIntegrationTest.java
@@ -32,6 +32,8 @@ import org.apache.flink.streaming.api.function.WindowMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.windowing.StreamWindow;
 import org.apache.flink.streaming.api.windowing.helper.Count;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
 import org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.util.Collector;
 import org.junit.Test;
@@ -97,12 +99,29 @@ public class WindowIntegrationTest implements Serializable {
 		source.window(Count.of(4)).groupBy(new ModKey(2)).mapWindow(new IdentityWindowMap())
 				.flatten().addSink(new CentralSink2());
 
-		source.groupBy(new ModKey(3)).window(Count.of(2)).sum(0).getDiscretizedStream()
+		KeySelector<Integer, ?> key = new ModKey(2);
+		Timestamp<Integer> ts = new Timestamp<Integer>() {
+
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public long getTimestamp(Integer value) {
+				return value;
+			}
+		};
+
+		source.groupBy(key).window(Time.of(4, ts, 1)).sum(0).getDiscretizedStream()
 				.addSink(new DistributedSink1());
 
 		source.groupBy(new ModKey(3)).window(Count.of(2)).groupBy(new ModKey(2))
 				.mapWindow(new IdentityWindowMap()).flatten().addSink(new DistributedSink2());
 
+		source.window(Count.of(2)).every(Count.of(3)).min(0).getDiscretizedStream()
+				.addSink(new CentralSink3());
+
+		source.groupBy(key).window(Time.of(4, ts, 1)).max(0).getDiscretizedStream()
+				.addSink(new DistributedSink3());
+
 		env.execute();
 
 		// sum ( Count of 2 slide 3 )
@@ -123,14 +142,13 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected2, CentralSink2.windows);
 
-		// groupby mod 3 sum ( Tumbling Count of 2)
+		// groupby mod 2 sum ( Tumbling Time of 4)
 		List<StreamWindow<Integer>> expected3 = new ArrayList<StreamWindow<Integer>>();
 		expected3.add(StreamWindow.fromElements(4));
 		expected3.add(StreamWindow.fromElements(5));
-		expected3.add(StreamWindow.fromElements(16));
+		expected3.add(StreamWindow.fromElements(22));
+		expected3.add(StreamWindow.fromElements(8));
 		expected3.add(StreamWindow.fromElements(10));
-		expected3.add(StreamWindow.fromElements(11));
-		expected3.add(StreamWindow.fromElements(3));
 
 		validateOutput(expected3, DistributedSink1.windows);
 
@@ -146,6 +164,22 @@ public class WindowIntegrationTest implements Serializable {
 
 		validateOutput(expected4, DistributedSink2.windows);
 
+		// min ( Count of 2 slide 3 )
+		List<StreamWindow<Integer>> expected5 = new ArrayList<StreamWindow<Integer>>();
+		expected5.add(StreamWindow.fromElements(2));
+		expected5.add(StreamWindow.fromElements(4));
+		expected5.add(StreamWindow.fromElements(11));
+
+		validateOutput(expected5, CentralSink3.windows);
+
+		// groupby mod 2 max ( Tumbling Time of 4)
+		List<StreamWindow<Integer>> expected6 = new ArrayList<StreamWindow<Integer>>();
+		expected6.add(StreamWindow.fromElements(3));
+		expected6.add(StreamWindow.fromElements(5));
+		expected6.add(StreamWindow.fromElements(11));
+		expected6.add(StreamWindow.fromElements(4));
+		expected6.add(StreamWindow.fromElements(10));
+
 	}
 
 	public static <R> void validateOutput(List<R> expected, List<R> actual)
{
@@ -179,6 +213,19 @@ public class WindowIntegrationTest implements Serializable {
 	}
 
 	@SuppressWarnings("serial")
+	private static class CentralSink3 implements SinkFunction<StreamWindow<Integer>>
{
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
+
+	@SuppressWarnings("serial")
 	private static class DistributedSink1 implements SinkFunction<StreamWindow<Integer>>
{
 
 		public static List<StreamWindow<Integer>> windows = Collections
@@ -203,4 +250,17 @@ public class WindowIntegrationTest implements Serializable {
 		}
 
 	}
+
+	@SuppressWarnings("serial")
+	private static class DistributedSink3 implements SinkFunction<StreamWindow<Integer>>
{
+
+		public static List<StreamWindow<Integer>> windows = Collections
+				.synchronizedList(new ArrayList<StreamWindow<Integer>>());
+
+		@Override
+		public void invoke(StreamWindow<Integer> value) throws Exception {
+			windows.add(value);
+		}
+
+	}
 }


Mime
View raw message