Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C456118770 for ; Wed, 27 Jan 2016 22:47:14 +0000 (UTC) Received: (qmail 20356 invoked by uid 500); 27 Jan 2016 22:47:14 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 20272 invoked by uid 500); 27 Jan 2016 22:47:14 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 20115 invoked by uid 99); 27 Jan 2016 22:47:14 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 27 Jan 2016 22:47:14 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 24AB6E00D6; Wed, 27 Jan 2016 22:47:14 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: sershe@apache.org To: commits@hive.apache.org Date: Wed, 27 Jan 2016 22:47:15 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [02/14] hive git commit: HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth) HIVE-12915 : Tez session pool has concurrency issues during init (Sergey Shelukhin, reviewed by Siddharth Seth) Conflicts: ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/c09cc1d2 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/c09cc1d2 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/c09cc1d2 Branch: refs/heads/branch-2.0 Commit: c09cc1d2d944d9b79a803cccd3f1f4570187e276 Parents: 36f8b75 Author: Sergey Shelukhin Authored: Wed Jan 27 14:13:40 2016 -0800 Committer: Sergey Shelukhin Committed: Wed Jan 27 14:17:21 2016 -0800 ---------------------------------------------------------------------- .../org/apache/hadoop/hive/conf/HiveConf.java | 11 +++ .../hive/ql/exec/spark/HashTableLoader.java | 1 - .../hive/ql/exec/tez/TezSessionPoolManager.java | 96 ++++++++++++-------- .../hive/ql/exec/tez/TestTezSessionPool.java | 2 +- .../apache/hive/service/server/HiveServer2.java | 12 ++- 5 files changed, 79 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java ---------------------------------------------------------------------- diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java index 7a25ece..a11d1bc 100644 --- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java +++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java @@ -3140,6 +3140,17 @@ public class HiveConf extends Configuration { return conf.getTrimmed(var.varname, var.defaultStrVal); } + public static String[] getTrimmedStringsVar(Configuration conf, ConfVars var) { + assert (var.valClass == String.class) : var.varname; + String[] result = conf.getTrimmedStrings(var.varname, (String[])null); + if (result != null) return result; + if (var.altName != null) { + result = conf.getTrimmedStrings(var.altName, (String[])null); + if (result != null) return result; + } + return org.apache.hadoop.util.StringUtils.getTrimmedStrings(var.defaultStrVal); + } + public static String getVar(Configuration conf, ConfVars var, String defaultVal) { if (var.altName != null) { return conf.get(var.varname, conf.get(var.altName, defaultVal)); http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java index 64474e6..1634f42 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/spark/HashTableLoader.java @@ -161,7 +161,6 @@ public class HashTableLoader implements org.apache.hadoop.hive.ql.exec.HashTable } MapJoinTableContainer mapJoinTable = SmallTableCache.get(path); if (mapJoinTable == null) { - // TODO#: HERE? synchronized (path.toString().intern()) { mapJoinTable = SmallTableCache.get(path); if (mapJoinTable == null) { http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java index 1321b5f..891d2a8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java @@ -26,16 +26,17 @@ import java.net.URISyntaxException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.Iterator; import java.util.LinkedList; import java.util.List; +import java.util.Queue; import java.util.Random; import java.util.Set; @@ -77,13 +78,18 @@ public class TezSessionPoolManager { private Semaphore llapQueue; - private int blockingQueueLength = -1; private HiveConf initConf = null; int numConcurrentLlapQueries = -1; private long sessionLifetimeMs = 0; private long sessionLifetimeJitterMs = 0; - - private boolean inited = false; + /** A queue for initial sessions that have not been started yet. */ + private Queue initialSessions = + new ConcurrentLinkedQueue(); + /** + * Indicates whether we should try to use defaultSessionPool. + * We assume that setupPool is either called before any activity, or not called at all. + */ + private volatile boolean hasInitialSessions = false; private static TezSessionPoolManager sessionPool = null; @@ -101,18 +107,23 @@ public class TezSessionPoolManager { protected TezSessionPoolManager() { } + private void startInitialSession(TezSessionPoolSession sessionState) throws Exception { + HiveConf newConf = new HiveConf(initConf); + boolean isUsable = sessionState.tryUse(); + if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup"); + newConf.set("tez.queue.name", sessionState.getQueueName()); + sessionState.open(newConf); + if (sessionState.returnAfterUse()) { + defaultQueuePool.put(sessionState); + } + } + public void startPool() throws Exception { - this.inited = true; - for (int i = 0; i < blockingQueueLength; i++) { - HiveConf newConf = new HiveConf(initConf); - TezSessionPoolSession sessionState = defaultQueuePool.take(); - boolean isUsable = sessionState.tryUse(); - if (!isUsable) throw new IOException(sessionState + " is not usable at pool startup"); - newConf.set("tez.queue.name", sessionState.getQueueName()); - sessionState.open(newConf); - if (sessionState.returnAfterUse()) { - defaultQueuePool.put(sessionState); - } + if (initialSessions.isEmpty()) return; + while (true) { + TezSessionPoolSession session = initialSessions.poll(); + if (session == null) break; + startInitialSession(session); } if (expirationThread != null) { expirationThread.start(); @@ -121,16 +132,32 @@ public class TezSessionPoolManager { } public void setupPool(HiveConf conf) throws InterruptedException { - String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); + String[] defaultQueueList = HiveConf.getTrimmedStringsVar( + conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES); + int emptyNames = 0; // We don't create sessions for empty entries. + for (String queueName : defaultQueueList) { + if (queueName.isEmpty()) { + ++emptyNames; + } + } int numSessions = conf.getIntVar(ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE); + int numSessionsTotal = numSessions * (defaultQueueList.length - emptyNames); + if (numSessionsTotal > 0) { + defaultQueuePool = new ArrayBlockingQueue(numSessionsTotal); + } + numConcurrentLlapQueries = conf.getIntVar(ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES); + llapQueue = new Semaphore(numConcurrentLlapQueries, true); + + this.initConf = conf; + sessionLifetimeMs = conf.getTimeVar( ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME, TimeUnit.MILLISECONDS); if (sessionLifetimeMs != 0) { sessionLifetimeJitterMs = conf.getTimeVar( ConfVars.HIVE_SERVER2_TEZ_SESSION_LIFETIME_JITTER, TimeUnit.MILLISECONDS); if (LOG.isDebugEnabled()) { - LOG.debug("Starting session expiration threads; session lifetime is " + LOG.debug("Session expiration is enabled; session lifetime is " + sessionLifetimeMs + " + [0, " + sessionLifetimeJitterMs + ") ms"); } expirationQueue = new PriorityBlockingQueue<>(11, new Comparator() { @@ -141,6 +168,11 @@ public class TezSessionPoolManager { } }); restartQueue = new LinkedBlockingQueue<>(); + } + this.hasInitialSessions = numSessionsTotal > 0; + // From this point on, session creation will wait for the default pool (if # of sessions > 0). + + if (sessionLifetimeMs != 0) { expirationThread = new Thread(new Runnable() { @Override public void run() { @@ -155,27 +187,18 @@ public class TezSessionPoolManager { }, "TezSessionPool-cleanup"); } - // the list of queues is a comma separated list. - String defaultQueueList[] = defaultQueues.split(","); - defaultQueuePool = new ArrayBlockingQueue( - numSessions * defaultQueueList.length); - llapQueue = new Semaphore(numConcurrentLlapQueries, true); - - this.initConf = conf; /* * with this the ordering of sessions in the queue will be (with 2 sessions 3 queues) * s1q1, s1q2, s1q3, s2q1, s2q2, s2q3 there by ensuring uniform distribution of * the sessions across queues at least to begin with. Then as sessions get freed up, the list * may change this ordering. */ - blockingQueueLength = 0; for (int i = 0; i < numSessions; i++) { - for (String queue : defaultQueueList) { - if (queue.length() == 0) { + for (String queueName : defaultQueueList) { + if (queueName.isEmpty()) { continue; } - defaultQueuePool.put(createAndInitSession(queue, true)); - blockingQueueLength++; + initialSessions.add(createAndInitSession(queueName, true)); } } } @@ -196,7 +219,6 @@ public class TezSessionPoolManager { private TezSessionState getSession(HiveConf conf, boolean doOpen, boolean forceCreate) throws Exception { - String queueName = conf.get("tez.queue.name"); boolean nonDefaultUser = conf.getBoolVar(HiveConf.ConfVars.HIVE_SERVER2_ENABLE_DOAS); @@ -207,12 +229,10 @@ public class TezSessionPoolManager { * their own credentials. We expect that with the new security model, things will * run as user hive in most cases. */ - if (forceCreate || !(this.inited) - || ((queueName != null) && (!queueName.isEmpty())) - || (nonDefaultUser) || (defaultQueuePool == null) || (blockingQueueLength <= 0)) { + if (forceCreate || nonDefaultUser || !hasInitialSessions + || ((queueName != null) && !queueName.isEmpty())) { LOG.info("QueueName: " + queueName + " nonDefaultUser: " + nonDefaultUser + - " defaultQueuePool: " + defaultQueuePool + - " blockingQueueLength: " + blockingQueueLength); + " defaultQueuePool: " + defaultQueuePool + " hasInitialSessions: " + hasInitialSessions); return getNewSessionState(conf, queueName, doOpen); } @@ -266,7 +286,7 @@ public class TezSessionPoolManager { // session in the SessionState } - public void closeIfNotDefault( + public static void closeIfNotDefault( TezSessionState tezSessionState, boolean keepTmpDir) throws Exception { LOG.info("Closing tez session default? " + tezSessionState.isDefault()); if (!tezSessionState.isDefault()) { @@ -275,7 +295,7 @@ public class TezSessionPoolManager { } public void stop() throws Exception { - if ((sessionPool == null) || (this.inited == false)) { + if ((sessionPool == null) || !this.hasInitialSessions) { return; } @@ -328,7 +348,7 @@ public class TezSessionPoolManager { * sessions for e.g. when a CLI session is started. The CLI session could re-use the * same tez session eliminating the latencies of new AM and containers. */ - private boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) + private static boolean canWorkWithSameSession(TezSessionState session, HiveConf conf) throws HiveException { if (session == null || conf == null) { return false; http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java index a2791a1..a38f4ba 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java @@ -124,7 +124,7 @@ public class TestTezSessionPool { poolManager.setupPool(conf); poolManager.startPool(); } catch (Exception e) { - e.printStackTrace(); + LOG.error("Initialization error", e); fail(); } http://git-wip-us.apache.org/repos/asf/hive/blob/c09cc1d2/service/src/java/org/apache/hive/service/server/HiveServer2.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/server/HiveServer2.java b/service/src/java/org/apache/hive/service/server/HiveServer2.java index 2f55591..512a810 100644 --- a/service/src/java/org/apache/hive/service/server/HiveServer2.java +++ b/service/src/java/org/apache/hive/service/server/HiveServer2.java @@ -500,6 +500,13 @@ public class HiveServer2 extends CompositeService { maxAttempts = hiveConf.getLongVar(HiveConf.ConfVars.HIVE_SERVER2_MAX_START_ATTEMPTS); HiveServer2 server = null; try { + // Initialize the pool before we start the server; don't start yet. + TezSessionPoolManager sessionPool = null; + if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { + sessionPool = TezSessionPoolManager.getInstance(); + sessionPool.setupPool(hiveConf); + } + // Cleanup the scratch dir before starting ServerUtils.cleanUpScratchDir(hiveConf); server = new HiveServer2(); @@ -522,9 +529,8 @@ public class HiveServer2 extends CompositeService { if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_SUPPORT_DYNAMIC_SERVICE_DISCOVERY)) { server.addServerInstanceToZooKeeper(hiveConf); } - if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_TEZ_INITIALIZE_DEFAULT_SESSIONS)) { - TezSessionPoolManager sessionPool = TezSessionPoolManager.getInstance(); - sessionPool.setupPool(hiveConf); + + if (sessionPool != null) { sessionPool.startPool(); }