flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [05/20] flink git commit: [FLINK-1176] [streaming] Added invokables for modular windowing tranformations
Date Mon, 16 Feb 2015 14:25:31 GMT
[FLINK-1176] [streaming] Added invokables for modular windowing tranformations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1146f64c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1146f64c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1146f64c

Branch: refs/heads/master
Commit: 1146f64c01c89f6790aef7375eb60e389e0071f4
Parents: 667f819
Author: Gyula Fora <gyfora@apache.org>
Authored: Wed Feb 4 17:26:16 2015 +0100
Committer: mbalassi <mbalassi@apache.org>
Committed: Mon Feb 16 13:06:07 2015 +0100

----------------------------------------------------------------------
 .../windowing/GroupedStreamDiscretizer.java     | 495 +++++++++++++++++++
 .../operator/windowing/StreamDiscretizer.java   | 387 +++++++++++++++
 .../operator/windowing/WindowFlattener.java     |  49 ++
 .../operator/windowing/WindowMapper.java        |  57 +++
 .../operator/windowing/WindowMerger.java        |  69 +++
 .../operator/windowing/WindowPartitioner.java   |  74 +++
 .../operator/windowing/WindowReducer.java       |  62 +++
 7 files changed, 1193 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
new file mode 100644
index 0000000..6178f48
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/GroupedStreamDiscretizer.java
@@ -0,0 +1,495 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.streamrecord.StreamRecord;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+/**
+ * This invokable allows windowing based on {@link TriggerPolicy} and
+ * {@link EvictionPolicy} instances including their active and cloneable
+ * versions. It is additionally aware of the creation of windows per group.
+ * 
+ * A {@link KeySelector} is used to specify the key position or key extraction.
+ * The {@link ReduceFunction} will be executed on each group separately.
+ * Policies might either be centralized or distributed. It is not possible to
+ * use central and distributed eviction policies at the same time. A distributed
+ * policy have to be a {@link CloneableTriggerPolicy} or
+ * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+ * instances for each group. At the startup time the distributed policies will
+ * be stored as sample, and only clones of them will be used to maintain the
+ * groups. Therefore, each group starts with the initial policy states.
+ * 
+ * While a distributed policy only gets notified with the elements belonging to
+ * the respective group, a centralized policy get notified with all arriving
+ * elements. When a centralized trigger occurred, all groups get triggered. This
+ * is done by submitting the element which caused the trigger as real element to
+ * the groups it belongs to and as fake element to all other groups. Within the
+ * groups the element might be further processed, causing more triggers,
+ * prenotifications of active distributed policies and evictions like usual.
+ * 
+ * Central policies can be instance of {@link ActiveTriggerPolicy} and also
+ * implement the
+ * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+ * method. Fake elements created on prenotification will be forwarded to all
+ * groups. The {@link ActiveTriggerCallback} is also implemented in a way that
+ * it forwards calls to all groups.
+ * 
+ * @param <IN>
+ *            The type of input elements handled by this operator invokable.
+ */
+public class GroupedStreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
+
+	/**
+	 * Auto-generated serial version UID
+	 */
+	private static final long serialVersionUID = -3469545957144404137L;
+
+	private KeySelector<IN, ?> keySelector;
+	private Configuration parameters;
+	private LinkedList<ActiveTriggerPolicy<IN>> activeCentralTriggerPolicies;
+	private LinkedList<TriggerPolicy<IN>> centralTriggerPolicies;
+	private LinkedList<ActiveEvictionPolicy<IN>> activeCentralEvictionPolicies;
+	private LinkedList<EvictionPolicy<IN>> centralEvictionPolicies;
+	private LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies;
+	private LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies;
+	private Map<Object, StreamDiscretizer<IN>> windowingGroups;
+	private LinkedList<Thread> activePolicyThreads;
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
+	private LinkedList<StreamDiscretizer<IN>> deleteOrderForCentralEviction;
+
+	/**
+	 * This constructor creates an instance of the grouped windowing invokable.
+	 * 
+	 * A {@link KeySelector} is used to specify the key position or key
+	 * extraction. The {@link ReduceFunction} will be executed on each group
+	 * separately. Policies might either be centralized or distributed. It is
+	 * not possible to use central and distributed eviction policies at the same
+	 * time. A distributed policy have to be a {@link CloneableTriggerPolicy} or
+	 * {@link CloneableEvictionPolicy} as it will be cloned to have separated
+	 * instances for each group. At the startup time the distributed policies
+	 * will be stored as sample, and only clones of them will be used to
+	 * maintain the groups. Therefore, each group starts with the initial policy
+	 * states.
+	 * 
+	 * While a distributed policy only gets notified with the elements belonging
+	 * to the respective group, a centralized policy get notified with all
+	 * arriving elements. When a centralized trigger occurred, all groups get
+	 * triggered. This is done by submitting the element which caused the
+	 * trigger as real element to the groups it belongs to and as fake element
+	 * to all other groups. Within the groups the element might be further
+	 * processed, causing more triggers, prenotifications of active distributed
+	 * policies and evictions like usual.
+	 * 
+	 * Central policies can be instance of {@link ActiveTriggerPolicy} and also
+	 * implement the
+	 * {@link ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)}
+	 * method. Fake elements created on prenotification will be forwarded to all
+	 * groups. The {@link ActiveTriggerCallback} is also implemented in a way
+	 * that it forwards calls to all groups.
+	 * 
+	 * @param userFunction
+	 *            The user defined function.
+	 * @param keySelector
+	 *            A key selector to extract the key for the groups from the
+	 *            input data.
+	 * @param distributedTriggerPolicies
+	 *            Trigger policies to be distributed and maintained individually
+	 *            within each group.
+	 * @param distributedEvictionPolicies
+	 *            Eviction policies to be distributed and maintained
+	 *            individually within each group. Note that there cannot be
+	 *            both, central and distributed eviction policies at the same
+	 *            time.
+	 * @param centralTriggerPolicies
+	 *            Trigger policies which will only exist once at a central
+	 *            place. In case a central policy triggers, it will cause all
+	 *            groups to be emitted. (Remark: Empty groups cannot be emitted.
+	 *            If only one element is contained a group, this element itself
+	 *            is returned as aggregated result.)
+	 * @param centralEvictionPolicies
+	 *            Eviction which will only exist once at a central place. Note
+	 *            that there cannot be both, central and distributed eviction
+	 *            policies at the same time. The central eviction policy will
+	 *            work on an simulated element buffer containing all elements no
+	 *            matter which group they belong to.
+	 */
+	public GroupedStreamDiscretizer(KeySelector<IN, ?> keySelector,
+			LinkedList<CloneableTriggerPolicy<IN>> distributedTriggerPolicies,
+			LinkedList<CloneableEvictionPolicy<IN>> distributedEvictionPolicies,
+			LinkedList<TriggerPolicy<IN>> centralTriggerPolicies,
+			LinkedList<EvictionPolicy<IN>> centralEvictionPolicies) {
+
+		super(null);
+
+		this.keySelector = keySelector;
+
+		// handle the triggers; the active list is always created so that open()
+		// and invoke() can iterate it even if no central triggers were passed
+		this.activeCentralTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+		if (centralTriggerPolicies != null) {
+			this.centralTriggerPolicies = centralTriggerPolicies;
+			for (TriggerPolicy<IN> trigger : centralTriggerPolicies) {
+				if (trigger instanceof ActiveTriggerPolicy) {
+					this.activeCentralTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+				}
+			}
+		} else {
+			this.centralTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+		}
+
+		if (distributedTriggerPolicies != null) {
+			this.distributedTriggerPolicies = distributedTriggerPolicies;
+		} else {
+			this.distributedTriggerPolicies = new LinkedList<CloneableTriggerPolicy<IN>>();
+		}
+
+		if (distributedEvictionPolicies != null) {
+			this.distributedEvictionPolicies = distributedEvictionPolicies;
+		} else {
+			this.distributedEvictionPolicies = new LinkedList<CloneableEvictionPolicy<IN>>();
+		}
+
+		this.activeCentralEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+
+		if (centralEvictionPolicies != null) {
+			this.centralEvictionPolicies = centralEvictionPolicies;
+
+			for (EvictionPolicy<IN> eviction : centralEvictionPolicies) {
+				if (eviction instanceof ActiveEvictionPolicy) {
+					this.activeCentralEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
+				}
+			}
+		} else {
+			this.centralEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+		}
+
+		this.windowingGroups = new HashMap<Object, StreamDiscretizer<IN>>();
+		this.activePolicyThreads = new LinkedList<Thread>();
+		this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+		this.deleteOrderForCentralEviction = new LinkedList<StreamDiscretizer<IN>>();
+
+		// check that not both, central and distributed eviction, is used at the
+		// same time.
+		if (!this.centralEvictionPolicies.isEmpty() && !this.distributedEvictionPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You can only use either central or distributed eviction policies but not both at the same time.");
+		}
+
+		// Check that there is at least one trigger and one eviction policy
+		if (this.centralEvictionPolicies.isEmpty() && this.distributedEvictionPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You have to define at least one eviction policy");
+		}
+		if (this.centralTriggerPolicies.isEmpty() && this.distributedTriggerPolicies.isEmpty()) {
+			throw new UnsupportedOperationException(
+					"You have to define at least one trigger policy");
+		}
+
+	}
+
+	/**
+	 * Reads records until the input is exhausted, hands each one to the
+	 * discretizer of its key group and applies the central policies. Finally
+	 * interrupts the policy threads and lets every group emit a final window.
+	 */
+	@Override
+	public void invoke() throws Exception {
+		// Prevent empty data streams
+		if (readNext() == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+
+		// Process records until the input is exhausted
+		while (nextRecord != null) {
+			StreamDiscretizer<IN> groupInvokable = windowingGroups.get(keySelector
+					.getKey(nextRecord.getObject()));
+			if (groupInvokable == null) {
+				groupInvokable = makeNewGroup(nextRecord);
+			}
+
+			// Run the precalls for central active triggers
+			for (ActiveTriggerPolicy<IN> trigger : activeCentralTriggerPolicies) {
+				Object[] result = trigger.preNotifyTrigger(nextRecord.getObject());
+				for (Object in : result) {
+					// If central eviction is used, handle it here
+					if (!activeCentralEvictionPolicies.isEmpty()) {
+						evictElements(centralActiveEviction(in));
+					}
+
+					// process in groups
+					for (StreamDiscretizer<IN> group : windowingGroups.values()) {
+						group.processFakeElement(in, trigger);
+						checkForEmptyGroupBuffer(group);
+					}
+				}
+			}
+
+			// Process non-active central triggers
+			for (TriggerPolicy<IN> triggerPolicy : centralTriggerPolicies) {
+				if (triggerPolicy.notifyTrigger(nextRecord.getObject())) {
+					currentTriggerPolicies.add(triggerPolicy);
+				}
+			}
+
+			if (currentTriggerPolicies.isEmpty()) {
+				// only add the element to its group
+				groupInvokable.processRealElement(nextRecord.getObject());
+				checkForEmptyGroupBuffer(groupInvokable);
+
+				// If central eviction is used, handle it here
+				if (!centralEvictionPolicies.isEmpty()) {
+					evictElements(centralEviction(nextRecord.getObject(), false));
+					deleteOrderForCentralEviction.add(groupInvokable);
+				}
+			} else {
+				// a central trigger occurred: notify all groups
+				for (StreamDiscretizer<IN> group : windowingGroups.values()) {
+					if (group == groupInvokable) {
+						// process real with initialized policies
+						group.processRealElement(nextRecord.getObject(), currentTriggerPolicies);
+					} else {
+						// process like a fake but also initialized with
+						// policies
+						group.externalTriggerFakeElement(nextRecord.getObject(),
+								currentTriggerPolicies);
+					}
+
+					// remove group in case it has an empty buffer
+					// checkForEmptyGroupBuffer(group);
+				}
+
+				// If central eviction is used, handle it here
+				if (!centralEvictionPolicies.isEmpty()) {
+					evictElements(centralEviction(nextRecord.getObject(), true));
+					deleteOrderForCentralEviction.add(groupInvokable);
+				}
+			}
+
+			// clear current trigger list
+			currentTriggerPolicies.clear();
+
+			// read next record
+			readNext();
+		}
+
+		// Stop all remaining threads from policies
+		for (Thread t : activePolicyThreads) {
+			t.interrupt();
+		}
+
+		// finally trigger the buffer.
+		for (StreamDiscretizer<IN> group : windowingGroups.values()) {
+			group.emitFinalWindow(centralTriggerPolicies);
+		}
+	}
+
+	/**
+	 * This method creates a new group. The method gets called in case an
+	 * element arrives which has a key which was not seen before. It creates a
+	 * nested {@link StreamDiscretizer} built from clones of all distributed
+	 * trigger and eviction policies, sets it up, opens it and registers it.
+	 * 
+	 * @param element
+	 *            The element which leads to the generation of a new group
+	 *            (previously unseen key)
+	 * @return The discretizer that was created and registered for the key.
+	 * @throws Exception
+	 *             In case the {@link KeySelector} throws an exception in
+	 *             {@link KeySelector#getKey(Object)}, it is not caught here.
+	 */
+	private StreamDiscretizer<IN> makeNewGroup(StreamRecord<IN> element) throws Exception {
+		// clone the policies
+		LinkedList<TriggerPolicy<IN>> clonedDistributedTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+		LinkedList<EvictionPolicy<IN>> clonedDistributedEvictionPolicies = new LinkedList<EvictionPolicy<IN>>();
+		for (CloneableTriggerPolicy<IN> trigger : this.distributedTriggerPolicies) {
+			clonedDistributedTriggerPolicies.add(trigger.clone());
+		}
+		for (CloneableEvictionPolicy<IN> eviction : this.distributedEvictionPolicies) {
+			clonedDistributedEvictionPolicies.add(eviction.clone());
+		}
+
+		StreamDiscretizer<IN> groupDiscretizer = new StreamDiscretizer<IN>(
+				clonedDistributedTriggerPolicies, clonedDistributedEvictionPolicies);
+
+		groupDiscretizer.setup(taskContext);
+		groupDiscretizer.open(this.parameters);
+		windowingGroups.put(keySelector.getKey(element.getObject()), groupDiscretizer);
+
+		return groupDiscretizer;
+	}
+
+	@Override
+	public void open(Configuration parameters) throws Exception {
+		super.open(parameters);
+		this.parameters = parameters; // reused when opening groups created later
+		for (ActiveTriggerPolicy<IN> tp : activeCentralTriggerPolicies) {
+			Runnable target = tp.createActiveTriggerRunnable(new WindowingCallback(tp));
+			if (target != null) { // a null runnable is tolerated: no thread is started
+				Thread thread = new Thread(target);
+				activePolicyThreads.add(thread);
+				thread.start();
+			}
+		}
+	}
+
+	/**
+	 * Notifies all central eviction policies about a real element and
+	 * determines how many elements have to be removed from the buffer.
+	 * 
+	 * Every policy is notified exactly once. If several policies request an
+	 * eviction, only the highest requested count is used, which takes care of
+	 * double evictions.
+	 * 
+	 * @param input
+	 *            the real element to notify the eviction policies with.
+	 * @param triggered
+	 *            whether a central trigger occurred or not.
+	 * @return The number of elements to be deleted from the buffer.
+	 */
+	private int centralEviction(IN input, boolean triggered) {
+		int maxEviction = 0;
+		for (EvictionPolicy<IN> policy : centralEvictionPolicies) {
+			// notifyEviction is invoked once per policy; its result is kept
+			// in a local so that the call does not have to be repeated
+			int requested = policy.notifyEviction(input, triggered,
+					deleteOrderForCentralEviction.size());
+			maxEviction = Math.max(maxEviction, requested);
+		}
+
+		return maxEviction;
+	}
+
+	/**
+	 * Notifies all active central eviction policies about a fake element and
+	 * determines how many elements have to be removed from the buffer.
+	 * 
+	 * Every policy is notified exactly once. If several policies request an
+	 * eviction, only the highest requested count is used, which takes care of
+	 * double evictions.
+	 * 
+	 * @param input
+	 *            the fake element to notify the active central eviction
+	 *            policies with.
+	 * @return The number of elements to be deleted from the buffer.
+	 */
+	private int centralActiveEviction(Object input) {
+		int maxEviction = 0;
+		for (ActiveEvictionPolicy<IN> policy : activeCentralEvictionPolicies) {
+			// notifyEvictionWithFakeElement is invoked once per policy; its
+			// result is kept in a local so the call is not repeated
+			int requested = policy.notifyEvictionWithFakeElement(input,
+					deleteOrderForCentralEviction.size());
+			maxEviction = Math.max(maxEviction, requested);
+		}
+
+		return maxEviction;
+	}
+
+	/**
+	 * This method is used in central eviction to delete a given number of
+	 * elements from the buffer.
+	 * 
+	 * @param numToEvict
+	 *            number of elements to delete from the virtual central element
+	 *            buffer.
+	 */
+	private void evictElements(int numToEvict) {
+		HashSet<StreamDiscretizer<IN>> usedGroups = new HashSet<StreamDiscretizer<IN>>();
+		for (; numToEvict > 0; numToEvict--) {
+			// pollFirst() yields null on an empty list, whereas getFirst() would
+			// throw a NoSuchElementException before any guard could catch it
+			// when more evictions are requested than elements are buffered.
+			StreamDiscretizer<IN> currentGroup = deleteOrderForCentralEviction.pollFirst();
+			if (currentGroup == null) {
+				// the virtual central buffer is empty, stop deleting
+				break;
+			}
+			// Do the eviction
+			currentGroup.evictFirst();
+			// Remember groups which possibly have an empty buffer after the
+			// eviction
+			usedGroups.add(currentGroup);
+		}
+
+		// Remove groups with empty buffer
+		for (StreamDiscretizer<IN> group : usedGroups) {
+			checkForEmptyGroupBuffer(group);
+		}
+	}
+
+	/**
+	 * Asks the group map to drop the given group if its buffer is empty.
+	 * NOTE(review): the map is keyed by the extracted key, yet the group (a map
+	 * value) is passed to remove(), so presumably nothing is ever removed here.
+	 * 
+	 * @param group
+	 *            The windowing group to be checked for an empty buffer.
+	 */
+	private void checkForEmptyGroupBuffer(StreamDiscretizer<IN> group) {
+		if (group.isBufferEmpty()) {
+			windowingGroups.remove(group);
+		}
+	}
+
+	/**
+	 * Callback handed to the runnables created by active central trigger
+	 * policies. Fake elements sent through it are forwarded to all groups.
+	 * 
+	 * @see ActiveTriggerPolicy#createActiveTriggerRunnable(ActiveTriggerCallback)
+	 */
+	private class WindowingCallback implements ActiveTriggerCallback {
+		private ActiveTriggerPolicy<IN> policy;
+
+		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+			this.policy = policy;
+		}
+
+		@Override
+		public void sendFakeElement(Object datapoint) {
+			// With central eviction in use, evict from the central buffer first
+			boolean centralEvictionUsed = !centralEvictionPolicies.isEmpty();
+			if (centralEvictionUsed) {
+				int numToEvict = centralActiveEviction(datapoint);
+				evictElements(numToEvict);
+			}
+
+			// Afterwards let every group process the fake element
+			for (StreamDiscretizer<IN> windowingGroup : windowingGroups.values()) {
+				windowingGroup.processFakeElement(datapoint, policy);
+				checkForEmptyGroupBuffer(windowingGroup);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
new file mode 100644
index 0000000..a63aee6
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/StreamDiscretizer.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.windowing.policy.ActiveEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerCallback;
+import org.apache.flink.streaming.api.windowing.policy.ActiveTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+
+public class StreamDiscretizer<IN> extends StreamInvokable<IN, StreamWindow<IN>> {
+
+	/**
+	 * Auto-generated serial version UID
+	 */
+	private static final long serialVersionUID = -8038984294071650730L;
+
+	private LinkedList<TriggerPolicy<IN>> triggerPolicies;
+	private LinkedList<EvictionPolicy<IN>> evictionPolicies;
+	private LinkedList<ActiveTriggerPolicy<IN>> activeTriggerPolicies;
+	private LinkedList<ActiveEvictionPolicy<IN>> activeEvictionPolicies;
+	private LinkedList<Thread> activePolicyTreads;
+	protected LinkedList<IN> buffer;
+	private LinkedList<TriggerPolicy<IN>> currentTriggerPolicies;
+
+	/**
+	 * Creates a windowing invokable which discretizes the stream using the
+	 * given trigger and eviction policies. Policies that are active are
+	 * additionally tracked in separate lists.
+	 * 
+	 * @param triggerPolicies
+	 *            A list of {@link TriggerPolicy}s and/or
+	 *            {@link ActiveTriggerPolicy}s
+	 * @param evictionPolicies
+	 *            A list of {@link EvictionPolicy}s and/or
+	 *            {@link ActiveEvictionPolicy}s
+	 */
+	public StreamDiscretizer(LinkedList<TriggerPolicy<IN>> triggerPolicies,
+			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
+		super(null);
+
+		this.triggerPolicies = triggerPolicies;
+		this.evictionPolicies = evictionPolicies;
+		this.buffer = new LinkedList<IN>();
+		this.currentTriggerPolicies = new LinkedList<TriggerPolicy<IN>>();
+		this.activePolicyTreads = new LinkedList<Thread>();
+
+		// collect the active triggers
+		this.activeTriggerPolicies = new LinkedList<ActiveTriggerPolicy<IN>>();
+		for (TriggerPolicy<IN> trigger : triggerPolicies) {
+			if (trigger instanceof ActiveTriggerPolicy) {
+				this.activeTriggerPolicies.add((ActiveTriggerPolicy<IN>) trigger);
+			}
+		}
+
+		// collect the active evictions
+		this.activeEvictionPolicies = new LinkedList<ActiveEvictionPolicy<IN>>();
+		for (EvictionPolicy<IN> eviction : evictionPolicies) {
+			if (eviction instanceof ActiveEvictionPolicy) {
+				this.activeEvictionPolicies.add((ActiveEvictionPolicy<IN>) eviction);
+			}
+		}
+	}
+
+	@Override
+	public void open(org.apache.flink.configuration.Configuration parameters) throws Exception {
+		super.open(parameters);
+		for (ActiveTriggerPolicy<IN> activeTrigger : activeTriggerPolicies) {
+			// policies which return no runnable do not get a thread
+			Runnable runnable = activeTrigger.createActiveTriggerRunnable(new WindowingCallback(activeTrigger));
+			if (runnable != null) {
+				activePolicyTreads.add(new Thread(runnable));
+				activePolicyTreads.getLast().start();
+			}
+		}
+	}
+
+	/**
+	 * Callback through which the threads of active trigger policies can push
+	 * fake elements into this discretizer at any time.
+	 */
+	private class WindowingCallback implements ActiveTriggerCallback {
+		// the policy on whose behalf fake elements are submitted
+		private ActiveTriggerPolicy<IN> sourcePolicy;
+
+		public WindowingCallback(ActiveTriggerPolicy<IN> policy) {
+			sourcePolicy = policy;
+		}
+
+		@Override
+		public void sendFakeElement(Object datapoint) {
+			processFakeElement(datapoint, sourcePolicy);
+		}
+	}
+
+	/** Feeds all input records to the policies, then emits the final window. */
+	@Override
+	public void invoke() throws Exception {
+		// An empty data stream is not supported
+		if (readNext() == null) {
+			throw new RuntimeException("DataStream must not be empty");
+		}
+
+		// Handle record after record until the input is exhausted
+		while (nextRecord != null) {
+			IN element = nextRecord.getObject();
+			processRealElement(element);
+
+			// Fetch the following StreamRecord
+			readNext();
+		}
+
+		// The policy threads are not needed any longer
+		for (Thread policyThread : activePolicyTreads) {
+			policyThread.interrupt();
+		}
+
+		// Finally trigger the buffer
+		emitFinalWindow(null);
+	}
+
+	/**
+	 * Handles a central trigger which was caused by an element that is not
+	 * part of this group. It gets called in case of a grouped windowing.
+	 * 
+	 * The method performs the following steps:
+	 * <ol>
+	 * <li>The given policies are registered as current triggers and the
+	 * window is emitted.</li>
+	 * <li>The current triggers are cleared again.</li>
+	 * <li>The active eviction policies are notified with the element as fake
+	 * element; only the highest returned count is evicted from the buffer.</li>
+	 * </ol>
+	 * 
+	 * Remark: This is NOT the same as
+	 * {@link StreamDiscretizer#processFakeElement(Object, TriggerPolicy)}!
+	 * Here the eviction using active policies takes place after the window was
+	 * emitted, whereas it is usually done before when fake elements get
+	 * submitted. This special behaviour is needed to allow the
+	 * {@link GroupedStreamDiscretizer} to send central triggers to all groups,
+	 * even if the current element does not belong to the group.
+	 * 
+	 * @param input
+	 *            a fake input element
+	 * @param policies
+	 *            the list of policies which caused the call with this fake
+	 *            element
+	 */
+	protected synchronized void externalTriggerFakeElement(IN input,
+			List<TriggerPolicy<IN>> policies) {
+
+		// Register the externally supplied policies as current triggers
+		currentTriggerPolicies.addAll(policies);
+
+		// Emit the window
+		emitWindow();
+
+		// Reset the flag collection
+		currentTriggerPolicies.clear();
+
+		// Notify the active eviction policies. In case there are multiple
+		// eviction policies present, only the highest return value counts,
+		// which takes care of double evictions.
+		int maxEviction = 0;
+		for (ActiveEvictionPolicy<IN> policy : activeEvictionPolicies) {
+			// each policy is notified exactly once
+			int requested = policy.notifyEvictionWithFakeElement(input, buffer.size());
+			maxEviction = Math.max(maxEviction, requested);
+		}
+
+		// Evict from the head of the buffer; stop early once it runs empty
+		for (int remaining = maxEviction; remaining > 0 && !buffer.isEmpty(); remaining--) {
+			buffer.removeFirst();
+		}
+	}
+
+	/**
+	 * Processes an arrived fake element.
+	 * 
+	 * The method performs the following steps:
+	 * <ol>
+	 * <li>The active eviction policies are notified with the fake element;
+	 * only the highest returned count is evicted from the buffer.</li>
+	 * <li>The given policy is registered as current trigger and the window is
+	 * emitted.</li>
+	 * <li>The current triggers are cleared again.</li>
+	 * </ol>
+	 * 
+	 * The method is synchronized to ensure that it cannot interleave with
+	 * {@link StreamDiscretizer#processRealElement(Object)}.
+	 * 
+	 * @param input
+	 *            a fake input element
+	 * @param currentPolicy
+	 *            the policy which produced this fake element
+	 */
+	protected synchronized void processFakeElement(Object input, TriggerPolicy<IN> currentPolicy) {
+
+		// Notify the active eviction policies first. In case there are
+		// multiple eviction policies present, only the highest return value
+		// counts, which takes care of double evictions.
+		int maxEviction = 0;
+		for (ActiveEvictionPolicy<IN> policy : activeEvictionPolicies) {
+			// each policy is notified exactly once
+			int requested = policy.notifyEvictionWithFakeElement(input, buffer.size());
+			maxEviction = Math.max(maxEviction, requested);
+		}
+
+		// Evict from the head of the buffer; stop early once it runs empty
+		// instead of failing.
+		for (int remaining = maxEviction; remaining > 0 && !buffer.isEmpty(); remaining--) {
+			buffer.removeFirst();
+		}
+
+		// Emit with the given policy set as current trigger and reset the
+		// flag collection afterwards
+		currentTriggerPolicies.add(currentPolicy);
+		emitWindow();
+		currentTriggerPolicies.clear();
+	}
+
+	/**
+	 * Processes a real element for which externally maintained trigger
+	 * policies fired: they are registered as current triggers before the
+	 * element is handled as usual. Synchronized so that it cannot interleave
+	 * with {@link StreamDiscretizer#processFakeElement(Object, TriggerPolicy)}.
+	 * 
+	 * @param input
+	 *            a real input element
+	 * @param triggerPolicies
+	 *            Externally maintained trigger policies, e.g. the central
+	 *            policies of the {@link GroupedStreamDiscretizer}.
+	 */
+	protected synchronized void processRealElement(IN input, List<TriggerPolicy<IN>> triggerPolicies) {
+		currentTriggerPolicies.addAll(triggerPolicies);
+		processRealElement(input);
+	}
+
+	/**
+	 * This method processed an arrived real element The method is synchronized
+	 * to ensure that it cannot interleave with
+	 * {@link StreamDiscretizer#processFakeElement(Object)}
+	 * 
+	 * @param input
+	 *            a real input element
+	 */
+	protected synchronized void processRealElement(IN input) {
+
+		// Run the precalls to detect missed windows
+		for (ActiveTriggerPolicy<IN> trigger : activeTriggerPolicies) {
+			// Remark: In case multiple active triggers are present the ordering
+			// of the different fake elements returned by this triggers becomes
+			// a problem. This might lead to unexpected results...
+			// Should we limit the number of active triggers to 0 or 1?
+			Object[] result = trigger.preNotifyTrigger(input);
+			for (Object in : result) {
+				processFakeElement(in, trigger);
+			}
+		}
+
+		// Remember if a trigger occurred
+		boolean isTriggered = false;
+
+		// Process the triggers
+		for (TriggerPolicy<IN> triggerPolicy : triggerPolicies) {
+			if (triggerPolicy.notifyTrigger(input)) {
+				currentTriggerPolicies.add(triggerPolicy);
+			}
+		}
+
+		// call user function
+		if (!currentTriggerPolicies.isEmpty()) {
+			// emit
+			emitWindow();
+
+			// clear the flag collection
+			currentTriggerPolicies.clear();
+
+			// remember trigger
+			isTriggered = true;
+		}
+
+		// Process the evictions and take care of double evictions
+		// In case there are multiple eviction policies present,
+		// only the one with the highest return value is recognized.
+		int currentMaxEviction = 0;
+
+		for (EvictionPolicy<IN> evictionPolicy : evictionPolicies) {
+			// use temporary variable to prevent multiple calls to
+			// notifyEviction
+			int tmp = evictionPolicy.notifyEviction(input, isTriggered, buffer.size());
+			if (tmp > currentMaxEviction) {
+				currentMaxEviction = tmp;
+			}
+		}
+
+		for (int i = 0; i < currentMaxEviction; i++) {
+			try {
+				buffer.removeFirst();
+			} catch (NoSuchElementException e) {
+				// In case no more elements are in the buffer:
+				// Prevent failure and stop deleting.
+				break;
+			}
+		}
+
+		// Add the current element to the buffer
+		buffer.add(input);
+
+	}
+
+	/**
+	 * Puts all elements currently held in the buffer into a newly created
+	 * {@link StreamWindow} and passes that window to the collector.
+	 */
+	private void emitWindow() {
+		StreamWindow<IN> snapshot = new StreamWindow<IN>();
+		snapshot.addAll(buffer);
+		collector.collect(snapshot);
+	}
+
+	/**
+	 * Drops the first element of the element buffer; does nothing when the
+	 * buffer holds no element. It is used to provide central evictions in
+	 * {@link GroupedStreamDiscretizer}.
+	 */
+	protected synchronized void evictFirst() {
+		if (buffer.isEmpty()) {
+			// nothing to evict
+			return;
+		}
+		buffer.removeFirst();
+	}
+
+	/**
+	 * This method returns whether the element buffer is empty or not. It is
+	 * used to figure out if a group can be deleted or not when
+	 * {@link GroupedStreamDiscretizer} is used.
+	 * 
+	 * @return true in case the buffer is empty otherwise false.
+	 */
+	protected boolean isBufferEmpty() {
+		// NOTE(review): unlike evictFirst and the process*Element methods this
+		// accessor is not synchronized - confirm that callers cannot race with
+		// a concurrent modification of the buffer.
+		return buffer.isEmpty();
+	}
+
+	/**
+	 * Emits the remaining content of the buffer as a last window at the end of
+	 * the stream. Nothing happens when the buffer is empty. Before the
+	 * emission the current trigger policies are replaced by the given central
+	 * policies (if any) followed by all trigger policies of this discretizer.
+	 *
+	 * @param centralTriggerPolicies
+	 *            Allows to set trigger policies which are maintained
+	 *            externally. This is the case for central policies in
+	 *            {@link GroupedStreamDiscretizer}. May be null.
+	 */
+	protected void emitFinalWindow(List<TriggerPolicy<IN>> centralTriggerPolicies) {
+		if (buffer.isEmpty()) {
+			// no buffered element, hence no final window
+			return;
+		}
+
+		currentTriggerPolicies.clear();
+		if (centralTriggerPolicies != null) {
+			currentTriggerPolicies.addAll(centralTriggerPolicies);
+		}
+		for (TriggerPolicy<IN> localPolicy : triggerPolicies) {
+			currentTriggerPolicies.add(localPolicy);
+		}
+
+		emitWindow();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
new file mode 100644
index 0000000..9f223c3
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowFlattener.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that flattens {@link StreamWindow}s: every element of a received
+ * window is forwarded to the collector individually.
+ *
+ * @param <T>
+ *            type of the elements contained in the windows
+ */
+public class WindowFlattener<T> extends ChainableInvokable<StreamWindow<T>, T> {
+
+	private static final long serialVersionUID = 1L;
+
+	public WindowFlattener() {
+		super(null);
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	/**
+	 * Emits each element of the window stored in nextObject on its own.
+	 */
+	@Override
+	protected void callUserFunction() throws Exception {
+		for (T element : nextObject) {
+			collector.collect(element);
+		}
+	}
+
+	/**
+	 * Flattens a window that is passed in directly.
+	 *
+	 * @param record
+	 *            the window whose elements should be emitted
+	 */
+	@Override
+	public void collect(StreamWindow<T> record) {
+		// callUserFunction() reads the window from nextObject, so the record
+		// has to be stored there first; otherwise the received window would
+		// be ignored.
+		nextObject = record;
+		callUserFunctionAndLogException();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
new file mode 100644
index 0000000..23aaf32
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMapper.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+
+/**
+ * Map invokable that applies a {@link GroupReduceFunction} to every incoming
+ * {@link StreamWindow} and emits the produced elements as a new window.
+ *
+ * @param <IN>
+ *            element type of the incoming windows
+ * @param <OUT>
+ *            element type of the emitted windows
+ */
+public class WindowMapper<IN, OUT> extends MapInvokable<StreamWindow<IN>, StreamWindow<OUT>> {
+
+	private static final long serialVersionUID = 1L;
+
+	GroupReduceFunction<IN, OUT> reducer;
+
+	public WindowMapper(GroupReduceFunction<IN, OUT> reducer) {
+		super(new WindowMapFunction<IN, OUT>(reducer));
+		this.reducer = reducer;
+	}
+
+	/**
+	 * Map function that runs the wrapped group reduce function on one window.
+	 */
+	private static class WindowMapFunction<T, R> implements
+			MapFunction<StreamWindow<T>, StreamWindow<R>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final GroupReduceFunction<T, R> reducer;
+
+		public WindowMapFunction(GroupReduceFunction<T, R> reducer) {
+			this.reducer = reducer;
+		}
+
+		@Override
+		public StreamWindow<R> map(StreamWindow<T> window) throws Exception {
+			// The result window carries over the id and the number of parts
+			// of the input window
+			StreamWindow<R> result = new StreamWindow<R>(window.windowID);
+			result.numberOfParts = window.numberOfParts;
+
+			// The input window is the source of the reduce call and the result
+			// window its output target
+			reducer.reduce(window, result);
+			return result;
+		}
+
+	}
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
new file mode 100644
index 0000000..738c78f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowMerger.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that merges the parts of a window. Incoming windows are grouped
+ * by their windowID and combined with
+ * {@link StreamWindow#merge}. A window is emitted as soon as its
+ * numberOfParts is 1, otherwise it is stored until further windows with the
+ * same windowID arrive.
+ * <p>
+ * The class is named after its source file (WindowMerger.java), as required
+ * for a public top-level class.
+ *
+ * @param <T>
+ *            type of the elements contained in the windows
+ */
+public class WindowMerger<T> extends ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
+
+	private static final long serialVersionUID = 1L;
+
+	// Windows that are not emitted yet, keyed by windowID
+	private Map<Integer, StreamWindow<T>> windows;
+
+	public WindowMerger() {
+		super(null);
+		this.windows = new HashMap<Integer, StreamWindow<T>>();
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@SuppressWarnings("unchecked")
+	@Override
+	protected void callUserFunction() throws Exception {
+		StreamWindow<T> nextWindow = nextObject;
+
+		StreamWindow<T> current = windows.get(nextWindow.windowID);
+
+		if (current == null) {
+			// first window seen for this id
+			current = nextWindow;
+		} else {
+			current = StreamWindow.merge(current, nextWindow);
+		}
+
+		if (current.numberOfParts == 1) {
+			// emit the window and forget any stored state for its id
+			collector.collect(current);
+			windows.remove(nextWindow.windowID);
+		} else {
+			// keep the window until more windows with this id arrive
+			windows.put(nextWindow.windowID, current);
+		}
+	}
+
+	@Override
+	public void collect(StreamWindow<T> record) {
+		nextObject = record;
+		callUserFunctionAndLogException();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
new file mode 100644
index 0000000..e10692b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowPartitioner.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.streaming.api.invokable.ChainableInvokable;
+
+/**
+ * Invokable that breaks a {@link StreamWindow} into several windows. When a
+ * key selector is set, the windows returned by
+ * {@code partitionBy(keySelector)} are emitted. Without a key selector the
+ * windows returned by {@code split(numberOfSplits)} are emitted if more than
+ * one split is requested; otherwise the window is forwarded unchanged.
+ *
+ * @param <T>
+ *            type of the elements contained in the windows
+ */
+public class WindowPartitioner<T> extends
+		ChainableInvokable<StreamWindow<T>, StreamWindow<T>> {
+
+	private static final long serialVersionUID = 1L;
+
+	private KeySelector<T, ?> keySelector;
+	private int numberOfSplits;
+
+	public WindowPartitioner(KeySelector<T, ?> keySelector) {
+		super(null);
+		this.keySelector = keySelector;
+	}
+
+	public WindowPartitioner(int numberOfSplits) {
+		super(null);
+		this.numberOfSplits = numberOfSplits;
+	}
+
+	@Override
+	public void invoke() throws Exception {
+		while (readNext() != null) {
+			callUserFunctionAndLogException();
+		}
+	}
+
+	@Override
+	protected void callUserFunction() throws Exception {
+		StreamWindow<T> incoming = nextObject;
+
+		// Key based partitioning takes precedence when a selector is set
+		if (keySelector != null) {
+			for (StreamWindow<T> part : incoming.partitionBy(keySelector)) {
+				collector.collect(part);
+			}
+			return;
+		}
+
+		// Splitting is only requested for more than one split
+		if (numberOfSplits > 1) {
+			for (StreamWindow<T> part : incoming.split(numberOfSplits)) {
+				collector.collect(part);
+			}
+			return;
+		}
+
+		// Neither a selector nor several splits: forward the window as is
+		collector.collect(incoming);
+	}
+
+	@Override
+	public void collect(StreamWindow<T> record) {
+		nextObject = record;
+		callUserFunctionAndLogException();
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/1146f64c/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
new file mode 100644
index 0000000..b4f965f
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/windowing/WindowReducer.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.invokable.operator.windowing;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.streaming.api.invokable.operator.MapInvokable;
+
+public class WindowReducer<IN> extends MapInvokable<StreamWindow<IN>, StreamWindow<IN>> {
+
+	private static final long serialVersionUID = 1L;
+
+	ReduceFunction<IN> reducer;
+
+	public WindowReducer(ReduceFunction<IN> reducer) {
+		super(new WindowReduceFunction<IN>(reducer));
+		this.reducer = reducer;
+	}
+
+	private static class WindowReduceFunction<T> implements
+			MapFunction<StreamWindow<T>, StreamWindow<T>> {
+
+		private static final long serialVersionUID = 1L;
+		ReduceFunction<T> reducer;
+
+		public WindowReduceFunction(ReduceFunction<T> reducer) {
+			this.reducer = reducer;
+		}
+
+		@Override
+		public StreamWindow<T> map(StreamWindow<T> window) throws Exception {
+			StreamWindow<T> outputWindow = new StreamWindow<T>(window.windowID);
+			outputWindow.numberOfParts = window.numberOfParts;
+
+			if (!window.isEmpty()) {
+				T reduced = window.get(0);
+				for (int i = 1; i < window.size(); i++) {
+					reduced = reducer.reduce(reduced, window.get(i));
+				}
+				outputWindow.add(reduced);
+			}
+			return outputWindow;
+		}
+
+	}
+
+}


Mime
View raw message