flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject flink git commit: [FLINK-8384] [streaming] Dynamic Gap Session Window Assigner
Date Mon, 05 Feb 2018 15:47:19 GMT
Repository: flink
Updated Branches:
  refs/heads/master 92ad53e6a -> ede4c0751


[FLINK-8384] [streaming] Dynamic Gap Session Window Assigner


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ede4c075
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ede4c075
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ede4c075

Branch: refs/heads/master
Commit: ede4c0751b630503605248e8d22f29977f58624a
Parents: 92ad53e
Author: Dyana Rose <dyana.rose@salecycle.com>
Authored: Wed Jan 10 15:50:00 2018 +0000
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Mon Feb 5 15:54:42 2018 +0100

----------------------------------------------------------------------
 docs/dev/stream/operators/windows.md            |  55 ++++-
 .../DynamicEventTimeSessionWindows.java         | 104 ++++++++++
 .../DynamicProcessingTimeSessionWindows.java    | 105 ++++++++++
 .../assigners/EventTimeSessionWindows.java      |  13 ++
 .../assigners/ProcessingTimeSessionWindows.java |  13 ++
 .../SessionWindowTimeGapExtractor.java          |  38 ++++
 .../DynamicEventTimeSessionWindowsTest.java     | 199 ++++++++++++++++++
 ...DynamicProcessingTimeSessionWindowsTest.java | 202 +++++++++++++++++++
 .../windowing/EventTimeSessionWindowsTest.java  |  12 ++
 .../ProcessingTimeSessionWindowsTest.java       |  12 ++
 .../operators/windowing/WindowOperatorTest.java | 177 ++++++++++++++++
 11 files changed, 922 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/docs/dev/stream/operators/windows.md
----------------------------------------------------------------------
diff --git a/docs/dev/stream/operators/windows.md b/docs/dev/stream/operators/windows.md
index 0327d06..4deacce 100644
--- a/docs/dev/stream/operators/windows.md
+++ b/docs/dev/stream/operators/windows.md
@@ -281,9 +281,9 @@ For example, in China you would have to specify an offset of `Time.hours(-8)`.
 The *session windows* assigner groups elements by sessions of activity. Session windows do not overlap and
 do not have a fixed start and end time, in contrast to *tumbling windows* and *sliding windows*. Instead a
 session window closes when it does not receive elements for a certain period of time, *i.e.*, when a gap of
-inactivity occurred. A session window assigner is configured with the *session gap* which
-defines how long is the required period of inactivity. When this period expires, the current session closes
-and subsequent elements are assigned to a new session window.
+inactivity occurred. A session window assigner can be configured with either a static *session gap* or with a 
+*session gap extractor* function which defines how long the period of inactivity is. When this period expires, 
+the current session closes and subsequent elements are assigned to a new session window.
 
 <img src="{{ site.baseurl }}/fig/session-windows.svg" class="center" style="width: 100%;" />
 
@@ -294,17 +294,33 @@ The following code snippets show how to use session windows.
 {% highlight java %}
 DataStream<T> input = ...;
 
-// event-time session windows
+// event-time session windows with static gap
 input
     .keyBy(<key selector>)
     .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
     .<windowed transformation>(<window function>);
+    
+// event-time session windows with dynamic gap
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withDynamicGap((element) -> {
+        // determine and return session gap
+    }))
+    .<windowed transformation>(<window function>);
 
-// processing-time session windows
+// processing-time session windows with static gap
 input
     .keyBy(<key selector>)
     .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
     .<windowed transformation>(<window function>);
+    
+// processing-time session windows with dynamic gap
+input
+    .keyBy(<key selector>)
+    .window(ProcessingTimeSessionWindows.withDynamicGap((element) -> {
+        // determine and return session gap
+    }))
+    .<windowed transformation>(<window function>);
 {% endhighlight %}
 </div>
 
@@ -312,24 +328,47 @@ input
 {% highlight scala %}
 val input: DataStream[T] = ...
 
-// event-time session windows
+// event-time session windows with static gap
 input
     .keyBy(<key selector>)
     .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
     .<windowed transformation>(<window function>)
 
