flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [2/2] flink git commit: [FLINK-4230] [DataStreamAPI] Add Session Windowing ITCase
Date Thu, 21 Jul 2016 08:12:19 GMT
[FLINK-4230] [DataStreamAPI] Add Session Windowing ITCase


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/78af1e97
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/78af1e97
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/78af1e97

Branch: refs/heads/master
Commit: 78af1e97d64f7989f9640767b43f53f7e058f7fc
Parents: 19dae21
Author: Stefan Richter <s.richter@data-artisans.com>
Authored: Sun Jul 17 18:31:13 2016 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Thu Jul 21 10:11:30 2016 +0200

----------------------------------------------------------------------
 .../sessionwindows/EventGenerator.java          |  61 +++
 .../sessionwindows/EventGeneratorFactory.java   | 135 +++++++
 .../sessionwindows/LongRandomGenerator.java     |  77 ++++
 .../ParallelSessionsEventGenerator.java         | 147 +++++++
 .../sessionwindows/SessionConfiguration.java    | 131 +++++++
 .../windowing/sessionwindows/SessionEvent.java  | 121 ++++++
 .../SessionEventGeneratorImpl.java              | 379 +++++++++++++++++++
 .../SessionStreamConfiguration.java             |  70 ++++
 .../sessionwindows/SessionWindowITCase.java     | 339 +++++++++++++++++
 .../sessionwindows/StreamConfiguration.java     |  84 ++++
 .../sessionwindows/StreamEventFactory.java      |  44 +++
 .../sessionwindows/TestEventPayload.java        | 119 ++++++
 12 files changed, 1707 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
