hive-commits mailing list archives

From khorg...@apache.org
Subject hive git commit: HIVE-16006 : Incremental REPL LOAD Inserts doesn't operate on the target database if name differs from source database. (Sankar Hariappan, reviewed by Sushanth Sowmyan)
Date Wed, 08 Mar 2017 03:35:00 GMT
Repository: hive
Updated Branches:
  refs/heads/master 45b48d5fd -> 7f4a3e17e


HIVE-16006 : Incremental REPL LOAD Inserts doesn't operate on the target database if name differs from source database. (Sankar Hariappan, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f4a3e17
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f4a3e17
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f4a3e17

Branch: refs/heads/master
Commit: 7f4a3e17ec2fa886276a7f278e5846e0e7ebc8a6
Parents: 45b48d5
Author: Sushanth Sowmyan <khorgath@gmail.com>
Authored: Tue Mar 7 19:33:40 2017 -0800
Committer: Sushanth Sowmyan <khorgath@gmail.com>
Committed: Tue Mar 7 19:34:56 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 77 ++++++++++++++++----
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  3 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 13 +++-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   | 19 ++++-
 4 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 4efc81d..c26a075 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -96,6 +96,7 @@ public class TestReplicationScenarios {
     hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
         DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
     msPort = MetaStoreUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
@@ -790,16 +791,8 @@ public class TestReplicationScenarios {
   }
 
   @Test
-  @Ignore
-  // The test turned off temporarily in HIVE-15478. This test is not running
-  // properly even though it passed before. The reason the test passed before is because
-  // we collect files added by "create table" statement during "repl dump", and it will take
-  // the files added by "insert statement". In HIVE-15478, Hive collect "create table" affected
-  // files during processing "create table" statement, and no data files present at that time.
-  // The inserted files rely on the missing INSERT_EVENT to signal. We need to turn on
-  // FIRE_EVENTS_FOR_DML setting to trigger INSERT_EVENT and this is WIP tracked by other ticket.
-  public void testIncrementalInserts() throws IOException {
-    String testName = "incrementalInserts";
+  public void testIncrementalLoad() throws IOException {
+    String testName = "incrementalLoad";
     LOG.info("Testing " + testName);
     String dbName = testName + "_" + tid;
 
@@ -815,7 +808,7 @@ public class TestReplicationScenarios {
     run("REPL DUMP " + dbName);
     String replDumpLocn = getResult(0, 0);
     String replDumpId = getResult(0, 1, true);
-    LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
 
     String[] unptn_data = new String[] { "eleven", "twelve" };
@@ -844,7 +837,8 @@ public class TestReplicationScenarios {
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
     String incrementalDumpLocn = getResult(0, 0);
     String incrementalDumpId = getResult(0, 1, true);
-    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId,
replDumpId);
+    replDumpId = incrementalDumpId;
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
@@ -871,7 +865,8 @@ public class TestReplicationScenarios {
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
     incrementalDumpLocn = getResult(0, 0);
     incrementalDumpId = getResult(0, 1, true);
-    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId,
replDumpId);
+    replDumpId = incrementalDumpId;
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
@@ -881,6 +876,62 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testIncrementalInserts() throws IOException {
+    String testName = "incrementalInserts";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+    verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId,
replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data);
+
+    String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" };
+    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2]
+ "')");
+    verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId,
replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
+  }
+
+  @Test
   public void testStatus() throws IOException {
     // first test ReplStateMap functionality
     Map<String,Long> cmap = new ReplStateMap<String,Long>();

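The new testIncrementalInserts follows the same bootstrap-then-incremental pattern as the re-enabled testIncrementalLoad: every incremental REPL DUMP is taken FROM the id returned by the previous dump, and the test advances that id after each cycle (replDumpId = incrementalDumpId). A minimal standalone sketch of that watermark chaining, using hypothetical names rather than the test's run/getResult helpers:

// Standalone sketch (hypothetical names, not the test's helpers) of the
// event-id watermark chaining used above: each incremental dump resumes
// from the id that the previous dump returned.
public class DumpWatermarkSketch {
  // Stand-in for "REPL DUMP <db> FROM <id>": returns the new last-event id.
  static long incrementalDump(long fromId, long newEvents) {
    return fromId + newEvents;
  }

  public static void main(String[] args) {
    long replDumpId = 10L;                        // id from the bootstrap dump
    replDumpId = incrementalDump(replDumpId, 4);  // first incremental cycle
    replDumpId = incrementalDump(replDumpId, 1);  // second incremental cycle
    System.out.println(replDumpId);               // 15: the next dump starts here
  }
}
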
http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 7bb48a9..c398792 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -949,7 +949,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
           return; // silently return, table is newer than our replacement.
         }
         if (!replicationSpec.isMetadataOnly()) {
-          loadTable(fromURI, table, true, new Path(fromURI), replicationSpec, x); // repl-imports are replace-into
+          // repl-imports are replace-into unless the event is insert-into
+          loadTable(fromURI, table, !replicationSpec.isInsert(), new Path(fromURI), replicationSpec, x);
         } else {
           x.getTasks().add(alterTableTask(tblDesc, x, replicationSpec));
         }

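The one-line change above is the import-side half of the fix: a repl-import still overwrites the table by default, but when the dumped event was an insert, the new files are appended instead. A minimal standalone sketch of that decision, with a hypothetical class in place of ImportSemanticAnalyzer (the real call site passes the flag as the replace argument of loadTable):

// Standalone sketch (hypothetical class, not Hive's ImportSemanticAnalyzer)
// of the replace-vs-append choice introduced above.
public class LoadSemanticsSketch {
  static String chooseLoadMode(boolean isInsertEvent) {
    // Insert events append; every other repl-import replaces the table data.
    boolean replace = !isInsertEvent;
    return replace ? "replace-into" : "insert-into";
  }

  public static void main(String[] args) {
    System.out.println(chooseLoadMode(false)); // e.g. a bootstrap dump -> replace-into
    System.out.println(chooseLoadMode(true));  // an INSERT event -> insert-into
  }
}
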
http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index cb1371c..e72d621 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -614,14 +614,17 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
       case MessageFactory.INSERT_EVENT: {
         InsertMessage insertMsg = md.getInsertMessage(ev.getMessage());
+        String dbName = insertMsg.getDB();
         String tblName = insertMsg.getTable();
-        Table qlMdTable = db.getTable(tblName);
+        org.apache.hadoop.hive.metastore.api.Table tobj = db.getMSC().getTable(dbName, tblName);
+        Table qlMdTable = new Table(tobj);
         Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
         List<Partition> qlPtns  = null;
         if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
           qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
         }
         Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        replicationSpec.setIsInsert(true); // Mark the replication type as insert into to avoid overwrite while import
         EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable, qlPtns,
             replicationSpec);
         Iterable<String> files = insertMsg.getFiles();
@@ -1156,9 +1159,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       case EVENT_INSERT: {
         md = MessageFactory.getInstance().getDeserializer();
         InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? insertMessage.getTable() : tblName);
+
         // Piggybacking in Import logic for now
-        return analyzeTableLoad(
-            insertMessage.getDB(), insertMessage.getTable(), locn, precursor, dbsUpdated, tablesUpdated);
+        return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated);
       }
       case EVENT_UNKNOWN: {
         break;
@@ -1395,7 +1400,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   // Use for specifying object state as well as event state
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
-    return new ReplicationSpec(true, false, evState, objState, false, true);
+    return new ReplicationSpec(true, false, evState, objState, false, true, false);
   }
 
   // Use for replication states focussed on event only, where the obj state will be the event state

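The load-side half of the fix is the actualDbName/actualTblName resolution above: the database and table names supplied to REPL LOAD take precedence, and the names recorded in the insert event's message are only a fallback. Before this change the event's own names were always used, so inserts landed in the source-named database. A minimal standalone sketch of the resolution, with hypothetical names:

// Standalone sketch (hypothetical names) of the target-name fallback added
// above: an explicit REPL LOAD target wins; otherwise the name carried in
// the event message is used, which was the only behavior before this fix.
public class TargetNameSketch {
  static String resolve(String loadTarget, String nameFromEvent) {
    return (loadTarget == null || loadTarget.isEmpty()) ? nameFromEvent : loadTarget;
  }

  public static void main(String[] args) {
    // Source db "testdb" replicated into "testdb_dupe" on the target.
    System.out.println(resolve("testdb_dupe", "testdb")); // testdb_dupe
    System.out.println(resolve(null, "testdb"));          // testdb
  }
}
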
http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index be17ffa..48362a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -44,7 +44,7 @@ public class ReplicationSpec {
   private String currStateId = null;
   private boolean isNoop = false;
-  private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
-
+  private boolean isInsert = false; // default is that the import mode is replace-into
 
   // Key definitions related to replication
   public enum KEY {
@@ -53,6 +53,7 @@ public class ReplicationSpec {
     CURR_STATE_ID("repl.last.id"),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
+    IS_INSERT("repl.is.insert")
     ;
     private final String keyName;
 
@@ -134,13 +135,15 @@ public class ReplicationSpec {
   }
 
   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
-      String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
+                         String eventReplicationState, String currentReplicationState,
+                         boolean isNoop, boolean isLazy, boolean isInsert) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
     this.isLazy = isLazy;
+    this.isInsert = isInsert;
   }
 
   public ReplicationSpec(Function<String, String> keyFetcher) {
@@ -159,6 +162,7 @@ public class ReplicationSpec {
     this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
+    this.isInsert = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_INSERT.toString()));
   }
 
   /**
@@ -292,6 +296,15 @@ public class ReplicationSpec {
   }
 
   /**
+   * @return true if this statement refers to insert-into operation.
+   */
+  public boolean isInsert(){ return isInsert; }
+
+  public void setIsInsert(boolean isInsert){
+    this.isInsert = isInsert;
+  }
+
+  /**
    * @return the replication state of the event that spawned this statement
    */
   public String getReplicationState() {
@@ -357,6 +370,8 @@ public class ReplicationSpec {
         return String.valueOf(isNoop());
       case LAZY:
         return String.valueOf(isLazy());
+      case IS_INSERT:
+        return String.valueOf(isInsert());
     }
     return null;
   }

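ReplicationSpec persists its flags as string key-value pairs, so the new repl.is.insert key follows the same round trip as repl.noop and repl.lazy: written out with String.valueOf at dump time and parsed back with Boolean.parseBoolean through the keyFetcher constructor. A minimal standalone sketch of that round trip, with a plain map standing in for wherever Hive actually stores the keys:

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Standalone sketch of the string round trip behind the new repl.is.insert
// key; the map is a stand-in for the real key-value store the spec reads from.
public class IsInsertKeySketch {
  public static void main(String[] args) {
    Map<String, String> params = new HashMap<>();
    params.put("repl.is.insert", String.valueOf(true)); // written at dump time

    Function<String, String> keyFetcher = params::get;  // read back at load time
    boolean isInsert = Boolean.parseBoolean(keyFetcher.apply("repl.is.insert"));
    System.out.println(isInsert); // true -> the import appends instead of replacing
  }
}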
