From: khorgath@apache.org
To: commits@hive.apache.org
Reply-To: hive-dev@hive.apache.org
Subject: hive git commit: HIVE-16006 : Incremental REPL LOAD Inserts doesn't operate on the target database if name differs from source database. (Sankar Hariappan, reviewed by Sushanth Sowmyan)
Date: Wed, 8 Mar 2017 03:35:00 +0000 (UTC)

Repository: hive
Updated Branches:
  refs/heads/master 45b48d5fd -> 7f4a3e17e


HIVE-16006 : Incremental REPL LOAD Inserts doesn't operate on the target database if name differs from source database.
(Sankar Hariappan, reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/7f4a3e17
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/7f4a3e17
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/7f4a3e17

Branch: refs/heads/master
Commit: 7f4a3e17ec2fa886276a7f278e5846e0e7ebc8a6
Parents: 45b48d5
Author: Sushanth Sowmyan
Authored: Tue Mar 7 19:33:40 2017 -0800
Committer: Sushanth Sowmyan
Committed: Tue Mar 7 19:34:56 2017 -0800

----------------------------------------------------------------------
 .../hive/ql/TestReplicationScenarios.java       | 77 ++++++++++++++++----
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  3 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 13 +++-
 .../hadoop/hive/ql/parse/ReplicationSpec.java   | 19 ++++-
 4 files changed, 92 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
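The gist of the fix: when "REPL LOAD <tgtDb> FROM ..." names a target database that differs from the source, insert events must be replayed against <tgtDb>, not against the database name recorded inside the event message. A minimal standalone sketch of the resolution rule the patch inlines in ReplicationSemanticAnalyzer (the helper name resolveTarget is illustrative, not from the patch):

// Editor's sketch only: mirrors the actualDbName/actualTblName logic in the diff below.
public final class TargetNameResolution {
  // Prefer the name given to REPL LOAD; fall back to the name in the event.
  static String resolveTarget(String requested, String fromEvent) {
    return (requested == null || requested.isEmpty()) ? fromEvent : requested;
  }

  public static void main(String[] args) {
    // REPL LOAD srcdb_dupe FROM '<dump>': events recorded for "srcdb"
    // are replayed against "srcdb_dupe".
    System.out.println(resolveTarget("srcdb_dupe", "srcdb")); // srcdb_dupe
    // No explicit target given: keep the event's own database name.
    System.out.println(resolveTarget(null, "srcdb"));         // srcdb
  }
}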
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 4efc81d..c26a075 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -96,6 +96,7 @@ public class TestReplicationScenarios {
     hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
         DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
     hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
     hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
     msPort = MetaStoreUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
@@ -790,16 +791,8 @@ public class TestReplicationScenarios {
   }

   @Test
-  @Ignore
-  // The test turned off temporarily in HIVE-15478. This test is not running
-  // properly even though it passed before. The reason the test passed before is because
-  // we collect files added by "create table" statement during "repl dump", and it will take
-  // the files added by "insert statement". In HIVE-15478, Hive collect "create table" affected
-  // files during processing "create table" statement, and no data files present at that time.
-  // The inserted files rely on the missing INSERT_EVENT to signal. We need to turn on
-  // FIRE_EVENTS_FOR_DML setting to trigger INSERT_EVENT and this is WIP tracked by other ticket.
-  public void testIncrementalInserts() throws IOException {
-    String testName = "incrementalInserts";
+  public void testIncrementalLoad() throws IOException {
+    String testName = "incrementalLoad";
     LOG.info("Testing " + testName);
     String dbName = testName + "_" + tid;
@@ -815,7 +808,7 @@ public class TestReplicationScenarios {
     run("REPL DUMP " + dbName);
     String replDumpLocn = getResult(0, 0);
     String replDumpId = getResult(0, 1, true);
-    LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
     run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");

     String[] unptn_data = new String[] { "eleven", "twelve" };
@@ -844,7 +837,8 @@ public class TestReplicationScenarios {
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
     String incrementalDumpLocn = getResult(0, 0);
     String incrementalDumpId = getResult(0, 1, true);
-    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
@@ -871,7 +865,8 @@ public class TestReplicationScenarios {
     run("REPL DUMP " + dbName + " FROM " + replDumpId);
     incrementalDumpLocn = getResult(0, 0);
     incrementalDumpId = getResult(0, 1, true);
-    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
     run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
     printOutput();
     run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
@@ -881,6 +876,62 @@ public class TestReplicationScenarios {
   }

   @Test
+  public void testIncrementalInserts() throws IOException {
+    String testName = "incrementalInserts";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Bootstrap-Dump: Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[0] + "')");
+    run("INSERT INTO TABLE " + dbName + ".unptned values('" + unptn_data[1] + "')");
+    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+    verifyRun("SELECT * from " + dbName + ".unptned_late", unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    verifyRun("SELECT a from " + dbName + ".unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data);
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data);
+
+    String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" };
+    run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')");
+    verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId);
+    replDumpId = incrementalDumpId;
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+
+    verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins);
+  }
+
+  @Test
   public void testStatus() throws IOException {
     // first test ReplStateMap functionality
     Map<String, Long> cmap = new ReplStateMap<String, Long>();
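Note the one-line setup change at the top of the file: the tests resurrected here are exactly the ones the removed @Ignore comment said were blocked on missing INSERT events, and FIRE_EVENTS_FOR_DML is the switch that makes the metastore emit them. A standalone sketch of the replication-relevant settings the harness now relies on, collected in one place (the listener class is spelled out here where the test uses its DBNOTIF_LISTENER_CLASSNAME constant):

import org.apache.hadoop.hive.conf.HiveConf;

// Editor's sketch, not part of the patch.
public class ReplTestConf {
  static HiveConf replEnabledConf(String testPath) {
    HiveConf hconf = new HiveConf();
    // DbNotificationListener records metastore events for REPL DUMP to read.
    hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
        "org.apache.hive.hcatalog.listener.DbNotificationListener");
    hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
    // Without this, INSERTs fire no INSERT_EVENT and incremental dumps miss them.
    hconf.setBoolVar(HiveConf.ConfVars.FIRE_EVENTS_FOR_DML, true);
    hconf.setVar(HiveConf.ConfVars.REPLCMDIR, testPath + "/cmroot/");
    hconf.setVar(HiveConf.ConfVars.REPLDIR, testPath + "/hrepl/");
    return hconf;
  }
}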
"_dupe FROM '" + incrementalDumpLocn + "'"); + verifyRun("SELECT a from " + dbName + ".unptned", unptn_data); + verifyRun("SELECT a from " + dbName + ".unptned_late", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.unptned", unptn_data); + verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data); + + String[] unptn_data_after_ins = new String[] { "eleven", "twelve", "thirteen" }; + run("INSERT INTO TABLE " + dbName + ".unptned_late values('" + unptn_data_after_ins[2] + "')"); + verifySetup("SELECT a from " + dbName + ".unptned_late", unptn_data_after_ins); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + incrementalDumpLocn = getResult(0, 0); + incrementalDumpId = getResult(0, 1, true); + LOG.info("Incremental-Dump: Dumped to {} with id {} from {}", incrementalDumpLocn, incrementalDumpId, replDumpId); + replDumpId = incrementalDumpId; + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'"); + + verifyRun("SELECT a from " + dbName + "_dupe.unptned_late", unptn_data_after_ins); + } + + @Test public void testStatus() throws IOException { // first test ReplStateMap functionality Map cmap = new ReplStateMap(); http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java index 7bb48a9..c398792 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java @@ -949,7 +949,8 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer { return; // silently return, table is newer than our replacement. 
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index cb1371c..e72d621 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -614,14 +614,17 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       }
       case MessageFactory.INSERT_EVENT: {
         InsertMessage insertMsg = md.getInsertMessage(ev.getMessage());
+        String dbName = insertMsg.getDB();
         String tblName = insertMsg.getTable();
-        Table qlMdTable = db.getTable(tblName);
+        org.apache.hadoop.hive.metastore.api.Table tobj = db.getMSC().getTable(dbName, tblName);
+        Table qlMdTable = new Table(tobj);
         Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
         List<Partition> qlPtns = null;
         if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
           qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
         }
         Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        replicationSpec.setIsInsert(true); // Mark the replication type as insert into to avoid overwrite while import
         EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath,
             qlMdTable, qlPtns, replicationSpec);
         Iterable<String> files = insertMsg.getFiles();
@@ -1156,9 +1159,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       case EVENT_INSERT: {
         md = MessageFactory.getInstance().getDeserializer();
         InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
+        String actualDbName = ((dbName == null) || dbName.isEmpty() ? insertMessage.getDB() : dbName);
+        String actualTblName = ((tblName == null) || tblName.isEmpty() ? insertMessage.getTable() : tblName);
+
         // Piggybacking in Import logic for now
-        return analyzeTableLoad(
-            insertMessage.getDB(), insertMessage.getTable(), locn, precursor, dbsUpdated, tablesUpdated);
+        return analyzeTableLoad(actualDbName, actualTblName, locn, precursor, dbsUpdated, tablesUpdated);
       }
       case EVENT_UNKNOWN: {
         break;
@@ -1395,7 +1400,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {

   // Use for specifying object state as well as event state
   private ReplicationSpec getNewReplicationSpec(String evState, String objState) throws SemanticException {
-    return new ReplicationSpec(true, false, evState, objState, false, true);
+    return new ReplicationSpec(true, false, evState, objState, false, true, false);
   }

   // Use for replication states focussed on event only, where the obj state will be the event state
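Two things happen in the dump-side hunk above: the table is now looked up through the metastore client with an explicit database name taken from the insert event (db.getTable(tblName) resolves against the session's current database, which need not be the event's database), and the ReplicationSpec is marked as insert before the export dump is written so the load side knows not to overwrite. A toy illustration of why the unqualified lookup was fragile (the map-based "catalog" is a stand-in, not Hive's API):

import java.util.HashMap;
import java.util.Map;

// Editor's sketch, not Hive code.
public class QualifiedLookup {
  static Map<String, String> catalog = new HashMap<>(); // "db.table" -> metadata
  static String currentDb = "default";

  // Old style: the session's current database silently wins.
  static String lookupUnqualified(String table) {
    return catalog.get(currentDb + "." + table);
  }

  // New style: the database recorded in the event wins.
  static String lookupQualified(String db, String table) {
    return catalog.get(db + "." + table);
  }

  public static void main(String[] args) {
    catalog.put("srcdb.unptned", "table in source db");
    // Replaying an event for srcdb.unptned while the session sits in "default"
    // finds nothing unqualified, but succeeds when qualified:
    System.out.println(lookupUnqualified("unptned"));        // null
    System.out.println(lookupQualified("srcdb", "unptned")); // table in source db
  }
}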
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/7f4a3e17/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
index be17ffa..48362a3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSpec.java
@@ -44,7 +44,7 @@ public class ReplicationSpec {
   private String currStateId = null;
   private boolean isNoop = false;
   private boolean isLazy = false; // lazy mode => we only list files, and expect that the eventual copy will pull data in.
-
+  private boolean isInsert = false; // default is that the import mode is replace-into

   // Key definitions related to replication
   public enum KEY {
@@ -53,6 +53,7 @@ public class ReplicationSpec {
     CURR_STATE_ID("repl.last.id"),
     NOOP("repl.noop"),
     LAZY("repl.lazy"),
+    IS_INSERT("repl.is.insert")
     ;

     private final String keyName;
@@ -134,13 +135,15 @@ public class ReplicationSpec {
   }

   public ReplicationSpec(boolean isInReplicationScope, boolean isMetadataOnly,
-      String eventReplicationState, String currentReplicationState, boolean isNoop, boolean isLazy) {
+      String eventReplicationState, String currentReplicationState,
+      boolean isNoop, boolean isLazy, boolean isInsert) {
     this.isInReplicationScope = isInReplicationScope;
     this.isMetadataOnly = isMetadataOnly;
     this.eventId = eventReplicationState;
     this.currStateId = currentReplicationState;
     this.isNoop = isNoop;
     this.isLazy = isLazy;
+    this.isInsert = isInsert;
   }

   public ReplicationSpec(Function<String, String> keyFetcher) {
@@ -159,6 +162,7 @@ public class ReplicationSpec {
     this.currStateId = keyFetcher.apply(ReplicationSpec.KEY.CURR_STATE_ID.toString());
     this.isNoop = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.NOOP.toString()));
     this.isLazy = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.LAZY.toString()));
+    this.isInsert = Boolean.parseBoolean(keyFetcher.apply(ReplicationSpec.KEY.IS_INSERT.toString()));
   }

   /**
@@ -292,6 +296,15 @@ public class ReplicationSpec {
   }

   /**
+   * @return true if this statement refers to insert-into operation.
+   */
+  public boolean isInsert(){ return isInsert; }
+
+  public void setIsInsert(boolean isInsert){
+    this.isInsert = isInsert;
+  }
+
+  /**
    * @return the replication state of the event that spawned this statement
    */
   public String getReplicationState() {
@@ -357,6 +370,8 @@ public class ReplicationSpec {
         return String.valueOf(isNoop());
       case LAZY:
         return String.valueOf(isLazy());
+      case IS_INSERT:
+        return String.valueOf(isInsert());
     }
     return null;
   }
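The new IS_INSERT key closes the loop between the two sides: getKey() writes "repl.is.insert" into the dump metadata on export, and the keyFetcher constructor (a Function from key name to value; likely Guava's Function in this vintage of Hive) reads it back on import. A minimal map-backed round trip of just the flag, with ReplicationSpec itself elided:

import java.util.HashMap;
import java.util.Map;

// Editor's sketch of the serialization round trip; only the key name and the
// format/parse calls mirror the patch.
public class ReplKeyRoundTrip {
  public static void main(String[] args) {
    // Export side: getKey(KEY.IS_INSERT) serializes the flag into dump metadata.
    Map<String, String> dumpedKeys = new HashMap<>();
    dumpedKeys.put("repl.is.insert", String.valueOf(true));

    // Import side: the keyFetcher constructor parses it back. An absent key
    // (e.g. an older dump) parses as false, i.e. the replace-into default.
    boolean isInsert = Boolean.parseBoolean(dumpedKeys.get("repl.is.insert"));
    System.out.println(isInsert); // true
  }
}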