flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rmetz...@apache.org
Subject git commit: Fix for FLINK-708 (Hadoop 2.4 compatibility) and FLINK-887 (YARN Jobmanager heapspace calc)
Date Tue, 10 Jun 2014 19:41:02 GMT
Repository: incubator-flink
Updated Branches:
  refs/heads/release-0.5.1 99b4c62cb -> 94d944af8


Fix for FLINK-708 (Hadoop 2.4 compatibility) and FLINK-887 (YARN Jobmanager heapspace calc)


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/94d944af
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/94d944af
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/94d944af

Branch: refs/heads/release-0.5.1
Commit: 94d944af85dfda4672d2d677b87345dcba4c7087
Parents: 99b4c62
Author: Robert Metzger <rmetzger@apache.org>
Authored: Tue Jun 10 17:34:00 2014 +0200
Committer: Robert Metzger <rmetzger@apache.org>
Committed: Tue Jun 10 21:40:48 2014 +0200

----------------------------------------------------------------------
 .../eu/stratosphere/yarn/ApplicationMaster.java |  6 +--
 .../main/java/eu/stratosphere/yarn/Client.java  |  2 +-
 .../main/java/eu/stratosphere/yarn/Utils.java   | 43 ++++++++++++++++++--
 .../stratosphere-bin/yarn-bin/yarn-session.sh   |  2 +-
 4 files changed, 43 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94d944af/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
