hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ser...@apache.org
Subject [01/35] hive git commit: HIVE-15125: LLAP: Parallelize slider package generator (Gopal V, reviewed by Sergey Shelukhin)
Date Tue, 22 Nov 2016 02:46:10 GMT
Repository: hive
Updated Branches:
  refs/heads/hive-14535 a5ba17d5e -> 05879a8ea


HIVE-15125: LLAP: Parallelize slider package generator (Gopal V, reviewed by Sergey Shelukhin)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/aa7c9cd6
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/aa7c9cd6
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/aa7c9cd6

Branch: refs/heads/hive-14535
Commit: aa7c9cd614804c0bf683745614f7a2b264ce72bf
Parents: a4a00b2
Author: Gopal V <gopalv@apache.org>
Authored: Tue Nov 15 14:52:26 2016 -0800
Committer: Gopal V <gopalv@apache.org>
Committed: Tue Nov 15 14:52:26 2016 -0800

----------------------------------------------------------------------
 .../hadoop/hive/llap/cli/LlapServiceDriver.java | 685 +++++++++++--------
 1 file changed, 390 insertions(+), 295 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/aa7c9cd6/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
----------------------------------------------------------------------
diff --git a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
index 6f533df..dfd2f7b 100644
--- a/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
+++ b/llap-server/src/java/org/apache/hadoop/hive/llap/cli/LlapServiceDriver.java
@@ -34,6 +34,13 @@ import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration;
@@ -71,6 +78,7 @@ import org.eclipse.jetty.server.ssl.SslSocketConnector;
 import org.json.JSONObject;
 
 import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 public class LlapServiceDriver {
 
@@ -153,11 +161,21 @@ public class LlapServiceDriver {
     }
   }
 
