hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vik...@apache.org
Subject hive git commit: HIVE-10647: Hive on LLAP: Limit HS2 from overwhelming LLAP (Vikram Dixit K, reviewed by Gunther Hagleitner)
Date Fri, 26 Jun 2015 00:12:36 GMT
Repository: hive
Updated Branches:
  refs/heads/llap c5dc87a8e -> cc4075b53


HIVE-10647: Hive on LLAP: Limit HS2 from overwhelming LLAP (Vikram Dixit K, reviewed by Gunther
Hagleitner)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/cc4075b5
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/cc4075b5
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/cc4075b5

Branch: refs/heads/llap
Commit: cc4075b53006e4cfe4eae6c8d5f25b58f49fa3cc
Parents: c5dc87a
Author: vikram <vikram@hortonworks.com>
Authored: Thu Jun 25 17:08:32 2015 -0700
Committer: vikram <vikram@hortonworks.com>
Committed: Thu Jun 25 17:08:32 2015 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |  4 +-
 .../hive/ql/exec/tez/TezSessionPoolManager.java | 25 ++++++--
 .../apache/hadoop/hive/ql/exec/tez/TezTask.java |  6 +-
 .../hive/ql/optimizer/physical/LlapDecider.java |  8 ++-
 .../org/apache/hadoop/hive/ql/plan/TezWork.java | 17 +++--
 .../hive/ql/exec/tez/TestTezSessionPool.java    | 65 +++++++++++++++-----
 6 files changed, 96 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 32944bd..2c20d51 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -2241,7 +2241,9 @@ public class HiveConf extends Configuration {
         new StringSet("throw", "skip", "ignore"), "The approach msck should take with HDFS
" +
        "directories that are partition-like but contain unsupported characters. 'throw' (an
" +
        "exception) is the default; 'skip' will skip the invalid directories and still repair
the" +
-       " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many
cases)");
+       " others; 'ignore' will skip the validation (legacy behavior, causes bugs in many
cases)"),
+    HIVE_SERVER2_LLAP_CONCURRENT_QUERIES("hive.server2.llap.concurrent.queries", -1,
+        "The number of queries allowed in parallel via llap. Negative number implies 'infinite'.");
 
 
     public final String varname;

http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
index dfa539f..b1e9235 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezSessionPoolManager.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.Semaphore;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -45,8 +46,10 @@ public class TezSessionPoolManager {
   private static final Log LOG = LogFactory.getLog(TezSessionPoolManager.class);
 
   private BlockingQueue<TezSessionState> defaultQueuePool;
+  private Semaphore llapQueue;
   private int blockingQueueLength = -1;
   private HiveConf initConf = null;
+  int numConcurrentLlapQueries = -1;
 
   private boolean inited = false;
 
@@ -83,11 +86,15 @@ public class TezSessionPoolManager {
 
     String defaultQueues = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_SERVER2_TEZ_DEFAULT_QUEUES);
     int numSessions = conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_TEZ_SESSIONS_PER_DEFAULT_QUEUE);
+    numConcurrentLlapQueries =
+        conf.getIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES);
 
     // the list of queues is a comma separated list.
     String defaultQueueList[] = defaultQueues.split(",");
     defaultQueuePool =
         new ArrayBlockingQueue<TezSessionState>(numSessions * defaultQueueList.length);
