hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject svn commit: r1619567 - in /hive/trunk: metastore/src/java/org/apache/hadoop/hive/metastore/txn/ ql/src/java/org/apache/hadoop/hive/ql/lockmgr/ ql/src/test/org/apache/hadoop/hive/ql/lockmgr/ ql/src/test/org/apache/hadoop/hive/ql/parse/
Date Thu, 21 Aug 2014 20:32:21 GMT
Author: hashutosh
Date: Thu Aug 21 20:32:21 2014
New Revision: 1619567

URL: http://svn.apache.org/r1619567
Log:
HIVE-7281 : DbTxnManager acquiring wrong level of lock for dynamic partitioning (Alan Gates
via Ashutosh Chauhan)

Modified:
    hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
    hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
    hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java

Modified: hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java
URL: http://svn.apache.org/viewvc/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java (original)
+++ hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/txn/TxnDbUtil.java Thu Aug 21 20:32:21 2014
@@ -49,121 +49,135 @@ public class TxnDbUtil {
     // intended for creating derby databases, and thus will inexorably get
     // out of date with it.  I'm open to any suggestions on how to make this
     // read the file in a build friendly way.
-    Connection conn = getConnection();
-    Statement s = conn.createStatement();
-    s.execute("CREATE TABLE TXNS (" +
-        "  TXN_ID bigint PRIMARY KEY," +
-        "  TXN_STATE char(1) NOT NULL," +
-        "  TXN_STARTED bigint NOT NULL," +
-        "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
-        "  TXN_USER varchar(128) NOT NULL," +
-        "  TXN_HOST varchar(128) NOT NULL)");
-
-        s.execute("CREATE TABLE TXN_COMPONENTS (" +
-        "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
-        "  TC_DATABASE varchar(128) NOT NULL," +
-        "  TC_TABLE varchar(128)," +
-        "  TC_PARTITION varchar(767))");
-    s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
-        "  CTC_TXNID bigint," +
-        "  CTC_DATABASE varchar(128) NOT NULL," +
-        "  CTC_TABLE varchar(128)," +
-        "  CTC_PARTITION varchar(767))");
-    s.execute("CREATE TABLE NEXT_TXN_ID (" +
-        "  NTXN_NEXT bigint NOT NULL)");
-    s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
-    s.execute("CREATE TABLE HIVE_LOCKS (" +
-        " HL_LOCK_EXT_ID bigint NOT NULL," +
-        " HL_LOCK_INT_ID bigint NOT NULL," +
-        " HL_TXNID bigint," +
-        " HL_DB varchar(128) NOT NULL," +
-        " HL_TABLE varchar(128)," +
-        " HL_PARTITION varchar(767)," +
-        " HL_LOCK_STATE char(1) NOT NULL," +
-        " HL_LOCK_TYPE char(1) NOT NULL," +
-        " HL_LAST_HEARTBEAT bigint NOT NULL," +
-        " HL_ACQUIRED_AT bigint," +
-        " HL_USER varchar(128) NOT NULL," +
-        " HL_HOST varchar(128) NOT NULL," +
-        " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
-    s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
-
-    s.execute("CREATE TABLE NEXT_LOCK_ID (" +
-        " NL_NEXT bigint NOT NULL)");
-    s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
-
-    s.execute("CREATE TABLE COMPACTION_QUEUE (" +
-        " CQ_ID bigint PRIMARY KEY," +
-        " CQ_DATABASE varchar(128) NOT NULL," +
-        " CQ_TABLE varchar(128) NOT NULL," +
-        " CQ_PARTITION varchar(767)," +
-        " CQ_STATE char(1) NOT NULL," +
-        " CQ_TYPE char(1) NOT NULL," +
-        " CQ_WORKER_ID varchar(128)," +
-        " CQ_START bigint," +
-        " CQ_RUN_AS varchar(128))");
-
-    s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
-    s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
-
-    conn.commit();
-    conn.close();
+    Connection conn = null;
+    boolean committed = false;
+    try {
+      conn = getConnection();
+      Statement s = conn.createStatement();
+      s.execute("CREATE TABLE TXNS (" +
+          "  TXN_ID bigint PRIMARY KEY," +
+          "  TXN_STATE char(1) NOT NULL," +
+          "  TXN_STARTED bigint NOT NULL," +
+          "  TXN_LAST_HEARTBEAT bigint NOT NULL," +
+          "  TXN_USER varchar(128) NOT NULL," +
+          "  TXN_HOST varchar(128) NOT NULL)");
+
+      s.execute("CREATE TABLE TXN_COMPONENTS (" +
+      "  TC_TXNID bigint REFERENCES TXNS (TXN_ID)," +
+      "  TC_DATABASE varchar(128) NOT NULL," +
+      "  TC_TABLE varchar(128)," +
+      "  TC_PARTITION varchar(767))");
+      s.execute("CREATE TABLE COMPLETED_TXN_COMPONENTS (" +
+          "  CTC_TXNID bigint," +
+          "  CTC_DATABASE varchar(128) NOT NULL," +
+          "  CTC_TABLE varchar(128)," +
+          "  CTC_PARTITION varchar(767))");
+      s.execute("CREATE TABLE NEXT_TXN_ID (" +
+          "  NTXN_NEXT bigint NOT NULL)");
+      s.execute("INSERT INTO NEXT_TXN_ID VALUES(1)");
+      s.execute("CREATE TABLE HIVE_LOCKS (" +
+          " HL_LOCK_EXT_ID bigint NOT NULL," +
+          " HL_LOCK_INT_ID bigint NOT NULL," +
+          " HL_TXNID bigint," +
+          " HL_DB varchar(128) NOT NULL," +
+          " HL_TABLE varchar(128)," +
+          " HL_PARTITION varchar(767)," +
+          " HL_LOCK_STATE char(1) NOT NULL," +
+          " HL_LOCK_TYPE char(1) NOT NULL," +
+          " HL_LAST_HEARTBEAT bigint NOT NULL," +
+          " HL_ACQUIRED_AT bigint," +
+          " HL_USER varchar(128) NOT NULL," +
+          " HL_HOST varchar(128) NOT NULL," +
+          " PRIMARY KEY(HL_LOCK_EXT_ID, HL_LOCK_INT_ID))");
+      s.execute("CREATE INDEX HL_TXNID_INDEX ON HIVE_LOCKS (HL_TXNID)");
+
+      s.execute("CREATE TABLE NEXT_LOCK_ID (" +
+          " NL_NEXT bigint NOT NULL)");
+      s.execute("INSERT INTO NEXT_LOCK_ID VALUES(1)");
+
+      s.execute("CREATE TABLE COMPACTION_QUEUE (" +
+          " CQ_ID bigint PRIMARY KEY," +
+          " CQ_DATABASE varchar(128) NOT NULL," +
+          " CQ_TABLE varchar(128) NOT NULL," +
+          " CQ_PARTITION varchar(767)," +
+          " CQ_STATE char(1) NOT NULL," +
+          " CQ_TYPE char(1) NOT NULL," +
+          " CQ_WORKER_ID varchar(128)," +
+          " CQ_START bigint," +
+          " CQ_RUN_AS varchar(128))");
+
+      s.execute("CREATE TABLE NEXT_COMPACTION_QUEUE_ID (NCQ_NEXT bigint NOT NULL)");
+      s.execute("INSERT INTO NEXT_COMPACTION_QUEUE_ID VALUES(1)");
+
+      conn.commit();
+      committed = true;
+    } finally {
+      if (!committed) conn.rollback();
+      conn.close();
+    }
   }
 
   public static void cleanDb() throws  Exception {
-    Connection conn = getConnection();
-    Statement s = conn.createStatement();
-    // We want to try these, whether they succeed or fail.
-    try {
-      s.execute("DROP INDEX HL_TXNID_INDEX");
-    } catch (Exception e) {
-      System.err.println("Unable to drop index HL_TXNID_INDEX " +
-          e.getMessage());
-    }
-    try {
-      s.execute("DROP TABLE TXN_COMPONENTS");
-    } catch (Exception e) {
-      System.err.println("Unable to drop table TXN_COMPONENTS " +
-          e.getMessage());
-    }
-    try {
-      s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS");
-    } catch (Exception e) {
-      System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " +
-          e.getMessage());
-    }
-    try {
-      s.execute("DROP TABLE TXNS");
-    } catch (Exception e) {
-      System.err.println("Unable to drop table TXNS " +
-          e.getMessage());
-    }
+    Connection conn = null;
+    boolean committed = false;
     try {
-      s.execute("DROP TABLE NEXT_TXN_ID");
-    } catch (Exception e) {
-      System.err.println("Unable to drop table NEXT_TXN_ID " +
-          e.getMessage());
+      conn = getConnection();
+      Statement s = conn.createStatement();
+      // We want to try these, whether they succeed or fail.
+      try {
+        s.execute("DROP INDEX HL_TXNID_INDEX");
+      } catch (Exception e) {
+        System.err.println("Unable to drop index HL_TXNID_INDEX " +
+            e.getMessage());
+      }
+      try {
+        s.execute("DROP TABLE TXN_COMPONENTS");
+      } catch (Exception e) {
+        System.err.println("Unable to drop table TXN_COMPONENTS " +
+            e.getMessage());
+      }
+      try {
+        s.execute("DROP TABLE COMPLETED_TXN_COMPONENTS");
+      } catch (Exception e) {
+        System.err.println("Unable to drop table COMPLETED_TXN_COMPONENTS " +
+            e.getMessage());
+      }
+      try {
+        s.execute("DROP TABLE TXNS");
+      } catch (Exception e) {
+        System.err.println("Unable to drop table TXNS " +
+            e.getMessage());
+      }
+      try {
+        s.execute("DROP TABLE NEXT_TXN_ID");
+      } catch (Exception e) {
+        System.err.println("Unable to drop table NEXT_TXN_ID " +
+            e.getMessage());
+      }
+      try {
+        s.execute("DROP TABLE HIVE_LOCKS");
+      } catch (Exception e) {
+        System.err.println("Unable to drop table HIVE_LOCKS " +
+            e.getMessage());
+      }
+      try {
+        s.execute("DROP TABLE NEXT_LOCK_ID");
+      } catch (Exception e) {
+      }
+      try {
+        s.execute("DROP TABLE COMPACTION_QUEUE");
+      } catch (Exception e) {
+      }
+      try {
+        s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID");
+      } catch (Exception e) {
+      }
+      conn.commit();
+      committed = true;
+    } finally {
+      if (!committed) conn.rollback();
+      conn.close();
     }
