hive-commits mailing list archives

From da...@apache.org
Subject hive git commit: HIVE-20549: Allow user set query tag, and kill query with tag (Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin)
Date Fri, 19 Oct 2018 03:54:52 GMT
Repository: hive
Updated Branches:
  refs/heads/master 3e5e77d1f -> 3963c729f


HIVE-20549: Allow user set query tag, and kill query with tag (Daniel Dai, reviewed by Thejas Nair, Sergey Shelukhin)

Signed-off-by: Thejas M Nair <thejas@hortonworks.com>, sergey <sershe@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/3963c729
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/3963c729
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/3963c729

Branch: refs/heads/master
Commit: 3963c729fabf90009cb67d277d40fe5913936358
Parents: 3e5e77d
Author: Daniel Dai <daijyc@gmail.com>
Authored: Thu Oct 18 20:50:14 2018 -0700
Committer: Daniel Dai <daijyc@gmail.com>
Committed: Thu Oct 18 20:50:19 2018 -0700

----------------------------------------------------------------------
 .../org/apache/hadoop/hive/conf/HiveConf.java   |   5 +
 .../hive/jdbc/TestJdbcWithMiniLlapArrow.java    | 151 +++++++++++++------
 .../java/org/apache/hadoop/hive/ql/Driver.java  |   7 +-
 .../org/apache/hadoop/hive/ql/QueryState.java   |  23 +--
 .../ql/exec/tez/KillTriggerActionHandler.java   |   5 +
 .../hive/ql/exec/tez/WorkloadManager.java       |   3 +
 .../ql/parse/ReplicationSemanticAnalyzer.java   |   2 +-
 .../clientnegative/authorization_kill_query.q   |  15 --
 .../service/cli/operation/OperationManager.java |  29 ++--
 .../hive/service/server/KillQueryImpl.java      | 112 ++++++++++----
 10 files changed, 240 insertions(+), 112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
----------------------------------------------------------------------
diff --git a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
index 8a06cb5..07d5205 100644
--- a/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
+++ b/common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
@@ -1535,6 +1535,10 @@ public class HiveConf extends Configuration {
     HIVEQUERYID("hive.query.id", "",
         "ID for query being executed (might be multiple per a session)"),
 
+    HIVEQUERYTAG("hive.query.tag", null, "Tag for the queries in the session. Users can kill the queries with the tag " +
+        "in another session. Currently there is no tag duplication check; users need to make sure the tag is unique. " +
+        "Also, 'kill query' needs to be issued to all HiveServer2 instances to properly kill the queries"),
+
     HIVESPARKJOBNAMELENGTH("hive.spark.jobname.length", 100000, "max jobname length for Hive on " +
             "Spark queries"),
     HIVEJOBNAMELENGTH("hive.jobname.length", 50, "max jobname length"),
@@ -5497,6 +5501,7 @@ public class HiveConf extends Configuration {
     ConfVars.SHOW_JOB_FAIL_DEBUG_INFO.varname,
     ConfVars.TASKLOG_DEBUG_TIMEOUT.varname,
     ConfVars.HIVEQUERYID.varname,
+    ConfVars.HIVEQUERYTAG.varname,
   };
 
   /**

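For orientation, here is a minimal JDBC sketch of how the new property is meant to be used; the host, credentials, tag name, and the long-running query are hypothetical placeholders, not part of this commit:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class KillByTagSketch {
  public static void main(String[] args) throws Exception {
    // Session 1: tag every query in this session, then run something long.
    Connection run = DriverManager.getConnection("jdbc:hive2://hs2-host:10000/default", "alice", "");
    Statement s1 = run.createStatement();
    s1.execute("set hive.query.tag = nightly-etl");   // the new hive.query.tag property
    // s1.executeQuery("select ...");                 // placeholder long-running query

    // Session 2: an admin (or the owner) kills everything carrying the tag.
    Connection kill = DriverManager.getConnection("jdbc:hive2://hs2-host:10000/default", "admin", "");
    Statement s2 = kill.createStatement();
    s2.execute("kill query 'nightly-etl'");           // a tag is accepted where a query id was
    s2.close();
    kill.close();
    s1.close();
    run.close();
  }
}

As the description notes, each HiveServer2 instance only sees its own operations, so the KILL QUERY has to reach every instance for the tag to be fully effective.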
http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
index e125ba3..b69a2f9 100644
--- a/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
+++ b/itests/hive-unit/src/test/java/org/apache/hive/jdbc/TestJdbcWithMiniLlapArrow.java
@@ -41,8 +41,8 @@ import org.apache.hadoop.hive.llap.LlapArrowRowInputFormat;
 import org.apache.hive.jdbc.miniHS2.MiniHS2;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.permission.FsPermission;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -57,6 +57,7 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
   private static final String tableName = "testJdbcMinihs2Tbl";
   private static String dataFileDir;
   private static final String testDbName = "testJdbcMinihs2";
+  private static final String tag = "mytag";
 
   private static class ExceptionHolder {
     Throwable throwable;
@@ -66,6 +67,12 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
   public static void beforeTest() throws Exception {
     HiveConf conf = defaultConf();
     conf.setBoolVar(ConfVars.LLAP_OUTPUT_FORMAT_ARROW, true);
+    conf.setVar(ConfVars.HIVE_AUTHENTICATOR_MANAGER, "org.apache.hadoop.hive.ql.security" +
+            ".SessionStateUserAuthenticator");
+    conf.setVar(ConfVars.USERS_IN_ADMIN_ROLE, System.getProperty("user.name"));
+    conf.setBoolVar(ConfVars.HIVE_AUTHORIZATION_ENABLED, true);
+    conf.setVar(ConfVars.HIVE_AUTHORIZATION_SQL_STD_AUTH_CONFIG_WHITELIST_APPEND, ConfVars.HIVE_SUPPORT_CONCURRENCY
+            .varname + "|" + ConfVars.HIVE_SERVER2_ENABLE_DOAS.varname);
     MiniHS2.cleanupLocalDir();
     miniHS2 = BaseJdbcWithMiniLlap.beforeTest(conf);
     dataFileDir = conf.get("test.data.files").replace('\\', '/').replace("c:", "");
@@ -73,8 +80,19 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
     Connection conDefault = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(),
             System.getProperty("user.name"), "bar");
     Statement stmt = conDefault.createStatement();
+    String tblName = testDbName + "." + tableName;
+    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
+    String udfName = SleepMsUDF.class.getName();
     stmt.execute("drop database if exists " + testDbName + " cascade");
     stmt.execute("create database " + testDbName);
+    stmt.execute("set role admin");
+    stmt.execute("dfs -put " + dataFilePath.toString() + " " + "kv1.txt");
+    stmt.execute("use " + testDbName);
+    stmt.execute("create table " + tblName + " (int_col int, value string) ");
+    stmt.execute("load data inpath 'kv1.txt' into table " + tblName);
+    stmt.execute("create function sleepMsUDF as '" + udfName + "'");
+    stmt.execute("grant select on table " + tblName + " to role public");
+
     stmt.close();
     conDefault.close();
   }
@@ -291,29 +309,16 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
    * that runs for a sufficiently long time.
    * @throws Exception
    */
-  @Test
-  public void testKillQuery() throws Exception {
-    Connection con = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-            System.getProperty("user.name"), "bar");
+  private void testKillQueryInternal(String user, String killUser, boolean useTag, final
+      ExceptionHolder stmtHolder, final ExceptionHolder tKillHolder) throws Exception {
+    Connection con1 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
+            user, "bar");
     Connection con2 = BaseJdbcWithMiniLlap.getConnection(miniHS2.getJdbcURL(testDbName),
-            System.getProperty("user.name"), "bar");
+            killUser, "bar");
 
-    String udfName = SleepMsUDF.class.getName();
-    Statement stmt1 = con.createStatement();
     final Statement stmt2 = con2.createStatement();
-    Path dataFilePath = new Path(dataFileDir, "kv1.txt");
-
-    String tblName = testDbName + "." + tableName;
-
-    stmt1.execute("create temporary function sleepMsUDF as '" + udfName + "'");
-    stmt1.execute("create table " + tblName + " (int_col int, value string) ");
-    stmt1.execute("load data local inpath '" + dataFilePath.toString() + "' into table "
+ tblName);
-
-
-    stmt1.close();
-    final Statement stmt = con.createStatement();
-    final ExceptionHolder tExecuteHolder = new ExceptionHolder();
-    final ExceptionHolder tKillHolder = new ExceptionHolder();
+    final HiveStatement stmt = (HiveStatement)con1.createStatement();
+    final StringBuffer stmtQueryId = new StringBuffer();
 
     // Thread executing the query
     Thread tExecute = new Thread(new Runnable() {
@@ -323,46 +328,104 @@ public class TestJdbcWithMiniLlapArrow extends BaseJdbcWithMiniLlap {
           System.out.println("Executing query: ");
           stmt.execute("set hive.llap.execution.mode = none");
 
+          if (useTag) {
+            stmt.execute("set hive.query.tag = " + tag);
+          }
           // The test table has 500 rows, so total query time should be ~ 500*500ms
-          stmt.executeQuery("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
+          stmt.executeAsync("select sleepMsUDF(t1.int_col, 100), t1.int_col, t2.int_col " +
                   "from " + tableName + " t1 join " + tableName + " t2 on t1.int_col = t2.int_col");
+          stmtQueryId.append(stmt.getQueryId());
+          stmt.getUpdateCount();
         } catch (SQLException e) {
-          tExecuteHolder.throwable = e;
+          stmtHolder.throwable = e;
         }
       }
     });
-    // Thread killing the query
-    Thread tKill = new Thread(new Runnable() {
-      @Override
-      public void run() {
-        try {
-          Thread.sleep(5000);
-          String queryId = ((HiveStatement) stmt).getQueryId();
-          System.out.println("Killing query: " + queryId);
-          stmt2.execute("kill query '" + queryId + "'");
-          stmt2.close();
-        } catch (Exception e) {
-          tKillHolder.throwable = e;
+
+    tExecute.start();
+
+    // wait for other thread to create the stmt handle
+    int count = 0;
+    while (count < 10) {
+      try {
+        tKillHolder.throwable = null;
+        Thread.sleep(2000);
+        String queryId;
+        if (useTag) {
+          queryId = tag;
+        } else {
+          if (stmtQueryId.length() != 0) {
+            queryId = stmtQueryId.toString();
+          } else {
+            continue;
+          }
         }
+        System.out.println("Killing query: " + queryId);
+        if (killUser.equals(System.getProperty("user.name"))) {
+          stmt2.execute("set role admin");
+        }
+        stmt2.execute("kill query '" + queryId + "'");
+        stmt2.close();
+        break;
+      } catch (SQLException e) {
+        count++;
+        LOG.warn("Exception when kill query", e);
+        tKillHolder.throwable = e;
       }
-    });
+    }
 
-    tExecute.start();
-    tKill.start();
     tExecute.join();
-    tKill.join();
     try {
       stmt.close();
+      con1.close();
       con2.close();
-      con.close();
-      // We check the result
-      assertNotNull("tExecute", tExecuteHolder.throwable);
-      assertNull("tCancel", tKillHolder.throwable);
     } catch (Exception e) {
       // ignore error
-      LOG.error("Exception in testKillQuery", e);
+      LOG.warn("Exception when close stmt and con", e);
     }
   }
 
+  @Test
+  @Override
+  public void testKillQuery() throws Exception {
+    testKillQueryById();
+    testKillQueryByTagNegative();
+    testKillQueryByTagAdmin();
+    testKillQueryByTagOwner();
+  }
+
+  public void testKillQueryById() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal(System.getProperty("user.name"), System.getProperty("user.name"), false,
+            tExecuteHolder, tKillHolder);
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  public void testKillQueryByTagNegative() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal("user1", "user2", true, tExecuteHolder, tKillHolder);
+    assertNull("tExecute", tExecuteHolder.throwable);
+    assertNotNull("tCancel", tKillHolder.throwable);
+    assertTrue(tKillHolder.throwable.getMessage(), tKillHolder.throwable.getMessage().contains("No privilege"));
+  }
+
+  public void testKillQueryByTagAdmin() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal("user1", System.getProperty("user.name"), true, tExecuteHolder,
tKillHolder);
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
+
+  public void testKillQueryByTagOwner() throws Exception {
+    ExceptionHolder tExecuteHolder = new ExceptionHolder();
+    ExceptionHolder tKillHolder = new ExceptionHolder();
+    testKillQueryInternal("user1", "user1", true, tExecuteHolder, tKillHolder);
+    assertNotNull("tExecute", tExecuteHolder.throwable);
+    assertNull("tCancel", tKillHolder.throwable);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
index 6990be0..84316bc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/Driver.java
@@ -707,7 +707,12 @@ public class Driver implements IDriver {
 
         try {
           perfLogger.PerfLogBegin(CLASS_NAME, PerfLogger.DO_AUTHORIZATION);
-          doAuthorization(queryState.getHiveOperation(), sem, command);
+          // Authorization for kill query is checked in KillQueryImpl instead, since
+          // either an admin or the operation owner may perform it, which the
+          // authorizer does not directly support.
+          if (queryState.getHiveOperation() != HiveOperation.KILL_QUERY) {
+            doAuthorization(queryState.getHiveOperation(), sem, command);
+          }
         } catch (AuthorizationException authExp) {
           console.printError("Authorization failed:" + authExp.getMessage()
               + ". Use SHOW GRANT to get more details.");

http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
index 028dd60..267f7d0 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/QueryState.java
@@ -23,7 +23,9 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.plan.HiveOperation;
 import org.apache.hadoop.hive.ql.session.LineageState;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.tez.dag.api.TezConfiguration;
 
 /**
  * The class to store query level info such as queryId. Multiple queries can run
@@ -55,10 +57,7 @@ public class QueryState {
    */
   private long numModifiedRows = 0;
 
-  // Holds the tag supplied by user to uniquely identify the query. Can be used to kill the query if the query
-  // id cannot be queried for some reason like hive server restart.
-  private String queryTag = null;
-
+  static public final String USERID_TAG = "userid";
   /**
    * Private constructor, use QueryState.Builder instead.
    * @param conf The query specific configuration object
@@ -120,21 +119,25 @@ public class QueryState {
   }
 
   public String getQueryTag() {
-    return queryTag;
+    return HiveConf.getVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG);
   }
 
   public void setQueryTag(String queryTag) {
-    this.queryTag = queryTag;
+    HiveConf.setVar(this.queryConf, HiveConf.ConfVars.HIVEQUERYTAG, queryTag);
   }
 
-  public static void setMapReduceJobTag(HiveConf queryConf, String queryTag) {
-    String jobTag = queryConf.get(MRJobConfig.JOB_TAGS);
-    if (jobTag == null) {
+  public static void setApplicationTag(HiveConf queryConf, String queryTag) {
+    String jobTag = HiveConf.getVar(queryConf, HiveConf.ConfVars.HIVEQUERYTAG);
+    if (jobTag == null || jobTag.isEmpty()) {
       jobTag = queryTag;
     } else {
       jobTag = jobTag.concat("," + queryTag);
     }
+    if (SessionState.get() != null) {
+      jobTag = jobTag.concat("," + USERID_TAG + "=" + SessionState.get().getUserName());
+    }
     queryConf.set(MRJobConfig.JOB_TAGS, jobTag);
+    queryConf.set(TezConfiguration.TEZ_APPLICATION_TAGS, jobTag);
   }
 
   /**
@@ -246,7 +249,7 @@ public class QueryState {
       if (generateNewQueryId) {
         String queryId = QueryPlan.makeQueryId();
         queryConf.setVar(HiveConf.ConfVars.HIVEQUERYID, queryId);
-        setMapReduceJobTag(queryConf, queryId);
+        setApplicationTag(queryConf, queryId);
 
         // FIXME: druid storage handler relies on query.id to maintain some staging directories
         // expose queryid to session level

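To make the new tagging concrete, a distilled sketch of what setApplicationTag now assembles; every concrete value below is made up:

public class TagAssemblySketch {
  public static void main(String[] args) {
    String userTag = "nightly-etl";                  // hive.query.tag (may be unset)
    String queryId = "hive_20181018205014_example";  // generated query id (hypothetical)
    String jobTag = (userTag == null || userTag.isEmpty()) ? queryId : userTag + "," + queryId;
    jobTag += ",userid=alice";                       // USERID_TAG entry, appended when a SessionState exists
    System.out.println(jobTag);                      // nightly-etl,hive_20181018205014_example,userid=alice
    // The same string is written to both mapreduce.job.tags and tez.application.tags,
    // which is what later lets 'kill query' locate child YARN applications by tag.
  }
}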
http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
index f357775..ee539ba 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/KillTriggerActionHandler.java
@@ -18,8 +18,10 @@ package org.apache.hadoop.hive.ql.exec.tez;
 
 import java.util.Map;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.hive.ql.wm.Trigger;
 import org.apache.hadoop.hive.ql.wm.TriggerActionHandler;
 import org.slf4j.Logger;
@@ -39,6 +41,9 @@ public class KillTriggerActionHandler implements TriggerActionHandler<TezSession
           TezSessionState sessionState = entry.getKey();
           String queryId = sessionState.getWmContext().getQueryId();
           try {
+            SessionState ss = new SessionState(new HiveConf());
+            ss.setIsHiveServerQuery(true);
+            SessionState.start(ss);
             KillQuery killQuery = sessionState.getKillQuery();
             // if kill query is null then session might have been released to pool or closed already
             if (killQuery != null) {

http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
index 5326e35..f8fa0cd 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/WorkloadManager.java
@@ -427,6 +427,9 @@ public class WorkloadManager extends TezSessionPoolSession.AbstractTriggerValida
       final String reason = killCtx.reason;
       LOG.info("Killing query for {}", toKill);
       workPool.submit(() -> {
+        SessionState ss = new SessionState(new HiveConf());
+        ss.setIsHiveServerQuery(true);
+        SessionState.start(ss);
         // Note: we get query ID here, rather than in the caller, where it would be more correct
         //       because we know which exact query we intend to kill. This is valid because we
         //       are not expecting query ID to change - we never reuse the session for which a

http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 87c69cf..f1fcd6e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -385,7 +385,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         if (key.equalsIgnoreCase(HIVEQUERYID.varname)) {
           String queryTag = config.getValue();
           if (!StringUtils.isEmpty(queryTag)) {
-            QueryState.setMapReduceJobTag(conf, queryTag);
+            QueryState.setApplicationTag(conf, queryTag);
           }
           queryState.setQueryTag(queryTag);
         } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/ql/src/test/queries/clientnegative/authorization_kill_query.q
----------------------------------------------------------------------
diff --git a/ql/src/test/queries/clientnegative/authorization_kill_query.q b/ql/src/test/queries/clientnegative/authorization_kill_query.q
deleted file mode 100644
index 5379f87..0000000
--- a/ql/src/test/queries/clientnegative/authorization_kill_query.q
+++ /dev/null
@@ -1,15 +0,0 @@
-set hive.security.authorization.enabled=true;
-set hive.test.authz.sstd.hs2.mode=true;
-set hive.security.authorization.manager=org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactoryForTest;
-set hive.security.authenticator.manager=org.apache.hadoop.hive.ql.security.SessionStateConfigUserAuthenticator;
-
-set user.name=hive_admin_user;
-set role ADMIN;
-explain authorization kill query 'dummyqueryid';
-kill query 'dummyqueryid';
-
-set user.name=ruser1;
-
--- kill query as non-admin should fail
-explain authorization kill query 'dummyqueryid';
-kill query 'dummyqueryid';

http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
index 8db6a29..61d5e88 100644
--- a/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
+++ b/service/src/java/org/apache/hive/service/cli/operation/OperationManager.java
@@ -22,12 +22,18 @@ import java.sql.SQLException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import com.google.common.collect.Multimap;
+import com.google.common.collect.MultimapBuilder;
+import com.google.common.collect.Multimaps;
+import com.google.common.collect.SetMultimap;
 import org.apache.hadoop.hive.common.metrics.common.Metrics;
 import org.apache.hadoop.hive.common.metrics.common.MetricsConstant;
 import org.apache.hadoop.hive.common.metrics.common.MetricsFactory;
@@ -62,7 +68,8 @@ public class OperationManager extends AbstractService {
       new ConcurrentHashMap<OperationHandle, Operation>();
   private final ConcurrentHashMap<String, Operation> queryIdOperation =
       new ConcurrentHashMap<String, Operation>();
-  private final ConcurrentHashMap<String, String> queryTagToIdMap = new ConcurrentHashMap<>();
+  private final SetMultimap<String, String> queryTagToIdMap =
+          Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());
 
   //Following fields for displaying queries on WebUI
   private Object webuiLock = new Object();
@@ -205,12 +212,7 @@ public class OperationManager extends AbstractService {
   public void updateQueryTag(String queryId, String queryTag) {
     Operation operation = queryIdOperation.get(queryId);
     if (operation != null) {
-      String queryIdTemp = queryTagToIdMap.get(queryTag);
-      if (queryIdTemp != null) {
-        throw new RuntimeException("tag " + queryTag + " is already applied for query " +
queryIdTemp);
-      }
       queryTagToIdMap.put(queryTag, queryId);
-      LOG.info("Query " + queryId + " is updated with tag " + queryTag);
       return;
     }
     LOG.info("Query id is missing during query tag updation");
@@ -225,7 +227,7 @@ public class OperationManager extends AbstractService {
     queryIdOperation.remove(queryId);
     String queryTag = operation.getQueryTag();
     if (queryTag != null) {
-      queryTagToIdMap.remove(queryTag);
+      queryTagToIdMap.remove(queryTag, queryId);
     }
     LOG.info("Removed queryId: {} corresponding to operation: {} with tag: {}", queryId,
opHandle, queryTag);
     if (operation instanceof SQLOperation) {
@@ -442,11 +444,14 @@ public class OperationManager extends AbstractService {
     return queryIdOperation.get(queryId);
   }
 
-  public Operation getOperationByQueryTag(String queryTag) {
-    String queryId = queryTagToIdMap.get(queryTag);
-    if (queryId != null) {
-      return getOperationByQueryId(queryId);
+  public Set<Operation> getOperationsByQueryTag(String queryTag) {
+    Set<String> queryIds = queryTagToIdMap.get(queryTag);
+    Set<Operation> result = new HashSet<Operation>();
+    for (String queryId : queryIds) {
+      if (queryId != null && getOperationByQueryId(queryId) != null) {
+        result.add(getOperationByQueryId(queryId));
+      }
     }
-    return null;
+    return result;
   }
 }

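Because the duplication check in updateQueryTag is gone, one tag can now map to several live queries, which is why the ConcurrentHashMap became a synchronized SetMultimap. A small standalone illustration of the behavior the change relies on; the tag and ids are made up:

import com.google.common.collect.MultimapBuilder;
import com.google.common.collect.Multimaps;
import com.google.common.collect.SetMultimap;

public class TagMultimapSketch {
  public static void main(String[] args) {
    // Same construction as in OperationManager above.
    SetMultimap<String, String> queryTagToIdMap =
        Multimaps.synchronizedSetMultimap(MultimapBuilder.hashKeys().hashSetValues().build());

    // Two concurrent queries may now share one tag.
    queryTagToIdMap.put("nightly-etl", "queryId-1");
    queryTagToIdMap.put("nightly-etl", "queryId-2");
    System.out.println(queryTagToIdMap.get("nightly-etl"));  // both ids (order not guaranteed)

    // Removal is per (tag, id) pair, so one query finishing leaves the other registered.
    queryTagToIdMap.remove("nightly-etl", "queryId-1");
    System.out.println(queryTagToIdMap.get("nightly-etl"));  // [queryId-2]
  }
}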
http://git-wip-us.apache.org/repos/asf/hive/blob/3963c729/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
----------------------------------------------------------------------
diff --git a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
index 490a04d..c7f2c91 100644
--- a/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
+++ b/service/src/java/org/apache/hive/service/server/KillQueryImpl.java
@@ -21,8 +21,13 @@ package org.apache.hive.service.server;
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.QueryState;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveAuthzContext;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HiveOperationType;
+import org.apache.hadoop.hive.ql.security.authorization.plugin.HivePrivilegeObject;
 import org.apache.hadoop.hive.ql.session.KillQuery;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
 import org.apache.hadoop.yarn.api.protocolrecords.ApplicationsRequestScope;
 import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest;
@@ -40,6 +45,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
@@ -49,6 +55,7 @@ public class KillQueryImpl implements KillQuery {
   private final static Logger LOG = LoggerFactory.getLogger(KillQueryImpl.class);
 
   private final OperationManager operationManager;
+  private enum TagOrId {TAG, ID, UNKNOWN};
 
   public KillQueryImpl(OperationManager operationManager) {
     this.operationManager = operationManager;
@@ -64,7 +71,10 @@ public class KillQueryImpl implements KillQuery {
     GetApplicationsResponse apps = proxy.getApplications(gar);
     List<ApplicationReport> appsList = apps.getApplicationList();
     for(ApplicationReport appReport : appsList) {
-      childYarnJobs.add(appReport.getApplicationId());
+      if (isAdmin() || appReport.getApplicationTags().contains(QueryState.USERID_TAG + "=" + SessionState.get()
+              .getUserName())) {
+        childYarnJobs.add(appReport.getApplicationId());
+      }
     }
 
     if (childYarnJobs.isEmpty()) {
@@ -81,6 +91,7 @@ public class KillQueryImpl implements KillQuery {
       if (tag == null) {
         return;
       }
+      LOG.info("Killing yarn jobs using query tag:" + tag);
       Set<ApplicationId> childYarnJobs = getChildYarnJobs(conf, tag);
       if (!childYarnJobs.isEmpty()) {
         YarnClient yarnClient = YarnClient.createYarnClient();
@@ -91,44 +102,87 @@ public class KillQueryImpl implements KillQuery {
         }
       }
     } catch (IOException | YarnException ye) {
-      throw new RuntimeException("Exception occurred while killing child job(s)", ye);
+      LOG.warn("Exception occurred while killing child job({})", ye);
+    }
+  }
+
+  private static boolean isAdmin() {
+    boolean isAdmin = false;
+    if (SessionState.get().getAuthorizerV2() != null) {
+      try {
+        SessionState.get().getAuthorizerV2().checkPrivileges(HiveOperationType.KILL_QUERY,
+                new ArrayList<HivePrivilegeObject>(), new ArrayList<HivePrivilegeObject>(),
+                new HiveAuthzContext.Builder().build());
+        isAdmin = true;
+      } catch (Exception e) {
+      }
+    }
+    return isAdmin;
+  }
+
+  private boolean cancelOperation(Operation operation, boolean isAdmin, String errMsg) throws
+          HiveSQLException {
+    if (isAdmin || operation.getParentSession().getUserName().equals(SessionState.get()
+            .getAuthenticator().getUserName())) {
+      OperationHandle handle = operation.getHandle();
+      operationManager.cancelOperation(handle, errMsg);
+      return true;
+    } else {
+      return false;
     }
   }
 
   @Override
-  public void killQuery(String queryId, String errMsg, HiveConf conf) throws HiveException {
+  public void killQuery(String queryIdOrTag, String errMsg, HiveConf conf) throws HiveException {
     try {
-      String queryTag = null;
-
-      Operation operation = operationManager.getOperationByQueryId(queryId);
-      if (operation == null) {
-        // Check if user has passed the query tag to kill the operation. This is possible if the application
-        // restarts and it does not have the proper query id. The tag can be used in that case to kill the query.
-        operation = operationManager.getOperationByQueryTag(queryId);
-        if (operation == null) {
-          LOG.info("Query not found: " + queryId);
-        }
+      TagOrId tagOrId = TagOrId.UNKNOWN;
+      Set<Operation> operationsToKill = new HashSet<Operation>();
+      if (operationManager.getOperationByQueryId(queryIdOrTag) != null) {
+        operationsToKill.add(operationManager.getOperationByQueryId(queryIdOrTag));
+        tagOrId = TagOrId.ID;
       } else {
-        // This is the normal flow, where the query is tagged and user wants to kill the query using the query id.
-        queryTag = operation.getQueryTag();
+        operationsToKill.addAll(operationManager.getOperationsByQueryTag(queryIdOrTag));
+        if (!operationsToKill.isEmpty()) {
+          tagOrId = TagOrId.TAG;
+        }
       }
-
-      if (queryTag == null) {
-        //use query id as tag if user wanted to kill only the yarn jobs after hive server restart. The yarn jobs are
-        //tagged with query id by default. This will cover the case where the application after restarts wants to kill
-        //the yarn jobs with query tag. The query tag can be passed as query id.
-        queryTag = queryId;
+      if (operationsToKill.isEmpty()) {
+        LOG.info("Query not found: " + queryIdOrTag);
       }
-
-      LOG.info("Killing yarn jobs for query id : " + queryId + " using tag :" + queryTag);
-      killChildYarnJobs(conf, queryTag);
-
-      if (operation != null) {
-        OperationHandle handle = operation.getHandle();
-        operationManager.cancelOperation(handle, errMsg);
+      boolean admin = isAdmin();
+      switch(tagOrId) {
+        case ID:
+          Operation operation = operationsToKill.iterator().next();
+          boolean canceled = cancelOperation(operation, admin, errMsg);
+          if (canceled) {
+            String queryTag = operation.getQueryTag();
+            if (queryTag == null) {
+              queryTag = queryIdOrTag;
+            }
+            killChildYarnJobs(conf, queryTag);
+          } else {
+            // no privilege to cancel
+            throw new HiveSQLException("No privilege");
+          }
+          break;
+        case TAG:
+          int numCanceled = 0;
+          for (Operation operationToKill : operationsToKill) {
+            if (cancelOperation(operationToKill, admin, errMsg)) {
+              numCanceled++;
+            }
+          }
+          killChildYarnJobs(conf, queryIdOrTag);
+          if (numCanceled == 0) {
+            throw new HiveSQLException("No privilege");
+          }
+          break;
+        case UNKNOWN:
+          killChildYarnJobs(conf, queryIdOrTag);
+          break;
       }
     } catch (HiveSQLException e) {
-      LOG.error("Kill query failed for query " + queryId, e);
+      LOG.error("Kill query failed for query " + queryIdOrTag, e);
       throw new HiveException(e.getMessage(), e);
     }
   }

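Stated compactly, the argument resolution that killQuery now implements; a condensed sketch using the OperationManager methods introduced in this commit, not the actual class:

import org.apache.hive.service.cli.operation.OperationManager;

class KillTargetResolution {
  enum TagOrId { TAG, ID, UNKNOWN }

  static TagOrId classify(String arg, OperationManager om) {
    if (om.getOperationByQueryId(arg) != null) {
      return TagOrId.ID;       // one live operation; only an admin or its owner may kill it
    }
    if (!om.getOperationsByQueryTag(arg).isEmpty()) {
      return TagOrId.TAG;      // possibly several operations share the tag; each is permission-checked
    }
    return TagOrId.UNKNOWN;    // e.g. after a HiveServer2 restart; child YARN jobs are still killed by tag
  }
}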