-// processing-time session windows
+// event-time session windows with dynamic gap
+input
+    .keyBy(<key selector>)
+    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
+      override def extract(element: String): Long = {
+        // determine and return session gap
+      }
+    }))
+    .<windowed transformation>(<window function>)
+
+// processing-time session windows with static gap
 input
     .keyBy(<key selector>)
     .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
     .<windowed transformation>(<window function>)
+
+
+// processing-time session windows with dynamic gap
+input
+    .keyBy(<key selector>)
+    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
+      override def extract(element: String): Long = {
+        // determine and return session gap
+      }
+    }))
+    .<windowed transformation>(<window function>)
 {% endhighlight %}
 </div>
 </div>
 
-Time intervals can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
+Static gaps can be specified by using one of `Time.milliseconds(x)`, `Time.seconds(x)`,
 `Time.minutes(x)`, and so on.
 
+Dynamic gaps are specified by implementing the `SessionWindowTimeGapExtractor` interface.
+
 <span class="label label-danger">Attention</span> Since session windows do not have a fixed start and end,
 they are  evaluated differently than tumbling and sliding windows. Internally, a session window operator
 creates a new window for each arriving record and merges windows together if their are closer to each other

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
new file mode 100644
index 0000000..bba59ed
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicEventTimeSessionWindows.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the timestamp of the
+ * elements. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicEventTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+@PublicEvolving
+public class DynamicEventTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicEventTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(timestamp, timestamp + sessionTimeout));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return (Trigger<T, TimeWindow>) EventTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "DynamicEventTimeSessionWindows()";
+	}
+
+	/**
+	 * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
+	 * elements to sessions based on the element timestamp.
+	 *
+	 * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
+	 * @return The policy.
+	 */
+	public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
+
+	@Override
+	public boolean isEventTime() {
+		return true;
+	}
+
+	/**
+	 * Merge overlapping {@link TimeWindow}s.
+	 */
+	public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
+		TimeWindow.mergeWindows(windows, c);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
new file mode 100644
index 0000000..073cc05
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/DynamicProcessingTimeSessionWindows.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+
+import java.util.Collection;
+import java.util.Collections;
+
+/**
+ * A {@link WindowAssigner} that windows elements into sessions based on the current processing
+ * time. Windows cannot overlap.
+ *
+ * <p>For example, in order to window into windows with a dynamic time gap:
+ * <pre> {@code
+ * DataStream<Tuple2<String, Integer>> in = ...;
+ * KeyedStream<String, Tuple2<String, Integer>> keyed = in.keyBy(...);
+ * WindowedStream<Tuple2<String, Integer>, String, TimeWindows> windowed =
+ *   keyed.window(DynamicProcessingTimeSessionWindows.withDynamicGap({@link SessionWindowTimeGapExtractor }));
+ * } </pre>
+ *
+ * @param <T> The type of the input elements
+ */
+@PublicEvolving
+public class DynamicProcessingTimeSessionWindows<T> extends MergingWindowAssigner<T, TimeWindow> {
+	private static final long serialVersionUID = 1L;
+
+	protected SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor;
+
+	protected DynamicProcessingTimeSessionWindows(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		this.sessionWindowTimeGapExtractor = sessionWindowTimeGapExtractor;
+	}
+
+	@Override
+	public Collection<TimeWindow> assignWindows(T element, long timestamp, WindowAssignerContext context) {
+		long currentProcessingTime = context.getCurrentProcessingTime();
+		long sessionTimeout = sessionWindowTimeGapExtractor.extract(element);
+		if (sessionTimeout <= 0) {
+			throw new IllegalArgumentException("Dynamic session time gap must satisfy 0 < gap");
+		}
+		return Collections.singletonList(new TimeWindow(currentProcessingTime, currentProcessingTime + sessionTimeout));
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	public Trigger<T, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
+		return (Trigger<T, TimeWindow>) ProcessingTimeTrigger.create();
+	}
+
+	@Override
+	public String toString() {
+		return "DynamicProcessingTimeSessionWindows()";
+	}
+
+	/**
+	 * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
+	 * elements to sessions based on the element timestamp.
+	 *
+	 * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
+	 * @return The policy.
+	 */
+	public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
+	}
+
+	@Override
+	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
+		return new TimeWindow.Serializer();
+	}
+
+	@Override
+	public boolean isEventTime() {
+		return false;
+	}
+
+	/**
+	 * Merge overlapping {@link TimeWindow}s.
+	 */
+	public void mergeWindows(Collection<TimeWindow> windows, MergeCallback<TimeWindow> c) {
+		TimeWindow.mergeWindows(windows, c);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
index 249f219..a33b28a 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/EventTimeSessionWindows.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -80,6 +81,18 @@ public class EventTimeSessionWindows extends MergingWindowAssigner<Object, TimeW
 		return new EventTimeSessionWindows(size.toMilliseconds());
 	}
 
