flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mbala...@apache.org
Subject [5/8] flink git commit: [streaming] JobGraphbuilder separated to StreamGraph and StreamingJobGraphGenerator
Date Wed, 21 Jan 2015 17:53:59 GMT
http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
index de9c664..ea891c9 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/invokable/operator/WindowInvokable.java
@@ -61,8 +61,7 @@ public abstract class WindowInvokable<IN, OUT> extends StreamInvokable<IN, OUT>
 	public WindowInvokable(Function userFunction, LinkedList<TriggerPolicy<IN>> triggerPolicies,
 			LinkedList<EvictionPolicy<IN>> evictionPolicies) {
 		super(userFunction);
-		setChainingStrategy(ChainingStrategy.NEVER);
-		
+
 		this.triggerPolicies = triggerPolicies;
 		this.evictionPolicies = evictionPolicies;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
index 99f826d..135f742 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamvertex/OutputHandler.java
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.plugable.SerializationDelegate;
 import org.apache.flink.streaming.api.StreamConfig;
@@ -54,7 +55,7 @@ public class OutputHandler<OUT> {
 
 	private Map<String, StreamOutput<?>> outputMap;
 	private Map<String, StreamConfig> chainedConfigs;
-	private List<String> recordWriterOrder;
+	private List<Tuple2<String, String>> outEdgesInOrder;
 
 	public OutputHandler(StreamVertex<?, OUT> vertex) {
 
@@ -68,24 +69,16 @@ public class OutputHandler<OUT> {
 		// We read the chained configs, and the order of record writer
 		// registrations by outputname
 		this.chainedConfigs = configuration.getTransitiveChainedTaskConfigs(cl);
-		this.recordWriterOrder = configuration.getRecordWriterOrder(cl);
-
-		// For the network outputs of the chain head we create the stream
-		// outputs
-		for (String outName : configuration.getOutputs(cl)) {
-			StreamOutput<?> streamOutput = createStreamOutput(outName, configuration);
-			outputMap.put(outName, streamOutput);
-		}
-
-		// If we have chained tasks we iterate through them and create the
-		// stream outputs for the network outputs
-		if (chainedConfigs != null) {
-			for (StreamConfig config : chainedConfigs.values()) {
-				for (String outName : config.getOutputs(cl)) {
-					StreamOutput<?> streamOutput = createStreamOutput(outName, config);
-					outputMap.put(outName, streamOutput);
-				}
-			}
+		this.chainedConfigs.put(configuration.getTaskName(), configuration);
+				
+		this.outEdgesInOrder = configuration.getOutEdgesInOrder(cl);
+
+		// We iterate through all the out edges from this job vertex and create
+		// a stream output
+		for (Tuple2<String, String> outEdge : outEdgesInOrder) {
+			StreamOutput<?> streamOutput = createStreamOutput(outEdge.f1,
+					chainedConfigs.get(outEdge.f0), outEdgesInOrder.indexOf(outEdge));
+			outputMap.put(outEdge.f1, streamOutput);
 		}
 
 		// We create the outer collector that will be passed to the first task
@@ -196,40 +189,32 @@ public class OutputHandler<OUT> {
 	 * We create the StreamOutput for the specific output given by the name, and
 	 * the configuration of its source task
 	 * 
-	 * @param name
+	 * @param outputVertex
 	 *            Name of the output to which the streamoutput will be set up
 	 * @param configuration
 	 *            The config of upStream task
 	 * @return
 	 */
-	private <T> StreamOutput<T> createStreamOutput(String name, StreamConfig configuration) {
-
-		int outputNumber = recordWriterOrder.indexOf(name);
-
-		StreamPartitioner<T> outputPartitioner;
-
-		try {
-			outputPartitioner = configuration.getPartitioner(vertex.userClassLoader, name);
-		} catch (Exception e) {
-			throw new StreamVertexException("Cannot deserialize partitioner for "
-					+ vertex.getName() + " with " + name + " outputs", e);
-		}
+	private <T> StreamOutput<T> createStreamOutput(String outputVertex, StreamConfig configuration,
+			int outputIndex) {
+		
+		StreamPartitioner<T> outputPartitioner = configuration.getPartitioner(cl, outputVertex);
 
 		RecordWriter<SerializationDelegate<StreamRecord<T>>> output;
 
-		long bufferTimeout = configuration.getBufferTimeout();
+		if (configuration.getBufferTimeout() >= 0) {
 
-		if (bufferTimeout >= 0) {
 			output = new StreamRecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
-					.getEnvironment().getWriter(outputNumber), outputPartitioner, bufferTimeout);
+					.getEnvironment().getWriter(outputIndex), outputPartitioner,
+					configuration.getBufferTimeout());
 
 			if (LOG.isTraceEnabled()) {
 				LOG.trace("StreamRecordWriter initiated with {} bufferTimeout for {}",
-						bufferTimeout, vertex.getClass().getSimpleName());
+						configuration.getBufferTimeout(), vertex.getClass().getSimpleName());
 			}
 		} else {
 			output = new RecordWriter<SerializationDelegate<StreamRecord<T>>>(vertex
-					.getEnvironment().getWriter(outputNumber), outputPartitioner);
+					.getEnvironment().getWriter(outputIndex), outputPartitioner);
 
 			if (LOG.isTraceEnabled()) {
 				LOG.trace("RecordWriter initiated for {}", vertex.getClass().getSimpleName());
@@ -237,11 +222,12 @@ public class OutputHandler<OUT> {
 		}
 
 		StreamOutput<T> streamOutput = new StreamOutput<T>(output,
-				configuration.isSelectAll(name) ? null : configuration.getOutputNames(name));
+				configuration.isSelectAll(outputVertex) ? null
+						: configuration.getOutputNames(outputVertex));
 
 		if (LOG.isTraceEnabled()) {
 			LOG.trace("Partitioner set: {} with {} outputs for {}", outputPartitioner.getClass()
-					.getSimpleName(), outputNumber, vertex.getClass().getSimpleName());
+					.getSimpleName(), outputIndex, vertex.getClass().getSimpleName());
 		}
 
 		return streamOutput;

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
index e7f15b6..95a5d9b 100644
--- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
+++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/util/TestStreamEnvironment.java
@@ -46,7 +46,7 @@ public class TestStreamEnvironment extends StreamExecutionEnvironment {
 
 	@Override
 	public void execute(String jobName) throws Exception {
-		JobGraph jobGraph = jobGraphBuilder.getJobGraph(jobName);
+		JobGraph jobGraph = streamGraph.getJobGraph(jobName);
 
 		Configuration configuration = jobGraph.getJobConfiguration();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
index a69454c..a408ec0 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamCrossOperator.scala
@@ -83,7 +83,7 @@ object StreamCrossOperator {
         clean(getCrossWindowFunction(op, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
         op.timeStamp2)
 
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
         invokable)
 
       javaStream.setType(implicitly[TypeInformation[R]])
@@ -94,7 +94,7 @@ object StreamCrossOperator {
     }
 
     override def every(length: Long): CrossWindow[I1, I2] = {
-      val builder = javaStream.getExecutionEnvironment().getJobGraphBuilder()
+      val builder = javaStream.getExecutionEnvironment().getStreamGraph()
       val invokable = builder.getInvokable(javaStream.getId())
       invokable.asInstanceOf[CoWindowInvokable[_,_,_]].setSlideSize(length)
       this

http://git-wip-us.apache.org/repos/asf/flink/blob/e3b608ce/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
----------------------------------------------------------------------
diff --git a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
index 7ecd79a..1bd1bfb 100644
--- a/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
+++ b/flink-addons/flink-streaming/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/StreamJoinOperator.scala
@@ -186,7 +186,7 @@ object StreamJoinOperator {
         clean(getJoinWindowFunction(jp, fun)), op.windowSize, op.slideInterval, op.timeStamp1,
         op.timeStamp2)
 
-      javaStream.getExecutionEnvironment().getJobGraphBuilder().setInvokable(javaStream.getId(),
+      javaStream.getExecutionEnvironment().getStreamGraph().setInvokable(javaStream.getId(),
         invokable)
 
       javaStream.setType(implicitly[TypeInformation[R]])


Mime
View raw message