+  /**
+   * A {@link Callable} that also carries a task name supplied at construction time.
+   * Instances are used for the packaging steps (downloadTez, copyLocalJars, copyAuxJars,
+   * copyUdfJars, copyConfigs) that run() submits to its completion service.
+   *
+   * @param <T> the result type produced by {@link #call()}
+   */
+  private static abstract class NamedCallable<T> implements Callable<T> {
+    /**
+     * The name passed to the constructor; not null-checked.
+     * NOTE(review): exposed both as a public field and via getName(); one accessor would do.
+     */
+    public final String taskName;
+    /**
+     * @param name name for this task, e.g. "downloadTez"
+     */
+    public NamedCallable (String name) {
+      this.taskName = name;
+    }
+    /** @return the task name given to the constructor */
+    public String getName() {
+      return taskName;
+    }
+  }
+
   private void run(String[] args) throws Exception {
     LlapOptionsProcessor optionsProcessor = new LlapOptionsProcessor();
-    LlapOptions options = optionsProcessor.processOptions(args);
+    final LlapOptions options = optionsProcessor.processOptions(args);
 
-    Properties propsDirectOptions = new Properties();
+    final Properties propsDirectOptions = new Properties();
 
     if (options == null) {
       // help
@@ -171,346 +189,418 @@ public class LlapServiceDriver {
       throw new Exception("Cannot load any configuration to run command");
     }
 
-    FileSystem fs = FileSystem.get(conf);
-    FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
-
-    // needed so that the file is actually loaded into configuration.
-    for (String f : NEEDED_CONFIGS) {
-      conf.addResource(f);
-      if (conf.getResource(f) == null) {
-        throw new Exception("Unable to find required config file: " + f);
-      }
-    }
-    for (String f : OPTIONAL_CONFIGS) {
-      conf.addResource(f);
-    }
-
-    conf.reloadConfiguration();
+    final long t0 = System.nanoTime();
 
-    populateConfWithLlapProperties(conf, options.getConfig());
+    final FileSystem fs = FileSystem.get(conf);
+    final FileSystem lfs = FileSystem.getLocal(conf).getRawFileSystem();
 
+    final ExecutorService executor =
+        Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() / 2,
+            new ThreadFactoryBuilder().setNameFormat("llap-pkg-%d").build());
+    final CompletionService<Void> asyncRunner = new ExecutorCompletionService<Void>(executor);
 
-    if (options.getName() != null) {
-      // update service registry configs - caveat: this has nothing to do with the actual
settings
-      // as read by the AM
-      // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch
between
-      // instances
-      conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
-          "@" + options.getName());
-    }
-
-    if (options.getLogger() != null) {
-      HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
-    }
+    try {
 
-    if (options.getSize() != -1) {
-      if (options.getCache() != -1) {
-        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false)
{
-          // direct heap allocations need to be safer
-          Preconditions.checkArgument(options.getCache() < options.getSize(),
-              "Cache size (" + humanReadableByteCount(options.getCache()) + ") has to be
smaller" +
-                  " than the container sizing (" + humanReadableByteCount(options.getSize())
+ ")");
-        } else if (options.getCache() < options.getSize()) {
-          LOG.warn("Note that this might need YARN physical memory monitoring to be turned
off "
-              + "(yarn.nodemanager.pmem-check-enabled=false)");
+      // needed so that the file is actually loaded into configuration.
+      for (String f : NEEDED_CONFIGS) {
+        conf.addResource(f);
+        if (conf.getResource(f) == null) {
+          throw new Exception("Unable to find required config file: " + f);
         }
       }
-      if (options.getXmx() != -1) {
-        Preconditions.checkArgument(options.getXmx() < options.getSize(),
-            "Working memory (Xmx=" + humanReadableByteCount(options.getXmx()) + ") has to
be" +
-                " smaller than the container sizing (" +
-                humanReadableByteCount(options.getSize()) + ")");
-      }
-      if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
-          && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED))
{
-        // direct and not memory mapped
-        Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
-            "Working memory + cache (Xmx="+ humanReadableByteCount(options.getXmx()) +
-                " + cache=" + humanReadableByteCount(options.getCache()) + ")"
-                + " has to be smaller than the container sizing (" +
-                humanReadableByteCount(options.getSize()) + ")");
+      for (String f : OPTIONAL_CONFIGS) {
+        conf.addResource(f);
       }
-    }
 
-    // This parameter is read in package.py - and nowhere else. Does not need to be part
of HiveConf - that's just confusing.
-    final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-1);
-    long containerSize = -1;
-    if (options.getSize() != -1) {
-      containerSize = options.getSize() / (1024 * 1024);
-      Preconditions.checkArgument(containerSize >= minAlloc,
-          "Container size (" + humanReadableByteCount(options.getSize()) + ") should be greater"
+
-              " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L)
+ ")");
-      conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, String.valueOf(containerSize));
-    }
+      conf.reloadConfiguration();
 
-    if (options.getExecutors() != -1) {
-      conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, String.valueOf(options.getExecutors()));
-      // TODO: vcpu settings - possibly when DRFA works right
-    }
+      populateConfWithLlapProperties(conf, options.getConfig());
 
-    if (options.getIoThreads() != -1) {
-      conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
-      propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
-          String.valueOf(options.getIoThreads()));
-    }
+      if (options.getName() != null) {
+        // update service registry configs - caveat: this has nothing to do with the actual
settings
+        // as read by the AM
+        // if needed, use --hiveconf llap.daemon.service.hosts=@llap0 to dynamically switch
between
+        // instances
+        conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + options.getName());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname,
+            "@" + options.getName());
+      }
 
-    long cache = -1, xmx = -1;
-    if (options.getCache() != -1) {
-      cache = options.getCache();
-      conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
-      propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-          Long.toString(cache));
-    }
+      if (options.getLogger() != null) {
+        HiveConf.setVar(conf, ConfVars.LLAP_DAEMON_LOGGER, options.getLogger());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_LOGGER.varname, options.getLogger());
+      }
 
