hive-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vgumas...@apache.org
Subject hive git commit: HIVE-15366: REPL LOAD & DUMP support for incremental INSERT events (Vaibhav Gumashta reviewed by Sushanth Sowmyan)
Date Mon, 09 Jan 2017 21:06:37 GMT
Repository: hive
Updated Branches:
  refs/heads/master 01e691c5c -> 2f501a8a0


HIVE-15366: REPL LOAD & DUMP support for incremental INSERT events (Vaibhav Gumashta reviewed by Sushanth Sowmyan)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/2f501a8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/2f501a8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/2f501a8a

Branch: refs/heads/master
Commit: 2f501a8a024bf25701f97f4621ceda9b080be95d
Parents: 01e691c
Author: Vaibhav Gumashta <vgumashta@hortonworks.com>
Authored: Mon Jan 9 13:05:47 2017 -0800
Committer: Vaibhav Gumashta <vgumashta@hortonworks.com>
Committed: Mon Jan 9 13:05:47 2017 -0800

----------------------------------------------------------------------
 .../listener/TestDbNotificationListener.java    | 27 ++----
 .../hive/ql/TestReplicationScenarios.java       | 92 ++++++++++++++++++++
 metastore/if/hive_metastore.thrift              |  4 +-
 .../gen/thrift/gen-cpp/hive_metastore_types.cpp |  4 +-
 .../metastore/api/InsertEventRequestData.java   | 40 ++++-----
 .../gen/thrift/gen-rb/hive_metastore_types.rb   |  2 +-
 .../hive/metastore/events/InsertEvent.java      |  5 +-
 .../hive/metastore/messaging/InsertMessage.java |  4 +-
 .../metastore/messaging/MessageFactory.java     |  3 +-
 .../messaging/json/JSONInsertMessage.java       | 27 +++---
 .../messaging/json/JSONMessageFactory.java      |  2 +-
 .../hadoop/hive/ql/exec/ReplCopyTask.java       | 14 +--
 .../apache/hadoop/hive/ql/metadata/Hive.java    |  9 +-
 .../apache/hadoop/hive/ql/parse/EximUtil.java   | 30 +++++--
 .../hive/ql/parse/ExportSemanticAnalyzer.java   |  4 +-
 .../hive/ql/parse/ImportSemanticAnalyzer.java   |  4 +-
 .../ql/parse/ReplicationSemanticAnalyzer.java   | 47 +++++++++-
 17 files changed, 230 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
----------------------------------------------------------------------
diff --git a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
index 39356ae..4eabb24 100644
--- a/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
+++ b/itests/hcatalog-unit/src/test/java/org/apache/hive/hcatalog/listener/TestDbNotificationListener.java
@@ -913,7 +913,7 @@ public class TestDbNotificationListener {
     assertEquals(defaultDbName, event.getDbName());
     assertEquals(tblName, event.getTableName());
     // Parse the message field
-    verifyInsertJSON(event, defaultDbName, tblName, false);
+    verifyInsertJSON(event, defaultDbName, tblName);
   }
 
   @Test
