flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [6/6] flink git commit: Remove unnecessary log level checks
Date Wed, 25 Feb 2015 10:12:54 GMT
Remove unnecessary log level checks

This addresses PR comments: https://github.com/apache/flink/pull/436/


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/639cd1c3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/639cd1c3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/639cd1c3

Branch: refs/heads/master
Commit: 639cd1c32b48a6edaad302d44b257c35666ede12
Parents: 607bf12
Author: Ufuk Celebi <uce@apache.org>
Authored: Tue Feb 24 19:37:10 2015 +0100
Committer: Ufuk Celebi <uce@apache.org>
Committed: Wed Feb 25 11:12:15 2015 +0100

----------------------------------------------------------------------
 .../apache/flink/runtime/deployment/PartitionInfo.java  |  4 +---
 .../flink/runtime/executiongraph/ExecutionVertex.java   |  8 +++-----
 .../flink/runtime/io/network/NetworkEnvironment.java    |  4 +---
 .../io/network/netty/PartitionRequestServerHandler.java |  4 +---
 .../network/partition/IntermediateResultPartition.java  | 10 +---------
 .../partition/IntermediateResultPartitionManager.java   | 12 ++++--------
 .../network/partition/consumer/LocalInputChannel.java   |  4 +---
 .../network/partition/consumer/RemoteInputChannel.java  |  8 ++++----
 .../io/network/partition/consumer/SingleInputGate.java  | 10 ++++++++++
 .../apache/flink/runtime/taskmanager/TaskManager.scala  |  7 ++++---
 10 files changed, 30 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