-    if (options.getXmx() != -1) {
-      // Needs more explanation here
-      // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor
fraction
-      // from this, to get actual usable  memory before it goes into GC
-      xmx = options.getXmx();
-      long xmxMb = (long)(xmx / (1024 * 1024));
-      conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
-      propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
-          String.valueOf(xmxMb));
-    }
+      if (options.getSize() != -1) {
+        if (options.getCache() != -1) {
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED) == false)
{
+            // direct heap allocations need to be safer
+            Preconditions.checkArgument(options.getCache() < options.getSize(), "Cache
size ("
+                + humanReadableByteCount(options.getCache()) + ") has to be smaller"
+                + " than the container sizing (" + humanReadableByteCount(options.getSize())
+ ")");
+          } else if (options.getCache() < options.getSize()) {
+            LOG.warn("Note that this might need YARN physical memory monitoring to be turned
off "
+                + "(yarn.nodemanager.pmem-check-enabled=false)");
+          }
+        }
+        if (options.getXmx() != -1) {
+          Preconditions.checkArgument(options.getXmx() < options.getSize(), "Working memory
(Xmx="
+              + humanReadableByteCount(options.getXmx()) + ") has to be"
+              + " smaller than the container sizing (" + humanReadableByteCount(options.getSize())
+              + ")");
+        }
+        if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT)
+            && false == HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_MAPPED))
{
+          // direct and not memory mapped
+          Preconditions.checkArgument(options.getXmx() + options.getCache() < options.getSize(),
+              "Working memory + cache (Xmx=" + humanReadableByteCount(options.getXmx())
+                  + " + cache=" + humanReadableByteCount(options.getCache()) + ")"
+                  + " has to be smaller than the container sizing ("
+                  + humanReadableByteCount(options.getSize()) + ")");
+        }
+      }
 
-    if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty())
{
-      conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
-      propsDirectOptions
-          .setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
-    }
+      // This parameter is read in package.py - and nowhere else. Does not need to be part
of
+      // HiveConf - that's just confusing.
+      final long minAlloc = conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-1);
+      long containerSize = -1;
+      if (options.getSize() != -1) {
+        containerSize = options.getSize() / (1024 * 1024);
+        Preconditions.checkArgument(containerSize >= minAlloc, "Container size ("
+            + humanReadableByteCount(options.getSize()) + ") should be greater"
+            + " than minimum allocation(" + humanReadableByteCount(minAlloc * 1024L * 1024L)
+ ")");
+        conf.setLong(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+            String.valueOf(containerSize));
+      }
 
-    URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
+      if (options.getExecutors() != -1) {
+        conf.setLong(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, options.getExecutors());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
+            String.valueOf(options.getExecutors()));
+        // TODO: vcpu settings - possibly when DRFA works right
+      }
 
-    if (null == logger) {
-      throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
-    }
+      if (options.getIoThreads() != -1) {
+        conf.setLong(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname, options.getIoThreads());
+        propsDirectOptions.setProperty(ConfVars.LLAP_IO_THREADPOOL_SIZE.varname,
+            String.valueOf(options.getIoThreads()));
+      }
 
-    Path home = new Path(System.getenv("HIVE_HOME"));
-    Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
+      long cache = -1, xmx = -1;
+      if (options.getCache() != -1) {
+        cache = options.getCache();
+        conf.set(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname, Long.toString(cache));
+        propsDirectOptions.setProperty(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+            Long.toString(cache));
+      }
 
-    if (!lfs.exists(home)) {
-      throw new Exception("Unable to find HIVE_HOME:" + home);
-    } else if (!lfs.exists(scripts)) {
-      LOG.warn("Unable to find llap scripts:" + scripts);
-    }
+      if (options.getXmx() != -1) {
+        // Needs more explanation here
+        // Xmx is not the max heap value in JDK8. You need to subtract 50% of the survivor
fraction
+        // from this, to get actual usable memory before it goes into GC
+        xmx = options.getXmx();
+        long xmxMb = (long) (xmx / (1024 * 1024));
+        conf.setLong(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, xmxMb);
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+            String.valueOf(xmxMb));
+      }
 