+	/**
+	 * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
+	 * elements to sessions based on the element timestamp.
+	 *
+	 * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
+	 * @return The policy.
+	 */
+	@PublicEvolving
+	public static <T> DynamicEventTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		return new DynamicEventTimeSessionWindows<>(sessionWindowTimeGapExtractor);
+	}
+
 	@Override
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
index e553310..8c5ab55 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/ProcessingTimeSessionWindows.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.streaming.api.windowing.assigners;
 
+import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -81,6 +82,18 @@ public class ProcessingTimeSessionWindows extends MergingWindowAssigner<Object,
 		return new ProcessingTimeSessionWindows(size.toMilliseconds());
 	}
 
+	/**
+	 * Creates a new {@code SessionWindows} {@link WindowAssigner} that assigns
+	 * elements to sessions based on the element timestamp.
+	 *
+	 * @param sessionWindowTimeGapExtractor The extractor to use to extract the time gap from the input elements
+	 * @return The policy.
+	 */
+	@PublicEvolving
+	public static <T> DynamicProcessingTimeSessionWindows<T> withDynamicGap(SessionWindowTimeGapExtractor<T> sessionWindowTimeGapExtractor) {
+		return new DynamicProcessingTimeSessionWindows<>(sessionWindowTimeGapExtractor);
+	}
+
 	@Override
 	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
 		return new TimeWindow.Serializer();

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
new file mode 100644
index 0000000..5fa8a79
--- /dev/null
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/assigners/SessionWindowTimeGapExtractor.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.windowing.assigners;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+import java.io.Serializable;
+
+/**
+ * A {@code SessionWindowTimeGapExtractor} extracts session time gaps for Dynamic Session Window Assigners.
+ *
+ * @param <T> The type of elements that this {@code SessionWindowTimeGapExtractor} can extract session time gaps from.
+ */
+@PublicEvolving
+public interface SessionWindowTimeGapExtractor<T> extends Serializable {
+	/**
+	 * Extracts the session time gap.
+	 * @param element The input element.
+	 * @return The session time gap in milliseconds.
+	 */
+	long extract(T element);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java
new file mode 100644
index 0000000..d048dad
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicEventTimeSessionWindowsTest.java
@@ -0,0 +1,199 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link DynamicEventTimeSessionWindows}.
+ */
+public class DynamicEventTimeSessionWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+
+		WindowAssigner.WindowAssignerContext mockContext = mock(WindowAssigner.WindowAssignerContext.class);
+		SessionWindowTimeGapExtractor<String> extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(eq("gap5000"))).thenReturn(5000L);
+		when(extractor.extract(eq("gap4000"))).thenReturn(4000L);
+		when(extractor.extract(eq("gap9000"))).thenReturn(9000L);
+
+		DynamicEventTimeSessionWindows<String> assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+
+		assertThat(assigner.assignWindows("gap5000", 0L, mockContext), contains(timeWindow(0, 5000)));
+		assertThat(assigner.assignWindows("gap4000", 4999L, mockContext), contains(timeWindow(4999, 8999)));
+		assertThat(assigner.assignWindows("gap9000", 5000L, mockContext), contains(timeWindow(5000, 14000)));
+	}
+
+	@Test
+	public void testMergeSinglePointWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), callback);
+
+		verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeSingleWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), callback);
+
+		verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeConsecutiveWindows() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(0, 1),
+						new TimeWindow(1, 2),
+						new TimeWindow(2, 3),
+						new TimeWindow(4, 5),
+						new TimeWindow(5, 6)),
+				callback);
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
+				eq(new TimeWindow(0, 3)));
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
+				eq(new TimeWindow(4, 6)));
+
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeCoveringWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(1, 1),
+						new TimeWindow(0, 2),
+						new TimeWindow(4, 7),
+						new TimeWindow(5, 6)),
+				callback);
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
+				eq(new TimeWindow(0, 2)));
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
+				eq(new TimeWindow(4, 7)));
+
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		WindowAssigner.WindowAssignerContext mockContext = mock(WindowAssigner.WindowAssignerContext.class);
+		try {
+			SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+			when(extractor.extract(any())).thenReturn(-1L);
+
+			DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+			assigner.assignWindows(Lists.newArrayList(new Object()), 1, mockContext);
+
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 < gap"));
+		}
+
+		try {
+			SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+			when(extractor.extract(any())).thenReturn(0L);
+
+			DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+			assigner.assignWindows(Lists.newArrayList(new Object()), 1, mockContext);
+
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 < gap"));
+		}
+
+	}
+
+	@Test
+	public void testProperties() {
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicEventTimeSessionWindows assigner = DynamicEventTimeSessionWindows.withDynamicGap(extractor);
+
+		assertTrue(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java
new file mode 100644
index 0000000..a6bcedc
--- /dev/null
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/DynamicProcessingTimeSessionWindowsTest.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.runtime.operators.windowing;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
+import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
+import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.TestLogger;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
+import org.junit.Test;
+import org.mockito.Matchers;
+
+import java.util.Collection;
+
+import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.timeWindow;
+import static org.hamcrest.CoreMatchers.containsString;
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Mockito.argThat;
+import static org.mockito.Mockito.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for {@link DynamicProcessingTimeSessionWindows}.
+ */
+public class DynamicProcessingTimeSessionWindowsTest extends TestLogger {
+
+	@Test
+	public void testWindowAssignment() {
+
+		WindowAssigner.WindowAssignerContext mockContext = mock(WindowAssigner.WindowAssignerContext.class);
+		SessionWindowTimeGapExtractor<String> extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(eq("gap5000"))).thenReturn(5000L);
+		when(extractor.extract(eq("gap4000"))).thenReturn(4000L);
+		when(extractor.extract(eq("gap9000"))).thenReturn(9000L);
+
+		DynamicProcessingTimeSessionWindows<String> assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(0L);
+		assertThat(assigner.assignWindows("gap5000", Long.MIN_VALUE, mockContext), contains(timeWindow(0, 5000)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(4999L);
+		assertThat(assigner.assignWindows("gap4000", Long.MIN_VALUE, mockContext), contains(timeWindow(4999, 8999)));
+
+		when(mockContext.getCurrentProcessingTime()).thenReturn(5000L);
+		assertThat(assigner.assignWindows("gap9000", Long.MIN_VALUE, mockContext), contains(timeWindow(5000, 14000)));
+	}
+
+	@Test
+	public void testMergeSinglePointWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 0)), callback);
+
+		verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeSingleWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(Lists.newArrayList(new TimeWindow(0, 1)), callback);
+
+		verify(callback, never()).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeConsecutiveWindows() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(0, 1),
+						new TimeWindow(1, 2),
+						new TimeWindow(2, 3),
+						new TimeWindow(4, 5),
+						new TimeWindow(5, 6)),
+				callback);
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(0, 1), new TimeWindow(1, 2), new TimeWindow(2, 3))),
+				eq(new TimeWindow(0, 3)));
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(4, 5), new TimeWindow(5, 6))),
+				eq(new TimeWindow(4, 6)));
+
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testMergeCoveringWindow() {
+		MergingWindowAssigner.MergeCallback callback = mock(MergingWindowAssigner.MergeCallback.class);
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		assigner.mergeWindows(
+				Lists.newArrayList(
+						new TimeWindow(1, 1),
+						new TimeWindow(0, 2),
+						new TimeWindow(4, 7),
+						new TimeWindow(5, 6)),
+				callback);
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(1, 1), new TimeWindow(0, 2))),
+				eq(new TimeWindow(0, 2)));
+
+		verify(callback, times(1)).merge(
+				(Collection<TimeWindow>) argThat(containsInAnyOrder(new TimeWindow(5, 6), new TimeWindow(4, 7))),
+				eq(new TimeWindow(4, 7)));
+
+		verify(callback, times(2)).merge(anyCollection(), Matchers.anyObject());
+	}
+
+	@Test
+	public void testInvalidParameters() {
+		WindowAssigner.WindowAssignerContext mockContext = mock(WindowAssigner.WindowAssignerContext.class);
+		try {
+			SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+			when(extractor.extract(any())).thenReturn(-1L);
+
+			DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+			assigner.assignWindows(Lists.newArrayList(new Object()), 1, mockContext);
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 < gap"));
+		}
+
+		try {
+			SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+			when(extractor.extract(any())).thenReturn(-1L);
+
+			DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+			assigner.assignWindows(Lists.newArrayList(new Object()), 1, mockContext);
+			fail("should fail");
+		} catch (IllegalArgumentException e) {
+			assertThat(e.toString(), containsString("0 < gap"));
+		}
+
+	}
+
+	@Test
+	public void testProperties() {
+		SessionWindowTimeGapExtractor extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any())).thenReturn(5000L);
+
+		DynamicProcessingTimeSessionWindows assigner = DynamicProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		assertFalse(assigner.isEventTime());
+		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
+		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
index 5c4c989..7d96a1f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/EventTimeSessionWindowsTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
+import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
@@ -41,6 +43,7 @@ import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
@@ -186,4 +189,13 @@ public class EventTimeSessionWindowsTest extends TestLogger {
 		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
 		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(EventTimeTrigger.class));
 	}
