Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id EB1EC200C08 for ; Thu, 26 Jan 2017 21:17:29 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id E669A160B50; Thu, 26 Jan 2017 20:17:29 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4C9D4160B31 for ; Thu, 26 Jan 2017 21:17:28 +0100 (CET) Received: (qmail 91017 invoked by uid 500); 26 Jan 2017 20:17:27 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 91006 invoked by uid 99); 26 Jan 2017 20:17:27 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 26 Jan 2017 20:17:27 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 55F63DFA22; Thu, 26 Jan 2017 20:17:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: daijy@apache.org To: commits@hive.apache.org Message-Id: <4349318efc0a4321a5b5ca6c310d6714@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: hive git commit: HIVE-15587: Using ChangeManager to copy files in ReplCopyTask (Daniel Dai, reviewed by Vaibhav Gumashta) Date: Thu, 26 Jan 2017 20:17:27 +0000 (UTC) archived-at: Thu, 26 Jan 2017 20:17:30 -0000 Repository: hive Updated Branches: refs/heads/master 85c103532 -> 318db5a35 HIVE-15587: Using ChangeManager to copy files in ReplCopyTask (Daniel Dai, reviewed by Vaibhav Gumashta) Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/318db5a3 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/318db5a3 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/318db5a3 Branch: refs/heads/master Commit: 318db5a3579c7f4039ee24636cd1eeb3b6633bbb Parents: 85c1035 Author: Daniel Dai Authored: Thu Jan 26 12:16:47 2017 -0800 Committer: Daniel Dai Committed: Thu Jan 26 12:16:47 2017 -0800 ---------------------------------------------------------------------- .../listener/DbNotificationListener.java | 16 +- .../hive/metastore/TestReplChangeManager.java | 26 +-- .../hive/ql/TestReplicationScenarios.java | 214 ++++++++++++++++++- .../hive/metastore/ReplChangeManager.java | 124 +++++++++-- .../hadoop/hive/ql/exec/ReplCopyTask.java | 39 ++-- .../apache/hadoop/hive/ql/parse/EximUtil.java | 17 -- .../ql/parse/ReplicationSemanticAnalyzer.java | 57 +++-- 7 files changed, 391 insertions(+), 102 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java ---------------------------------------------------------------------- diff --git a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java index 7524c49..4df2758 100644 --- a/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java +++ b/hcatalog/server-extensions/src/main/java/org/apache/hive/hcatalog/listener/DbNotificationListener.java @@ -23,7 +23,6 @@ import java.util.List; import java.util.concurrent.TimeUnit; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -32,6 +31,7 @@ import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.MetaStoreEventListener; import org.apache.hadoop.hive.metastore.RawStore; import org.apache.hadoop.hive.metastore.RawStoreProxy; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.Function; @@ -59,7 +59,6 @@ import org.apache.hadoop.hive.metastore.events.LoadPartitionDoneEvent; import org.apache.hadoop.hive.metastore.messaging.EventMessage.EventType; import org.apache.hadoop.hive.metastore.messaging.MessageFactory; import org.apache.hadoop.hive.metastore.messaging.PartitionFiles; -import org.apache.hadoop.util.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -223,7 +222,8 @@ public class DbNotificationListener extends MetaStoreEventListener { try { FileStatus file = files[i]; i++; - return buildFileWithChksum(file.getPath(), fs); + return ReplChangeManager.encodeFileUri(file.getPath().toString(), + ReplChangeManager.getChksumString(file.getPath(), fs)); } catch (IOException e) { throw new RuntimeException(e); } @@ -520,16 +520,6 @@ public class DbNotificationListener extends MetaStoreEventListener { } - String buildFileWithChksum(Path p, FileSystem fs) throws IOException { - FileChecksum cksum = fs.getFileChecksum(p); - String chksumString = null; - if (cksum != null) { - chksumString = - StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength()); - } - return encodeFileUri(p.toString(), chksumString); - } - // TODO: this needs to be enhanced once change management based filesystem is implemented // Currently using fileuri#checksum as the format private String encodeFileUri(String fileUriStr, String fileChecksum) { http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java index 205c640..1ac4d01 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/metastore/TestReplChangeManager.java @@ -53,6 +53,7 @@ public class TestReplChangeManager { private static Warehouse warehouse; private static MiniDFSCluster m_dfs; private static String cmroot; + private static FileSystem fs; @BeforeClass public static void setUp() throws Exception { @@ -65,6 +66,7 @@ public class TestReplChangeManager { hiveConf.set(HiveConf.ConfVars.REPLCMDIR.varname, cmroot); hiveConf.setInt(CommonConfigurationKeysPublic.FS_TRASH_INTERVAL_KEY, 60); warehouse = new Warehouse(hiveConf); + fs = new Path(cmroot).getFileSystem(hiveConf); try { client = new HiveMetaStoreClient(hiveConf); } catch (Throwable e) { @@ -151,15 +153,15 @@ public class TestReplChangeManager { Path part1Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160101")), "part"); createFile(part1Path, "p1"); - String path1Chksum = ReplChangeManager.getCksumString(part1Path, hiveConf); + String path1Chksum = ReplChangeManager.getChksumString(part1Path, fs); Path part2Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160102")), "part"); createFile(part2Path, "p2"); - String path2Chksum = ReplChangeManager.getCksumString(part2Path, hiveConf); + String path2Chksum = ReplChangeManager.getChksumString(part2Path, fs); Path part3Path = new Path(warehouse.getPartitionPath(db, tblName, ImmutableMap.of("dt", "20160103")), "part"); createFile(part3Path, "p3"); - String path3Chksum = ReplChangeManager.getCksumString(part3Path, hiveConf); + String path3Chksum = ReplChangeManager.getChksumString(part3Path, fs); Assert.assertTrue(part1Path.getFileSystem(hiveConf).exists(part1Path)); Assert.assertTrue(part2Path.getFileSystem(hiveConf).exists(part2Path)); @@ -221,15 +223,15 @@ public class TestReplChangeManager { Path filePath1 = new Path(warehouse.getTablePath(db, tblName), "part1"); createFile(filePath1, "f1"); - String fileChksum1 = ReplChangeManager.getCksumString(filePath1, hiveConf); + String fileChksum1 = ReplChangeManager.getChksumString(filePath1, fs); Path filePath2 = new Path(warehouse.getTablePath(db, tblName), "part2"); createFile(filePath2, "f2"); - String fileChksum2 = ReplChangeManager.getCksumString(filePath2, hiveConf); + String fileChksum2 = ReplChangeManager.getChksumString(filePath2, fs); Path filePath3 = new Path(warehouse.getTablePath(db, tblName), "part3"); createFile(filePath3, "f3"); - String fileChksum3 = ReplChangeManager.getCksumString(filePath3, hiveConf); + String fileChksum3 = ReplChangeManager.getChksumString(filePath3, fs); Assert.assertTrue(filePath1.getFileSystem(hiveConf).exists(filePath1)); Assert.assertTrue(filePath2.getFileSystem(hiveConf).exists(filePath2)); @@ -267,26 +269,26 @@ public class TestReplChangeManager { fs.mkdirs(dirTbl1); Path part11 = new Path(dirTbl1, "part1"); createFile(part11, "testClearer11"); - String fileChksum11 = ReplChangeManager.getCksumString(part11, hiveConf); + String fileChksum11 = ReplChangeManager.getChksumString(part11, fs); Path part12 = new Path(dirTbl1, "part2"); createFile(part12, "testClearer12"); - String fileChksum12 = ReplChangeManager.getCksumString(part12, hiveConf); + String fileChksum12 = ReplChangeManager.getChksumString(part12, fs); Path dirTbl2 = new Path(dirDb, "tbl2"); fs.mkdirs(dirTbl2); Path part21 = new Path(dirTbl2, "part1"); createFile(part21, "testClearer21"); - String fileChksum21 = ReplChangeManager.getCksumString(part21, hiveConf); + String fileChksum21 = ReplChangeManager.getChksumString(part21, fs); Path part22 = new Path(dirTbl2, "part2"); createFile(part22, "testClearer22"); - String fileChksum22 = ReplChangeManager.getCksumString(part22, hiveConf); + String fileChksum22 = ReplChangeManager.getChksumString(part22, fs); Path dirTbl3 = new Path(dirDb, "tbl3"); fs.mkdirs(dirTbl3); Path part31 = new Path(dirTbl3, "part1"); createFile(part31, "testClearer31"); - String fileChksum31 = ReplChangeManager.getCksumString(part31, hiveConf); + String fileChksum31 = ReplChangeManager.getChksumString(part31, fs); Path part32 = new Path(dirTbl3, "part2"); createFile(part32, "testClearer32"); - String fileChksum32 = ReplChangeManager.getCksumString(part32, hiveConf); + String fileChksum32 = ReplChangeManager.getChksumString(part32, fs); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl1, false); ReplChangeManager.getInstance(hiveConf).recycle(dirTbl2, false); http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java ---------------------------------------------------------------------- diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java index 5be3e9c..7836c47 100644 --- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java +++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java @@ -90,9 +90,11 @@ public class TestReplicationScenarios { WindowsPathUtil.convertPathsFromWindowsToHdfs(hconf); } - System.setProperty(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS.varname, + hconf.setVar(HiveConf.ConfVars.METASTORE_EVENT_LISTENERS, DBNOTIF_LISTENER_CLASSNAME); // turn on db notification listener on metastore - msPort = MetaStoreUtils.startMetaStore(); + hconf.setBoolVar(HiveConf.ConfVars.REPLCMENABLED, true); + hconf.setVar(HiveConf.ConfVars.REPLCMDIR, TEST_PATH + "/cmroot/"); + msPort = MetaStoreUtils.startMetaStore(hconf); hconf.setVar(HiveConf.ConfVars.REPLDIR,TEST_PATH + "/hrepl/"); hconf.setVar(HiveConf.ConfVars.METASTOREURIS, "thrift://localhost:" + msPort); @@ -193,6 +195,87 @@ public class TestReplicationScenarios { } @Test + public void testBasicWithCM() throws Exception { + + String testName = "basic_with_cm"; + LOG.info("Testing "+testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE"); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] ptn_data_2_later = new String[]{ "eighteen", "nineteen", "twenty"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + String ptn_locn_2_later = new Path(TEST_PATH , testName + "_ptn2_later").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + createTestDataFile(ptn_locn_2_later, ptn_data_2_later); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned"); + verifyResults(unptn_data); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=1)"); + run("SELECT a from " + dbName + ".ptned WHERE b=1"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b=2)"); + run("SELECT a from " + dbName + ".ptned WHERE b=2"); + verifyResults(ptn_data_2); + run("SELECT a from " + dbName + ".ptned_empty"); + verifyResults(empty); + run("SELECT * from " + dbName + ".unptned_empty"); + verifyResults(empty); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0,0); + String replDumpId = getResult(0,1,true); + + // Table dropped after "repl dump" + run("DROP TABLE " + dbName + ".unptned"); + // Partition droppped after "repl dump" + run("ALTER TABLE " + dbName + ".ptned " + "DROP PARTITION(b=1)"); + // File changed after "repl dump" + Partition p = metaStoreClient.getPartition(dbName, "ptned", "b=2"); + Path loc = new Path(p.getSd().getLocation()); + FileSystem fs = loc.getFileSystem(hconf); + Path file = fs.listStatus(loc)[0].getPath(); + fs.delete(file, false); + fs.copyFromLocalFile(new Path(ptn_locn_2_later), file); + + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + run("REPL STATUS " + dbName + "_dupe"); + verifyResults(new String[] {replDumpId}); + + run("SELECT * from " + dbName + "_dupe.unptned"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=1"); + verifyResults(ptn_data_1); + // Since partition(b=2) changed manually, Hive cannot find + // it in original location and cmroot, thus empty + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + ".ptned_empty"); + verifyResults(empty); + run("SELECT * from " + dbName + ".unptned_empty"); + verifyResults(empty); + } + + @Test public void testIncrementalAdds() throws IOException { String testName = "incrementalAdds"; LOG.info("Testing "+testName); @@ -319,7 +402,6 @@ public class TestReplicationScenarios { run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned3 PARTITION(b=2)"); verifySetup("SELECT a from " + dbName + ".ptned2 WHERE b=2", ptn_data_2); - // At this point, we've set up all the tables and ptns we're going to test drops across // Replicate it first, and then we'll drop it on the source. @@ -393,6 +475,132 @@ public class TestReplicationScenarios { } @Test + public void testDropsWithCM() throws IOException { + + String testName = "drops_with_cm"; + LOG.info("Testing "+testName); + String dbName = testName + "_" + tid; + + run("CREATE DATABASE " + dbName); + run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b string) STORED AS TEXTFILE"); + run("CREATE TABLE " + dbName + ".ptned2(a string) partitioned by (b string) STORED AS TEXTFILE"); + + String[] unptn_data = new String[]{ "eleven" , "twelve" }; + String[] ptn_data_1 = new String[]{ "thirteen", "fourteen", "fifteen"}; + String[] ptn_data_2 = new String[]{ "fifteen", "sixteen", "seventeen"}; + String[] empty = new String[]{}; + + String unptn_locn = new Path(TEST_PATH , testName + "_unptn").toUri().getPath(); + String ptn_locn_1 = new Path(TEST_PATH , testName + "_ptn1").toUri().getPath(); + String ptn_locn_2 = new Path(TEST_PATH , testName + "_ptn2").toUri().getPath(); + + createTestDataFile(unptn_locn, unptn_data); + createTestDataFile(ptn_locn_1, ptn_data_1); + createTestDataFile(ptn_locn_2, ptn_data_2); + + run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned"); + run("SELECT * from " + dbName + ".unptned"); + verifyResults(unptn_data); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='1')"); + run("SELECT a from " + dbName + ".ptned WHERE b='1'"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned PARTITION(b='2')"); + run("SELECT a from " + dbName + ".ptned WHERE b='2'"); + verifyResults(ptn_data_2); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='1')"); + run("SELECT a from " + dbName + ".ptned2 WHERE b='1'"); + verifyResults(ptn_data_1); + run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName + ".ptned2 PARTITION(b='2')"); + run("SELECT a from " + dbName + ".ptned2 WHERE b='2'"); + verifyResults(ptn_data_2); + + advanceDumpDir(); + run("REPL DUMP " + dbName); + String replDumpLocn = getResult(0,0); + String replDumpId = getResult(0,1,true); + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'"); + + run("REPL STATUS " + dbName + "_dupe"); + verifyResults(new String[] {replDumpId}); + + run("SELECT * from " + dbName + "_dupe.unptned"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b='1'"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned WHERE b='2'"); + verifyResults(ptn_data_2); + run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='1'"); + verifyResults(ptn_data_1); + run("SELECT a from " + dbName + "_dupe.ptned2 WHERE b='2'"); + verifyResults(ptn_data_2); + + run("CREATE TABLE " + dbName + ".unptned_copy" + " AS SELECT a FROM " + dbName + ".unptned"); + run("CREATE TABLE " + dbName + ".ptned_copy" + " LIKE " + dbName + ".ptned"); + run("INSERT INTO TABLE " + dbName + ".ptned_copy" + " PARTITION(b='1') SELECT a FROM " + + dbName + ".ptned WHERE b='1'"); + run("SELECT a from " + dbName + ".unptned_copy"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + ".ptned_copy"); + verifyResults(ptn_data_1); + + run("DROP TABLE " + dbName + ".unptned"); + run("ALTER TABLE " + dbName + ".ptned DROP PARTITION (b='2')"); + run("DROP TABLE " + dbName + ".ptned2"); + run("SELECT a from " + dbName + ".ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + ".ptned"); + verifyResults(ptn_data_1); + + advanceDumpDir(); + run("REPL DUMP " + dbName + " FROM " + replDumpId); + String postDropReplDumpLocn = getResult(0,0); + String postDropReplDumpId = getResult(0,1,true); + LOG.info("Dumped to {} with id {}->{}", postDropReplDumpLocn, replDumpId, postDropReplDumpId); + + // Drop table after dump + run("DROP TABLE " + dbName + ".unptned_copy"); + // Drop partition after dump + run("ALTER TABLE " + dbName + ".ptned_copy DROP PARTITION(b='1')"); + + run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + printOutput(); + run("REPL LOAD " + dbName + "_dupe FROM '" + postDropReplDumpLocn + "'"); + + Exception e = null; + try { + Table tbl = metaStoreClient.getTable(dbName + "_dupe", "unptned"); + assertNull(tbl); + } catch (TException te) { + e = te; + } + assertNotNull(e); + assertEquals(NoSuchObjectException.class, e.getClass()); + + run("SELECT a from " + dbName + "_dupe.ptned WHERE b=2"); + verifyResults(empty); + run("SELECT a from " + dbName + "_dupe.ptned"); + verifyResults(ptn_data_1); + + Exception e2 = null; + try { + Table tbl = metaStoreClient.getTable(dbName+"_dupe","ptned2"); + assertNull(tbl); + } catch (TException te) { + e2 = te; + } + assertNotNull(e2); + assertEquals(NoSuchObjectException.class, e.getClass()); + + run("SELECT a from " + dbName + "_dupe.unptned_copy"); + verifyResults(unptn_data); + run("SELECT a from " + dbName + "_dupe.ptned_copy"); + verifyResults(ptn_data_1); + } + + @Test public void testAlters() throws IOException { String testName = "alters"; http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java index 99cba9d..51e4627 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/ReplChangeManager.java @@ -55,6 +55,7 @@ public class ReplChangeManager { public static final String ORIG_LOC_TAG = "user.original-loc"; public static final String REMAIN_IN_TRASH_TAG = "user.remain-in-trash"; + public static final String URI_FRAGMENT_SEPARATOR = "#"; public static ReplChangeManager getInstance(HiveConf hiveConf) throws MetaException { if (instance == null) { @@ -121,7 +122,7 @@ public class ReplChangeManager { count += recycle(file.getPath(), ifPurge); } } else { - Path cmPath = getCMPath(path, hiveConf, getCksumString(path, hiveConf)); + Path cmPath = getCMPath(path, hiveConf, getChksumString(path, fs)); if (LOG.isDebugEnabled()) { LOG.debug("Moving " + path.toString() + " to " + cmPath.toString()); @@ -151,7 +152,11 @@ public class ReplChangeManager { // Note we currently only track the last known trace as // xattr has limited capacity. We shall revisit and store all original // locations if orig-loc becomes important - fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes()); + try { + fs.setXAttr(cmPath, ORIG_LOC_TAG, path.toString().getBytes()); + } catch (UnsupportedOperationException e) { + LOG.warn("Error setting xattr for " + path.toString()); + } count++; } @@ -159,7 +164,11 @@ public class ReplChangeManager { // If multiple files share the same content, then // any file claim remain in trash would be granted if (!ifPurge) { - fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0}); + try { + fs.setXAttr(cmPath, REMAIN_IN_TRASH_TAG, new byte[]{0}); + } catch (UnsupportedOperationException e) { + LOG.warn("Error setting xattr for " + cmPath.toString()); + } } } return count; @@ -169,16 +178,22 @@ public class ReplChangeManager { } // Get checksum of a file - static public String getCksumString(Path path, Configuration conf) throws IOException { + static public String getChksumString(Path path, FileSystem fs) throws IOException { // TODO: fs checksum only available on hdfs, need to // find a solution for other fs (eg, local fs, s3, etc) - FileSystem fs = path.getFileSystem(conf); + String checksumString = null; FileChecksum checksum = fs.getFileChecksum(path); - String checksumString = StringUtils.byteToHexString( - checksum.getBytes(), 0, checksum.getLength()); + if (checksum != null) { + checksumString = StringUtils.byteToHexString( + checksum.getBytes(), 0, checksum.getLength()); + } return checksumString; } + static public void setCmRoot(Path cmRoot) { + ReplChangeManager.cmroot = cmRoot; + } + /*** * Convert a path of file inside a partition or table (if non-partitioned) * to a deterministic location of cmroot. So user can retrieve the file back @@ -205,6 +220,69 @@ public class ReplChangeManager { return cmPath; } + /*** + * Get original file specified by src and chksumString. If the file exists and checksum + * matches, return the file; otherwise, use chksumString to retrieve it from cmroot + * @param src Original file location + * @param chksumString Checksum of the original file + * @param conf + * @return Corresponding FileStatus object + * @throws MetaException + */ + static public FileStatus getFileStatus(Path src, String chksumString, + HiveConf conf) throws MetaException { + try { + FileSystem srcFs = src.getFileSystem(conf); + if (chksumString == null) { + return srcFs.getFileStatus(src); + } + + if (!srcFs.exists(src)) { + return srcFs.getFileStatus(getCMPath(src, conf, chksumString)); + } + + String currentChksumString = getChksumString(src, srcFs); + if (currentChksumString == null || chksumString.equals(currentChksumString)) { + return srcFs.getFileStatus(src); + } else { + return srcFs.getFileStatus(getCMPath(src, conf, chksumString)); + } + } catch (IOException e) { + throw new MetaException(StringUtils.stringifyException(e)); + } + } + + /*** + * Concatenate filename and checksum with "#" + * @param fileUriStr Filename string + * @param fileChecksum Checksum string + * @return Concatenated Uri string + */ + // TODO: this needs to be enhanced once change management based filesystem is implemented + // Currently using fileuri#checksum as the format + static public String encodeFileUri(String fileUriStr, String fileChecksum) { + if (fileChecksum != null) { + return fileUriStr + URI_FRAGMENT_SEPARATOR + fileChecksum; + } else { + return fileUriStr; + } + } + + /*** + * Split uri with fragment into file uri and checksum + * @param fileURIStr uri with fragment + * @return array of file name and checksum + */ + static public String[] getFileWithChksumFromURI(String fileURIStr) { + String[] uriAndFragment = fileURIStr.split(URI_FRAGMENT_SEPARATOR); + String[] result = new String[2]; + result[0] = uriAndFragment[0]; + if (uriAndFragment.length>1) { + result[1] = uriAndFragment[1]; + } + return result; + } + /** * Thread to clear old files of cmroot recursively */ @@ -231,24 +309,28 @@ public class ReplChangeManager { for (FileStatus file : files) { long modifiedTime = file.getModificationTime(); if (now - modifiedTime > secRetain*1000) { - if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { - boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf); - if (succ) { - if (LOG.isDebugEnabled()) { - LOG.debug("Move " + file.toString() + " to trash"); + try { + if (fs.getXAttrs(file.getPath()).containsKey(REMAIN_IN_TRASH_TAG)) { + boolean succ = Trash.moveToAppropriateTrash(fs, file.getPath(), hiveConf); + if (succ) { + if (LOG.isDebugEnabled()) { + LOG.debug("Move " + file.toString() + " to trash"); + } + } else { + LOG.warn("Fail to move " + file.toString() + " to trash"); } } else { - LOG.warn("Fail to move " + file.toString() + " to trash"); - } - } else { - boolean succ = fs.delete(file.getPath(), false); - if (succ) { - if (LOG.isDebugEnabled()) { - LOG.debug("Remove " + file.toString()); + boolean succ = fs.delete(file.getPath(), false); + if (succ) { + if (LOG.isDebugEnabled()) { + LOG.debug("Remove " + file.toString()); + } + } else { + LOG.warn("Fail to remove " + file.toString()); } - } else { - LOG.warn("Fail to remove " + file.toString()); } + } catch (UnsupportedOperationException e) { + LOG.warn("Error getting xattr for " + file.getPath().toString()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java index e6b943b..4686e2c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java @@ -18,6 +18,8 @@ package org.apache.hadoop.hive.ql.exec; +import org.apache.hadoop.hive.metastore.ReplChangeManager; +import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.ql.parse.EximUtil; import org.apache.hadoop.hive.ql.parse.ReplicationSpec; import org.apache.hadoop.hive.ql.plan.CopyWork; @@ -126,15 +128,16 @@ public class ReplCopyTask extends Task implements Serializable { for (FileStatus oneSrc : srcFiles) { console.printInfo("Copying file: " + oneSrc.getPath().toString()); LOG.debug("Copying file: " + oneSrc.getPath().toString()); + + FileSystem actualSrcFs = null; + if (rwork.getReadListFromInput()){ + // TODO : filesystemcache prevents this from being a perf nightmare, but we + // should still probably follow up to see if we need to do something better here. + actualSrcFs = oneSrc.getPath().getFileSystem(conf); + } else { + actualSrcFs = srcFs; + } if (!rwork.getListFilesOnOutputBehaviour(oneSrc)){ - FileSystem actualSrcFs = null; - if (rwork.getReadListFromInput()){ - // TODO : filesystemcache prevents this from being a perf nightmare, but we - // should still probably follow up to see if we need to do something better here. - actualSrcFs = oneSrc.getPath().getFileSystem(conf); - } else { - actualSrcFs = srcFs; - } LOG.debug("ReplCopyTask :cp:" + oneSrc.getPath() + "=>" + toPath); if (!FileUtils.copy(actualSrcFs, oneSrc.getPath(), dstFs, toPath, @@ -148,7 +151,9 @@ public class ReplCopyTask extends Task implements Serializable { }else{ LOG.debug("ReplCopyTask _files now tracks:" + oneSrc.getPath().toUri()); console.printInfo("Tracking file: " + oneSrc.getPath().toUri()); - listBW.write(oneSrc.getPath().toUri().toString() + "\n"); + String chksumString = ReplChangeManager.getChksumString(oneSrc.getPath(), actualSrcFs); + listBW.write(ReplChangeManager.encodeFileUri + (oneSrc.getPath().toUri().toString(), chksumString) + "\n"); } } @@ -183,12 +188,16 @@ public class ReplCopyTask extends Task implements Serializable { String line = null; while ( (line = br.readLine()) != null){ LOG.debug("ReplCopyTask :_filesReadLine:" + line); - String fileUriStr = EximUtil.getCMDecodedFileName(line); - // TODO HIVE-15490: Add checksum validation here - Path p = new Path(fileUriStr); - // TODO: again, fs cache should make this okay, but if not, revisit - FileSystem srcFs = p.getFileSystem(conf); - ret.add(srcFs.getFileStatus(p)); + + String[] fileWithChksum = ReplChangeManager.getFileWithChksumFromURI(line); + try { + FileStatus f = ReplChangeManager.getFileStatus(new Path(fileWithChksum[0]), + fileWithChksum[1], conf); + ret.add(f); + } catch (MetaException e) { + // skip and issue warning for missing file + LOG.warn("Cannot find " + fileWithChksum[0] + " in source repo or cmroot"); + } // Note - we need srcFs rather than fs, because it is possible that the _files lists files // which are from a different filesystem than the fs where the _files file itself was loaded // from. Currently, it is possible, for eg., to do REPL LOAD hdfs:///dir/ and for the _files http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java index 34e53d2..796ccc8 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java @@ -31,7 +31,6 @@ import org.apache.hadoop.hive.ql.metadata.Hive; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileChecksum; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; @@ -78,7 +77,6 @@ public class EximUtil { public static final String METADATA_NAME = "_metadata"; public static final String FILES_NAME = "_files"; public static final String DATA_PATH_NAME = "data"; - public static final String URI_FRAGMENT_SEPARATOR = "#"; private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class); @@ -574,19 +572,4 @@ public class EximUtil { }; } - public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) { - // The checksum is set as the fragment portion of the file uri - return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum; - } - - public static String getCMDecodedFileName(String encodedFileURIStr) { - String[] uriAndFragment = encodedFileURIStr.split(URI_FRAGMENT_SEPARATOR); - return uriAndFragment[0]; - } - - public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) { - // TODO: Implement this as part of HIVE-15490 - return null; - } - } http://git-wip-us.apache.org/repos/asf/hive/blob/318db5a3/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java index 86b6a6e..2b327db 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java @@ -27,6 +27,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.metastore.IMetaStoreClient; +import org.apache.hadoop.hive.metastore.ReplChangeManager; import org.apache.hadoop.hive.metastore.api.Database; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.NotificationEvent; @@ -141,22 +142,24 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { private final Path dumpRoot; private final Path dumpFile; + private Path cmRoot; public DumpMetaData(Path dumpRoot) { this.dumpRoot = dumpRoot; dumpFile = new Path(dumpRoot, DUMPMETADATA); } - public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo){ + public DumpMetaData(Path dumpRoot, DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){ this(dumpRoot); - setDump(lvl, eventFrom, eventTo); + setDump(lvl, eventFrom, eventTo, cmRoot); } - public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo){ + public void setDump(DUMPTYPE lvl, Long eventFrom, Long eventTo, Path cmRoot){ this.dumpType = lvl; this.eventFrom = eventFrom; this.eventTo = eventTo; this.initialized = true; + this.cmRoot = cmRoot; } public void loadDumpFromFile() throws SemanticException { @@ -166,9 +169,11 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { BufferedReader br = new BufferedReader(new InputStreamReader(fs.open(dumpFile))); String line = null; if ( (line = br.readLine()) != null){ - String[] lineContents = line.split("\t", 4); - setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2])); - setPayload(lineContents[3].equals(Utilities.nullStringOutput) ? null : lineContents[3]); + String[] lineContents = line.split("\t", 5); + setDump(DUMPTYPE.valueOf(lineContents[0]), Long.valueOf(lineContents[1]), Long.valueOf(lineContents[2]), + new Path(lineContents[3])); + setPayload(lineContents[4].equals(Utilities.nullStringOutput) ? null : lineContents[4]); + ReplChangeManager.setCmRoot(cmRoot); } else { throw new IOException("Unable to read valid values from dumpFile:"+dumpFile.toUri().toString()); } @@ -201,6 +206,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { return eventTo; } + public Path getCmRoot() { + return cmRoot; + } + + public void setCmRoot(Path cmRoot) { + this.cmRoot = cmRoot; + } + public Path getDumpFilePath() { return dumpFile; } @@ -217,7 +230,8 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } public void write() throws SemanticException { - writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), payload), dumpFile); + writeOutput(Arrays.asList(dumpType.toString(), eventFrom.toString(), eventTo.toString(), + cmRoot.toString(), payload), dumpFile); } } @@ -301,6 +315,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { String replRoot = conf.getVar(HiveConf.ConfVars.REPLDIR); Path dumpRoot = new Path(replRoot, getNextDumpDir()); DumpMetaData dmd = new DumpMetaData(dumpRoot); + Path cmRoot = new Path(conf.getVar(HiveConf.ConfVars.REPLCMDIR)); Long lastReplId; try { if (eventFrom == null){ @@ -340,7 +355,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { LOG.info( "Consolidation done, preparing to return {},{}->{}", dumpRoot.toUri(), bootDumpBeginReplId, bootDumpEndReplId); - dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId); + dmd.setDump(DUMPTYPE.BOOTSTRAP, bootDumpBeginReplId, bootDumpEndReplId, cmRoot); dmd.write(); // Set the correct last repl id to return to the user @@ -375,14 +390,14 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { while (evIter.hasNext()){ NotificationEvent ev = evIter.next(); Path evRoot = new Path(dumpRoot, String.valueOf(ev.getEventId())); - dumpEvent(ev, evRoot); + dumpEvent(ev, evRoot, cmRoot); } LOG.info("Done dumping events, preparing to return {},{}", dumpRoot.toUri(), eventTo); writeOutput( Arrays.asList("incremental", String.valueOf(eventFrom), String.valueOf(eventTo)), dmd.getDumpFilePath()); - dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo); + dmd.setDump(DUMPTYPE.INCREMENTAL, eventFrom, eventTo, cmRoot); dmd.write(); // Set the correct last repl id to return to the user lastReplId = eventTo; @@ -396,7 +411,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } - private void dumpEvent(NotificationEvent ev, Path evRoot) throws Exception { + private void dumpEvent(NotificationEvent ev, Path evRoot, Path cmRoot) throws Exception { long evid = ev.getEventId(); String evidStr = String.valueOf(evid); ReplicationSpec replicationSpec = getNewEventOnlyReplicationSpec(evidStr); @@ -439,7 +454,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } - (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid)).write(); + (new DumpMetaData(evRoot, DUMPTYPE.EVENT_CREATE_TABLE, evid, evid, cmRoot)).write(); break; } case MessageFactory.ADD_PARTITION_EVENT : { @@ -504,19 +519,19 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } } - (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid)).write(); + (new DumpMetaData(evRoot, DUMPTYPE.EVENT_ADD_PARTITION, evid, evid, cmRoot)).write(); break; } case MessageFactory.DROP_TABLE_EVENT : { LOG.info("Processing#{} DROP_TABLE message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_TABLE, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); break; } case MessageFactory.DROP_PARTITION_EVENT : { LOG.info("Processing#{} DROP_PARTITION message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_DROP_PARTITION, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); break; @@ -540,12 +555,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { null, replicationSpec); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_TABLE, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); } else { // rename scenario - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_TABLE, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); } @@ -582,13 +597,13 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { qlMdTable, qlPtns, replicationSpec); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_ALTER_PARTITION, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); break; } else { // rename scenario - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_RENAME_PARTITION, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); break; @@ -626,7 +641,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { } LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); break; @@ -634,7 +649,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer { // TODO : handle other event types default: LOG.info("Dummy processing#{} message : {}", ev.getEventId(), ev.getMessage()); - DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid); + DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_UNKNOWN, evid, evid, cmRoot); dmd.setPayload(ev.getMessage()); dmd.write(); break;