+      if (options.getLlapQueueName() != null && !options.getLlapQueueName().isEmpty())
{
+        conf.set(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname, options.getLlapQueueName());
+        propsDirectOptions.setProperty(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
+            options.getLlapQueueName());
+      }
 
-    Path libDir = new Path(tmpDir, "lib");
-    Path tezDir = new Path(libDir, "tez");
-    Path udfDir = new Path(libDir, "udfs");
+      final URL logger = conf.getResource(LlapConstants.LOG4j2_PROPERTIES_FILE);
 
-    String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
-    if (tezLibs == null) {
-      LOG.warn("Missing tez.lib.uris in tez-site.xml");
-    }
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Copying tez libs from " + tezLibs);
-    }
-    lfs.mkdirs(tezDir);
-    fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
-    CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(),
true);
-    lfs.delete(new Path(libDir, "tez.tar.gz"), false);
-
-    Class<?>[] dependencies = new Class<?>[] {
-        LlapDaemonProtocolProtos.class, // llap-common
-        LlapTezUtils.class, // llap-tez
-        LlapInputFormat.class, // llap-server
-        HiveInputFormat.class, // hive-exec
-        SslSocketConnector.class, // hive-common (https deps)
-        RegistryUtils.ServiceRecordMarshal.class, // ZK registry
-        // log4j2
-        com.lmax.disruptor.RingBuffer.class, // disruptor
-        org.apache.logging.log4j.Logger.class, // log4j-api
-        org.apache.logging.log4j.core.Appender.class, // log4j-core
-        org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
-        // log4j-1.2-API needed for NDC
-        org.apache.log4j.NDC.class,
-    };
-
-    for (Class<?> c : dependencies) {
-      Path jarPath = new Path(Utilities.jarFinderGetJar(c));
-      lfs.copyFromLocalFile(jarPath, libDir);
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Copying " + jarPath + " to " + libDir);
+      if (null == logger) {
+        throw new Exception("Unable to find required config file: llap-daemon-log4j2.properties");
       }
-    }
 
+      Path home = new Path(System.getenv("HIVE_HOME"));
+      Path scripts = new Path(new Path(new Path(home, "scripts"), "llap"), "bin");
 