@@ -967,7 +967,7 @@ public class TestDbNotificationListener {
     assertEquals(defaultDbName, event.getDbName());
     assertEquals(tblName, event.getTableName());
     // Parse the message field
-    verifyInsertJSON(event, defaultDbName, tblName, false);
+    verifyInsertJSON(event, defaultDbName, tblName);
     ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
     LinkedHashMap<String, String> partKeyValsFromNotif =
         JSONMessageFactory.getAsMap((ObjectNode) jsonTree.get("partKeyVals"),
@@ -1057,7 +1057,7 @@ public class TestDbNotificationListener {
     assertEquals(firstEventId + 3, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
     // Parse the message field
-    verifyInsertJSON(event, defaultDbName, tblName, true);
+    verifyInsertJSON(event, defaultDbName, tblName);
 
     event = rsp.getEvents().get(4);
     assertEquals(firstEventId + 5, event.getEventId());
@@ -1090,7 +1090,7 @@ public class TestDbNotificationListener {
     assertEquals(firstEventId + 3, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
     // Parse the message field
-    verifyInsertJSON(event, null, sourceTblName, true);
+    verifyInsertJSON(event, null, sourceTblName);
 
     event = rsp.getEvents().get(4);
     assertEquals(firstEventId + 5, event.getEventId());
@@ -1165,13 +1165,13 @@ public class TestDbNotificationListener {
     assertEquals(firstEventId + 4, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
     // Parse the message field
-    verifyInsertJSON(event, null, tblName, true);
+    verifyInsertJSON(event, null, tblName);
 
     event = rsp.getEvents().get(6);
     assertEquals(firstEventId + 7, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
     // Parse the message field
-    verifyInsertJSON(event, null, tblName, true);
+    verifyInsertJSON(event, null, tblName);
 
     event = rsp.getEvents().get(9);
     assertEquals(firstEventId + 10, event.getEventId());
@@ -1181,13 +1181,13 @@ public class TestDbNotificationListener {
     assertEquals(firstEventId + 11, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
     // Parse the message field
-    verifyInsertJSON(event, null, tblName, true);
+    verifyInsertJSON(event, null, tblName);
 
     event = rsp.getEvents().get(13);
     assertEquals(firstEventId + 14, event.getEventId());
     assertEquals(EventType.INSERT.toString(), event.getEventType());
     // Parse the message field
-    verifyInsertJSON(event, null, tblName, true);
+    verifyInsertJSON(event, null, tblName);
 
     event = rsp.getEvents().get(16);
     assertEquals(firstEventId + 17, event.getEventId());
@@ -1223,8 +1223,7 @@ public class TestDbNotificationListener {
     assertTrue(event.getMessage().matches(".*\"ds\":\"todaytwo\".*"));
   }
 
-  private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName,
-      boolean verifyChecksums) throws Exception {
+  private void verifyInsertJSON(NotificationEvent event, String dbName, String tblName) throws
Exception {
     // Parse the message field
     ObjectNode jsonTree = JSONMessageFactory.getJsonTree(event);
     System.out.println("JSONInsertMessage: " + jsonTree.toString());
@@ -1239,14 +1238,6 @@ public class TestDbNotificationListener {
     List<String> files =
         JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("files"), new ArrayList<String>());
     assertTrue(files.size() > 0);
-    if (verifyChecksums) {
-      // Should have list of file checksums
-      List<String> fileChecksums =
-          JSONMessageFactory.getAsList((ArrayNode) jsonTree.get("fileChecksums"),
-              new ArrayList<String>());
-      assertTrue(fileChecksums.size() > 0);
-
-    }
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
----------------------------------------------------------------------
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
index e29aa22..6b86080 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestReplicationScenarios.java
@@ -592,6 +592,98 @@ public class TestReplicationScenarios {
     verifyResults(ptn_data_2);
   }
 
+  @Test
+  public void testIncrementalInserts() throws IOException {
+    String testName = "incrementalInserts";
+    LOG.info("Testing " + testName);
+    String dbName = testName + "_" + tid;
+
+    run("CREATE DATABASE " + dbName);
+
+    run("CREATE TABLE " + dbName + ".unptned(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".ptned(a string) partitioned by (b int) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName + ".unptned_empty(a string) STORED AS TEXTFILE");
+    run("CREATE TABLE " + dbName
+        + ".ptned_empty(a string) partitioned by (b int) STORED AS TEXTFILE");
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName);
+    String replDumpLocn = getResult(0, 0);
+    String replDumpId = getResult(0, 1, true);
+    LOG.info("Dumped to {} with id {}", replDumpLocn, replDumpId);
+    run("REPL LOAD " + dbName + "_dupe FROM '" + replDumpLocn + "'");
+
+    String[] unptn_data = new String[] { "eleven", "twelve" };
+    String[] ptn_data_1 = new String[] { "thirteen", "fourteen", "fifteen" };
+    String[] ptn_data_2 = new String[] { "fifteen", "sixteen", "seventeen" };
+    String[] empty = new String[] {};
+
+    String unptn_locn = new Path(TEST_PATH, testName + "_unptn").toUri().getPath();
+    String ptn_locn_1 = new Path(TEST_PATH, testName + "_ptn1").toUri().getPath();
+    String ptn_locn_2 = new Path(TEST_PATH, testName + "_ptn2").toUri().getPath();
+
+    createTestDataFile(unptn_locn, unptn_data);
+    createTestDataFile(ptn_locn_1, ptn_data_1);
+    createTestDataFile(ptn_locn_2, ptn_data_2);
+
+    run("SELECT a from " + dbName + ".ptned_empty");
+    verifyResults(empty);
+    run("SELECT * from " + dbName + ".unptned_empty");
+    verifyResults(empty);
+
+    run("LOAD DATA LOCAL INPATH '" + unptn_locn + "' OVERWRITE INTO TABLE " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned");
+    verifyResults(unptn_data);
+    run("CREATE TABLE " + dbName + ".unptned_late LIKE " + dbName + ".unptned");
+    run("INSERT INTO TABLE " + dbName + ".unptned_late SELECT * FROM " + dbName + ".unptned");
+    run("SELECT * from " + dbName + ".unptned_late");
+    verifyResults(unptn_data);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    String incrementalDumpLocn = getResult(0, 0);
+    String incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    run("SELECT * from " + dbName + "_dupe.unptned_late");
+    verifyResults(unptn_data);
+
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_1 + "' OVERWRITE INTO TABLE " + dbName
+        + ".ptned PARTITION(b=1)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("LOAD DATA LOCAL INPATH '" + ptn_locn_2 + "' OVERWRITE INTO TABLE " + dbName
+        + ".ptned PARTITION(b=2)");
+    run("SELECT a from " + dbName + ".ptned WHERE b=2");
+    verifyResults(ptn_data_2);
+
+    run("CREATE TABLE " + dbName
+        + ".ptned_late(a string) PARTITIONED BY (b int) STORED AS TEXTFILE");
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=1) SELECT a FROM " + dbName
+        + ".ptned WHERE b=1");
+    run("SELECT a from " + dbName + ".ptned_late WHERE b=1");
+    verifyResults(ptn_data_1);
+
+    run("INSERT INTO TABLE " + dbName + ".ptned_late PARTITION(b=2) SELECT a FROM " + dbName
+        + ".ptned WHERE b=2");
+    run("SELECT a from " + dbName + ".ptned_late WHERE b=2");
+    verifyResults(ptn_data_2);
+
+    advanceDumpDir();
+    run("REPL DUMP " + dbName + " FROM " + replDumpId);
+    incrementalDumpLocn = getResult(0, 0);
+    incrementalDumpId = getResult(0, 1, true);
+    LOG.info("Dumped to {} with id {}", incrementalDumpLocn, incrementalDumpId);
+    run("EXPLAIN REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    printOutput();
+    run("REPL LOAD " + dbName + "_dupe FROM '" + incrementalDumpLocn + "'");
+    run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=1");
+    verifyResults(ptn_data_1);
+    run("SELECT a from " + dbName + "_dupe.ptned_late WHERE b=2");
+    verifyResults(ptn_data_2);
+  }
 
   private String getResult(int rowNum, int colNum) throws IOException {
     return getResult(rowNum,colNum,false);

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/if/hive_metastore.thrift
----------------------------------------------------------------------
diff --git a/metastore/if/hive_metastore.thrift b/metastore/if/hive_metastore.thrift
index 79592ea..bf80455 100755
--- a/metastore/if/hive_metastore.thrift
+++ b/metastore/if/hive_metastore.thrift
@@ -812,8 +812,8 @@ struct CurrentNotificationEventId {
 
 struct InsertEventRequestData {
     1: required list<string> filesAdded,
-    // Checksum of files (UTF8 encoded string) added during this insert event (at the time
they were added)
-    2: optional list<binary> filesAddedChecksum,
+    // Checksum of files (hex string of checksum byte payload)
+    2: optional list<string> filesAddedChecksum,
 }
 
 union FireEventRequestData {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
index 1311b20..d605049 100644
--- a/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
+++ b/metastore/src/gen/thrift/gen-cpp/hive_metastore_types.cpp
@@ -16166,7 +16166,7 @@ uint32_t InsertEventRequestData::read(::apache::thrift::protocol::TProtocol* ipr
             uint32_t _i660;
             for (_i660 = 0; _i660 < _size656; ++_i660)
             {
-              xfer += iprot->readBinary(this->filesAddedChecksum[_i660]);
+              xfer += iprot->readString(this->filesAddedChecksum[_i660]);
             }
             xfer += iprot->readListEnd();
           }
@@ -16213,7 +16213,7 @@ uint32_t InsertEventRequestData::write(::apache::thrift::protocol::TProtocol* op
       std::vector<std::string> ::const_iterator _iter662;
       for (_iter662 = this->filesAddedChecksum.begin(); _iter662 != this->filesAddedChecksum.end();
++_iter662)
       {
-        xfer += oprot->writeBinary((*_iter662));
+        xfer += oprot->writeString((*_iter662));
       }
       xfer += oprot->writeListEnd();
     }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
index 39a607d..fd1dc06 100644
--- a/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
+++ b/metastore/src/gen/thrift/gen-javabean/org/apache/hadoop/hive/metastore/api/InsertEventRequestData.java
@@ -48,7 +48,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
   }
 
   private List<String> filesAdded; // required
-  private List<ByteBuffer> filesAddedChecksum; // optional
+  private List<String> filesAddedChecksum; // optional
 
   /** The set of fields this struct contains, along with convenience methods for finding
and manipulating them. */
   public enum _Fields implements org.apache.thrift.TFieldIdEnum {
@@ -121,7 +121,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
             new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
     tmpMap.put(_Fields.FILES_ADDED_CHECKSUM, new org.apache.thrift.meta_data.FieldMetaData("filesAddedChecksum",
org.apache.thrift.TFieldRequirementType.OPTIONAL, 
         new org.apache.thrift.meta_data.ListMetaData(org.apache.thrift.protocol.TType.LIST,

-            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING
           , true))));
+            new org.apache.thrift.meta_data.FieldValueMetaData(org.apache.thrift.protocol.TType.STRING))));
     metaDataMap = Collections.unmodifiableMap(tmpMap);
     org.apache.thrift.meta_data.FieldMetaData.addStructMetaDataMap(InsertEventRequestData.class,
metaDataMap);
   }
@@ -145,7 +145,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
       this.filesAdded = __this__filesAdded;
     }
     if (other.isSetFilesAddedChecksum()) {
-      List<ByteBuffer> __this__filesAddedChecksum = new ArrayList<ByteBuffer>(other.filesAddedChecksum);
+      List<String> __this__filesAddedChecksum = new ArrayList<String>(other.filesAddedChecksum);
       this.filesAddedChecksum = __this__filesAddedChecksum;
     }
   }
@@ -202,22 +202,22 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
     return (this.filesAddedChecksum == null) ? 0 : this.filesAddedChecksum.size();
   }
 
-  public java.util.Iterator<ByteBuffer> getFilesAddedChecksumIterator() {
+  public java.util.Iterator<String> getFilesAddedChecksumIterator() {
     return (this.filesAddedChecksum == null) ? null : this.filesAddedChecksum.iterator();
   }
 
-  public void addToFilesAddedChecksum(ByteBuffer elem) {
+  public void addToFilesAddedChecksum(String elem) {
     if (this.filesAddedChecksum == null) {
-      this.filesAddedChecksum = new ArrayList<ByteBuffer>();
+      this.filesAddedChecksum = new ArrayList<String>();
     }
     this.filesAddedChecksum.add(elem);
   }
 
-  public List<ByteBuffer> getFilesAddedChecksum() {
+  public List<String> getFilesAddedChecksum() {
     return this.filesAddedChecksum;
   }
 
-  public void setFilesAddedChecksum(List<ByteBuffer> filesAddedChecksum) {
+  public void setFilesAddedChecksum(List<String> filesAddedChecksum) {
     this.filesAddedChecksum = filesAddedChecksum;
   }
 
@@ -250,7 +250,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
       if (value == null) {
         unsetFilesAddedChecksum();
       } else {
-        setFilesAddedChecksum((List<ByteBuffer>)value);
+        setFilesAddedChecksum((List<String>)value);
       }
       break;
 
@@ -396,7 +396,7 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
       if (this.filesAddedChecksum == null) {
         sb.append("null");
       } else {
-        org.apache.thrift.TBaseHelper.toString(this.filesAddedChecksum, sb);
+        sb.append(this.filesAddedChecksum);
       }
       first = false;
     }
@@ -469,11 +469,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
             if (schemeField.type == org.apache.thrift.protocol.TType.LIST) {
               {
                 org.apache.thrift.protocol.TList _list561 = iprot.readListBegin();
-                struct.filesAddedChecksum = new ArrayList<ByteBuffer>(_list561.size);
-                ByteBuffer _elem562;
+                struct.filesAddedChecksum = new ArrayList<String>(_list561.size);
+                String _elem562;
                 for (int _i563 = 0; _i563 < _list561.size; ++_i563)
                 {
-                  _elem562 = iprot.readBinary();
+                  _elem562 = iprot.readString();
                   struct.filesAddedChecksum.add(_elem562);
                 }
                 iprot.readListEnd();
@@ -513,9 +513,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
           oprot.writeFieldBegin(FILES_ADDED_CHECKSUM_FIELD_DESC);
           {
             oprot.writeListBegin(new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING,
struct.filesAddedChecksum.size()));
-            for (ByteBuffer _iter565 : struct.filesAddedChecksum)
+            for (String _iter565 : struct.filesAddedChecksum)
             {
-              oprot.writeBinary(_iter565);
+              oprot.writeString(_iter565);
             }
             oprot.writeListEnd();
           }
@@ -554,9 +554,9 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
       if (struct.isSetFilesAddedChecksum()) {
         {
           oprot.writeI32(struct.filesAddedChecksum.size());
-          for (ByteBuffer _iter567 : struct.filesAddedChecksum)
+          for (String _iter567 : struct.filesAddedChecksum)
           {
-            oprot.writeBinary(_iter567);
+            oprot.writeString(_iter567);
           }
         }
       }
@@ -580,11 +580,11 @@ public class InsertEventRequestData implements org.apache.thrift.TBase<InsertEve
       if (incoming.get(0)) {
         {
           org.apache.thrift.protocol.TList _list571 = new org.apache.thrift.protocol.TList(org.apache.thrift.protocol.TType.STRING,
iprot.readI32());
-          struct.filesAddedChecksum = new ArrayList<ByteBuffer>(_list571.size);
-          ByteBuffer _elem572;
+          struct.filesAddedChecksum = new ArrayList<String>(_list571.size);
+          String _elem572;
           for (int _i573 = 0; _i573 < _list571.size; ++_i573)
           {
-            _elem572 = iprot.readBinary();
+            _elem572 = iprot.readString();
             struct.filesAddedChecksum.add(_elem572);
           }
         }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
----------------------------------------------------------------------
diff --git a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
index ebed504..b6050c6 100644
--- a/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
+++ b/metastore/src/gen/thrift/gen-rb/hive_metastore_types.rb
@@ -2506,7 +2506,7 @@ class InsertEventRequestData
 
   FIELDS = {
     FILESADDED => {:type => ::Thrift::Types::LIST, :name => 'filesAdded', :element
=> {:type => ::Thrift::Types::STRING}},
-    FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum',
:element => {:type => ::Thrift::Types::STRING, :binary => true}, :optional =>
true}
+    FILESADDEDCHECKSUM => {:type => ::Thrift::Types::LIST, :name => 'filesAddedChecksum',
:element => {:type => ::Thrift::Types::STRING}, :optional => true}
   }
 
   def struct_fields; FIELDS; end

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
index d9a42a7..7bc0e04 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/events/InsertEvent.java
@@ -26,7 +26,6 @@ import org.apache.hadoop.hive.metastore.api.MetaException;
 import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.metastore.api.Table;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -40,7 +39,7 @@ public class InsertEvent extends ListenerEvent {
   private final String table;
   private final Map<String, String> keyValues;
   private final List<String> files;
-  private List<ByteBuffer> fileChecksums = new ArrayList<ByteBuffer>();
+  private List<String> fileChecksums = new ArrayList<String>();
 
   /**
    *
@@ -104,7 +103,7 @@ public class InsertEvent extends ListenerEvent {
    *
    * @return
    */
-  public List<ByteBuffer> getFileChecksums() {
+  public List<String> getFileChecksums() {
     return fileChecksums;
   }
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
index fe747df..7e6e34e 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/InsertMessage.java
@@ -45,7 +45,9 @@ public abstract class InsertMessage extends EventMessage {
   public abstract Map<String,String> getPartitionKeyValues();
 
   /**
-   * Get the list of files created as a result of this DML operation.  May be null.
+   * Get the list of files created as a result of this DML operation. May be null. The file
uri may
+   * be an encoded uri, which represents both a uri and the file checksum
+   *
    * @return List of new files, or null.
    */
   public abstract List<String> getFiles();

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
index fdb8e80..df25f43 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/MessageFactory.java
@@ -28,7 +28,6 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.util.ReflectionUtils;
 
-import java.nio.ByteBuffer;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -253,5 +252,5 @@ public abstract class MessageFactory {
    * @return instance of InsertMessage
    */
   public abstract InsertMessage buildInsertMessage(String db, String table,
-      Map<String, String> partVals, List<String> files, List<ByteBuffer>
fileChecksums);
+      Map<String, String> partVals, List<String> files, List<String> fileChecksums);
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
index bd9f9ec..820cc9c 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONInsertMessage.java
@@ -22,8 +22,6 @@ package org.apache.hadoop.hive.metastore.messaging.json;
 import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.codehaus.jackson.annotate.JsonProperty;
 
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 
@@ -44,9 +42,6 @@ public class JSONInsertMessage extends InsertMessage {
   @JsonProperty
   Map<String, String> partKeyVals;
 
-  @JsonProperty
-  List<byte[]> fileChecksums;
-
   /**
    * Default constructor, needed for Jackson.
    */
@@ -66,17 +61,21 @@ public class JSONInsertMessage extends InsertMessage {
   }
 
   public JSONInsertMessage(String server, String servicePrincipal, String db, String table,
-      Map<String, String> partKeyVals, List<String> files, List<ByteBuffer>
checksums,
-      Long timestamp) {
+      Map<String, String> partKeyVals, List<String> files, List<String>
checksums, Long timestamp) {
     this(server, servicePrincipal, db, table, partKeyVals, files, timestamp);
-    fileChecksums = new ArrayList<byte[]>();
-    for (ByteBuffer checksum : checksums) {
-      byte[] checksumBytes = new byte[checksum.remaining()];
-      checksum.get(checksumBytes);
-      fileChecksums.add(checksumBytes);
+    for (int i = 0; i < files.size(); i++) {
+      if ((!checksums.isEmpty()) && (checksums.get(i) != null) && !checksums.get(i).isEmpty())
{
+        files.set(i, encodeFileUri(files.get(i), checksums.get(i)));
+      }
     }
   }
 
+  // TODO: this needs to be enhanced once change management based filesystem is implemented
+  // Currently using fileuri#checksum as the format
+  private String encodeFileUri(String fileUriStr, String fileChecksum) {
+    return fileUriStr + "#" + fileChecksum;
+  }
+
   @Override
   public String getTable() {
     return table;
@@ -112,10 +111,6 @@ public class JSONInsertMessage extends InsertMessage {
     return timestamp;
   }
 
-  public List<byte[]> getFileChecksums() {
-    return fileChecksums;
-  }
-
   @Override
   public String toString() {
     try {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
----------------------------------------------------------------------
diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
index 9954902..2749371 100644
--- a/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
+++ b/metastore/src/java/org/apache/hadoop/hive/metastore/messaging/json/JSONMessageFactory.java
@@ -176,7 +176,7 @@ public class JSONMessageFactory extends MessageFactory {
 
   @Override
   public InsertMessage buildInsertMessage(String db, String table, Map<String, String>
partKeyVals,
-      List<String> files, List<ByteBuffer> fileChecksums) {
+      List<String> files, List<String> fileChecksums) {
     return new JSONInsertMessage(MS_SERVER_URL, MS_SERVICE_PRINCIPAL, db, table, partKeyVals,
         files, fileChecksums, now());
   }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
index 4c0f817..e6b943b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/ReplCopyTask.java
@@ -18,7 +18,7 @@
 
 package org.apache.hadoop.hive.ql.exec;
 
-import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.hive.ql.parse.EximUtil;
 import org.apache.hadoop.hive.ql.parse.ReplicationSpec;
 import org.apache.hadoop.hive.ql.plan.CopyWork;
 import org.apache.hadoop.hive.ql.plan.ReplCopyWork;
@@ -27,7 +27,6 @@ import java.io.BufferedReader;
 import java.io.BufferedWriter;
 import java.io.IOException;
 import java.io.InputStreamReader;
-import java.io.OutputStream;
 import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -113,7 +112,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
       BufferedWriter listBW = null;
       if (rwork.getListFilesOnOutputBehaviour()){
-        Path listPath = new Path(toPath,"_files");
+        Path listPath = new Path(toPath,EximUtil.FILES_NAME);
         LOG.debug("ReplCopyTask : generating _files at :" + listPath.toUri().toString());
         if (dstFs.exists(listPath)){
           console.printError("Cannot make target _files file:" + listPath.toString());
@@ -169,7 +168,7 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
 
   private List<FileStatus> filesInFileListing(FileSystem fs, Path path)
       throws IOException {
-    Path fileListing = new Path(path, "_files");
+    Path fileListing = new Path(path, EximUtil.FILES_NAME);
     LOG.debug("ReplCopyTask filesInFileListing() reading " + fileListing.toUri());
     if (! fs.exists(fileListing)){
       LOG.debug("ReplCopyTask : _files does not exist");
@@ -184,8 +183,11 @@ public class ReplCopyTask extends Task<ReplCopyWork> implements Serializable {
     String line = null;
     while ( (line = br.readLine()) != null){
       LOG.debug("ReplCopyTask :_filesReadLine:" + line);
-      Path p = new Path(line);
-      FileSystem srcFs = p.getFileSystem(conf); // TODO : again, fs cache should make this
okay, but if not, revisit
+      String fileUriStr = EximUtil.getCMDecodedFileName(line);
+      // TODO HIVE-15490: Add checksum validation here
+      Path p = new Path(fileUriStr);
+      // TODO: again, fs cache should make this okay, but if not, revisit
+      FileSystem srcFs = p.getFileSystem(conf);
       ret.add(srcFs.getFileStatus(p));
       // Note - we need srcFs rather than fs, because it is possible that the _files lists
files
       // which are from a different filesystem than the fs where the _files file itself was
loaded

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
index be5a6a9..c5b3517 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
@@ -2349,11 +2349,12 @@ private void constructOneLBLocationMap(FileStatus fSta,
             FileChecksum cksum = fileSystem.getFileChecksum(p);
             // File checksum is not implemented for local filesystem (RawLocalFileSystem)
             if (cksum != null) {
-              ByteArrayOutputStream baos = new ByteArrayOutputStream();
-              cksum.write(new DataOutputStream(baos));
-              insertData.addToFilesAddedChecksum(ByteBuffer.wrap(baos.toByteArray()));
+              String checksumString =
+                  StringUtils.byteToHexString(cksum.getBytes(), 0, cksum.getLength());
+              insertData.addToFilesAddedChecksum(checksumString);
             } else {
-              insertData.addToFilesAddedChecksum(ByteBuffer.allocate(0));
+              // Add an empty checksum string for filesystems that don't generate one
+              insertData.addToFilesAddedChecksum("");
             }
           }
         } else {

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
index 6e9602f..34e53d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/EximUtil.java
@@ -19,9 +19,9 @@
 package org.apache.hadoop.hive.ql.parse;
 
 import com.google.common.base.Function;
+
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.fs.PathFilter;
-import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.metastore.api.Database;
 import org.apache.hadoop.hive.ql.Context;
 import org.apache.hadoop.hive.ql.exec.Task;
@@ -31,6 +31,7 @@ import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileChecksum;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
@@ -50,6 +51,7 @@ import org.json.JSONException;
 import org.json.JSONObject;
 
 import javax.annotation.Nullable;
+
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
@@ -73,7 +75,10 @@ import java.util.TreeMap;
  */
 public class EximUtil {
 
-  public static final String METADATA_NAME="_metadata";
+  public static final String METADATA_NAME = "_metadata";
+  public static final String FILES_NAME = "_files";
+  public static final String DATA_PATH_NAME = "data";
+  public static final String URI_FRAGMENT_SEPARATOR = "#";
 
   private static final Logger LOG = LoggerFactory.getLogger(EximUtil.class);
 
@@ -278,6 +283,7 @@ public class EximUtil {
     if (replicationSpec == null){
       replicationSpec = new ReplicationSpec(); // instantiate default values if not specified
     }
+
     if (tableHandle == null){
       replicationSpec.setNoop(true);
     }
@@ -351,10 +357,6 @@ public class EximUtil {
     jgen.close(); // JsonGenerator owns the OutputStream, so it closes it when we call close.
   }
 
-  private static void write(OutputStream out, String s) throws IOException {
-    out.write(s.getBytes("UTF-8"));
-  }
-
   /**
    * Utility class to help return complex value from readMetaData function
    */
@@ -571,4 +573,20 @@ public class EximUtil {
       }
     };
   }
+
+  /**
+   * Encodes a file URI together with its checksum as a single {@code _files} entry.
+   * The checksum is appended as the URI fragment: {@code <fileURIStr>#<fileChecksum>}.
+   * An empty checksum produces a trailing separator with no fragment text.
+   *
+   * @param fileURIStr   string form of the file URI
+   * @param fileChecksum checksum string to append (may be empty)
+   * @return the encoded entry
+   */
+  public static String getCMEncodedFileName(String fileURIStr, String fileChecksum) {
+    // The checksum is set as the fragment portion of the file uri
+    return fileURIStr + URI_FRAGMENT_SEPARATOR + fileChecksum;
+  }
+
+  /**
+   * Strips the checksum fragment added by {@link #getCMEncodedFileName(String, String)}.
+   *
+   * <p>The separator is located from the END of the string, because the encoder always
+   * appends exactly one separator after the URI. Splitting at the first occurrence would
+   * truncate a URI string that itself contains the separator character. Input that contains
+   * no separator is returned unchanged.
+   *
+   * @param encodedFileURIStr an entry produced by getCMEncodedFileName, or a plain file URI
+   * @return the file URI portion of the entry
+   */
+  public static String getCMDecodedFileName(String encodedFileURIStr) {
+    int separatorIdx = encodedFileURIStr.lastIndexOf(URI_FRAGMENT_SEPARATOR);
+    return separatorIdx < 0 ? encodedFileURIStr : encodedFileURIStr.substring(0, separatorIdx);
+  }
+
+  /**
+   * Extracts the checksum from an encoded {@code _files} entry.
+   *
+   * <p>Not implemented yet (tracked by HIVE-15490); always returns {@code null}.
+   *
+   * @param encodedFileURIStr an entry produced by getCMEncodedFileName
+   * @return currently always {@code null}
+   */
+  public static FileChecksum getCMDecodedChecksum(String encodedFileURIStr) {
+    // TODO: Implement this as part of HIVE-15490
+    return null;
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
index f61274b..08bad63 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ExportSemanticAnalyzer.java
@@ -170,7 +170,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
         partitions = null;
       }
 
-      Path path = new Path(ctx.getLocalTmpPath(), "_metadata");
+      Path path = new Path(ctx.getLocalTmpPath(), EximUtil.METADATA_NAME);
       EximUtil.createExportDump(
           FileSystem.getLocal(conf),
           path,
@@ -202,7 +202,7 @@ public class ExportSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
       } else {
         Path fromPath = ts.tableHandle.getDataLocation();
-        Path toDataPath = new Path(parentPath, "data");
+        Path toDataPath = new Path(parentPath, EximUtil.DATA_PATH_NAME);
         Task<? extends Serializable> rTask =
             ReplCopyTask.getDumpCopyTask(replicationSpec, fromPath, toDataPath, conf);
         rootTasks.add(rTask);

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
index 5561e06..8c5cac2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ImportSemanticAnalyzer.java
@@ -344,7 +344,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
 
   private static Task<?> loadTable(URI fromURI, Table table, boolean replace, Path
tgtPath,
                             ReplicationSpec replicationSpec, EximUtil.SemanticAnalyzerWrapperContext
x) {
-    Path dataPath = new Path(fromURI.toString(), "data");
+    Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
     Path tmpPath = x.getCtx().getExternalTmpPath(tgtPath);
     Task<?> copyTask = ReplCopyTask.getLoadCopyTask(replicationSpec, dataPath, tmpPath,
x.getConf());
     LoadTableDesc loadTableWork = new LoadTableDesc(tmpPath,
@@ -777,7 +777,7 @@ public class ImportSemanticAnalyzer extends BaseSemanticAnalyzer {
         x.getLOG().debug("adding dependent CopyWork/MoveWork for table");
         if (tblDesc.isExternal() && (tblDesc.getLocation() == null)) {
           x.getLOG().debug("Importing in place, no emptiness check, no copying/loading");
-          Path dataPath = new Path(fromURI.toString(), "data");
+          Path dataPath = new Path(fromURI.toString(), EximUtil.DATA_PATH_NAME);
           tblDesc.setLocation(dataPath.toString());
         } else {
           Path tablePath = null;

http://git-wip-us.apache.org/repos/asf/hive/blob/2f501a8a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
index 9b83407..85f8c64 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/ReplicationSemanticAnalyzer.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.parse;
 import com.google.common.base.Function;
 import com.google.common.collect.Iterables;
 import com.google.common.primitives.Ints;
+
 import org.antlr.runtime.tree.Tree;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -36,6 +37,7 @@ import org.apache.hadoop.hive.metastore.messaging.CreateTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropPartitionMessage;
 import org.apache.hadoop.hive.metastore.messaging.DropTableMessage;
 import org.apache.hadoop.hive.metastore.messaging.EventUtils;
+import org.apache.hadoop.hive.metastore.messaging.InsertMessage;
 import org.apache.hadoop.hive.metastore.messaging.MessageDeserializer;
 import org.apache.hadoop.hive.metastore.messaging.MessageFactory;
 import org.apache.hadoop.hive.ql.ErrorMsg;
@@ -61,13 +63,15 @@ import org.apache.hadoop.hive.ql.plan.RenamePartitionDesc;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
 import org.apache.hadoop.io.IOUtils;
-
 import javax.annotation.Nullable;
+
 import java.io.BufferedReader;
+import java.io.BufferedWriter;
 import java.io.DataOutputStream;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.io.OutputStreamWriter;
 import java.io.Serializable;
 import java.net.URI;
 import java.util.ArrayList;
@@ -108,6 +112,7 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
     EVENT_RENAME_TABLE("EVENT_RENAME_TABLE"),
     EVENT_ALTER_PARTITION("EVENT_ALTER_PARTITION"),
     EVENT_RENAME_PARTITION("EVENT_RENAME_PARTITION"),
+    EVENT_INSERT("EVENT_INSERT"),
     EVENT_UNKNOWN("EVENT_UNKNOWN");
 
     String type = null;
@@ -559,7 +564,39 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
           dmd.write();
           break;
         }
-
+      }
+      case MessageFactory.INSERT_EVENT: {
+        InsertMessage insertMsg = md.getInsertMessage(ev.getMessage());
+        String tblName = insertMsg.getTable();
+        Table qlMdTable = db.getTable(tblName);
+        Map<String, String> partSpec = insertMsg.getPartitionKeyValues();
+        List<Partition> qlPtns  = null;
+        if (qlMdTable.isPartitioned() && !partSpec.isEmpty()) {
+          qlPtns = Arrays.asList(db.getPartition(qlMdTable, partSpec, false));
+        }
+        Path metaDataPath = new Path(evRoot, EximUtil.METADATA_NAME);
+        EximUtil.createExportDump(metaDataPath.getFileSystem(conf), metaDataPath, qlMdTable,
qlPtns,
+            replicationSpec);
+        Path dataPath = new Path(evRoot, EximUtil.DATA_PATH_NAME);
+        Path filesPath = new Path(dataPath, EximUtil.FILES_NAME);
+        FileSystem fs = dataPath.getFileSystem(conf);
+        BufferedWriter fileListWriter =
+            new BufferedWriter(new OutputStreamWriter(fs.create(filesPath)));
+        try {
+          // TODO: HIVE-15205: move this metadata generation to a task
+          // Get the encoded filename of files that are being inserted
+          List<String> files = insertMsg.getFiles();
+          for (String fileUriStr : files) {
+            fileListWriter.write(fileUriStr + "\n");
+          }
+        } finally {
+          fileListWriter.close();
+        }
+        LOG.info("Processing#{} INSERT message : {}", ev.getEventId(), ev.getMessage());
+        DumpMetaData dmd = new DumpMetaData(evRoot, DUMPTYPE.EVENT_INSERT, evid, evid);
+        dmd.setPayload(ev.getMessage());
+        dmd.write();
+        break;
       }
       // TODO : handle other event types
       default:
@@ -957,6 +994,12 @@ public class ReplicationSemanticAnalyzer extends BaseSemanticAnalyzer {
         LOG.debug("Added rename ptn task : {}:{}->{}", renamePtnTask.getId(), oldPartSpec,
newPartSpec);
         return tasks;
       }
+      case EVENT_INSERT: {
+        md = MessageFactory.getInstance().getDeserializer();
+        InsertMessage insertMessage = md.getInsertMessage(dmd.getPayload());
+        // Piggybacking in Import logic for now
+        return analyzeTableLoad(insertMessage.getDB(), insertMessage.getTable(), locn, precursor);
+      }
       case EVENT_UNKNOWN: {
         break;
       }


Mime
View raw message