Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2A9F8200C69 for ; Fri, 21 Apr 2017 14:24:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2962C160B97; Fri, 21 Apr 2017 12:24:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 48B6A160BA2 for ; Fri, 21 Apr 2017 14:24:16 +0200 (CEST) Received: (qmail 99260 invoked by uid 500); 21 Apr 2017 12:24:15 -0000 Mailing-List: contact commits-help@flink.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@flink.apache.org Delivered-To: mailing list commits@flink.apache.org Received: (qmail 99205 invoked by uid 99); 21 Apr 2017 12:24:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 21 Apr 2017 12:24:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3F3BAF49F1; Fri, 21 Apr 2017 12:24:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sewen@apache.org To: commits@flink.apache.org Date: Fri, 21 Apr 2017 12:24:17 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [03/13] flink git commit: [FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically archived-at: Fri, 21 Apr 2017 12:24:17 -0000 [FLINK-6176] [scripts] [yarn] [mesos] Add JARs to CLASSPATH deterministically Sorts files read from Flink's lib directory and places the distribution JAR to the end of the CLASSPATH. This closes #3632 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/daf4038c Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/daf4038c Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/daf4038c Branch: refs/heads/master Commit: daf4038c88b459084a1232c69fb584a7e2e100da Parents: f75466f Author: Greg Hogan Authored: Sun Mar 26 15:46:00 2017 -0400 Committer: Stephan Ewen Committed: Fri Apr 21 10:30:44 2017 +0200 ---------------------------------------------------------------------- flink-dist/src/main/flink-bin/bin/config.sh | 18 +++++++++-- .../main/flink-bin/mesos-bin/mesos-appmaster.sh | 17 +---------- .../flink-bin/mesos-bin/mesos-taskmanager.sh | 17 +---------- .../src/main/flink-bin/yarn-bin/yarn-session.sh | 17 +---------- .../yarn/AbstractYarnClusterDescriptor.java | 32 +++++++++++--------- 5 files changed, 35 insertions(+), 66 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/bin/config.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/bin/config.sh b/flink-dist/src/main/flink-bin/bin/config.sh index 5ef4ef7..dbfdd0e 100755 --- a/flink-dist/src/main/flink-bin/bin/config.sh +++ b/flink-dist/src/main/flink-bin/bin/config.sh @@ -18,16 +18,28 @@ ################################################################################ constructFlinkClassPath() { + local FLINK_DIST + local FLINK_CLASSPATH while read -d '' -r jarfile ; do - if [[ $FLINK_CLASSPATH = "" ]]; then + if [[ "$jarfile" =~ .*flink-dist.*.jar ]]; then + FLINK_DIST="$FLINK_DIST":"$jarfile" + elif [[ "$FLINK_CLASSPATH" == "" ]]; then FLINK_CLASSPATH="$jarfile"; else FLINK_CLASSPATH="$FLINK_CLASSPATH":"$jarfile" fi - done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) + done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0 | sort -z) - echo $FLINK_CLASSPATH + if [[ "$FLINK_DIST" == "" ]]; then + # write error message to stderr since stdout is stored as the classpath + (>&2 echo "[ERROR] Flink distribution jar not found in $FLINK_LIB_DIR.") + + # exit function with empty classpath to force process failure + exit 1 + fi + + echo "$FLINK_CLASSPATH""$FLINK_DIST" } # These are used to mangle paths that are passed to java when using http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh index 6fae6c2..67eab9d 100755 --- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-appmaster.sh @@ -23,26 +23,11 @@ bin=`cd "$bin"; pwd` # get Flink config . "$bin"/config.sh -# auxilliary function to construct a lightweight classpath for the -# Flink AppMaster -constructAppMasterClassPath() { - - while read -d '' -r jarfile ; do - if [[ $CC_CLASSPATH = "" ]]; then - CC_CLASSPATH="$jarfile"; - else - CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" - fi - done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) - - echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS -} - if [ "$FLINK_IDENT_STRING" = "" ]; then FLINK_IDENT_STRING="$USER" fi -CC_CLASSPATH=`manglePathList $(constructAppMasterClassPath)` +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` log="${FLINK_LOG_DIR}/flink-${FLINK_IDENT_STRING}-mesos-appmaster-${HOSTNAME}.log" log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh index 23a301b..ab2f7b1 100755 --- a/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh +++ b/flink-dist/src/main/flink-bin/mesos-bin/mesos-taskmanager.sh @@ -23,22 +23,7 @@ bin=`cd "$bin"; pwd` # get Flink config . "$bin"/config.sh -# auxilliary function to construct a lightweight classpath for the -# Flink TaskManager -constructTaskManagerClassPath() { - - while read -d '' -r jarfile ; do - if [[ $CC_CLASSPATH = "" ]]; then - CC_CLASSPATH="$jarfile"; - else - CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" - fi - done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) - - echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS -} - -CC_CLASSPATH=`manglePathList $(constructTaskManagerClassPath)` +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` log=flink-taskmanager.log log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback.xml" http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh ---------------------------------------------------------------------- diff --git a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh index 1755f32..03b1e3a 100755 --- a/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh +++ b/flink-dist/src/main/flink-bin/yarn-bin/yarn-session.sh @@ -29,22 +29,7 @@ fi JVM_ARGS="$JVM_ARGS -Xmx512m" -# auxilliary function to construct a lightweight classpath for the -# Flink CLI client -constructCLIClientClassPath() { - - while read -d '' -r jarfile ; do - if [[ $CC_CLASSPATH = "" ]]; then - CC_CLASSPATH="$jarfile"; - else - CC_CLASSPATH="$CC_CLASSPATH":"$jarfile" - fi - done < <(find "$FLINK_LIB_DIR" ! -type d -name '*.jar' -print0) - - echo $CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS -} - -CC_CLASSPATH=`manglePathList $(constructCLIClientClassPath)` +CC_CLASSPATH=`manglePathList $(constructFlinkClassPath):$INTERNAL_HADOOP_CLASSPATHS` log=$FLINK_LOG_DIR/flink-$FLINK_IDENT_STRING-yarn-session-$HOSTNAME.log log_setting="-Dlog.file="$log" -Dlog4j.configuration=file:"$FLINK_CONF_DIR"/log4j-yarn-session.properties -Dlogback.configurationFile=file:"$FLINK_CONF_DIR"/logback-yarn.xml" http://git-wip-us.apache.org/repos/asf/flink/blob/daf4038c/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java index ec7af5a..a5a6c36 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java @@ -23,10 +23,10 @@ import org.apache.flink.client.deployment.ClusterDescriptor; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.GlobalConfiguration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.SecurityOptions; -import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; @@ -63,10 +63,10 @@ import org.slf4j.LoggerFactory; import java.io.ByteArrayOutputStream; import java.io.File; -import java.io.IOException; -import java.io.PrintStream; import java.io.FileOutputStream; +import java.io.IOException; import java.io.ObjectOutputStream; +import java.io.PrintStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; import java.net.URISyntaxException; @@ -669,12 +669,11 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor final Map localResources = new HashMap<>(2 + effectiveShipFiles.size()); // list of remote paths (after upload) final List paths = new ArrayList<>(2 + effectiveShipFiles.size()); - // classpath assembler - final StringBuilder classPathBuilder = new StringBuilder(); // ship list that enables reuse of resources for task manager containers StringBuilder envShipFileList = new StringBuilder(); // upload and register ship files + final List classPaths = new ArrayList<>(); for (File shipFile : effectiveShipFiles) { LocalResource shipResources = Records.newRecord(LocalResource.class); @@ -693,29 +692,32 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor Files.walkFileTree(shipPath, new SimpleFileVisitor() { @Override - public FileVisitResult preVisitDirectory(java.nio.file.Path dir, BasicFileAttributes attrs) + public FileVisitResult visitFile(java.nio.file.Path file, BasicFileAttributes attrs) throws IOException { - super.preVisitDirectory(dir, attrs); - - java.nio.file.Path relativePath = parentPath.relativize(dir); + java.nio.file.Path relativePath = parentPath.relativize(file); - classPathBuilder - .append(relativePath) - .append(File.separator) - .append("*") - .append(File.pathSeparator); + classPaths.add(relativePath.toString()); return FileVisitResult.CONTINUE; } }); } else { // add files to the classpath - classPathBuilder.append(shipFile.getName()).append(File.pathSeparator); + classPaths.add(shipFile.getName()); } envShipFileList.append(remotePath).append(","); } + // normalize classpath by sorting + Collections.sort(classPaths); + + // classpath assembler + StringBuilder classPathBuilder = new StringBuilder(); + for (String classPath : classPaths) { + classPathBuilder.append(classPath).append(File.pathSeparator); + } + // Setup jar for ApplicationMaster LocalResource appMasterJar = Records.newRecord(LocalResource.class); LocalResource flinkConf = Records.newRecord(LocalResource.class);