hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject hive git commit: HIVE-15587: Using ChangeManager to copy files in ReplCopyTask (Daniel Dai, reviewed by Vaibhav Gumashta)
Date Thu, 26 Jan 2017 20:17:27 GMT
Repository: hive
Updated Branches:
  refs/heads/master 85c103532 -> 318db5a35


HIVE-15587: Using ChangeManager to copy files in ReplCopyTask (Daniel Dai, reviewed by Vaibhav
Gumashta)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/318db5a3
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/318db5a3
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/318db5a3

Branch: refs/heads/master
Commit: 318db5a3579c7f4039ee24636cd1eeb3b6633bbb
Parents: 85c1035
Author: Daniel Dai <daijy@hortonworks.com>
Authored: Thu Jan 26 12:16:47 2017 -0800
Committer: Daniel Dai <daijy@hortonworks.com>
Committed: Thu Jan 26 12:16:47 2017 -0800

----------------------------------------------------------------------
 .../listener/DbNotificationListener.java        |  16 +-
 .../hive/metastore/TestReplChangeManager.java   |  26 +--
 .../hive/ql/TestReplicationScenarios.java       | 214 ++++++++++++++++++-
 .../hive/metastore/ReplChangeManager.java       | 124 +++++++++--
 .../hadoop/hive/ql/exec/ReplCopyTask.java       |  39 ++--
 .../apache/hadoop/hive/ql/parse/EximUtil.java   |  17 --
 .../ql/parse/ReplicationSemanticAnalyzer.java   |  57 +++--
 7 files changed, 391 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
----------------------------------------------------------------------
diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
index 7524c49..4df2758 100644
--- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
+++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java
@@ -23,7 +23,6 @@ import java.util.List;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -32,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.MetaStoreEventListener;
 import org.apache.hadoop.hive.metastore.RawStore;
 import org.apache.hadoop.hive.metastore.RawStoreProxy;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.Warehouse;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.Function;
@@ -59,7 +59,6 @@ import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent;
 import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.metastore.messaging.PartitionFiles;
-import org.apache.hadoop.util.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -223,7 +222,8 @@ public class DbNotificationListener extends MetaStoreEventListener {
       try {
         FileStatus file = files[i];
         i++;
-        return buildFileWithChksum(file.getPath(), fs);
+        return ReplChangeManager.encodeFileUri(file.getPath().toString(),
+            ReplChangeManager.getChksumString(file.getPath(), fs));
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -520,16 +520,6 @@ public class DbNotificationListener extends MetaStoreEventListener {
 
   }
 
-  String buildFileWithChksum(Path p, FileSystem fs) throws IOException {
-    FileChecksum cksum = fs.getFileChecksum(p);
-    String chksumString = null;
-    if (cksum != null) {
-      chksumString =
-          StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
-    }
-    return encodeFileUri(p.toString(), chksumString);
-  }
-
   // TODO: this needs to be enhanced once change management based filesystem is implemented
   // Currently using fileuri#checksum as the format
   private String encodeFileUri(String fileUriStr, String fileChecksum) {

http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
index 205c640..1ac4d01 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java
@@ -53,6 +53,7 @@ public class TestReplChangeManager {
   private static Warehouse warehouse;
   private static MiniDFSCluster m_dfs;
   private static String cmroot;
+  private static FileSystem fs;
 
   @BeforeClass
   public static void setUp() throws Exception {
@@ -65,6 +66,7 @@ public class TestReplChangeManager {
     hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot);
     hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60);
     warehouse = new Warehouse(hiveConf);
+    fs = new Path(cmroot).getFileSystem(hiveConf);
     try {
       client = new HiveMetaStoreClient(hiveConf);
     } catch (Throwable e) {
@@ -151,15 +153,15 @@ public class TestReplChangeManager {
 
     Path part1Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt",
"20160101")), "part");
     createFile(part1Path, "p1");
-    String path1Chksum = ReplChangeManager.getCksumString(part1Path, hiveConf);
+    String path1Chksum = ReplChangeManager.getChksumString(part1Path, fs);
 
     Path part2Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt",
"20160102")), "part");
     createFile(part2Path, "p2");
-    String path2Chksum = ReplChangeManager.getCksumString(part2Path, hiveConf);
+    String path2Chksum = ReplChangeManager.getChksumString(part2Path, fs);
 
     Path part3Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt",
"20160103")), "part");
     createFile(part3Path, "p3");
