flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aljos...@apache.org
Subject [1/5] flink git commit: [hotfix][streaming] Fix formatting in OperatorChain
Date Tue, 24 Oct 2017 14:14:12 GMT
Repository: flink
Updated Branches:
  refs/heads/master 9071e3bef -> 4a6a94dfb


[hotfix][streaming] Fix formatting in OperatorChain


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/03c17857
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/03c17857
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/03c17857

Branch: refs/heads/master
Commit: 03c17857b8a91eb06b82905fee58d6273fd4cc8d
Parents: 9071e3b
Author: Piotr Nowojski <piotr.nowojski@gmail.com>
Authored: Wed Oct 18 15:02:02 2017 +0200
Committer: Aljoscha Krettek <aljoscha.krettek@gmail.com>
Committed: Tue Oct 24 15:05:13 2017 +0200

----------------------------------------------------------------------
 .../streaming/runtime/tasks/OperatorChain.java  | 45 +++++++++++++++-----
 1 file changed, 34 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/03c17857/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
index 3827982..a44cffb 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java
@@ -111,8 +111,11 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 				StreamEdge outEdge = outEdgesInOrder.get(i);
 
 				RecordWriterOutput<?> streamOutput = createStreamOutput(
-						outEdge, chainedConfigs.get(outEdge.getSourceId()), i,
-						containingTask.getEnvironment(), containingTask.getName());
+					outEdge,
+					chainedConfigs.get(outEdge.getSourceId()),
+					i,
+					containingTask.getEnvironment(),
+					containingTask.getName());
 
 				this.streamOutputs[i] = streamOutput;
 				streamOutputMap.put(outEdge, streamOutput);
@@ -120,8 +123,13 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 
 			// we create the chain of operators and grab the collector that leads into the chain
 			List<StreamOperator<?>> allOps = new ArrayList<>(chainedConfigs.size());
-			this.chainEntryPoint = createOutputCollector(containingTask, configuration,
-					chainedConfigs, userCodeClassloader, streamOutputMap, allOps);
+			this.chainEntryPoint = createOutputCollector(
+				containingTask,
+				configuration,
+				chainedConfigs,
+				userCodeClassloader,
+				streamOutputMap,
+				allOps);
 
 			if (headOperator != null) {
 				Output output = getChainEntryPoint();
@@ -272,7 +280,13 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			StreamConfig chainedOpConfig = chainedConfigs.get(outputId);
 
 			Output<StreamRecord<T>> output = createChainedOperator(
-					containingTask, chainedOpConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators, outputEdge.getOutputTag());
+				containingTask,
+				chainedOpConfig,
+				chainedConfigs,
+				userCodeClassloader,
+				streamOutputs,
+				allOperators,
+				outputEdge.getOutputTag());
 			allOutputs.add(new Tuple2<>(output, outputEdge));
 		}
 
@@ -330,7 +344,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			OutputTag<IN> outputTag) {
 		// create the output that the operator writes to first. this may recursively create more operators
 		Output<StreamRecord<OUT>> output = createOutputCollector(
-				containingTask, operatorConfig, chainedConfigs, userCodeClassloader, streamOutputs, allOperators);
+			containingTask,
+			operatorConfig,
+			chainedConfigs,
+			userCodeClassloader,
+			streamOutputs,
+			allOperators);
 
 		// now create the operator and give it the output collector to write its output to
 		OneInputStreamOperator<IN, OUT> chainedOperator = operatorConfig.getStreamOperator(userCodeClassloader);
@@ -349,7 +368,9 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 	}
 
 	private <T> RecordWriterOutput<T> createStreamOutput(
-			StreamEdge edge, StreamConfig upStreamConfig, int outputIndex,
+			StreamEdge edge,
+			StreamConfig upStreamConfig,
+			int outputIndex,
 			Environment taskEnvironment,
 			String taskName) {
 		OutputTag sideOutputTag = edge.getOutputTag(); // OutputTag, return null if not sideOutput
@@ -529,10 +550,12 @@ public class OperatorChain<OUT, OP extends StreamOperator<OUT>> implements Strea
 			} catch (ClassCastException e) {
 				// Enrich error message
 				ClassCastException replace = new ClassCastException(
-						String.format("%s. Failed to push OutputTag with id '%s' to operator. " +
-								"This can occur when multiple OutputTags with different types " +
-								"but identical names are being used.",
-								e.getMessage(), outputTag.getId()));
+					String.format(
+						"%s. Failed to push OutputTag with id '%s' to operator. " +
+						"This can occur when multiple OutputTags with different types " +
+						"but identical names are being used.",
+						e.getMessage(),
+						outputTag.getId()));
 
 				throw new ExceptionInChainedOperatorException(replace);
 


Mime
View raw message