index 1c6b53c..6a30853 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/PartitionInfo.java
@@ -150,9 +150,7 @@ public class PartitionInfo implements IOReadableWritable, Serializable
{
 		PartitionInfo partitionInfo = new PartitionInfo(partitionId, producerExecutionId,
 				producerLocation, producerAddress);
 
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Create partition info {}.", partitionInfo);
-		}
+		LOG.debug("Create partition info {}.", partitionInfo);
 
 		return partitionInfo;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 97143b1..e83154a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -365,11 +365,9 @@ public class ExecutionVertex implements Serializable {
 	// --------------------------------------------------------------------------------------------
 
 	public void resetForNewExecution() {
-		
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Resetting execution vertex {} for new execution.", getSimpleName());
-		}
-		
+
+		LOG.debug("Resetting execution vertex {} for new execution.", getSimpleName());
+
 		synchronized (priorExecutions) {
 			Execution execution = currentExecution;
 			ExecutionState state = execution.getState();

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index 3f4ac2c..58b21e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -179,10 +179,8 @@ public class NetworkEnvironment {
 	}
 
 	public void unregisterTask(Task task) {
-		if (LOG.isDebugEnabled()) {
-			LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
+		LOG.debug("Unregistering task {} ({}) from network environment (state: {}).",
 					task.getTaskNameWithSubtasks(), task.getExecutionState());
-		}
 
 		final ExecutionAttemptID executionId = task.getExecutionId();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
index 5c474ac..b60256f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestServerHandler.java
@@ -60,9 +60,7 @@ class PartitionRequestServerHandler extends SimpleChannelInboundHandler<NettyMes
 			if (msgClazz == PartitionRequest.class) {
 				PartitionRequest request = (PartitionRequest) msg;
 
-				if (LOG.isDebugEnabled()) {
-					LOG.debug("Read channel on {}: {}.",ctx.channel().localAddress(), request);
-				}
+				LOG.debug("Read channel on {}: {}.",ctx.channel().localAddress(), request);
 
 				IntermediateResultPartitionQueueIterator queueIterator =
 						partitionProvider.getIntermediateResultPartitionIterator(

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
index 0a94cce..174211a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartition.java
@@ -187,9 +187,7 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 
 	public void releaseAllResources() throws IOException {
 		synchronized (queues) {
-			if(LOG.isDebugEnabled()) {
-				LOG.debug("Release all resources of {}.", this);
-			}
+			LOG.debug("Release all resources of {}.", this);
 
 			if (!isReleased) {
 				try {
@@ -234,12 +232,6 @@ public class IntermediateResultPartition implements BufferPoolOwner {
 
 	// ------------------------------------------------------------------------
 
-	@Override
-	public String toString() {
-		return "Intermediate result partition " + partitionId + " [num queues: " + queues.length
+ ", "
-				+ (isFinished ? "finished" : "not finished") + "]";
-	}
-
 	private void checkInProducePhase() {
 		checkState(!isReleased, "Partition has already been discarded.");
 		checkState(!isFinished, "Partition has already been finished.");

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
index 2683649..d5b8fe5 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/IntermediateResultPartitionManager.java
@@ -49,9 +49,7 @@ public class IntermediateResultPartitionManager implements IntermediateResultPar
 
 	public void registerIntermediateResultPartition(IntermediateResultPartition partition) throws
IOException {
 		synchronized (partitions) {
-			if(LOG.isDebugEnabled()){
-				LOG.debug("Register intermediate result partition {}.", partition);
-			}
+			LOG.debug("Register intermediate result partition {}.", partition);
 
 			if (isShutdown) {
 				throw new IOException("Intermediate result partition manager has already been shut down.");
@@ -125,11 +123,9 @@ public class IntermediateResultPartitionManager implements IntermediateResultPar
 
 			if (partition == null) {
 				if (!partitions.containsRow(producerExecutionId)) {
-					if(LOG.isDebugEnabled()) {
-						LOG.debug("Could not find producer execution ID {}. Registered producer" +
-								" execution IDs {}.", producerExecutionId,
-								Arrays.toString(partitions.rowKeySet().toArray()));
-					}
+					LOG.debug("Could not find producer execution ID {}. Registered producer" +
+							" execution IDs {}.", producerExecutionId,
+							Arrays.toString(partitions.rowKeySet().toArray()));
 
 					throw new IllegalQueueIteratorRequestException("Unknown producer execution ID " + producerExecutionId
+ ".");
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
index 04a494b..c05952f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannel.java
@@ -72,10 +72,8 @@ public class LocalInputChannel extends InputChannel implements NotificationListe
 	@Override
 	public void requestIntermediateResultPartition(int queueIndex) throws IOException {
 		if (queueIterator == null) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Requesting LOCAL queue {} from partition {} produced by {}.", queueIndex,
partitionId,
+			LOG.debug("Requesting LOCAL queue {} from partition {} produced by {}.", queueIndex, partitionId,
 						producerExecutionId);
-			}
 
 			queueIterator = partitionManager.getIntermediateResultPartitionIterator(
 					producerExecutionId, partitionId, queueIndex, Optional.of(inputGate.getBufferProvider()));

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
index 002c90e..daf94e6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java
@@ -87,9 +87,7 @@ public class RemoteInputChannel extends InputChannel {
 	@Override
 	public void requestIntermediateResultPartition(int queueIndex) throws IOException {
 		if (partitionRequestClient == null) {
-			if (LOG.isDebugEnabled()) {
-				LOG.debug("Requesting queue {} from REMOTE partition {}.", partitionId, queueIndex);
-			}
+			LOG.debug("Requesting REMOTE queue {} from partition {} produced by {}.", queueIndex,
partitionId, producerExecutionId);
 
 			partitionRequestClient = connectionManager.createPartitionRequestClient(producerAddress);
 
@@ -154,13 +152,15 @@ public class RemoteInputChannel extends InputChannel {
 
 			if (partitionRequestClient != null) {
 				partitionRequestClient.close(this);
+			} else {
+				connectionManager.closeOpenChannelConnections(producerAddress);
 			}
 		}
 	}
 
 	@Override
 	public String toString() {
-		return "REMOTE " + id + " " + super.toString();
+		return "REMOTE " + id + " " + producerAddress + " " + super.toString();
 	}
 
 	// ------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index a8a92d6..0383cca 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -379,14 +379,24 @@ public class SingleInputGate implements InputGate {
 			final PartitionInfo.PartitionLocation producerLocation = partition.getProducerLocation();
 			switch (producerLocation) {
 				case LOCAL:
+					LOG.debug("Create LocalInputChannel for {}.", partition);
+
 					inputChannels[channelIndex] = new LocalInputChannel(reader, channelIndex, producerExecutionId,
partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher());
+
 					break;
 				case REMOTE:
+					LOG.debug("Create RemoteInputChannel for {}.", partition);
+
 					final RemoteAddress producerAddress = checkNotNull(partition.getProducerAddress(), "Missing
producer address for remote intermediate result partition.");
+
 					inputChannels[channelIndex] = new RemoteInputChannel(reader, channelIndex, producerExecutionId,
partitionId, producerAddress, networkEnvironment.getConnectionManager());
+
 					break;
 				case UNKNOWN:
+					LOG.debug("Create UnknownInputChannel for {}.", partition);
+
 					inputChannels[channelIndex] = new UnknownInputChannel(reader, channelIndex, producerExecutionId,
partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(),
networkEnvironment.getConnectionManager());
+
 					break;
 			}
 			reader.setInputChannel(partitionId, inputChannels[channelIndex]);

http://git-wip-us.apache.org/repos/asf/flink/blob/639cd1c3/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index cb79fef..4c85e5b 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -422,8 +422,9 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
 
       // register the task with the network stack and profiles
       networkEnvironment match {
-        log.debug("Register task {} on {}.", task, connectionInfo)
-        case Some(ne) => ne.registerTask(task)
+        case Some(ne) =>
+          log.debug("Register task {} on {}.", task, connectionInfo)
+          ne.registerTask(task)
         case None => throw new RuntimeException(
           "Network environment has not been properly instantiated.")
       }
@@ -543,7 +544,7 @@ class TaskManager(val connectionInfo: InstanceConnectionInfo,
                       } catch {
                         case t: Throwable =>
                           log.error(t, "Failed canceling task with execution ID {} after
task" +
-                            "update failure..", executionId)
+                            "update failure.", executionId)
                       }
                   }
                 }


Mime
View raw message