-    String path3Chksum = ReplChangeManager.getCksumString(part3Path, hiveConf);
+    String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs);
 
     Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path));
     Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path));
@@ -221,15 +223,15 @@ public class TestReplChangeManager {
 
     Path filePath1 = new Path(warehouse.getTablePath(db, tblName), "part1");
     createFile(filePath1, "f1");
-    String fileChksum1 = ReplChangeManager.getCksumString(filePath1, hiveConf);
+    String fileChksum1 = ReplChangeManager.getChksumString(filePath1, fs);
 
     Path filePath2 = new Path(warehouse.getTablePath(db, tblName), "part2");
     createFile(filePath2, "f2");
-    String fileChksum2 = ReplChangeManager.getCksumString(filePath2, hiveConf);
+    String fileChksum2 = ReplChangeManager.getChksumString(filePath2, fs);
 
     Path filePath3 = new Path(warehouse.getTablePath(db, tblName), "part3");
     createFile(filePath3, "f3");
-    String fileChksum3 = ReplChangeManager.getCksumString(filePath3, hiveConf);
+    String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs);
 
     Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1));
     Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2));
@@ -267,26 +269,26 @@ public class TestReplChangeManager {
     fs.mkdirs(dirTbl1);
     Path part11 = new Path(dirTbl1, "part1");
     createFile(part11, "testClearer11");
-    String fileChksum11 = ReplChangeManager.getCksumString(part11, hiveConf);
+    String fileChksum11 = ReplChangeManager.getChksumString(part11, fs);
     Path part12 = new Path(dirTbl1, "part2");
     createFile(part12, "testClearer12");
-    String fileChksum12 = ReplChangeManager.getCksumString(part12, hiveConf);
+    String fileChksum12 = ReplChangeManager.getChksumString(part12, fs);
     Path dirTbl2 = new Path(dirDb, "tbl2");
     fs.mkdirs(dirTbl2);
     Path part21 = new Path(dirTbl2, "part1");
     createFile(part21, "testClearer21");
-    String fileChksum21 = ReplChangeManager.getCksumString(part21, hiveConf);
+    String fileChksum21 = ReplChangeManager.getChksumString(part21, fs);
     Path part22 = new Path(dirTbl2, "part2");
     createFile(part22, "testClearer22");
-    String fileChksum22 = ReplChangeManager.getCksumString(part22, hiveConf);
+    String fileChksum22 = ReplChangeManager.getChksumString(part22, fs);
     Path dirTbl3 = new Path(dirDb, "tbl3");
     fs.mkdirs(dirTbl3);
     Path part31 = new Path(dirTbl3, "part1");
     createFile(part31, "testClearer31");
-    String fileChksum31 = ReplChangeManager.getCksumString(part31, hiveConf);
+    String fileChksum31 = ReplChangeManager.getChksumString(part31, fs);
     Path part32 = new Path(dirTbl3, "part2");
     createFile(part32, "testClearer32");
-    String fileChksum32 = ReplChangeManager.getCksumString(part32, hiveConf);
+    String fileChksum32 = ReplChangeManager.getChksumString(part32, fs);
 
     ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false);
     ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false);

http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index 5be3e9c..7836c47 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -90,9 +90,11 @@ public class TestReplicationScenarios {
       WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf);
     }
 
-    System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname,
+    hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS,
         DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore
-    msPort = MetaStoreUtils.startMetaStore();
+    hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true);
+    hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/");
+    msPort = MetaStoreUtils.startMetaStore(hconf);
     hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/");
     hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:"
         + msPort);