+
+	@Test
+	public void testDynamicGapProperties() {
+		SessionWindowTimeGapExtractor<String> extractor = mock(SessionWindowTimeGapExtractor.class);
+		DynamicEventTimeSessionWindows<String> assigner = EventTimeSessionWindows.withDynamicGap(extractor);
+
+		assertNotNull(assigner);
+		assertTrue(assigner.isEventTime());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
index f49799c..5faca94 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/ProcessingTimeSessionWindowsTest.java
@@ -20,8 +20,10 @@ package org.apache.flink.streaming.runtime.operators.windowing;
 
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner;
 import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
 import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger;
@@ -42,6 +44,7 @@ import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.anyCollection;
@@ -193,4 +196,13 @@ public class ProcessingTimeSessionWindowsTest extends TestLogger {
 		assertEquals(new TimeWindow.Serializer(), assigner.getWindowSerializer(new ExecutionConfig()));
 		assertThat(assigner.getDefaultTrigger(mock(StreamExecutionEnvironment.class)), instanceOf(ProcessingTimeTrigger.class));
 	}
+
+	@Test
+	public void testDynamicGapProperties() {
+		SessionWindowTimeGapExtractor<String> extractor = mock(SessionWindowTimeGapExtractor.class);
+		DynamicProcessingTimeSessionWindows<String> assigner = ProcessingTimeSessionWindows.withDynamicGap(extractor);
+
+		assertNotNull(assigner);
+		assertFalse(assigner.isEventTime());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ede4c075/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index c03207e..f85b7fb 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -37,9 +37,12 @@ import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
 import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.DynamicEventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.DynamicProcessingTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
 import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
 import org.apache.flink.streaming.api.windowing.assigners.ProcessingTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.assigners.SessionWindowTimeGapExtractor;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
 import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
@@ -86,6 +89,9 @@ import java.util.concurrent.atomic.AtomicInteger;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for {@link WindowOperator}.
@@ -1254,6 +1260,177 @@ public class WindowOperatorTest extends TestLogger {
 	}
 
 	@Test
+	@SuppressWarnings("unchecked")
+	public void testDynamicEventTimeSessionWindows() throws Exception {
+		closeCalled.set(0);
+
+		SessionWindowTimeGapExtractor<Tuple2<String, Integer>> extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
+			Tuple2<String, Integer> element = (Tuple2<String, Integer>) invocation.getArguments()[0];
+			switch (element.f0) {
+				case "key1":
+					return 3000;
+				case "key2":
+					switch (element.f1) {
+						case 10:
+							return 1000;
+						default:
+							return 2000;
+					}
+				default:
+					return 0;
+			}
+		});
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+			DynamicEventTimeSessionWindows.withDynamicGap(extractor),
+			new TimeWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+			EventTimeTrigger.create(),
+			0,
+			null /* late data output tag */);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+			createTestHarness(operator);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// test different gaps for different keys
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));
+
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+
+		testHarness.processWatermark(new Watermark(8999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
+		expectedOutput.add(new Watermark(8999));
+
+		// test gap when it produces an end time before current timeout
+		// the furthest timeout is respected
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));
+
+		testHarness.processWatermark(new Watermark(12999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
+		expectedOutput.add(new Watermark(12999));
+
+		// test gap when it produces an end time after current timeout
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));
+
+		testHarness.processWatermark(new Watermark(16999));
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
+		expectedOutput.add(new Watermark(16999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
+	@SuppressWarnings("unchecked")
+	public void testDynamicProcessingTimeSessionWindows() throws Exception {
+		closeCalled.set(0);
+
+		SessionWindowTimeGapExtractor<Tuple2<String, Integer>> extractor = mock(SessionWindowTimeGapExtractor.class);
+		when(extractor.extract(any(Tuple2.class))).thenAnswer(invocation -> {
+			Tuple2<String, Integer> element = (Tuple2<String, Integer>) invocation.getArguments()[0];
+			switch (element.f0) {
+				case "key1":
+					return 3000;
+				case "key2":
+					switch (element.f1) {
+						case 10:
+							return 1000;
+						default:
+							return 2000;
+					}
+				default:
+					return 0;
+			}
+		});
+
+		TypeInformation<Tuple2<String, Integer>> inputType = TypeInfoParser.parse("Tuple2<String, Integer>");
+
+		ListStateDescriptor<Tuple2<String, Integer>> stateDesc = new ListStateDescriptor<>("window-contents",
+			inputType.createSerializer(new ExecutionConfig()));
+
+		WindowOperator<String, Tuple2<String, Integer>, Iterable<Tuple2<String, Integer>>, Tuple3<String, Long, Long>, TimeWindow> operator = new WindowOperator<>(
+			DynamicProcessingTimeSessionWindows.withDynamicGap(extractor),
+			new TimeWindow.Serializer(),
+			new TupleKeySelector(),
+			BasicTypeInfo.STRING_TYPE_INFO.createSerializer(new ExecutionConfig()),
+			stateDesc,
+			new InternalIterableWindowFunction<>(new SessionWindowFunction()),
+			ProcessingTimeTrigger.create(),
+			0,
+			null /* late data output tag */);
+
+		OneInputStreamOperatorTestHarness<Tuple2<String, Integer>, Tuple3<String, Long, Long>> testHarness =
+			createTestHarness(operator);
+
+		ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>();
+
+		testHarness.open();
+
+		// test different gaps for different keys
+		testHarness.setProcessingTime(10);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key1", 3), 10));
+
+		testHarness.setProcessingTime(5000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 4), 5000));
+		testHarness.setProcessingTime(6000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 5), 6000));
+		testHarness.setProcessingTime(8999);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key1-3", 10L, 3010L), 3009));
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-9", 5000L, 8000L), 7999));
+
+		// test gap when it produces an end time before current timeout
+		// the furthest timeout is respected
+		testHarness.setProcessingTime(9000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 9000));
+		testHarness.setProcessingTime(10000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 2), 10000));
+		testHarness.setProcessingTime(10500);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 10500));
+		testHarness.setProcessingTime(10500);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-13", 9000L, 12000L), 11999));
+
+		// test gap when it produces an end time after current timeout
+		testHarness.setProcessingTime(13000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13000));
+		testHarness.setProcessingTime(13500);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 10), 13500));
+		testHarness.setProcessingTime(14000);
+		testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14000));
+		testHarness.setProcessingTime(16999);
+
+		expectedOutput.add(new StreamRecord<>(new Tuple3<>("key2-21", 13000L, 16000L), 15999));
+
+		TestHarnessUtil.assertOutputEqualsSorted("Output was not correct.", expectedOutput, testHarness.getOutput(), new Tuple3ResultSortComparator());
+
+		testHarness.close();
+	}
+
+	@Test
 	public void testLateness() throws Exception {
 		final int windowSize = 2;
 		final long lateness = 500;


Mime
View raw message