index 65ae114..e208b01 100644
--- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
+++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/ApplicationMaster.java
@@ -62,7 +62,6 @@ import eu.stratosphere.nephele.jobmanager.JobManager;
 public class ApplicationMaster {
 
 	private static final Log LOG = LogFactory.getLog(ApplicationMaster.class);
-	private static final int HEAP_LIMIT_CAP = 500;
 	
 	private void run() throws Exception  {
 		//Utils.logFilesInCurrentDirectory(LOG);
@@ -83,10 +82,7 @@ public class ApplicationMaster {
 		final int memoryPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_MEMORY));
 		final int coresPerTaskManager = Integer.valueOf(envs.get(Client.ENV_TM_CORES));
 		
-		int heapLimit = (int)((float)memoryPerTaskManager*0.85);
-		if( (memoryPerTaskManager - heapLimit) > HEAP_LIMIT_CAP) {
-			heapLimit = memoryPerTaskManager-HEAP_LIMIT_CAP;
-		}
+		int heapLimit = Utils.calculateHeapSize(memoryPerTaskManager);
 		
 		if(currDir == null) {
 			throw new RuntimeException("Current directory unknown");

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94d944af/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
index 9d6e5ad..4ef1456 100644
--- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
+++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Client.java
@@ -347,7 +347,7 @@ public class Client {
 				.newRecord(ContainerLaunchContext.class);
 		
 		String amCommand = "$JAVA_HOME/bin/java"
-					+ " -Xmx"+jmMemory+"M " +javaOpts;
+					+ " -Xmx"+Utils.calculateHeapSize(jmMemory)+"M " +javaOpts;
 		if(hasLog4j) {
 			amCommand 	+= " -Dlog.file=\""+ApplicationConstants.LOG_DIR_EXPANSION_VAR +"/jobmanager-log4j.log\"
-Dlog4j.configuration=file:log4j.properties";
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94d944af/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
----------------------------------------------------------------------
diff --git a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
index 551b5da..da7f8ae 100644
--- a/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
+++ b/stratosphere-addons/yarn/src/main/java/eu/stratosphere/yarn/Utils.java
@@ -42,13 +42,13 @@ import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.util.StringInterner;
 import org.apache.hadoop.yarn.api.ApplicationConstants.Environment;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.LocalResource;
 import org.apache.hadoop.yarn.api.records.LocalResourceType;
 import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.util.Apps;
 import org.apache.hadoop.yarn.util.ConverterUtils;
 
 import eu.stratosphere.configuration.ConfigConstants;
@@ -57,6 +57,7 @@ import eu.stratosphere.configuration.GlobalConfiguration;
 public class Utils {
 	
 	private static final Log LOG = LogFactory.getLog(Utils.class);
+	private static final int HEAP_LIMIT_CAP = 500;
 	
 
 	public static void copyJarContents(String prefix, String pathToJar) throws IOException {
@@ -90,6 +91,23 @@ public class Utils {
 		jar.close();
 	}
 	
+	/**
+	 * Calculate the heap size for the JVMs to start in the containers.
+	 * Since JVMs are allocating more than just the heap space, and YARN is very
+	 * fast at killing processes that use memory beyond their limit, we have to come
+	 * up with a good heapsize.
+	 * This code takes 85% of the given amount of memory (in MB). If the amount we removed by
these 85%
+	 * more than 500MB (the current HEAP_LIMIT_CAP), we'll just subtract 500 MB.
+	 * 
+	 */
+	public static int calculateHeapSize(int memory) {
+		int heapLimit = (int)((float)memory*0.85);
+		if( (memory - heapLimit) > HEAP_LIMIT_CAP) {
+			heapLimit = memory-HEAP_LIMIT_CAP;
+		}
+		return heapLimit;
+	}
+	
 	public static void getStratosphereConfiguration(String confDir) {
 		GlobalConfiguration.loadConfiguration(confDir);
 	}
@@ -106,6 +124,7 @@ public class Utils {
 		ClassLoader cl = new URLClassLoader(urls, conf.getClassLoader());
 		conf.setClassLoader(cl);
 	}
+	
 	private static void setDefaultConfValues(Configuration conf) {
 		if(conf.get("fs.hdfs.impl",null) == null) {
 			conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
@@ -114,6 +133,7 @@ public class Utils {
 			conf.set("fs.file.impl", "org.apache.hadoop.fs.LocalFileSystem");
 		}
 	}
+	
 	public static Configuration initializeYarnConfiguration() {
 		Configuration conf = new YarnConfiguration();
 		String configuredHadoopConfig = GlobalConfiguration.getString(ConfigConstants.PATH_HADOOP_CONFIG,
null);
@@ -157,9 +177,9 @@ public class Utils {
 	
 	public static void setupEnv(Configuration conf, Map<String, String> appMasterEnv)
{
 		for (String c : conf.getStrings(YarnConfiguration.YARN_APPLICATION_CLASSPATH,YarnConfiguration.DEFAULT_YARN_APPLICATION_CLASSPATH))
{
-			Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
+			addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), c.trim());
 		}
-		Apps.addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() +
File.separator + "*");
+		addToEnvironment(appMasterEnv, Environment.CLASSPATH.name(), Environment.PWD.$() + File.separator
+ "*");
 	}
 	
 	
@@ -221,4 +241,21 @@ public class Utils {
 			}
 		});
 	}
+	
+	/**
+	 * Copied method from org.apache.hadoop.yarn.util.Apps
+	 * It was broken by YARN-1824 (2.4.0) and fixed for 2.4.1
+	 * by https://issues.apache.org/jira/browse/YARN-1931
+	 */
+	public static void addToEnvironment(Map<String, String> environment,
+			String variable, String value) {
+		String val = environment.get(variable);
+		if (val == null) {
+			val = value;
+		} else {
+			val = val + File.pathSeparator + value;
+		}
+		environment.put(StringInterner.weakIntern(variable),
+				StringInterner.weakIntern(val));
+	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/94d944af/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh b/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh
index 63d6658..6ce83e9 100644
--- a/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh
+++ b/stratosphere-dist/src/main/stratosphere-bin/yarn-bin/yarn-session.sh
@@ -49,5 +49,5 @@ CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
 export STRATOSPHERE_CONF_DIR
 # $log_setting
 
-$JAVA_RUN $JVM_ARGS  -classpath $CC_CLASSPATH eu.stratosphere.yarn.Client -ship ship/ -confDir
$STRATOSPHERE_CONF_DIR -j $STRATOSPHERE_LIB_DIR/*yarn-uberjar.jar $*
+$JAVA_RUN $JVM_ARGS  -classpath $CC_CLASSPATH eu.stratosphere.yarn.Client -ship $bin/../ship/
-confDir $STRATOSPHERE_CONF_DIR -j $STRATOSPHERE_LIB_DIR/*yarn-uberjar.jar $*
 


Mime
View raw message