flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From se...@apache.org
Subject [41/51] [abbrv] git commit: [streaming] DataStream output naming reworked from vertex to edge based model
Date Mon, 18 Aug 2014 17:26:18 GMT
[streaming] DataStream output naming reworked from vertex to edge based model


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/9be98149
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/9be98149
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/9be98149

Branch: refs/heads/master
Commit: 9be9814972458aa1c67ccf6db5dd508244a89b21
Parents: d56d48f
Author: gyfora <gyula.fora@gmail.com>
Authored: Tue Aug 5 23:15:13 2014 +0200
Committer: Stephan Ewen <sewen@apache.org>
Committed: Mon Aug 18 16:23:40 2014 +0200

----------------------------------------------------------------------
 .../flink/streaming/api/StreamConfig.java       | 23 +++------
 .../api/collector/DirectedStreamCollector.java  | 29 +++++------
 .../streaming/api/collector/OutputSelector.java |  8 +--
 .../api/collector/StreamCollector.java          | 42 +++++++++-------
 .../streaming/api/datastream/CoDataStream.java  | 21 ++------
 .../api/datastream/ConnectedDataStream.java     | 14 ------
 .../streaming/api/datastream/DataStream.java    | 44 +++--------------
 .../api/datastream/IterativeDataStream.java     | 34 +++++++------
 .../datastream/SingleOutputStreamOperator.java  | 18 ++++++-
 .../api/datastream/SplitDataStream.java         | 51 +++++++++++---------
 .../streamcomponent/StreamIterationSource.java  |  1 +
 .../api/collector/DirectedOutputTest.java       | 21 ++------
 12 files changed, 133 insertions(+), 173 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