new file mode 100644
index 0000000..cd2b4af
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGenerator.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+/**
+ * @param <K> session key type
+ * @param <E> session event type
+ */
+public interface EventGenerator<K, E> {
+
+	/**
+	 * Only call this method if hasMoreEvents() is true and canProduceEventAtWatermark(...) is true.
+	 *
+	 * @param globalWatermark
+	 * @return a generated event
+	 */
+	E generateEvent(long globalWatermark);
+
+	/**
+	 * @param globalWatermark
+	 * @return true if the generator can produce events for the provided global watermark
+	 */
+	boolean canProduceEventAtWatermark(long globalWatermark);
+
+	/**
+	 * @return true more events can been produced
+	 */
+	boolean hasMoreEvents();
+
+	/**
+	 * @return the watermark that tracks this generator's progress
+	 */
+	long getLocalWatermark();
+
+	/**
+	 * @param globalWatermark
+	 * @return a successor for this generator if hasMoreEvents() is false
+	 */
+	EventGenerator<K, E> getNextGenerator(long globalWatermark);
+
+	/**
+	 * @return key for the generated events
+	 */
+	K getKey();
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
new file mode 100644
index 0000000..12b249c
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/EventGeneratorFactory.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.HashMap;
+import java.util.Map;
+
+
+/**
+ * Produces the session event generators
+ *
+ * @param <K>
+ */
+public class EventGeneratorFactory<K, E> {
+
+	// map key -> latest generator for this key
+	private final Map<K, EventGenerator<K, E>> latestGeneratorsByKey;
+
+	// pseudo random engine
+	private final LongRandomGenerator randomGenerator;
+
+	// configuration for the streams that are simulated by the generators this factory creates
+	private final StreamConfiguration streamConfiguration;
+
+	// factory for the events that is employed be the event generators this factory creates
+	private final StreamEventFactory<K, E> eventFactory;
+
+	// number of timely events generators produce per session
+	private final int timelyEventsPerSession;
+
+	// the max. gap between events that belong to one session
+	private final long maxSessionEventGap;
+
+	// counter that tracks how many generators this has produced
+	private int producedGeneratorsCount;
+
+	/**
+	 * @param streamConfiguration
+	 * @param eventFactory
+	 * @param sessionTimeout
+	 * @param timelyEventsPerSession
+	 * @param randomGenerator
+	 */
+	public EventGeneratorFactory(
+			StreamConfiguration streamConfiguration,
+			StreamEventFactory<K, E> eventFactory,
+			long sessionTimeout,
+			int timelyEventsPerSession,
+			LongRandomGenerator randomGenerator) {
+
+		Preconditions.checkNotNull(streamConfiguration);
+		Preconditions.checkNotNull(randomGenerator);
+		Preconditions.checkArgument(sessionTimeout >= 0);
+		Preconditions.checkArgument(timelyEventsPerSession >= 0);
+
+		this.latestGeneratorsByKey = new HashMap<>();
+		this.streamConfiguration = streamConfiguration;
+		this.eventFactory = eventFactory;
+		this.randomGenerator = randomGenerator;
+		this.maxSessionEventGap = sessionTimeout;
+		this.timelyEventsPerSession = timelyEventsPerSession;
+		this.producedGeneratorsCount = 0;
+	}
+
+	/**
+	 * @param key
+	 * @param globalWatermark
+	 * @return
+	 */
+	public EventGenerator<K, E> newSessionStreamForKey(K key, long globalWatermark) {
+		EventGenerator<K, E> eventGenerator = latestGeneratorsByKey.get(key);
+
+		if (eventGenerator == null) {
+			SessionConfiguration<K, E> sessionConfiguration = SessionConfiguration.of(
+					key,
+					0,
+					maxSessionEventGap,
+					globalWatermark,
+					timelyEventsPerSession,
+					eventFactory);
+			SessionStreamConfiguration<K, E> sessionStreamConfiguration =
+					new SessionStreamConfiguration<>(sessionConfiguration, streamConfiguration);
+			eventGenerator = new SessionEventGeneratorImpl<>(sessionStreamConfiguration, randomGenerator);
+		} else {
+			eventGenerator = eventGenerator.getNextGenerator(globalWatermark);
+		}
+		latestGeneratorsByKey.put(key, eventGenerator);
+		++producedGeneratorsCount;
+		return eventGenerator;
+	}
+
+	/**
+	 * @return
+	 */
+	public StreamConfiguration getStreamConfiguration() {
+		return streamConfiguration;
+	}
+
+	/**
+	 * @return
+	 */
+	public long getMaxSessionEventGap() {
+		return maxSessionEventGap;
+	}
+
+	/**
+	 * @return
+	 */
+	public int getProducedGeneratorsCount() {
+		return producedGeneratorsCount;
+	}
+
+	static class PreviousSessionMetaData {
+		int sessionId;
+		long maxTimestamp;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
new file mode 100644
index 0000000..8c0a7b4
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/LongRandomGenerator.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import java.util.Collection;
+import java.util.Random;
+
+public class LongRandomGenerator extends Random {
+
+	static final long serialVersionUID = 32523525277L;
+
+	/**
+	 *
+	 * @param seed
+	 */
+	public LongRandomGenerator(long seed) {
+		super(seed);
+	}
+
+	/**
+	 * @param minInclusive
+	 * @param maxExclusive
+	 * @return random long between the provided min (inclusive) and max (exclusive)
+	 */
+	public long randomLongBetween(long minInclusive, long maxExclusive) {
+		if (maxExclusive <= minInclusive) {
+			throw new IllegalArgumentException(String.format("Error: min (found %s) must be < than max (found %s)!",
+					minInclusive, maxExclusive));
+		}
+		long bits;
+		long generatedValue;
+		long delta = maxExclusive - minInclusive;
+		do {
+			bits = (nextLong() << 1) >>> 1;
+			generatedValue = bits % delta;
+		} while (bits - generatedValue + (delta - 1) < 0L);
+		return minInclusive + generatedValue;
+	}
+
+	/**
+	 * @param collection
+	 * @return
+	 */
+	public int choseRandomIndex(Collection<?> collection) {
+		return nextInt(collection.size());
+	}
+
+	/**
+	 * @return a randomly chosen element from collection
+	 */
+	public <T> T choseRandomElement(Collection<T> collection) {
+		int choice = choseRandomIndex(collection);
+		for (T key : collection) {
+			if (choice == 0) {
+				return key;
+			}
+			--choice;
+		}
+		return null;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
new file mode 100644
index 0000000..1d7c0d6
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/ParallelSessionsEventGenerator.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Generator that employs several (sub-) event generators to produce events for multiple sessions in parallel, i.e.
+ * events are emitted in an interleaved way.
+ * <p>
+ * Even events that belong to different sessions for the same key can be generated in parallel.
+ * <p>
+ * The watermark is computed as the minimum of watermarks among all current sub-generators.
+ *
+ * @param <K> session key type
+ * @param <E> session event type
+ */
+public class ParallelSessionsEventGenerator<K, E> {
+
+	// set of all possible keys for generated sessions
+	private final Set<K> sessionKeys;
+
+	// factory for streams that generate exactly one session
+	private final EventGeneratorFactory<K, E> streamFactory;
+
+	// list of sub-generators for the current sessions
+	private final List<EventGenerator<K, E>> subGeneratorLists;
+
+	// pseudo random engine
+	private final LongRandomGenerator randomGenerator;
+
+	// maximum number of sessions this will generate
+	private final long sessionCountLimit;
+
+	public ParallelSessionsEventGenerator(
+			Set<K> keys,
+			EventGeneratorFactory<K, E> streamFactory,
+			int parallelSessions,
+			long sessionCountLimit,
+			LongRandomGenerator randomGenerator) {
+
+		Preconditions.checkNotNull(keys);
+		Preconditions.checkNotNull(streamFactory);
+		Preconditions.checkArgument(parallelSessions > 0);
+		Preconditions.checkArgument(!keys.isEmpty());
+		Preconditions.checkNotNull(randomGenerator);
+
+		this.sessionKeys = keys;
+		this.randomGenerator = randomGenerator;
+		this.streamFactory = streamFactory;
+		this.sessionCountLimit = sessionCountLimit;
+
+		this.subGeneratorLists = new ArrayList<>(parallelSessions);
+		initParallelSessionGenerators(parallelSessions);
+	}
+
+	/**
+	 * @return
+	 */
+	public E nextEvent() {
+
+		// the session limit is reached and all generators are exhausted
+		if (subGeneratorLists.isEmpty()) {
+			return null;
+		}
+
+		final long globalWatermark = getWatermark();
+
+		// iterates at most once over all sub-generators, starting at a randomly chosen index to find one that can
+		// currently produce events
+		final int choice = randomGenerator.choseRandomIndex(subGeneratorLists);
+
+		for (int i = choice; i < choice + subGeneratorLists.size(); ++i) {
+
+			final int index = i % subGeneratorLists.size();
+			EventGenerator<K, E> subGenerator = subGeneratorLists.get(index);
+
+			// check if the sub-generator can produce an event under the current gloabl watermark
+			if (subGenerator.canProduceEventAtWatermark(globalWatermark)) {
+
+				E event = subGenerator.generateEvent(globalWatermark);
+
+				// check if the sub-generator produced it's last event
+				if (!subGenerator.hasMoreEvents()) {
+
+					// replaces exhausted generator if the session limit is not met
+					if (streamFactory.getProducedGeneratorsCount() < sessionCountLimit) {
+						subGeneratorLists.set(index,
+								streamFactory.newSessionStreamForKey(
+										randomGenerator.choseRandomElement(sessionKeys), getWatermark()));
+					} else {
+						// otherwise removes the sub-generator and shrinks the list of open sessions permanently
+						subGeneratorLists.remove(index);
+					}
+				}
+				return event;
+			}
+		}
+
+		//if everything works correctly, this should never happen
+		throw new IllegalStateException("Unable to find an open sub-generator that can produce events");
+
+	}
+
+	/**
+	 * @return a global watermark that is the minimum of all individual watermarks of the sub-generators
+	 */
+	public long getWatermark() {
+		long watermark = Long.MAX_VALUE;
+
+		for (EventGenerator<K, E> sessionEventStream : subGeneratorLists) {
+			watermark = Math.min(watermark, sessionEventStream.getLocalWatermark());
+		}
+		return watermark;
+	}
+
+	/**
+	 * @param parallelSessions
+	 */
+	private void initParallelSessionGenerators(int parallelSessions) {
+		for (int i = 0; i < parallelSessions && streamFactory.getProducedGeneratorsCount() < sessionCountLimit; ++i) {
+			subGeneratorLists.add(streamFactory.newSessionStreamForKey(
+					randomGenerator.choseRandomElement(sessionKeys), 0L));
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
new file mode 100644
index 0000000..3eb3cfb
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionConfiguration.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * @param <K>
+ * @param <E>
+ */
+public final class SessionConfiguration<K, E> {
+
+	// key of the session
+	private final K key;
+
+	// id of the session w.r.t. key
+	private final int sessionId;
+
+	// allowed gap between events in one session
+	private final long gap;
+
+	// minimum event time among all events in the session
+	private final long minEventTimestamp;
+
+	// number of timely events in the session
+	private final int numberOfTimelyEvents;
+
+	// factory that produces the events for the session from metadata such as timestamps
+	private final StreamEventFactory<K, E> eventFactory;
+
+	/**
+	 * @param key
+	 * @param sessionId
+	 * @param gap
+	 * @param minEventTimestamp
+	 * @param numberOfTimelyEvents
+	 * @param eventFactory
+	 */
+	public SessionConfiguration(
+			K key,
+			int sessionId,
+			long gap,
+			long minEventTimestamp,
+			int numberOfTimelyEvents,
+			StreamEventFactory<K, E> eventFactory) {
+
+		Preconditions.checkNotNull(key);
+		Preconditions.checkNotNull(eventFactory);
+		Preconditions.checkArgument(numberOfTimelyEvents > 0);
+		Preconditions.checkArgument(gap > 0);
+
+		this.key = key;
+		this.eventFactory = eventFactory;
+		this.sessionId = sessionId;
+		this.gap = gap;
+		this.numberOfTimelyEvents = numberOfTimelyEvents;
+		this.minEventTimestamp = minEventTimestamp;
+	}
+
+	public K getKey() {
+		return key;
+	}
+
+	public StreamEventFactory<K, E> getEventFactory() {
+		return eventFactory;
+	}
+
+	public long getGap() {
+		return gap;
+	}
+
+	public long getMinEventTimestamp() {
+		return minEventTimestamp;
+	}
+
+	public int getNumberOfTimelyEvents() {
+		return numberOfTimelyEvents;
+	}
+
+	public int getSessionId() {
+		return sessionId;
+	}
+
+	public static <K, E> SessionConfiguration<K, E> of(
+			K key,
+			int sessionId,
+			long timeout,
+			long startTimestamp,
+			int numberOfEvents,
+			StreamEventFactory<K, E> eventFactory) {
+		return new SessionConfiguration<>(key, sessionId, timeout, startTimestamp, numberOfEvents, eventFactory);
+	}
+
+	@Override
+	public String toString() {
+		return "SessionConfiguration{" +
+				"key=" + key +
+				", sessionId=" + sessionId +
+				", gap=" + gap +
+				", minEventTimestamp=" + minEventTimestamp +
+				", numberOfTimelyEvents=" + numberOfTimelyEvents +
+				", eventFactory=" + eventFactory +
+				'}';
+	}
+
+	public SessionConfiguration<K, E> getFollowupSessionConfiguration(long startTimestamp) {
+		return SessionConfiguration.of(
+				getKey(),
+				getSessionId() + 1,
+				getGap(),
+				startTimestamp,
+				getNumberOfTimelyEvents(),
+				getEventFactory());
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
new file mode 100644
index 0000000..c941fcf
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEvent.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Simple class that represents session events for our example job.
+ *
+ * @param <K> session key type
+ * @param <V> session event value type (the event's payload)
+ */
+public final class SessionEvent<K, V> {
+
+	// key of the session this event belongs to
+	private K sessionKey;
+
+	// value of the event (payload)
+	private V eventValue;
+
+	// event timestamp (in ms)
+	private long eventTimestamp;
+	
+	public SessionEvent() {
+	}
+
+	/**
+	 * @param sessionKey
+	 * @param eventValue
+	 * @param eventTimestamp
+	 */
+	public SessionEvent(K sessionKey, V eventValue, long eventTimestamp) {
+		Preconditions.checkNotNull(sessionKey);
+		Preconditions.checkNotNull(eventValue);
+		setSessionKey(sessionKey);
+		setEventValue(eventValue);
+		setEventTimestamp(eventTimestamp);
+	}
+
+	/**
+	 * @return
+	 */
+	public K getSessionKey() {
+		return sessionKey;
+	}
+
+	/**
+	 * @param sessionKey
+	 */
+	public void setSessionKey(K sessionKey) {
+		Preconditions.checkNotNull(sessionKey);
+		this.sessionKey = sessionKey;
+	}
+
+	/**
+	 * @return
+	 */
+	public V getEventValue() {
+		return eventValue;
+	}
+
+	/**
+	 * @param eventValue
+	 */
+	public void setEventValue(V eventValue) {
+		Preconditions.checkNotNull(eventValue);
+		this.eventValue = eventValue;
+	}
+
+	/**
+	 * @return
+	 */
+	public long getEventTimestamp() {
+		return eventTimestamp;
+	}
+
+	/**
+	 * @param eventTimestamp
+	 */
+	public void setEventTimestamp(long eventTimestamp) {
+		this.eventTimestamp = eventTimestamp;
+	}
+
+	@Override
+	public String toString() {
+		return "SessionEvent{" +
+				"sessionKey=" + sessionKey +
+				", eventValue=" + eventValue +
+				", eventTimestamp=" + eventTimestamp +
+				'}';
+	}
+
+	/**
+	 * @param sessionKey
+	 * @param eventValue
+	 * @param eventTimestamp
+	 * @param <K>
+	 * @param <V>
+	 * @return
+	 */
+	public static <K, V> SessionEvent<K, V> of(K sessionKey, V eventValue, long eventTimestamp) {
+		return new SessionEvent<>(sessionKey, eventValue, eventTimestamp);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
new file mode 100644
index 0000000..a269da3
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionEventGeneratorImpl.java
@@ -0,0 +1,379 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Implementation of EventGenerator that generates timely and late (in-lateness and after-lateness) events for
+ * a single session.
+ *
+ * @param <K>
+ * @param <E>
+ */
+public class SessionEventGeneratorImpl<K, E> implements EventGenerator<K, E> {
+
+	/**
+	 * Event timing w.r.t the global watermark
+	 */
+	public enum Timing {
+		TIMELY, IN_LATENESS, AFTER_LATENESS
+	}
+
+	// pseudo random engine
+	private final LongRandomGenerator randomGenerator;
+
+	// configuration for the generated session stream
+	private final SessionStreamConfiguration<K, E> configuration;
+
+	// precomputed timestamps for the timely events (could be a list of primitive longs)
+	private final List<Long> orderedTimelyTimestamps;
+
+	// the minimum timestamp for events emitted by this generator
+	private final long minTimestamp;
+
+	// the maximum timestamp for events emitted by this generator
+	private final long maxTimestamp;
+
+	// tracks how many events this generator has produced
+	private int producedEventsCount;
+
+	// sub-generator that capture the lifecycle w.r.t. the global watermark (timely, in lateness, after lateness)
+	private EventGenerator<K, E> timingAwareEventGenerator;
+
+	/**
+	 * @param configuration   stream configuration
+	 * @param randomGenerator random engine for the event generation
+	 */
+	public SessionEventGeneratorImpl(
+			SessionStreamConfiguration<K, E> configuration, LongRandomGenerator randomGenerator) {
+		Preconditions.checkNotNull(configuration);
+		Preconditions.checkNotNull(randomGenerator);
+
+		this.producedEventsCount = 0;
+		this.configuration = configuration;
+		this.randomGenerator = randomGenerator;
+
+		//pre-compute and store all timestamps for the timely events in this session
+		final int timelyEventsInSessionCount = configuration.getSessionConfiguration().getNumberOfTimelyEvents();
+		this.orderedTimelyTimestamps = new ArrayList<>(timelyEventsInSessionCount);
+		this.minTimestamp = configuration.getSessionConfiguration().getMinEventTimestamp();
+		generateOrderedTimelyTimestamps(minTimestamp, timelyEventsInSessionCount);
+		this.maxTimestamp = orderedTimelyTimestamps.get(orderedTimelyTimestamps.size() - 1);
+		this.timingAwareEventGenerator = new TimelyGenerator();
+	}
+
+	/**
+	 * @see EventGenerator
+	 */
+	@Override
+	public boolean canProduceEventAtWatermark(long globalWatermark) {
+		return timingAwareEventGenerator.canProduceEventAtWatermark(globalWatermark);
+	}
+
+	/**
+	 * @see EventGenerator
+	 */
+	@Override
+	public E generateEvent(long globalWatermark) {
+		if (hasMoreEvents()) {
+			++producedEventsCount;
+			E event = timingAwareEventGenerator.generateEvent(globalWatermark);
+
+			while (!timingAwareEventGenerator.hasMoreEvents()) {
+				timingAwareEventGenerator = timingAwareEventGenerator.getNextGenerator(globalWatermark);
+			}
+
+			return event;
+		} else {
+			throw new IllegalStateException("All events exhausted");
+		}
+	}
+
+	/**
+	 * @see EventGenerator
+	 */
+	@Override
+	public long getLocalWatermark() {
+		return timingAwareEventGenerator.getLocalWatermark();
+	}
+
+	/**
+	 * @see EventGenerator
+	 */
+	@Override
+	public boolean hasMoreEvents() {
+		return producedEventsCount < getAllEventsCount();
+	}
+
+	/**
+	 * pre-computes and stores the timestamps for timely events in this session in a list (ordered)
+	 *
+	 * @param minTimestamp              the minimum event time in the session
+	 * @param onTimeEventCountInSession the number of timestamps to generate
+	 */
+	private void generateOrderedTimelyTimestamps(long minTimestamp, int onTimeEventCountInSession) {
+		long generatedTimestamp = minTimestamp;
+
+		for (int i = 1; i < onTimeEventCountInSession; ++i) {
+			orderedTimelyTimestamps.add(generatedTimestamp);
+			generatedTimestamp += randomGenerator.randomLongBetween(0, getGap() - 1);
+		}
+
+		orderedTimelyTimestamps.add(generatedTimestamp);
+	}
+
+	/**
+	 * @param eventTimestamp
+	 * @param globalWatermark
+	 * @param timing
+	 * @return
+	 */
+	private E createEventFromTimestamp(long eventTimestamp, long globalWatermark, Timing timing) {
+		return getEventFactory().createEvent(
+				getKey(),
+				getSessionId(),
+				producedEventsCount,
+				eventTimestamp,
+				globalWatermark,
+				timing);
+	}
+
+	/**
+	 * @return
+	 */
+	private long generateTimelyTimestamp() {
+		int chosenTimestampIndex = randomGenerator.choseRandomIndex(orderedTimelyTimestamps);
+		// performance: consider that remove is an O(n) operation here, with n being the number of timely events but
+		// this should not matter too much for a IT case
+		return orderedTimelyTimestamps.remove(chosenTimestampIndex);
+	}
+
+	/**
+	 * @return
+	 */
+	private long generateLateTimestamp() {
+		return randomGenerator.randomLongBetween(minTimestamp, maxTimestamp + 1);
+	}
+
+	/**
+	 * @param globalWatermark
+	 * @return true if the session window for this session has already triggered at global watermark
+	 */
+	private boolean isTriggered(long globalWatermark) {
+		return globalWatermark >= maxTimestamp + getGap() - 1;
+	}
+
+	/**
+	 * @param globalWatermark
+	 * @return true if all future generated events are after lateness w.r.t global watermark
+	 */
+	private boolean isAfterLateness(long globalWatermark) {
+		return globalWatermark >= getAfterLatenessTimestamp();
+	}
+
+	/**
+	 * @return timestamp of the watermark at events for this session are after the lateness
+	 */
+	private long getAfterLatenessTimestamp() {
+		return getTriggerTimestamp() + getLateness();
+	}
+
+	/**
+	 * @return timestamp of the watermark at which the window for this session will trigger
+	 */
+	private long getTriggerTimestamp() {
+		return maxTimestamp + getGap() - 1;
+	}
+
+	@Override
+	public K getKey() {
+		return configuration.getSessionConfiguration().getKey();
+	}
+
+	private long getGap() {
+		return configuration.getSessionConfiguration().getGap();
+	}
+
+	private long getLateness() {
+		return configuration.getStreamConfiguration().getAllowedLateness();
+	}
+
+	private StreamEventFactory<K, E> getEventFactory() {
+		return configuration.getSessionConfiguration().getEventFactory();
+	}
+
+	private int getSessionId() {
+		return configuration.getSessionConfiguration().getSessionId();
+	}
+
+	private int getTimelyEventsCount() {
+		return configuration.getSessionConfiguration().getNumberOfTimelyEvents();
+	}
+
+	private int getLateEventsCount() {
+		return getTimelyEventsCount() + configuration.getStreamConfiguration().getLateEventsWithinLateness();
+	}
+
+	private int getAllEventsCount() {
+		return getLateEventsCount() + configuration.getStreamConfiguration().getLateEventsAfterLateness();
+	}
+
+	private boolean hasMoreTimelyEvents() {
+		return !orderedTimelyTimestamps.isEmpty();
+	}
+
+	private boolean hasMoreInLatenessEvents() {
+		return producedEventsCount < getLateEventsCount();
+	}
+
+	/**
+	 * @see EventGenerator
+	 */
+	@Override
+	public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
+		StreamConfiguration streamConfiguration = configuration.getStreamConfiguration();
+		SessionConfiguration<K, E> sessionConfiguration = configuration.getSessionConfiguration();
+
+		//compute the start timestamp for the next session
+		long maxAdditionalGap = streamConfiguration.getMaxAdditionalSessionGap();
+		long nextStartTime = Math.max(
+				getAfterLatenessTimestamp() + randomGenerator.randomLongBetween(0, maxAdditionalGap),
+				globalWatermark);
+
+		sessionConfiguration = configuration.getSessionConfiguration().getFollowupSessionConfiguration(nextStartTime);
+		SessionStreamConfiguration<K, E> sessionStreamConfiguration =
+				new SessionStreamConfiguration<>(sessionConfiguration, streamConfiguration);
+
+		return new SessionEventGeneratorImpl<>(sessionStreamConfiguration, randomGenerator);
+	}
+
+	public long getMinTimestamp() {
+		return minTimestamp;
+	}
+
+	public long getMaxTimestamp() {
+		return maxTimestamp;
+	}
+
+
+	abstract class AbstractEventGenerator implements EventGenerator<K, E> {
+		@Override
+		public K getKey() {
+			return configuration.getSessionConfiguration().getKey();
+		}
+	}
+
+	/**
+	 * internal generator delegate for producing session events that are timely
+	 */
+	class TimelyGenerator extends AbstractEventGenerator {
+
+		@Override
+		public E generateEvent(long globalWatermark) {
+			return createEventFromTimestamp(generateTimelyTimestamp(), globalWatermark, Timing.TIMELY);
+		}
+
+		@Override
+		public long getLocalWatermark() {
+			return orderedTimelyTimestamps.get(0);
+		}
+
+		@Override
+		public boolean canProduceEventAtWatermark(long globalWatermark) {
+			return true;
+		}
+
+		@Override
+		public boolean hasMoreEvents() {
+			return hasMoreTimelyEvents();
+		}
+
+		@Override
+		public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
+			return new InLatenessGenerator();
+		}
+	}
+
+	/**
+	 * internal generator delegate for producing late session events with timestamps within the allowed lateness
+	 */
+	class InLatenessGenerator extends AbstractEventGenerator {
+
+		@Override
+		public E generateEvent(long globalWatermark) {
+			return createEventFromTimestamp(generateLateTimestamp(), globalWatermark, Timing.IN_LATENESS);
+		}
+
+		@Override
+		public long getLocalWatermark() {
+			return getAfterLatenessTimestamp() - 1;
+		}
+
+		@Override
+		public boolean canProduceEventAtWatermark(long globalWatermark) {
+			return isTriggered(globalWatermark);
+		}
+
+		@Override
+		public boolean hasMoreEvents() {
+			return hasMoreInLatenessEvents();
+		}
+
+		@Override
+		public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
+			return new AfterLatenessGenerator();
+		}
+	}
+
+	/**
+	 * internal generator delegate for producing late session events with timestamps after the lateness
+	 */
+	class AfterLatenessGenerator extends AbstractEventGenerator {
+
+		@Override
+		public E generateEvent(long globalWatermark) {
+			return createEventFromTimestamp(generateLateTimestamp(), globalWatermark, Timing.AFTER_LATENESS);
+		}
+
+		@Override
+		public long getLocalWatermark() {
+			return getAfterLatenessTimestamp();
+		}
+
+		@Override
+		public boolean canProduceEventAtWatermark(long globalWatermark) {
+			return isAfterLateness(globalWatermark);
+		}
+
+		@Override
+		public boolean hasMoreEvents() {
+			return true;
+		}
+
+		@Override
+		public EventGenerator<K, E> getNextGenerator(long globalWatermark) {
+			throw new IllegalStateException("This generator has no successor");
+		}
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionStreamConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionStreamConfiguration.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionStreamConfiguration.java
new file mode 100644
index 0000000..291b6b2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionStreamConfiguration.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Configuration for a session event stream, consisting of the stream configuration and the session configuration.
+ *
+ * @param <K>
+ * @param <E>
+ */
+public final class SessionStreamConfiguration<K, E> {
+
+	private final SessionConfiguration<K, E> sessionConfiguration;
+	private final StreamConfiguration streamConfiguration;
+
+	/**
+	 * @param sessionConfiguration
+	 * @param streamConfiguration
+	 */
+	public SessionStreamConfiguration(
+			SessionConfiguration<K, E> sessionConfiguration,
+			StreamConfiguration streamConfiguration) {
+
+		Preconditions.checkNotNull(sessionConfiguration);
+		Preconditions.checkNotNull(streamConfiguration);
+
+		this.sessionConfiguration = sessionConfiguration;
+		this.streamConfiguration = streamConfiguration;
+	}
+
+	/**
+	 * @return
+	 */
+	public SessionConfiguration<K, E> getSessionConfiguration() {
+		return sessionConfiguration;
+	}
+
+	/**
+	 * @return
+	 */
+	public StreamConfiguration getStreamConfiguration() {
+		return streamConfiguration;
+	}
+
+	@Override
+	public String toString() {
+		return "SessionStreamConfiguration{" +
+				"sessionConfiguration=" + sessionConfiguration +
+				", streamConfiguration=" + streamConfiguration +
+				'}';
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
new file mode 100644
index 0000000..55fe6d2
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/SessionWindowITCase.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.TimeCharacteristic;
+import org.apache.flink.streaming.api.datastream.WindowedStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction;
+import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
+import org.apache.flink.streaming.api.windowing.time.Time;
+import org.apache.flink.streaming.api.windowing.triggers.EventTimeTrigger;
+import org.apache.flink.streaming.api.windowing.triggers.Trigger;
+import org.apache.flink.streaming.api.windowing.triggers.TriggerResult;
+import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
+import org.apache.flink.util.Collector;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.BitSet;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+public class SessionWindowITCase {
+
+	// seed for the pseudo random engine of this test
+	private static final long RANDOM_SEED = 1234567;
+
+	// flag to activate outputs (for debugging)
+	private static final boolean OUTPUT_RESULTS_AS_STRING = false;
+
+	// IMPORTANT: this should currently always be set to false
+	private static final boolean PURGE_WINDOW_ON_FIRE = false;
+
+	// number of sessions generated in the test (the more, the longer it takes)
+	private static final long NUMBER_OF_SESSIONS = 20_000;
+
+	// max. allowed gap between two events of one session
+	private static final long MAX_SESSION_EVENT_GAP_MS = 1_000;
+
+	// the allowed lateness after the watermark
+	private static final long ALLOWED_LATENESS_MS = 500;
+
+	// maximum additional gap we randomly add between two sessions
+	private static final long MAX_ADDITIONAL_SESSION_GAP_MS = 5_000;
+
+	// number of timely events per session
+	private static final int EVENTS_PER_SESSION = 10;
+
+	// number of late events per session inside lateness
+	private static final int LATE_EVENTS_PER_SESSION = 5;
+
+	// number of late events per session after lateness (will be dropped)
+	private static final int MAX_DROPPED_EVENTS_PER_SESSION = 5;
+
+	// number of different session keys
+	private static final int NUMBER_OF_DIFFERENT_KEYS = 20;
+
+	// number of parallel in-flight sessions generated in the test stream
+	private static final int PARALLEL_SESSIONS = 10;
+	
+	// names to address some counters used for result checks
+	private static final String SESSION_COUNTER_ON_TIME_KEY = "ALL_SESSIONS_ON_TIME_COUNT";
+	private static final String SESSION_COUNTER_LATE_KEY = "ALL_SESSIONS_LATE_COUNT";
+
+	@Test
+	public void testSessionWindowing() throws Exception {
+		SessionEventGeneratorDataSource dataSource = new SessionEventGeneratorDataSource();
+		runTest(dataSource, new ValidatingWindowFunction());
+
+	}
+
+	private void runTest(
+			SourceFunction<SessionEvent<Integer, TestEventPayload>> dataSource,
+			WindowFunction<SessionEvent<Integer, TestEventPayload>,
+					String, Tuple, TimeWindow> windowFunction) throws Exception {
+
+
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+		WindowedStream<SessionEvent<Integer, TestEventPayload>, Tuple, TimeWindow> windowedStream
+				= env.addSource(dataSource).keyBy("sessionKey")
+				.window(EventTimeSessionWindows.withGap(Time.milliseconds(MAX_SESSION_EVENT_GAP_MS)));
+
+		if (ALLOWED_LATENESS_MS != Long.MAX_VALUE) {
+			windowedStream = windowedStream.allowedLateness(Time.milliseconds(ALLOWED_LATENESS_MS));
+		}
+
+		if (!PURGE_WINDOW_ON_FIRE) {
+			windowedStream = windowedStream.trigger(new NonPurgingEventTimeTriggerWrapper());
+		}
+
+		windowedStream.apply(windowFunction).print();
+		env.execute();
+	}
+
+	/**
+	 * Window function that performs the correctness checks for this test case
+	 */
+	private static final class ValidatingWindowFunction extends RichWindowFunction<SessionEvent<Integer,
+			TestEventPayload>, String, Tuple, TimeWindow> {
+
+		static final long serialVersionUID = 865723993979L;
+
+		@Override
+		public void apply(
+				Tuple tuple,
+				TimeWindow timeWindow,
+				Iterable<SessionEvent<Integer, TestEventPayload>> input,
+				Collector<String> output) throws Exception {
+
+			if (OUTPUT_RESULTS_AS_STRING) {
+				output.collect("--- window triggered ---");
+			}
+
+			List<SessionEvent<Integer, TestEventPayload>> sessionEvents = new ArrayList<>();
+
+			for (SessionEvent<Integer, TestEventPayload> evt : input) {
+
+				if (OUTPUT_RESULTS_AS_STRING) {
+					output.collect(evt.toString());
+				}
+
+				sessionEvents.add(evt);
+			}
+
+			// bit-sets to track uniqueness of ids
+			BitSet onTimeBits = new BitSet(EVENTS_PER_SESSION);
+			BitSet lateWithingBits = new BitSet(LATE_EVENTS_PER_SESSION);
+
+			int onTimeCount = 0;
+			int lateCount = 0;
+
+			for (SessionEvent<Integer, TestEventPayload> evt : sessionEvents) {
+
+				if (SessionEventGeneratorImpl.Timing.TIMELY.equals(evt.getEventValue().getTiming())) {
+
+					++onTimeCount;
+					onTimeBits.set(evt.getEventValue().getEventId());
+				} else if (SessionEventGeneratorImpl.Timing.IN_LATENESS.equals(evt.getEventValue().getTiming())) {
+
+					++lateCount;
+					lateWithingBits.set(evt.getEventValue().getEventId() - EVENTS_PER_SESSION);
+				} else {
+
+					Assert.fail("Illegal event type in window " + timeWindow + ": " + evt);
+				}
+			}
+
+			getRuntimeContext().getIntCounter(SESSION_COUNTER_ON_TIME_KEY).add(onTimeCount);
+			getRuntimeContext().getIntCounter(SESSION_COUNTER_LATE_KEY).add(lateCount);
+
+			if (sessionEvents.size() >= EVENTS_PER_SESSION) { //on time events case or non-purging
+
+				//check that the expected amount if events is in the window
+				Assert.assertEquals(onTimeCount, EVENTS_PER_SESSION);
+
+				//check that no duplicate events happened
+				Assert.assertEquals(onTimeBits.cardinality(), onTimeCount);
+				Assert.assertEquals(lateWithingBits.cardinality(), lateCount);
+			} else {
+
+				Assert.fail("Event count for session window " + timeWindow + " is too low: " + sessionEvents);
+			}
+		}
+
+		@Override
+		public void close() throws Exception {
+			Assert.assertEquals(
+					(LATE_EVENTS_PER_SESSION + 1) * NUMBER_OF_SESSIONS * EVENTS_PER_SESSION,
+					getRuntimeContext().getIntCounter(SESSION_COUNTER_ON_TIME_KEY).getLocalValuePrimitive());
+			Assert.assertEquals(
+					NUMBER_OF_SESSIONS * (LATE_EVENTS_PER_SESSION * (LATE_EVENTS_PER_SESSION + 1) / 2),
+					getRuntimeContext().getIntCounter(SESSION_COUNTER_LATE_KEY).getLocalValuePrimitive());
+		}
+	}
+
+	/**
+	 * A data source that is fed from a ParallelSessionsEventGenerator
+	 */
+	private static final class SessionEventGeneratorDataSource
+			implements ParallelSourceFunction<SessionEvent<Integer, TestEventPayload>> {
+
+		static final long serialVersionUID = 11341498979L;
+
+		private volatile boolean isRunning;
+
+		public SessionEventGeneratorDataSource() {
+			this.isRunning = false;
+		}
+
+		@Override
+		public void run(SourceContext<SessionEvent<Integer, TestEventPayload>> ctx) {
+			ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>> stream = createTestStream();
+			this.isRunning = true;
+			//main data source driver loop
+			while (isRunning) {
+				synchronized (ctx.getCheckpointLock()) {
+					SessionEvent<Integer, TestEventPayload> evt = stream.nextEvent();
+					if (evt != null) {
+						ctx.collectWithTimestamp(evt, evt.getEventTimestamp());
+						ctx.emitWatermark(new Watermark(stream.getWatermark()));
+					} else {
+						break;
+					}
+				}
+			}
+		}
+
+		private ParallelSessionsEventGenerator<Integer, SessionEvent<Integer, TestEventPayload>> createTestStream() {
+			LongRandomGenerator randomGenerator = new LongRandomGenerator(RANDOM_SEED);
+
+			Set<Integer> keys = new HashSet<>();
+			for (int i = 0; i < NUMBER_OF_DIFFERENT_KEYS; ++i) {
+				keys.add(i);
+			}
+
+			StreamConfiguration streamConfiguration = StreamConfiguration.of(
+					ALLOWED_LATENESS_MS,
+					LATE_EVENTS_PER_SESSION,
+					MAX_DROPPED_EVENTS_PER_SESSION,
+					MAX_ADDITIONAL_SESSION_GAP_MS);
+			StreamEventFactory<Integer, SessionEvent<Integer, TestEventPayload>> streamEventFactory =
+					new StreamEventFactory<Integer, SessionEvent<Integer, TestEventPayload>>() {
+						@Override
+						public SessionEvent<Integer, TestEventPayload> createEvent(
+								Integer key,
+								int sessionId,
+								int eventId,
+								long eventTimestamp,
+								long globalWatermark,
+								SessionEventGeneratorImpl.Timing timing) {
+							return SessionEvent.of(
+									key,
+									TestEventPayload.of(globalWatermark, sessionId, eventId, timing),
+									eventTimestamp);
+						}
+					};
+
+			EventGeneratorFactory<Integer, SessionEvent<Integer, TestEventPayload>> eventGeneratorFactory =
+					new EventGeneratorFactory<>(
+							streamConfiguration,
+							streamEventFactory,
+							MAX_SESSION_EVENT_GAP_MS,
+							EVENTS_PER_SESSION,
+							randomGenerator);
+			return new ParallelSessionsEventGenerator<>(
+					keys,
+					eventGeneratorFactory,
+					PARALLEL_SESSIONS,
+					NUMBER_OF_SESSIONS,
+					randomGenerator);
+		}
+
+		@Override
+		public void cancel() {
+			isRunning = false;
+		}
+	}
+
+	/**
+	 * Wrapper class that converts purging triggers into non-purging ones
+	 */
+	private static final class NonPurgingEventTimeTriggerWrapper
+			extends Trigger<SessionEvent<Integer, TestEventPayload>, TimeWindow> {
+
+		static final long serialVersionUID = 34763482396L;
+
+		EventTimeTrigger delegate = EventTimeTrigger.create();
+
+		@Override
+		public TriggerResult onElement(
+				SessionEvent<Integer, TestEventPayload> element,
+				long timestamp,
+				TimeWindow window,
+				TriggerContext ctx) throws Exception {
+			return removePurging(delegate.onElement(element, timestamp, window, ctx));
+
+		}
+
+		@Override
+		public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+			return removePurging(delegate.onProcessingTime(time, window, ctx));
+		}
+
+		@Override
+		public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
+			return removePurging(delegate.onEventTime(time, window, ctx));
+		}
+
+		@Override
+		public boolean canMerge() {
+			return delegate.canMerge();
+		}
+
+		@Override
+		public TriggerResult onMerge(TimeWindow window, OnMergeContext ctx) throws Exception {
+			return removePurging(delegate.onMerge(window, ctx));
+		}
+
+		@Override
+		public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
+			delegate.clear(window, ctx);
+		}
+
+		private TriggerResult removePurging(TriggerResult result) {
+			if (TriggerResult.PURGE == result) {
+				return TriggerResult.CONTINUE;
+			} else if (TriggerResult.FIRE_AND_PURGE == result) {
+				return TriggerResult.FIRE;
+			} else {
+				return result;
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamConfiguration.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamConfiguration.java
new file mode 100644
index 0000000..49e1ef9
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamConfiguration.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+/**
+ * Configuration for event streams
+ */
+public final class StreamConfiguration {
+
+	// allowed lateness for this stream (in ms)
+	private final long allowedLateness;
+
+	// how many late events within lateness per session the stream will contain
+	private final int lateEventsWithinLateness;
+
+	// how many late events after lateness per session the stream will contain
+	private final int lateEventsAfterLateness;
+
+	// hint for the maximum additional gap used to in between two sessions
+	private final long maxAdditionalSessionGap;
+
+	public StreamConfiguration(long allowedLateness,
+	                           int lateEventsWithinLateness,
+	                           int lateEventsAfterLateness,
+	                           long maxAdditionalSessionGap) {
+		this.allowedLateness = allowedLateness;
+		this.lateEventsWithinLateness = lateEventsWithinLateness;
+		this.lateEventsAfterLateness = lateEventsAfterLateness;
+		this.maxAdditionalSessionGap = maxAdditionalSessionGap;
+	}
+
+	public long getAllowedLateness() {
+		return allowedLateness;
+	}
+
+	public int getLateEventsWithinLateness() {
+		return lateEventsWithinLateness;
+	}
+
+	public int getLateEventsAfterLateness() {
+		return lateEventsAfterLateness;
+	}
+
+	public long getMaxAdditionalSessionGap() {
+		return maxAdditionalSessionGap;
+	}
+
+	public static StreamConfiguration of(long allowedLateness,
+	                                     int lateEventsPerSessionWithinLateness,
+	                                     int lateEventsPerSessionOutsideLateness,
+	                                     long maxAdditionalSessionGap) {
+		return new StreamConfiguration(
+				allowedLateness,
+				lateEventsPerSessionWithinLateness,
+				lateEventsPerSessionOutsideLateness,
+				maxAdditionalSessionGap);
+	}
+
+	@Override
+	public String toString() {
+		return "StreamConfiguration{" +
+				"allowedLateness=" + allowedLateness +
+				", lateEventsWithinLateness=" + lateEventsWithinLateness +
+				", lateEventsAfterLateness=" + lateEventsAfterLateness +
+				", maxAdditionalSessionGap=" + maxAdditionalSessionGap +
+				'}';
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamEventFactory.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamEventFactory.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamEventFactory.java
new file mode 100644
index 0000000..0e96791
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/StreamEventFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+/**
+ * Factory that produces events of keyed session streams
+ *
+ * @param <K>
+ * @param <E>
+ */
+public interface StreamEventFactory<K, E> {
+
+	/**
+	 * @param key             session key
+	 * @param sessionId       session id
+	 * @param eventId         event id w.r.t. the session
+	 * @param eventTimestamp  event time
+	 * @param globalWatermark current value of the global watermark
+	 * @param timing          indicator for lateness
+	 * @return event for an keyed event stream
+	 */
+	E createEvent(K key,
+	              int sessionId,
+	              int eventId,
+	              long eventTimestamp,
+	              long globalWatermark,
+	              SessionEventGeneratorImpl.Timing timing);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/78af1e97/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
new file mode 100644
index 0000000..8e3d62b
--- /dev/null
+++ b/flink-tests/src/test/java/org/apache/flink/test/windowing/sessionwindows/TestEventPayload.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.windowing.sessionwindows;
+
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Test payload that contains useful information for the correctness checks in our test
+ */
+public final class TestEventPayload {
+
+	// id of the session w.r.t the key
+	private int sessionId;
+
+	// id of the event w.r.t. tje session id
+	private int eventId;
+
+	// the watermark under which the event was emitted
+	private long watermark;
+
+	// the timing characteristic of the event w.r.t. the watermark
+	private SessionEventGeneratorImpl.Timing timing;
+
+	/**
+	 * @param watermark
+	 * @param sessionId
+	 * @param eventSequenceNumber
+	 * @param timing
+	 */
+	public TestEventPayload(long watermark, int sessionId, int eventSequenceNumber, SessionEventGeneratorImpl.Timing timing) {
+		setWatermark(watermark);
+		setSessionId(sessionId);
+		setEventId(eventSequenceNumber);
+		setTiming(timing);
+	}
+
+	/**
+	 * @return global watermark at the time this event was generated
+	 */
+	public long getWatermark() {
+		return watermark;
+	}
+
+	public void setWatermark(long watermark) {
+		this.watermark = watermark;
+	}
+
+	/**
+	 * @return id of the session to identify a sessions in the sequence of all sessions for the same key
+	 */
+	public int getSessionId() {
+		return sessionId;
+	}
+
+	public void setSessionId(int sessionId) {
+		this.sessionId = sessionId;
+	}
+
+	/**
+	 * @return a sequence number that acts as an id for the even inside the session
+	 */
+	public int getEventId() {
+		return eventId;
+	}
+
+	public void setEventId(int eventId) {
+		this.eventId = eventId;
+	}
+
+	/**
+	 * @return indicates whether the event is on time, late within the timing, or late after the timing
+	 */
+	public SessionEventGeneratorImpl.Timing getTiming() {
+		return timing;
+	}
+
+	public void setTiming(SessionEventGeneratorImpl.Timing timing) {
+		Preconditions.checkNotNull(timing);
+		this.timing = timing;
+	}
+
+	@Override
+	public String toString() {
+		return "TestEventPayload{" +
+				"sessionId=" + sessionId +
+				", eventId=" + eventId +
+				", watermark=" + watermark +
+				", timing=" + timing +
+				'}';
+	}
+
+	/**
+	 * @param watermark
+	 * @param sessionId
+	 * @param eventId
+	 * @param timing
+	 * @return
+	 */
+	public static TestEventPayload of(long watermark, int sessionId, int eventId, SessionEventGeneratorImpl.Timing timing) {
+		return new TestEventPayload(watermark, sessionId, eventId, timing);
+	}
+
+}
\ No newline at end of file


Mime
View raw message