flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [38/51] [abbrv] [streaming] API update with more differentiated DataStream types and javadoc + several fixes
Date Mon, 18 Aug 2014 17:26:15 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
new file mode 100755
index 0000000..8bcde44
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -0,0 +1,70 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+/**
+ * The SplitDataStream represents an operator that has been split using an
+ * {@link OutputSelector}. Named outputs can be selected using the
+ * {@link #select} function.
+ *
+ * @param <T>
+ *            The type of the output.
+ */
+public class SplitDataStream<T> {
+
+	/** Private copy of the stream that was split; every selection is derived from it. */
+	DataStream<T> dataStream;
+
+	/**
+	 * Wraps a copy of the given stream, so later changes made through this
+	 * object do not touch the caller's instance.
+	 * 
+	 * @param dataStream
+	 *            The stream that has been split.
+	 */
+	protected SplitDataStream(DataStream<T> dataStream) {
+		this.dataStream = dataStream.copy();
+	}
+
+	/**
+	 * Sets the output names for which the next operator will receive values.
+	 * 
+	 * @param outputNames
+	 *            The output names for which the operator will receive the
+	 *            input. At least one name is required and none of them may
+	 *            be the empty string.
+	 * @return Returns the modified DataStream
+	 * @throws IllegalArgumentException
+	 *             If no name is given or any of the names is empty.
+	 */
+	public DataStream<T> select(String... outputNames) {
+		if (outputNames == null || outputNames.length == 0) {
+			throw new IllegalArgumentException("At least one output name must be given");
+		}
+
+		// Validate every name (including the first one) before any stream is
+		// copied. The comparison has to be by value: "==" only matches the
+		// interned literal and lets empty strings built at runtime through.
+		for (String outputName : outputNames) {
+			if ("".equals(outputName)) {
+				throw new IllegalArgumentException("User defined name must not be empty string");
+			}
+		}
+
+		DataStream<T> returnStream = selectOutput(outputNames[0]);
+		for (int i = 1; i < outputNames.length; i++) {
+			returnStream = connectWithNames(returnStream, selectOutput(outputNames[i]));
+		}
+		return returnStream;
+	}
+
+	/**
+	 * Combines two named selections into one connected stream. Both inputs
+	 * are copied, so neither argument is modified.
+	 */
+	private DataStream<T> connectWithNames(DataStream<T> stream1, DataStream<T> stream2) {
+		ConnectedDataStream<T> returnStream = new ConnectedDataStream<T>(stream1.copy());
+		returnStream.connectedStreams.add(stream2.copy());
+		return returnStream;
+	}
+
+	/**
+	 * Returns a copy of the wrapped stream whose user defined name is set to
+	 * the given output name.
+	 */
+	private DataStream<T> selectOutput(String outputName) {
+		DataStream<T> returnStream = dataStream.copy();
+		returnStream.userDefinedName = outputName;
+		return returnStream;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
new file mode 100755
index 0000000..16e84bb
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/LocalStreamEnvironment.java
@@ -0,0 +1,40 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import org.apache.flink.streaming.util.ClusterUtil;
+
+/**
+ * Stream execution environment that runs its job through
+ * {@link ClusterUtil#runOnMiniCluster}.
+ */
+public class LocalStreamEnvironment extends StreamExecutionEnvironment {
+
+	/**
+	 * Builds the job graph collected by this environment and hands it to
+	 * {@link ClusterUtil#runOnMiniCluster} together with the execution
+	 * parallelism of this environment.
+	 */
+	@Override
+	public void execute() {
+		ClusterUtil.runOnMiniCluster(
+				jobGraphBuilder.getJobGraph(),
+				getExecutionParallelism());
+	}
+
+	/**
+	 * Same as {@link #execute()}, but additionally forwards a memory size to
+	 * {@link ClusterUtil#runOnMiniCluster}.
+	 * 
+	 * @param memorySize
+	 *            Passed through unchanged as the third argument; its unit is
+	 *            defined by ClusterUtil and is not visible here.
+	 */
+	public void executeTest(long memorySize) {
+		ClusterUtil.runOnMiniCluster(
+				jobGraphBuilder.getJobGraph(),
+				getExecutionParallelism(),
+				memorySize);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
new file mode 100644
index 0000000..19a2d48
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/RemoteStreamEnvironment.java
@@ -0,0 +1,107 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.apache.flink.client.program.Client;
+import org.apache.flink.client.program.JobWithJars;
+import org.apache.flink.client.program.ProgramInvocationException;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+
+/**
+ * Stream execution environment that submits the job graph to a remote master
+ * (JobManager) identified by host name and port, shipping the configured JAR
+ * files along with it.
+ */
+public class RemoteStreamEnvironment extends StreamExecutionEnvironment {
+	private static final Log log = LogFactory.getLog(RemoteStreamEnvironment.class);
+
+	/** Highest valid port number (65535). */
+	private static final int MAX_PORT = 0xffff;
+
+	private final String host;
+	private final int port;
+	private final String[] jarFiles;
+
+	/**
+	 * Creates a new RemoteStreamEnvironment that points to the master
+	 * (JobManager) described by the given host name and port.
+	 * 
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed. Must be in the range 1 to 65535 (inclusive).
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files. A null array is treated as "no
+	 *            JAR files".
+	 * @throws NullPointerException
+	 *             If the host is null.
+	 * @throws IllegalArgumentException
+	 *             If the port is outside the valid range.
+	 */
+	public RemoteStreamEnvironment(String host, int port, String... jarFiles) {
+		if (host == null) {
+			throw new NullPointerException("Host must not be null.");
+		}
+
+		// 65535 itself is a valid port, so only values above it are rejected.
+		if (port < 1 || port > MAX_PORT) {
+			throw new IllegalArgumentException("Port out of range");
+		}
+
+		this.host = host;
+		this.port = port;
+		// Normalise an explicit null so execute() can iterate without a null check.
+		this.jarFiles = jarFiles == null ? new String[0] : jarFiles;
+	}
+
+	/**
+	 * Builds the job graph, attaches every configured JAR file to it and
+	 * submits it to the master at the configured host and port.
+	 * 
+	 * @throws RuntimeException
+	 *             If a JAR file fails the check performed by
+	 *             {@link JobWithJars#checkJarFile} or if the client reports a
+	 *             {@link ProgramInvocationException}; the original exception
+	 *             is kept as the cause.
+	 */
+	@Override
+	public void execute() {
+		if (log.isInfoEnabled()) {
+			log.info("Running remotely at " + host + ":" + port);
+		}
+
+		JobGraph jobGraph = jobGraphBuilder.getJobGraph();
+
+		for (String jarFile : jarFiles) {
+			File file = new File(jarFile);
+			try {
+				JobWithJars.checkJarFile(file);
+			} catch (IOException e) {
+				throw new RuntimeException("Problem with jar file " + file.getAbsolutePath(), e);
+			}
+			jobGraph.addJar(new Path(file.getAbsolutePath()));
+		}
+
+		Configuration configuration = jobGraph.getJobConfiguration();
+		Client client = new Client(new InetSocketAddress(host, port), configuration);
+
+		try {
+			client.run(jobGraph, true);
+		} catch (ProgramInvocationException e) {
+			throw new RuntimeException("Cannot execute job due to ProgramInvocationException", e);
+		}
+	}
+
+	/**
+	 * Describes this environment by host, port and degree of parallelism; a
+	 * degree of parallelism of -1 is printed as "default".
+	 */
+	@Override
+	public String toString() {
+		return "Remote Environment (" + this.host + ":" + this.port + " - DOP = "
+				+ (getDegreeOfParallelism() == -1 ? "default" : getDegreeOfParallelism()) + ")";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
new file mode 100644
index 0000000..3773d8e
--- /dev/null
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -0,0 +1,413 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.streaming.api.environment;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.commons.lang3.SerializationException;
+import org.apache.commons.lang3.SerializationUtils;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.streaming.api.JobGraphBuilder;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.function.source.FileSourceFunction;
+import org.apache.flink.streaming.api.function.source.FileStreamFunction;
+import org.apache.flink.streaming.api.function.source.FromElementsFunction;
+import org.apache.flink.streaming.api.function.source.GenSequenceFunction;
+import org.apache.flink.streaming.api.function.source.SourceFunction;
+import org.apache.flink.streaming.api.invokable.SourceInvokable;
+import org.apache.flink.streaming.util.serialization.FunctionTypeWrapper;
+import org.apache.flink.streaming.util.serialization.ObjectTypeWrapper;
+
+/**
+ * {@link ExecutionEnvironment} for streaming jobs. An instance of it is
+ * necessary to construct streaming topologies.
+ * 
+ */
+public abstract class StreamExecutionEnvironment {
+
+	/**
+	 * The environment of the context (local by default, cluster if invoked
+	 * through command line)
+	 */
+	private static StreamExecutionEnvironment contextEnvironment;
+
+	/** flag to disable local executor when using the ContextEnvironment */
+	private static boolean allowLocalExecution = true;
+
+	/** Default degree of parallelism for local environments: the number of available processors. */
+	private static int defaultLocalDop = Runtime.getRuntime().availableProcessors();
+
+	private int degreeOfParallelism = 1;
+
+	/** -1 means "not set"; {@link #getExecutionParallelism()} then falls back to the degree of parallelism. */
+	private int executionParallelism = -1;
+
+	protected JobGraphBuilder jobGraphBuilder;
+
+	// --------------------------------------------------------------------------------------------
+	// Constructor and Properties
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Constructor for creating StreamExecutionEnvironment
+	 */
+	protected StreamExecutionEnvironment() {
+		jobGraphBuilder = new JobGraphBuilder("jobGraph");
+	}
+
+	/**
+	 * Gets the parallelism used for execution.
+	 * 
+	 * @return The value set via {@link #setExecutionParallelism(int)}, or the
+	 *         degree of parallelism of this environment if none was set.
+	 */
+	public int getExecutionParallelism() {
+		return executionParallelism == -1 ? degreeOfParallelism : executionParallelism;
+	}
+
+	/**
+	 * Gets the degree of parallelism with which operations are executed by
+	 * default. Operations can individually override this value to use a
+	 * specific degree of parallelism via {@link DataStream#setParallelism}.
+	 * 
+	 * @return The degree of parallelism used by operations, unless they
+	 *         override that value.
+	 */
+	public int getDegreeOfParallelism() {
+		return this.degreeOfParallelism;
+	}
+
+	/**
+	 * Sets the degree of parallelism (DOP) for operations executed through this
+	 * environment. Setting a DOP of x here will cause all operators (such as
+	 * map, batchReduce) to run with x parallel instances. This method overrides
+	 * the default parallelism for this environment. The
+	 * {@link LocalStreamEnvironment} uses by default a value equal to the
+	 * number of hardware contexts (CPU cores / threads). When executing the
+	 * program via the command line client from a JAR file, the default degree
+	 * of parallelism is the one configured for that setup.
+	 * 
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism
+	 * @throws IllegalArgumentException
+	 *             If the given value is smaller than one.
+	 */
+	protected void setDegreeOfParallelism(int degreeOfParallelism) {
+		if (degreeOfParallelism < 1) {
+			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+		}
+		this.degreeOfParallelism = degreeOfParallelism;
+	}
+
+	/**
+	 * Sets the number of hardware contexts (CPU cores / threads) used when
+	 * executed in {@link LocalStreamEnvironment}.
+	 * 
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism in local environment
+	 * @throws IllegalArgumentException
+	 *             If the given value is smaller than one.
+	 */
+	public void setExecutionParallelism(int degreeOfParallelism) {
+		if (degreeOfParallelism < 1) {
+			throw new IllegalArgumentException("Degree of parallelism must be at least one.");
+		}
+
+		this.executionParallelism = degreeOfParallelism;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Data stream creations
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates a DataStream that represents the Strings produced by reading the
+	 * given file line wise. The file will be read with the system's default
+	 * character set. The source runs with a parallelism of one.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextFile(String filePath) {
+		return addSource(new FileSourceFunction(filePath), 1);
+	}
+
+	/**
+	 * Same as {@link #readTextFile(String)}, but with an explicit number of
+	 * parallel source instances.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @param parallelism
+	 *            Number of parallel instances of the source.
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextFile(String filePath, int parallelism) {
+		return addSource(new FileSourceFunction(filePath), parallelism);
+	}
+
+	/**
+	 * Creates a DataStream that represents the Strings produced by reading the
+	 * given file line wise multiple times (infinite). The file will be read
+	 * with the system's default character set. The source runs with a
+	 * parallelism of one.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextStream(String filePath) {
+		return addSource(new FileStreamFunction(filePath), 1);
+	}
+
+	/**
+	 * Same as {@link #readTextStream(String)}, but with an explicit number of
+	 * parallel source instances.
+	 * 
+	 * @param filePath
+	 *            The path of the file, as a URI (e.g.,
+	 *            "file:///some/local/file" or "hdfs://host:port/file/path").
+	 * @param parallelism
+	 *            Number of parallel instances of the source.
+	 * @return The DataStream representing the text file.
+	 */
+	public DataStreamSource<String> readTextStream(String filePath, int parallelism) {
+		return addSource(new FileStreamFunction(filePath), parallelism);
+	}
+
+	/**
+	 * Creates a new DataStream that contains the given elements. The elements
+	 * must all be of the same type, for example, all of the String or Integer.
+	 * The sequence of elements must not be empty. Furthermore, the elements
+	 * must be serializable (as defined in java.io.Serializable), because the
+	 * execution environment may ship the elements into the cluster.
+	 * 
+	 * @param data
+	 *            The elements to create the DataStream from.
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return The DataStream representing the elements.
+	 * @throws IllegalArgumentException
+	 *             If no element is given.
+	 * @throws RuntimeException
+	 *             If the source function cannot be serialized; the
+	 *             serialization failure is attached as the cause.
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromElements(OUT... data) {
+		// The first element is used as the type sample below, so an empty
+		// call has to be rejected explicitly instead of failing on data[0].
+		if (data == null || data.length == 0) {
+			throw new IllegalArgumentException("The sequence of elements must not be empty");
+		}
+
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(
+				this, "elements");
+
+		try {
+			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
+					new ObjectTypeWrapper<OUT, Tuple, OUT>(data[0], null, data[0]), "source",
+					SerializationUtils.serialize(function), 1);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize elements", e);
+		}
+		return returnStream;
+	}
+
+	/**
+	 * Creates a DataStream from the given non-empty collection. The type of the
+	 * DataStream is that of the elements in the collection. The elements need
+	 * to be serializable (as defined by java.io.Serializable), because the
+	 * framework may move the elements into the cluster if needed.
+	 * 
+	 * @param data
+	 *            The collection of elements to create the DataStream from.
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return The DataStream representing the elements.
+	 * @throws NullPointerException
+	 *             If the collection is null.
+	 * @throws RuntimeException
+	 *             If the collection is empty, or if the source function cannot
+	 *             be serialized (the serialization failure is attached as the
+	 *             cause).
+	 */
+	public <OUT extends Serializable> DataStreamSource<OUT> fromCollection(Collection<OUT> data) {
+		// Validate before anything is allocated for the new source.
+		if (data == null) {
+			throw new NullPointerException("Collection must not be null");
+		}
+		if (data.isEmpty()) {
+			throw new RuntimeException("Collection must not be empty");
+		}
+
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "elements");
+
+		// First element in iteration order; serves as the type sample.
+		OUT sample = data.iterator().next();
+
+		try {
+			// One function instance is both executed and serialized, as in
+			// fromElements, instead of building two separate copies.
+			SourceFunction<OUT> function = new FromElementsFunction<OUT>(data);
+
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
+					new ObjectTypeWrapper<OUT, Tuple, OUT>(sample, null, sample), "source",
+					SerializationUtils.serialize(function), 1);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize collection", e);
+		}
+
+		return returnStream;
+	}
+
+	/**
+	 * Creates a new DataStream that contains a sequence of numbers.
+	 * 
+	 * @param from
+	 *            The number to start at (inclusive).
+	 * @param to
+	 *            The number to stop at (inclusive)
+	 * @return A DataStream, containing all numbers in the [from, to] interval.
+	 */
+	public DataStreamSource<Long> generateSequence(long from, long to) {
+		return addSource(new GenSequenceFunction(from, to), 1);
+	}
+
+	/**
+	 * Adds a data source thus opening a {@link DataStream}.
+	 * 
+	 * @param function
+	 *            the user defined function
+	 * @param parallelism
+	 *            number of parallel instances of the function
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 * @throws RuntimeException
+	 *             If the function cannot be serialized; the serialization
+	 *             failure is attached as the cause.
+	 */
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, int parallelism) {
+		DataStreamSource<OUT> returnStream = new DataStreamSource<OUT>(this, "source");
+
+		try {
+			jobGraphBuilder.addSource(returnStream.getId(), new SourceInvokable<OUT>(function),
+					new FunctionTypeWrapper<OUT, Tuple, OUT>(function, SourceFunction.class, 0, -1,
+							0), "source", SerializationUtils.serialize(function), parallelism);
+		} catch (SerializationException e) {
+			throw new RuntimeException("Cannot serialize SourceFunction", e);
+		}
+
+		return returnStream;
+	}
+
+	/**
+	 * Adds a data source with a parallelism of one.
+	 * 
+	 * @param sourceFunction
+	 *            the user defined function
+	 * @param <OUT>
+	 *            type of the returned stream
+	 * @return the data stream constructed
+	 */
+	public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> sourceFunction) {
+		return addSource(sourceFunction, 1);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Instantiation of Execution Contexts
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Creates an execution environment that represents the context in which the
+	 * program is currently executed. If the program is invoked standalone, this
+	 * method returns a local execution environment, as returned by
+	 * {@link #createLocalEnvironment()}.
+	 * 
+	 * @return The execution environment of the context in which the program is
+	 *         executed.
+	 */
+	public static StreamExecutionEnvironment getExecutionEnvironment() {
+		return contextEnvironment == null ? createLocalEnvironment() : contextEnvironment;
+	}
+
+	/**
+	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
+	 * will run the program in a multi-threaded fashion in the same JVM as the
+	 * environment was created in. The default degree of parallelism of the
+	 * local environment is the number of hardware contexts (CPU cores /
+	 * threads), unless it was specified differently by
+	 * {@link #setDegreeOfParallelism(int)}.
+	 * 
+	 * @return A local execution environment.
+	 */
+	public static LocalStreamEnvironment createLocalEnvironment() {
+		return createLocalEnvironment(defaultLocalDop);
+	}
+
+	/**
+	 * Creates a {@link LocalStreamEnvironment}. The local execution environment
+	 * will run the program in a multi-threaded fashion in the same JVM as the
+	 * environment was created in. It will use the degree of parallelism
+	 * specified in the parameter.
+	 * 
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism for the local environment.
+	 * @return A local execution environment with the specified degree of
+	 *         parallelism.
+	 */
+	public static LocalStreamEnvironment createLocalEnvironment(int degreeOfParallelism) {
+		LocalStreamEnvironment lee = new LocalStreamEnvironment();
+		lee.setDegreeOfParallelism(degreeOfParallelism);
+		return lee;
+	}
+
+	// TODO:fix cluster default parallelism
+	/**
+	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+	 * (parts of) the program to a cluster for execution. Note that all file
+	 * paths used in the program must be accessible from the cluster. The
+	 * execution will use no parallelism, unless the parallelism is set
+	 * explicitly via {@link #setDegreeOfParallelism}.
+	 * 
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @return A remote environment that executes the program on a cluster.
+	 */
+	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
+			String... jarFiles) {
+		return new RemoteStreamEnvironment(host, port, jarFiles);
+	}
+
+	/**
+	 * Creates a {@link RemoteStreamEnvironment}. The remote environment sends
+	 * (parts of) the program to a cluster for execution. Note that all file
+	 * paths used in the program must be accessible from the cluster. The
+	 * execution will use the specified degree of parallelism.
+	 * 
+	 * @param host
+	 *            The host name or address of the master (JobManager), where the
+	 *            program should be executed.
+	 * @param port
+	 *            The port of the master (JobManager), where the program should
+	 *            be executed.
+	 * @param degreeOfParallelism
+	 *            The degree of parallelism to use during the execution.
+	 * @param jarFiles
+	 *            The JAR files with code that needs to be shipped to the
+	 *            cluster. If the program uses user-defined functions,
+	 *            user-defined input formats, or any libraries, those must be
+	 *            provided in the JAR files.
+	 * @return A remote environment that executes the program on a cluster.
+	 */
+	public static StreamExecutionEnvironment createRemoteEnvironment(String host, int port,
+			int degreeOfParallelism, String... jarFiles) {
+		RemoteStreamEnvironment rec = new RemoteStreamEnvironment(host, port, jarFiles);
+		rec.setDegreeOfParallelism(degreeOfParallelism);
+		return rec;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	// Methods to control the context and local environments for execution from
+	// packaged programs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Installs the environment that {@link #getExecutionEnvironment()} returns
+	 * from now on.
+	 * 
+	 * @param ctx
+	 *            The context environment; null makes
+	 *            {@link #getExecutionEnvironment()} create local environments
+	 *            again.
+	 */
+	protected static void initializeContextEnvironment(StreamExecutionEnvironment ctx) {
+		contextEnvironment = ctx;
+	}
+
+	/**
+	 * @return true if a context environment has been installed.
+	 */
+	protected static boolean isContextEnvironmentSet() {
+		return contextEnvironment != null;
+	}
+
+	/**
+	 * Clears the flag reported by {@link #localExecutionIsAllowed()}.
+	 */
+	protected static void disableLocalExecution() {
+		allowLocalExecution = false;
+	}
+
+	/**
+	 * @return true unless {@link #disableLocalExecution()} has been called.
+	 */
+	public static boolean localExecutionIsAllowed() {
+		return allowLocalExecution;
+	}
+
+	/**
+	 * Triggers the program execution. The environment will execute all parts of
+	 * the program that have resulted in a "sink" operation. Sink operations are
+	 * for example printing results or forwarding them to a message queue.
+	 * <p>
+	 * The program execution will be logged and displayed with a generated
+	 * default name.
+	 */
+	public abstract void execute();
+
+	/**
+	 * Getter of the {@link JobGraphBuilder} of the streaming job.
+	 * 
+	 * @return jobgraph
+	 */
+	public JobGraphBuilder getJobGraphBuilder() {
+		return jobGraphBuilder;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
index d1ef3d0..cc14e2c 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/CoMapFunction.java
@@ -22,6 +22,17 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 
+/**
+ * A CoMapFunction represents a Map transformation with two different input
+ * types.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
 public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
 
 	public OUT map1(IN1 value);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
index c93fc81..1468181 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/co/RichCoMapFunction.java
@@ -19,7 +19,20 @@
 package org.apache.flink.streaming.api.function.co;
 
 import org.apache.flink.api.common.functions.AbstractRichFunction;
+import org.apache.flink.api.common.functions.RichFunction;
 
+/**
+ * A RichCoMapFunction represents a Map transformation with two different input
+ * types. In addition to that the user can use the features provided by the
+ * {@link RichFunction} interface.
+ *
+ * @param <IN1>
+ *            Type of the first input.
+ * @param <IN2>
+ *            Type of the second input.
+ * @param <OUT>
+ *            Output type.
+ */
 public abstract class RichCoMapFunction<IN1, IN2, OUT> extends AbstractRichFunction implements
 		CoMapFunction<IN1, IN2, OUT> {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
index e50803f..76dee5d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/AbstractStreamComponent.java
@@ -141,7 +141,7 @@ public abstract class AbstractStreamComponent<OUT> extends AbstractInvokable {
 			}
 
 			outputs.add(output);
-			String outputName = configuration.getOutputName(outputNumber);
+			List<String> outputName = configuration.getOutputName(outputNumber);
 
 			if (collector != null) {
 				collector.addOutput(output, outputName);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
index 2b3edc2..5872da9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/IterateTest.java
@@ -25,6 +25,10 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.IterativeDataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
index 67dce9d..83c98fc 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/PrintTest.java
@@ -19,6 +19,8 @@
 
 package org.apache.flink.streaming.api;
 
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.log4j.Level;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
index 28cbc6e..8e9475a 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsCsvTest.java
@@ -29,6 +29,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
index 337ca4e..122979d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/WriteAsTextTest.java
@@ -29,6 +29,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index f2c647f..08387f9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -27,9 +27,10 @@ import java.util.Collection;
 import java.util.HashSet;
 
 import org.apache.flink.api.java.functions.RichMapFunction;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.SplitDataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SplitDataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.log4j.Level;
@@ -94,13 +95,11 @@ public class DirectedOutputTest {
 	public void directOutputTest() throws Exception {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector());
 		DataStream<Long> ds1 = s.select("ds1").shuffle().map(new PlusTwo()).addSink(new EvenSink());
 		DataStream<Long> ds2 = s.select("ds2").map(new PlusTwo()).addSink(new OddSink());
-		DataStream<Long> ds3 = s.map(new PlusTwo()).addSink(new OddSink());
-
-		env.execute();
+		env.executeTest(32);
 
 		HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
 		HashSet<Long> expectedOdd = new HashSet<Long>(Arrays.asList(3L, 5L, 7L));
@@ -114,7 +113,7 @@ public class DirectedOutputTest {
 	public void directNamingTest() {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 
-		StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
+		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
 		SplitDataStream<Long> s = env.generateSequence(1, 10).split(new MySelector());
 		try {
 			s.select("ds2").connectWith(s.select("ds1"));
@@ -122,20 +121,6 @@ public class DirectedOutputTest {
 		} catch (Exception e) {
 			// Exception thrown
 		}
-		try {
-			s.shuffle().connectWith(s.select("ds1"));
-			fail();
-		} catch (Exception e) {
-			// Exception thrown
-		}
-		try {
-			s.select("ds2").connectWith(s);
-			fail();
-		} catch (Exception e) {
-			// Exception thrown
-		}
-		s.connectWith(s);
-		s.select("ds2").connectWith(s.select("ds2"));
 
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
index 7c7f593..72b09c9 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/StreamCollectorTest.java
@@ -21,6 +21,8 @@ package org.apache.flink.streaming.api.collector;
 
 import static org.junit.Assert.assertArrayEquals;
 
+import java.util.ArrayList;
+
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.streamcomponent.MockRecordWriter;
@@ -34,14 +36,15 @@ public class StreamCollectorTest {
 	public void testCollect() {
 		MockRecordWriter recWriter = MockRecordWriterFactory.create();
 
-		StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2, new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null));
-		collector.addOutput(recWriter, null);
+		StreamCollector<Tuple1<Integer>> collector = new StreamCollector<Tuple1<Integer>>(2,
+				new SerializationDelegate<StreamRecord<Tuple1<Integer>>>(null));
+		collector.addOutput(recWriter, new ArrayList<String>());
 		collector.collect(new Tuple1<Integer>(3));
 		collector.collect(new Tuple1<Integer>(4));
 		collector.collect(new Tuple1<Integer>(5));
 		collector.collect(new Tuple1<Integer>(6));
 
-		assertArrayEquals(new Integer[] {3, 4, 5, 6}, recWriter.emittedRecords.toArray());
+		assertArrayEquals(new Integer[] { 3, 4, 5, 6 }, recWriter.emittedRecords.toArray());
 	}
 
 	@Test

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
index 49f4509..c91878b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/BatchReduceTest.java
@@ -25,9 +25,9 @@ import java.util.ArrayList;
 
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.util.LogUtils;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
index 82a5f89..020dae4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/CoMapTest.java
@@ -23,9 +23,9 @@ import java.io.Serializable;
 import java.util.HashSet;
 import java.util.Set;
 
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.co.CoMapFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
@@ -47,8 +47,7 @@ public class CoMapTest implements Serializable {
 		}
 	}
 
-	private final static class MyCoMap implements
-			CoMapFunction<String, Integer, Boolean> {
+	private final static class MyCoMap implements CoMapFunction<String, Integer, Boolean> {
 		private static final long serialVersionUID = 1L;
 
 		@Override
@@ -86,10 +85,8 @@ public class CoMapTest implements Serializable {
 		DataStream<String> ds3 = env.fromElements("a", "b");
 
 		@SuppressWarnings({ "unused", "unchecked" })
-		DataStream<Boolean> ds4 = env.fromElements("c").connectWith(ds3)
-				.coMapWith(new MyCoMap(),
-
-				ds2).addSink(new EmptySink());
+		DataStream<Boolean> ds4 = env.fromElements("c").connectWith(ds3).co(ds2).map(new MyCoMap())
+				.addSink(new EmptySink());
 
 		env.executeTest(32);
 		Assert.assertEquals(expected, result);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
index 0cba0bf..ec625e9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FilterTest.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.log4j.Level;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
index ca9c1cb..06f8447 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/FlatMapTest.java
@@ -26,9 +26,9 @@ import java.util.HashSet;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
index 73185df..0c59864 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/invokable/operator/MapTest.java
@@ -28,9 +28,9 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.streaming.util.LogUtils;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
index 7d78d8f..545169d 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/streamcomponent/StreamComponentTest.java
@@ -27,9 +27,9 @@ import java.util.Map;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.LocalStreamEnvironment;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.LocalStreamEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
index c5b51e6..b3cb9dc 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/basictopology/BasicTopology.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.examples.basictopology;
 
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
index bb5cc07..4f98f5a 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/cellinfo/CellInfoLocal.java
@@ -24,8 +24,8 @@ import java.util.Random;
 import org.apache.flink.api.java.functions.RichFlatMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/kmeans/KMeansLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/kmeans/KMeansLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/kmeans/KMeansLocal.java
index f9be802..e132867 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/kmeans/KMeansLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/kmeans/KMeansLocal.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.streaming.examples.iterative.kmeans;
 
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class KMeansLocal {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/pagerank/PageRankLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/pagerank/PageRankLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/pagerank/PageRankLocal.java
index 3d4a451..c84b86e 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/pagerank/PageRankLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/pagerank/PageRankLocal.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.streaming.examples.iterative.pagerank;
 
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class PageRankLocal {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/sssp/SSSPLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/sssp/SSSPLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/sssp/SSSPLocal.java
index 528511d..87f4a75 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/sssp/SSSPLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/iterative/sssp/SSSPLocal.java
@@ -19,7 +19,7 @@
 
 package org.apache.flink.streaming.examples.iterative.sssp;
 
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 
 public class SSSPLocal {
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
index cd29e7b..08738a2 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/join/JoinLocal.java
@@ -19,11 +19,10 @@
 
 package org.apache.flink.streaming.examples.join;
 
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.log4j.Level;
-
 import org.apache.flink.api.java.tuple.Tuple3;
 
 public class JoinLocal {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
index 6264cb9..4e9022b 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalLearningSkeleton.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.examples.ml;
 
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
index 366e7b5..078b514 100755
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/ml/IncrementalOLS.java
@@ -27,8 +27,8 @@ import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.source.SourceFunction;
 import org.apache.flink.util.Collector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
index 2b182c8..83cdd52 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/window/join/WindowJoinLocal.java
@@ -21,8 +21,8 @@ package org.apache.flink.streaming.examples.window.join;
 
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.LogUtils;
 import org.apache.log4j.Level;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/d56d48f1/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
index f77ab37..fc31930 100644
--- a/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
+++ b/flink-addons/flink-streaming/flink-streaming-examples/src/main/java/org/apache/flink/streaming/examples/wordcount/WordCountLocal.java
@@ -19,10 +19,9 @@
 
 package org.apache.flink.streaming.examples.wordcount;
 
-import org.apache.flink.streaming.api.DataStream;
-import org.apache.flink.streaming.api.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.util.TestDataUtil;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 
 public class WordCountLocal {


Mime
View raw message