@@ -193,6 +195,87 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testBasicWithCM() throws Exception {
+
+    String testName = "basic_with_cm";
+    LOG.info("Testing "+testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED
AS TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+    String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+    createTestDataFile(ptn_locn_2_later, ptn_data_2_later);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned
PARTITION(b=1)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned
PARTITION(b=2)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+
+    // Table dropped after "repl dump"
+    run("DROP TABLE " + dbName + ".unptned");
+    // Partition dropped after "repl dump"
+    run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)");
+    // File changed after "repl dump"
+    Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2");
+    Path loc = new Path(p.getSd().getLocation());
+    FileSystem fs = loc.getFileSystem(hconf);
+    Path file = fs.listStatus(loc)[0].getPath();
+    fs.delete(file, false);
+    fs.copyFromLocalFile(new Path(ptn_locn_2_later), file);
+
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
+
+    run("SELECT * from " + dbName + "_dupe.unptned");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    // Since partition(b=2) changed manually, Hive cannot find
+    // it in original location and cmroot, thus empty
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+  }
+
+  @Test
   public void testIncrementalAdds() throws IOException {
     String testName = "incrementalAdds";
     LOG.info("Testing "+testName);
@@ -319,7 +402,6 @@ public class TestReplicationScenarios {
     run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3
PARTITION(b=2)");
     verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2);
 
-
     // At this point, we've set up all the tables and ptns we're going to test drops across
     // Replicate it first, and then we'll drop it on the source.
 
@@ -393,6 +475,132 @@ public class TestReplicationScenarios {
   }
 
   @Test
+  public void testDropsWithCM() throws IOException {
+
+    String testName = "drops_with_cm";
+    LOG.info("Testing "+testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS
TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS
TEXTFILE");
+
+    String[] unptn_data = new String[]{ "eleven" , "twelve" };
+    String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"};
+    String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"};
+    String[] empty = new String[]{};
+
+    String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned
PARTITION(b='1')");
+    run("SELECT a from " + dbName + ".ptned WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned
PARTITION(b='2')");
+    run("SELECT a from " + dbName + ".ptned WHERE b='2'");
+    verifyResults(ptn_data_2);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2
PARTITION(b='1')");
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2
PARTITION(b='2')");
+    run("SELECT a from " + dbName + ".ptned2 WHERE b='2'");
+    verifyResults(ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0,0);
+    String replDumpId = getResult(0,1,true);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    run("REPL STATUS " + dbName + "_dupe");
+    verifyResults(new String[] {replDumpId});
+
+    run("SELECT * from " + dbName + "_dupe.unptned");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'");
+    verifyResults(ptn_data_2);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'");
+    verifyResults(ptn_data_2);
+
+    run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned");
+    run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned");
+    run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM
" +
+        dbName + ".ptned WHERE b='1'");
+    run("SELECT a from " + dbName + ".unptned_copy");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + ".ptned_copy");
+    verifyResults(ptn_data_1);
+
+    run("DROP TABLE " + dbName + ".unptned");
+    run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')");
+    run("DROP TABLE " + dbName + ".ptned2");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + ".ptned");
+    verifyResults(ptn_data_1);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String postDropReplDumpLocn = getResult(0,0);
+    String postDropReplDumpId = getResult(0,1,true);
+    LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId);
+
+    // Drop table after dump
+    run("DROP TABLE " + dbName + ".unptned_copy");
+    // Drop partition after dump
+    run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')");
+
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'");
+
+    Exception e = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned");
+      assertNull(tbl);
+    } catch (TException te) {
+      e = te;
+    }
+    assertNotNull(e);
+    assertEquals(NoSuchObjectException.class, e.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2");
+    verifyResults(empty);
+    run("SELECT a from " + dbName + "_dupe.ptned");
+    verifyResults(ptn_data_1);
+
+    Exception e2 = null;
+    try {
+      Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2");
+      assertNull(tbl);
+    } catch (TException te) {
+      e2 = te;
+    }
+    assertNotNull(e2);
+    assertEquals(NoSuchObjectException.class, e2.getClass());
+
+    run("SELECT a from " + dbName + "_dupe.unptned_copy");
+    verifyResults(unptn_data);
+    run("SELECT a from " + dbName + "_dupe.ptned_copy");
+    verifyResults(ptn_data_1);
+  }
+
+  @Test
   public void testAlters() throws IOException {
 
     String testName = "alters";

http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
index 99cba9d..51e4627 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java
@@ -55,6 +55,7 @@ public class ReplChangeManager {
 
   public static final String ORIG_LOC_TAG = "user.original-loc";
   public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash";
+  public static final String URI_FRAGMENT_SEPARATOR = "#";
 
   public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException {
     if (instance == null) {
@@ -121,7 +122,7 @@ public class ReplChangeManager {
           count += recycle(file.getPath(), ifPurge);
         }
       } else {
-        Path cmPath = getCMPath(path, hiveConf, getCksumString(path, hiveConf));
+        Path cmPath = getCMPath(path, hiveConf, getChksumString(path, fs));
 
         if (LOG.isDebugEnabled()) {
           LOG.debug("Moving " + path.toString() + " to " + cmPath.toString());
@@ -151,7 +152,11 @@ public class ReplChangeManager {
           // Note we currently only track the last known trace as
           // xattr has limited capacity. We shall revisit and store all original
           // locations if orig-loc becomes important
-          fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+          try {
+            fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes());
+          } catch (UnsupportedOperationException e) {
+            LOG.warn("Error setting xattr for " + cmPath.toString());
+          }
 
           count++;
         }
@@ -159,7 +164,11 @@ public class ReplChangeManager {
         // If multiple files share the same content, then
         // any file claim remain in trash would be granted
         if (!ifPurge) {
-          fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+          try {
+            fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0});
+          } catch (UnsupportedOperationException e) {
+            LOG.warn("Error setting xattr for " + cmPath.toString());
+          }
         }
       }
       return count;
@@ -169,16 +178,22 @@ public class ReplChangeManager {
   }
 
   // Get checksum of a file
-  static public String getCksumString(Path path, Configuration conf) throws IOException {
+  static public String getChksumString(Path path, FileSystem fs) throws IOException {
     // TODO: fs checksum only available on hdfs, need to
     //       find a solution for other fs (eg, local fs, s3, etc)
-    FileSystem fs = path.getFileSystem(conf);
+    String checksumString = null;
     FileChecksum checksum = fs.getFileChecksum(path);
-    String checksumString = StringUtils.byteToHexString(
-        checksum.getBytes(), 0, checksum.getLength());
+    if (checksum != null) {
+      checksumString = StringUtils.byteToHexString(
+          checksum.getBytes(), 0, checksum.getLength());
+    }
     return checksumString;
   }
 
+  static public void setCmRoot(Path cmRoot) {
+    ReplChangeManager.cmroot = cmRoot;
+  }
+
   /***
    * Convert a path of file inside a partition or table (if non-partitioned)
    *   to a deterministic location of cmroot. So user can retrieve the file back
@@ -205,6 +220,69 @@ public class ReplChangeManager {
     return cmPath;
   }
 
+  /***
+   * Get original file specified by src and chksumString. If the file exists and checksum
+   * matches, return the file; otherwise, use chksumString to retrieve it from cmroot
+   * @param src Original file location
+   * @param chksumString Checksum of the original file
+   * @param conf
+   * @return Corresponding FileStatus object
+   * @throws MetaException
+   */
+  static public FileStatus getFileStatus(Path src, String chksumString,
+      HiveConf conf) throws MetaException {
+    try {
+      FileSystem srcFs = src.getFileSystem(conf);
+      if (chksumString == null) {
+        return srcFs.getFileStatus(src);
+      }
+
+      if (!srcFs.exists(src)) {
+        return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+      }
+
+      String currentChksumString = getChksumString(src, srcFs);
+      if (currentChksumString == null || chksumString.equals(currentChksumString)) {
+        return srcFs.getFileStatus(src);
+      } else {
+        return srcFs.getFileStatus(getCMPath(src, conf, chksumString));
+      }
+    } catch (IOException e) {
+      throw new MetaException(StringUtils.stringifyException(e));
+    }
+  }
+
+  /***
+   * Concatenate filename and checksum with "#"
+   * @param fileUriStr Filename string
+   * @param fileChecksum Checksum string
+   * @return Concatenated Uri string
+   */
+  // TODO: this needs to be enhanced once change management based filesystem is implemented
+  // Currently using fileuri#checksum as the format
+  static public String encodeFileUri(String fileUriStr, String fileChecksum) {
+    if (fileChecksum != null) {
+      return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
+    } else {
+      return fileUriStr;
+    }
+  }
+
+  /***
+   * Split uri with fragment into file uri and checksum
+   * @param fileURIStr uri with fragment
+   * @return array of file name and checksum
+   */
+  static public String[] getFileWithChksumFromURI(String fileURIStr) {
+    String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR);
+    String[] result = new String[2];
+    result[0] = uriAndFragment[0];
+    if (uriAndFragment.length>1) {
+      result[1] = uriAndFragment[1];
+    }
+    return result;
+  }
+
   /**
    * Thread to clear old files of cmroot recursively
    */
@@ -231,24 +309,28 @@ public class ReplChangeManager {
         for (FileStatus file : files) {
           long modifiedTime = file.getModificationTime();
           if (now - modifiedTime > secRetain*1000) {
-            if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
-              boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
-              if (succ) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Move " + file.toString() + " to trash");
+            try {
+              if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) {
+                boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf);
+                if (succ) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Move " + file.toString() + " to trash");
+                  }
+                } else {
+                  LOG.warn("Fail to move " + file.toString() + " to trash");
                 }
               } else {
-                LOG.warn("Fail to move " + file.toString() + " to trash");
-              }
-            } else {
-              boolean succ = fs.delete(file.getPath(), false);
-              if (succ) {
-                if (LOG.isDebugEnabled()) {
-                  LOG.debug("Remove " + file.toString());
+                boolean succ = fs.delete(file.getPath(), false);
+                if (succ) {
+                  if (LOG.isDebugEnabled()) {
+                    LOG.debug("Remove " + file.toString());
+                  }
+                } else {
+                  LOG.warn("Fail to remove " + file.toString());
                 }
-              } else {
-                LOG.warn("Fail to remove " + file.toString());
               }
+            } catch (UnsupportedOperationException e) {
+              LOG.warn("Error getting xattr for " + file.getPath().toString());
             }
           }
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index e6b943b..4686e2c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -18,6 +18,8 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
+import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
@@ -126,15 +128,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
       for (FileStatus oneSrc : srcFiles) {
         console.printInfo("Copying file: " + oneSrc.getPath().toString());
         LOG.debug("Copying file: " + oneSrc.getPath().toString());
+
+        FileSystem actualSrcFs = null;
+        if (rwork.getReadListFromInput()){
+          // TODO : filesystemcache prevents this from being a perf nightmare, but we
+          // should still probably follow up to see if we need to do something better here.
+          actualSrcFs = oneSrc.getPath().getFileSystem(conf);
+        } else {
+          actualSrcFs = srcFs;
+        }
         if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){
-          FileSystem actualSrcFs = null;
-          if (rwork.getReadListFromInput()){
-            // TODO : filesystemcache prevents this from being a perf nightmare, but we
-            // should still probably follow up to see if we need to do something better here.
-            actualSrcFs = oneSrc.getPath().getFileSystem(conf);
-          } else {
-            actualSrcFs = srcFs;
-          }
 
           LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath);
           if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath,
@@ -148,7 +151,9 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements
Serializable {
         }else{
           LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri());
           console.printInfo("Tracking file: " + oneSrc.getPath().toUri());
-          listBW.write(oneSrc.getPath().toUri().toString() + "\n");
+          String chksumString = ReplChangeManager.getChksumString(oneSrc.getPath(), actualSrcFs);
+          listBW.write(ReplChangeManager.encodeFileUri
+              (oneSrc.getPath().toUri().toString(), chksumString) + "\n");
         }
       }
 
