flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [4/4] flink git commit: [FLINK-3174] Add MergingWindowAssigner and SessionWindows
Date Tue, 05 Apr 2016 15:19:18 GMT
[FLINK-3174] Add MergingWindowAssigner and SessionWindows

This introduces MergingWindowAssigner, an extension of WindowAssigner
that can merge windows. When using a MergingWindowAssigner the
WindowOperator eagerly merges windows when processing elements.

For keeping track of in-flight windows and for merging windows this adds
MergingWindowSet, this keeps track of windows per key.

Only when using a MergingWindowAssigners is the more costly merging
logic used in the WindowOperator.

For triggers there is a new method Trigger.onMerge() that notifies the
trigger of the new merged window. This allows the trigger to set a timer
for the newly merged window. Only triggers that return true
from Trigger.canMerge() can be used with MergingWindowAssigner. Trigger
has default implementations for canMerge() and onMerge(), we return
false and onMerge() throws an Exception.

This also adds AbstractStateBackend.mergePartitionedStates for merging
state of several source namespaces into a target namespace. This is only
possible for the newly introduced MergingState which is an extension of
AppendingState. Only ReducingState and ListState are MergingState while
FoldingState is now an AppendingState.

This enables proper support for session windows.

This also adds the EventTimeSessionWindows and ProcessingTimeSessionWindows
window assigners and adapts an existing session example and adds test cases.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6cd8ceb1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6cd8ceb1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6cd8ceb1

Branch: refs/heads/master
Commit: 6cd8ceb10c841827cf89b74ecf5a0495a6933d53
Parents: 505512d
Author: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Authored: Tue Dec 15 17:37:48 2015 +0100
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Apr 5 17:18:58 2016 +0200

----------------------------------------------------------------------
 docs/apis/streaming/windows.md                  |  60 ++-
 .../flink/api/common/state/AppendingState.java  |  68 +++
 .../flink/api/common/state/FoldingState.java    |   2 +-
 .../flink/api/common/state/MergingState.java    |  45 +-
 .../examples/windowing/SessionWindowing.java    |  77 +---
 .../runtime/state/AbstractStateBackend.java     |  52 ++-
 .../api/datastream/AllWindowedStream.java       |  12 +
 .../api/datastream/WindowedStream.java          |  13 +-
 .../assigners/EventTimeSessionWindows.java      |  91 ++++
 .../api/windowing/assigners/GlobalWindows.java  |   6 +
 .../assigners/MergingWindowAssigner.java        |  57 +++
 .../assigners/ProcessingTimeSessionWindows.java |  91 ++++
 .../triggers/ContinuousEventTimeTrigger.java    |  57 ++-
 .../ContinuousProcessingTimeTrigger.java        |  73 ++--
 .../api/windowing/triggers/CountTrigger.java    |  47 +-
 .../windowing/triggers/EventTimeTrigger.java    |  12 +
 .../triggers/ProcessingTimeTrigger.java         |  12 +
 .../api/windowing/triggers/PurgingTrigger.java  |  18 +
 .../api/windowing/triggers/Trigger.java         |  46 +-
 .../api/windowing/windows/TimeWindow.java       |  71 +++
 .../windowing/EvictingWindowOperator.java       |   6 +-
 .../operators/windowing/MergingWindowSet.java   | 211 +++++++++
 .../operators/windowing/WindowOperator.java     | 147 +++++--
 .../windowing/AllWindowTranslationTest.java     |  84 ++++
 .../windowing/MergingWindowSetTest.java         | 261 +++++++++++
 .../operators/windowing/WindowOperatorTest.java | 429 +++++++++++++++++--
 .../windowing/WindowTranslationTest.java        |  98 +++++
 .../flink/streaming/util/TestHarnessUtil.java   |   6 +-
 28 files changed, 1918 insertions(+), 234 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/docs/apis/streaming/windows.md
----------------------------------------------------------------------
diff --git a/docs/apis/streaming/windows.md b/docs/apis/streaming/windows.md
index a630378..8cf5ddd 100644
--- a/docs/apis/streaming/windows.md
+++ b/docs/apis/streaming/windows.md
@@ -247,7 +247,7 @@ stream.window(GlobalWindows.create());
         </td>
       </tr>
       <tr>
-        <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
         <td>
           <p>
             Incoming elements are assigned to a window of a certain size (1 second below) based on
@@ -261,7 +261,7 @@ stream.window(TumblingEventTimeWindows.of(Time.seconds(1)));
         </td>
       </tr>
       <tr>
-        <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
         <td>
           <p>
             Incoming elements are assigned to a window of a certain size (5 seconds below) based on
@@ -302,6 +302,32 @@ stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
     {% endhighlight %}
         </td>
       </tr>
+          <tr>
+        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td>
+          <p>
+            Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below).
+            Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are
+            consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements
+            can be connected into a session by intermediate elements.
+          </p>
+    {% highlight scala %}
+keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)));
+    {% endhighlight %}
+        </td>
+      </tr>
+       <tr>
+        <td><strong>Processing time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td>
+          <p>
+           This is similar to event-time session windows but works on the current processing
+           time instead of the timestamp of elements
+          </p>
+    {% highlight scala %}
+keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)));
+    {% endhighlight %}
+        </td>
+      </tr>
   </tbody>
 </table>
 </div>
@@ -329,7 +355,7 @@ stream.window(GlobalWindows.create)
         </td>
       </tr>
       <tr>
-          <td><strong>Tumbling time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+          <td><strong>Tumbling event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
           <td>
             <p>
              Incoming elements are assigned to a window of a certain size (1 second below) based on
@@ -343,7 +369,7 @@ stream.window(TumblingEventTimeWindows.of(Time.seconds(1)))
           </td>
         </tr>
       <tr>
-        <td><strong>Sliding time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td><strong>Sliding event-time windows</strong><br>KeyedStream &rarr; WindowedStream</td>
         <td>
           <p>
             Incoming elements are assigned to a window of a certain size (5 seconds below) based on