+    llapQueue = new Semaphore(numConcurrentLlapQueries, true);
+
     this.initConf = conf;
     /*
      *  with this the ordering of sessions in the queue will be (with 2 sessions 3 queues)
@@ -164,8 +171,11 @@ public class TezSessionPoolManager {
     return retTezSessionState;
   }
 
-  public void returnSession(TezSessionState tezSessionState)
+  public void returnSession(TezSessionState tezSessionState, boolean llap)
       throws Exception {
+    if (llap && (this.numConcurrentLlapQueries > 0)) {
+      llapQueue.release();
+    }
     if (tezSessionState.isDefault()) {
       LOG.info("The session " + tezSessionState.getSessionId()
           + " belongs to the pool. Put it back in");
@@ -207,9 +217,9 @@ public class TezSessionPoolManager {
     return new TezSessionState(sessionId);
   }
 
-  public TezSessionState getSession(
-      TezSessionState session, HiveConf conf, boolean doOpen) throws Exception {
-    return getSession(session, conf, doOpen, false);
+  public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen,
+      boolean llap) throws Exception {
+    return getSession(session, conf, doOpen, false, llap);
   }
 
   /*
@@ -268,8 +278,11 @@ public class TezSessionPoolManager {
     return true;
   }
 
-  public TezSessionState getSession(TezSessionState session, HiveConf conf,
-      boolean doOpen, boolean forceCreate) throws Exception {
+  public TezSessionState getSession(TezSessionState session, HiveConf conf, boolean doOpen,
+      boolean forceCreate, boolean llap) throws Exception {
+    if (llap && (this.numConcurrentLlapQueries > 0)) {
+      llapQueue.acquire(); // blocks if no more llap queries can be submitted.
+    }
     if (canWorkWithSameSession(session, conf)) {
       return session;
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
index b2558d1..7f50bea 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java
@@ -114,7 +114,9 @@ public class TezTask extends Task<TezWork> {
       // Need to remove this static hack. But this is the way currently to get a session.
       SessionState ss = SessionState.get();
       session = ss.getTezSession();
-      session = TezSessionPoolManager.getInstance().getSession(session, conf, false);
+      session =
+          TezSessionPoolManager.getInstance().getSession(session, conf, false,
+              getWork().getLlapMode());
       ss.setTezSession(session);
 
       // jobConf will hold all the configuration for hadoop, tez, and hive
@@ -173,7 +175,7 @@ public class TezTask extends Task<TezWork> {
       // fetch the counters
       Set<StatusGetOpts> statusGetOpts = EnumSet.of(StatusGetOpts.GET_COUNTERS);
       counters = client.getDAGStatus(statusGetOpts).getDAGCounters();
-      TezSessionPoolManager.getInstance().returnSession(session);
+      TezSessionPoolManager.getInstance().returnSession(session, getWork().getLlapMode());
 
       if (LOG.isInfoEnabled() && counters != null
           && (conf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_SUMMARY) ||

http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
index 0a22f20..d49d83e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/physical/LlapDecider.java
@@ -102,8 +102,8 @@ public class LlapDecider implements PhysicalPlanResolver {
 
   class LlapDecisionDispatcher implements Dispatcher {
 
-    private PhysicalContext pctx;
-    private HiveConf conf;
+    private final PhysicalContext pctx;
+    private final HiveConf conf;
 
     public LlapDecisionDispatcher(PhysicalContext pctx) {
       this.pctx = pctx;
@@ -291,6 +291,7 @@ public class LlapDecider implements PhysicalPlanResolver {
       Map<Rule, NodeProcessor> opRules = new LinkedHashMap<Rule, NodeProcessor>();
       opRules.put(new RuleRegExp("No scripts", ScriptOperator.getOperatorName() + "%"),
           new NodeProcessor() {
+          @Override
           public Object process(Node n, Stack<Node> s, NodeProcessorCtx c,
               Object... os) {
             return new Boolean(false);
@@ -299,6 +300,7 @@ public class LlapDecider implements PhysicalPlanResolver {
       opRules.put(new RuleRegExp("No user code in fil",
               FilterOperator.getOperatorName() + "%"),
           new NodeProcessor() {
+          @Override
           public Object process(Node n, Stack<Node> s, NodeProcessorCtx c,
               Object... os) {
             ExprNodeDesc expr = ((FilterOperator)n).getConf().getPredicate();
@@ -308,6 +310,7 @@ public class LlapDecider implements PhysicalPlanResolver {
       opRules.put(new RuleRegExp("No user code in gby",
               GroupByOperator.getOperatorName() + "%"),
           new NodeProcessor() {
+          @Override
           public Object process(Node n, Stack<Node> s, NodeProcessorCtx c,
               Object... os) {
             List<AggregationDesc> aggs = ((GroupByOperator)n).getConf().getAggregators();
@@ -317,6 +320,7 @@ public class LlapDecider implements PhysicalPlanResolver {
       opRules.put(new RuleRegExp("No user code in select",
               SelectOperator.getOperatorName() + "%"),
           new NodeProcessor() {
+          @Override
           public Object process(Node n, Stack<Node> s, NodeProcessorCtx c,
               Object... os) {
             List<ExprNodeDesc> exprs = ((SelectOperator)n).getConf().getColList();

http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
index 7b91002..17c5ad7 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/TezWork.java
@@ -263,7 +263,7 @@ public class TezWork extends AbstractOperatorDesc {
 
   /*
    * Dependency is a class used for explain
-   */ 
+   */
   public class Dependency implements Serializable, Comparable<Dependency> {
     public BaseWork w;
     public EdgeType type;
@@ -272,7 +272,7 @@ public class TezWork extends AbstractOperatorDesc {
     public String getName() {
       return w.getName();
     }
-    
+
     @Explain(displayName = "Type")
     public String getType() {
       return type.toString();
@@ -306,7 +306,7 @@ public class TezWork extends AbstractOperatorDesc {
     }
     return result;
   }
-  
+
   private static final String MR_JAR_PROPERTY = "tmpjars";
   /**
    * Calls configureJobConf on instances of work that are part of this TezWork.
@@ -349,7 +349,7 @@ public class TezWork extends AbstractOperatorDesc {
   /**
    * connect adds an edge between a and b. Both nodes have
    * to be added prior to calling connect.
-   * @param  
+   * @param
    */
   public void connect(BaseWork a, BaseWork b,
       TezEdgeProperty edgeProp) {
@@ -396,4 +396,13 @@ public class TezWork extends AbstractOperatorDesc {
   public VertexType getVertexType(BaseWork w) {
     return workVertexTypeMap.get(w);
   }
+
+  public boolean getLlapMode() {
+    for (BaseWork work : getAllWork()) {
+      if (work.getLlapMode()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/cc4075b5/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
index 37a84aa..c148aae 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/tez/TestTezSessionPool.java
@@ -55,13 +55,13 @@ public class TestTezSessionPool {
   public void testGetNonDefaultSession() {
     poolManager = new TestTezSessionPoolManager();
     try {
-      TezSessionState sessionState = poolManager.getSession(null, conf, true);
-      TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true);
+      TezSessionState sessionState = poolManager.getSession(null, conf, true, false);
+      TezSessionState sessionState1 = poolManager.getSession(sessionState, conf, true, false);
       if (sessionState1 != sessionState) {
         fail();
       }
       conf.set("tez.queue.name", "nondefault");
-      TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true);
+      TezSessionState sessionState2 = poolManager.getSession(sessionState, conf, true, false);
       if (sessionState2 == sessionState) {
         fail();
       }
@@ -81,30 +81,30 @@ public class TestTezSessionPool {
       poolManager = new TestTezSessionPoolManager();
       poolManager.setupPool(conf);
       poolManager.startPool();
-      TezSessionState sessionState = poolManager.getSession(null, conf, true);
+      TezSessionState sessionState = poolManager.getSession(null, conf, true, false);
       if (sessionState.getQueueName().compareTo("a") != 0) {
         fail();
       }
-      poolManager.returnSession(sessionState);
+      poolManager.returnSession(sessionState, false);
 
-      sessionState = poolManager.getSession(null, conf, true);
+      sessionState = poolManager.getSession(null, conf, true, false);
       if (sessionState.getQueueName().compareTo("b") != 0) {
         fail();
       }
-      poolManager.returnSession(sessionState);
+      poolManager.returnSession(sessionState, false);
 
-      sessionState = poolManager.getSession(null, conf, true);
+      sessionState = poolManager.getSession(null, conf, true, false);
       if (sessionState.getQueueName().compareTo("c") != 0) {
         fail();
       }
-      poolManager.returnSession(sessionState);
+      poolManager.returnSession(sessionState, false);
 
-      sessionState = poolManager.getSession(null, conf, true);
+      sessionState = poolManager.getSession(null, conf, true, false);
       if (sessionState.getQueueName().compareTo("a") != 0) {
         fail();
       }
 
-      poolManager.returnSession(sessionState);
+      poolManager.returnSession(sessionState, false);
 
     } catch (Exception e) {
       e.printStackTrace();
@@ -112,8 +112,44 @@ public class TestTezSessionPool {
     }
   }
 
+  @Test
+  public void testLlapSessionQueuing() {
+    try {
+      random = new Random(1000);
+      conf.setIntVar(HiveConf.ConfVars.HIVE_SERVER2_LLAP_CONCURRENT_QUERIES, 2);
+      poolManager = new TestTezSessionPoolManager();
+      poolManager.setupPool(conf);
+      poolManager.startPool();
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail();
+    }
+
+    List<Thread> threadList = new ArrayList<Thread>();
+    for (int i = 0; i < 15; i++) {
+      Thread t = new Thread(new SessionThread(true));
+      threadList.add(t);
+      t.start();
+    }
+
+    for (Thread t : threadList) {
+      try {
+        t.join();
+      } catch (InterruptedException e) {
+        e.printStackTrace();
+        fail();
+      }
+    }
+  }
+
   public class SessionThread implements Runnable {
 
+    private boolean llap = false;
+
+    public SessionThread(boolean llap) {
+      this.llap = llap;
+    }
+
     @Override
     public void run() {
       try {
@@ -124,9 +160,9 @@ public class TestTezSessionPool {
           tmpConf.set("tez.queue.name", "");
         }
 
-        TezSessionState session = poolManager.getSession(null, tmpConf, true);
+        TezSessionState session = poolManager.getSession(null, tmpConf, true, llap);
         Thread.sleep((random.nextInt(9) % 10) * 1000);
-        poolManager.returnSession(session);
+        poolManager.returnSession(session, llap);
       } catch (Exception e) {
         e.printStackTrace();
       }
@@ -150,7 +186,8 @@ public class TestTezSessionPool {
     }
     List<Thread> threadList = new ArrayList<Thread>();
     for (int i = 0; i < 15; i++) {
-      Thread t = new Thread(new SessionThread());
+      Thread t = new Thread(new SessionThread(false));
+      threadList.add(t);
       t.start();
     }
 


Mime
View raw message