flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [02/20] flink git commit: [streaming] StreamDiscretizer rework to support only 1 eviction and trigger for robustness + test cleanup
Date Mon, 16 Feb 2015 14:25:28 GMT
[streaming] StreamDiscretizer rework to support only 1 eviction and trigger for robustness + test cleanup


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/412779fa
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/412779fa
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/412779fa

Branch: refs/heads/master
Commit: 412779fae5b20513997fcacee509082d3fa78f96
Parents: c560d76
Author: Gyula Fora <gyfora@apache.org>
Authored: Sun Feb 8 13:09:34 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:07 2015 +0100

----------------------------------------------------------------------
 .../streaming/api/datastream/DataStream.java    |   7 +-
 .../api/datastream/WindowedDataStream.java      | 180 ++----
 .../api/invokable/StreamInvokable.java          |   2 +-
 .../windowing/GroupedStreamDiscretizer.java     | 437 +-------------
 .../operator/windowing/StreamDiscretizer.java   | 363 +++---------
 .../operator/windowing/StreamWindow.java        |  15 +-
 .../operator/windowing/WindowFlattener.java     |   1 +
 .../apache/flink/streaming/api/PrintTest.java   |  57 --
 .../windowing/GroupedStreamDiscretizerTest.java |  92 +++
 .../windowing/GroupedWindowInvokableTest.java   | 574 -------------------
 .../windowing/StreamDiscretizerTest.java        | 107 ++++
 .../operator/windowing/StreamWindowTest.java    |  14 +-
 .../operator/windowing/WindowInvokableTest.java | 261 ---------
 13 files changed, 382 insertions(+), 1728 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index eff6026..b3d0564 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -865,7 +865,7 @@ public class DataStream<OUT> {
 	 * @return A {@link WindowedDataStream} providing further operations.
 	 */
 	@SuppressWarnings({ "rawtypes", "unchecked" })
-	public WindowedDataStream<OUT> window(WindowingHelper... policyHelpers) {
+	public WindowedDataStream<OUT> window(WindowingHelper policyHelpers) {
 		return new WindowedDataStream<OUT>(this, policyHelpers);
 	}
 
@@ -884,9 +884,8 @@ public class DataStream<OUT> {
 	 *            number of elements in each time window.
 	 * @return A {@link WindowedDataStream} providing further operations.
 	 */
-	public WindowedDataStream<OUT> window(List<TriggerPolicy<OUT>> triggers,
-			List<EvictionPolicy<OUT>> evicters) {
-		return new WindowedDataStream<OUT>(this, triggers, evicters);
+	public WindowedDataStream<OUT> window(TriggerPolicy<OUT> trigger, EvictionPolicy<OUT> evicter) {
+		return new WindowedDataStream<OUT>(this, trigger, evicter);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
index fc117a1..e2cb3f7 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -17,10 +17,6 @@
 
 package org.apache.flink.streaming.api.datastream;
 
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichGroupReduceFunction;
@@ -44,7 +40,6 @@ import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
 import org.apache.flink.streaming.util.keys.KeySelectorUtil;
@@ -64,54 +59,34 @@ public class WindowedDataStream<OUT> {
 	protected DataStream<OUT> dataStream;
 
 	protected boolean isLocal = false;
-	protected boolean isCentral = true;
 
 	protected KeySelector<OUT, ?> discretizerKey;
 	protected KeySelector<OUT, ?> groupByKey;
 
-	protected List<WindowingHelper<OUT>> triggerHelpers;
-	protected List<WindowingHelper<OUT>> evictionHelpers;
-
-	protected LinkedList<TriggerPolicy<OUT>> userTriggers;
-	protected LinkedList<EvictionPolicy<OUT>> userEvicters;
-
-	protected WindowedDataStream() {
+	protected WindowingHelper<OUT> triggerHelper;
+	protected WindowingHelper<OUT> evictionHelper;
 
-	}
+	protected TriggerPolicy<OUT> userTrigger;
+	protected EvictionPolicy<OUT> userEvicter;
 
-	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) {
+	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT> policyHelper) {
 		this.dataStream = dataStream.copy();
-		this.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
-		for (WindowingHelper<OUT> helper : policyHelpers) {
-			this.triggerHelpers.add(helper);
-		}
+		this.triggerHelper = policyHelper;
 
 		if (dataStream instanceof GroupedDataStream) {
 			this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
-			// set all policies distributed
-			this.isCentral = false;
 		}
 	}
 
-	protected WindowedDataStream(DataStream<OUT> dataStream, List<TriggerPolicy<OUT>> triggers,
-			List<EvictionPolicy<OUT>> evicters) {
+	protected WindowedDataStream(DataStream<OUT> dataStream, TriggerPolicy<OUT> trigger,
+			EvictionPolicy<OUT> evicter) {
 		this.dataStream = dataStream.copy();
 
-		if (triggers != null) {
-			this.userTriggers = new LinkedList<TriggerPolicy<OUT>>();
-			this.userTriggers.addAll(triggers);
-		}
-
-		if (evicters != null) {
-			this.userEvicters = new LinkedList<EvictionPolicy<OUT>>();
-			this.userEvicters.addAll(evicters);
-		}
+		this.userTrigger = trigger;
+		this.userEvicter = evicter;
 
 		if (dataStream instanceof GroupedDataStream) {
 			this.discretizerKey = ((GroupedDataStream<OUT>) dataStream).keySelector;
-			// set all policies distributed
-			this.isCentral = false;
-
 		}
 	}
 
@@ -119,14 +94,16 @@ public class WindowedDataStream<OUT> {
 		this.dataStream = windowedDataStream.dataStream.copy();
 		this.discretizerKey = windowedDataStream.discretizerKey;
 		this.groupByKey = windowedDataStream.groupByKey;
-		this.triggerHelpers = windowedDataStream.triggerHelpers;
-		this.evictionHelpers = windowedDataStream.evictionHelpers;
-		this.userTriggers = windowedDataStream.userTriggers;
-		this.userEvicters = windowedDataStream.userEvicters;
-		this.isCentral = windowedDataStream.isCentral;
+		this.triggerHelper = windowedDataStream.triggerHelper;
+		this.evictionHelper = windowedDataStream.evictionHelper;
+		this.userTrigger = windowedDataStream.userTrigger;
+		this.userEvicter = windowedDataStream.userEvicter;
 		this.isLocal = windowedDataStream.isLocal;
 	}
 
+	public WindowedDataStream() {
+	}
+
 	public <F> F clean(F f) {
 		return dataStream.clean(f);
 	}
@@ -146,15 +123,13 @@ public class WindowedDataStream<OUT> {
 	 * @return The windowed data stream with triggering set
 	 */
 	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public WindowedDataStream<OUT> every(WindowingHelper... policyHelpers) {
+	public WindowedDataStream<OUT> every(WindowingHelper policyHelper) {
 		WindowedDataStream<OUT> ret = this.copy();
-		if (ret.evictionHelpers == null) {
-			ret.evictionHelpers = ret.triggerHelpers;
-			ret.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
-		}
-		for (WindowingHelper<OUT> helper : policyHelpers) {
-			ret.triggerHelpers.add(helper);
+		if (ret.evictionHelper == null) {
+			ret.evictionHelper = ret.triggerHelper;
+			ret.triggerHelper = policyHelper;
 		}
+
 		return ret;
 	}
 
@@ -238,11 +213,11 @@ public class WindowedDataStream<OUT> {
 		StreamInvokable<OUT, StreamWindow<OUT>> discretizer;
 
 		if (discretizerKey == null) {
-			discretizer = new StreamDiscretizer<OUT>(getTriggers(), getEvicters());
+			discretizer = new StreamDiscretizer<OUT>(getTrigger(), getEvicter());
 		} else {
 			discretizer = new GroupedStreamDiscretizer<OUT>(discretizerKey,
-					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
-					getCentralEvicters());
+					(CloneableTriggerPolicy<OUT>) getTrigger(),
+					(CloneableEvictionPolicy<OUT>) getEvicter());
 		}
 
 		int parallelism = isLocal || (discretizerKey != null) ? dataStream.environment
@@ -537,103 +512,32 @@ public class WindowedDataStream<OUT> {
 		return reduceWindow(aggregator);
 	}
 
-	protected LinkedList<TriggerPolicy<OUT>> getTriggers() {
-
-		LinkedList<TriggerPolicy<OUT>> triggers = new LinkedList<TriggerPolicy<OUT>>();
-
-		if (triggerHelpers != null) {
-			for (WindowingHelper<OUT> helper : triggerHelpers) {
-				triggers.add(helper.toTrigger());
-			}
-		}
-
-		if (userTriggers != null) {
-			triggers.addAll(userTriggers);
-		}
-
-		return triggers;
-
-	}
-
-	protected LinkedList<EvictionPolicy<OUT>> getEvicters() {
+	protected TriggerPolicy<OUT> getTrigger() {
 
-		LinkedList<EvictionPolicy<OUT>> evicters = new LinkedList<EvictionPolicy<OUT>>();
-
-		if (evictionHelpers != null) {
-			for (WindowingHelper<OUT> helper : evictionHelpers) {
-				evicters.add(helper.toEvict());
-			}
+		if (triggerHelper != null) {
+			return triggerHelper.toTrigger();
+		} else if (userTrigger != null) {
+			return userTrigger;
 		} else {
-			if (userEvicters == null) {
-				boolean notOnlyTime = false;
-				for (WindowingHelper<OUT> helper : triggerHelpers) {
-					if (helper instanceof Time<?>) {
-						evicters.add(helper.toEvict());
-					} else {
-						notOnlyTime = true;
-					}
-				}
-				if (notOnlyTime) {
-					evicters.add(new TumblingEvictionPolicy<OUT>());
-				}
-			}
-		}
-
-		if (userEvicters != null) {
-			evicters.addAll(userEvicters);
-		}
-
-		return evicters;
-	}
-
-	protected LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
-		LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
-		if (isCentral) {
-			cTriggers.addAll(getTriggers());
-		} else {
-			for (TriggerPolicy<OUT> trigger : getTriggers()) {
-				if (trigger instanceof TimeTriggerPolicy) {
-					cTriggers.add(trigger);
-				}
-			}
-		}
-		return cTriggers;
-	}
-
-	protected LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
-		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null;
-
-		if (!isCentral) {
-			dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
-			for (TriggerPolicy<OUT> trigger : getTriggers()) {
-				if (!(trigger instanceof TimeTriggerPolicy)) {
-					dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
-				}
-			}
+			throw new RuntimeException("Trigger must not be null");
 		}
 
-		return dTriggers;
 	}
 
-	protected LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
-		LinkedList<CloneableEvictionPolicy<OUT>> evicters = null;
+	protected EvictionPolicy<OUT> getEvicter() {
 
-		if (!isCentral) {
-			evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
-			for (EvictionPolicy<OUT> evicter : getEvicters()) {
-				evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+		if (evictionHelper != null) {
+			return evictionHelper.toEvict();
+		} else if (userEvicter == null) {
+			if (triggerHelper instanceof Time) {
+				return triggerHelper.toEvict();
+			} else {
+				return new TumblingEvictionPolicy<OUT>();
 			}
-		}
-
-		return evicters;
-	}
-
-	protected LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
-		if (isCentral) {
-			return getEvicters();
 		} else {
-			return null;
+			return userEvicter;
 		}
+
 	}
 
 	/**
@@ -664,7 +568,7 @@ public class WindowedDataStream<OUT> {
 	protected static class WindowKey<R> implements KeySelector<StreamWindow<R>, Integer> {
 
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public Integer getKey(StreamWindow<R> value) throws Exception {
 			return value.windowID;

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
index f6e7052..7feeac8 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/StreamInvokable.java
@@ -58,7 +58,7 @@ public abstract class StreamInvokable<IN, OUT> implements Serializable {
 	protected IN nextObject;
 	protected boolean isMutable;
 
-	protected Collector<OUT> collector;
+	public Collector<OUT> collector;
 	protected Function userFunction;
 	protected volatile boolean isRunning;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
index 6178f48..efd2e06 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -18,57 +18,14 @@
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.Map;
-import java.util.NoSuchElementException;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.functions.KeySelector;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
-import org.apache.flink.streaming.api.streamrecord.StreamRecord;
-import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
-import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
 
-/**
- * This invokable allows windowing based on {@link TriggerPolicy} and
- * {@link EvictionPolicy} instances including their active and cloneable
- * versions. It is additionally aware of the creation of windows per group.
- * 
- * A {@link KeySelector} is used to specify the key position or key extraction.
- * The {@link ReduceFunction} will be executed on each group separately.
- * Policies might either be centralized or distributed. It is not possible to
- * use central and distributed eviction policies at the same time. A distributed
- * policy have to be a {@link CloneableTriggerPolicy} or
- * {@link CloneableEvictionPolicy} as it will be cloned to have separated
- * instances for each group. At the startup time the distributed policies will
- * be stored as sample, and only clones of them will be used to maintain the
- * groups. Therefore, each group starts with the initial policy states.
- * 
- * While a distributed policy only gets notified with the elements belonging to
- * the respective group, a centralized policy get notified with all arriving
- * elements. When a centralized trigger occurred, all groups get triggered. This
- * is done by submitting the element which caused the trigger as real element to
- * the groups it belongs to and as fake element to all other groups. Within the
- * groups the element might be further processed, causing more triggers,
- * prenotifications of active distributed policies and evictions like usual.
- * 
- * Central policies can be instance of {@link ActiveTriggerPolicy} and also
- * implement the
- * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
- * method. Fake elements created on prenotification will be forwarded to all
- * groups. The {@link ActiveTriggerCallback} is also implemented in a way, that
- * it forwards/distributed calls all groups.
- * 
- * @param <IN>
- *            The type of input elements handled by this operator invokable.
- */
 public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
 
 	/**
@@ -78,239 +35,50 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind
 
 	private KeySelector<IN, ?> keySelector;
 	private Configuration parameters;
-	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
-	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
-	private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
-	private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
-	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies;
-	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies;
-	private Map<Object, StreamDiscretizer<IN>> windowingGroups;
-	private LinkedList<Thread> activePolicyThreads;
-	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
-	private LinkedList<StreamDiscretizer<IN>> deleteOrderForCentralEviction;
+	private CloneableTriggerPolicy<IN> triggerPolicy;
+	private CloneableEvictionPolicy<IN> evictionPolicy;
+
+	private Map<Object, StreamDiscretizer<IN>> groupedDiscretizers;
 
-	/**
-	 * This constructor creates an instance of the grouped windowing invokable.
-	 * 
-	 * A {@link KeySelector} is used to specify the key position or key
-	 * extraction. The {@link ReduceFunction} will be executed on each group
-	 * separately. Policies might either be centralized or distributed. It is
-	 * not possible to use central and distributed eviction policies at the same
-	 * time. A distributed policy have to be a {@link CloneableTriggerPolicy} or
-	 * {@link CloneableEvictionPolicy} as it will be cloned to have separated
-	 * instances for each group. At the startup time the distributed policies
-	 * will be stored as sample, and only clones of them will be used to
-	 * maintain the groups. Therefore, each group starts with the initial policy
-	 * states.
-	 * 
-	 * While a distributed policy only gets notified with the elements belonging
-	 * to the respective group, a centralized policy get notified with all
-	 * arriving elements. When a centralized trigger occurred, all groups get
-	 * triggered. This is done by submitting the element which caused the
-	 * trigger as real element to the groups it belongs to and as fake element
-	 * to all other groups. Within the groups the element might be further
-	 * processed, causing more triggers, prenotifications of active distributed
-	 * policies and evictions like usual.
-	 * 
-	 * Central policies can be instance of {@link ActiveTriggerPolicy} and also
-	 * implement the
-	 * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
-	 * method. Fake elements created on prenotification will be forwarded to all
-	 * groups. The {@link ActiveTriggerCallback} is also implemented in a way,
-	 * that it forwards/distributed calls all groups.
-	 * 
-	 * @param userFunction
-	 *            The user defined function.
-	 * @param keySelector
-	 *            A key selector to extract the key for the groups from the
-	 *            input data.
-	 * @param distributedTriggerPolicies
-	 *            Trigger policies to be distributed and maintained individually
-	 *            within each group.
-	 * @param distributedEvictionPolicies
-	 *            Eviction policies to be distributed and maintained
-	 *            individually within each group. Note that there cannot be
-	 *            both, central and distributed eviction policies at the same
-	 *            time.
-	 * @param centralTriggerPolicies
-	 *            Trigger policies which will only exist once at a central
-	 *            place. In case a central policy triggers, it will cause all
-	 *            groups to be emitted. (Remark: Empty groups cannot be emitted.
-	 *            If only one element is contained a group, this element itself
-	 *            is returned as aggregated result.)
-	 * @param centralEvictionPolicies
-	 *            Eviction which will only exist once at a central place. Note
-	 *            that there cannot be both, central and distributed eviction
-	 *            policies at the same time. The central eviction policy will
-	 *            work on an simulated element buffer containing all elements no
-	 *            matter which group they belong to.
-	 */
 	public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
-			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
-			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
-			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
-			LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) {
+			CloneableTriggerPolicy<IN> triggerPolicy, CloneableEvictionPolicy<IN> evictionPolicy) {
 
 		super(null);
 
 		this.keySelector = keySelector;
 
-		// handle the triggers
-		if (centralTriggerPolicies != null) {
-			this.centralTriggerPolicies = centralTriggerPolicies;
-			this.activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
-
-			for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
-				if (trigger instanceof ActiveTriggerPolicy) {
-					this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
-				}
-			}
-		} else {
-			this.centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-		}
-
-		if (distributedTriggerPolicies != null) {
-			this.distributedTriggerPolicies = distributedTriggerPolicies;
-		} else {
-			this.distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
-		}
-
-		if (distributedEvictionPolicies != null) {
-			this.distributedEvictionPolicies = distributedEvictionPolicies;
-		} else {
-			this.distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
-		}
-
-		this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
-
-		if (centralEvictionPolicies != null) {
-			this.centralEvictionPolicies = centralEvictionPolicies;
-
-			for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
-				if (eviction instanceof ActiveEvictionPolicy) {
-					this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
-				}
-			}
-		} else {
-			this.centralEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
-		}
-
-		this.windowingGroups = new HashMap<Object, StreamDiscretizer<IN>>();
-		this.activePolicyThreads = new LinkedList<Thread>();
-		this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-		this.deleteOrderForCentralEviction = new LinkedList<StreamDiscretizer<IN>>();
-
-		// check that not both, central and distributed eviction, is used at the
-		// same time.
-		if (!this.centralEvictionPolicies.isEmpty() && !this.distributedEvictionPolicies.isEmpty()) {
-			throw new UnsupportedOperationException(
-					"You can only use either central or distributed eviction policies but not both at the same time.");
-		}
+		this.triggerPolicy = triggerPolicy;
+		this.evictionPolicy = evictionPolicy;
 
-		// Check that there is at least one trigger and one eviction policy
-		if (this.centralEvictionPolicies.isEmpty() && this.distributedEvictionPolicies.isEmpty()) {
-			throw new UnsupportedOperationException(
-					"You have to define at least one eviction policy");
-		}
-		if (this.centralTriggerPolicies.isEmpty() && this.distributedTriggerPolicies.isEmpty()) {
-			throw new UnsupportedOperationException(
-					"You have to define at least one trigger policy");
-		}
+		this.groupedDiscretizers = new HashMap<Object, StreamDiscretizer<IN>>();
 
 	}
 
 	@Override
 	public void invoke() throws Exception {
-		// Prevent empty data streams
 		if (readNext() == null) {
 			throw new RuntimeException("DataStream must not be empty");
 		}
 
-		// Continuously run
 		while (nextRecord != null) {
-			StreamDiscretizer<IN> groupInvokable = windowingGroups.get(keySelector
-					.getKey(nextRecord.getObject()));
-			if (groupInvokable == null) {
-				groupInvokable = makeNewGroup(nextRecord);
-			}
-
-			// Run the precalls for central active triggers
-			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
-				Object[] result = trigger.preNotifyTrigger(nextRecord.getObject());
-				for (Object in : result) {
-
-					// If central eviction is used, handle it here
-					if (!activeCentralEvictionPolicies.isEmpty()) {
-						evictElements(centralActiveEviction(in));
-					}
-
-					// process in groups
-					for (StreamDiscretizer<IN> group : windowingGroups.values()) {
-						group.processFakeElement(in, trigger);
-						checkForEmptyGroupBuffer(group);
-					}
-				}
-			}
-
-			// Process non-active central triggers
-			for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
-				if (triggerPolicy.notifyTrigger(nextRecord.getObject())) {
-					currentTriggerPolicies.add(triggerPolicy);
-				}
-			}
-
-			if (currentTriggerPolicies.isEmpty()) {
 
-				// only add the element to its group
-				groupInvokable.processRealElement(nextRecord.getObject());
-				checkForEmptyGroupBuffer(groupInvokable);
+			Object key = keySelector.getKey(nextObject);
 
-				// If central eviction is used, handle it here
-				if (!centralEvictionPolicies.isEmpty()) {
-					evictElements(centralEviction(nextRecord.getObject(), false));
-					deleteOrderForCentralEviction.add(groupInvokable);
-				}
+			StreamDiscretizer<IN> groupDiscretizer = groupedDiscretizers.get(key);
 
-			} else {
-
-				// call user function for all groups
-				for (StreamDiscretizer<IN> group : windowingGroups.values()) {
-					if (group == groupInvokable) {
-						// process real with initialized policies
-						group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
-					} else {
-						// process like a fake but also initialized with
-						// policies
-						group.externalTriggerFakeElement(nextRecord.getObject(),
-								currentTriggerPolicies);
-					}
-
-					// remove group in case it has an empty buffer
-					// checkForEmptyGroupBuffer(group);
-				}
-
-				// If central eviction is used, handle it here
-				if (!centralEvictionPolicies.isEmpty()) {
-					evictElements(centralEviction(nextRecord.getObject(), true));
-					deleteOrderForCentralEviction.add(groupInvokable);
-				}
+			if (groupDiscretizer == null) {
+				groupDiscretizer = makeNewGroup(key);
+				groupedDiscretizers.put(key, groupDiscretizer);
 			}
 
-			// clear current trigger list
-			currentTriggerPolicies.clear();
+			groupDiscretizer.processRealElement(nextObject);
 
-			// read next record
 			readNext();
 		}
 
-		// Stop all remaining threads from policies
-		for (Thread t : activePolicyThreads) {
-			t.interrupt();
-		}
-
 		// finally trigger the buffer.
-		for (StreamDiscretizer<IN> group : windowingGroups.values()) {
-			group.emitFinalWindow(centralTriggerPolicies);
+		for (StreamDiscretizer<IN> group : groupedDiscretizers.values()) {
+			group.emitFinalWindow();
 		}
 
 	}
@@ -318,178 +86,21 @@ public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWind
 	/**
 	 * This method creates a new group. The method gets called in case an
 	 * element arrives which has a key which was not seen before. The method
-	 * created a nested {@link WindowInvokable} and therefore created clones of
-	 * all distributed trigger and eviction policies.
+	 * creates a nested {@link StreamDiscretizer} and therefore creates clones
+	 * of the trigger and eviction policy.
 	 * 
-	 * @param element
-	 *            The element which leads to the generation of a new group
-	 *            (previously unseen key)
-	 * @throws Exception
-	 *             In case the {@link KeySelector} throws an exception in
-	 *             {@link KeySelector#getKey(Object)}, the exception is not
-	 *             catched by this method.
+	 * @param key
+	 *            The key of the new group.
 	 */
-	private StreamDiscretizer<IN> makeNewGroup(StreamRecord<IN> element) throws Exception {
-		// clone the policies
-		LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-		LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
-		for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) {
-			clonedDistributedTriggerPolicies.add(trigger.clone());
-		}
-		for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) {
-			clonedDistributedEvictionPolicies.add(eviction.clone());
-		}
+	private StreamDiscretizer<IN> makeNewGroup(Object key) throws Exception {
 
-		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(
-				clonedDistributedTriggerPolicies, clonedDistributedEvictionPolicies);
+		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(triggerPolicy.clone(),
+				evictionPolicy.clone());
 
-		groupDiscretizer.setup(taskContext);
+		groupDiscretizer.collector = taskContext.getOutputCollector();
 		groupDiscretizer.open(this.parameters);
-		windowingGroups.put(keySelector.getKey(element.getObject()), groupDiscretizer);
 
 		return groupDiscretizer;
 	}
 
-	@Override
-	public void open(Configuration parameters) throws Exception {
-		super.open(parameters);
-		this.parameters = parameters;
-		for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) {
-			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
-			if (target != null) {
-				Thread thread = new Thread(target);
-				activePolicyThreads.add(thread);
-				thread.start();
-			}
-		}
-	};
-
-	/**
-	 * This method is used to notify central eviction policies with a real
-	 * element.
-	 * 
-	 * @param input
-	 *            the real element to notify the eviction policy.
-	 * @param triggered
-	 *            whether a central trigger occurred or not.
-	 * @return The number of elements to be deleted from the buffer.
-	 */
-	private int centralEviction(IN input, boolean triggered) {
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-		for (EvictionPolicy<IN> evictionPolicy : centralEvictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEviction(input, triggered,
-					deleteOrderForCentralEviction.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
-		return currentMaxEviction;
-	}
-
-	/**
-	 * This method is used to notify active central eviction policies with a
-	 * fake element.
-	 * 
-	 * @param input
-	 *            the fake element to notify the active central eviction
-	 *            policies.
-	 * @return The number of elements to be deleted from the buffer.
-	 */
-	private int centralActiveEviction(Object input) {
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-		for (ActiveEvictionPolicy<IN> evictionPolicy : activeCentralEvictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input,
-					deleteOrderForCentralEviction.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
-		return currentMaxEviction;
-	}
-
-	/**
-	 * This method is used in central eviction to delete a given number of
-	 * elements from the buffer.
-	 * 
-	 * @param numToEvict
-	 *            number of elements to delete from the virtual central element
-	 *            buffer.
-	 */
-	private void evictElements(int numToEvict) {
-		HashSet<StreamDiscretizer<IN>> usedGroups = new HashSet<StreamDiscretizer<IN>>();
-		for (; numToEvict > 0; numToEvict--) {
-			StreamDiscretizer<IN> currentGroup = deleteOrderForCentralEviction.getFirst();
-			// Do the eviction
-			currentGroup.evictFirst();
-			// Remember groups which possibly have an empty buffer after the
-			// eviction
-			usedGroups.add(currentGroup);
-			try {
-				deleteOrderForCentralEviction.removeFirst();
-			} catch (NoSuchElementException e) {
-				// when buffer is empty, ignore exception and stop deleting
-				break;
-			}
-
-		}
-
-		// Remove groups with empty buffer
-		for (StreamDiscretizer<IN> group : usedGroups) {
-			checkForEmptyGroupBuffer(group);
-		}
-	}
-
-	/**
-	 * Checks if the element buffer of a given windowing group is empty. If so,
-	 * the group will be deleted.
-	 * 
-	 * @param group
-	 *            The windowing group to be checked and and removed in case its
-	 *            buffer is empty.
-	 */
-	private void checkForEmptyGroupBuffer(StreamDiscretizer<IN> group) {
-		if (group.isBufferEmpty()) {
-			windowingGroups.remove(group);
-		}
-	}
-
-	/**
-	 * This callback class allows to handle the the callbacks done by threads
-	 * defined in active trigger policies
-	 * 
-	 * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
-	 */
-	private class WindowingCallback implements ActiveTriggerCallback {
-		private ActiveTriggerPolicy<IN> policy;
-
-		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
-			this.policy = policy;
-		}
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-
-			// If central eviction is used, handle it here
-			if (!centralEvictionPolicies.isEmpty()) {
-				evictElements(centralActiveEviction(datapoint));
-			}
-
-			// handle element in groups
-			for (StreamDiscretizer<IN> group : windowingGroups.values()) {
-				group.processFakeElement(datapoint, policy);
-				checkForEmptyGroupBuffer(group);
-			}
-		}
-
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
index a63aee6..d5b4354 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -18,10 +18,8 @@
 package org.apache.flink.streaming.api.invokable.operator.windowing;
 
 import java.util.LinkedList;
-import java.util.List;
 import java.util.NoSuchElementException;
 
-import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.streaming.api.invokable.StreamInvokable;
 import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
 import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
@@ -36,286 +34,113 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 	 */
 	private static final long serialVersionUID = -8038984294071650730L;
 
-	private LinkedList<TriggerPolicy<IN>> triggerPolicies;
-	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
-	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
-	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
-	private LinkedList<Thread> activePolicyTreads;
+	private TriggerPolicy<IN> triggerPolicy;
+	private EvictionPolicy<IN> evictionPolicy;
+	private boolean isActiveTrigger;
+	private boolean isActiveEviction;
+	private Thread activePolicyThread;
 	protected LinkedList<IN> buffer;
-	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
 
-	/**
-	 * This constructor created a windowing invokable using trigger and eviction
-	 * policies.
-	 * 
-	 * @param userFunction
-	 *            The user defined {@link ReduceFunction}
-	 * @param triggerPolicies
-	 *            A list of {@link TriggerPolicy}s and/or
-	 *            {@link ActiveTriggerPolicy}s
-	 * @param evictionPolicies
-	 *            A list of {@link EvictionPolicy}s and/or
-	 *            {@link ActiveEvictionPolicy}s
-	 */
-	public StreamDiscretizer(LinkedList<TriggerPolicy<IN>> triggerPolicies,
-			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+	public StreamDiscretizer(TriggerPolicy<IN> triggerPolicy, EvictionPolicy<IN> evictionPolicy) {
 		super(null);
 
-		this.triggerPolicies = triggerPolicies;
-		this.evictionPolicies = evictionPolicies;
+		this.triggerPolicy = triggerPolicy;
+		this.evictionPolicy = evictionPolicy;
 
-		activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
-		for (TriggerPolicy<IN> tp : triggerPolicies) {
-			if (tp instanceof ActiveTriggerPolicy) {
-				activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) tp);
-			}
-		}
+		this.isActiveTrigger = triggerPolicy instanceof ActiveTriggerPolicy;
+		this.isActiveEviction = evictionPolicy instanceof ActiveEvictionPolicy;
 
-		activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
-		for (EvictionPolicy<IN> ep : evictionPolicies) {
-			if (ep instanceof ActiveEvictionPolicy) {
-				activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) ep);
-			}
-		}
-
-		this.activePolicyTreads = new LinkedList<Thread>();
 		this.buffer = new LinkedList<IN>();
-		this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
-	}
-
-	@Override
-	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
-		super.open(parameters);
-		for (ActiveTriggerPolicy<IN> tp : activeTriggerPolicies) {
-			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
-			if (target != null) {
-				Thread thread = new Thread(target);
-				activePolicyTreads.add(thread);
-				thread.start();
-			}
-		}
-	}
-
-	/**
-	 * This class allows the active trigger threads to call back and push fake
-	 * elements at any time.
-	 */
-	private class WindowingCallback implements ActiveTriggerCallback {
-		private ActiveTriggerPolicy<IN> policy;
-
-		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
-			this.policy = policy;
-		}
-
-		@Override
-		public void sendFakeElement(Object datapoint) {
-			processFakeElement(datapoint, this.policy);
-		}
-
 	}
 
 	@Override
 	public void invoke() throws Exception {
 
-		// Prevent empty data streams
-		if (readNext() == null) {
-			throw new RuntimeException("DataStream must not be empty");
-		}
-
 		// Continuously run
-		while (nextRecord != null) {
-			processRealElement(nextRecord.getObject());
-
-			// Load next StreamRecord
-			readNext();
+		while (readNext() != null) {
+			processRealElement(nextObject);
 		}
 
-		// Stop all remaining threads from policies
-		for (Thread t : activePolicyTreads) {
-			t.interrupt();
+		if (activePolicyThread != null) {
+			activePolicyThread.interrupt();
 		}
 
-		// finally trigger the buffer.
-		emitFinalWindow(null);
+		emitFinalWindow();
 
 	}
 
 	/**
-	 * This method gets called in case of an grouped windowing in case central
-	 * trigger occurred and the arriving element causing the trigger is not part
-	 * of this group.
-	 * 
-	 * Remark: This is NOT the same as
-	 * {@link StreamDiscretizer#processFakeElement(Object, TriggerPolicy)}! Here
-	 * the eviction using active policies takes place after the call to the UDF.
-	 * Usually it is done before when fake elements get submitted. This special
-	 * behaviour is needed to allow the {@link GroupedStreamDiscretizer} to send
-	 * central triggers to all groups, even if the current element does not
-	 * belong to the group.
+	 * This method processes an arriving real element. The method is
+	 * synchronized to ensure that it cannot interleave with
+	 * {@link StreamDiscretizer#triggerOnFakeElement(Object)}.
 	 * 
 	 * @param input
-	 *            a fake input element
-	 * @param policies
-	 *            the list of policies which caused the call with this fake
-	 *            element
+	 *            a real input element
 	 */
-	protected synchronized void externalTriggerFakeElement(IN input,
-			List<TriggerPolicy<IN>> policies) {
-
-		// Set the current triggers
-		currentTriggerPolicies.addAll(policies);
-
-		// emit
-		emitWindow();
-
-		// clear the flag collection
-		currentTriggerPolicies.clear();
-
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
+	protected synchronized void processRealElement(IN input) {
 
-		for (int i = 0; i < currentMaxEviction; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// In case no more elements are in the buffer:
-				// Prevent failure and stop deleting.
-				break;
+		if (isActiveTrigger) {
+			ActiveTriggerPolicy<IN> trigger = (ActiveTriggerPolicy<IN>) triggerPolicy;
+			Object[] result = trigger.preNotifyTrigger(input);
+			for (Object in : result) {
+				triggerOnFakeElement(in);
 			}
 		}
-	}
 
-	/**
-	 * This method processed an arrived fake element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#processRealElement(Object)}
-	 * 
-	 * @param input
-	 *            a fake input element
-	 * @param currentPolicy
-	 *            the policy which produced this fake element
-	 */
-	protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy) {
-
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-		for (ActiveEvictionPolicy<IN> evictionPolicy : activeEvictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEvictionWithFakeElement(input, buffer.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
+		boolean isTriggered = false;
 
-		for (int i = 0; i < currentMaxEviction; i++) {
-			try {
-				buffer.removeFirst();
-			} catch (NoSuchElementException e) {
-				// In case no more elements are in the buffer:
-				// Prevent failure and stop deleting.
-				break;
-			}
+		if (triggerPolicy.notifyTrigger(input)) {
+			emitWindow();
+			isTriggered = true;
 		}
 
-		// Set the current trigger
-		currentTriggerPolicies.add(currentPolicy);
+		evict(input, isTriggered);
 
-		// emit
-		emitWindow();
+		buffer.add(input);
 
-		// clear the flag collection
-		currentTriggerPolicies.clear();
 	}
 
 	/**
-	 * This method processed an arrived real element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#processFakeElement(Object)}.
+	 * This method triggers on an arriving fake element. The method is
+	 * synchronized to ensure that it cannot interleave with
+	 * {@link StreamDiscretizer#processRealElement(Object)}.
 	 * 
 	 * @param input
-	 *            a real input element
-	 * @param triggerPolicies
-	 *            Allows to set trigger policies which are maintained
-	 *            externally. This is the case for central policies in
-	 *            {@link GroupedStreamDiscretizer}.
+	 *            a fake input element
 	 */
-	protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>> triggerPolicies) {
-		this.currentTriggerPolicies.addAll(triggerPolicies);
-		processRealElement(input);
+	protected synchronized void triggerOnFakeElement(Object input) {
+		activeEvict(input);
+		emitWindow();
 	}
 
 	/**
-	 * This method processed an arrived real element The method is synchronized
-	 * to ensure that it cannot interleave with
-	 * {@link StreamDiscretizer#processFakeElement(Object)}
-	 * 
-	 * @param input
-	 *            a real input element
+	 * This method emits the content of the buffer as a new {@link StreamWindow}
 	 */
-	protected synchronized void processRealElement(IN input) {
-
-		// Run the precalls to detect missed windows
-		for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
-			// Remark: In case multiple active triggers are present the ordering
-			// of the different fake elements returned by this triggers becomes
-			// a problem. This might lead to unexpected results...
-			// Should we limit the number of active triggers to 0 or 1?
-			Object[] result = trigger.preNotifyTrigger(input);
-			for (Object in : result) {
-				processFakeElement(in, trigger);
-			}
-		}
+	protected void emitWindow() {
+		StreamWindow<IN> currentWindow = new StreamWindow<IN>();
+		currentWindow.addAll(buffer);
+		collector.collect(currentWindow);
+	}
 
-		// Remember if a trigger occurred
-		boolean isTriggered = false;
+	private void activeEvict(Object input) {
+		int numToEvict = 0;
 
-		// Process the triggers
-		for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
-			if (triggerPolicy.notifyTrigger(input)) {
-				currentTriggerPolicies.add(triggerPolicy);
-			}
+		if (isActiveEviction) {
+			ActiveEvictionPolicy<IN> ep = (ActiveEvictionPolicy<IN>) evictionPolicy;
+			numToEvict = ep.notifyEvictionWithFakeElement(input, buffer.size());
 		}
 
-		// call user function
-		if (!currentTriggerPolicies.isEmpty()) {
-			// emit
-			emitWindow();
-
-			// clear the flag collection
-			currentTriggerPolicies.clear();
+		evictFromBuffer(numToEvict);
+	}
 
-			// remember trigger
-			isTriggered = true;
-		}
+	private void evict(IN input, boolean isTriggered) {
+		int numToEvict = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
 
-		// Process the evictions and take care of double evictions
-		// In case there are multiple eviction policies present,
-		// only the one with the highest return value is recognized.
-		int currentMaxEviction = 0;
-
-		for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
-			// use temporary variable to prevent multiple calls to
-			// notifyEviction
-			int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
-			if (tmp > currentMaxEviction) {
-				currentMaxEviction = tmp;
-			}
-		}
+		evictFromBuffer(numToEvict);
+	}
 
-		for (int i = 0; i < currentMaxEviction; i++) {
+	private void evictFromBuffer(int n) {
+		for (int i = 0; i < n; i++) {
 			try {
 				buffer.removeFirst();
 			} catch (NoSuchElementException e) {
@@ -324,64 +149,48 @@ public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>>
 				break;
 			}
 		}
-
-		// Add the current element to the buffer
-		buffer.add(input);
-
-	}
-
-	private void emitWindow() {
-		StreamWindow<IN> currentWindow = new StreamWindow<IN>();
-		currentWindow.addAll(buffer);
-		collector.collect(currentWindow);
 	}
 
 	/**
-	 * This method removes the first element from the element buffer. It is used
-	 * to provide central evictions in {@link GroupedStreamDiscretizer}
+	 * This method emits the final, partial window at the end of the stream.
 	 */
-	protected synchronized void evictFirst() {
-		try {
-			buffer.removeFirst();
-		} catch (NoSuchElementException e) {
-			// ignore exception
+	protected void emitFinalWindow() {
+		if (!buffer.isEmpty()) {
+			emitWindow();
 		}
 	}
 
-	/**
-	 * This method returns whether the element buffer is empty or not. It is
-	 * used to figure out if a group can be deleted or not when
-	 * {@link GroupedStreamDiscretizer} is used.
-	 * 
-	 * @return true in case the buffer is empty otherwise false.
-	 */
-	protected boolean isBufferEmpty() {
-		return buffer.isEmpty();
-	}
+	@Override
+	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
+		super.open(parameters);
 
-	/**
-	 * This method does the final reduce at the end of the stream and emits the
-	 * result.
-	 * 
-	 * @param centralTriggerPolicies
-	 *            Allows to set trigger policies which are maintained
-	 *            externally. This is the case for central policies in
-	 *            {@link GroupedStreamDiscretizer}.
-	 */
-	protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies) {
-		if (!buffer.isEmpty()) {
-			currentTriggerPolicies.clear();
+		if (isActiveTrigger) {
+			ActiveTriggerPolicy<IN> tp = (ActiveTriggerPolicy<IN>) triggerPolicy;
 
-			if (centralTriggerPolicies != null) {
-				currentTriggerPolicies.addAll(centralTriggerPolicies);
+			Runnable runnable = tp.createActiveTriggerRunnable(new WindowingCallback());
+			if (activePolicyThread != null) {
+				activePolicyThread = new Thread(runnable);
+				activePolicyThread.start();
 			}
+		}
+	}
 
-			for (TriggerPolicy<IN> policy : triggerPolicies) {
-				currentTriggerPolicies.add(policy);
-			}
+	@Override
+	public String toString() {
+		return buffer.toString();
+	}
 
-			emitWindow();
+	/**
+	 * This class allows the active trigger thread to call back and push fake
+	 * elements at any time.
+	 */
+	private class WindowingCallback implements ActiveTriggerCallback {
+
+		@Override
+		public void sendFakeElement(Object datapoint) {
+			triggerOnFakeElement(datapoint);
 		}
+
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
index b37bf47..51f2718 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamWindow.java
@@ -143,6 +143,11 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 	}
 
 	@Override
+	public boolean equals(Object o) {
+		return super.equals(o);
+	}
+
+	@Override
 	public void collect(T record) {
 		add(record);
 	}
@@ -153,6 +158,14 @@ public class StreamWindow<T> extends ArrayList<T> implements Collector<T> {
 
 	@Override
 	public String toString() {
-		return super.toString() + " " + windowID + " (" + numberOfParts + ")";
+		return super.toString();
+	}
+
+	public static <R> StreamWindow<R> fromElements(R... elements) {
+		StreamWindow<R> window = new StreamWindow<R>();
+		for (R element : elements) {
+			window.add(element);
+		}
+		return window;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
index 9f223c3..5f5e7d2 100644
--- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -43,6 +43,7 @@ public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
 
 	@Override
 	public void collect(StreamWindow<T> record) {
+		nextObject = record;
 		callUserFunctionAndLogException();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
deleted file mode 100755
index 757f6f6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import java.io.Serializable;
-
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
-import org.apache.flink.streaming.util.TestStreamEnvironment;
-import org.junit.Test;
-
-public class PrintTest implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-	private static final long MEMORYSIZE = 32;
-
-	private static final class IdentityMap implements MapFunction<Long, Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public Long map(Long value) throws Exception {
-			return value;
-		}
-	}
-
-	private static final class FilterAll implements FilterFunction<Long> {
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public boolean filter(Long value) throws Exception {
-			return true;
-		}
-	}
-
-	@Test
-	public void test() throws Exception {
-		StreamExecutionEnvironment env = new TestStreamEnvironment(1, MEMORYSIZE);
-		env.generateSequence(1, 10).map(new IdentityMap()).filter(new FilterAll()).print();
-		env.execute();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
new file mode 100644
index 0000000..8a6a7ce
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizerTest.java
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+import org.apache.flink.streaming.util.MockContext;
+import org.junit.Test;
+
+public class GroupedStreamDiscretizerTest {
+
+	KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public String getKey(Tuple2<Integer, String> value) throws Exception {
+			return value.f1;
+		}
+	};
+
+	/**
+	 * Test for non-active distributed triggers with a single field.
+	 */
+	@Test
+	public void groupedDiscretizerTest() {
+
+		List<Integer> inputs = new ArrayList<Integer>();
+		inputs.add(1);
+		inputs.add(2);
+		inputs.add(2);
+		inputs.add(3);
+		inputs.add(4);
+		inputs.add(5);
+		inputs.add(10);
+		inputs.add(11);
+		inputs.add(11);
+
+		Set<StreamWindow<Integer>> expected = new HashSet<StreamWindow<Integer>>();
+		expected.add(StreamWindow.fromElements(2, 2));
+		expected.add(StreamWindow.fromElements(1, 3));
+		expected.add(StreamWindow.fromElements(5, 11));
+		expected.add(StreamWindow.fromElements(4, 10));
+		expected.add(StreamWindow.fromElements(11));
+
+		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
+			private static final long serialVersionUID = 1L;
+
+			@Override
+			public Integer getKey(Integer value) {
+				return value % 2;
+			}
+		};
+
+		CloneableTriggerPolicy<Integer> trigger = new CountTriggerPolicy<Integer>(2);
+		CloneableEvictionPolicy<Integer> eviction = new TumblingEvictionPolicy<Integer>();
+
+		GroupedStreamDiscretizer<Integer> discretizer = new GroupedStreamDiscretizer<Integer>(
+				keySelector, trigger, eviction);
+
+		List<StreamWindow<Integer>> result = MockContext.createAndExecute(discretizer, inputs);
+		assertEquals(expected, new HashSet<StreamWindow<Integer>>(result));
+
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/412779fa/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowInvokableTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowInvokableTest.java b/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowInvokableTest.java
deleted file mode 100644
index 1bbdcf6..0000000
--- a/flink-staging/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedWindowInvokableTest.java
+++ /dev/null
@@ -1,574 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api.invokable.operator.windowing;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.windowing.helper.Timestamp;
-import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
-import org.apache.flink.streaming.api.windowing.policy.ActiveCloneableEvictionPolicyWrapper;
-import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.CountTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeEvictionPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
-import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
-import org.apache.flink.streaming.util.MockContext;
-import org.junit.Test;
-
-public class GroupedWindowInvokableTest {
-
-	KeySelector<Tuple2<Integer, String>, ?> keySelector = new KeySelector<Tuple2<Integer, String>, String>() {
-
-		private static final long serialVersionUID = 1L;
-
-		@Override
-		public String getKey(Tuple2<Integer, String> value) throws Exception {
-			return value.f1;
-		}
-	};
-
-	/**
-	 * Tests that illegal arguments result in failure. The following cases are
-	 * tested: 1) having no trigger 2) having no eviction 3) having neither
-	 * eviction nor trigger 4) having both, central and distributed eviction.
-	 */
-	@Test
-	public void testGroupedWindowInvokableFailTest() {
-
-		// create dummy reduce function
-		ReduceFunction<Object> userFunction = new ReduceFunction<Object>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Object reduce(Object value1, Object value2) throws Exception {
-				return null;
-			}
-		};
-
-		// create dummy keySelector
-		KeySelector<Object, Object> keySelector = new KeySelector<Object, Object>() {
-
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Object getKey(Object value) throws Exception {
-				return null;
-			}
-		};
-
-		// create policy lists
-		LinkedList<CloneableEvictionPolicy<Object>> distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<Object>>();
-		LinkedList<CloneableTriggerPolicy<Object>> distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<Object>>();
-		LinkedList<EvictionPolicy<Object>> centralEvictionPolicies = new LinkedList<EvictionPolicy<Object>>();
-		LinkedList<TriggerPolicy<Object>> centralTriggerPolicies = new LinkedList<TriggerPolicy<Object>>();
-
-		// empty trigger and policy lists should fail
-		try {
-			new GroupedStreamDiscretizer<Object, Object>(userFunction, keySelector,
-					distributedTriggerPolicies, distributedEvictionPolicies,
-					centralTriggerPolicies, centralEvictionPolicies);
-			fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (1)");
-		} catch (UnsupportedOperationException e) {
-			// that's the expected case
-		}
-
-		// null for trigger and policy lists should fail
-		try {
-			new GroupedStreamDiscretizer<Object, Object>(userFunction, keySelector, null, null, null,
-					null);
-			fail("Creating instance without any trigger or eviction policy should cause an UnsupportedOperationException but didn't. (2)");
-		} catch (UnsupportedOperationException e) {
-			// that's the expected case
-		}
-
-		// empty eviction should still fail
-		centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
-		distributedTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
-		try {
-			new GroupedStreamDiscretizer<Object, Object>(userFunction, keySelector,
-					distributedTriggerPolicies, distributedEvictionPolicies,
-					centralTriggerPolicies, centralEvictionPolicies);
-			fail("Creating instance without any eviction policy should cause an UnsupportedOperationException but didn't. (3)");
-		} catch (UnsupportedOperationException e) {
-			// that's the expected case
-		}
-
-		// empty trigger should still fail
-		centralTriggerPolicies.clear();
-		distributedTriggerPolicies.clear();
-		centralEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
-		try {
-			new GroupedStreamDiscretizer<Object, Object>(userFunction, keySelector,
-					distributedTriggerPolicies, distributedEvictionPolicies,
-					centralTriggerPolicies, centralEvictionPolicies);
-			fail("Creating instance without any trigger policy should cause an UnsupportedOperationException but didn't. (4)");
-		} catch (UnsupportedOperationException e) {
-			// that's the expected case
-		}
-
-		// having both, central and distributed eviction, at the same time
-		// should fail
-		centralTriggerPolicies.add(new CountTriggerPolicy<Object>(5));
-		distributedEvictionPolicies.add(new CountEvictionPolicy<Object>(5));
-		try {
-			new GroupedStreamDiscretizer<Object, Object>(userFunction, keySelector,
-					distributedTriggerPolicies, distributedEvictionPolicies,
-					centralTriggerPolicies, centralEvictionPolicies);
-			fail("Creating instance with central and distributed eviction should cause an UnsupportedOperationException but didn't. (4)");
-		} catch (UnsupportedOperationException e) {
-			// that's the expected case
-		}
-
-	}
-
-	/**
-	 * Test for non-active distributed triggers with a single field
-	 */
-	@Test
-	public void testGroupedWindowInvokableDistributedTriggerSimple() {
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(1);
-		inputs.add(5);
-		inputs.add(5);
-		inputs.add(5);
-		inputs.add(1);
-		inputs.add(1);
-		inputs.add(5);
-		inputs.add(1);
-		inputs.add(5);
-
-		List<Integer> expectedDistributedEviction = new ArrayList<Integer>();
-		expectedDistributedEviction.add(15);
-		expectedDistributedEviction.add(3);
-		expectedDistributedEviction.add(3);
-		expectedDistributedEviction.add(15);
-
-		List<Integer> expectedCentralEviction = new ArrayList<Integer>();
-		expectedCentralEviction.add(2);
-		expectedCentralEviction.add(5);
-		expectedCentralEviction.add(15);
-		expectedCentralEviction.add(2);
-		expectedCentralEviction.add(5);
-		expectedCentralEviction.add(2);
-		expectedCentralEviction.add(5);
-		expectedCentralEviction.add(1);
-		expectedCentralEviction.add(5);
-
-		LinkedList<CloneableTriggerPolicy<Integer>> triggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
-		// Trigger on every 2nd element, but the first time after the 3rd
-		triggers.add(new CountTriggerPolicy<Integer>(2, -1));
-
-		LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
-		// On every 2nd element, remove the oldest 2 elements, but the first
-		// time after the 3rd element
-		evictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
-
-		LinkedList<TriggerPolicy<Integer>> centralTriggers = new LinkedList<TriggerPolicy<Integer>>();
-
-		ReduceFunction<Integer> reduceFunction = new ReduceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer reduce(Integer value1, Integer value2) throws Exception {
-				return value1 + value2;
-			}
-		};
-
-		KeySelector<Integer, Integer> keySelector = new KeySelector<Integer, Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer getKey(Integer value) {
-				return value;
-			}
-		};
-
-		GroupedStreamDiscretizer<Integer, Integer> invokable = new GroupedStreamDiscretizer<Integer, Integer>(
-				reduceFunction, keySelector, triggers, evictions, centralTriggers, null);
-
-		List<Integer> result = MockContext.createAndExecute(invokable, inputs);
-
-		List<Integer> actual = new LinkedList<Integer>();
-		for (Integer current : result) {
-			actual.add(current);
-		}
-
-		assertEquals(new HashSet<Integer>(expectedDistributedEviction),
-				new HashSet<Integer>(actual));
-		assertEquals(expectedDistributedEviction.size(), actual.size());
-
-		// Run test with central eviction
-		triggers.clear();
-		centralTriggers.add(new CountTriggerPolicy<Integer>(2, -1));
-		LinkedList<EvictionPolicy<Integer>> centralEvictions = new LinkedList<EvictionPolicy<Integer>>();
-		centralEvictions.add(new CountEvictionPolicy<Integer>(2, 2, -1));
-
-		invokable = new GroupedStreamDiscretizer<Integer, Integer>(reduceFunction, keySelector,
-				triggers, null, centralTriggers, centralEvictions);
-
-		result = MockContext.createAndExecute(invokable, inputs);
-		actual = new LinkedList<Integer>();
-		for (Integer current : result) {
-			actual.add(current);
-		}
-
-		assertEquals(new HashSet<Integer>(expectedCentralEviction), new HashSet<Integer>(actual));
-		assertEquals(expectedCentralEviction.size(), actual.size());
-	}
-
-	/**
-	 * Test for non active distributed triggers with separated key field
-	 */
-	@Test
-	public void testGroupedWindowInvokableDistributedTriggerComplex() {
-		List<Tuple2<Integer, String>> inputs2 = new ArrayList<Tuple2<Integer, String>>();
-		inputs2.add(new Tuple2<Integer, String>(1, "a"));
-		inputs2.add(new Tuple2<Integer, String>(0, "b"));
-		inputs2.add(new Tuple2<Integer, String>(2, "a"));
-		inputs2.add(new Tuple2<Integer, String>(-1, "a"));
-		inputs2.add(new Tuple2<Integer, String>(-2, "a"));
-		inputs2.add(new Tuple2<Integer, String>(10, "a"));
-		inputs2.add(new Tuple2<Integer, String>(2, "b"));
-		inputs2.add(new Tuple2<Integer, String>(1, "a"));
-
-		List<Tuple2<Integer, String>> expected2 = new ArrayList<Tuple2<Integer, String>>();
-		expected2.add(new Tuple2<Integer, String>(-1, "a"));
-		expected2.add(new Tuple2<Integer, String>(-2, "a"));
-		expected2.add(new Tuple2<Integer, String>(0, "b"));
-
-		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
-		// Trigger on every 3rd element (counted per group)
-		triggers.add(new CountTriggerPolicy<Tuple2<Integer, String>>(3));
-
-		LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
-		// Tumbling eviction: presumably the whole buffer is dropped each time
-		// the trigger fires, so consecutive windows do not overlap
-		evictions.add(new TumblingEvictionPolicy<Tuple2<Integer, String>>());
-
-		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> centralTriggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
-
-		GroupedStreamDiscretizer<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable2 = new GroupedStreamDiscretizer<Tuple2<Integer, String>, Tuple2<Integer, String>>(
-				new ReduceFunction<Tuple2<Integer, String>>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
-							Tuple2<Integer, String> value2) throws Exception {
-						if (value1.f0 <= value2.f0) {
-							return value1;
-						} else {
-							return value2;
-						}
-					}
-				}, keySelector, triggers, evictions, centralTriggers, null);
-
-		List<Tuple2<Integer, String>> result = MockContext.createAndExecute(invokable2, inputs2);
-
-		List<Tuple2<Integer, String>> actual2 = new LinkedList<Tuple2<Integer, String>>();
-		for (Tuple2<Integer, String> current : result) {
-			actual2.add(current);
-		}
-
-		assertEquals(new HashSet<Tuple2<Integer, String>>(expected2),
-				new HashSet<Tuple2<Integer, String>>(actual2));
-		assertEquals(expected2.size(), actual2.size());
-	}
-
-	/**
-	 * Test for active centralized trigger
-	 */
-	@Test
-	public void testGroupedWindowInvokableCentralActiveTrigger() {
-
-		List<Tuple2<Integer, String>> inputs = new ArrayList<Tuple2<Integer, String>>();
-		inputs.add(new Tuple2<Integer, String>(1, "a"));
-		inputs.add(new Tuple2<Integer, String>(1, "b"));
-		inputs.add(new Tuple2<Integer, String>(1, "c"));
-		inputs.add(new Tuple2<Integer, String>(2, "a"));
-		inputs.add(new Tuple2<Integer, String>(2, "b"));
-		inputs.add(new Tuple2<Integer, String>(2, "c"));
-		inputs.add(new Tuple2<Integer, String>(2, "b"));
-		inputs.add(new Tuple2<Integer, String>(2, "a"));
-		inputs.add(new Tuple2<Integer, String>(2, "c"));
-		inputs.add(new Tuple2<Integer, String>(3, "c"));
-		inputs.add(new Tuple2<Integer, String>(3, "a"));
-		inputs.add(new Tuple2<Integer, String>(3, "b"));
-		inputs.add(new Tuple2<Integer, String>(4, "a"));
-		inputs.add(new Tuple2<Integer, String>(4, "b"));
-		inputs.add(new Tuple2<Integer, String>(4, "c"));
-		inputs.add(new Tuple2<Integer, String>(5, "c"));
-		inputs.add(new Tuple2<Integer, String>(5, "a"));
-		inputs.add(new Tuple2<Integer, String>(5, "b"));
-		inputs.add(new Tuple2<Integer, String>(10, "b"));
-		inputs.add(new Tuple2<Integer, String>(10, "a"));
-		inputs.add(new Tuple2<Integer, String>(10, "c"));
-		inputs.add(new Tuple2<Integer, String>(11, "a"));
-		inputs.add(new Tuple2<Integer, String>(11, "a"));
-		inputs.add(new Tuple2<Integer, String>(11, "c"));
-		inputs.add(new Tuple2<Integer, String>(11, "c"));
-		inputs.add(new Tuple2<Integer, String>(11, "b"));
-		inputs.add(new Tuple2<Integer, String>(11, "b"));
-
-		// Expected result:
-		// For each group (a, b and c) the time windows are
-		// [1,4], [3,6], [5,8], [7,10], [9,11]
-		// and the per-window sums are 12, 12, 5, 10, 32
-
-		List<Tuple2<Integer, String>> expected = new ArrayList<Tuple2<Integer, String>>();
-		expected.add(new Tuple2<Integer, String>(12, "a"));
-		expected.add(new Tuple2<Integer, String>(12, "b"));
-		expected.add(new Tuple2<Integer, String>(12, "c"));
-		expected.add(new Tuple2<Integer, String>(12, "a"));
-		expected.add(new Tuple2<Integer, String>(12, "b"));
-		expected.add(new Tuple2<Integer, String>(12, "c"));
-		expected.add(new Tuple2<Integer, String>(5, "a"));
-		expected.add(new Tuple2<Integer, String>(5, "b"));
-		expected.add(new Tuple2<Integer, String>(5, "c"));
-		expected.add(new Tuple2<Integer, String>(10, "a"));
-		expected.add(new Tuple2<Integer, String>(10, "b"));
-		expected.add(new Tuple2<Integer, String>(10, "c"));
-		expected.add(new Tuple2<Integer, String>(32, "a"));
-		expected.add(new Tuple2<Integer, String>(32, "b"));
-		expected.add(new Tuple2<Integer, String>(32, "c"));
-
-		Timestamp<Tuple2<Integer, String>> myTimeStamp = new Timestamp<Tuple2<Integer, String>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public long getTimestamp(Tuple2<Integer, String> value) {
-				return value.f0;
-			}
-		};
-
-		TimestampWrapper<Tuple2<Integer, String>> myTimeStampWrapper = new TimestampWrapper<Tuple2<Integer, String>>(
-				myTimeStamp, 1);
-
-		ReduceFunction<Tuple2<Integer, String>> myReduceFunction = new ReduceFunction<Tuple2<Integer, String>>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Tuple2<Integer, String> reduce(Tuple2<Integer, String> value1,
-					Tuple2<Integer, String> value2) throws Exception {
-				return new Tuple2<Integer, String>(value1.f0 + value2.f0, value1.f1);
-			}
-		};
-
-		LinkedList<TriggerPolicy<Tuple2<Integer, String>>> triggers = new LinkedList<TriggerPolicy<Tuple2<Integer, String>>>();
-		// Trigger every 2 time units but delay the first trigger by 2 (First
-		// trigger after 4, then every 2)
-		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
-
-		LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>> evictions = new LinkedList<CloneableEvictionPolicy<Tuple2<Integer, String>>>();
-		// Always delete all elements older than 4
-		evictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
-
-		LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Tuple2<Integer, String>>>();
-
-		GroupedStreamDiscretizer<Tuple2<Integer, String>, Tuple2<Integer, String>> invokable = new GroupedStreamDiscretizer<Tuple2<Integer, String>, Tuple2<Integer, String>>(
-				myReduceFunction, keySelector, distributedTriggers, evictions, triggers, null);
-
-		ArrayList<Tuple2<Integer, String>> result = new ArrayList<Tuple2<Integer, String>>();
-		for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
-			result.add(t);
-		}
-
-		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
-				new HashSet<Tuple2<Integer, String>>(result));
-		assertEquals(expected.size(), result.size());
-
-		// repeat the test with central eviction. The result should be the same.
-		triggers.clear();
-		triggers.add(new TimeTriggerPolicy<Tuple2<Integer, String>>(2L, myTimeStampWrapper, 2L));
-		evictions.clear();
-		LinkedList<EvictionPolicy<Tuple2<Integer, String>>> centralEvictions = new LinkedList<EvictionPolicy<Tuple2<Integer, String>>>();
-		centralEvictions.add(new TimeEvictionPolicy<Tuple2<Integer, String>>(4L, myTimeStampWrapper));
-
-		invokable = new GroupedStreamDiscretizer<Tuple2<Integer, String>, Tuple2<Integer, String>>(
-				myReduceFunction, keySelector, distributedTriggers, evictions, triggers,
-				centralEvictions);
-
-		result = new ArrayList<Tuple2<Integer, String>>();
-		for (Tuple2<Integer, String> t : MockContext.createAndExecute(invokable, inputs)) {
-			result.add(t);
-		}
-
-		assertEquals(new HashSet<Tuple2<Integer, String>>(expected),
-				new HashSet<Tuple2<Integer, String>>(result));
-		assertEquals(expected.size(), result.size());
-	}
-
-	/**
-	 * Test for multiple centralized triggers
-	 */
-	@Test
-	public void testGroupedWindowInvokableMultipleCentralTrigger() {
-		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
-		triggers.add(new CountTriggerPolicy<Integer>(8));
-		triggers.add(new CountTriggerPolicy<Integer>(5));
-
-		LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
-		// The active wrapper causes eviction even on (fake) elements which
-		// triggered, but does not belong to the group.
-		evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>(
-				new TumblingEvictionPolicy<Integer>()));
-
-		LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(2);
-		inputs.add(1);
-		// 1st Trigger: 2;6
-		inputs.add(2);
-		inputs.add(1);
-		inputs.add(2);
-		// 2nd Trigger: 1;4
-		inputs.add(2);
-		inputs.add(1);
-		// Final: 1,2
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(2);
-		expected.add(6);
-		expected.add(4);
-		expected.add(1);
-		expected.add(2);
-		expected.add(1);
-
-		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer reduce(Integer value1, Integer value2) throws Exception {
-				return value1 + value2;
-			}
-		};
-
-		GroupedStreamDiscretizer<Integer, Integer> invokable = new GroupedStreamDiscretizer<Integer, Integer>(
-				myReduceFunction, new KeySelector<Integer, Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer getKey(Integer value) {
-						return value;
-					}
-				}, distributedTriggers, evictions, triggers, null);
-
-		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
-			result.add(t);
-		}
-
-		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
-		assertEquals(expected.size(), result.size());
-	}
-
-	/**
-	 * Test for combination of centralized trigger and distributed trigger at
-	 * the same time
-	 */
-	@Test
-	public void testGroupedWindowInvokableCentralAndDistrTrigger() {
-		LinkedList<TriggerPolicy<Integer>> triggers = new LinkedList<TriggerPolicy<Integer>>();
-		triggers.add(new CountTriggerPolicy<Integer>(8));
-		triggers.add(new CountTriggerPolicy<Integer>(5));
-
-		LinkedList<CloneableEvictionPolicy<Integer>> evictions = new LinkedList<CloneableEvictionPolicy<Integer>>();
-		// The active wrapper causes eviction even on (fake) elements which
-		// triggered, but does not belong to the group.
-		evictions.add(new ActiveCloneableEvictionPolicyWrapper<Integer>(
-				new TumblingEvictionPolicy<Integer>()));
-
-		LinkedList<CloneableTriggerPolicy<Integer>> distributedTriggers = new LinkedList<CloneableTriggerPolicy<Integer>>();
-		distributedTriggers.add(new CountTriggerPolicy<Integer>(2));
-
-		List<Integer> inputs = new ArrayList<Integer>();
-		inputs.add(1);
-		inputs.add(2);
-		inputs.add(2);
-		// local on 2 => 4
-		inputs.add(2);
-		inputs.add(1);
-		// local on 1 => 2
-		// and 1st Central: 2;2
-		// SUMS up to 2;2
-		inputs.add(2);
-		// local on 2 => 2
-		inputs.add(1);
-		inputs.add(2);
-		// 2nd Central: 1;2
-		inputs.add(2);
-		inputs.add(1);
-		// Final: 1,2
-
-		List<Integer> expected = new ArrayList<Integer>();
-		expected.add(4);
-		expected.add(2);
-		expected.add(2);
-		expected.add(2);
-		expected.add(1);
-		expected.add(2);
-		expected.add(1);
-		expected.add(2);
-
-		ReduceFunction<Integer> myReduceFunction = new ReduceFunction<Integer>() {
-			private static final long serialVersionUID = 1L;
-
-			@Override
-			public Integer reduce(Integer value1, Integer value2) throws Exception {
-				return value1 + value2;
-			}
-		};
-
-		GroupedStreamDiscretizer<Integer, Integer> invokable = new GroupedStreamDiscretizer<Integer, Integer>(
-				myReduceFunction, new KeySelector<Integer, Integer>() {
-					private static final long serialVersionUID = 1L;
-
-					@Override
-					public Integer getKey(Integer value) {
-						return value;
-					}
-				}, distributedTriggers, evictions, triggers, null);
-
-		ArrayList<Integer> result = new ArrayList<Integer>();
-		for (Integer t : MockContext.createAndExecute(invokable, inputs)) {
-			result.add(t);
-		}
-
-		assertEquals(new HashSet<Integer>(expected), new HashSet<Integer>(result));
-		assertEquals(expected.size(), result.size());
-	}
-
-}


Mime
View raw message