-    try {
-      s.execute("DROP TABLE HIVE_LOCKS");
-    } catch (Exception e) {
-      System.err.println("Unable to drop table HIVE_LOCKS " +
-          e.getMessage());
-    }
-    try {
-      s.execute("DROP TABLE NEXT_LOCK_ID");
-    } catch (Exception e) {
-    }
-    try {
-      s.execute("DROP TABLE COMPACTION_QUEUE");
-    } catch (Exception e) {
-    }
-    try {
-      s.execute("DROP TABLE NEXT_COMPACTION_QUEUE_ID");
-    } catch (Exception e) {
-    }
-    conn.commit();
-    conn.close();
   }
 
   /**
@@ -174,25 +188,34 @@ public class TxnDbUtil {
    */
   public static int countLockComponents(long lockId) throws  Exception {
     Connection conn = getConnection();
-    Statement s = conn.createStatement();
-    ResultSet rs = s.executeQuery("select count(*) from hive_locks where " +
-        "hl_lock_ext_id = " + lockId);
-    if (!rs.next()) return 0;
-    int rc = rs.getInt(1);
-    conn.rollback();
-    conn.close();
-    return rc;
+    try {
+      Statement s = conn.createStatement();
+      ResultSet rs = s.executeQuery("select count(*) from hive_locks where hl_lock_ext_id = " +
+          lockId);
+      if (!rs.next()) return 0;
+      int rc = rs.getInt(1);
+      return rc;
+    } finally {
+      conn.rollback();
+      conn.close();
+    }
   }
 
   public static int findNumCurrentLocks() throws Exception {
-    Connection conn = getConnection();
-    Statement s = conn.createStatement();
-    ResultSet rs = s.executeQuery("select count(*) from hive_locks");
-    if (!rs.next()) return 0;
-    int rc = rs.getInt(1);
-    conn.rollback();
-    conn.close();
-    return rc;
+    Connection conn = null;
+    try {
+      conn = getConnection();
+      Statement s = conn.createStatement();
+      ResultSet rs = s.executeQuery("select count(*) from hive_locks");
+      if (!rs.next()) return 0;
+      int rc = rs.getInt(1);
+      return rc;
+    } finally {
+      if (conn != null) {
+        conn.rollback();
+        conn.close();
+      }
+    }
   }
 
   private static Connection getConnection() throws Exception {

Modified: hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java (original)
+++ hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java Thu Aug 21 20:32:21 2014
@@ -165,13 +165,13 @@ public class DbTxnManager extends HiveTx
           break;
 
         case TABLE:
+        case DUMMYPARTITION:   // in case of dynamic partitioning lock the table
           t = output.getTable();
           compBuilder.setDbName(t.getDbName());
           compBuilder.setTableName(t.getTableName());
           break;
 
         case PARTITION:
-        case DUMMYPARTITION:
           compBuilder.setPartitionName(output.getPartition().getName());
           t = output.getPartition().getTable();
           compBuilder.setDbName(t.getDbName());
@@ -301,7 +301,10 @@ public class DbTxnManager extends HiveTx
     try {
       if (txnId > 0) rollbackTxn();
       if (lockMgr != null) lockMgr.close();
+      if (client != null) client.close();
     } catch (Exception e) {
+      LOG.error("Caught exception " + e.getClass().getName() + " with message <" + e.getMessage()
+      + ">, swallowing as there is nothing we can do with it.");
       // Not much we can do about it here.
     }
   }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager.java Thu Aug 21 20:32:21 2014