@@ -183,12 +188,16 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     String line = null;
     while ( (line = br.readLine()) != null){
       LOG.debug("ReplCopyTask :_filesReadLine:" + line);
-      String fileUriStr = EximUtil.getCMDecodedFileName(line);
-      // TODO HIVE-15490: Add checksum validation here
-      Path p = new Path(fileUriStr);
-      // TODO: again, fs cache should make this okay, but if not, revisit
-      FileSystem srcFs = p.getFileSystem(conf);
-      ret.add(srcFs.getFileStatus(p));
+
+      String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line);
+      try {
+        FileStatus f = ReplChangeManager.getFileStatus(new Path(fileWithChksum[0]),
+            fileWithChksum[1], conf);
+        ret.add(f);
+      } catch (MetaException e) {
+        // skip and issue warning for missing file
+        LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot");
+      }
      // Note - we need srcFs rather than fs, because it is possible that the _files lists files
      // which are from a different filesystem than the fs where the _files file itself was loaded
      // from. Currently, it is possible, for eg., to do REPL LOAD hdfs://<ip>/dir/ and for the _files

http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 34e53d2..796ccc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -78,7 +77,6 @@ public class EximUtil {
   public static final String METADATA_NAME = "_metadata";
   public static final String FILES_NAME = "_files";
   public static final String DATA_PATH_NAME = "data";
-  public static final String URI_FRAGMENT_SEPARATOR = "#";
 
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
 
@@ -574,19 +572,4 @@ public class EximUtil {
     };
   }
 
