flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [3/5] [FLINK-1086] Replace JCL with SLF4J and Log4j with LOGBack
Date Fri, 05 Sep 2014 09:57:56 GMT
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 0cf061e..85b7a47 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -38,9 +38,6 @@ import org.apache.flink.api.java.tuple.Tuple5;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class CsvInputFormatTest {
@@ -51,13 +48,7 @@ public class CsvInputFormatTest {
 	private static final String FIRST_PART = "That is the first part";
 	
 	private static final String SECOND_PART = "That is the second part";
-	
-	
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
+
 	@Test
 	public void readStringFields() {
 		try {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 167dfea..1fa606d 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -34,18 +34,9 @@ import java.io.PrintStream;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TextInputFormatTest {
-	
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
 	@Test
 	public void testSimpleRead() {
 		final String FIRST = "First line";

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
index 65652b8..bce9e6a 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/CsvInputFormatTest.java
@@ -33,7 +33,6 @@ import java.io.OutputStreamWriter;
 
 import org.junit.Assert;
 
-import org.apache.flink.api.java.record.io.CsvInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.core.fs.FileInputSplit;
@@ -41,11 +40,8 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class CsvInputFormatTest {
@@ -60,12 +56,6 @@ public class CsvInputFormatTest {
 	private static final String SECOND_PART = "That is the second part";
 	
 	// --------------------------------------------------------------------------------------------
-
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
 	@Before
 	public void setup() {
 		format.setFilePath("file:///some/file/that/will/not/be/read");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
index 69a62f4..aadb525 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/FixedLenghtInputFormatTest.java
@@ -29,17 +29,13 @@ import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 
-import org.apache.flink.api.java.record.io.FixedLengthInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.Record;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
 import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class FixedLenghtInputFormatTest {
@@ -51,12 +47,7 @@ public class FixedLenghtInputFormatTest {
 	private final FixedLengthInputFormat format = new MyFixedLengthInputFormat();
 	
 	// --------------------------------------------------------------------------------------------
-	
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
+
 	@Before
 	public void setup() {
 		format.setFilePath("file:///some/file/that/will/not/be/read");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
index d7c0ea2..d7d2572 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/record/io/TextInputFormatTest.java
@@ -30,23 +30,13 @@ import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.OutputStreamWriter;
 
-import org.apache.flink.api.java.record.io.TextInputFormat;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.types.Record;
 import org.apache.flink.types.StringValue;
-import org.apache.flink.util.LogUtils;
-import org.apache.log4j.Level;
-import org.junit.BeforeClass;
 import org.junit.Test;
 
 public class TextInputFormatTest {
-		
-	@BeforeClass
-	public static void initialize() {
-		LogUtils.initializeDefaultConsoleLogger(Level.WARN);
-	}
-	
 	/**
 	 * The TextInputFormat seems to fail reading more than one record. I guess its
 	 * an off by one error.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
index e40e4bc..79da72f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/JobClient.java
@@ -25,8 +25,8 @@ import java.net.InetSocketAddress;
 import java.util.Iterator;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.configuration.ConfigConstants;
@@ -53,7 +53,7 @@ public class JobClient {
 	/**
 	 * The logging object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(JobClient.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);
 
 	/**
 	 * The job management server stub.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
index 57ff0fc..98557f9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/ExecutionStateTransition.java
@@ -23,8 +23,8 @@ import static org.apache.flink.runtime.execution.ExecutionState.CANCELED;
 import static org.apache.flink.runtime.execution.ExecutionState.CANCELING;
 import static org.apache.flink.runtime.execution.ExecutionState.FAILED;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class is a utility class to check the consistency of Nephele's execution state model.
@@ -35,7 +35,7 @@ public final class ExecutionStateTransition {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(ExecutionStateTransition.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionStateTransition.class);
 
 	/**
 	 * Private constructor to prevent instantiation of object.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
index 5cf5c33..6bfaf2a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/RuntimeEnvironment.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.execution;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.IOReadableWritable;
@@ -69,7 +69,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(RuntimeEnvironment.class);
+	private static final Logger LOG = LoggerFactory.getLogger(RuntimeEnvironment.class);
 
 	/**
 	 * The interval to sleep in case a communication channel is not yet entirely set up (in milliseconds).
@@ -247,7 +247,7 @@ public class RuntimeEnvironment implements Environment, BufferProvider, LocalBuf
 	@Override
 	public void run() {
 		if (invokable == null) {
-			LOG.fatal("ExecutionEnvironment has no Invokable set");
+			LOG.error("ExecutionEnvironment has no Invokable set");
 		}
 
 		// Now the actual program starts to run

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 5bc9051..cea0271 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -34,8 +34,8 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.io.InitializeOnMaster;
 import org.apache.flink.api.common.io.OutputFormat;
 import org.apache.flink.configuration.Configuration;
@@ -74,7 +74,7 @@ public class ExecutionGraph implements ExecutionListener {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(ExecutionGraph.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);
 
 	/**
 	 * The ID of the job this graph has been built for.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphIterator.java
index 754d0bf..f4efc88 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphIterator.java
@@ -24,8 +24,8 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class provides an implementation of the {@link Iterator} interface which allows to
@@ -41,7 +41,7 @@ public class ExecutionGraphIterator implements Iterator<ExecutionVertex> {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(ExecutionGraphIterator.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphIterator.class);
 
 	/**
 	 * The execution this iterator traverses.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionSignature.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionSignature.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionSignature.java
index f3dae40..40f2468 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionSignature.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionSignature.java
@@ -24,8 +24,8 @@ import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
 import java.util.Arrays;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
 import org.apache.flink.runtime.jobgraph.JobID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
@@ -44,7 +44,7 @@ public final class ExecutionSignature {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(ExecutionSignature.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionSignature.class);
 
 	/**
 	 * The name of the hashing algorithm to be used.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 266eea2..72e0696 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -30,8 +30,8 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
@@ -64,7 +64,7 @@ public final class ExecutionVertex {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(ExecutionVertex.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ExecutionVertex.class);
 
 	/**
 	 * The ID of the vertex.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
index 6280fed..1700268 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/hdfs/DistributedFileSystem.java
@@ -24,8 +24,8 @@ import java.io.IOException;
 import java.lang.reflect.Method;
 import java.net.URI;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
@@ -43,7 +43,7 @@ import org.apache.hadoop.conf.Configuration;
  */
 public final class DistributedFileSystem extends FileSystem {
 	
-	private static final Log LOG = LogFactory.getLog(DistributedFileSystem.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DistributedFileSystem.class);
 	
 	private static final String DEFAULT_HDFS_CLASS = "org.apache.hadoop.hdfs.DistributedFileSystem";
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
index e85ebc5..ac08c27 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/maprfs/MapRFileSystem.java
@@ -27,8 +27,8 @@ import java.net.URI;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FSDataOutputStream;
@@ -51,7 +51,7 @@ public final class MapRFileSystem extends FileSystem {
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(MapRFileSystem.class);
+	private static final Logger LOG = LoggerFactory.getLogger(MapRFileSystem.class);
 
 	/**
 	 * The name of MapR's class containing the implementation of the Hadoop HDFS
@@ -383,4 +383,4 @@ public final class MapRFileSystem extends FileSystem {
 
 		return true;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
index 40fc9fa..73697c8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/fs/s3/S3FileSystem.java
@@ -32,8 +32,8 @@ import java.util.Date;
 import java.util.Iterator;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.BlockLocation;
 import org.apache.flink.core.fs.FSDataInputStream;
@@ -64,7 +64,7 @@ public final class S3FileSystem extends FileSystem {
 	/**
 	 * The logging object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(S3FileSystem.class);
+	private static final Logger LOG = LoggerFactory.getLogger(S3FileSystem.class);
 
 	/**
 	 * The configuration key to access the S3 host.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
index 5c65131..3475a02 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/DefaultInstanceManager.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.instance;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.jobgraph.JobID;
@@ -52,7 +52,7 @@ public class DefaultInstanceManager implements InstanceManager {
 	/**
 	 * The log object used to report debugging and error information.
 	 */
-	private static final Log LOG = LogFactory.getLog(DefaultInstanceManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultInstanceManager.class);
 
 	/**
 	 * Default duration after which a host is purged in case it did not send

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescriptionFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescriptionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescriptionFactory.java
index c71f6e6..8a4de1e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescriptionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/HardwareDescriptionFactory.java
@@ -26,8 +26,8 @@ import java.io.InputStreamReader;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.util.OperatingSystem;
 
 /**
@@ -42,7 +42,7 @@ public class HardwareDescriptionFactory {
 	/**
 	 * The log object used to report errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(HardwareDescriptionFactory.class);
+	private static final Logger LOG = LoggerFactory.getLogger(HardwareDescriptionFactory.class);
 
 	/**
 	 * The path to the interface to extract memory information under Linux.
@@ -214,7 +214,7 @@ public class HardwareDescriptionFactory {
 			}
 
 		} catch (Exception e) {
-			LOG.error(e);
+			LOG.error("Exception while retrieving size of physical of memory on mac.", e);
 			return -1;
 		} finally {
 			if (bi != null) {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
index 6b2fad7..8fced32 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/disk/iomanager/IOManager.java
@@ -25,8 +25,8 @@ import java.util.List;
 import java.util.Random;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.memory.MemorySegment;
 
 /**
@@ -38,7 +38,7 @@ public final class IOManager implements UncaughtExceptionHandler
 	/**
 	 * Logging.
 	 */
-	private static final Log LOG = LogFactory.getLog(IOManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(IOManager.class);
 
 	/**
 	 * The default temp paths for anonymous Channels.
@@ -199,7 +199,7 @@ public final class IOManager implements UncaughtExceptionHandler
 	@Override
 	public void uncaughtException(Thread t, Throwable e)
 	{
-		LOG.fatal("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
+		LOG.error("IO Thread '" + t.getName() + "' terminated due to an exception. Closing I/O Manager.", e);
 		shutdown();	
 	}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
index b886e5d..3aae114 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/ChannelManager.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.io.network;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.AbstractID;
 import org.apache.flink.runtime.execution.CancelTaskException;
@@ -56,7 +56,7 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public class ChannelManager implements EnvelopeDispatcher, BufferProviderBroker {
 
-	private static final Log LOG = LogFactory.getLog(ChannelManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ChannelManager.class);
 
 	private final ChannelLookupProtocol channelLookupService;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
index f231463..844ca6e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/bufferprovider/GlobalBufferPool.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.io.network.bufferprovider;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.memory.MemorySegment;
 
 /**
@@ -36,7 +36,7 @@ import org.apache.flink.core.memory.MemorySegment;
  */
 public final class GlobalBufferPool {
 
-	private final static Log LOG = LogFactory.getLog(GlobalBufferPool.class);
+	private final static Logger LOG = LoggerFactory.getLogger(GlobalBufferPool.class);
 
 	// -----------------------------------------------------------------------------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
index 4a7b5fc..be59242 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/InputChannel.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.io.network.channels;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.AbstractTaskEvent;
@@ -56,7 +56,7 @@ public class InputChannel<T extends IOReadableWritable> extends Channel implemen
 	/**
 	 * The log object used to report warnings and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(InputChannel.class);
+	private static final Logger LOG = LoggerFactory.getLogger(InputChannel.class);
 
 	/**
 	 * The deserializer used to deserialize records.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
index a80da94..3b19ede 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/channels/OutputChannel.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.io.network.channels;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.event.task.AbstractEvent;
 import org.apache.flink.runtime.event.task.AbstractTaskEvent;
 import org.apache.flink.runtime.io.network.Buffer;
@@ -34,7 +34,7 @@ import java.util.Arrays;
 
 public class OutputChannel extends Channel {
 
-	private static final Log LOG = LogFactory.getLog(OutputChannel.class);
+	private static final Logger LOG = LoggerFactory.getLogger(OutputChannel.class);
 
 	private final Object closeLock = new Object();
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
index 8d42821..238d1e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/gates/InputGate.java
@@ -24,8 +24,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicReference;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.runtime.deployment.ChannelDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.GateDeploymentDescriptor;
@@ -55,7 +55,7 @@ public class InputGate<T extends IOReadableWritable> extends Gate<T> implements
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(InputGate.class);
+	private static final Logger LOG = LoggerFactory.getLogger(InputGate.class);
 
 	/**
 	 * The array of input channels attached to this input gate.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
index 82f84ad..1dba3f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/InboundEnvelopeDecoder.java
@@ -23,8 +23,8 @@ import io.netty.buffer.ByteBuf;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.io.network.Buffer;
 import org.apache.flink.runtime.io.network.Envelope;
 import org.apache.flink.runtime.io.network.bufferprovider.BufferAvailabilityListener;
@@ -39,7 +39,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 
 public class InboundEnvelopeDecoder extends ChannelInboundHandlerAdapter implements BufferAvailabilityListener {
 
-	private static final Log LOG = LogFactory.getLog(InboundEnvelopeDecoder.class);
+	private static final Logger LOG = LoggerFactory.getLogger(InboundEnvelopeDecoder.class);
 
 	private final BufferProviderBroker bufferProviderBroker;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
index 7e9af6f..6d7e15c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyConnectionManager.java
@@ -30,8 +30,8 @@ import io.netty.channel.nio.NioEventLoopGroup;
 import io.netty.channel.socket.SocketChannel;
 import io.netty.channel.socket.nio.NioServerSocketChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.io.network.ChannelManager;
 import org.apache.flink.runtime.io.network.Envelope;
 import org.apache.flink.runtime.io.network.EnvelopeDispatcher;
@@ -48,7 +48,7 @@ import java.util.concurrent.ConcurrentMap;
 
 public class NettyConnectionManager implements NetworkConnectionManager {
 
-	private static final Log LOG = LogFactory.getLog(NettyConnectionManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(NettyConnectionManager.class);
 
 	private static final int DEBUG_PRINT_QUEUED_ENVELOPES_EVERY_MS = 10000;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
index eb76be5..4008e97 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/OutboundConnectionQueue.java
@@ -25,8 +25,8 @@ import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.ChannelInboundHandlerAdapter;
 import io.netty.handler.timeout.IdleStateEvent;
 import io.netty.handler.timeout.IdleStateHandler;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.io.network.Envelope;
 import org.apache.flink.runtime.io.network.NetworkConnectionManager;
 import org.apache.flink.runtime.io.network.RemoteReceiver;
@@ -41,7 +41,7 @@ public class OutboundConnectionQueue extends ChannelInboundHandlerAdapter {
 		TRIGGER_WRITE
 	}
 
-	private static final Log LOG = LogFactory.getLog(OutboundConnectionQueue.class);
+	private static final Logger LOG = LoggerFactory.getLogger(OutboundConnectionQueue.class);
 
 	private final ChannelWriteListener writeListener = new ChannelWriteListener();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
index 346b79a..87267fe 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Client.java
@@ -25,8 +25,8 @@
 
 package org.apache.flink.runtime.ipc;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
@@ -65,7 +65,7 @@ import java.util.concurrent.atomic.AtomicLong;
  */
 public class Client {
 
-	public static final Log LOG = LogFactory.getLog(Client.class);
+	public static final Logger LOG = LoggerFactory.getLogger(Client.class);
 
 	private Hashtable<ConnectionId, Connection> connections = new Hashtable<ConnectionId, Connection>();
 
@@ -512,14 +512,14 @@ public class Client {
 						try {
 							c = ClassUtils.getRecordByName(returnClassName);
 						} catch (ClassNotFoundException e) {
-							LOG.error(e);
+							LOG.error("Could not find class " + returnClassName + ".", e);
 						}
 						try {
 							value = c.newInstance();
 						} catch (InstantiationException e) {
-							LOG.error(e);
+							LOG.error("Could not instantiate object of class " + c.getCanonicalName() + ".", e);
 						} catch (IllegalAccessException e) {
-							LOG.error(e);
+							LOG.error("Error instantiating object of class " + c.getCanonicalName() + ".", e);
 						} 
 						try {
 							value.read(new InputViewDataInputStreamWrapper(in)); // read value

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
index efeeadc..1768687 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/RPC.java
@@ -38,8 +38,8 @@ import java.util.Map;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.DataInputView;
@@ -64,7 +64,7 @@ import org.apache.flink.util.ClassUtils;
  */
 public class RPC {
 
-	private static final Log LOG = LogFactory.getLog(RPC.class);
+	private static final Logger LOG = LoggerFactory.getLogger(RPC.class);
 
 	private RPC() {
 	} // no public ctor

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
index a8c970c..bde6847 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/ipc/Server.java
@@ -58,8 +58,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.io.StringRecord;
 import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
@@ -76,7 +76,7 @@ import org.apache.flink.util.ClassUtils;
  */
 public abstract class Server {
 
-	public static final Log LOG = LogFactory.getLog(Server.class);
+	public static final Logger LOG = LoggerFactory.getLogger(Server.class);
 
 	private static final Class<?>[] EMPTY_ARRAY = new Class[] {};
 
@@ -889,7 +889,7 @@ public abstract class Server {
 					protocol = getProtocolClass(header.getProtocol());
 				}
 			} catch (ClassNotFoundException cnfe) {
-				LOG.error(cnfe);
+				LOG.error("Could not find class " + header.getProtocol() + ".", cnfe);
 				throw new IOException("Unknown protocol: " + header.getProtocol());
 			}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
index 9eb0637..b0ed29e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/convergence/WorksetEmptyConvergenceCriterion.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.iterative.convergence;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.types.LongValue;
 
@@ -31,7 +31,7 @@ public class WorksetEmptyConvergenceCriterion implements ConvergenceCriterion<Lo
 
 	private static final long serialVersionUID = 1L;
 
-	private static final Log log = LogFactory.getLog(WorksetEmptyConvergenceCriterion.class);
+	private static final Logger log = LoggerFactory.getLogger(WorksetEmptyConvergenceCriterion.class);
 	
 	public static final String AGGREGATOR_NAME = "pact.runtime.workset-empty-aggregator";
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
index 4c03278..1d37fd8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/AbstractIterativePactTask.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.LongSumAggregator;
 import org.apache.flink.api.common.functions.Function;
@@ -57,7 +57,7 @@ import java.io.IOException;
 public abstract class AbstractIterativePactTask<S extends Function, OT> extends RegularPactTask<S, OT>
 		implements Terminable
 {
-	private static final Log log = LogFactory.getLog(AbstractIterativePactTask.class);
+	private static final Logger log = LoggerFactory.getLogger(AbstractIterativePactTask.class);
 	
 	protected LongSumAggregator worksetAggregator;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
index 3dbd47c..3d1f599 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationHeadPactTask.java
@@ -22,8 +22,8 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.common.typeutils.TypeComparator;
 import org.apache.flink.api.common.typeutils.TypeComparatorFactory;
@@ -78,7 +78,7 @@ import org.apache.flink.util.MutableObjectIterator;
  */
 public class IterationHeadPactTask<X, Y, S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
 
-	private static final Log log = LogFactory.getLog(IterationHeadPactTask.class);
+	private static final Logger log = LoggerFactory.getLogger(IterationHeadPactTask.class);
 
 	private Collector<X> finalOutputCollector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
index b12e70b..a7835ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationIntermediatePactTask.java
@@ -20,8 +20,8 @@ package org.apache.flink.runtime.iterative.task;
 
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.io.network.api.BufferWriter;
 import org.apache.flink.runtime.io.network.channels.EndOfSuperstepEvent;
@@ -44,7 +44,7 @@ import org.apache.flink.util.Collector;
  */
 public class IterationIntermediatePactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT> {
 
-	private static final Log log = LogFactory.getLog(IterationIntermediatePactTask.class);
+	private static final Logger log = LoggerFactory.getLogger(IterationIntermediatePactTask.class);
 
 	private WorksetUpdateOutputCollector<OT> worksetUpdateOutputCollector;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
index c44f443..7852918 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationSynchronizationSinkTask.java
@@ -24,8 +24,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.aggregators.AggregatorWithName;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
@@ -51,7 +51,7 @@ import com.google.common.base.Preconditions;
  */
 public class IterationSynchronizationSinkTask extends AbstractInvokable implements Terminable {
 
-	private static final Log log = LogFactory.getLog(IterationSynchronizationSinkTask.class);
+	private static final Logger log = LoggerFactory.getLogger(IterationSynchronizationSinkTask.class);
 
 	private MutableRecordReader<IntegerRecord> headEventReader;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
index 0d9c903..0a28bd7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/IterationTailPactTask.java
@@ -18,8 +18,8 @@
 
 package org.apache.flink.runtime.iterative.task;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrier;
 import org.apache.flink.runtime.iterative.concurrent.SolutionSetUpdateBarrierBroker;
@@ -42,7 +42,7 @@ import org.apache.flink.util.Collector;
 public class IterationTailPactTask<S extends Function, OT> extends AbstractIterativePactTask<S, OT>
 		implements PactTaskContext<S, OT> {
 
-	private static final Log log = LogFactory.getLog(IterationTailPactTask.class);
+	private static final Logger log = LoggerFactory.getLogger(IterationTailPactTask.class);
 
 	private SolutionSetUpdateBarrier solutionSetUpdateBarrier;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
index d1601f2..d142348 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/iterative/task/SyncEventHandler.java
@@ -30,7 +30,7 @@ import com.google.common.base.Preconditions;
 
 public class SyncEventHandler implements EventListener {
 	
-//	private static final Log log = LogFactory.getLog(SyncEventHandler.class);
+//	private static final Logger log = LoggerFactory.getLogger(SyncEventHandler.class);
 	
 	private final ClassLoader userCodeClassLoader;
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
index 71957ad..3ba630b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManager.java
@@ -42,8 +42,8 @@ import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
 import org.apache.commons.cli.ParseException;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -111,10 +111,6 @@ import org.apache.flink.runtime.types.IntegerRecord;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.SerializableArrayList;
 import org.apache.flink.util.StringUtils;
-import org.apache.log4j.ConsoleAppender;
-import org.apache.log4j.Level;
-import org.apache.log4j.Logger;
-import org.apache.log4j.PatternLayout;
 
 /**
  * In Nephele the job manager is the central component for communication with clients, creating
@@ -128,7 +124,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 		JobManagerProtocol, ChannelLookupProtocol, JobStatusListener, AccumulatorProtocol
 {
 
-	private static final Log LOG = LogFactory.getLog(JobManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobManager.class);
 
 	private final Server jobManagerServer;
 
@@ -276,7 +272,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			try {
 				this.executorService.awaitTermination(5000L, TimeUnit.MILLISECONDS);
 			} catch (InterruptedException e) {
-				LOG.debug(e);
+				LOG.debug("Got interrupted while waiting for the executor service to shutdown.", e);
 			}
 		}
 
@@ -310,16 +306,6 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 	 */
 	
 	public static void main(String[] args) {
-		// determine if a valid log4j config exists and initialize a default logger if not
-		if (System.getProperty("log4j.configuration") == null) {
-			Logger root = Logger.getRootLogger();
-			root.removeAllAppenders();
-			PatternLayout layout = new PatternLayout("%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n");
-			ConsoleAppender appender = new ConsoleAppender(layout, "System.err");
-			root.addAppender(appender);
-			root.setLevel(Level.INFO);
-		}
-		
 		JobManager jobManager;
 		try {
 			jobManager = initialize(args);
@@ -327,7 +313,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			jobManager.startInfoServer();
 		}
 		catch (Exception e) {
-			LOG.fatal(e.getMessage(), e);
+			LOG.error(e.getMessage(), e);
 			System.exit(FAILURE_RETURN_CODE);
 		}
 		
@@ -566,7 +552,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 			LibraryCacheManager.unregister(executionGraph.getJobID());
 		} catch (IOException ioe) {
 			if (LOG.isWarnEnabled()) {
-				LOG.warn(ioe);
+				LOG.warn("IOException while unregistering the job with id " + executionGraph.getJobID() + ".",ioe);
 			}
 		}
 	}
@@ -940,7 +926,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 				try {
 					instance.killTaskManager();
 				} catch (IOException ioe) {
-					LOG.error(ioe);
+					LOG.error("IOException while killing the task manager on instance " + instanceName + ".", ioe);
 				}
 			}
 		};
@@ -1023,7 +1009,7 @@ public class JobManager implements DeploymentManager, ExtendedManagementProtocol
 						it2.next().logBufferUtilization();
 					}
 				} catch (IOException ioe) {
-					LOG.error(ioe);
+					LOG.error("IOException while logging buffer utilization.", ioe);
 				}
 
 			}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerUtils.java
index 6e11a90..4c7b957 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/JobManagerUtils.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.jobmanager;
 import java.lang.reflect.Constructor;
 import java.lang.reflect.InvocationTargetException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.ExecutionMode;
 import org.apache.flink.runtime.instance.InstanceManager;
 import org.apache.flink.runtime.jobmanager.scheduler.DefaultScheduler;
@@ -38,7 +38,7 @@ public class JobManagerUtils {
 	/**
 	 * The logging object used by the utility methods.
 	 */
-	private static final Log LOG = LogFactory.getLog(JobManagerUtils.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobManagerUtils.class);
 
 	/**
 	 * Private constructor.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
index dd2fabd..47f959b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/scheduler/DefaultScheduler.java
@@ -31,8 +31,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.Deque;
 import java.util.ArrayDeque;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ExecutionEdge;
 import org.apache.flink.runtime.executiongraph.ExecutionGate;
@@ -70,7 +70,7 @@ public class DefaultScheduler implements InstanceListener, JobStatusListener, Ex
 	/**
 	 * The LOG object to report events within the scheduler.
 	 */
-	protected static final Log LOG = LogFactory.getLog(DefaultScheduler.class);
+	protected static final Logger LOG = LoggerFactory.getLogger(DefaultScheduler.class);
 
 	/**
 	 * The instance manager assigned to this scheduler.
@@ -511,7 +511,7 @@ public class DefaultScheduler implements InstanceListener, JobStatusListener, Ex
 					getInstanceManager().releaseAllocatedResource(allocatedResource);
 				}
 			} catch (InstanceException e) {
-				LOG.error(e);
+				LOG.error("InstanceException while releasing allocated ressources.", e);
 			}
 			return;
 		}
@@ -560,7 +560,7 @@ public class DefaultScheduler implements InstanceListener, JobStatusListener, Ex
 							try {
 								getInstanceManager().releaseAllocatedResource(allocatedResource);
 							} catch (InstanceException e) {
-								LOG.error(e);
+								LOG.error("InstanceException while releasing allocated ressources.", e);
 							}
 							return;
 						}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
index 9c722b7..916772a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/DefaultInputSplitAssigner.java
@@ -25,8 +25,8 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -45,7 +45,7 @@ public class DefaultInputSplitAssigner implements InputSplitAssigner {
 	/**
 	 * The logging object used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(DefaultInputSplitAssigner.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultInputSplitAssigner.class);
 
 	/**
 	 * The split map stores a list of all input splits that still must be consumed by a specific input vertex.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
index 158a283..d9de4ee 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitManager.java
@@ -24,8 +24,8 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.io.GenericInputSplit;
@@ -52,7 +52,7 @@ public final class InputSplitManager {
 	/**
 	 * The logging object which is used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(InputSplitManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(InputSplitManager.class);
 
 	/**
 	 * The prefix of the configuration key which is used to retrieve the class names of the individual

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
index d15ec3e..013fbec 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/InputSplitTracker.java
@@ -25,8 +25,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphIterator;
@@ -48,7 +48,7 @@ final class InputSplitTracker {
 	/**
 	 * The logging object which is used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(InputSplitTracker.class);
+	private static final Logger LOG = LoggerFactory.getLogger(InputSplitTracker.class);
 
 	/**
 	 * The central split map which stores the logs of the individual input vertices.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
index 448fa51..6a45866 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitAssigner.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.jobmanager.splitassigner;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
@@ -44,7 +44,7 @@ public final class LocatableInputSplitAssigner implements InputSplitAssigner {
 	/**
 	 * The logging object which is used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(LocatableInputSplitAssigner.class);
+	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitAssigner.class);
 
 	private final ConcurrentMap<ExecutionGroupVertex, LocatableInputSplitList> vertexMap = new ConcurrentHashMap<ExecutionGroupVertex, LocatableInputSplitList>();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
index 57ea393..71fbf7a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/LocatableInputSplitList.java
@@ -27,8 +27,8 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.io.LocatableInputSplit;
 import org.apache.flink.runtime.instance.Instance;
 
@@ -47,7 +47,7 @@ public final class LocatableInputSplitList {
 	/**
 	 * The logging object which is used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(LocatableInputSplitList.class);
+	private static final Logger LOG = LoggerFactory.getLogger(LocatableInputSplitList.class);
 
 	/**
 	 * The set containing all the locatable input splits that still must be consumed.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
index f4e0c74..383ed38 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitAssigner.java
@@ -22,8 +22,8 @@ package org.apache.flink.runtime.jobmanager.splitassigner.file;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.io.InputSplit;
 import org.apache.flink.runtime.executiongraph.ExecutionGroupVertex;
@@ -45,7 +45,7 @@ public final class FileInputSplitAssigner implements InputSplitAssigner {
 	/**
 	 * The logging object which is used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(FileInputSplitAssigner.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FileInputSplitAssigner.class);
 
 	private final ConcurrentMap<ExecutionGroupVertex, FileInputSplitList> vertexMap = new ConcurrentHashMap<ExecutionGroupVertex, FileInputSplitList>();
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
index c54a98c..06cca24 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/splitassigner/file/FileInputSplitList.java
@@ -27,8 +27,8 @@ import java.util.PriorityQueue;
 import java.util.Queue;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.runtime.instance.Instance;
 
@@ -47,7 +47,7 @@ public final class FileInputSplitList {
 	/**
 	 * The logging object which is used to report information and errors.
 	 */
-	private static final Log LOG = LogFactory.getLog(FileInputSplitList.class);
+	private static final Logger LOG = LoggerFactory.getLogger(FileInputSplitList.class);
 
 	/**
 	 * The set containing all the file input splits that still must be consumed.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
index 2f738a2..91d2110 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/JobmanagerInfoServlet.java
@@ -33,8 +33,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.accumulators.AccumulatorHelper;
 import org.apache.flink.runtime.event.job.AbstractEvent;
 import org.apache.flink.runtime.event.job.ExecutionStateChangeEvent;
@@ -61,7 +61,7 @@ public class JobmanagerInfoServlet extends HttpServlet {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(JobmanagerInfoServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(JobmanagerInfoServlet.class);
 	
 	/**
 	 * Underlying JobManager

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
index 5b09704..2f915f8 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/LogfileInfoServlet.java
@@ -28,8 +28,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.flink.util.StringUtils;
 
@@ -42,7 +42,7 @@ public class LogfileInfoServlet extends HttpServlet {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(LogfileInfoServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(LogfileInfoServlet.class);
 
 	private File[] logDirs;
 
@@ -58,11 +58,11 @@ public class LogfileInfoServlet extends HttpServlet {
 		try {
 			if("stdout".equals(req.getParameter("get"))) {
 				// Find current stdout file
-				sendFile("jobmanager-stdout.log", resp);
+				sendFile(".*-jobmanager-[^\\.]*\\.out", resp);
 			}
 			else {
 				// Find current logfile
-				sendFile("jobmanager-log4j.log", resp);
+				sendFile(".*-jobmanager-[^\\.]*\\.log", resp);
 			}
 		} catch (Throwable t) {
 			resp.setStatus(HttpServletResponse.SC_BAD_REQUEST);
@@ -73,14 +73,11 @@ public class LogfileInfoServlet extends HttpServlet {
 		}
 	}
 
-	private void sendFile(String fileName, HttpServletResponse resp) throws IOException {
+	private void sendFile(String fileNamePattern, HttpServletResponse resp) throws IOException {
 		for(File logDir: logDirs) {
 			for(File f : logDir.listFiles()) {
 				// contains "jobmanager" ".log" and no number in the end ->needs improvement
-				if( f.getName().equals(fileName) /*||
-						(f.getName().indexOf("jobmanager") != -1 && f.getName().indexOf(".log") != -1 && ! Character.isDigit(f.getName().charAt(f.getName().length() - 1) )) */
-						) {
-
+				if( f.getName().matches(fileNamePattern)) {
 					resp.setStatus(HttpServletResponse.SC_OK);
 					resp.setContentType("text/plain");
 					writeFile(resp.getOutputStream(), f);

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
index 8397ab4..441b64b 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/MenuServlet.java
@@ -26,8 +26,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A Servlet that displays the Configruation in the webinterface.
@@ -43,7 +43,7 @@ public class MenuServlet extends HttpServlet {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(MenuServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(MenuServlet.class);
 	
 	/**
 	 * Array of possible menu entries on the left
@@ -68,7 +68,7 @@ public class MenuServlet extends HttpServlet {
 	
 	public MenuServlet() {
 		if (names.length != entries.length || names.length != classes.length) {
-			LOG.fatal("The Arrays 'entries', 'classes' and 'names' differ in thier length. This is not allowed!");
+			LOG.error("The Arrays 'entries', 'classes' and 'names' differ in thier length. This is not allowed!");
 		}
 	}
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
index aa781f6..a9bbec6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/SetupInfoServlet.java
@@ -32,8 +32,8 @@ import javax.servlet.http.HttpServlet;
 import javax.servlet.http.HttpServletRequest;
 import javax.servlet.http.HttpServletResponse;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.runtime.instance.Instance;
@@ -57,7 +57,7 @@ public class SetupInfoServlet extends HttpServlet {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(SetupInfoServlet.class);
+	private static final Logger LOG = LoggerFactory.getLogger(SetupInfoServlet.class);
 	
 	private Configuration globalC;
 	private JobManager jobmanager;

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
index 933e49d..283fb83 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/web/WebInfoServer.java
@@ -23,8 +23,8 @@ import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.GlobalConfiguration;
@@ -50,7 +50,7 @@ public class WebInfoServer {
 	/**
 	 * The log for this class.
 	 */
-	private static final Log LOG = LogFactory.getLog(WebInfoServer.class);
+	private static final Logger LOG = LoggerFactory.getLogger(WebInfoServer.class);
 
 	/**
 	 * The jetty server serving all requests.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/managementgraph/ManagementGraphIterator.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/managementgraph/ManagementGraphIterator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/managementgraph/ManagementGraphIterator.java
index 79ba66a..c2b8ef3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/managementgraph/ManagementGraphIterator.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/managementgraph/ManagementGraphIterator.java
@@ -24,8 +24,8 @@ import java.util.Iterator;
 import java.util.Set;
 import java.util.Stack;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This class provides an implementation of the {@link java.util.Iterator} interface which allows to
@@ -41,7 +41,7 @@ public final class ManagementGraphIterator implements Iterator<ManagementVertex>
 	/**
 	 * The log object used for debugging.
 	 */
-	private static final Log LOG = LogFactory.getLog(ManagementGraphIterator.class);
+	private static final Logger LOG = LoggerFactory.getLogger(ManagementGraphIterator.class);
 
 	/**
 	 * The management graph this iterator traverses.

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
index 7939a35..a633948 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/memorymanager/DefaultMemoryManager.java
@@ -28,8 +28,8 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 
@@ -47,9 +47,9 @@ public class DefaultMemoryManager implements MemoryManager {
 	public static final int MIN_PAGE_SIZE = 4 * 1024;
 	
 	/**
-	 * The Log.
+	 * The Logger.
 	 */
-	private static final Log LOG = LogFactory.getLog(DefaultMemoryManager.class);
+	private static final Logger LOG = LoggerFactory.getLogger(DefaultMemoryManager.class);
 	
 	// --------------------------------------------------------------------------------------------
 	

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
index b2103e1..46edfdc 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/NetUtils.java
@@ -46,12 +46,12 @@ import java.util.Set;
 
 import javax.net.SocketFactory;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.runtime.ipc.Server;
 
 public class NetUtils {
-	private static final Log LOG = LogFactory.getLog(NetUtils.class);
+	private static final Logger LOG = LoggerFactory.getLogger(NetUtils.class);
 
 	private static Map<String, String> hostToResolved = new HashMap<String, String>();
 
@@ -102,7 +102,7 @@ public class NetUtils {
 				hostname = addr.getHost();
 				port = addr.getPort();
 			} catch (URISyntaxException use) {
-				LOG.fatal(use);
+				LOG.error("Invalid URI syntax.", use);
 			}
 		}
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
index 2276c7f..f1fa4e7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/net/SocketIOWithTimeout.java
@@ -38,8 +38,8 @@ import java.nio.channels.spi.SelectorProvider;
 import java.util.Iterator;
 import java.util.LinkedList;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * This supports input and output streams for a socket channels.
@@ -48,7 +48,7 @@ import org.apache.commons.logging.LogFactory;
 abstract class SocketIOWithTimeout {
 	// This is intentionally package private.
 
-	static final Log LOG = LogFactory.getLog(SocketIOWithTimeout.class);
+	static final Logger LOG = LoggerFactory.getLogger(SocketIOWithTimeout.class);
 
 	private SelectableChannel channel;
 

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/08188508/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
index 0727d63..9a63226 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/AllGroupReduceDriver.java
@@ -19,8 +19,8 @@
 
 package org.apache.flink.runtime.operators;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.apache.flink.api.common.functions.FlatCombineFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
@@ -41,7 +41,7 @@ import org.apache.flink.util.MutableObjectIterator;
  */
 public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunction<IT, OT>, OT> {
 	
-	private static final Log LOG = LogFactory.getLog(AllGroupReduceDriver.class);
+	private static final Logger LOG = LoggerFactory.getLogger(AllGroupReduceDriver.class);
 
 	private PactTaskContext<GroupReduceFunction<IT, OT>, OT> taskContext;
 	
@@ -124,4 +124,4 @@ public class AllGroupReduceDriver<IT, OT> implements PactDriver<GroupReduceFunct
 
 	@Override
 	public void cancel() {}
-}
\ No newline at end of file
+}


Mime
View raw message