flink-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject flink git commit: [FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically
Date Thu, 27 Apr 2017 19:53:44 GMT
Repository: flink
Updated Branches:
  refs/heads/release-1.2 664c49df7 -> ef20aa1a1


[FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically

Sorts files read from Flink's lib directory and places the distribution
JAR to the end of the CLASSPATH.

This closes #3632


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ef20aa1a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ef20aa1a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ef20aa1a

Branch: refs/heads/release-1.2
Commit: ef20aa1a1540844888829bfd685e94764f3fd8ea
Parents: 664c49d
Author: Greg Hogan <code@greghogan.com>
Authored: Sun Mar 26 15:46:00 2017 -0400
Committer: Greg Hogan <code@greghogan.com>
Committed: Thu Apr 27 15:00:16 2017 -0400

----------------------------------------------------------------------
 flink-dist/src/main/flink-bin/bin/config.sh     | 18 ++++++++++---
 .../main/flink-bin/mesos-bin/mesos-appmaster.sh | 17 +-----------
 .../flink-bin/mesos-bin/mesos-taskmanager.sh    | 17 +-----------
 .../src/main/flink-bin/yarn-bin/yarn-session.sh | 17 +-----------
 .../yarn/AbstractYarnClusterDescriptor.java     | 28 +++++++++++---------
 5 files changed, 33 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/bin/config.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh
index 69d70ef..071bd71 100755
--- a/flink-dist/src/main/flink-bin/bin/config.sh
+++ b/flink-dist/src/main/flink-bin/bin/config.sh
@@ -18,16 +18,28 @@
 ################################################################################
 
 constructFlinkClassPath() {
+    local FLINK_DIST
+    local FLINK_CLASSPATH
 
     while read -d '' -r jarfile ; do
-        if [[ $FLINK_CLASSPATH = "" ]]; then
+        if [[ "$jarfile" =~ .*flink-dist.*.jar ]]; then
+            FLINK_DIST="$FLINK_DIST":"$jarfile"
+        elif [[ "$FLINK_CLASSPATH" == "" ]]; then
             FLINK_CLASSPATH="$jarfile";
         else
             FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile"
         fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
+    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z)
 
-    echo $FLINK_CLASSPATH
+    if [[ "$FLINK_DIST" == "" ]]; then
+        # write error message to stderr since stdout is stored as the classpath
+        (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.")
+
+        # exit function with empty classpath to force process failure
+        exit 1
+    fi
+
+    echo "$FLINK_CLASSPATH""$FLINK_DIST"
 }
 
 # These are used to mangle paths that are passed to java when using

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
index 6fae6c2..67eab9d 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh
@@ -23,26 +23,11 @@ bin=`cd "$bin"; pwd`
 # get Flink config
 . "$bin"/config.sh
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink AppMaster
-constructAppMasterClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
 if [ "$FLINK_IDENT_STRING" = "" ]; then
     FLINK_IDENT_STRING="$USER"
 fi
 
-CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log"
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
index 23a301b..ab2f7b1 100755
--- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
+++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh
@@ -23,22 +23,7 @@ bin=`cd "$bin"; pwd`
 # get Flink config
 . "$bin"/config.sh
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink TaskManager
-constructTaskManagerClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log=flink-taskmanager.log
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
----------------------------------------------------------------------
diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
index 1755f32..03b1e3a 100755
--- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
+++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh
@@ -29,22 +29,7 @@ fi
 
 JVM_ARGS="$JVM_ARGS -Xmx512m"
 
-# auxilliary function to construct a lightweight classpath for the
-# Flink CLI client
-constructCLIClientClassPath() {
-
-    while read -d '' -r jarfile ; do
-        if [[ $CC_CLASSPATH = "" ]]; then
-            CC_CLASSPATH="$jarfile";
-        else
-            CC_CLASSPATH="$CC_CLASSPATH":"$jarfile"
-        fi
-    done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0)
-
-    echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS
-}
-
-CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)`
+CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS`
 
 log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log
 log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties
-Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml"

http://git-wip-us.apache.org/repos/asf/flink/blob/ef20aa1a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 52d5402..9306090 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -22,9 +22,9 @@ import org.apache.flink.client.CliFrontend;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.SecurityOptions;
-import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.jobmanager.HighAvailabilityMode;
 import org.apache.hadoop.conf.Configuration;
@@ -640,12 +640,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 		final Map<String, LocalResource> localResources = new HashMap<>(2 + effectiveShipFiles.size());
 		// list of remote paths (after upload)
 		final List<Path> paths = new ArrayList<>(2 + effectiveShipFiles.size());
-		// classpath assembler
-		final StringBuilder classPathBuilder = new StringBuilder();
 		// ship list that enables reuse of resources for task manager containers
 		StringBuilder envShipFileList = new StringBuilder();
 
 		// upload and register ship files
+		final List<String> classPaths = new ArrayList<>();
 		for (File shipFile : effectiveShipFiles) {
 			LocalResource shipResources = Records.newRecord(LocalResource.class);
 
@@ -664,29 +663,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor
 
 				Files.walkFileTree(shipPath, new SimpleFileVisitor<java.nio.file.Path>() {
 					@Override
-					public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes
attrs)
+					public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs)
 							throws IOException {
-						super.preVisitDirectory(dir, attrs);
-
-						java.nio.file.Path relativePath = parentPath.relativize(dir);
+						java.nio.file.Path relativePath = parentPath.relativize(file);
 
-						classPathBuilder
-							.append(relativePath)
-							.append(File.separator)
-							.append("*")
-							.append(File.pathSeparator);
+						classPaths.add(relativePath.toString());
 
 						return FileVisitResult.CONTINUE;
 					}
 				});
 			} else {
 				// add files to the classpath
-				classPathBuilder.append(shipFile.getName()).append(File.pathSeparator);
+				classPaths.add(shipFile.getName());
 			}
 
 			envShipFileList.append(remotePath).append(",");
 		}
 
+		// normalize classpath by sorting
+		Collections.sort(classPaths);
+
+		// classpath assembler
+		StringBuilder classPathBuilder = new StringBuilder();
+		for (String classPath : classPaths) {
+			classPathBuilder.append(classPath).append(File.pathSeparator);
+		}
+
 		// Setup jar for ApplicationMaster
 		LocalResource appMasterJar = Records.newRecord(LocalResource.class);
 		LocalResource flinkConf = Records.newRecord(LocalResource.class);


Mime
View raw message