flink-commits mailing list archives

From rmetz...@apache.org
Subject [14/52] [partial] flink git commit: [FLINK-1452] Rename 'flink-addons' to 'flink-staging'
Date Mon, 02 Feb 2015 18:41:52 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
new file mode 100644
index 0000000..4fe356b
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/WindowedDataStream.java
@@ -0,0 +1,658 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction;
+import org.apache.flink.streaming.api.function.aggregation.AggregationFunction.AggregationType;
+import org.apache.flink.streaming.api.function.aggregation.ComparableAggregator;
+import org.apache.flink.streaming.api.function.aggregation.SumAggregator;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+import org.apache.flink.streaming.api.invokable.operator.GroupedWindowInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowGroupReduceInvokable;
+import org.apache.flink.streaming.api.invokable.operator.WindowReduceInvokable;
+import org.apache.flink.streaming.api.windowing.helper.Time;
+import org.apache.flink.streaming.api.windowing.helper.WindowingHelper;
+import org.apache.flink.streaming.api.windowing.policy.CloneableEvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.CloneableTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.EvictionPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TimeTriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TriggerPolicy;
+import org.apache.flink.streaming.api.windowing.policy.TumblingEvictionPolicy;
+
+/**
+ * A {@link WindowedDataStream} represents a data stream that has been divided
+ * into windows (predefined chunks). User defined functions such as
+ * {@link #reduce(ReduceFunction)}, {@link #reduceGroup(GroupReduceFunction)} or
+ * aggregations can be applied to the windows.
+ * 
+ * @param <OUT>
+ *            The output type of the {@link WindowedDataStream}
+ */
+public class WindowedDataStream<OUT> {
+
+	protected DataStream<OUT> dataStream;
+	protected boolean isGrouped;
+	protected boolean allCentral;
+	protected KeySelector<OUT, ?> keySelector;
+
+	protected List<WindowingHelper<OUT>> triggerHelpers;
+	protected List<WindowingHelper<OUT>> evictionHelpers;
+
+	protected LinkedList<TriggerPolicy<OUT>> userTriggers;
+	protected LinkedList<EvictionPolicy<OUT>> userEvicters;
+
+	protected WindowedDataStream(DataStream<OUT> dataStream, WindowingHelper<OUT>... policyHelpers) {
+		this.dataStream = dataStream.copy();
+		this.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
+		for (WindowingHelper<OUT> helper : policyHelpers) {
+			this.triggerHelpers.add(helper);
+		}
+
+		if (dataStream instanceof GroupedDataStream) {
+			this.isGrouped = true;
+			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+			// set all policies distributed
+			this.allCentral = false;
+
+		} else {
+			this.isGrouped = false;
+			// set all policies central
+			this.allCentral = true;
+		}
+	}
+
+	protected WindowedDataStream(DataStream<OUT> dataStream, List<TriggerPolicy<OUT>> triggers,
+			List<EvictionPolicy<OUT>> evicters) {
+		this.dataStream = dataStream.copy();
+
+		if (triggers != null) {
+			this.userTriggers = new LinkedList<TriggerPolicy<OUT>>();
+			this.userTriggers.addAll(triggers);
+		}
+
+		if (evicters != null) {
+			this.userEvicters = new LinkedList<EvictionPolicy<OUT>>();
+			this.userEvicters.addAll(evicters);
+		}
+
+		if (dataStream instanceof GroupedDataStream) {
+			this.isGrouped = true;
+			this.keySelector = ((GroupedDataStream<OUT>) dataStream).keySelector;
+			// set all policies distributed
+			this.allCentral = false;
+
+		} else {
+			this.isGrouped = false;
+			// set all policies central
+			this.allCentral = true;
+		}
+	}
+
+	protected WindowedDataStream(WindowedDataStream<OUT> windowedDataStream) {
+		this.dataStream = windowedDataStream.dataStream.copy();
+		this.isGrouped = windowedDataStream.isGrouped;
+		this.keySelector = windowedDataStream.keySelector;
+		this.triggerHelpers = windowedDataStream.triggerHelpers;
+		this.evictionHelpers = windowedDataStream.evictionHelpers;
+		this.userTriggers = windowedDataStream.userTriggers;
+		this.userEvicters = windowedDataStream.userEvicters;
+		this.allCentral = windowedDataStream.allCentral;
+	}
+
+	public <F> F clean(F f) {
+		return dataStream.clean(f);
+	}
+
+	/**
+	 * Defines the slide size (trigger frequency) for the windowed data stream.
+	 * This controls how often the user defined function will be triggered on
+	 * the window. <br/><br/> For example, to get a window of 5 elements with a
+	 * slide of 2 seconds use: <br/><br/>
+	 * {@code ds.window(Count.of(5)).every(Time.of(2,TimeUnit.SECONDS))}
+	 * <br/><br/> The user function in this case will be called on the 5 most
+	 * recent elements every 2 seconds.
+	 * 
+	 * @param policyHelpers
+	 *            The policies that define the triggering frequency
+	 * 
+	 * @return The windowed data stream with triggering set
+	 */
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	public WindowedDataStream<OUT> every(WindowingHelper... policyHelpers) {
+		WindowedDataStream<OUT> ret = this.copy();
+		if (ret.evictionHelpers == null) {
+			ret.evictionHelpers = ret.triggerHelpers;
+			ret.triggerHelpers = new ArrayList<WindowingHelper<OUT>>();
+		}
+		for (WindowingHelper<OUT> helper : policyHelpers) {
+			ret.triggerHelpers.add(helper);
+		}
+		return ret;
+	}
+
+	/**
+	 * Groups the elements of the {@link WindowedDataStream} by the given key
+	 * positions. The window sizes (evictions) and slide sizes (triggers) will
+	 * be calculated on the whole stream (in a central fashion), but the user
+	 * defined functions will be applied on a per group basis. <br/><br/> To get
+	 * windows and triggers on a per group basis apply the
+	 * {@link DataStream#window} operator on an already grouped data stream.
+	 * 
+	 * @param fields
+	 *            The position of the fields to group by.
+	 * @return The grouped {@link WindowedDataStream}
+	 */
+	public WindowedDataStream<OUT> groupBy(int... fields) {
+		WindowedDataStream<OUT> ret = this.copy();
+		ret.dataStream = ret.dataStream.groupBy(fields);
+		ret.isGrouped = true;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+		return ret;
+	}
+
+	/**
+	 * Groups the elements of the {@link WindowedDataStream} by the given field
+	 * expressions. The window sizes (evictions) and slide sizes (triggers) will
+	 * be calculated on the whole stream (in a central fashion), but the user
+	 * defined functions will be applied on a per group basis. <br/><br/> To get
+	 * windows and triggers on a per group basis apply the
+	 * {@link DataStream#window} operator on an already grouped data stream.
+	 * <br/><br/> A field expression is either the name of a public field or a
+	 * getter method with parentheses of the stream's underlying type. A dot can
+	 * be used to drill down into objects, as in
+	 * {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param fields
+	 *            The fields to group by
+	 * @return The grouped {@link WindowedDataStream}
+	 */
+	public WindowedDataStream<OUT> groupBy(String... fields) {
+		WindowedDataStream<OUT> ret = this.copy();
+		ret.dataStream = ret.dataStream.groupBy(fields);
+		ret.isGrouped = true;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+		return ret;
+	}
+
+	/**
+	 * Groups the elements of the {@link WindowedDataStream} using the given
+	 * {@link KeySelector}. The window sizes (evictions) and slide sizes
+	 * (triggers) will be calculated on the whole stream (in a central fashion),
+	 * but the user defined functions will be applied on a per group basis.
+	 * <br/><br/> To get windows and triggers on a per group basis apply the
+	 * {@link DataStream#window} operator on an already grouped data stream.
+	 * 
+	 * @param keySelector
+	 *            The keySelector used to extract the key for grouping.
+	 * @return The grouped {@link WindowedDataStream}
+	 */
+	public WindowedDataStream<OUT> groupBy(KeySelector<OUT, ?> keySelector) {
+		WindowedDataStream<OUT> ret = this.copy();
+		ret.dataStream = ret.dataStream.groupBy(keySelector);
+		ret.isGrouped = true;
+		ret.keySelector = ((GroupedDataStream<OUT>) ret.dataStream).keySelector;
+		return ret;
+	}
+
+	/**
+	 * Applies a reduce transformation on the windowed data stream by reducing
+	 * the current window at every trigger. The user can also extend the
+	 * {@link RichReduceFunction} to gain access to other features provided by
+	 * the {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * 
+	 * @param reduceFunction
+	 *            The reduce function that will be applied to the windows.
+	 * @return The transformed DataStream
+	 */
+	public SingleOutputStreamOperator<OUT, ?> reduce(ReduceFunction<OUT> reduceFunction) {
+		return dataStream.transform("Window-Reduce", getType(),
+				getReduceInvokable(reduceFunction));
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the windowed data stream by
+	 * reducing the current window at every trigger. In contrast with the
+	 * standard binary reducer, with reduceGroup the user can access all
+	 * elements of the window at the same time through the iterable interface.
+	 * The user can also extend the {@link RichGroupReduceFunction} to gain
+	 * access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * 
+	 * @param reduceFunction
+	 *            The reduce function that will be applied to the windows.
+	 * @return The transformed DataStream
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
+			GroupReduceFunction<OUT, R> reduceFunction) {
+
+		TypeInformation<OUT> inType = getType();
+		TypeInformation<R> outType = TypeExtractor
+				.getGroupReduceReturnTypes(reduceFunction, inType);
+
+		return dataStream.transform("WindowReduce", outType,
+				getReduceGroupInvokable(reduceFunction));
+	}
+
+	/**
+	 * Applies a reduceGroup transformation on the windowed data stream by
+	 * reducing the current window at every trigger. In contrast with the
+	 * standard binary reducer, with reduceGroup the user can access all
+	 * elements of the window at the same time through the iterable interface.
+	 * The user can also extend the {@link RichGroupReduceFunction} to gain
+	 * access to other features provided by the
+	 * {@link org.apache.flink.api.common.functions.RichFunction} interface.
+	 * <br/><br/> This version of reduceGroup uses user supplied type
+	 * information for serialization. Use this only when the system is unable
+	 * to detect type information using
+	 * {@link #reduceGroup(GroupReduceFunction)}.
+	 * 
+	 * @param reduceFunction
+	 *            The reduce function that will be applied to the windows.
+	 * @return The transformed DataStream
+	 */
+	public <R> SingleOutputStreamOperator<R, ?> reduceGroup(
+			GroupReduceFunction<OUT, R> reduceFunction, TypeInformation<R> outType) {
+
+		return dataStream.transform("Window-Reduce", outType,
+				getReduceGroupInvokable(reduceFunction));
+	}
+
+	/**
+	 * Applies an aggregation that sums every window of the data stream at the
+	 * given position.
+	 * 
+	 * @param positionToSum
+	 *            The position in the tuple/array to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(int positionToSum) {
+		dataStream.checkFieldRange(positionToSum);
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(positionToSum,
+				dataStream.getClassAtPos(positionToSum), getType()));
+	}
+
+	/**
+	 * Applies an aggregation that sums every window of the pojo data stream at
+	 * the given field. <br/><br/> A field expression is either
+	 * the name of a public field or a getter method with parentheses of the
+	 * stream's underlying type. A dot can be used to drill down into objects,
+	 * as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field to sum
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> sum(String field) {
+		return aggregate((AggregationFunction<OUT>) SumAggregator.getSumFunction(field, getType()));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum value of every window
+	 * of the data stream at the given position.
+	 * 
+	 * @param positionToMin
+	 *            The position to minimize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(int positionToMin) {
+		dataStream.checkFieldRange(positionToMin);
+		return aggregate(ComparableAggregator.getAggregator(positionToMin, getType(),
+				AggregationType.MIN));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum value of the pojo data
+	 * stream at the given field expression for every window. <br/><br/> A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> min(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MIN,
+				false));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If multiple elements have the same
+	 * minimum value the operator returns the first element by default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If multiple elements have the same
+	 * minimum value the operator returns the first element by default.
+	 * 
+	 * @param positionToMinBy
+	 *            The position to minimize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String positionToMinBy) {
+		return this.minBy(positionToMinBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of every window of
+	 * the data stream by the given position. If multiple elements have the same
+	 * minimum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 * 
+	 * @param positionToMinBy
+	 *            The position to minimize
+	 * @param first
+	 *            If true, then the operator returns the first element with the
+	 *            minimum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(int positionToMinBy, boolean first) {
+		dataStream.checkFieldRange(positionToMinBy);
+		return aggregate(ComparableAggregator.getAggregator(positionToMinBy, getType(),
+				AggregationType.MINBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that gives the minimum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If true, then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> minBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
+				AggregationType.MINBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of every window of
+	 * the data stream at the given position.
+	 * 
+	 * @param positionToMax
+	 *            The position to maximize
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(int positionToMax) {
+		dataStream.checkFieldRange(positionToMax);
+		return aggregate(ComparableAggregator.getAggregator(positionToMax, getType(),
+				AggregationType.MAX));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum value of the pojo data
+	 * stream at the given field expression for every window. A field expression
+	 * is either the name of a public field or a getter method with parentheses
+	 * of the {@link DataStream}'s underlying type. A dot can be used to drill
+	 * down into objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> max(String field) {
+		return aggregate(ComparableAggregator.getAggregator(field, getType(), AggregationType.MAX,
+				false));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If multiple elements have the same
+	 * maximum value the operator returns the first element by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If multiple elements have the same
+	 * maximum value the operator returns the first element by default.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String positionToMaxBy) {
+		return this.maxBy(positionToMaxBy, true);
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of every window of
+	 * the data stream by the given position. If multiple elements have the same
+	 * maximum value the operator returns either the first or last one depending
+	 * on the parameter setting.
+	 * 
+	 * @param positionToMaxBy
+	 *            The position to maximize by
+	 * @param first
+	 *            If true, then the operator returns the first element with the
+	 *            maximum value, otherwise returns the last
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(int positionToMaxBy, boolean first) {
+		dataStream.checkFieldRange(positionToMaxBy);
+		return aggregate(ComparableAggregator.getAggregator(positionToMaxBy, getType(),
+				AggregationType.MAXBY, first));
+	}
+
+	/**
+	 * Applies an aggregation that gives the maximum element of the pojo
+	 * data stream by the given field expression for every window. A field
+	 * expression is either the name of a public field or a getter method with
+	 * parentheses of the {@link DataStream}'s underlying type. A dot can be used
+	 * to drill down into objects, as in {@code "field1.getInnerField2()" }.
+	 * 
+	 * @param field
+	 *            The field expression based on which the aggregation will be
+	 *            applied.
+	 * @param first
+	 *            If true, then in case of field equality the first object will
+	 *            be returned
+	 * @return The transformed DataStream.
+	 */
+	public SingleOutputStreamOperator<OUT, ?> maxBy(String field, boolean first) {
+		return aggregate(ComparableAggregator.getAggregator(field, getType(),
+				AggregationType.MAXBY, first));
+	}
+
+	private SingleOutputStreamOperator<OUT, ?> aggregate(AggregationFunction<OUT> aggregator) {
+		StreamInvokable<OUT, OUT> invokable = getReduceInvokable(aggregator);
+
+		SingleOutputStreamOperator<OUT, ?> returnStream = dataStream.transform("Window-Aggregation",
+				getType(), invokable);
+
+		return returnStream;
+	}
+
+	private LinkedList<TriggerPolicy<OUT>> getTriggers() {
+
+		LinkedList<TriggerPolicy<OUT>> triggers = new LinkedList<TriggerPolicy<OUT>>();
+
+		if (triggerHelpers != null) {
+			for (WindowingHelper<OUT> helper : triggerHelpers) {
+				triggers.add(helper.toTrigger());
+			}
+		}
+
+		if (userTriggers != null) {
+			triggers.addAll(userTriggers);
+		}
+
+		return triggers;
+
+	}
+
+	private LinkedList<EvictionPolicy<OUT>> getEvicters() {
+
+		LinkedList<EvictionPolicy<OUT>> evicters = new LinkedList<EvictionPolicy<OUT>>();
+
+		if (evictionHelpers != null) {
+			for (WindowingHelper<OUT> helper : evictionHelpers) {
+				evicters.add(helper.toEvict());
+			}
+		} else {
+			if (userEvicters == null) {
+				boolean notOnlyTime = false;
+				for (WindowingHelper<OUT> helper : triggerHelpers) {
+					if (helper instanceof Time<?>) {
+						evicters.add(helper.toEvict());
+					} else {
+						notOnlyTime = true;
+					}
+				}
+				if (notOnlyTime) {
+					evicters.add(new TumblingEvictionPolicy<OUT>());
+				}
+			}
+		}
+
+		if (userEvicters != null) {
+			evicters.addAll(userEvicters);
+		}
+
+		return evicters;
+	}
+
+	private LinkedList<TriggerPolicy<OUT>> getCentralTriggers() {
+		LinkedList<TriggerPolicy<OUT>> cTriggers = new LinkedList<TriggerPolicy<OUT>>();
+		if (allCentral) {
+			cTriggers.addAll(getTriggers());
+		} else {
+			for (TriggerPolicy<OUT> trigger : getTriggers()) {
+				if (trigger instanceof TimeTriggerPolicy) {
+					cTriggers.add(trigger);
+				}
+			}
+		}
+		return cTriggers;
+	}
+
+	private LinkedList<CloneableTriggerPolicy<OUT>> getDistributedTriggers() {
+		LinkedList<CloneableTriggerPolicy<OUT>> dTriggers = null;
+
+		if (!allCentral) {
+			dTriggers = new LinkedList<CloneableTriggerPolicy<OUT>>();
+			for (TriggerPolicy<OUT> trigger : getTriggers()) {
+				if (!(trigger instanceof TimeTriggerPolicy)) {
+					dTriggers.add((CloneableTriggerPolicy<OUT>) trigger);
+				}
+			}
+		}
+
+		return dTriggers;
+	}
+
+	private LinkedList<CloneableEvictionPolicy<OUT>> getDistributedEvicters() {
+		LinkedList<CloneableEvictionPolicy<OUT>> evicters = null;
+
+		if (!allCentral) {
+			evicters = new LinkedList<CloneableEvictionPolicy<OUT>>();
+			for (EvictionPolicy<OUT> evicter : getEvicters()) {
+				evicters.add((CloneableEvictionPolicy<OUT>) evicter);
+			}
+		}
+
+		return evicters;
+	}
+
+	private LinkedList<EvictionPolicy<OUT>> getCentralEvicters() {
+		if (allCentral) {
+			return getEvicters();
+		} else {
+			return null;
+		}
+	}
+
+	private <R> StreamInvokable<OUT, R> getReduceGroupInvokable(GroupReduceFunction<OUT, R> reducer) {
+		StreamInvokable<OUT, R> invokable;
+		if (isGrouped) {
+			invokable = new GroupedWindowInvokable<OUT, R>(clean(reducer), keySelector,
+					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
+					getCentralEvicters());
+
+		} else {
+			invokable = new WindowGroupReduceInvokable<OUT, R>(clean(reducer), getTriggers(),
+					getEvicters());
+		}
+		return invokable;
+	}
+
+	private StreamInvokable<OUT, OUT> getReduceInvokable(ReduceFunction<OUT> reducer) {
+		StreamInvokable<OUT, OUT> invokable;
+		if (isGrouped) {
+			invokable = new GroupedWindowInvokable<OUT, OUT>(clean(reducer), keySelector,
+					getDistributedTriggers(), getDistributedEvicters(), getCentralTriggers(),
+					getCentralEvicters());
+
+		} else {
+			invokable = new WindowReduceInvokable<OUT>(clean(reducer), getTriggers(), getEvicters());
+		}
+		return invokable;
+	}
+
+	/**
+	 * Gets the output type.
+	 * 
+	 * @return The output type.
+	 */
+	public TypeInformation<OUT> getType() {
+		return dataStream.getType();
+	}
+
+	public DataStream<OUT> getDataStream() {
+		return dataStream;
+	}
+
+	protected WindowedDataStream<OUT> copy() {
+		return new WindowedDataStream<OUT>(this);
+	}
+}
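
A minimal usage sketch of the windowing API above (not part of this commit; the
input stream is elided, and the Count helper is assumed to sit alongside Time
in the windowing helper package, as the every() Javadoc suggests):

    DataStream<Tuple2<String, Integer>> input = ...; // some tuple stream
    input.window(Count.of(100))                      // eviction: keep the 100 newest elements
         .every(Time.of(5, TimeUnit.SECONDS))        // trigger: call the user function every 5 seconds
         .groupBy(0)                                 // apply the user function per key
         .sum(1);                                    // sum tuple field 1 of each window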

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
new file mode 100644
index 0000000..03160c2
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamCrossOperator.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.CrossFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CrossOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.function.co.CrossWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+
+public class StreamCrossOperator<I1, I2> extends
+		TemporalOperator<I1, I2, StreamCrossOperator.CrossWindow<I1, I2>> {
+	
+	public StreamCrossOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		super(input1, input2);
+	}
+
+	@Override
+	protected CrossWindow<I1, I2> createNextWindowOperator() {
+
+		CrossWindowFunction<I1, I2, Tuple2<I1, I2>> crossWindowFunction = new CrossWindowFunction<I1, I2, Tuple2<I1, I2>>(
+				input1.clean(new CrossOperator.DefaultCrossFunction<I1, I2>()));
+
+		return new CrossWindow<I1, I2>(this, input1.connect(input2).addGeneralWindowCombine(
+				crossWindowFunction,
+				new TupleTypeInfo<Tuple2<I1, I2>>(input1.getType(), input2.getType()), windowSize,
+				slideInterval, timeStamp1, timeStamp2));
+	}
+
+	public static class CrossWindow<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, CrossWindow<I1, I2>> implements
+			TemporalWindow<CrossWindow<I1, I2>> {
+
+		private StreamCrossOperator<I1, I2> op;
+
+		public CrossWindow(StreamCrossOperator<I1, I2> op, DataStream<Tuple2<I1, I2>> ds) {
+			super(ds);
+			this.op = op;
+		}
+
+		public CrossWindow<I1, I2> every(long length, TimeUnit timeUnit) {
+			return every(timeUnit.toMillis(length));
+		}
+
+		@SuppressWarnings("unchecked")
+		public CrossWindow<I1, I2> every(long length) {
+			((CoWindowInvokable<I1, I2, ?>) streamGraph.getInvokable(id)).setSlideSize(length);
+			return this;
+		}
+
+		/**
+		 * Finalizes a temporal Cross transformation by applying a
+		 * {@link CrossFunction} to each pair of crossed elements.<br/>
+		 * Each CrossFunction call returns exactly one element.
+		 * 
+		 * @param function
+		 *            The CrossFunction that is called for each pair of crossed
+		 *            elements.
+		 * @return The crossed data streams
+		 * 
+		 */
+		public <R> SingleOutputStreamOperator<R, ?> with(CrossFunction<I1, I2, R> function) {
+			TypeInformation<R> outTypeInfo = TypeExtractor.getCrossReturnTypes(function,
+					op.input1.getType(), op.input2.getType());
+
+			CoWindowInvokable<I1, I2, R> invokable = new CoWindowInvokable<I1, I2, R>(
+					new CrossWindowFunction<I1, I2, R>(clean(function)), op.windowSize,
+					op.slideInterval, op.timeStamp1, op.timeStamp2);
+
+			streamGraph.setInvokable(id, invokable);
+
+			return setType(outTypeInfo);
+
+		}
+
+	}
+
+}
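
A hedged sketch of reaching CrossWindow from user code (the DataStream#cross
entry point is an assumption; it is not part of this diff):

    DataStream<Integer> first = ...;
    DataStream<String> second = ...;
    first.cross(second)                      // assumed to construct a StreamCrossOperator
         .onWindow(10, TimeUnit.SECONDS)     // cross all pairs that fall into 10-second windows
         .every(2, TimeUnit.SECONDS)         // slide the window every 2 seconds
         .with(new CrossFunction<Integer, String, String>() {
             @Override
             public String cross(Integer left, String right) {
                 return left + ":" + right;  // exactly one output element per crossed pair
             }
         });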

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
new file mode 100644
index 0000000..d2b2032
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/StreamJoinOperator.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.functions.KeySelector;
+import org.apache.flink.api.java.operators.Keys;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.function.co.JoinWindowFunction;
+import org.apache.flink.streaming.api.invokable.operator.co.CoWindowInvokable;
+import org.apache.flink.streaming.util.keys.KeySelectorUtil;
+
+public class StreamJoinOperator<I1, I2> extends
+		TemporalOperator<I1, I2, StreamJoinOperator.JoinWindow<I1, I2>> {
+
+	public StreamJoinOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		super(input1, input2);
+	}
+
+	@Override
+	protected JoinWindow<I1, I2> createNextWindowOperator() {
+		return new JoinWindow<I1, I2>(this);
+	}
+
+	public static class JoinWindow<I1, I2> implements TemporalWindow<JoinWindow<I1, I2>> {
+
+		private StreamJoinOperator<I1, I2> op;
+		private TypeInformation<I1> type1;
+
+		private JoinWindow(StreamJoinOperator<I1, I2> operator) {
+			this.op = operator;
+			this.type1 = op.input1.getType();
+		}
+
+		/**
+		 * Continues a temporal Join transformation. <br/>
+		 * Defines the {@link Tuple} fields of the first join {@link DataStream}
+		 * that should be used as join keys.<br/>
+		 * <b>Note: Fields can only be selected as join keys on Tuple
+		 * DataStreams.</b><br/>
+		 * 
+		 * @param fields
+		 *            The indexes of the Tuple fields of the first join
+		 *            DataStream that should be used as keys.
+		 * @return An incomplete Join transformation. Call
+		 *         {@link JoinPredicate#equalTo} to continue the Join.
+		 */
+		public JoinPredicate<I1, I2> where(int... fields) {
+			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
+					new Keys.ExpressionKeys<I1>(fields, type1), type1));
+		}
+
+		/**
+		 * Continues a temporal join transformation. <br/>
+		 * Defines the fields of the first join {@link DataStream} that should
+		 * be used as grouping keys. Fields are the names of member fields of
+		 * the underlying type of the data stream.
+		 * 
+		 * @param fields
+		 *            The fields of the first join DataStream that should be
+		 *            used as keys.
+		 * @return An incomplete Join transformation. Call
+		 *         {@link JoinPredicate#equalTo} to continue the Join.
+		 */
+		public JoinPredicate<I1, I2> where(String... fields) {
+			return new JoinPredicate<I1, I2>(op, KeySelectorUtil.getSelectorForKeys(
+					new Keys.ExpressionKeys<I1>(fields, type1), type1));
+		}
+
+		/**
+		 * Continues a temporal Join transformation and defines a
+		 * {@link KeySelector} function for the first join {@link DataStream}.
+		 * <br/> The KeySelector function is called for each element of the
+		 * first DataStream and extracts a single key value on which the
+		 * DataStream is joined.
+		 * 
+		 * @param keySelector
+		 *            The KeySelector function which extracts the key values
+		 *            from the DataStream on which it is joined.
+		 * @return An incomplete Join transformation. Call
+		 *         {@link JoinPredicate#equalTo} to continue the Join.
+		 */
+		public <K> JoinPredicate<I1, I2> where(KeySelector<I1, K> keySelector) {
+			return new JoinPredicate<I1, I2>(op, keySelector);
+		}
+
+		@Override
+		public JoinWindow<I1, I2> every(long length, TimeUnit timeUnit) {
+			return every(timeUnit.toMillis(length));
+		}
+
+		@Override
+		public JoinWindow<I1, I2> every(long length) {
+			op.slideInterval = length;
+			return this;
+		}
+
+		// ----------------------------------------------------------------------------------------
+
+	}
+
+	/**
+	 * Intermediate step of a temporal Join transformation. <br/>
+	 * To continue the Join transformation, select the join key of the second
+	 * input {@link DataStream} by calling {@link JoinPredicate#equalTo}
+	 * 
+	 */
+	public static class JoinPredicate<I1, I2> {
+
+		private StreamJoinOperator<I1, I2> op;
+		private KeySelector<I1, ?> keys1;
+		private KeySelector<I2, ?> keys2;
+		private TypeInformation<I2> type2;
+
+		private JoinPredicate(StreamJoinOperator<I1, I2> operator, KeySelector<I1, ?> keys1) {
+			this.op = operator;
+			this.keys1 = keys1;
+			this.type2 = op.input2.getType();
+		}
+
+		/**
+		 * Creates a temporal Join transformation and defines the {@link Tuple}
+		 * fields of the second join {@link DataStream} that should be used as
+		 * join keys.<br/>
+		 * <p> The resulting operator wraps each pair of joining elements in a
+		 * {@code Tuple2<I1, I2>(first, second)}. To use a different wrapping
+		 * function use {@link JoinedStream#with(JoinFunction)}.
+		 * 
+		 * @param fields
+		 *            The indexes of the Tuple fields of the second join
+		 *            DataStream that should be used as keys.
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
+		 */
+		public JoinedStream<I1, I2> equalTo(int... fields) {
+			keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields, type2),
+					type2);
+			return createJoinOperator();
+		}
+
+		/**
+		 * Creates a temporal Join transformation and defines the fields of the
+		 * second join {@link DataStream} that should be used as join keys. <p>
+		 * The resulting operator wraps each pair of joining elements in a
+		 * {@code Tuple2<I1, I2>(first, second)}. To use a different wrapping
+		 * function use {@link JoinedStream#with(JoinFunction)}.
+		 * 
+		 * @param fields
+		 *            The fields of the second join DataStream that should be
+		 *            used as keys.
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
+		 */
+		public JoinedStream<I1, I2> equalTo(String... fields) {
+			this.keys2 = KeySelectorUtil.getSelectorForKeys(new Keys.ExpressionKeys<I2>(fields,
+					type2), type2);
+			return createJoinOperator();
+		}
+
+		/**
+		 * Creates a temporal Join transformation and defines a
+		 * {@link KeySelector} function for the second join {@link DataStream}.
+		 * <br/> The KeySelector function is called for each element of the
+		 * second DataStream and extracts a single key value on which the
+		 * DataStream is joined. <p> The resulting operator wraps each pair of
+		 * joining elements in a {@code Tuple2<I1, I2>(first, second)}. To use a
+		 * different wrapping function use
+		 * {@link JoinedStream#with(JoinFunction)}.
+		 * 
+		 * 
+		 * @param keySelector
+		 *            The KeySelector function which extracts the key values
+		 *            from the second DataStream on which it is joined.
+		 * @return A streaming join operator. Call {@link JoinedStream#with} to
+		 *         apply a custom wrapping
+		 */
+		public <K> JoinedStream<I1, I2> equalTo(KeySelector<I2, K> keySelector) {
+			this.keys2 = keySelector;
+			return createJoinOperator();
+		}
+
+		private JoinedStream<I1, I2> createJoinOperator() {
+
+			JoinFunction<I1, I2, Tuple2<I1, I2>> joinFunction = new DefaultJoinFunction<I1, I2>();
+
+			JoinWindowFunction<I1, I2, Tuple2<I1, I2>> joinWindowFunction = getJoinWindowFunction(
+					joinFunction, this);
+
+			TypeInformation<Tuple2<I1, I2>> outType = new TupleTypeInfo<Tuple2<I1, I2>>(
+					op.input1.getType(), op.input2.getType());
+
+			return new JoinedStream<I1, I2>(this, op.input1
+					.groupBy(keys1)
+					.connect(op.input2.groupBy(keys2))
+					.addGeneralWindowCombine(joinWindowFunction, outType, op.windowSize,
+							op.slideInterval, op.timeStamp1, op.timeStamp2));
+		}
+	}
+
+	public static class JoinedStream<I1, I2> extends
+			SingleOutputStreamOperator<Tuple2<I1, I2>, JoinedStream<I1, I2>> {
+		private final JoinPredicate<I1, I2> predicate;
+
+		private JoinedStream(JoinPredicate<I1, I2> predicate, DataStream<Tuple2<I1, I2>> ds) {
+			super(ds);
+			this.predicate = predicate;
+		}
+
+		/**
+		 * Completes a stream join. <p> The resulting operator wraps each pair
+		 * of joining elements using the user defined {@link JoinFunction}.
+		 * 
+		 * @return The joined data stream.
+		 */
+		public <OUT> SingleOutputStreamOperator<OUT, ?> with(JoinFunction<I1, I2, OUT> joinFunction) {
+
+			TypeInformation<OUT> outType = TypeExtractor.getJoinReturnTypes(joinFunction,
+					predicate.op.input1.getType(), predicate.op.input2.getType());
+
+			CoWindowInvokable<I1, I2, OUT> invokable = new CoWindowInvokable<I1, I2, OUT>(
+					getJoinWindowFunction(joinFunction, predicate), predicate.op.windowSize,
+					predicate.op.slideInterval, predicate.op.timeStamp1, predicate.op.timeStamp2);
+
+			streamGraph.setInvokable(id, invokable);
+
+			return setType(outType);
+		}
+	}
+
+	public static final class DefaultJoinFunction<I1, I2> implements
+			JoinFunction<I1, I2, Tuple2<I1, I2>> {
+
+		private static final long serialVersionUID = 1L;
+		private final Tuple2<I1, I2> outTuple = new Tuple2<I1, I2>();
+
+		@Override
+		public Tuple2<I1, I2> join(I1 first, I2 second) throws Exception {
+			outTuple.f0 = first;
+			outTuple.f1 = second;
+			return outTuple;
+		}
+	}
+
+	public static <I1, I2, OUT> JoinWindowFunction<I1, I2, OUT> getJoinWindowFunction(
+			JoinFunction<I1, I2, OUT> joinFunction, JoinPredicate<I1, I2> predicate) {
+		return new JoinWindowFunction<I1, I2, OUT>(predicate.keys1, predicate.keys2, joinFunction);
+	}
+}
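
A hedged sketch of the fluent chain defined above (the DataStream#join entry
point is an assumption; it is not part of this diff):

    DataStream<Tuple2<String, Integer>> left = ...;
    DataStream<Tuple2<String, Integer>> right = ...;
    left.join(right)                         // assumed to construct a StreamJoinOperator
        .onWindow(5, TimeUnit.SECONDS)       // join elements that fall into 5-second windows
        .where(0)                            // key of the first input: tuple field 0
        .equalTo(0)                          // key of the second input: tuple field 0
        .with(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
            @Override
            public String join(Tuple2<String, Integer> l, Tuple2<String, Integer> r) {
                return l.f0 + ": " + (l.f1 + r.f1);
            }
        });

Leaving out the final with(...) call keeps the DefaultJoinFunction above, which
wraps each matched pair in a Tuple2<I1, I2>.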

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
new file mode 100644
index 0000000..f121dfa
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalOperator.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.windowing.helper.SystemTimestamp;
+import org.apache.flink.streaming.api.windowing.helper.Timestamp;
+import org.apache.flink.streaming.api.windowing.helper.TimestampWrapper;
+
+public abstract class TemporalOperator<I1, I2, OP extends TemporalWindow<OP>> {
+
+	public final DataStream<I1> input1;
+	public final DataStream<I2> input2;
+
+	public long windowSize;
+	public long slideInterval;
+
+	public TimestampWrapper<I1> timeStamp1;
+	public TimestampWrapper<I2> timeStamp2;
+
+	public TemporalOperator(DataStream<I1> input1, DataStream<I2> input2) {
+		if (input1 == null || input2 == null) {
+			throw new NullPointerException();
+		}
+		this.input1 = input1.copy();
+		this.input2 = input2.copy();
+	}
+
+	/**
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be transformed.
+	 * To define sliding windows call {@link TemporalWindow#every} on the
+	 * resulting operator.
+	 * 
+	 * @param length
+	 *            The size of the window in the given time unit.
+	 * @param timeUnit
+	 *            The unit of time to be used.
+	 * @return An incomplete temporal transformation.
+	 */
+	@SuppressWarnings("unchecked")
+	public OP onWindow(long length, TimeUnit timeUnit) {
+		return onWindow(timeUnit.toMillis(length),
+				(TimestampWrapper<I1>) SystemTimestamp.getWrapper(),
+				(TimestampWrapper<I2>) SystemTimestamp.getWrapper());
+	}
+
+	/**
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be
+	 * transformed. To define sliding windows call {@link TemporalWindow#every}
+	 * on the resulting operator.
+	 * 
+	 * @param length
+	 *            The size of the window in milliseconds.
+	 * @param timeStamp1
+	 *            The timestamp used to extract time from the elements of the
+	 *            first data stream.
+	 * @param timeStamp2
+	 *            The timestamp used to extract time from the elements of the
+	 *            second data stream.
+	 * @return An incomplete temporal transformation.
+	 */
+	public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2) {
+		return onWindow(length, timeStamp1, timeStamp2, 0);
+	}
+
+	/**
+	 * Continues a temporal transformation.<br/>
+	 * Defines the window size on which the two DataStreams will be
+	 * transformed. To define sliding windows call {@link TemporalWindow#every}
+	 * on the resulting operator.
+	 * 
+	 * @param length
+	 *            The size of the window in milliseconds.
+	 * @param timeStamp1
+	 *            The timestamp used to extract time from the elements of the
+	 *            first data stream.
+	 * @param timeStamp2
+	 *            The timestamp used to extract time from the elements of the
+	 *            second data stream.
+	 * @param startTime
+	 *            The start time to measure the first window
+	 * @return An incomplete temporal transformation.
+	 */
+	public OP onWindow(long length, Timestamp<I1> timeStamp1, Timestamp<I2> timeStamp2,
+			long startTime) {
+		return onWindow(length, new TimestampWrapper<I1>(timeStamp1, startTime),
+				new TimestampWrapper<I2>(timeStamp2, startTime));
+	}
+
+	private OP onWindow(long length, TimestampWrapper<I1> timeStamp1,
+			TimestampWrapper<I2> timeStamp2) {
+
+		this.windowSize = length;
+		this.slideInterval = length;
+
+		this.timeStamp1 = timeStamp1;
+		this.timeStamp2 = timeStamp2;
+
+		return createNextWindowOperator();
+	}
+
+	protected abstract OP createNextWindowOperator();
+
+}
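
A sketch of event-time windows via the overload that takes Timestamp extractors
(the single getTimestamp method on the Timestamp helper interface is an
assumption, as are the MyEvent type and its eventTime field):

    DataStream<MyEvent> left = ...;
    DataStream<MyEvent> right = ...;
    new StreamJoinOperator<MyEvent, MyEvent>(left, right)
            .onWindow(5000L,                 // 5-second windows measured in element time
                    new Timestamp<MyEvent>() {
                        @Override
                        public long getTimestamp(MyEvent value) {
                            return value.eventTime;  // hypothetical event-time field
                        }
                    },
                    new Timestamp<MyEvent>() {
                        @Override
                        public long getTimestamp(MyEvent value) {
                            return value.eventTime;
                        }
                    },
                    0L);                     // measure the first window from time 0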

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
new file mode 100644
index 0000000..8ac1492
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/temporaloperator/TemporalWindow.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream.temporaloperator;
+
+import java.util.concurrent.TimeUnit;
+
+public interface TemporalWindow<T> {
+
+	/**
+	 * Defines the slide interval for this temporal operator
+	 * 
+	 * @param length
+	 *            Length of the window
+	 * @param timeUnit
+	 *            Unit of time
+	 * @return The temporal operator with slide interval specified
+	 */
+	public T every(long length, TimeUnit timeUnit);
+
+	/**
+	 * Defines the slide interval for this temporal operator
+	 * 
+	 * @param length
+	 *            Length of the window
+	 * @return The temporal operator with slide interval specified
+	 */
+	public T every(long length);
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
new file mode 100755
index 0000000..4824fca
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.streaming.util.ClusterUtil;
+
+public class LocalStreamEnvironment extends StreamExecutionEnvironment {
+
+	protected static ClassLoader userClassLoader;
+
+	/**
+	 * Executes the JobGraph on a mini cluster using ClusterUtil with a
+	 * default job name.
+	 */
+	@Override
+	public void execute() throws Exception {
+		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(), getDegreeOfParallelism());
+	}
+
+	/**
+	 * Executes the JobGraph on a mini cluster using ClusterUtil with a user
+	 * specified job name.
+	 * 
+	 * @param jobName
+	 *            name of the job
+	 */
+	@Override
+	public void execute(String jobName) throws Exception {
+		ClusterUtil.runOnMiniCluster(this.streamGraph.getJobGraph(jobName),
+				getDegreeOfParallelism());
+	}
+}
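
In user programs this environment is normally obtained through a factory method
on StreamExecutionEnvironment rather than constructed directly; a sketch
(createLocalEnvironment is an assumption, as StreamExecutionEnvironment is not
part of this diff):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    // ... define sources and transformations on env ...
    env.execute("local-test");               // runs the JobGraph on ClusterUtil's mini cluster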

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
new file mode 100644
index 0000000..2eb05ad
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
+	private static final Logger LOG = LoggerFactory.getLogger(RemoteStreamEnvironment.class);
+
+	private String host;
+	private int port;
+	private List<File> jarFiles;
+
+	/**
+	 * Creates a new RemoteStreamEnvironment that points to the master
+	 * (JobManager) described by the given host name and port.
+	 * 
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 */
+	public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
+		if (host == null) {
+			throw new NullPointerException("Host must not be null.");
+		}
+
+		if (port < 1 || port > 0xffff) {
+			throw new IllegalArgumentException("Port out of range");
+		}
+
+		this.host = host;
+		this.port = port;
+		this.jarFiles = new ArrayList<File>();
+		for (int i = 0; i < jarFiles.length; i++) {
+			File file = new File(jarFiles[i]);
+			try {
+				JobWithJars.checkJarFile(file);
+			} catch (IOException e) {
+				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
+			}
+			this.jarFiles.add(file);
+		}
+	}
+
+	@Override
+	public void execute() {
+
+		JobGraph jobGraph = streamGraph.getJobGraph();
+		executeRemotely(jobGraph);
+	}
+
+	@Override
+	public void execute(String jobName) {
+
+		JobGraph jobGraph = streamGraph.getJobGraph(jobName);
+		executeRemotely(jobGraph);
+	}
+
+	/**
+	 * Executes the remote job.
+	 * 
+	 * @param jobGraph
+	 *            jobGraph to execute
+	 */
+	private void executeRemotely(JobGraph jobGraph) {
+		if (LOG.isInfoEnabled()) {
+			LOG.info("Running remotely at {}:{}", host, port);
+		}
+
+		for (File file : jarFiles) {
+			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		}
+
+		Configuration configuration = jobGraph.getJobConfiguration();
+		Client client = new Client(new InetSocketAddress(host, port), configuration,
+				JobWithJars.buildUserCodeClassLoader(jarFiles, JobWithJars.class.getClassLoader()));
+
+		try {
+			client.run(jobGraph, true);
+		} catch (ProgramInvocationException e) {
+			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+		}
+	}
+
+	@Override
+	public String toString() {
+		return "Remote Environment (" + this.host + ":" + this.port + " - DOP = "
+				+ (getDegreeOfParallelism() == -1 ? "default" : getDegreeOfParallelism()) + ")";
+	}
+
+}
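
A minimal sketch of how a client program might drive RemoteStreamEnvironment; the host, port, jar path, and the print() sink are illustrative assumptions, not part of this patch:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RemoteExample {
        public static void main(String[] args) throws Exception {
            // Point the environment at a running JobManager; the jar must
            // contain this class and any user-defined functions it references.
            StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .createRemoteEnvironment("jobmanager-host", 6123,
                            "/path/to/program.jar");

            // Any sink completes the topology; print() is assumed available.
            env.socketTextStream("data-host", 9999).print();

            // Builds the JobGraph and submits it through executeRemotely().
            env.execute("remote example");
        }
    }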

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
new file mode 100644
index 0000000..7d41d2a
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamContextEnvironment.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.util.List;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+public class StreamContextEnvironment extends StreamExecutionEnvironment {
+
+	protected static ClassLoader userClassLoader;
+	protected List<File> jars;
+	protected Client client;
+
+	protected StreamContextEnvironment(Client client, List<File> jars, int dop) {
+		this.client = client;
+		this.jars = jars;
+		if (dop > 0) {
+			setDegreeOfParallelism(dop);
+		} else {
+			setDegreeOfParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+		}
+	}
+
+	@Override
+	public void execute() throws Exception {
+		execute(null);
+	}
+
+	@Override
+	public void execute(String jobName) throws Exception {
+		currentEnvironment = null;
+
+		JobGraph jobGraph;
+		if (jobName == null) {
+			jobGraph = this.streamGraph.getJobGraph();
+		} else {
+			jobGraph = this.streamGraph.getJobGraph(jobName);
+		}
+
+		for (File file : jars) {
+			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		}
+
+		client.run(jobGraph, true);
+	}
+
+}
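
StreamContextEnvironment is not meant to be instantiated by user code: the command line client constructs it, and getExecutionEnvironment() (in StreamExecutionEnvironment below) hands it out. A sketch of the calling side, under that assumption:

    // Inside a program submitted through the CLI client, this returns the
    // StreamContextEnvironment the client prepared; run standalone, the
    // same call falls back to a LocalStreamEnvironment.
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();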

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
new file mode 100644
index 0000000..45c14c1
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -0,0 +1,618 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.TextInputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.ContextEnvironment;
+import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.streaming.api.StreamGraph;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.function.source.FileMonitoringFunction;
+import org.apache.flink.streaming.api.function.source.FileMonitoringFunction.WatchType;
+import org.apache.flink.streaming.api.function.source.FileReadFunction;
+import org.apache.flink.streaming.api.function.source.FileSourceFunction;
+import org.apache.flink.streaming.api.function.source.FromElementsFunction;
+import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.GenericSourceFunction;
+import org.apache.flink.streaming.api.function.source.ParallelSourceFunction;
+import org.apache.flink.streaming.api.function.source.RichParallelSourceFunction;
+import org.apache.flink.streaming.api.function.source.SocketTextStreamFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
+import org.apache.flink.streaming.api.invokable.StreamInvokable;
+
+/**
+ * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
+ * necessary to construct streaming topologies.
+ * 
+ */
+public abstract class StreamExecutionEnvironment {
+
+	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
+
+	private long bufferTimeout = 100;
+
+	private ExecutionConfig config = new ExecutionConfig();
+
+	protected static StreamExecutionEnvironment currentEnvironment;
+
+	protected StreamGraph streamGraph;
+
+	// --------------------------------------------------------------------------------------------
+	// Constructor and Properties
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Constructor for creating a new StreamExecutionEnvironment.
+	 */
+	protected StreamExecutionEnvironment() {
+		streamGraph = new StreamGraph();
+	}
+
+	/**
+	 * Sets the config object.
+	 */
+	public void setConfig(ExecutionConfig config) {
+		Validate.notNull(config);
+		this.config = config;
+	}
+
+	/**
+	 * Gets the config object.
+	 */
+	public ExecutionConfig getConfig() {
+		return config;
+	}
+
+	/**
+	 * Gets the degree of parallelism with which operations are executed by
+	 * default. Operations can individually override this value to use a
+	 * specific degree of parallelism.
+	 * 
+	 * @return The degree of parallelism used by operations, unless they
+	 *         override that value.
+	 */
+	public int getDegreeOfParallelism() {
+		return config.getDegreeOfParallelism();
+	}
+
+	/**
+	 * Sets the degree of parallelism (DOP) for operations executed through this
+	 * environment. Setting a DOP of x here will cause all operators (such as
+	 * map, batchReduce) to run with x parallel instances. This method overrides
+	 * the default parallelism for this environment. The
+	 * {@link LocalStreamEnvironment} uses by default a value equal to the
+	 * number of hardware contexts (CPU cores / threads). When executing the
+	 * program via the command line client from a JAR file, the default degree
+	 * of parallelism is the one configured for that setup.
+	 * 
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism
+	 */
+	public StreamExecutionEnvironment setDegreeOfParallelism(int degreeOfParallelism) {
+		if (degreeOfParallelism < 1) {
+			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+		}
+		config.setDegreeOfParallelism(degreeOfParallelism);
+		return this;
+	}
+
+	/**
+	 * Sets the maximum time frequency (milliseconds) for the flushing of the
+	 * output buffers. By default the output buffers flush frequently to provide
+	 * low latency and to aid a smooth developer experience. Setting the
+	 * parameter can result in three logical modes:
+	 * 
+	 * <ul>
+	 * <li>A positive value triggers flushing periodically with that period in
+	 * milliseconds</li>
+	 * <li>0 triggers flushing after every record, thus minimizing latency</li>
+	 * <li>-1 triggers flushing only when the output buffer is full, thus
+	 * maximizing throughput</li>
+	 * </ul>
+	 * 
+	 * @param timeoutMillis
+	 *            The maximum time between two output flushes.
+	 */
+	public StreamExecutionEnvironment setBufferTimeout(long timeoutMillis) {
+		if (timeoutMillis < -1) {
+			throw new IllegalArgumentException("Timeout of buffer must be non-negative or -1");
+		}
+
+		this.bufferTimeout = timeoutMillis;
+		return this;
+	}
+
+	/**
+	 * Gets the maximum time frequency (milliseconds) for the flushing of the
+	 * output buffers. For clarification on the extremal values see
+	 * {@link #setBufferTimeout(long)}.
+	 * 
+	 * @return The timeout of the buffer.
+	 */
+	public long getBufferTimeout() {
+		return this.bufferTimeout;
+	}
+
+	/**
+	 * Sets the default parallelism that will be used for the local execution
+	 * environment created by {@link #createLocalEnvironment()}.
+	 * 
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism to use as the default local
+	 *            parallelism.
+	 */
+	public static void setDefaultLocalParallelism(int degreeOfParallelism) {
+		defaultLocalDop = degreeOfParallelism;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Data stream creations
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a DataStream that represents the Strings produced by reading the
+	 * given file line-wise. The file will be read with the system's default
+	 * character set.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextFile(String filePath) {
+		Validate.notNull(filePath, "The file path may not be null.");
+		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+
+		return addFileSource(format, typeInfo);
+	}
+
+	/**
+	 * Creates a DataStream that represents the Strings produced by reading the
+	 * given file line-wise. The file will be read with the given character set.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @param charsetName
+	 *            The name of the character set used to read the file.
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextFile(String filePath, String charsetName) {
+		Validate.notNull(filePath, "The file path may not be null.");
+		TextInputFormat format = new TextInputFormat(new Path(filePath));
+		TypeInformation<String> typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
+		format.setCharsetName(charsetName);
+
+		return addFileSource(format, typeInfo);
+	}
+
+	/**
+	 * Creates a DataStream that contains the contents of the files created
+	 * while the system watches the given path. The files will be read with the
+	 * system's default character set.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path/").
+	 * @param intervalMillis
+	 *            The interval of file watching in milliseconds.
+	 * @param watchType
+	 *            The watch type of the file stream. When watchType is
+	 *            {@link WatchType#ONLY_NEW_FILES}, the system processes only
+	 *            new files. {@link WatchType#REPROCESS_WITH_APPENDED} means
+	 *            that the system re-processes the entire contents of appended
+	 *            files. {@link WatchType#PROCESS_ONLY_APPENDED} means that the
+	 *            system processes only the appended contents of files.
+	 * 
+	 * @return The DataStream containing the contents of the watched files.
+	 */
+	public DataStream<String> readFileStream(String filePath, long intervalMillis,
+			WatchType watchType) {
+		DataStream<Tuple3<String, Long, Long>> source = addSource(new FileMonitoringFunction(
+				filePath, intervalMillis, watchType), null, "File Stream");
+
+		return source.flatMap(new FileReadFunction());
+	}
+
+	/**
+	 * Creates a new DataStream that contains the given elements. The elements
+	 * must all be of the same type, for example, all Strings or all Integers.
+	 * The sequence of elements must not be empty. Furthermore, the elements
+	 * must be serializable (as defined in java.io.Serializable), because the
+	 * execution environment may ship the elements into the cluster.
+	 * 
+	 * @param data
+	 *            The collection of elements to create the DataStream from.
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return The DataStream representing the elements.
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+		if (data.length == 0) {
+			throw new IllegalArgumentException(
+					"fromElements needs at least one element as argument");
+		}
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data[0]);
+
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+
+		return addSource(function, outTypeInfo, "Elements source");
+	}
+
+	/**
+	 * Creates a DataStream from the given non-empty collection. The type of the
+	 * DataStream is that of the elements in the collection. The elements need
+	 * to be serializable (as defined by java.io.Serializable), because the
+	 * framework may move the elements into the cluster if needed.
+	 * 
+	 * @param data
+	 *            The collection of elements to create the DataStream from.
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return The DataStream representing the elements.
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+		if (data == null) {
+			throw new NullPointerException("Collection must not be null");
+		}
+
+		if (data.isEmpty()) {
+			throw new IllegalArgumentException("Collection must not be empty");
+		}
+
+		TypeInformation<OUT> outTypeInfo = TypeExtractor.getForObject(data.iterator().next());
+		SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+
+		return addSource(function, outTypeInfo, "Collection Source");
+	}
+
+	/**
+	 * Creates a new DataStream that contains the strings received indefinitely
+	 * from a socket. Received strings are decoded by the system's default
+	 * character set.
+	 * 
+	 * @param hostname
+	 *            The host name to which the server socket is bound.
+	 * @param port
+	 *            The port to which the server socket is bound. A port number of
+	 *            0 means that the port number is automatically allocated.
+	 * @param delimiter
+	 *            A character which splits received strings into records.
+	 * @return A DataStream containing the strings received from the socket.
+	 */
+	public DataStreamSource<String> socketTextStream(String hostname, int port, char delimiter) {
+		return addSource(new SocketTextStreamFunction(hostname, port, delimiter), null,
+				"Socket Stream");
+	}
+
+	/**
+	 * Creates a new DataStream that contains the strings received indefinitely
+	 * from a socket. Received strings are decoded by the system's default
+	 * character set, using '\n' as the delimiter.
+	 * 
+	 * @param hostname
+	 *            The host name to which the server socket is bound.
+	 * @param port
+	 *            The port to which the server socket is bound. A port number of
+	 *            0 means that the port number is automatically allocated.
+	 * @return A DataStream containing the strings received from the socket.
+	 */
+	public DataStreamSource<String> socketTextStream(String hostname, int port) {
+		return socketTextStream(hostname, port, '\n');
+	}
+
+	/**
+	 * Creates a new DataStream that contains a sequence of numbers.
+	 * 
+	 * @param from
+	 *            The number to start at (inclusive).
+	 * @param to
+	 *            The number to stop at (inclusive).
+	 * @return A DataStream containing all numbers in the [from, to] interval.
+	 */
+	public DataStreamSource<Long> generateSequence(long from, long to) {
+		if (from > to) {
+			throw new IllegalArgumentException("Start of sequence must not be greater than the end");
+		}
+		return addSource(new GenSequenceFunction(from, to), null, "Sequence Source");
+	}
+
+	private DataStreamSource<String> addFileSource(InputFormat<String, ?> inputFormat,
+			TypeInformation<String> typeInfo) {
+		FileSourceFunction function = new FileSourceFunction(inputFormat, typeInfo);
+		DataStreamSource<String> returnStream = addSource(function, null, "File Source");
+		streamGraph.setInputFormat(returnStream.getId(), inputFormat);
+		return returnStream;
+	}
+
+	/**
+	 * Creates a DataStream using a user-defined source function for arbitrary
+	 * source functionality.
+	 * <p>
+	 * By default sources have a parallelism of 1. To enable parallel execution,
+	 * the user-defined source should implement {@link ParallelSourceFunction}
+	 * or extend {@link RichParallelSourceFunction}. In these cases the
+	 * resulting source will have the parallelism of the environment. To change
+	 * this afterwards call {@link DataStreamSource#setParallelism(int)}.
+	 * 
+	 * @param function
+	 *            the user defined function
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 */
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function) {
+		return addSource(function, null);
+	}
+
+	/**
+	 * Adds a data source with custom type information, thus opening a
+	 * {@link DataStream}. Only in very special cases does the user need to
+	 * supply custom type information. Otherwise use
+	 * {@link #addSource(SourceFunction)}.
+	 * <p>
+	 * By default sources have a parallelism of 1. To enable parallel execution,
+	 * the user-defined source should implement {@link ParallelSourceFunction}
+	 * or extend {@link RichParallelSourceFunction}. In these cases the
+	 * resulting source will have the parallelism of the environment. To change
+	 * this afterwards call {@link DataStreamSource#setParallelism(int)}.
+	 * 
+	 * @param function
+	 *            the user defined function
+	 * @param outTypeInfo
+	 *            the user defined type information for the stream
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 */
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
+			TypeInformation<OUT> outTypeInfo) {
+		return addSource(function, outTypeInfo, "Custom Source");
+	}
+
+	/**
+	 * Adds a data source with custom type information, thus opening a
+	 * {@link DataStream}. Only in very special cases does the user need to
+	 * supply custom type information. Otherwise use
+	 * {@link #addSource(SourceFunction)}.
+	 * 
+	 * @param function
+	 *            the user defined function
+	 * @param outTypeInfo
+	 *            the user defined type information for the stream
+	 * @param sourceName
+	 *            Name of the data source
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 */
+	@SuppressWarnings("unchecked")
+	private <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function,
+			TypeInformation<OUT> outTypeInfo, String sourceName) {
+
+		if (outTypeInfo == null) {
+			if (function instanceof GenericSourceFunction) {
+				outTypeInfo = ((GenericSourceFunction<OUT>) function).getType();
+			} else {
+				outTypeInfo = TypeExtractor.createTypeInfo(SourceFunction.class,
+						function.getClass(), 0, null, null);
+			}
+		}
+
+		boolean isParallel = function instanceof ParallelSourceFunction;
+		int dop = isParallel ? getDegreeOfParallelism() : 1;
+
+		StreamInvokable<OUT, OUT> sourceInvokable = new SourceInvokable<OUT>(function);
+
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, sourceName,
+				outTypeInfo, sourceInvokable, isParallel);
+
+		streamGraph.addSourceVertex(returnStream.getId(), sourceInvokable, null, outTypeInfo,
+				sourceName, dop);
+
+		return returnStream;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Instantiation of Execution Contexts
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates an execution environment that represents the context in which the
+	 * program is currently executed. If the program is invoked standalone, this
+	 * method returns a local execution environment, as returned by
+	 * {@link #createLocalEnvironment()}.
+	 * 
+	 * @return The execution environment of the context in which the program is
+	 *         executed.
+	 */
+	public static StreamExecutionEnvironment getExecutionEnvironment() {
+		if (currentEnvironment != null) {
+			return currentEnvironment;
+		}
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		if (env instanceof ContextEnvironment) {
+			ContextEnvironment ctx = (ContextEnvironment) env;
+			currentEnvironment = createContextEnvironment(ctx.getClient(), ctx.getJars(),
+					ctx.getDegreeOfParallelism());
+		} else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
+			currentEnvironment = new StreamPlanEnvironment(env);
+		} else {
+			return createLocalEnvironment();
+		}
+		return currentEnvironment;
+	}
+
+	private static StreamExecutionEnvironment createContextEnvironment(Client client,
+			List<File> jars, int dop) {
+		return new StreamContextEnvironment(client, jars, dop);
+	}
+
+	/**
+	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
+	 * will run the program in a multi-threaded fashion in the same JVM as the
+	 * environment was created in. The default degree of parallelism of the
+	 * local environment is the number of hardware contexts (CPU cores /
+	 * threads), unless it was specified differently by
+	 * {@link #setDefaultLocalParallelism(int)}.
+	 * 
+	 * @return A local execution environment.
+	 */
+	public static LocalStreamEnvironment createLocalEnvironment() {
+		return createLocalEnvironment(defaultLocalDop);
+	}
+
+	/**
+	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
+	 * will run the program in a multi-threaded fashion in the same JVM as the
+	 * environment was created in. It will use the degree of parallelism
+	 * specified in the parameter.
+	 * 
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism for the local environment.
+	 * @return A local execution environment with the specified degree of
+	 *         parallelism.
+	 */
+	public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
+		currentEnvironment = new LocalStreamEnvironment();
+		currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+		return (LocalStreamEnvironment) currentEnvironment;
+	}
+
+	// TODO: fix cluster default parallelism
+	/**
+	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+	 * (parts of) the program to a cluster for execution. Note that all file
+	 * paths used in the program must be accessible from the cluster. The
+	 * execution will use the cluster's default degree of parallelism, unless
+	 * the parallelism is set explicitly via {@link #setDegreeOfParallelism}.
+	 * 
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @return A remote environment that executes the program on a cluster.
+	 */
+	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
+			String... jarFiles) {
+		currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles);
+		return currentEnvironment;
+	}
+
+	/**
+	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+	 * (parts of) the program to a cluster for execution. Note that all file
+	 * paths used in the program must be accessible from the cluster. The
+	 * execution will use the specified degree of parallelism.
+	 * 
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism to use during the execution.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @return A remote environment that executes the program on a cluster.
+	 */
+	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
+			int degreeOfParallelism, String... jarFiles) {
+		currentEnvironment = new RemoteStreamEnvironment(host, port, jarFiles);
+		currentEnvironment.setDegreeOfParallelism(degreeOfParallelism);
+		return currentEnvironment;
+	}
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 * <p>
+	 * The program execution will be logged and displayed with a generated
+	 * default name.
+	 * 
+	 * @throws Exception
+	 */
+	public abstract void execute() throws Exception;
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 * <p>
+	 * The program execution will be logged and displayed with the provided
+	 * name.
+	 * 
+	 * @param jobName
+	 *            Desired name of the job
+	 * 
+	 * @throws Exception
+	 */
+	public abstract void execute(String jobName) throws Exception;
+
+	/**
+	 * Getter of the {@link StreamGraph} of the streaming job.
+	 * 
+	 * @return The {@link StreamGraph} representing the transformations.
+	 */
+	public StreamGraph getStreamGraph() {
+		return streamGraph;
+	}
+
+	/**
+	 * Creates the plan with which the system will execute the program, and
+	 * returns it as a String using a JSON representation of the execution data
+	 * flow graph. Note that this needs to be called before the plan is
+	 * executed.
+	 * 
+	 * @return The execution plan of the program, as a JSON String.
+	 */
+	public String getExecutionPlan() {
+		return getStreamGraph().getStreamingPlanAsJSON();
+	}
+
+}
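
Tying the pieces together, a small self-contained job against this API; the print() sink and the job name are assumptions for illustration:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class LocalSequenceJob {
        public static void main(String[] args) throws Exception {
            // Multi-threaded execution in this JVM, four parallel instances.
            StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .createLocalEnvironment(4);

            // Flush output buffers at least every 10 ms
            // (0 = after every record, -1 = only when full).
            env.setBufferTimeout(10);

            DataStream<Long> numbers = env.generateSequence(1, 100);
            numbers.print(); // assumed sink; execute() runs everything that ends in a sink

            env.execute("local sequence job");
        }
    }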

http://git-wip-us.apache.org/repos/asf/flink/blob/f94112fb/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
new file mode 100644
index 0000000..1cff7e7
--- /dev/null
+++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamPlanEnvironment.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.Client.OptimizerPlanEnvironment;
+import org.apache.flink.client.program.PackagedProgram.PreviewPlanEnvironment;
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
+
+public class StreamPlanEnvironment extends StreamExecutionEnvironment {
+
+	private ExecutionEnvironment env;
+
+	protected StreamPlanEnvironment(ExecutionEnvironment env) {
+		super();
+		this.env = env;
+
+		int dop = env.getDegreeOfParallelism();
+		if (dop > 0) {
+			setDegreeOfParallelism(dop);
+		} else {
+			setDegreeOfParallelism(GlobalConfiguration.getInteger(
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE_KEY,
+					ConfigConstants.DEFAULT_PARALLELIZATION_DEGREE));
+		}
+	}
+
+	@Override
+	public void execute() throws Exception {
+		execute("");
+	}
+
+	@Override
+	public void execute(String jobName) throws Exception {
+		currentEnvironment = null;
+
+		streamGraph.setJobName(jobName);
+
+		if (env instanceof OptimizerPlanEnvironment) {
+			((OptimizerPlanEnvironment) env).setPlan(streamGraph);
+		} else if (env instanceof PreviewPlanEnvironment) {
+			((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
+		}
+
+		throw new Client.ProgramAbortException();
+	}
+}
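
StreamPlanEnvironment only serves the client's plan-extraction path (OptimizerPlanEnvironment and PreviewPlanEnvironment); the user-facing equivalent is getExecutionPlan() on any environment. A sketch, with the print() sink again assumed:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.generateSequence(0, 10).print();

    // Must be called after the topology is defined and before execute();
    // returns the streaming data flow as a JSON string.
    System.out.println(env.getExecutionPlan());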