-  public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) {
-    // The checksum is set as the fragment portion of the file uri
-    return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
-  }
-
-  public static String getCMDecodedFileName(String encodedFileURIStr) {
-    String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR);
-    return uriAndFragment[0];
-  }
-
-  public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) {
-    // TODO: Implement this as part of HIVE-15490
-    return null;
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 86b6a6e..2b327db 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.NotificationEvent;
@@ -141,22 +142,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
 
     private final Path dumpRoot;
     private final Path dumpFile;
+    private Path cmRoot;
 
     public DumpMetaData(Path dumpRoot) {
       this.dumpRoot = dumpRoot;
       dumpFile = new Path(dumpRoot, DUMPMETADATA);
     }
 
-    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){
+    public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
       this(dumpRoot);
-      setDump(lvl, eventFrom, eventTo);
+      setDump(lvl, eventFrom, eventTo, cmRoot);
     }
 
-    public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){
+    public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){
       this.dumpType = lvl;
       this.eventFrom = eventFrom;
       this.eventTo = eventTo;
       this.initialized = true;
+      this.cmRoot = cmRoot;
     }
 
     public void loadDumpFromFile() throws SemanticException {
@@ -166,9 +169,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile)));
         String line = null;
         if ( (line = br.readLine()) != null){
-          String[] lineContents = line.split("\t", 4);
-          setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]));
-          setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]);
+          String[] lineContents = line.split("\t", 5);
+          setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]),
+              new Path(lineContents[3]));
+          setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]);
+          ReplChangeManager.setCmRoot(cmRoot);
         } else {
           throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString());
         }