@@ -385,6 +411,32 @@ stream.window(SlidingProcessingTimeWindows.of(Time.seconds(5), Time.seconds(1)))
     {% endhighlight %}
         </td>
       </tr>
+         <tr>
+        <td><strong>Event-time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td>
+          <p>
+            Incoming elements are assigned to sessions based on a session gap interval (5 seconds in the example below).
+            Elements whose timestamp differs by more than the session gap are assigned to different sessions. If there are
+            consecutive elements which are less than the session gap apart then these will also be put into the same session, i.e. elements
+            can be connected into a session by intermediate elements.
+          </p>
+    {% highlight scala %}
+keyedStream.window(EventTimeSessionWindows.withGap(Time.seconds(5)))
+    {% endhighlight %}
+        </td>
+      </tr>
+       <tr>
+        <td><strong>Processing time Session windows</strong><br>KeyedStream &rarr; WindowedStream</td>
+        <td>
+          <p>
+           This is similar to event-time session windows but works on the current processing
+           time instead of the timestamp of elements
+          </p>
+    {% highlight scala %}
+keyedStream.window(ProcessingTimeSessionWindows.withGap(Time.seconds(5)))
+    {% endhighlight %}
+        </td>
+      </tr>
   </tbody>
 </table>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
new file mode 100644
index 0000000..04dc784
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/AppendingState.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.state;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.IOException;
+
+/**
+ * Base interface for partitioned state that supports adding elements and inspecting the current
+ * state. Elements can either be kept in a buffer (list-like) or aggregated into one value.
+ *
+ * <p>The state is accessed and modified by user functions, and checkpointed consistently
+ * by the system as part of the distributed snapshots.
+ * 
+ * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
+ * automatically supplied by the system, so the function always sees the value mapped to the
+ * key of the current element. That way, the system can handle stream and state partitioning
+ * consistently together.
+ * 
+ * @param <IN> Type of the value that can be added to the state.
+ * @param <OUT> Type of the value that can be retrieved from the state.
+ */
+@PublicEvolving
+public interface AppendingState<IN, OUT> extends State {
+
+	/**
+	 * Returns the current value for the state. When the state is not
+	 * partitioned the returned value is the same for all inputs in a given
+	 * operator instance. If state partitioning is applied, the value returned
+	 * depends on the current operator input, as the operator maintains an
+	 * independent state for each partition.
+	 * 
+	 * @return The operator state value corresponding to the current input.
+	 * 
+	 * @throws Exception Thrown if the system cannot access the state.
+	 */
+	OUT get() throws Exception ;
+
+	/**
+	 * Updates the operator state accessible by {@link #get()} by adding the given value
+	 * to the list of values. The next time {@link #get()} is called (for the same state
+	 * partition) the returned state will represent the updated list.
+	 * 
+	 * @param value
+	 *            The new value for the state.
+	 *            
+	 * @throws IOException Thrown if the system cannot access the state.
+	 */
+	void add(IN value) throws Exception;
+	
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
index fa8eda8..684a612 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/FoldingState.java
@@ -37,4 +37,4 @@ import org.apache.flink.annotation.PublicEvolving;
  * @param <ACC> Type of the value in the state
  */
 @PublicEvolving
-public interface FoldingState<T, ACC> extends MergingState<T, ACC> {}
+public interface FoldingState<T, ACC> extends AppendingState<T, ACC> {}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
index 09abfd3..e79f907 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/state/MergingState.java
@@ -20,50 +20,13 @@ package org.apache.flink.api.common.state;
 
 import org.apache.flink.annotation.PublicEvolving;
 
-import java.io.IOException;
-
 /**
- * Base interface for partitioned state that supports adding elements and inspecting the current
- * state of merged elements. Elements can either be kept in a buffer (list-like) or merged together
- * into one value.
- *
- * <p>The state is accessed and modified by user functions, and checkpointed consistently
- * by the system as part of the distributed snapshots.
- * 
- * <p>The state is only accessible by functions applied on a KeyedDataStream. The key is
- * automatically supplied by the system, so the function always sees the value mapped to the
- * key of the current element. That way, the system can handle stream and state partitioning
- * consistently together.
+ * Extension of {@link AppendingState} that allows merging of state. That is, two instances
+ * of {@link MergingState} can be combined into a single instance that contains all the
+ * information of the two merged states.
  * 
  * @param <IN> Type of the value that can be added to the state.
  * @param <OUT> Type of the value that can be retrieved from the state.
  */
 @PublicEvolving
-public interface MergingState<IN, OUT> extends State {
-
-	/**
-	 * Returns the current value for the state. When the state is not
-	 * partitioned the returned value is the same for all inputs in a given
-	 * operator instance. If state partitioning is applied, the value returned
-	 * depends on the current operator input, as the operator maintains an
-	 * independent state for each partition.
-	 * 
-	 * @return The operator state value corresponding to the current input.
-	 * 
-	 * @throws Exception Thrown if the system cannot access the state.
-	 */
-	OUT get() throws Exception ;
-
-	/**
-	 * Updates the operator state accessible by {@link #get()} by adding the given value
-	 * to the list of values. The next time {@link #get()} is called (for the same state
-	 * partition) the returned state will represent the updated list.
-	 * 
-	 * @param value
-	 *            The new value for the state.
-	 *            
-	 * @throws IOException Thrown if the system cannot access the state.
-	 */
-	void add(IN value) throws Exception;
-	
-}
+public interface MergingState<IN, OUT> extends AppendingState<IN, OUT> { }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
index 16dbdfa..27804fe 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/windowing/SessionWindowing.java
@@ -17,8 +17,6 @@
 
 package org.apache.flink.streaming.examples.windowing;
 
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.streaming.api.TimeCharacteristic;
@@ -26,10 +24,8 @@ import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
-import org.apache.flink.streaming.api.windowing.triggers.Trigger;
-import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
-import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -86,8 +82,7 @@ public class SessionWindowing {
 		// We create sessions for each id with max timeout of 3 time units
 		DataStream<Tuple3<String, Long, Integer>> aggregated = source
 				.keyBy(0)
-				.window(GlobalWindows.create())
-				.trigger(new SessionTrigger(3L))
+				.window(EventTimeSessionWindows.withGap(Time.milliseconds(3L)))
 				.sum(2);
 
 		if (fileOutput) {
@@ -99,70 +94,4 @@ public class SessionWindowing {
 
 		env.execute();
 	}
-
-	private static class SessionTrigger extends Trigger<Tuple3<String, Long, Integer>, GlobalWindow> {
-
-		private static final long serialVersionUID = 1L;
-
-		private final Long sessionTimeout;
-
-		private final ValueStateDescriptor<Long> stateDesc = 
-				new ValueStateDescriptor<>("last-seen", Long.class, -1L);
-
-
-		public SessionTrigger(Long sessionTimeout) {
-			this.sessionTimeout = sessionTimeout;
-
-		}
-
-		@Override
-		public TriggerResult onElement(Tuple3<String, Long, Integer> element, long timestamp, GlobalWindow window, TriggerContext ctx) throws Exception {
-
-			ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
-			Long lastSeen = lastSeenState.value();
-
-			Long timeSinceLastEvent = timestamp - lastSeen;
-
-			ctx.deleteEventTimeTimer(lastSeen + sessionTimeout);
-
-			// Update the last seen event time
-			lastSeenState.update(timestamp);
-
-			ctx.registerEventTimeTimer(timestamp + sessionTimeout);
-
-			if (lastSeen != -1 && timeSinceLastEvent > sessionTimeout) {
-				System.out.println("FIRING ON ELEMENT: " + element + " ts: " + timestamp + " last " + lastSeen);
-				return TriggerResult.FIRE_AND_PURGE;
-			} else {
-				return TriggerResult.CONTINUE;
-			}
-		}
-
-		@Override
-		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
-			ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
-			Long lastSeen = lastSeenState.value();
-
-			if (time - lastSeen >= sessionTimeout) {
-				System.out.println("CTX: " + ctx + " Firing Time " + time + " last seen " + lastSeen);
-				return TriggerResult.FIRE_AND_PURGE;
-			}
-			return TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) throws Exception {
-			return TriggerResult.CONTINUE;
-		}
-
-		@Override
-		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {
-			ValueState<Long> lastSeenState = ctx.getPartitionedState(stateDesc);
-			if (lastSeenState.value() != -1) {
-				ctx.deleteEventTimeTimer(lastSeenState.value() + sessionTimeout);
-			}
-			lastSeenState.clear();
-		}
-	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index 3e99362..df6fcd3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -19,10 +19,12 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.FoldingState;
 import org.apache.flink.api.common.state.FoldingStateDescriptor;
 import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.ReducingState;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.state.State;
@@ -40,7 +42,10 @@ import org.apache.flink.runtime.execution.Environment;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 
 /**
@@ -196,7 +201,7 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 	public <N, S extends State> S getPartitionedState(final N namespace, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
 
 		if (keySerializer == null) {
-			throw new Exception("State key serializer has not been configured in the config. " +
+			throw new RuntimeException("State key serializer has not been configured in the config. " +
 					"This operation cannot use partitioned state.");
 		}
 		
@@ -257,6 +262,51 @@ public abstract class AbstractStateBackend implements java.io.Serializable {
 		return kvstate;
 	}
 
+	@SuppressWarnings("unchecked,rawtypes")
+	public <N, S extends MergingState<?, ?>> void mergePartitionedStates(final N target, Collection<N> sources, final TypeSerializer<N> namespaceSerializer, final StateDescriptor<S, ?> stateDescriptor) throws Exception {
+		if (stateDescriptor instanceof ReducingStateDescriptor) {
+			ReducingStateDescriptor reducingStateDescriptor = (ReducingStateDescriptor) stateDescriptor;
+			ReduceFunction reduceFn = reducingStateDescriptor.getReduceFunction();
+			ReducingState state = (ReducingState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+			KvState kvState = (KvState) state;
+			Object result = null;
+			for (N source: sources) {
+				kvState.setCurrentNamespace(source);
+				Object sourceValue = state.get();
+				if (result == null) {
+					result = state.get();
+				} else if (sourceValue != null) {
+					result = reduceFn.reduce(result, sourceValue);
+				}
+				state.clear();
+			}
+			kvState.setCurrentNamespace(target);
+			if (result != null) {
+				state.add(result);
+			}
+		} else if (stateDescriptor instanceof ListStateDescriptor) {
+			ListState<Object> state = (ListState) getPartitionedState(target, namespaceSerializer, stateDescriptor);
+			KvState kvState = (KvState) state;
+			List<Object> result = new ArrayList<>();
+			for (N source: sources) {
+				kvState.setCurrentNamespace(source);
+				Iterable<Object> sourceValue = state.get();
+				if (sourceValue != null) {
+					for (Object o : sourceValue) {
+						result.add(o);
+					}
+				}
+				state.clear();
+			}
+			kvState.setCurrentNamespace(target);
+			for (Object o : result) {
+				state.add(o);
+			}
+		} else {
+			throw new RuntimeException("Cannot merge states for " + stateDescriptor);
+		}
+	}
+
 	public HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshotPartitionedState(long checkpointId, long timestamp) throws Exception {
 		if (keyValueStates != null) {
 			HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> snapshots = new HashMap<>(keyValueStatesByName.size());

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
index 1a59bf1..5a1b56d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/AllWindowedStream.java
@@ -39,6 +39,7 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughAllWindowFu
 import org.apache.flink.streaming.api.functions.windowing.FoldApplyAllWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyAllWindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.evictors.Evictor;
 import org.apache.flink.streaming.api.windowing.triggers.Trigger;
@@ -100,6 +101,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> trigger(Trigger<? super T, ? super W> trigger) {
+		if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
+			throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
+		}
+
 		this.trigger = trigger;
 		return this;
 	}
@@ -113,6 +118,10 @@ public class AllWindowedStream<T, W extends Window> {
 	 */
 	@PublicEvolving
 	public AllWindowedStream<T, W> evictor(Evictor<? super T, ? super W> evictor) {
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor.");
+		}
+
 		this.evictor = evictor;
 		return this;
 	}
@@ -393,6 +402,9 @@ public class AllWindowedStream<T, W extends Window> {
 		if (foldFunction instanceof RichFunction) {
 			throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
 		}
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
+		}
 
 		//clean the closures
 		function = input.getExecutionEnvironment().clean(function);

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
index 6282cbc..84290b2 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.functions.windowing.PassThroughWindowFunct
 import org.apache.flink.streaming.api.functions.windowing.ReduceApplyWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
@@ -111,6 +112,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> trigger(Trigger<? super T, ? super W> trigger) {
+		if (windowAssigner instanceof MergingWindowAssigner && !trigger.canMerge()) {
+			throw new UnsupportedOperationException("A merging window assigner cannot be used with a trigger that does not support merging.");
+		}
+
 		this.trigger = trigger;
 		return this;
 	}
@@ -124,6 +129,9 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	@PublicEvolving
 	public WindowedStream<T, K, W> evictor(Evictor<? super T, ? super W> evictor) {
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Cannot use a merging WindowAssigner with an Evictor.");
+		}
 		this.evictor = evictor;
 		return this;
 	}
@@ -413,7 +421,10 @@ public class WindowedStream<T, K, W extends Window> {
 	 */
 	public <R> SingleOutputStreamOperator<R> apply(R initialValue, FoldFunction<T, R> foldFunction, WindowFunction<R, R, K, W> function, TypeInformation<R> resultType) {
 		if (foldFunction instanceof RichFunction) {
-			throw new UnsupportedOperationException("ReduceFunction of apply can not be a RichFunction.");
+			throw new UnsupportedOperationException("FoldFunction of apply can not be a RichFunction.");
+		}
+		if (windowAssigner instanceof MergingWindowAssigner) {
+			throw new UnsupportedOperationException("Fold cannot be used with a merging WindowAssigner.");
 		}
 
 		//clean the closures

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
new file mode 100644
index 0000000..ed5add5
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(EventTimeSessionWindows.withGap(Time.minutes(1)));
+ * } </pre>
+ */
+public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected long sessionTimeout;
+
+	protected EventTimeSessionWindows(long sessionTimeout) {
+		this.sessionTimeout = sessionTimeout;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return EventTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "EventTimeSessionWindows(" + sessionTimeout + ")";
+	}
+
+	/**
+	 * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
+	 * elements to sessions based on the element timestamp.
+	 *
+	 * @param size The session timeout, i.e. the time gap between sessions
+	 * @return The policy.
+	 */
+	public static EventTimeSessionWindows withGap(Time size) {
+		return new EventTimeSessionWindows(size.toMilliseconds());
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
+
+	/**
+	 * Merge overlapping {@link TimeWindow}s.
+	 */
+	public void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
+		TimeWindow.mergeWindows(windows, c);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
index d63a81d..dcf440c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/GlobalWindows.java
@@ -90,6 +90,12 @@ public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
 
 		@Override
 		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}
+
+		@Override
+		public TriggerResult onMerge(GlobalWindow window,
+				OnMergeContext ctx) {
+			return TriggerResult.CONTINUE;
+		}
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java
new file mode 100644
index 0000000..1450b1a
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/MergingWindowAssigner.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+
+import java.util.Collection;
+
+/**
+ * A {@code WindowAssigner} that can merge windows.
+ *
+ * @param <T> The type of elements that this WindowAssigner can assign windows to.
+ * @param <W> The type of {@code Window} that this assigner assigns.
+ */
+@PublicEvolving
+public abstract class MergingWindowAssigner<T, W extends Window> extends WindowAssigner<T, W> {
+	private static final long serialVersionUID = 1L;
+
+	/**
+	 * Determines which windows (if any) should be merged.
+	 *
+	 * @param windows The window candidates.
+	 * @param callback A callback that can be invoked to signal which windows should be merged.
+	 */
+	public abstract void mergeWindows(Collection<W> windows, MergeCallback<W> callback);
+
+	/**
+	 * Callback to be used in {@link #mergeWindows(Collection, MergeCallback)} for specifying which
+	 * windows should be merged.
+	 */
+	public interface MergeCallback<W> {
+
+		/**
+		 * Specifies that the given windows should be merged into the result window.
+		 *
+		 * @param toBeMerged The list of windows that should be merged into one window.
+		 * @param mergeResult The resulting merged window.
+		 */
+		void merge(Collection<W> toBeMerged, W mergeResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
new file mode 100644
index 0000000..608ebbc
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>
+ * For example, in order to window into windows of 1 minute, every 10 seconds:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(ProcessingTimeSessionWindows.withGap(Time.minutes(1)));
+ * } </pre>
+ */
+public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected long sessionTimeout;
+
+	protected ProcessingTimeSessionWindows(long sessionTimeout) {
+		this.sessionTimeout = sessionTimeout;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(Object element, long timestamp) {
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@Override
+	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return ProcessingTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "ProcessingTimeSessionWindows(" + sessionTimeout + ")";
+	}
+
+	/**
+	 * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
+	 * elements to sessions based on the element timestamp.
+	 *
+	 * @param size The session timeout, i.e. the time gap between sessions
+	 * @return The policy.
+	 */
+	public static ProcessingTimeSessionWindows withGap(Time size) {
+		return new ProcessingTimeSessionWindows(size.toMilliseconds());
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
+
+	/**
+	 * Merge overlapping {@link TimeWindow}s.
+	 */
+	public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
+		TimeWindow.mergeWindows(windows, c);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
index 775d6d5..5cb0e4d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java
@@ -21,9 +21,10 @@ package org.apache.flink.streaming.api.windowing.triggers;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
-import org.apache.flink.api.common.typeutils.base.BooleanSerializer;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
+import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
@@ -41,8 +42,9 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 
 	private final long interval;
 
-	private final ValueStateDescriptor<Boolean> stateDesc = 
-			new ValueStateDescriptor<>("first", BooleanSerializer.INSTANCE, true);
+	/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
+	private final ReducingStateDescriptor<Long> stateDesc =
+			new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
 
 	private ContinuousEventTimeTrigger(long interval) {
 		this.interval = interval;
@@ -51,24 +53,32 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
 
-		ValueState<Boolean> first = ctx.getPartitionedState(stateDesc);
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 
-		if (first.value()) {
+		if (fireTimestamp.get() == null) {
 			long start = timestamp - (timestamp % interval);
 			long nextFireTimestamp = start + interval;
 
 			ctx.registerEventTimeTimer(nextFireTimestamp);
 
-			first.update(false);
+			fireTimestamp.add(nextFireTimestamp);
 			return TriggerResult.CONTINUE;
 		}
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
-		ctx.registerEventTimeTimer(time + interval);
-		return TriggerResult.FIRE;
+	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+
+		if (fireTimestamp.get().equals(time)) {
+			fireTimestamp.clear();
+			fireTimestamp.add(time + interval);
+			ctx.registerEventTimeTimer(time + interval);
+			return TriggerResult.FIRE;
+
+		}
+		return TriggerResult.CONTINUE;
 	}
 
 	@Override
@@ -77,7 +87,21 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	}
 
 	@Override
-	public void clear(W window, TriggerContext ctx) throws Exception {}
+	public void clear(W window, TriggerContext ctx) throws Exception {
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+		fireTimestamp.clear();
+	}
+
+	@Override
+	public boolean canMerge() {
+		return true;
+	}
+
+	@Override
+	public TriggerResult onMerge(W window, OnMergeContext ctx) {
+		ctx.mergePartitionedState(stateDesc);
+		return TriggerResult.CONTINUE;
+	}
 
 	@Override
 	public String toString() {
@@ -98,4 +122,13 @@ public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object
 	public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
 		return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
 	}
+
+	private static class Min implements ReduceFunction<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return Math.min(value1, value2);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
index a952c42..c6e11b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
@@ -21,8 +21,9 @@ package org.apache.flink.streaming.api.windowing.triggers;
 import com.google.common.annotations.VisibleForTesting;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.windows.Window;
@@ -39,9 +40,9 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 
 	private final long interval;
 
-	private final ValueStateDescriptor<Long> stateDesc =
-			new ValueStateDescriptor<>("fire-timestamp", LongSerializer.INSTANCE, 0L);
-
+	/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
+	private final ReducingStateDescriptor<Long> stateDesc =
+			new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);
 
 	private ContinuousProcessingTimeTrigger(long interval) {
 		this.interval = interval;
@@ -49,25 +50,18 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 
 	@Override
 	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
-		long currentTime = System.currentTimeMillis();
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 
-		ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
-		long nextFireTimestamp = fireState.value();
+		timestamp = System.currentTimeMillis();
 
-		if (nextFireTimestamp == 0) {
-			long start = currentTime - (currentTime % interval);
-			fireState.update(start + interval);
+		if (fireTimestamp.get() == null) {
+			long start = timestamp - (timestamp % interval);
+			long nextFireTimestamp = start + interval;
 
-			ctx.registerProcessingTimeTimer((start + interval));
-			return TriggerResult.CONTINUE;
-		}
-		if (currentTime > nextFireTimestamp) {
-			long start = currentTime - (currentTime % interval);
-			fireState.update(start + interval);
+			ctx.registerProcessingTimeTimer(nextFireTimestamp);
 
-			ctx.registerProcessingTimeTimer((start + interval));
-
-			return TriggerResult.FIRE;
+			fireTimestamp.add(nextFireTimestamp);
+			return TriggerResult.CONTINUE;
 		}
 		return TriggerResult.CONTINUE;
 	}
@@ -79,22 +73,34 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 
 	@Override
 	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
 
-		ValueState<Long> fireState = ctx.getPartitionedState(stateDesc);
-		long nextFireTimestamp = fireState.value();
-
-		// only fire if an element didn't already fire
-		long currentTime = System.currentTimeMillis();
-		if (currentTime > nextFireTimestamp) {
-			long start = currentTime - (currentTime % interval);
-			fireState.update(start + interval);
+		if (fireTimestamp.get().equals(time)) {
+			fireTimestamp.clear();
+			fireTimestamp.add(time + interval);
+			ctx.registerProcessingTimeTimer(time + interval);
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
 	}
 
 	@Override
-	public void clear(W window, TriggerContext ctx) throws Exception {}
+	public void clear(W window, TriggerContext ctx) throws Exception {
+		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
+		fireTimestamp.clear();
+	}
+
+	@Override
+	public boolean canMerge() {
+		return true;
+	}
+
+	@Override
+	public TriggerResult onMerge(W window,
+			OnMergeContext ctx) {
+		ctx.mergePartitionedState(stateDesc);
+		return TriggerResult.CONTINUE;
+	}
 
 	@VisibleForTesting
 	public long getInterval() {
@@ -115,4 +121,13 @@ public class ContinuousProcessingTimeTrigger<W extends Window> extends Trigger<O
 	public static <W extends Window> ContinuousProcessingTimeTrigger<W> of(Time interval) {
 		return new ContinuousProcessingTimeTrigger<>(interval.toMilliseconds());
 	}
+
+	private static class Min implements ReduceFunction<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return Math.min(value1, value2);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
index 34b65e1..86c5c4c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/CountTrigger.java
@@ -19,13 +19,12 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.state.ValueState;
-import org.apache.flink.api.common.state.ValueStateDescriptor;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.state.ReducingState;
+import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.common.typeutils.base.LongSerializer;
 import org.apache.flink.streaming.api.windowing.windows.Window;
 
-import java.io.IOException;
-
 /**
  * A {@link Trigger} that fires once the count of elements in a pane reaches the given count.
  *
@@ -37,8 +36,8 @@ public class CountTrigger<W extends Window> extends Trigger<Object, W> {
 
 	private final long maxCount;
 
-	private final ValueStateDescriptor<Long> stateDesc =
-			new ValueStateDescriptor<>("count", LongSerializer.INSTANCE, 0L);
+	private final ReducingStateDescriptor<Long> stateDesc =
+			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);
 
 
 	private CountTrigger(long maxCount) {
@@ -46,12 +45,11 @@ public class CountTrigger<W extends Window> extends Trigger<Object, W> {
 	}
 
 	@Override
-	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws IOException {
-		ValueState<Long> count = ctx.getPartitionedState(stateDesc);
-		long currentCount = count.value() + 1;
-		count.update(currentCount);
-		if (currentCount >= maxCount) {
-			count.update(0L);
+	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
+		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
+		count.add(1L);
+		if (count.get() >= maxCount) {
+			count.clear();
 			return TriggerResult.FIRE;
 		}
 		return TriggerResult.CONTINUE;
@@ -73,6 +71,21 @@ public class CountTrigger<W extends Window> extends Trigger<Object, W> {
 	}
 
 	@Override
+	public boolean canMerge() {
+		return true;
+	}
+
+	@Override
+	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
+		ctx.mergePartitionedState(stateDesc);
+		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
+		if (count.get() >= maxCount) {
+			return TriggerResult.FIRE;
+		}
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
 	public String toString() {
 		return "CountTrigger(" +  maxCount + ")";
 	}
@@ -86,4 +99,14 @@ public class CountTrigger<W extends Window> extends Trigger<Object, W> {
 	public static <W extends Window> CountTrigger<W> of(long maxCount) {
 		return new CountTrigger<>(maxCount);
 	}
+
+	private static class Sum implements ReduceFunction<Long> {
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Long reduce(Long value1, Long value2) throws Exception {
+			return value1 + value2;
+		}
+
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
index f8e4cd7..75c6a9d 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/EventTimeTrigger.java
@@ -55,6 +55,18 @@ public class EventTimeTrigger extends Trigger<Object, TimeWindow> {
 	}
 
 	@Override
+	public boolean canMerge() {
+		return true;
+	}
+
+	@Override
+	public TriggerResult onMerge(TimeWindow window,
+			OnMergeContext ctx) {
+		ctx.registerEventTimeTimer(window.maxTimestamp());
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
 	public String toString() {
 		return "EventTimeTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
index 387e73b..06193cd 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
@@ -53,6 +53,18 @@ public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
 	}
 
 	@Override
+	public boolean canMerge() {
+		return true;
+	}
+
+	@Override
+	public TriggerResult onMerge(TimeWindow window,
+			OnMergeContext ctx) {
+		ctx.registerProcessingTimeTimer(window.maxTimestamp());
+		return TriggerResult.CONTINUE;
+	}
+
+	@Override
 	public String toString() {
 		return "ProcessingTimeTrigger()";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
index 8fdc56b..289702a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/PurgingTrigger.java
@@ -86,6 +86,24 @@ public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
 	}
 
 	@Override
+	public boolean canMerge() {
+		return nestedTrigger.canMerge();
+	}
+
+	@Override
+	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
+		TriggerResult triggerResult = nestedTrigger.onMerge(window, ctx);
+		switch (triggerResult) {
+			case FIRE:
+				return TriggerResult.FIRE_AND_PURGE;
+			case FIRE_AND_PURGE:
+				return TriggerResult.FIRE_AND_PURGE;
+			default:
+				return TriggerResult.CONTINUE;
+		}
+	}
+
+	@Override
 	public String toString() {
 		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
index db78558..452a5b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/Trigger.java
@@ -19,6 +19,7 @@
 package org.apache.flink.streaming.api.windowing.triggers;
 
 import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.state.MergingState;
 import org.apache.flink.api.common.state.State;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.state.ValueState;
@@ -31,18 +32,20 @@ import java.io.Serializable;
  * A {@code Trigger} determines when a pane of a window should be evaluated to emit the
  * results for that part of the window.
  *
- * <p>
- * A pane is the bucket of elements that have the same key (assigned by the
+ * <p>A pane is the bucket of elements that have the same key (assigned by the
  * {@link org.apache.flink.api.java.functions.KeySelector}) and same {@link Window}. An element can
  * be in multiple panes of it was assigned to multiple windows by the
  * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}. These panes all
  * have their own instance of the {@code Trigger}.
  *
- * <p>
- * Triggers must not maintain state internally since they can be re-created or reused for
+ * <p>Triggers must not maintain state internally since they can be re-created or reused for
  * different keys. All necessary state should be persisted using the state abstraction
  * available on the {@link TriggerContext}.
  *
+ * <p>When used with a {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}
+ * the {@code Trigger} must return {@code true} from {@link #canMerge()} and
+ * {@link #onMerge(Window, OnMergeContext)} most be properly implemented.
+ *
  * @param <T> The type of elements on which this {@code Trigger} works.
  * @param <W> The type of {@link Window Windows} on which this {@code Trigger} can operate.
  */
@@ -57,7 +60,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	 *
 	 * @param element The element that arrived.
 	 * @param timestamp The timestamp of the element that arrived.
-	 * @param window The window to which this pane belongs.
+	 * @param window The window to which the element is being added.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
 	public abstract TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception;
@@ -66,6 +69,7 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	 * Called when a processing-time timer that was set using the trigger context fires.
 	 *
 	 * @param time The timestamp at which the timer fired.
+	 * @param window The window for which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
 	public abstract TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception;
@@ -74,11 +78,35 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 	 * Called when an event-time timer that was set using the trigger context fires.
 	 *
 	 * @param time The timestamp at which the timer fired.
+	 * @param window The window for which the timer fired.
 	 * @param ctx A context object that can be used to register timer callbacks.
 	 */
 	public abstract TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception;
 
 	/**
+	 * Returns true if this trigger supports merging of trigger state and can therefore
+	 * be used with a
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner}.
+	 *
+	 * <p>If this returns {@code true} you must properly implement
+	 * {@link #onMerge(Window, OnMergeContext)}
+	 */
+	public boolean canMerge() {
+		return false;
+	}
+
+	/**
+	 * Called when several windows have been merged into one window by the
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner}.
+	 *
+	 * @param window The new window that results from the merge.
+	 * @param ctx A context object that can be used to register timer callbacks and access state.
+	 */
+	public TriggerResult onMerge(W window, OnMergeContext ctx) throws Exception {
+		throw new RuntimeException("This trigger does not support merging.");
+	}
+
+	/**
 	 * Clears any state that the trigger might still hold for the given window. This is called
 	 * when a window is purged. Timers set using {@link TriggerContext#registerEventTimeTimer(long)}
 	 * and {@link TriggerContext#registerProcessingTimeTimer(long)} should be deleted here as
@@ -181,4 +209,12 @@ public abstract class Trigger<T, W extends Window> implements Serializable {
 		@Deprecated
 		<S extends Serializable> ValueState<S> getKeyValueState(String name, TypeInformation<S> stateType, S defaultState);
 	}
+
+	/**
+	 * Extension of {@link TriggerContext} that is given to
+	 * {@link Trigger#onMerge(Window, OnMergeContext)}.
+	 */
+	public interface OnMergeContext extends TriggerContext {
+		<S extends MergingState<?, ?>> void mergePartitionedState(StateDescriptor<S, ?> stateDescriptor);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
index 517a2f5..5dfd60b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.java
@@ -19,10 +19,19 @@ package org.apache.flink.streaming.api.windowing.windows;
 
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
 
 /**
  * A {@link Window} that represents a time interval from {@code start} (inclusive) to
@@ -81,6 +90,20 @@ public class TimeWindow extends Window {
 				'}';
 	}
 
+	/**
+	 * Returns {@code true} if this window intersects the given window.
+	 */
+	public boolean intersects(TimeWindow other) {
+		return this.start <= other.end && this.end >= other.start;
+	}
+
+	/**
+	 * Returns the minimal window covers both this window and the given window.
+	 */
+	public TimeWindow cover(TimeWindow other) {
+		return new TimeWindow(Math.min(start, other.start), Math.max(end, other.end));
+	}
+
 	public static class Serializer extends TypeSerializer<TimeWindow> {
 		private static final long serialVersionUID = 1L;
 
@@ -156,4 +179,52 @@ public class TimeWindow extends Window {
 		}
 	}
 
+	/**
+	 * Merge overlapping {@link TimeWindow}s. For use by merging
+	 * {@link org.apache.flink.streaming.api.windowing.assigners.WindowAssigner WindowAssigners}.
+	 */
+	public static void mergeWindows(Collection<TimeWindow> windows, MergingWindowAssigner.MergeCallback<TimeWindow> c) {
+
+		// sort the windows by the start time and then merge overlapping windows
+
+		List<TimeWindow> sortedWindows = new ArrayList<>(windows);
+
+		Collections.sort(sortedWindows, new Comparator<TimeWindow>() {
+			@Override
+			public int compare(TimeWindow o1, TimeWindow o2) {
+				return Long.compare(o1.getStart(), o2.getStart());
+			}
+		});
+
+		List<Tuple2<TimeWindow, Set<TimeWindow>>> merged = new ArrayList<>();
+		Tuple2<TimeWindow, Set<TimeWindow>> currentMerge = null;
+
+		for (TimeWindow candidate: sortedWindows) {
+			if (currentMerge == null) {
+				currentMerge = new Tuple2<>();
+				currentMerge.f0 = candidate;
+				currentMerge.f1 = new HashSet<>();
+				currentMerge.f1.add(candidate);
+			} else if (currentMerge.f0.intersects(candidate)) {
+				currentMerge.f0 = currentMerge.f0.cover(candidate);
+				currentMerge.f1.add(candidate);
+			} else {
+				merged.add(currentMerge);
+				currentMerge = new Tuple2<>();
+				currentMerge.f0 = candidate;
+				currentMerge.f1 = new HashSet<>();
+				currentMerge.f1.add(candidate);
+			}
+		}
+
+		if (currentMerge != null) {
+			merged.add(currentMerge);
+		}
+
+		for (Tuple2<TimeWindow, Set<TimeWindow>> m: merged) {
+			if (m.f1.size() > 1) {
+				c.merge(m.f1, m.f0);
+			}
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index 16ca488..54d8f9f 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -23,7 +23,7 @@ import com.google.common.collect.FluentIterable;
 import com.google.common.collect.Iterables;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.state.ListState;
-import org.apache.flink.api.common.state.MergingState;
+import org.apache.flink.api.common.state.AppendingState;
 import org.apache.flink.api.common.state.StateDescriptor;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.api.java.functions.KeySelector;
@@ -156,7 +156,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window
 	@Override
 	@VisibleForTesting
 	@SuppressWarnings("unchecked, rawtypes")
-	public StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
-		return (StateDescriptor<? extends MergingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
+	public StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?> getStateDescriptor() {
+		return (StateDescriptor<? extends AppendingState<IN, Iterable<IN>>, ?>) windowStateDescriptor;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6cd8ceb1/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
new file mode 100644
index 0000000..7ef1af4
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/MergingWindowSet.java
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.windows.Window;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Utility for keeping track of merging {@link Window Windows} when using a
+ * {@link MergingWindowAssigner} in a {@link WindowOperator}.
+ *
+ * <p>When merging windows, we keep one of the original windows as the state window, i.e. the
+ * window that is used as namespace to store the window elements. Elements from the state
+ * windows of merged windows must be merged into this one state window. We keep
+ * a mapping from in-flight window to state window that can be queried using
+ * {@link #getStateWindow(Window)}.
+ *
+ * <p>A new window can be added to the set of in-flight windows using
+ * {@link #addWindow(Window, MergeFunction)}, this might merge other windows and the caller
+ * must react accordingly in {@link MergeFunction#merge(Object, Collection, Object, Collection)
+ * and adjust the outside view of windows and state.
+ *
+ * <p>Windows can be removed from the set of windows using {@link #retireWindow(Window)}.
+ *
+ * @param <W> The type of {@code Window} that this set is keeping track of.
+ */
+public class MergingWindowSet<W extends Window> {
+
+	private static final Logger LOG = LoggerFactory.getLogger(MergingWindowSet.class);
+
+	/**
+	 * Mapping from window to the window that keeps the window state. When
+	 * we are incrementally merging windows starting from some window we keep that starting
+	 * window as the state window to prevent costly state juggling.
+	 */
+	private final Map<W, W> windows;
+
+	/**
+	 * Our window assigner.
+	 */
+	private final MergingWindowAssigner<?, W> windowAssigner;
+
+	/**
+	 * Creates a new {@link MergingWindowSet} that uses the given {@link MergingWindowAssigner}.
+	 */
+	public MergingWindowSet(MergingWindowAssigner<?, W> windowAssigner) {
+		this.windowAssigner = windowAssigner;
+
+		windows = new HashMap<>();
+	}
+
+	/**
+	 * Returns the state window for the given in-flight {@code Window}. The state window is the
+	 * {@code Window} in which we keep the actual state of a given in-flight window. Windows
+	 * might expand but we keep to original state window for keeping the elements of the window
+	 * to avoid costly state juggling.
+	 *
+	 * @param window The window for which to get the state window.
+	 */
+	public W getStateWindow(W window) {
+		W result = windows.get(window);
+		if (result == null) {
+			throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
+		}
+
+		return result;
+	}
+
+	/**
+	 * Removes the given window from the set of in-flight windows.
+	 *
+	 * @param window The {@code Window} to remove.
+	 */
+	public void retireWindow(W window) {
+		W removed = this.windows.remove(window);
+		if (removed == null) {
+			throw new IllegalStateException("Window " + window + " is not in in-flight window set.");
+		}
+	}
+
+	/**
+	 * Adds a new {@code Window} to the set of in-flight windows. It might happen that this
+	 * triggers merging of previously in-flight windows. In that case, the provided
+	 * {@link MergeFunction} is called.
+	 *
+	 * <p>This returns the window that is the representative of the added window after adding.
+	 * This can either be the new window itself, if no merge occured, or the newly merged
+	 * window. Adding an element to a window or calling trigger functions should only
+	 * happen on the returned representative. This way, we never have to deal with a new window
+	 * that is immediately swallowed up by another window.
+	 *
+	 * <p>If the new window is merged the {@code MergeFunction} callback arguments also don't
+	 * contain the new window as part of the list of merged windows.
+	 *
+	 * @param newWindow The new {@code Window} to add.
+	 * @param mergeFunction The callback to be invoked in case a merge occurs.
+	 *
+	 * @return The {@code Window} that new new {@code Window} ended up in. This can also be the
+	 *          the new {@code Window} itself in case no merge occurred.
+	 * @throws Exception
+	 */
+	public W addWindow(W newWindow, MergeFunction<W> mergeFunction) throws Exception {
+
+		List<W> windows = new ArrayList<>();
+
+		windows.addAll(this.windows.keySet());
+		windows.add(newWindow);
+
+		final Map<W, Collection<W>> mergeResults = new HashMap<>();
+		windowAssigner.mergeWindows(windows,
+				new MergingWindowAssigner.MergeCallback<W>() {
+					@Override
+					public void merge(Collection<W> toBeMerged, W mergeResult) {
+						if (LOG.isDebugEnabled()) {
+							LOG.debug("Merging {} into {}", toBeMerged, mergeResult);
+						}
+						mergeResults.put(mergeResult, toBeMerged);
+					}
+				});
+
+		W resultWindow = newWindow;
+
+		// perform the merge
+		for (Map.Entry<W, Collection<W>> c: mergeResults.entrySet()) {
+			W mergeResult = c.getKey();
+			Collection<W> mergedWindows = c.getValue();
+
+			// if our new window is in the merged windows make the merge result the
+			// result window
+			if (mergedWindows.remove(newWindow)) {
+				resultWindow = mergeResult;
+			}
+
+			// pick any of the merged windows and choose that window's state window
+			// as the state window for the merge result
+			W mergedStateWindow = this.windows.get(mergedWindows.iterator().next());
+
+			// figure out the state windows that we are merging
+			List<W> mergedStateWindows = new ArrayList<>();
+			for (W mergedWindow: mergedWindows) {
+				W res = this.windows.remove(mergedWindow);
+				if (res != null) {
+					mergedStateWindows.add(res);
+				}
+			}
+
+			this.windows.put(mergeResult, mergedStateWindow);
+
+			// don't put the target state window into the merged windows
+			mergedStateWindows.remove(mergedStateWindow);
+
+			// don't merge the new window itself, it never had any state associated with it
+			// i.e. if we are only merging one pre-existing window into itself
+			// without extending the pre-exising window
+			if (!(mergedWindows.contains(mergeResult) && mergedWindows.size() == 1)) {
+				mergeFunction.merge(mergeResult,
+						mergedWindows,
+						this.windows.get(mergeResult),
+						mergedStateWindows);
+			}
+		}
+
+		// the new window created a new, self-contained window without merging
+		if (resultWindow.equals(newWindow)) {
+			this.windows.put(resultWindow, resultWindow);
+		}
+
+		return resultWindow;
+	}
+
+	/**
+	 * Callback for {@link #addWindow(Window, MergeFunction)}.
+	 * @param <W>
+	 */
+	public interface MergeFunction<W> {
+
+		/**
+		 * This gets called when a merge occurs.
+		 *
+		 * @param mergeResult The newly resulting merged {@code Window}.
+		 * @param mergedWindows The merged {@code Window Windows}.
+		 * @param stateWindowResult The state window of the merge result.
+		 * @param mergedStateWindows The merged state windows.
+		 * @throws Exception
+		 */
+		void merge(W mergeResult, Collection<W> mergedWindows, W stateWindowResult, Collection<W> mergedStateWindows) throws Exception;
+	}
+}


Mime
View raw message