-    // copy default aux classes (json/hbase)
-
-    for (String className : DEFAULT_AUX_CLASSES) {
-      localizeJarForClass(lfs, libDir, className, false);
-    }
-    Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
-    if (codecs != null) {
-      for (String codecClassName : codecs) {
-        localizeJarForClass(lfs, libDir, codecClassName, false);
+      if (!lfs.exists(home)) {
+        throw new Exception("Unable to find HIVE_HOME:" + home);
+      } else if (!lfs.exists(scripts)) {
+        LOG.warn("Unable to find llap scripts:" + scripts);
       }
-    }
 
-    if (options.getIsHBase()) {
-      try {
-        localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
-        Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
-        TableMapReduceUtil.addDependencyJars(fakeJob);
-        Collection<String> hbaseJars = fakeJob.getConfiguration().getStringCollection("tmpjars");
-        for (String jarPath : hbaseJars) {
-          if (!jarPath.isEmpty()) {
-            lfs.copyFromLocalFile(new Path(jarPath), libDir);
+      final Path libDir = new Path(tmpDir, "lib");
+      final Path tezDir = new Path(libDir, "tez");
+      final Path udfDir = new Path(libDir, "udfs");
+      final Path confPath = new Path(tmpDir, "conf");
+      lfs.mkdirs(confPath);
+
+      NamedCallable<Void> downloadTez = new NamedCallable<Void>("downloadTez")
{
+        @Override
+        public Void call() throws Exception {
+          synchronized (fs) {
+            String tezLibs = conf.get(TezConfiguration.TEZ_LIB_URIS);
+            if (tezLibs == null) {
+              LOG.warn("Missing tez.lib.uris in tez-site.xml");
+            }
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Copying tez libs from " + tezLibs);
+            }
+            lfs.mkdirs(tezDir);
+            fs.copyToLocalFile(new Path(tezLibs), new Path(libDir, "tez.tar.gz"));
+            CompressionUtils.unTar(new Path(libDir, "tez.tar.gz").toString(), tezDir.toString(),
+                true);
+            lfs.delete(new Path(libDir, "tez.tar.gz"), false);
           }
+          return null;
         }
-      } catch (Throwable t) {
-        String err = "Failed to add HBase jars. Use --auxhbase=false to avoid localizing
them";
-        LOG.error(err);
-        System.err.println(err);
-        throw new RuntimeException(t);
-      }
-    }
-
-    String auxJars = options.getAuxJars();
-    if (auxJars != null && !auxJars.isEmpty()) {
-      // TODO: transitive dependencies warning?
-      String[] jarPaths = auxJars.split(",");
-      for (String jarPath : jarPaths) {
-        if (!jarPath.isEmpty()) {
-          lfs.copyFromLocalFile(new Path(jarPath), libDir);
+      };
+
+      NamedCallable<Void> copyLocalJars = new NamedCallable<Void>("copyLocalJars")
{
+        @Override
+        public Void call() throws Exception {
+          Class<?>[] dependencies = new Class<?>[] { LlapDaemonProtocolProtos.class,
// llap-common
+              LlapTezUtils.class, // llap-tez
+              LlapInputFormat.class, // llap-server
+              HiveInputFormat.class, // hive-exec
+              SslSocketConnector.class, // hive-common (https deps)
+              RegistryUtils.ServiceRecordMarshal.class, // ZK registry
+              // log4j2
+              com.lmax.disruptor.RingBuffer.class, // disruptor
+              org.apache.logging.log4j.Logger.class, // log4j-api
+              org.apache.logging.log4j.core.Appender.class, // log4j-core
+              org.apache.logging.slf4j.Log4jLogger.class, // log4j-slf4j
+              // log4j-1.2-API needed for NDC
+              org.apache.log4j.NDC.class, };
+
+          for (Class<?> c : dependencies) {
+            Path jarPath = new Path(Utilities.jarFinderGetJar(c));
+            lfs.copyFromLocalFile(jarPath, libDir);
+            if (LOG.isDebugEnabled()) {
+              LOG.debug("Copying " + jarPath + " to " + libDir);
+            }
+          }
+          return null;
         }
-      }
-    }
+      };
 
-    // UDFs
-    final Set<String> allowedUdfs;
-    
-    if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
-      allowedUdfs = downloadPermanentFunctions(conf, udfDir);
-    } else {
-      allowedUdfs = Collections.emptySet();
-    }
+      // copy default aux classes (json/hbase)
 
-    String java_home;
-    if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
-      java_home = System.getenv("JAVA_HOME");
-      String jre_home = System.getProperty("java.home");
-      if (java_home == null) {
-        java_home = jre_home;
-      } else if (!java_home.equals(jre_home)) {
-        LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]",
-            java_home, jre_home);
-      }
-    } else {
-      java_home = options.getJavaPath();
-    }
-    if (java_home == null || java_home.isEmpty()) {
-      throw new RuntimeException(
-          "Could not determine JAVA_HOME from command line parameters, environment or system
properties");
-    }
-    LOG.info("Using [{}] for JAVA_HOME", java_home);
+      NamedCallable<Void> copyAuxJars = new NamedCallable<Void>("copyAuxJars")
{
+        @Override
+        public Void call() throws Exception {
+          for (String className : DEFAULT_AUX_CLASSES) {
+            localizeJarForClass(lfs, libDir, className, false);
+          }
+          Collection<String> codecs = conf.getStringCollection("io.compression.codecs");
+          if (codecs != null) {
+            for (String codecClassName : codecs) {
+              localizeJarForClass(lfs, libDir, codecClassName, false);
+            }
+          }
 
-    Path confPath = new Path(tmpDir, "conf");
-    lfs.mkdirs(confPath);
+          if (options.getIsHBase()) {
+            try {
+              localizeJarForClass(lfs, libDir, HBASE_SERDE_CLASS, true);
+              Job fakeJob = new Job(new JobConf()); // HBase API is convoluted.
+              TableMapReduceUtil.addDependencyJars(fakeJob);
+              Collection<String> hbaseJars =
+                  fakeJob.getConfiguration().getStringCollection("tmpjars");
+              for (String jarPath : hbaseJars) {
+                if (!jarPath.isEmpty()) {
+                  lfs.copyFromLocalFile(new Path(jarPath), libDir);
+                }
+              }
+            } catch (Throwable t) {
+              String err =
+                  "Failed to add HBase jars. Use --auxhbase=false to avoid localizing them";
+              LOG.error(err);
+              System.err.println(err);
+              throw new RuntimeException(t);
+            }
+          }
 
-    // Copy over the mandatory configs for the package.
-    for (String f : NEEDED_CONFIGS) {
-      copyConfig(lfs, confPath, f);
-    }
-    for (String f : OPTIONAL_CONFIGS) {
-      try {
-        copyConfig(lfs, confPath, f);
-      } catch (Throwable t) {
-        LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
-      }
-    }
-    createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
-
-    // logger can be a resource stream or a real file (cannot use copy)
-    InputStream loggerContent = logger.openStream();
-    IOUtils.copyBytes(loggerContent,
-        lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf, true);
-
-    String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
-    URL metrics2 = conf.getResource(metricsFile);
-    if (metrics2 == null) {
-      LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found." +
-          " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
-      metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
-      metrics2 = conf.getResource(metricsFile);
-    }
-    if (metrics2 != null) {
-      InputStream metrics2FileStream = metrics2.openStream();
-      IOUtils.copyBytes(metrics2FileStream, lfs.create(new Path(confPath, metricsFile), true),
-          conf, true);
-      LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
-    } else {
-      LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " or
" +
-          LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
-    }
+          String auxJars = options.getAuxJars();
+          if (auxJars != null && !auxJars.isEmpty()) {
+            // TODO: transitive dependencies warning?
+            String[] jarPaths = auxJars.split(",");
+            for (String jarPath : jarPaths) {
+              if (!jarPath.isEmpty()) {
+                lfs.copyFromLocalFile(new Path(jarPath), libDir);
+              }
+            }
+          }
+          return null;
+        }
+      };
+
+      NamedCallable<Void> copyUdfJars = new NamedCallable<Void>("copyUdfJars")
{
+        @Override
+        public Void call() throws Exception {
+          // UDFs
+          final Set<String> allowedUdfs;
+
+          if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOW_PERMANENT_FNS)) {
+            synchronized (fs) {
+              allowedUdfs = downloadPermanentFunctions(conf, udfDir);
+            }
+          } else {
+            allowedUdfs = Collections.emptySet();
+          }
 
-    PrintWriter udfStream =
-        new PrintWriter(lfs.create(new Path(confPath, StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
-    for (String udfClass : allowedUdfs) {
-      udfStream.println(udfClass);
-    }
-    
-    udfStream.close();
+          PrintWriter udfStream =
+              new PrintWriter(lfs.create(new Path(confPath,
+                  StaticPermanentFunctionChecker.PERMANENT_FUNCTIONS_LIST)));
+          for (String udfClass : allowedUdfs) {
+            udfStream.println(udfClass);
+          }
 
-    // extract configs for processing by the python fragments in Slider
-    JSONObject configs = new JSONObject();
+          udfStream.close();
+          return null;
+        }
+      };
+
+      String java_home;
+      if (options.getJavaPath() == null || options.getJavaPath().isEmpty()) {
+        java_home = System.getenv("JAVA_HOME");
+        String jre_home = System.getProperty("java.home");
+        if (java_home == null) {
+          java_home = jre_home;
+        } else if (!java_home.equals(jre_home)) {
+          LOG.warn("Java versions might not match : JAVA_HOME=[{}],process jre=[{}]", java_home,
+              jre_home);
+        }
+      } else {
+        java_home = options.getJavaPath();
+      }
+      if (java_home == null || java_home.isEmpty()) {
+        throw new RuntimeException(
+            "Could not determine JAVA_HOME from command line parameters, environment or system
properties");
+      }
+      LOG.info("Using [{}] for JAVA_HOME", java_home);
+
+      NamedCallable<Void> copyConfigs = new NamedCallable<Void>("copyConfigs")
{
+        @Override
+        public Void call() throws Exception {
+          // Copy over the mandatory configs for the package.
+          for (String f : NEEDED_CONFIGS) {
+            copyConfig(lfs, confPath, f);
+          }
+          for (String f : OPTIONAL_CONFIGS) {
+            try {
+              copyConfig(lfs, confPath, f);
+            } catch (Throwable t) {
+              LOG.info("Error getting an optional config " + f + "; ignoring: " + t.getMessage());
+            }
+          }
+          createLlapDaemonConfig(lfs, confPath, conf, propsDirectOptions, options.getConfig());
+
+          // logger can be a resource stream or a real file (cannot use copy)
+          InputStream loggerContent = logger.openStream();
+          IOUtils.copyBytes(loggerContent,
+              lfs.create(new Path(confPath, "llap-daemon-log4j2.properties"), true), conf,
true);
+
+          String metricsFile = LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE;
+          URL metrics2 = conf.getResource(metricsFile);
+          if (metrics2 == null) {
+            LOG.warn(LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE + " cannot be found."
+                + " Looking for " + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE);
+            metricsFile = LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE;
+            metrics2 = conf.getResource(metricsFile);
+          }
+          if (metrics2 != null) {
+            InputStream metrics2FileStream = metrics2.openStream();
+            IOUtils.copyBytes(metrics2FileStream,
+                lfs.create(new Path(confPath, metricsFile), true), conf, true);
+            LOG.info("Copied hadoop metrics2 properties file from " + metrics2);
+          } else {
+            LOG.warn("Cannot find " + LlapConstants.LLAP_HADOOP_METRICS2_PROPERTIES_FILE
+ " or "
+                + LlapConstants.HADOOP_METRICS2_PROPERTIES_FILE + " in classpath.");
+          }
+          return null;
+        }
+      };
+
+      @SuppressWarnings("unchecked")
+      final NamedCallable<Void>[] asyncWork =
+          new NamedCallable[] {
+          downloadTez,
+          copyUdfJars,
+          copyLocalJars,
+          copyAuxJars,
+          copyConfigs };
+      @SuppressWarnings("unchecked")
+      final Future<Void>[] asyncResults = new Future[asyncWork.length];
+      for (int i = 0; i < asyncWork.length; i++) {
+        asyncResults[i] = asyncRunner.submit(asyncWork[i]);
+      }
 
-    configs.put("java.home", java_home);
+      // extract configs for processing by the python fragments in Slider
+      JSONObject configs = new JSONObject();
 
-    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, HiveConf.getIntVar(conf,
-        ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
-    configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
+      configs.put("java.home", java_home);
 
-    configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
-        HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
+      configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname,
+          HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB));
+      configs.put(ConfVars.LLAP_DAEMON_YARN_CONTAINER_MB.varname, containerSize);
 
-    configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
-        HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
+      configs.put(HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE.varname,
+          HiveConf.getSizeVar(conf, HiveConf.ConfVars.LLAP_IO_MEMORY_MAX_SIZE));
 
-    configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, HiveConf.getIntVar(conf,
-        ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
+      configs.put(HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT.varname,
+          HiveConf.getBoolVar(conf, HiveConf.ConfVars.LLAP_ALLOCATOR_DIRECT));
 
-    configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname, HiveConf.getIntVar(conf,
-        ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
+      configs.put(ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname,
+          HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB));
 
-    configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, HiveConf.getIntVar(conf,
-        ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
+      configs.put(ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE.varname,
+          HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_VCPUS_PER_INSTANCE));
 
-    // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
-    if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
-      configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
-          HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
-    }
+      configs.put(ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname,
+          HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_NUM_EXECUTORS));
 
-    // Propagate the cluster name to the script.
-    String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
-    if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@") &&
-        clusterHosts.length() > 1) {
-      configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
-    }
+      // Let YARN pick the queue name, if it isn't provided in hive-site, or via the command-line
+      if (HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME) != null) {
+        configs.put(ConfVars.LLAP_DAEMON_QUEUE_NAME.varname,
+            HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_QUEUE_NAME));
+      }
 
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
+      // Propagate the cluster name to the script.
+      String clusterHosts = HiveConf.getVar(conf, ConfVars.LLAP_DAEMON_SERVICE_HOSTS);
+      if (!StringUtils.isEmpty(clusterHosts) && clusterHosts.startsWith("@")
+          && clusterHosts.length() > 1) {
+        configs.put(CONFIG_CLUSTER_NAME, clusterHosts.substring(1));
+      }
+
+      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+          conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, -1));
 
