flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [5/7] flink git commit: [FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor
Date Wed, 30 Nov 2016 15:22:26 GMT
[FLINK-5196] [logging] Don't log InputChannelDeploymentDescriptor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7b9a4445
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7b9a4445
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7b9a4445

Branch: refs/heads/release-1.1
Commit: 7b9a4445981ac8993af7a53cf057666e78c92140
Parents: c8ade63
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Nov 29 16:04:48 2016 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Nov 30 16:22:11 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java         |  3 ---
 .../partition/consumer/SingleInputGate.java       | 18 +++++++++++++++---
 2 files changed, 15 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7b9a4445/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 6b87e69..24b95ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -32,7 +32,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.Serializable;
-import java.util.Arrays;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -135,8 +134,6 @@ public class InputChannelDeploymentDescriptor implements Serializable {
 					consumedPartitionId, partitionLocation);
 		}
 
-		LOG.debug("Created {} from edges {}.", Arrays.toString(icdd), Arrays.toString(edges));
-
 		return icdd;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7b9a4445/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 8f44fbc..1550b0d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -46,7 +46,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.BitSet;
 import java.util.List;
 import java.util.Map;
@@ -573,8 +572,11 @@ public class SingleInputGate implements InputGate {
 		// Create the input channels. There is one input channel for each consumed partition.
 		final InputChannel[] inputChannels = new InputChannel[icdd.length];
 
-		for (int i = 0; i < inputChannels.length; i++) {
+		int numLocalChannels = 0;
+		int numRemoteChannels = 0;
+		int numUnknownChannels = 0;
 
+		for (int i = 0; i < inputChannels.length; i++) {
 			final ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
 			final ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
 
@@ -585,6 +587,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 						metrics
 				);
+
+				numLocalChannels++;
 			}
 			else if (partitionLocation.isRemote()) {
 				inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId,
@@ -593,6 +597,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 						metrics
 				);
+
+				numRemoteChannels++;
 			}
 			else if (partitionLocation.isUnknown()) {
 				inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId,
@@ -602,6 +608,8 @@ public class SingleInputGate implements InputGate {
 						networkEnvironment.getPartitionRequestInitialAndMaxBackoff(),
 						metrics
 				);
+
+				numUnknownChannels++;
 			}
 			else {
 				throw new IllegalStateException("Unexpected partition location.");
@@ -610,7 +618,11 @@ public class SingleInputGate implements InputGate {
 			inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
 		}
 
-		LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
+		LOG.debug("Created {} input channels (local: {}, remote: {}, unknown: {}).",
+			inputChannels.length,
+			numLocalChannels,
+			numRemoteChannels,
+			numUnknownChannels);
 
 		return inputGate;
 	}


Mime
View raw message