index 7cfc808..6fe9878 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/StreamConfig.java
@@ -40,7 +40,6 @@ public class StreamConfig {
 	private static final String NUMBER_OF_INPUTS = "numberOfInputs";
 	private static final String OUTPUT_NAME = "outputName_";
 	private static final String PARTITIONER_OBJECT = "partitionerObject_";
-	private static final String USER_DEFINED_NAME = "userDefinedName";
 	private static final String NUMBER_OF_OUTPUT_CHANNELS = "numOfOutputs_";
 	private static final String ITERATION_ID = "iteration-id";
 	private static final String OUTPUT_SELECTOR = "outputSelector";
@@ -74,8 +73,7 @@ public class StreamConfig {
 
 	// CONFIGS
 
-	public void setTypeWrapper(
-			TypeSerializerWrapper<?, ?, ?> typeWrapper) {
+	public void setTypeWrapper(TypeSerializerWrapper<?, ?, ?> typeWrapper) {
 		config.setBytes("typeWrapper", SerializationUtils.serialize(typeWrapper));
 	}
 
@@ -166,12 +164,6 @@ public class StreamConfig {
 		return config.getString(FUNCTION_NAME, "");
 	}
 
-	public void setUserDefinedName(List<String> userDefinedName) {
-		if (!userDefinedName.isEmpty()) {
-			config.setBytes(USER_DEFINED_NAME, SerializationUtils.serialize((Serializable) userDefinedName));
-		}
-	}
-
 	public void setDirectedEmit(boolean directedEmit) {
 		config.setBoolean(DIRECTED_EMIT, directedEmit);
 	}
@@ -212,28 +204,29 @@ public class StreamConfig {
 		return config.getInteger(NUMBER_OF_OUTPUT_CHANNELS + outputIndex, 0);
 	}
 
-	public <T> void setPartitioner(int outputIndex,
-			StreamPartitioner<T> partitionerObject) {
+	public <T> void setPartitioner(int outputIndex, StreamPartitioner<T> partitionerObject)
{
 
 		config.setBytes(PARTITIONER_OBJECT + outputIndex,
 				SerializationUtils.serialize(partitionerObject));
 	}
 
-	public <T> StreamPartitioner<T> getPartitioner(int outputIndex)
-			throws ClassNotFoundException, IOException {
+	public <T> StreamPartitioner<T> getPartitioner(int outputIndex) throws ClassNotFoundException,
+			IOException {
 		return deserializeObject(config.getBytes(PARTITIONER_OBJECT + outputIndex,
 				SerializationUtils.serialize(new ShufflePartitioner<T>())));
 	}
 
 	public void setOutputName(int outputIndex, List<String> outputName) {
 		if (outputName != null) {
-			config.setBytes(OUTPUT_NAME + outputIndex, SerializationUtils.serialize((Serializable)
outputName));
+			config.setBytes(OUTPUT_NAME + outputIndex,
+					SerializationUtils.serialize((Serializable) outputName));
 		}
 	}
 
 	@SuppressWarnings("unchecked")
 	public List<String> getOutputName(int outputIndex) {
-		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
+ outputIndex, null));
+		return (List<String>) SerializationUtils.deserialize(config.getBytes(OUTPUT_NAME
+				+ outputIndex, null));
 	}
 
 	public void setNumberOfInputs(int numberOfInputs) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
index ced3de7..285a7b4 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/DirectedStreamCollector.java
@@ -34,14 +34,14 @@ import org.apache.flink.util.StringUtils;
  * A StreamCollector that uses user defined output names and a user defined
  * output selector to make directed emits.
  * 
- * @param <T>
+ * @param <OUT>
  *            Type of the Tuple collected.
  */
-public class DirectedStreamCollector<T> extends StreamCollector<T> {
+public class DirectedStreamCollector<OUT> extends StreamCollector<OUT> {
 
-	OutputSelector<T> outputSelector;
+	OutputSelector<OUT> outputSelector;
 	private static final Log log = LogFactory.getLog(DirectedStreamCollector.class);
-	private List<RecordWriter<SerializationDelegate<StreamRecord<T>>>>
emitted;
+	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
emitted;
 
 	/**
 	 * Creates a new DirectedStreamCollector
@@ -54,11 +54,11 @@ public class DirectedStreamCollector<T> extends StreamCollector<T>
{
 	 *            User defined {@link OutputSelector}
 	 */
 	public DirectedStreamCollector(int channelID,
-			SerializationDelegate<StreamRecord<T>> serializationDelegate,
-			OutputSelector<T> outputSelector) {
+			SerializationDelegate<StreamRecord<OUT>> serializationDelegate,
+			OutputSelector<OUT> outputSelector) {
 		super(channelID, serializationDelegate);
 		this.outputSelector = outputSelector;
-		this.emitted = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
+		this.emitted = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
 
 	}
 
@@ -70,7 +70,7 @@ public class DirectedStreamCollector<T> extends StreamCollector<T>
{
 	 *            Object to be collected and emitted.
 	 */
 	@Override
-	public void collect(T outputObject) {
+	public void collect(OUT outputObject) {
 		streamRecord.setObject(outputObject);
 		emit(streamRecord);
 	}
@@ -82,18 +82,19 @@ public class DirectedStreamCollector<T> extends StreamCollector<T>
{
 	 * @param streamRecord
 	 *            Record to emit.
 	 */
-	private void emit(StreamRecord<T> streamRecord) {
+	private void emit(StreamRecord<OUT> streamRecord) {
 		Collection<String> outputNames = outputSelector.getOutputs(streamRecord.getObject());
 		streamRecord.setId(channelID);
 		serializationDelegate.setInstance(streamRecord);
 		emitted.clear();
 		for (String outputName : outputNames) {
 			try {
-				RecordWriter<SerializationDelegate<StreamRecord<T>>> output = outputMap
-						.get(outputName);
-				if (!emitted.contains(output)) {
-					output.emit(serializationDelegate);
-					emitted.add(output);
+				for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output
: outputMap
+						.get(outputName)) {
+					if (!emitted.contains(output)) {
+						output.emit(serializationDelegate);
+						emitted.add(output);
+					}
 				}
 			} catch (Exception e) {
 				if (log.isErrorEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
index 17d7e7b..fbd2147 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/OutputSelector.java
@@ -31,10 +31,10 @@ import org.apache.flink.streaming.api.datastream.SplitDataStream;
  * {@link SingleOutputStreamOperator#split} call. Every output object of a
  * {@link SplitDataStream} will run through this operator to select outputs.
  * 
- * @param <T>
+ * @param <OUT>
  *            Type parameter of the split values.
  */
-public abstract class OutputSelector<T> implements Serializable {
+public abstract class OutputSelector<OUT> implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	private Collection<String> outputs;
@@ -43,7 +43,7 @@ public abstract class OutputSelector<T> implements Serializable {
 		outputs = new ArrayList<String>();
 	}
 
-	Collection<String> getOutputs(T outputObject) {
+	Collection<String> getOutputs(OUT outputObject) {
 		outputs.clear();
 		select(outputObject, outputs);
 		return outputs;
@@ -60,5 +60,5 @@ public abstract class OutputSelector<T> implements Serializable {
 	 * @param outputs
 	 *            Selected output names should be added to this collection.
 	 */
-	public abstract void select(T value, Collection<String> outputs);
+	public abstract void select(OUT value, Collection<String> outputs);
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
index 20c3b78..fe21c29 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/collector/StreamCollector.java
@@ -33,22 +33,22 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.StringUtils;
 
 /**
- * Collector for tuples in Apache Flink stream processing. The collected
- * values will be wrapped with ID in a {@link StreamRecord} and then
- * emitted to the outputs.
+ * Collector for tuples in Apache Flink stream processing. The collected values
+ * will be wrapped with ID in a {@link StreamRecord} and then emitted to the
+ * outputs.
  * 
- * @param <T>
+ * @param <OUT>
  *            Type of the Tuples/Objects collected.
  */
-public class StreamCollector<T> implements Collector<T> {
+public class StreamCollector<OUT> implements Collector<OUT> {
 
 	private static final Log LOG = LogFactory.getLog(StreamCollector.class);
 
-	protected StreamRecord<T> streamRecord;
+	protected StreamRecord<OUT> streamRecord;
 	protected int channelID;
-	private List<RecordWriter<SerializationDelegate<StreamRecord<T>>>>
outputs;
-	protected Map<String, RecordWriter<SerializationDelegate<StreamRecord<T>>>>
outputMap;
-	protected SerializationDelegate<StreamRecord<T>> serializationDelegate;
+	private List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>
outputs;
+	protected Map<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>
outputMap;
+	protected SerializationDelegate<StreamRecord<OUT>> serializationDelegate;
 
 	/**
 	 * Creates a new StreamCollector
@@ -59,13 +59,13 @@ public class StreamCollector<T> implements Collector<T> {
 	 *            Serialization delegate used for serialization
 	 */
 	public StreamCollector(int channelID,
-			SerializationDelegate<StreamRecord<T>> serializationDelegate) {
+			SerializationDelegate<StreamRecord<OUT>> serializationDelegate) {
 
 		this.serializationDelegate = serializationDelegate;
-		this.streamRecord = new StreamRecord<T>();
+		this.streamRecord = new StreamRecord<OUT>();
 		this.channelID = channelID;
-		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
-		this.outputMap = new HashMap<String, RecordWriter<SerializationDelegate<StreamRecord<T>>>>();
+		this.outputs = new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>();
+		this.outputMap = new HashMap<String, List<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>>();
 	}
 
 	/**
@@ -76,13 +76,19 @@ public class StreamCollector<T> implements Collector<T> {
 	 * @param outputNames
 	 *            User defined names of the output.
 	 */
-	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<T>>>
output,
+	public void addOutput(RecordWriter<SerializationDelegate<StreamRecord<OUT>>>
output,
 			List<String> outputNames) {
 		outputs.add(output);
 		for (String outputName : outputNames) {
 			if (outputName != null) {
 				if (!outputMap.containsKey(outputName)) {
-					outputMap.put(outputName, output);
+					outputMap.put(outputName,
+							new ArrayList<RecordWriter<SerializationDelegate<StreamRecord<OUT>>>>());
+					outputMap.get(outputName).add(output);
+				} else {
+					if (!outputMap.get(outputName).contains(output)) {
+						outputMap.get(outputName).add(output);
+					}
 				}
 
 			}
@@ -97,7 +103,7 @@ public class StreamCollector<T> implements Collector<T> {
 	 *            Object to be collected and emitted.
 	 */
 	@Override
-	public void collect(T outputObject) {
+	public void collect(OUT outputObject) {
 		streamRecord.setObject(outputObject);
 		emit(streamRecord);
 	}
@@ -108,10 +114,10 @@ public class StreamCollector<T> implements Collector<T>
{
 	 * @param streamRecord
 	 *            StreamRecord to emit.
 	 */
-	private void emit(StreamRecord<T> streamRecord) {
+	private void emit(StreamRecord<OUT> streamRecord) {
 		streamRecord.setId(channelID);
 		serializationDelegate.setInstance(streamRecord);
-		for (RecordWriter<SerializationDelegate<StreamRecord<T>>> output : outputs)
{
+		for (RecordWriter<SerializationDelegate<StreamRecord<OUT>>> output :
outputs) {
 			try {
 				output.emit(serializationDelegate);
 			} catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
index c6cb8af..b974b1d 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/CoDataStream.java
@@ -78,11 +78,11 @@ public class CoDataStream<IN1, IN2> {
 
 	/**
 	 * Applies a CoMap transformation on two separate {@link DataStream}s. The
-	 * transformation calls a {@link CoMapFunction#map1} for each element
-	 * of the first input and {@link CoMapFunction#map2} for each element
-	 * of the second input. Each CoMapFunction call returns exactly one element.
-	 * The user can also extend {@link RichCoMapFunction} to gain access to
-	 * other features provided by the {@link RichFuntion} interface.
+	 * transformation calls a {@link CoMapFunction#map1} for each element of the
+	 * first input and {@link CoMapFunction#map2} for each element of the second
+	 * input. Each CoMapFunction call returns exactly one element. The user can
+	 * also extend {@link RichCoMapFunction} to gain access to other features
+	 * provided by the {@link RichFunction} interface.
 	 * 
 	 * @param coMapper
 	 *            The CoMapFunction used to jointly transform the two input
@@ -113,17 +113,6 @@ public class CoDataStream<IN1, IN2> {
 		input1.connectGraph(input1, returnStream.getId(), 1);
 		input1.connectGraph(input2, returnStream.getId(), 2);
 
-		if ((input1.userDefinedName != null) && (input2.userDefinedName != null)) {
-			throw new RuntimeException("An operator cannot have two names");
-		} else {
-			if (input1.userDefinedName != null) {
-				returnStream.name(input1.getUserDefinedNames());
-			}
-
-			if (input2.userDefinedName != null) {
-				returnStream.name(input2.getUserDefinedNames());
-			}
-		}
 		// TODO consider iteration
 
 		return returnStream;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
index d17990c..1d8fb48 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/ConnectedDataStream.java
@@ -62,24 +62,10 @@ public class ConnectedDataStream<OUT> extends DataStream<OUT>
{
 	// }
 
 	protected void addConnection(DataStream<OUT> stream) {
-		if ((stream.userDefinedName != null) || (this.userDefinedName != null)) {
-			if (!this.userDefinedName.equals(stream.userDefinedName)) {
-				throw new RuntimeException("Error: Connected NamedDataStreams must have same names");
-			}
-		}
 		connectedStreams.add(stream.copy());
 	}
 
 	@Override
-	protected List<String> getUserDefinedNames() {
-		List<String> nameList = new ArrayList<String>();
-		for (DataStream<OUT> stream : connectedStreams) {
-			nameList.add(stream.userDefinedName);
-		}
-		return nameList;
-	}
-
-	@Override
 	protected DataStream<OUT> setConnectionType(StreamPartitioner<OUT> partitioner)
{
 		ConnectedDataStream<OUT> returnStream = (ConnectedDataStream<OUT>) this.copy();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index b692984..d15eaa5 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -36,7 +36,6 @@ import org.apache.flink.api.java.functions.RichGroupReduceFunction;
 import org.apache.flink.api.java.functions.RichMapFunction;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.streaming.api.JobGraphBuilder;
-import org.apache.flink.streaming.api.collector.OutputSelector;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.function.sink.PrintSinkFunction;
 import org.apache.flink.streaming.api.function.sink.SinkFunction;
@@ -80,7 +79,7 @@ public abstract class DataStream<OUT> {
 	protected final StreamExecutionEnvironment environment;
 	protected final String id;
 	protected int degreeOfParallelism;
-	protected String userDefinedName;
+	protected List<String> userDefinedNames;
 	protected StreamPartitioner<OUT> partitioner;
 
 	protected final JobGraphBuilder jobGraphBuilder;
@@ -105,6 +104,7 @@ public abstract class DataStream<OUT> {
 		this.environment = environment;
 		this.degreeOfParallelism = environment.getDegreeOfParallelism();
 		this.jobGraphBuilder = environment.getJobGraphBuilder();
+		this.userDefinedNames = new ArrayList<String>();
 		this.partitioner = new ForwardPartitioner<OUT>();
 
 	}
@@ -119,7 +119,7 @@ public abstract class DataStream<OUT> {
 		this.environment = dataStream.environment;
 		this.id = dataStream.id;
 		this.degreeOfParallelism = dataStream.degreeOfParallelism;
-		this.userDefinedName = dataStream.userDefinedName;
+		this.userDefinedNames = new ArrayList<String>(dataStream.userDefinedNames);
 		this.partitioner = dataStream.partitioner;
 		this.jobGraphBuilder = dataStream.jobGraphBuilder;
 
@@ -734,36 +734,9 @@ public abstract class DataStream<OUT> {
 					.toString());
 		}
 
-		if (userDefinedName != null) {
-			returnStream.name(getUserDefinedNames());
-		}
-
 		return returnStream;
 	}
 
-	protected List<String> getUserDefinedNames() {
-		List<String> nameList = new ArrayList<String>();
-		nameList.add(userDefinedName);
-		return nameList;
-	}
-
-	/**
-	 * Gives the data transformation(vertex) a user defined name in order to use
-	 * with directed outputs. The {@link OutputSelector} of the input vertex
-	 * should use this name for directed emits.
-	 * 
-	 * @param name
-	 *            The name to set
-	 * @return The named DataStream.
-	 */
-	protected DataStream<OUT> name(List<String> name) {
-
-		userDefinedName = name.get(0);
-		jobGraphBuilder.setUserDefinedName(id, name);
-
-		return this;
-	}
-
 	/**
 	 * Internal function for setting the partitioner for the DataStream
 	 * 
@@ -795,11 +768,12 @@ public abstract class DataStream<OUT> {
 	protected <X> void connectGraph(DataStream<X> inputStream, String outputID,
int typeNumber) {
 		if (inputStream instanceof ConnectedDataStream) {
 			for (DataStream<X> stream : ((ConnectedDataStream<X>) inputStream).connectedStreams)
{
-				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber);
+				jobGraphBuilder.setEdge(stream.getId(), outputID, stream.partitioner, typeNumber,
+						inputStream.userDefinedNames);
 			}
 		} else {
 			jobGraphBuilder.setEdge(inputStream.getId(), outputID, inputStream.partitioner,
-					typeNumber);
+					typeNumber, inputStream.userDefinedNames);
 		}
 
 	}
@@ -834,11 +808,7 @@ public abstract class DataStream<OUT> {
 			throw new RuntimeException("Cannot serialize SinkFunction");
 		}
 
-		inputStream.connectGraph(inputStream, returnStream.getId(), 0);
-
-		if (this.copy().userDefinedName != null) {
-			returnStream.name(getUserDefinedNames());
-		}
+		inputStream.connectGraph(inputStream.copy(), returnStream.getId(), 0);
 
 		return returnStream;
 	}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
index b9aadcd..bdadee4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/IterativeDataStream.java
@@ -19,27 +19,30 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import java.util.Arrays;
+import java.util.List;
+
 import org.apache.flink.streaming.partitioner.ForwardPartitioner;
 
 /**
  * The iterative data stream represents the start of an iteration in a
  * {@link DataStream}.
  * 
- * @param <T>
+ * @param <IN>
  *            Type of the DataStream
  */
-public class IterativeDataStream<T> extends SingleOutputStreamOperator<T, IterativeDataStream<T>>
{
+public class IterativeDataStream<IN> extends SingleOutputStreamOperator<IN, IterativeDataStream<IN>>
{
 
 	static Integer iterationCount = 0;
 	protected Integer iterationID;
 
-	protected IterativeDataStream(DataStream<T> dataStream) {
+	protected IterativeDataStream(DataStream<IN> dataStream) {
 		super(dataStream);
 		iterationID = iterationCount;
 		iterationCount++;
 	}
 
-	protected IterativeDataStream(DataStream<T> dataStream, Integer iterationID) {
+	protected IterativeDataStream(DataStream<IN> dataStream, Integer iterationID) {
 		super(dataStream);
 		this.iterationID = iterationID;
 	}
@@ -55,8 +58,8 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T,
Iterat
 	 *            The data stream that can be fed back to the next iteration.
 	 * 
 	 */
-	public DataStream<T> closeWith(DataStream<T> iterationResult) {
-		return closeWith(iterationResult, null);
+	public DataStream<IN> closeWith(DataStream<IN> iterationResult) {
+		return closeWith(iterationResult, "iterate");
 	}
 
 	/**
@@ -73,31 +76,34 @@ public class IterativeDataStream<T> extends SingleOutputStreamOperator<T,
Iterat
 	 *            when used with directed emits
 	 * 
 	 */
-	public <R> DataStream<T> closeWith(DataStream<T> iterationTail, String
iterationName) {
+	public <R> DataStream<IN> closeWith(DataStream<IN> iterationTail, String
iterationName) {
 		DataStream<R> returnStream = new DataStreamSink<R>(environment, "iterationSink");
 
 		jobGraphBuilder.addIterationSink(returnStream.getId(), iterationTail.getId(),
-				iterationID.toString(), iterationTail.getParallelism(), iterationName);
+				iterationID.toString(), iterationTail.getParallelism());
 
 		jobGraphBuilder.setIterationSourceParallelism(iterationID.toString(),
 				iterationTail.getParallelism());
 
+		List<String> name = Arrays.asList(new String[] { iterationName });
+
 		if (iterationTail instanceof ConnectedDataStream) {
-			for (DataStream<T> stream : ((ConnectedDataStream<T>) iterationTail).connectedStreams)
{
+			for (DataStream<IN> stream : ((ConnectedDataStream<IN>) iterationTail).connectedStreams)
{
 				String inputID = stream.getId();
-				jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<T>(),
-						0);
+				jobGraphBuilder.setEdge(inputID, returnStream.getId(), new ForwardPartitioner<IN>(),
+						0, name);
 			}
 		} else {
+
 			jobGraphBuilder.setEdge(iterationTail.getId(), returnStream.getId(),
-					new ForwardPartitioner<T>(), 0);
+					new ForwardPartitioner<IN>(), 0, name);
 		}
 
 		return iterationTail;
 	}
 
 	@Override
-	protected IterativeDataStream<T> copy() {
-		return new IterativeDataStream<T>(this, iterationID);
+	protected IterativeDataStream<IN> copy() {
+		return new IterativeDataStream<IN>(this, iterationID);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
index 9af4dc8..f798563 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java
@@ -105,6 +105,22 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 	 * @return The {@link SplitDataStream}
 	 */
 	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector) {
+		return split(outputSelector, null);
+	}
+
+	/**
+	 * Operator used for directing tuples to specific named outputs using an
+	 * {@link OutputSelector}. Calling this method on an operator creates a new
+	 * {@link SplitDataStream}.
+	 * 
+	 * @param outputSelector
+	 *            The user defined {@link OutputSelector} for directing the
+	 *            tuples.
+	 * @param outputNames
+	 *            An array of all the output names to be used for selectAll
+	 * @return The {@link SplitDataStream}
+	 */
+	public SplitDataStream<OUT> split(OutputSelector<OUT> outputSelector, String[]
outputNames) {
 		try {
 			jobGraphBuilder.setOutputSelector(id, SerializationUtils.serialize(outputSelector));
 
@@ -112,7 +128,7 @@ public class SingleOutputStreamOperator<OUT, O extends SingleOutputStreamOperato
 			throw new RuntimeException("Cannot serialize OutputSelector");
 		}
 
-		return new SplitDataStream<OUT>(this);
+		return new SplitDataStream<OUT>(this, outputNames);
 	}
 
 	@SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
index 8bcde44..69d8f61 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/SplitDataStream.java
@@ -19,20 +19,26 @@
 
 package org.apache.flink.streaming.api.datastream;
 
+import java.util.Arrays;
+
+import org.apache.flink.streaming.api.collector.OutputSelector;
+
 /**
  * The SplitDataStream represents an operator that has been split using an
  * {@link OutputSelector}. Named outputs can be selected using the
  * {@link #select} function.
  *
- * @param <T>
+ * @param <OUT>
  *            The type of the output.
  */
-public class SplitDataStream<T> {
+public class SplitDataStream<OUT> {
 
-	DataStream<T> dataStream;
+	DataStream<OUT> dataStream;
+	String[] allNames;
 
-	protected SplitDataStream(DataStream<T> dataStream) {
+	protected SplitDataStream(DataStream<OUT> dataStream, String[] outputNames) {
 		this.dataStream = dataStream.copy();
+		this.allNames = outputNames;
 	}
 
 	/**
@@ -41,29 +47,30 @@ public class SplitDataStream<T> {
 	 * @param outputNames
 	 *            The output names for which the operator will receive the
 	 *            input.
-	 * @return Returns the modified DataStream
+	 * @return Returns the selected DataStream
 	 */
-	public DataStream<T> select(String... outputNames) {
-		DataStream<T> returnStream = selectOutput(outputNames[0]);
-		for (int i = 1; i < outputNames.length; i++) {
-			if (outputNames[i] == "") {
-				throw new IllegalArgumentException("User defined name must not be empty string");
-			}
-
-			returnStream = connectWithNames(returnStream, selectOutput(outputNames[i]));
-		}
-		return returnStream;
+	public DataStream<OUT> select(String... outputNames) {
+		return selectOutput(outputNames);
 	}
 
-	private DataStream<T> connectWithNames(DataStream<T> stream1, DataStream<T>
stream2) {
-		ConnectedDataStream<T> returnStream = new ConnectedDataStream<T>(stream1.copy());
-		returnStream.connectedStreams.add(stream2.copy());
-		return returnStream;
+	/**
+	 * Selects all output names from a split data stream. Output names must be
+	 * predefined to use selectAll.
+	 * 
+	 * @return Returns the selected DataStream
+	 */
+	public DataStream<OUT> selectAll() {
+		if (allNames != null) {
+			return selectOutput(allNames);
+		} else {
+			throw new RuntimeException(
+					"Output names must be predefined in order to use select all.");
+		}
 	}
 
-	private DataStream<T> selectOutput(String outputName) {
-		DataStream<T> returnStream = dataStream.copy();
-		returnStream.userDefinedName = outputName;
+	private DataStream<OUT> selectOutput(String[] outputName) {
+		DataStream<OUT> returnStream = dataStream.copy();
+		returnStream.userDefinedNames = Arrays.asList(outputName);
 		return returnStream;
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
index cf3d47e..67d5066 100755
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamcomponent/StreamIterationSource.java
@@ -58,6 +58,7 @@ public class StreamIterationSource<OUT extends Tuple> extends
 			setConfigOutputs(outputs);
 			setSinkSerializer();
 		} catch (StreamComponentException e) {
+			e.printStackTrace();
 			throw new StreamComponentException("Cannot register outputs", e);
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/9be98149/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
index 08387f9..0e390e4 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/collector/DirectedOutputTest.java
@@ -20,7 +20,6 @@
 package org.apache.flink.streaming.api.collector;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -96,9 +95,11 @@ public class DirectedOutputTest {
 		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
 
 		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector());
+		SplitDataStream<Long> s = env.generateSequence(1, 6).split(new MySelector(),
+				new String[] { "ds1", "ds2" });
 		DataStream<Long> ds1 = s.select("ds1").shuffle().map(new PlusTwo()).addSink(new EvenSink());
 		DataStream<Long> ds2 = s.select("ds2").map(new PlusTwo()).addSink(new OddSink());
+
 		env.executeTest(32);
 
 		HashSet<Long> expectedEven = new HashSet<Long>(Arrays.asList(4L, 6L, 8L));
@@ -107,20 +108,4 @@ public class DirectedOutputTest {
 		assertEquals(expectedEven, evenSet);
 		assertEquals(expectedOdd, oddSet);
 	}
-
-	@SuppressWarnings({ "unchecked" })
-	@Test
-	public void directNamingTest() {
-		LogUtils.initializeDefaultConsoleLogger(Level.OFF, Level.OFF);
-
-		LocalStreamEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(1);
-		SplitDataStream<Long> s = env.generateSequence(1, 10).split(new MySelector());
-		try {
-			s.select("ds2").connectWith(s.select("ds1"));
-			fail();
-		} catch (Exception e) {
-			// Exception thrown
-		}
-
-	}
 }


Mime
View raw message