Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D71F018D05 for ; Mon, 4 Apr 2016 22:25:11 +0000 (UTC) Received: (qmail 89003 invoked by uid 500); 4 Apr 2016 22:25:11 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 88956 invoked by uid 500); 4 Apr 2016 22:25:11 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 88945 invoked by uid 99); 4 Apr 2016 22:25:11 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 04 Apr 2016 22:25:11 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 3849DDFC72; Mon, 4 Apr 2016 22:25:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sseth@apache.org To: commits@hive.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-13365. Change the MiniLLAPCluster to work with a MiniZKCluster, and potentially allow multiple instances of LLAP within the MiniLlapCluster. (Siddharth Seth, reviewed by Sergey Shelukhin) Date: Mon, 4 Apr 2016 22:25:11 +0000 (UTC) Repository: hive Updated Branches: refs/heads/master b44650231 -> 91ab819a1 HIVE-13365. Change the MiniLLAPCluster to work with a MiniZKCluster, and potentially allow multiple instances of LLAP within the MiniLlapCluster. (Siddharth Seth, reviewed by Sergey Shelukhin) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/91ab819a Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/91ab819a Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/91ab819a Branch: refs/heads/master Commit: 91ab819a18d6271a6c8905d085ad90b1b184ecae Parents: b446502 Author: Siddharth Seth Authored: Mon Apr 4 15:23:37 2016 -0700 Committer: Siddharth Seth Committed: Mon Apr 4 15:23:37 2016 -0700 ---------------------------------------------------------------------- itests/hive-unit/pom.xml | 2 - .../org/apache/hive/jdbc/miniHS2/MiniHS2.java | 2 +- .../apache/hadoop/hive/llap/LlapItUtils.java | 10 +- .../org/apache/hadoop/hive/ql/QTestUtil.java | 10 +- .../hive/llap/daemon/MiniLlapCluster.java | 145 ++++++++++++------- 5 files changed, 109 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/hive-unit/pom.xml ---------------------------------------------------------------------- diff --git a/itests/hive-unit/pom.xml b/itests/hive-unit/pom.xml index 7219f1d..ae231de 100644 --- a/itests/hive-unit/pom.xml +++ b/itests/hive-unit/pom.xml @@ -210,14 +210,12 @@ org.apache.hbase hbase-server ${hbase.version} - test org.apache.hbase hbase-server ${hbase.version} test-jar - test org.apache.hbase http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java index 6141a1a..6b337d2 100644 --- a/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java +++ b/itests/hive-unit/src/main/java/org/apache/hive/jdbc/miniHS2/MiniHS2.java @@ -202,7 +202,7 @@ public class MiniHS2 extends AbstractHiveService { if (usePortsFromConf) { hiveConf.setBoolean("minillap.usePortsFromConf", true); } - llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null); + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(hiveConf, null, null); mr = ShimLoader.getHadoopShims().getMiniTezCluster(hiveConf, 4, uriString); break; http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java b/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java index cb4aba5..c1a32c9 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/llap/LlapItUtils.java @@ -25,6 +25,7 @@ import java.util.Iterator; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.llap.configuration.LlapDaemonConfiguration; import org.apache.hadoop.hive.llap.daemon.MiniLlapCluster; @@ -36,7 +37,9 @@ public class LlapItUtils { private static final Logger LOG = LoggerFactory.getLogger(LlapItUtils.class); - public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, String confDir) throws + public static MiniLlapCluster startAndGetMiniLlapCluster(Configuration conf, + MiniZooKeeperCluster miniZkCluster, + String confDir) throws IOException { MiniLlapCluster llapCluster; LOG.info("Using conf dir: {}", confDir); @@ -57,11 +60,14 @@ public class LlapItUtils { // enabling this will cause test failures in Mac OS X final boolean directMemoryEnabled = false; final int numLocalDirs = 1; - LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + " memoryForCache: " + memoryForCache + LOG.info("MiniLlap Configs - maxMemory: " + maxMemory + + " memoryForCache: " + memoryForCache + " totalExecutorMemory: " + totalExecutorMemory + " numExecutors: " + numExecutors + " asyncIOEnabled: " + asyncIOEnabled + " directMemoryEnabled: " + directMemoryEnabled + " numLocalDirs: " + numLocalDirs); llapCluster = MiniLlapCluster.create(clusterName, + miniZkCluster, + 1, numExecutors, totalExecutorMemory, asyncIOEnabled, http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java ---------------------------------------------------------------------- diff --git a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java index 5ccbcba..8473436 100644 --- a/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java +++ b/itests/util/src/main/java/org/apache/hadoop/hive/ql/QTestUtil.java @@ -429,6 +429,9 @@ public class QTestUtil { fs = dfs.getFileSystem(); } + setup = new QTestSetup(); + setup.preTest(conf); + String uriString = WindowsPathUtil.getHdfsUriString(fs.getUri().toString()); if (clusterType == MiniClusterType.tez) { if (confDir != null && !confDir.isEmpty()) { @@ -437,13 +440,16 @@ public class QTestUtil { } mr = shims.getMiniTezCluster(conf, 4, uriString); } else if (clusterType == MiniClusterType.llap) { - llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, confDir); + llapCluster = LlapItUtils.startAndGetMiniLlapCluster(conf, setup.zooKeeperCluster, confDir); mr = shims.getMiniTezCluster(conf, 2, uriString); } else if (clusterType == MiniClusterType.miniSparkOnYarn) { mr = shims.getMiniSparkCluster(conf, 4, uriString, 1); } else { mr = shims.getMiniMrCluster(conf, 4, uriString, 1); } + } else { + setup = new QTestSetup(); + setup.preTest(conf); } initConf(); @@ -471,8 +477,6 @@ public class QTestUtil { overWrite = "true".equalsIgnoreCase(System.getProperty("test.output.overwrite")); - setup = new QTestSetup(); - setup.preTest(conf); init(); } http://git-wip-us.apache.org/repos/asf/hive/blob/91ab819a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java ---------------------------------------------------------------------- diff --git a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java index a09c0b2..9871702 100644 --- a/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java +++ b/llap-server/src/test/org/apache/hadoop/hive/llap/daemon/MiniLlapCluster.java @@ -14,12 +14,11 @@ package org.apache.hadoop.hive.llap.daemon; +import javax.annotation.Nullable; import java.io.File; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.Iterator; -import java.util.Map; +import org.apache.hadoop.hbase.zookeeper.MiniZooKeeperCluster; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -41,47 +40,57 @@ public class MiniLlapCluster extends AbstractService { private static final Logger LOG = LoggerFactory.getLogger(MiniLlapCluster.class); private final File testWorkDir; + private final String clusterNameTrimmed; + private final long numInstances; private final long execBytesPerService; private final boolean llapIoEnabled; private final boolean ioIsDirect; private final long ioBytesPerService; private final int numExecutorsPerService; + private final File zkWorkDir; private final String[] localDirs; private final Configuration clusterSpecificConfiguration = new Configuration(false); - private LlapDaemon llapDaemon; + private final LlapDaemon [] llapDaemons; + private MiniZooKeeperCluster miniZooKeeperCluster; + private final boolean ownZkCluster; - public static MiniLlapCluster create(String clusterName, int numExecutorsPerService, - long execBytePerService, boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, - int numLocalDirs) { - return new MiniLlapCluster(clusterName, numExecutorsPerService, execBytePerService, + + public static MiniLlapCluster create(String clusterName, + @Nullable MiniZooKeeperCluster miniZkCluster, + int numInstances, + int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { + return new MiniLlapCluster(clusterName, miniZkCluster, numInstances, numExecutorsPerService, + execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); } - public static MiniLlapCluster createAndLaunch(Configuration conf, String clusterName, - int numExecutorsPerService, long execBytePerService, boolean llapIoEnabled, - boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { - MiniLlapCluster miniLlapCluster = create(clusterName, numExecutorsPerService, - execBytePerService, llapIoEnabled, ioIsDirect, ioBytesPerService, numLocalDirs); - miniLlapCluster.init(conf); - miniLlapCluster.start(); - Configuration llapConf = miniLlapCluster.getClusterSpecificConfiguration(); - Iterator> confIter = llapConf.iterator(); - while (confIter.hasNext()) { - Map.Entry entry = confIter.next(); - conf.set(entry.getKey(), entry.getValue()); - } - return miniLlapCluster; + public static MiniLlapCluster create(String clusterName, + @Nullable MiniZooKeeperCluster miniZkCluster, + int numExecutorsPerService, + long execBytePerService, boolean llapIoEnabled, + boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { + return create(clusterName, miniZkCluster, 1, numExecutorsPerService, execBytePerService, + llapIoEnabled, + ioIsDirect, ioBytesPerService, numLocalDirs); } - // TODO Add support for multiple instances - private MiniLlapCluster(String clusterName, int numExecutorsPerService, long execMemoryPerService, - boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, int numLocalDirs) { + private MiniLlapCluster(String clusterName, @Nullable MiniZooKeeperCluster miniZkCluster, + int numInstances, int numExecutorsPerService, long execMemoryPerService, + boolean llapIoEnabled, boolean ioIsDirect, long ioBytesPerService, + int numLocalDirs) { super(clusterName + "_" + MiniLlapCluster.class.getSimpleName()); Preconditions.checkArgument(numExecutorsPerService > 0); Preconditions.checkArgument(execMemoryPerService > 0); Preconditions.checkArgument(numLocalDirs > 0); - String clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + this.numInstances = numInstances; + + this.clusterNameTrimmed = clusterName.replace("$", "") + "_" + MiniLlapCluster.class.getSimpleName(); + this.llapDaemons = new LlapDaemon[numInstances]; File targetWorkDir = new File("target", clusterNameTrimmed); try { FileContext.getLocalFSFileContext().delete( @@ -123,8 +132,18 @@ public class MiniLlapCluster extends AbstractService { this.testWorkDir = link; } else { + targetWorkDir.mkdir(); this.testWorkDir = targetWorkDir; } + if (miniZkCluster == null) { + ownZkCluster = true; + this.zkWorkDir = new File(testWorkDir, "mini-zk-cluster"); + zkWorkDir.mkdir(); + } else { + miniZooKeeperCluster = miniZkCluster; + ownZkCluster = false; + this.zkWorkDir = null; + } this.numExecutorsPerService = numExecutorsPerService; this.execBytesPerService = execMemoryPerService; this.ioIsDirect = ioIsDirect; @@ -142,12 +161,13 @@ public class MiniLlapCluster extends AbstractService { } @Override - public void serviceInit(Configuration conf) { + public void serviceInit(Configuration conf) throws IOException, InterruptedException { int rpcPort = 0; int mngPort = 0; int shufflePort = 0; int webPort = 0; boolean usePortsFromConf = conf.getBoolean("minillap.usePortsFromConf", false); + LOG.info("MiniLlap configured to use ports from conf: {}", usePortsFromConf); if (usePortsFromConf) { rpcPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_DAEMON_RPC_PORT); mngPort = HiveConf.getIntVar(conf, HiveConf.ConfVars.LLAP_MANAGEMENT_RPC_PORT); @@ -155,43 +175,61 @@ public class MiniLlapCluster extends AbstractService { webPort = HiveConf.getIntVar(conf, ConfVars.LLAP_DAEMON_WEB_PORT); } - llapDaemon = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, - ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); - llapDaemon.init(conf); + if (ownZkCluster) { + miniZooKeeperCluster = new MiniZooKeeperCluster(); + miniZooKeeperCluster.startup(zkWorkDir); + } else { + // Already setup in the create method + } + + conf.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); + conf.set(ConfVars.HIVE_ZOOKEEPER_QUORUM.varname, "localhost"); + conf.setInt(ConfVars.HIVE_ZOOKEEPER_CLIENT_PORT.varname, miniZooKeeperCluster.getClientPort()); + + LOG.info("Initializing {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + for (int i = 0 ;i < numInstances ; i++) { + llapDaemons[i] = new LlapDaemon(conf, numExecutorsPerService, execBytesPerService, llapIoEnabled, + ioIsDirect, ioBytesPerService, localDirs, rpcPort, mngPort, shufflePort, webPort); + llapDaemons[i].init(new Configuration(conf)); + } + LOG.info("Initialized {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); } @Override public void serviceStart() { - llapDaemon.start(); - - clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, - getServiceAddress().getHostName()); - clusterSpecificConfiguration.setInt(ConfVars.LLAP_DAEMON_RPC_PORT.varname, - getServiceAddress().getPort()); - - clusterSpecificConfiguration.setInt( - ConfVars.LLAP_DAEMON_NUM_EXECUTORS.varname, - numExecutorsPerService); - clusterSpecificConfiguration.setLong( - ConfVars.LLAP_DAEMON_MEMORY_PER_INSTANCE_MB.varname, execBytesPerService); + LOG.info("Starting {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + for (int i = 0 ;i < numInstances ; i++) { + llapDaemons[i].start(); + } + LOG.info("Started {} llap instances for MiniLlapCluster with name={}", numInstances, clusterNameTrimmed); + // Optimize local fetch does not work with LLAP due to different local directories // used by containers and LLAP clusterSpecificConfiguration .setBoolean(TezRuntimeConfiguration.TEZ_RUNTIME_OPTIMIZE_LOCAL_FETCH, false); + clusterSpecificConfiguration.set(ConfVars.LLAP_DAEMON_SERVICE_HOSTS.varname, "@" + clusterNameTrimmed); } @Override - public void serviceStop() { - if (llapDaemon != null) { - llapDaemon.stop(); - llapDaemon = null; + public void serviceStop() throws IOException { + for (int i = 0 ; i < numInstances ; i++) { + if (llapDaemons[i] != null) { + llapDaemons[i].stop(); + llapDaemons[i] = null; + } + } + if (ownZkCluster) { + if (miniZooKeeperCluster != null) { + LOG.info("Stopping MiniZooKeeper cluster"); + miniZooKeeperCluster.shutdown(); + miniZooKeeperCluster = null; + LOG.info("Stopped MiniZooKeeper cluster"); + } + } else { + LOG.info("Not stopping MiniZK cluster since it is now owned by us"); } } - private InetSocketAddress getServiceAddress() { - Preconditions.checkState(getServiceState() == Service.STATE.STARTED); - return llapDaemon.getListenerAddress(); - } public Configuration getClusterSpecificConfiguration() { Preconditions.checkState(getServiceState() == Service.STATE.STARTED); @@ -200,7 +238,10 @@ public class MiniLlapCluster extends AbstractService { // Mainly for verification public long getNumSubmissions() { - return llapDaemon.getNumSubmissions(); + int numSubmissions = 0; + for (int i = 0 ; i < numInstances ; i++) { + numSubmissions += llapDaemons[i].getNumSubmissions(); + } + return numSubmissions; } - }