flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject flink git commit: [FLINK-2703] Prepare Flink for being used with Logback.
Date Wed, 30 Sep 2015 16:01:10 GMT
Repository: flink
Updated Branches:
  refs/heads/master 622c1be8c -> 1243d7b96


[FLINK-2703] Prepare Flink for being used with Logback.

This closes #1194


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1243d7b9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1243d7b9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1243d7b9

Branch: refs/heads/master
Commit: 1243d7b9641f7504fd81d7ac3a2f57f3bb8bb257
Parents: 622c1be
Author: Robert Metzger <rmetzger@apache.org>
Authored: Mon Sep 28 12:16:29 2015 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Wed Sep 30 18:00:26 2015 +0200

----------------------------------------------------------------------
 docs/apis/best_practices.md                     | 129 +++++++++++++++++++
 flink-dist/pom.xml                              |   2 +
 flink-dist/src/main/assemblies/bin.xml          |   2 +
 flink-shaded-hadoop/pom.xml                     |   9 +-
 .../YARNSessionCapacitySchedulerITCase.java     |   3 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  18 ++-
 .../org/apache/flink/yarn/YarnTestBase.java     |  12 ++
 .../src/main/resources/log4j-test.properties    |   2 +-
 .../org/apache/flink/yarn/FlinkYarnClient.java  |  10 +-
 9 files changed, 171 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/docs/apis/best_practices.md
----------------------------------------------------------------------
diff --git a/docs/apis/best_practices.md b/docs/apis/best_practices.md
index d8619d4..3de88f0 100644
--- a/docs/apis/best_practices.md
+++ b/docs/apis/best_practices.md
@@ -269,3 +269,132 @@ For Google Protobuf you need the following Maven dependency:
 
 
 Please adjust the versions of both libraries as needed.