-    configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
-        conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
+      configs.put(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+          conf.getInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, -1));
 
-    long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25)
? (long)(cache * 1.25) : -1;
-    configs.put("max_direct_memory", Long.toString(maxDirect));
+      long maxDirect = (xmx > 0 && cache > 0 && xmx < cache * 1.25)
? (long) (cache * 1.25) : -1;
+      configs.put("max_direct_memory", Long.toString(maxDirect));
 
-    FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
-    OutputStreamWriter w = new OutputStreamWriter(os);
-    configs.write(w);
-    w.close();
-    os.close();
+      FSDataOutputStream os = lfs.create(new Path(tmpDir, "config.json"));
+      OutputStreamWriter w = new OutputStreamWriter(os);
+      configs.write(w);
+      w.close();
+      os.close();
 
-    lfs.close();
-    fs.close();
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Config generation took " + (System.nanoTime() - t0) + " ns");
+      }
+      for (int i = 0; i < asyncWork.length; i++) {
+        final long t1 = System.nanoTime();
+        asyncResults[i].get();
+        final long t2 = System.nanoTime();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(asyncWork[i].getName() + " waited for " + (t2 - t1) + " ns");
+        }
+      }
+    } finally {
+      executor.shutdown();
+      lfs.close();
+      fs.close();
+    }
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Exiting successfully");
@@ -520,7 +610,12 @@ public class LlapServiceDriver {
   private Set<String> downloadPermanentFunctions(Configuration conf, Path udfDir) throws
HiveException,
       URISyntaxException, IOException {
     Map<String,String> udfs = new HashMap<String, String>();
-    Hive hive = Hive.get(false);
+    HiveConf hiveConf = new HiveConf();
+    // disable expensive operations on the metastore
+    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_INIT_METADATA_COUNT_ENABLED, false);
+    hiveConf.setBoolVar(HiveConf.ConfVars.METASTORE_METRICS, false);
+    // performance problem: ObjectStore does its own new HiveConf()
+    Hive hive = Hive.getWithFastCheck(hiveConf, false);
     ResourceDownloader resourceDownloader =
         new ResourceDownloader(conf, udfDir.toUri().normalize().getPath());
     List<Function> fns = hive.getAllFunctions();


Mime
View raw message