@@ -201,6 +206,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       return eventTo;
     }
 
+    public Path getCmRoot() {
+      return cmRoot;
+    }
+
+    public void setCmRoot(Path cmRoot) {
+      this.cmRoot = cmRoot;
+    }
+
     public Path getDumpFilePath() {
       return dumpFile;
     }
@@ -217,7 +230,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
 
     public void write() throws SemanticException {
-      writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile);
+      writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(),
+          cmRoot.toString(), payload), dumpFile);
     }
 
   }
@@ -301,6 +315,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR);
     Path dumpRoot = new Path(replRoot, getNextDumpDir());
     DumpMetaData dmd = new DumpMetaData(dumpRoot);
+    Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR));
     Long lastReplId;
     try {
       if (eventFrom == null){
@@ -340,7 +355,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.info(
             "Consolidation done, preparing to return {},{}->{}",
             dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId);
-        dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId);
+        dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot);
         dmd.write();
 
         // Set the correct last repl id to return to the user
@@ -375,14 +390,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         while (evIter.hasNext()){
           NotificationEvent ev = evIter.next();
           Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId()));
-          dumpEvent(ev, evRoot);
+          dumpEvent(ev, evRoot, cmRoot);
         }
 
         LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo);
         writeOutput(
             Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)),
             dmd.getDumpFilePath());
-        dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo);
+        dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo, cmRoot);
         dmd.write();
         // Set the correct last repl id to return to the user
         lastReplId = eventTo;
@@ -396,7 +411,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     }
   }
 
-  private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception {
+  private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception {
     long evid = ev.getEventId();
     String evidStr = String.valueOf(evid);
     ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr);
@@ -439,7 +454,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           }
         }
 
-        (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid)).write();
+        (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid, cmRoot)).write();
         break;
       }
       case MessageFactory.ADD_PARTITION_EVENT : {
@@ -504,19 +519,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           }
         }
 
-        (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid)).write();
+        (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid, cmRoot)).write();
         break;
       }
       case MessageFactory.DROP_TABLE_EVENT : {
         LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid, cmRoot);
         dmd.setPayload(ev.getMessage());
         dmd.write();
         break;
       }
       case MessageFactory.DROP_PARTITION_EVENT : {
         LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid, cmRoot);
         dmd.setPayload(ev.getMessage());
         dmd.write();
         break;
@@ -540,12 +555,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
               null,
               replicationSpec);
 
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid);
+          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot);
           dmd.setPayload(ev.getMessage());
           dmd.write();
         } else {
           // rename scenario
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid);
+          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot);
           dmd.setPayload(ev.getMessage());
           dmd.write();
         }
@@ -582,13 +597,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
               qlMdTable,
               qlPtns,
               replicationSpec);
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid);
+          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot);
           dmd.setPayload(ev.getMessage());
           dmd.write();
           break;
         } else {
           // rename scenario
-          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid);
+          DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot);
           dmd.setPayload(ev.getMessage());
           dmd.write();
           break;
@@ -626,7 +641,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
 
         LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid, cmRoot);
         dmd.setPayload(ev.getMessage());
         dmd.write();
         break;
@@ -634,7 +649,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
       // TODO : handle other event types
       default:
         LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage());
-        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid);
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid, cmRoot);
         dmd.setPayload(ev.getMessage());
         dmd.write();
         break;


Mime
View raw message