+
+
+## Using Logback instead of Log4j
+
+**Note: This tutorial is applicable starting from Flink 0.10**
+
+Apache Flink is using [slf4j](http://www.slf4j.org/) as the logging abstraction in the code.
Users are advised to use slf4j as well in their user functions.
+
+Slf4j is a compile-time logging interface that can use different logging implementations
at runtime, such as [log4j](http://logging.apache.org/log4j/2.x/) or [Logback](http://logback.qos.ch/).
+
+Flink is depending on Log4j by default. This page describes how to use Flink with Logback.
+
+To get a logger instance in the code, use the following code:
+
+
+{% highlight java %}
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MyClass implements MapFunction {
+	private static final Logger LOG = LoggerFactory.getLogger(MyClass.class);
+	// ...
+{% endhighlight %}
+
+
+### Use Logback when running Flink out of the IDE / from a Java application
+
+
+In all cases where classes are executed with a classpath created by a dependency manager such
as Maven, Flink will pull log4j into the classpath.
+
+Therefore, you will need to exclude log4j from Flink's dependencies. The following description
will assume a Maven project created from a [Flink quickstart](../quickstart/java_api_quickstart.html).
+
+Change your project's `pom.xml` file like this:
+
+{% highlight xml %}
+<dependencies>
+	<!-- Add the two required logback dependencies -->
+	<dependency>
+		<groupId>ch.qos.logback</groupId>
+		<artifactId>logback-core</artifactId>
+		<version>1.1.3</version>
+	</dependency>
+	<dependency>
+		<groupId>ch.qos.logback</groupId>
+		<artifactId>logback-classic</artifactId>
+		<version>1.1.3</version>
+	</dependency>
+
+	<!-- Add the log4j -> slf4j (-> logback) bridge into the classpath
+	 Hadoop is logging to log4j! -->
+	<dependency>
+		<groupId>org.slf4j</groupId>
+		<artifactId>log4j-over-slf4j</artifactId>
+		<version>1.7.7</version>
+	</dependency>
+
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-java</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<exclusions>
+			<exclusion>
+				<groupId>log4j</groupId>
+				<artifactId>*</artifactId>
+			</exclusion>
+			<exclusion>
+				<groupId>org.slf4j</groupId>
+				<artifactId>slf4j-log4j12</artifactId>
+			</exclusion>
+		</exclusions>
+	</dependency>
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-streaming-core</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<exclusions>
+			<exclusion>
+				<groupId>log4j</groupId>
+				<artifactId>*</artifactId>
+			</exclusion>
+			<exclusion>
+				<groupId>org.slf4j</groupId>
+				<artifactId>slf4j-log4j12</artifactId>
+			</exclusion>
+		</exclusions>
+	</dependency>
+	<dependency>
+		<groupId>org.apache.flink</groupId>
+		<artifactId>flink-clients</artifactId>
+		<version>0.10-SNAPSHOT</version>
+		<exclusions>
+			<exclusion>
+				<groupId>log4j</groupId>
+				<artifactId>*</artifactId>
+			</exclusion>
+			<exclusion>
+				<groupId>org.slf4j</groupId>
+				<artifactId>slf4j-log4j12</artifactId>
+			</exclusion>
+		</exclusions>
+	</dependency>
+</dependencies>
+{% endhighlight %}
+
+The following changes were done in the `<dependencies>` section:
+
+ * Exclude all `log4j` dependencies from all Flink dependencies: This causes Maven to ignore
Flink's transitive dependencies to log4j.
+ * Exclude the `slf4j-log4j12` artifact from Flink's dependencies: Since we are going to
use the slf4j to logback binding, we have to remove the slf4j to log4j binding.
+ * Add the Logback dependencies: `logback-core` and `logback-classic`
+ * Add dependencies for `log4j-over-slf4j`. `log4j-over-slf4j` is a tool which allows legacy
applications which are directly using the Log4j APIs to use the Slf4j interface. Flink depends
on Hadoop which is directly using Log4j for logging. Therefore, we need to redirect all logger
calls from Log4j to Slf4j which is in turn logging to Logback.
+
+Please note that you need to manually add the exclusions to all new Flink dependencies you
are adding to the pom file.
+
+You may also need to check if other dependencies (non Flink) are pulling in log4j bindings.
You can analyze the dependencies of your project with `mvn dependency:tree`.
+
+
+
+### Use Logback when running Flink on a cluster
+
+This tutorial is applicable when running Flink on YARN or as a standalone cluster.
+
+In order to use Logback instead of Log4j with Flink, you need to remove the `log4j-1.2.xx.jar`
and `slf4j-log4j12-xxx.jar` from the `lib/` directory.
+
+Next, you need to put the following jar files into the `lib/` folder:
+
+ * `logback-classic.jar`
+ * `logback-core.jar`
+ * `log4j-over-slf4j.jar`: This bridge needs to be present in the classpath for redirecting
logging calls from Hadoop (which is using Log4j) to Slf4j.
+

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-dist/pom.xml
----------------------------------------------------------------------
diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml
index bf7b9df..32059ea 100644
--- a/flink-dist/pom.xml
+++ b/flink-dist/pom.xml
@@ -334,6 +334,8 @@ under the License.
 									<exclude>org.apache.flink:flink-scala-examples</exclude>
 									<exclude>org.apache.flink:flink-streaming-examples</exclude>
 									<exclude>org.apache.flink:flink-python</exclude>
+									<exclude>org.slf4j:slf4j-log4j12</exclude>
+									<exclude>log4j:log4j</exclude>
 								</excludes>
 							</artifactSet>
 							<transformers>

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-dist/src/main/assemblies/bin.xml
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/assemblies/bin.xml b/flink-dist/src/main/assemblies/bin.xml
index 270fb17..09102ef 100644
--- a/flink-dist/src/main/assemblies/bin.xml
+++ b/flink-dist/src/main/assemblies/bin.xml
@@ -40,6 +40,8 @@ under the License.
 
 			<includes>
 				<include>org.apache.flink:flink-python</include>
+				<include>org.slf4j:slf4j-log4j12</include>
+				<include>log4j:log4j</include>
 			</includes>
 		</dependencySet>
 	</dependencySets>

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-shaded-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/flink-shaded-hadoop/pom.xml b/flink-shaded-hadoop/pom.xml
index a99f3c4..76d3ef4 100644
--- a/flink-shaded-hadoop/pom.xml
+++ b/flink-shaded-hadoop/pom.xml
@@ -85,7 +85,14 @@ under the License.
 								<filter>
 									<artifact>org.slf4j:*</artifact>
 									<excludes>
-										<exclude>org/slf4j/impl/StaticLoggerBinder*</exclude>
+										<exclude>org/slf4j/impl/**</exclude>
+									</excludes>
+								</filter>
+								<!-- Exclude Hadoop's log4j. Hadoop can use Flink's log4j dependency -->
+								<filter>
+									<artifact>log4j:*</artifact>
+									<excludes>
+										<exclude>org/apache/log4j/**</exclude>
 									</excludes>
 								</filter>
 							</filters>

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index 7f14ad5..bf47dde 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -54,7 +54,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	@Test
 	public void testClientStartup() {
 		LOG.info("Starting testClientStartup()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
 						"-jm", "768",
 						"-tm", "1024", "-qu", "qa-team"},
@@ -72,6 +72,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 		LOG.info("Starting testNonexistingQueue()");
 		addTestAppender(FlinkYarnClient.class, Level.WARN);
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+				"-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",
 				"-jm", "768",
 				"-tm", "1024",

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index 1d08ebf..0d0a7f2 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -101,7 +101,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	@Test
 	public void testClientStartup() {
 		LOG.info("Starting testClientStartup()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
 						"-jm", "768",
 						"-tm", "1024",
@@ -119,6 +119,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		LOG.info("Starting testDetachedMode()");
 		addTestAppender(FlinkYarnSessionCli.class, Level.INFO);
 		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+						"-t", flinkLibFolder.getAbsolutePath(),
 						"-n", "1",
 						"-jm", "768",
 						"-tm", "1024",
@@ -166,7 +167,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	@Test(timeout=100000) // timeout after 100 seconds
 	public void testTaskManagerFailure() {
 		LOG.info("Starting testTaskManagerFailure()");
-		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		Runner runner = startWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t",
flinkLibFolder.getAbsolutePath(),
 				"-n", "1",
 				"-jm", "768",
 				"-tm", "1024",
@@ -338,6 +339,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	public void testNonexistingQueue() {
 		LOG.info("Starting testNonexistingQueue()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+				"-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",
 				"-jm", "768",
 				"-tm", "1024",
@@ -362,7 +364,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	public void testResourceComputation() {
 		addTestAppender(FlinkYarnClient.class, Level.WARN);
 		LOG.info("Starting testResourceComputation()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "5",
 				"-jm", "256",
 				"-tm", "1585"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION,
0);
@@ -390,7 +392,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	public void testfullAlloc() {
 		addTestAppender(FlinkYarnClient.class, Level.WARN);
 		LOG.info("Starting testfullAlloc()");
-		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
+		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "2",
 				"-jm", "256",
 				"-tm", "3840"}, "Number of connected TaskManagers changed to", null, RunTypes.YARN_SESSION,
0);
@@ -413,7 +415,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		File exampleJarLocation = YarnTestBase.findFile("..", new ContainsName(new String[] {"-WordCount.jar"}
, "streaming")); // exclude streaming wordcount here.
 		Assert.assertNotNull("Could not find wordcount jar", exampleJarLocation);
 		runWithArgs(new String[]{"run", "-m", "yarn-cluster",
-						"-yj", flinkUberjar.getAbsolutePath(),
+						"-yj", flinkUberjar.getAbsolutePath(), "-yt", flinkLibFolder.getAbsolutePath(),
 						"-yn", "1",
 						"-ys", "2", //test that the job is executed with a DOP of 2
 						"-yjm", "768",
@@ -441,6 +443,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 						"-p", "2", //test that the job is executed with a DOP of 2
 						"-m", "yarn-cluster",
 						"-yj", flinkUberjar.getAbsolutePath(),
+						"-yt", flinkLibFolder.getAbsolutePath(),
 						"-yn", "1",
 						"-yjm", "768",
 						"-ytm", "1024", exampleJarLocation.getAbsolutePath()},
@@ -477,6 +480,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		}
 
 		Runner runner = startWithArgs(new String[]{"run", "-m", "yarn-cluster", "-yj", flinkUberjar.getAbsolutePath(),
+						"-yt", flinkLibFolder.getAbsolutePath(),
 						"-yn", "1",
 						"-yjm", "768",
 						"-yD", "yarn.heap-cutoff-ratio=0.5", // test if the cutoff is passed correctly
@@ -621,6 +625,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 		flinkYarnClient.setJobManagerMemory(768);
 		flinkYarnClient.setTaskManagerMemory(1024);
 		flinkYarnClient.setLocalJarPath(new Path(flinkUberjar.getAbsolutePath()));
+		flinkYarnClient.setShipFiles(Arrays.asList(flinkLibFolder.listFiles()));
 		String confDirPath = System.getenv("FLINK_CONF_DIR");
 		flinkYarnClient.setConfigurationDirectory(confDirPath);
 		flinkYarnClient.setFlinkConfigurationObject(GlobalConfiguration.getConfiguration());
@@ -632,9 +637,8 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 			yarnCluster = flinkYarnClient.deploy();
 			yarnCluster.connectToCluster();
 		} catch (Exception e) {
-			System.err.println("Error while deploying YARN cluster: "+e.getMessage());
 			LOG.warn("Failing test", e);
-			Assert.fail();
+			Assert.fail("Error while deploying YARN cluster: "+e.getMessage());
 		}
 		FlinkYarnClusterStatus expectedStatus = new FlinkYarnClusterStatus(1, 1);
 		for(int second = 0; second < WAIT_TIME * 2; second++) { // run "forever"

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
index 3337f1a..b7d0b33 100644
--- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java
@@ -98,10 +98,18 @@ public abstract class YarnTestBase extends TestLogger {
 
 	protected static MiniYARNCluster yarnCluster = null;
 
+	/**
+	 * Uberjar (fat jar) file of Flink
+	 */
 	protected static File flinkUberjar;
 
 	protected static final Configuration yarnConfiguration;
 
+	/**
+	 * lib/ folder of the flink distribution.
+	 */
+	protected static File flinkLibFolder;
+
 	static {
 		yarnConfiguration = new YarnConfiguration();
 		yarnConfiguration.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
@@ -329,6 +337,10 @@ public abstract class YarnTestBase extends TestLogger {
 		flinkUberjar = findFile(uberjarStartLoc, new RootDirFilenameFilter());
 		Assert.assertNotNull("Flink uberjar not found", flinkUberjar);
 		String flinkDistRootDir = flinkUberjar.getParentFile().getParent();
+		flinkLibFolder = flinkUberjar.getParentFile(); // the uberjar is located in lib/
+		Assert.assertNotNull("Flink flinkLibFolder not found", flinkLibFolder);
+		Assert.assertTrue("lib folder not found", flinkLibFolder.exists());
+		Assert.assertTrue("lib folder not found", flinkLibFolder.isDirectory());
 
 		if (!flinkUberjar.exists()) {
 			Assert.fail("Unable to locate yarn-uberjar.jar");

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-yarn-tests/src/main/resources/log4j-test.properties
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/main/resources/log4j-test.properties b/flink-yarn-tests/src/main/resources/log4j-test.properties
index 3fee7d6..dc02575 100644
--- a/flink-yarn-tests/src/main/resources/log4j-test.properties
+++ b/flink-yarn-tests/src/main/resources/log4j-test.properties
@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################
 
-log4j.rootLogger=FATAL, console
+log4j.rootLogger=INFO, console
 
 # Log all infos in the given file
 log4j.appender.console=org.apache.log4j.ConsoleAppender

http://git-wip-us.apache.org/repos/asf/flink/blob/1243d7b9/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
index c7ec254..44eca0d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
@@ -26,7 +26,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -248,15 +247,14 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
 	@Override
 	public void setShipFiles(List<File> shipFiles) {
 		File shipFile;
-		for(Iterator<File> it = shipFiles.iterator(); it.hasNext(); ) {
-			shipFile = it.next();
+		for (File shipFile1 : shipFiles) {
+			shipFile = shipFile1;
 			// remove uberjar from ship list (by default everything in the lib/ folder is added to
 			// the list of files to ship, but we handle the uberjar separately.
-			if(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar"))
{
-				it.remove();
+			if (!(shipFile.getName().startsWith("flink-dist-") && shipFile.getName().endsWith("jar")))
{
+				this.shipFiles.add(shipFile);
 			}
 		}
-		this.shipFiles.addAll(shipFiles);
 	}
 
 	public void setDynamicPropertiesEncoded(String dynamicPropertiesEncoded) {


Mime
View raw message