@@ -21,12 +21,12 @@ import junit.framework.Assert;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
+import org.apache.hadoop.hive.metastore.api.ShowLocksResponse;import org.apache.hadoop.hive.metastore.api.ShowLocksResponseElement;import org.apache.hadoop.hive.metastore.txn.TxnDbUtil;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.QueryPlan;
 import org.apache.hadoop.hive.ql.hooks.ReadEntity;
 import org.apache.hadoop.hive.ql.hooks.WriteEntity;
-import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.DummyPartition;import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.log4j.Level;
@@ -137,6 +137,43 @@ public class TestDbTxnManager {
     Assert.assertEquals(0, locks.size());
   }
 
+
+  @Test
+  public void testSingleWritePartition() throws Exception {
+    WriteEntity we = addPartitionOutput(newTable(true), WriteEntity.WriteType.INSERT);
+    QueryPlan qp = new MockQueryPlan(this);
+    txnMgr.openTxn("fred");
+    txnMgr.acquireLocks(qp, ctx, "fred");
+    List<HiveLock> locks = ctx.getHiveLocks();
+    Assert.assertEquals(1, locks.size());
+    Assert.assertEquals(1,
+        TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
+    txnMgr.commitTxn();
+    locks = txnMgr.getLockManager().getLocks(false, false);
+    Assert.assertEquals(0, locks.size());
+  }
+
+  @Test
+  public void testWriteDynamicPartition() throws Exception {
+    WriteEntity we = addDynamicPartitionedOutput(newTable(true), WriteEntity.WriteType.INSERT);
+    QueryPlan qp = new MockQueryPlan(this);
+    txnMgr.openTxn("fred");
+    txnMgr.acquireLocks(qp, ctx, "fred");
+    List<HiveLock> locks = ctx.getHiveLocks();
+    Assert.assertEquals(1, locks.size());
+    /*Assert.assertEquals(1,
+        TxnDbUtil.countLockComponents(((DbLockManager.DbHiveLock) locks.get(0)).lockId));
+    */// Make sure we're locking the whole table, since this is dynamic partitioning
+    ShowLocksResponse rsp = ((DbLockManager)txnMgr.getLockManager()).getLocks();
+    List<ShowLocksResponseElement> elms = rsp.getLocks();
+    Assert.assertEquals(1, elms.size());
+    Assert.assertNotNull(elms.get(0).getTablename());
+    Assert.assertNull(elms.get(0).getPartname());
+    txnMgr.commitTxn();
+    locks = txnMgr.getLockManager().getLocks(false, false);
+    Assert.assertEquals(0, locks.size());
+  }
+
   @Test
   public void testReadWrite() throws Exception {
     Table t = newTable(true);
@@ -252,6 +289,7 @@ public class TestDbTxnManager {
 
   @After
   public void tearDown() throws Exception {
+    if (txnMgr != null) txnMgr.closeTxnManager();
     TxnDbUtil.cleanDb();
   }
 
@@ -318,4 +356,12 @@ public class TestDbTxnManager {
     writeEntities.add(we);
     return we;
   }
+
+  private WriteEntity addDynamicPartitionedOutput(Table t, WriteEntity.WriteType writeType)
+      throws Exception {
+    DummyPartition dp = new DummyPartition(t, "no clue what I should call this");
+    WriteEntity we = new WriteEntity(dp, writeType, false);
+    writeEntities.add(we);
+    return we;
+  }
 }

Modified: hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java
URL: http://svn.apache.org/viewvc/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java?rev=1619567&r1=1619566&r2=1619567&view=diff
==============================================================================
--- hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java (original)
+++ hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/parse/TestQBCompact.java Thu Aug 21 20:32:21 2014
@@ -65,7 +65,6 @@ public class TestQBCompact {
   private AlterTableSimpleDesc parseAndAnalyze(String query) throws Exception {
     ParseDriver hd = new ParseDriver();
     ASTNode head = (ASTNode)hd.parse(query).getChild(0);
-    System.out.println("HERE " + head.dump());
     BaseSemanticAnalyzer a = SemanticAnalyzerFactory.get(conf, head);
     a.analyze(head, new Context(conf));
     List<Task<? extends Serializable>> roots = a.getRootTasks();



Mime
View raw message