gobblin-commits mailing list archives

From: ibuen...@apache.org
Subject: [2/2] incubator-gobblin git commit: [GOBBLIN-253] Enhance Hive materializer.
Date: Thu, 21 Sep 2017 23:54:03 GMT
[GOBBLIN-253] Enhance Hive materializer.

Closes #2104 from ibuenros/hive-materializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/5fa98326
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/5fa98326
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/5fa98326

Branch: refs/heads/master
Commit: 5fa983268606335493903e7186836c57eefe40d9
Parents: 30990f4
Author: ibuenros <issac.buenrostro@gmail.com>
Authored: Thu Sep 21 16:53:52 2017 -0700
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Thu Sep 21 16:53:52 2017 -0700

----------------------------------------------------------------------
 gobblin-data-management/build.gradle            |   1 +
 .../converter/AbstractAvroToOrcConverter.java   |  26 +-
 .../hive/dataset/ConvertibleHiveDataset.java    | 125 +++------
 .../hive/entities/HiveProcessingEntity.java     |  50 ++++
 .../QueryBasedHiveConversionEntity.java         |  20 +-
 .../ReplaceTableStageableTableMetadata.java     |  33 +++
 .../hive/entities/StageableTableMetadata.java   | 154 +++++++++++
 .../TableLikeStageableTableMetadata.java        |  55 ++++
 .../materializer/CopyTableQueryGenerator.java   |  97 +++++++
 .../hive/materializer/HiveMaterializer.java     | 155 ++++++++++++
 ...iveMaterializerFromEntityQueryGenerator.java | 176 +++++++++++++
 .../HiveMaterializerQueryGenerator.java         |  94 +++++++
 .../HiveMaterializerTaskFactory.java            |  46 ++++
 .../MaterializeTableQueryGenerator.java         |  52 ++++
 .../QueryBasedMaterializerQueryGenerator.java   |  96 +++++++
 .../conversion/hive/source/HiveSource.java      |  41 ++-
 .../conversion/hive/source/HiveWorkUnit.java    |  12 +
 .../hive/task/HiveConverterUtils.java           | 104 ++++++--
 .../conversion/hive/task/HiveMaterializer.java  |  54 ----
 .../task/HiveMaterializerQueryGenerator.java    | 244 ------------------
 .../hive/task/HiveMaterializerSource.java       |  60 -----
 .../hive/task/HiveMaterializerTaskFactory.java  |  46 ----
 .../conversion/hive/task/HiveTask.java          |  27 +-
 .../conversion/hive/task/QueryGenerator.java    |   2 +-
 .../hive/writer/HiveQueryExecutionWriter.java   |   2 +-
 .../conversion/hive/HiveSourceTest.java         |   8 +-
 .../hive/LocalHiveMetastoreTestUtils.java       |  14 +-
 .../converter/HiveAvroToOrcConverterTest.java   |   4 +-
 .../hive/materializer/HiveMaterializerTest.java | 253 +++++++++++++++++++
 .../PartitionLevelWatermarkerTest.java          |   3 +-
 .../integration/HiveRetentionTest.java          |   4 +-
 .../DatePartitionedHiveVersionFinderTest.java   |   6 +-
 .../hiveMaterializerTest/source/part1/data.txt  |   4 +
 .../hiveMaterializerTest/source/part2/data.txt  |   4 +
 gobblin-example/build.gradle                    |   1 +
 .../HiveMaterializerSource.java                 | 129 ++++++++++
 .../src/main/resources/hive-materializer.conf   |  19 ++
 .../apache/gobblin/util/HiveJdbcConnector.java  |  22 +-
 38 files changed, 1650 insertions(+), 593 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/build.gradle
----------------------------------------------------------------------
diff --git a/gobblin-data-management/build.gradle b/gobblin-data-management/build.gradle
index c10835e..3bb5d1e 100644
--- a/gobblin-data-management/build.gradle
+++ b/gobblin-data-management/build.gradle
@@ -54,6 +54,7 @@ dependencies {
   testCompile externalDependency.joptSimple
   testCompile externalDependency.hamcrest
   testCompile externalDependency.testng
+  testCompile externalDependency.hiveJdbc
 
   testRuntime project(":gobblin-modules:gobblin-crypto-provider") // for GPG
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index b8495a9..b8591e7 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -17,7 +17,6 @@
 package org.apache.gobblin.data.management.conversion.hive.converter;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedHashMap;
@@ -30,8 +29,8 @@ import lombok.extern.slf4j.Slf4j;
 
 import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity;
 import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -65,7 +64,6 @@ import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiv
 import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
 import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
-import org.apache.gobblin.data.management.copy.hive.HiveUtils;
 import org.apache.gobblin.data.management.copy.hive.WhitelistBlacklist;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
 import org.apache.gobblin.metrics.event.sla.SlaEventKeys;
@@ -198,7 +196,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     Preconditions.checkNotNull(outputAvroSchema, "Avro schema must not be null");
     Preconditions.checkNotNull(conversionEntity, "Conversion entity must not be null");
     Preconditions.checkNotNull(workUnit, "Workunit state must not be null");
-    Preconditions.checkNotNull(conversionEntity.getHiveTable(), "Hive table within conversion entity must not be null");
+    Preconditions.checkNotNull(conversionEntity.getTable(), "Hive table within conversion entity must not be null");
 
     EventWorkunitUtils.setBeginDDLBuildTimeMetadata(workUnit, System.currentTimeMillis());
 
@@ -209,7 +207,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     }
 
     // Avro table name and location
-    String avroTableName = conversionEntity.getHiveTable().getTableName();
+    String avroTableName = conversionEntity.getTable().getTableName();
 
     // ORC table name and location
     String orcTableName = getConversionConfig().getDestinationTableName();
@@ -271,7 +269,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
      * Upon testing, this did not work
      */
     try {
-      FileStatus sourceDataFileStatus = this.fs.getFileStatus(conversionEntity.getHiveTable().getDataLocation());
+      FileStatus sourceDataFileStatus = this.fs.getFileStatus(conversionEntity.getTable().getDataLocation());
       FsPermission sourceDataPermission = sourceDataFileStatus.getPermission();
       if (!this.fs.mkdirs(new Path(getConversionConfig().getDestinationDataPath()), sourceDataPermission)) {
         throw new RuntimeException(String.format("Failed to create path %s with permissions %s", new Path(
@@ -297,10 +295,10 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     }
     // Set hive runtime properties for tracking
     conversionEntity.getQueries().add(String.format("SET %s=%s", GOBBLIN_DATASET_URN_KEY,
-        conversionEntity.getHiveTable().getCompleteName()));
-    if (conversionEntity.getHivePartition().isPresent()) {
+        conversionEntity.getTable().getCompleteName()));
+    if (conversionEntity.getPartition().isPresent()) {
       conversionEntity.getQueries().add(String.format("SET %s=%s", GOBBLIN_PARTITION_NAME_KEY,
-          conversionEntity.getHivePartition().get().getCompleteName()));
+          conversionEntity.getPartition().get().getCompleteName()));
     }
     conversionEntity.getQueries().add(String
         .format("SET %s=%s", GOBBLIN_WORKUNIT_CREATE_TIME_KEY,
@@ -348,7 +346,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
                 outputAvroSchema,
                 avroTableName,
                 orcStagingTableName,
-                Optional.of(conversionEntity.getHiveTable().getDbName()),
+                Optional.of(conversionEntity.getTable().getDbName()),
                 Optional.of(orcTableDatabase),
                 Optional.of(partitionsDMLInfo),
                 Optional.<Boolean>absent(),
@@ -468,7 +466,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
       // Move: orcStagingDataPartitionLocation to: orcFinalDataPartitionLocation
       String orcFinalDataPartitionLocation = orcDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName;
       Optional<Path> destPartitionLocation = getDestinationPartitionLocation(destinationTableMeta, workUnit,
-          conversionEntity.getHivePartition().get().getName());
+          conversionEntity.getPartition().get().getName());
       orcFinalDataPartitionLocation =
           HiveConverterUtils.updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation);
       log.info(
@@ -626,12 +624,12 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
 
 
   @VisibleForTesting
-  public static List<Map<String, String>> getDropPartitionsDDLInfo(QueryBasedHiveConversionEntity conversionEntity) {
-    if (!conversionEntity.getHivePartition().isPresent()) {
+  public static List<Map<String, String>> getDropPartitionsDDLInfo(HiveProcessingEntity conversionEntity) {
+    if (!conversionEntity.getPartition().isPresent()) {
       return Collections.emptyList();
     }
 
-    return getDropPartitionsDDLInfo(conversionEntity.getHivePartition().get());
+    return getDropPartitionsDDLInfo(conversionEntity.getPartition().get());
 
   }
 

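The accessor changes above follow from QueryBasedHiveConversionEntity now extending the new HiveProcessingEntity base class (introduced below), which also lets getDropPartitionsDDLInfo accept any Hive processing entity rather than only a query-based conversion entity. A minimal Java sketch of the generalized call; the db/table names are hypothetical and the dataset is left null purely for illustration:

    import java.util.List;
    import java.util.Map;

    import org.apache.hadoop.hive.ql.metadata.Table;

    import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
    import org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity;

    public class DropPartitionsDDLSketch {
      public static void main(String[] args) {
        // Hypothetical unpartitioned table; no HiveDataset is needed for this call.
        HiveProcessingEntity entity = new HiveProcessingEntity(null, new Table("mydb", "events"));
        // With no partition present, the method returns an empty list.
        List<Map<String, String>> dropDDLInfo = AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(entity);
        System.out.println(dropDDLInfo); // []
      }
    }
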
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
index ba42811..933e86a 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/dataset/ConvertibleHiveDataset.java
@@ -16,7 +16,6 @@
  */
 package org.apache.gobblin.data.management.conversion.hive.dataset;
 
-import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
@@ -25,6 +24,7 @@ import lombok.Getter;
 import lombok.ToString;
 
 import lombok.extern.slf4j.Slf4j;
+
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.hive.ql.metadata.Table;
 
@@ -34,6 +34,7 @@ import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
 import com.typesafe.config.Config;
 
+import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata;
 import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
 import org.apache.gobblin.hive.HiveMetastoreClientPool;
@@ -144,116 +145,58 @@ public class ConvertibleHiveDataset extends HiveDataset {
    */
   @Getter
   @ToString
-  public static class ConversionConfig {
-    public static final String DESTINATION_TABLE_KEY = "destination.tableName";
+  public static class ConversionConfig extends StageableTableMetadata {
     public static final String DESTINATION_VIEW_KEY = "destination.viewName";
-    public static final String DESTINATION_DB_KEY = "destination.dbName";
-    public static final String DESTINATION_DATA_PATH_KEY = "destination.dataPath";
-    public static final String DESTINATION_TABLE_PROPERTIES_LIST_KEY = "destination.tableProperties";
-    public static final String CLUSTER_BY_KEY = "clusterByList";
-    public static final String NUM_BUCKETS_KEY = "numBuckets";
-    public static final String EVOLUTION_ENABLED = "evolution.enabled";
     public static final String UPDATE_VIEW_ALWAYS_ENABLED = "updateViewAlways.enabled";
-    public static final String ROW_LIMIT_KEY = "rowLimit";
-    public static final String HIVE_VERSION_KEY = "hiveVersion";
-    private static final String HIVE_RUNTIME_PROPERTIES_LIST_KEY = "hiveRuntimeProperties";
-
-    /***
-     * Comma separated list of string that should be used as a prefix for destination partition directory name
-     * ... (if present in the location path string of source partition)
-     *
-     * This is helpful in roll-up / compaction scenarios, where you don't want queries in flight to fail.
-     *
-     * Scenario without this property:
-     * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for
-     *   processing
-     * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00
-     *
-     * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for
-     *   processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition)
-     * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00
-     *   (previous data is overwritten and any queries in flight fail)
-     *
-     * Same scenario with this property set to "hourly,daily":
-     * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for
-     *   processing
-     * - Source partition is processed and published to destination table as: /foo/bar_orc/hourly_datepartition=2016-01-01-00
-     *   (Note: "hourly_" is prefixed to destination partition directory name because source partition path contains
-     *   "hourly" substring)
-     *
-     * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for
-     *   processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition)
-     * - Source partition is processed and published to destination table as: /foo/bar_orc/daily_datepartition=2016-01-01-00
-     *   (Note: "daily_" is prefixed to destination partition directory name, because source partition path contains
-     *   "daily" substring)
-     * - Any running queries are not impacted since data is not overwritten and hourly_datepartition=2016-01-01-00
-     *   directory continues to exist
-     *
-     * Notes:
-     * - This however leaves the responsibility of cleanup of previous destination partition directory on retention or
-     *   other such independent module, since in the above case hourly_datepartition=2016-01-01-00 dir will not be deleted
-     * - Directories can still be overwritten if they resolve to same destination partition directory name, such as
-     *   re-processing / backfill of daily partition will overwrite daily_datepartition=2016-01-01-00 directory
-     */
-    private static final String SOURCE_DATA_PATH_IDENTIFIER_KEY = "source.dataPathIdentifier";
 
     private final String destinationFormat;
-    private final String destinationTableName;
     // destinationViewName : If specified view with 'destinationViewName' is created if not already exists over destinationTableName
     private final Optional<String> destinationViewName;
-    private final String destinationStagingTableName;
-    private final String destinationDbName;
-    private final String destinationDataPath;
-    private final Properties destinationTableProperties;
-    private final List<String> clusterBy;
-    private final Optional<Integer> numBuckets;
-    private final Properties hiveRuntimeProperties;
-    private final boolean evolutionEnabled;
-    // updateViewAlwaysEnabled: If false 'destinationViewName' is only updated when schema evolves; if true 'destinationViewName'
+    // updateViewAlwaysEnabled: If false 'destinationViewName' is only updated when schema evolves; if true 'destinationViewName'
     // ... is always updated (everytime publish happens)
     private final boolean updateViewAlwaysEnabled;
-    private final Optional<Integer> rowLimit;
-    private final List<String> sourceDataPathIdentifier;
 
     private ConversionConfig(Config config, Table table, String destinationFormat) {
-
-      Preconditions.checkArgument(config.hasPath(DESTINATION_TABLE_KEY), String.format("Key %s.%s is not specified", destinationFormat, DESTINATION_TABLE_KEY));
-      Preconditions.checkArgument(config.hasPath(DESTINATION_DB_KEY), String.format("Key %s.%s is not specified", destinationFormat, DESTINATION_DB_KEY));
-      Preconditions.checkArgument(config.hasPath(DESTINATION_DATA_PATH_KEY),
-          String.format("Key %s.%s is not specified", destinationFormat, DESTINATION_DATA_PATH_KEY));
+      super(config, table);
 
       // Required
       this.destinationFormat = destinationFormat;
-      this.destinationTableName = resolveTemplate(config.getString(DESTINATION_TABLE_KEY), table);
-      this.destinationStagingTableName = String.format("%s_%s", this.destinationTableName, "staging"); // Fixed and non-configurable
-      this.destinationDbName = resolveTemplate(config.getString(DESTINATION_DB_KEY), table);
-      this.destinationDataPath = resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), table);
 
       // Optional
       this.destinationViewName = Optional.fromNullable(resolveTemplate(ConfigUtils.getString(config, DESTINATION_VIEW_KEY, null), table));
-      this.destinationTableProperties =
-          convertKeyValueListToProperties(ConfigUtils.getStringList(config, DESTINATION_TABLE_PROPERTIES_LIST_KEY));
-      this.clusterBy = ConfigUtils.getStringList(config, CLUSTER_BY_KEY);
-      this.numBuckets = Optional.fromNullable(ConfigUtils.getInt(config, NUM_BUCKETS_KEY, null));
-
-      this.hiveRuntimeProperties =
-          convertKeyValueListToProperties(ConfigUtils.getStringList(config, HIVE_RUNTIME_PROPERTIES_LIST_KEY));
-      this.evolutionEnabled = ConfigUtils.getBoolean(config, EVOLUTION_ENABLED, false);
       this.updateViewAlwaysEnabled = ConfigUtils.getBoolean(config, UPDATE_VIEW_ALWAYS_ENABLED, true);
-      this.rowLimit = Optional.fromNullable(ConfigUtils.getInt(config, ROW_LIMIT_KEY, null));
-      this.sourceDataPathIdentifier = ConfigUtils.getStringList(config, SOURCE_DATA_PATH_IDENTIFIER_KEY);
     }
 
-    private Properties convertKeyValueListToProperties(List<String> keyValueList) {
-      Preconditions.checkArgument(keyValueList.size() % 2 == 0, String.format(
-          "The list %s does not have equal number of keys and values. Size %s", keyValueList, keyValueList.size()));
-      Properties props = new Properties();
-      for (int i = 0; i < keyValueList.size(); i += 2) {
-        String key = keyValueList.get(i);
-        String value = keyValueList.get(i + 1);
-        props.put(key, value);
+    @Override
+    public boolean equals(Object o) {
+      if (this == o) {
+        return true;
+      }
+      if (o == null || getClass() != o.getClass()) {
+        return false;
+      }
+      if (!super.equals(o)) {
+        return false;
+      }
+
+      ConversionConfig that = (ConversionConfig) o;
+
+      if (isUpdateViewAlwaysEnabled() != that.isUpdateViewAlwaysEnabled()) {
+        return false;
       }
-      return props;
+      if (!getDestinationFormat().equals(that.getDestinationFormat())) {
+        return false;
+      }
+      return getDestinationViewName().equals(that.getDestinationViewName());
+    }
+
+    @Override
+    public int hashCode() {
+      int result = super.hashCode();
+      result = 31 * result + getDestinationFormat().hashCode();
+      result = 31 * result + getDestinationViewName().hashCode();
+      result = 31 * result + (isUpdateViewAlwaysEnabled() ? 1 : 0);
+      return result;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java
new file mode 100644
index 0000000..5a9a819
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/HiveProcessingEntity.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.entities;
+
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+import com.google.common.base.Optional;
+
+import lombok.Getter;
+
+
+/**
+ * Represents a Hive table and, optionally, a partition.
+ */
+@Getter
+public class HiveProcessingEntity {
+
+  private final HiveDataset hiveDataset;
+  private final Table table;
+  private final Optional<Partition> partition;
+
+  public HiveProcessingEntity(HiveDataset hiveDataset, Table table) {
+    this(hiveDataset, table, Optional.absent());
+  }
+
+  public HiveProcessingEntity(HiveDataset hiveDataset, Table table,
+      Optional<Partition> partition) {
+    this.hiveDataset = hiveDataset;
+    this.table = table;
+    this.partition = partition;
+  }
+
+}

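A short usage sketch of the new entity, with hypothetical names; the two-argument constructor defaults the partition to absent:

    import org.apache.hadoop.hive.ql.metadata.Table;

    import org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity;

    public class HiveProcessingEntitySketch {
      public static void main(String[] args) {
        Table table = new Table("mydb", "events"); // hypothetical db and table
        // Two-argument constructor: partition defaults to Optional.absent().
        HiveProcessingEntity entity = new HiveProcessingEntity(null, table);
        System.out.println(entity.getTable().getTableName()); // events
        System.out.println(entity.getPartition().isPresent()); // false
      }
    }
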
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java
index 1486fe5..a84e55b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/QueryBasedHiveConversionEntity.java
@@ -18,13 +18,6 @@ package org.apache.gobblin.data.management.conversion.hive.entities;
 
 import java.util.List;
 
-import lombok.EqualsAndHashCode;
-import lombok.Getter;
-import lombok.ToString;
-
-import com.google.common.base.Optional;
-import com.google.common.collect.Lists;
-
 import org.apache.gobblin.converter.Converter;
 import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
 import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
@@ -35,6 +28,15 @@ import org.apache.gobblin.hive.HiveRegistrationUnit;
 import org.apache.gobblin.hive.HiveTable;
 import org.apache.gobblin.source.extractor.Extractor;
 
+import org.apache.hadoop.hive.ql.metadata.Partition;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import lombok.EqualsAndHashCode;
+import lombok.Getter;
+import lombok.ToString;
+
 
 /**
  * Represents a gobblin Record in the Hive avro to orc conversion flow.
@@ -54,7 +56,7 @@ import org.apache.gobblin.source.extractor.Extractor;
 @ToString
 @EqualsAndHashCode
 @Getter
-public class QueryBasedHiveConversionEntity {
+public class QueryBasedHiveConversionEntity extends HiveProcessingEntity {
 
   private final ConvertibleHiveDataset convertibleHiveDataset;
   private final SchemaAwareHiveTable hiveTable;
@@ -71,9 +73,11 @@ public class QueryBasedHiveConversionEntity {
 
   public QueryBasedHiveConversionEntity(ConvertibleHiveDataset convertibleHiveDataset, SchemaAwareHiveTable hiveTable,
       Optional<SchemaAwareHivePartition> hivePartition) {
+    super(convertibleHiveDataset, hiveTable, Optional.fromNullable(hivePartition.orNull()));
     this.convertibleHiveDataset = convertibleHiveDataset;
     this.hiveTable = hiveTable;
     this.hivePartition = hivePartition;
     this.queries = Lists.newArrayList();
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java
new file mode 100644
index 0000000..76c0c4e
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/ReplaceTableStageableTableMetadata.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.entities;
+
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+
+/**
+ * A {@link StageableTableMetadata} where the target table is the same as the reference table. Intended to
+ * replace the original table.
+ */
+public class ReplaceTableStageableTableMetadata extends TableLikeStageableTableMetadata {
+
+  public ReplaceTableStageableTableMetadata(Table referenceTable) {
+    super(referenceTable, referenceTable.getDbName(), referenceTable.getTableName(), referenceTable.getDataLocation().toString());
+  }
+
+}

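A sketch of how this metadata mirrors its reference table; the table and location are hypothetical, and the storage location is set on the underlying Thrift descriptor because the constructor reads getDataLocation():

    import org.apache.hadoop.hive.ql.metadata.Table;

    import org.apache.gobblin.data.management.conversion.hive.entities.ReplaceTableStageableTableMetadata;

    public class ReplaceTableMetadataSketch {
      public static void main(String[] args) {
        Table ref = new Table("mydb", "events"); // hypothetical
        ref.getTTable().getSd().setLocation("/data/mydb/events"); // required by getDataLocation()
        ReplaceTableStageableTableMetadata meta = new ReplaceTableStageableTableMetadata(ref);
        System.out.println(meta.getDestinationDbName()); // mydb
        System.out.println(meta.getDestinationTableName()); // events
        System.out.println(meta.getDestinationStagingTableName()); // events_STAGING
        System.out.println(meta.getDestinationDataPath()); // /data/mydb/events
      }
    }
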
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
new file mode 100644
index 0000000..39494c2
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/StageableTableMetadata.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.entities;
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.util.ConfigUtils;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+
+import javax.annotation.Nullable;
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+
+/**
+ * Contains metadata associated with a stageable table.
+ *
+ * This class contains information about two Hive tables: a final destination table and a staging table. The staging
+ * table is used as temporary storage during a job run to help keep the final destination table consistent.
+ */
+@Data
+@AllArgsConstructor
+public class StageableTableMetadata {
+
+  public static final String DESTINATION_TABLE_KEY = "destination.tableName";
+  public static final String DESTINATION_DB_KEY = "destination.dbName";
+  public static final String DESTINATION_DATA_PATH_KEY = "destination.dataPath";
+  public static final String DESTINATION_TABLE_PROPERTIES_LIST_KEY = "destination.tableProperties";
+  public static final String CLUSTER_BY_KEY = "clusterByList";
+  public static final String NUM_BUCKETS_KEY = "numBuckets";
+  public static final String EVOLUTION_ENABLED = "evolution.enabled";
+  public static final String ROW_LIMIT_KEY = "rowLimit";
+  public static final String HIVE_VERSION_KEY = "hiveVersion";
+  public static final String HIVE_RUNTIME_PROPERTIES_LIST_KEY = "hiveRuntimeProperties";
+  /***
+   * Comma-separated list of strings that should be used as a prefix for the destination partition directory name
+   * ... (if present in the location path string of the source partition)
+   *
+   * This is helpful in roll-up / compaction scenarios, where you don't want queries in flight to fail.
+   *
+   * Scenario without this property:
+   * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for
+   *   processing
+   * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00
+   *
+   * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for
+   *   processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition)
+   * - Source partition is processed and published to destination table as: /foo/bar_orc/datepartition=2016-01-01-00
+   *   (previous data is overwritten and any queries in flight fail)
+   *
+   * Same scenario with this property set to "hourly,daily":
+   * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/hourly/2016/01/01/00 is available for
+   *   processing
+   * - Source partition is processed and published to destination table as: /foo/bar_orc/hourly_datepartition=2016-01-01-00
+   *   (Note: "hourly_" is prefixed to destination partition directory name because source partition path contains
+   *   "hourly" substring)
+   *
+   * - Source partition: datepartition=2016-01-01-00 with path /foo/bar/daily/2016/01/01/00 is available again for
+   *   processing (due to roll-up / compaction of hourly data for 2016-01-01 into same partition)
+   * - Source partition is processed and published to destination table as: /foo/bar_orc/daily_datepartition=2016-01-01-00
+   *   (Note: "daily_" is prefixed to destination partition directory name, because source partition path contains
+   *   "daily" substring)
+   * - Any running queries are not impacted since data is not overwritten and hourly_datepartition=2016-01-01-00
+   *   directory continues to exist
+   *
+   * Notes:
+   * - This however leaves the responsibility of cleanup of previous destination partition directory on retention or
+   *   other such independent module, since in the above case hourly_datepartition=2016-01-01-00 dir will not be deleted
+   * - Directories can still be overwritten if they resolve to same destination partition directory name, such as
+   *   re-processing / backfill of daily partition will overwrite daily_datepartition=2016-01-01-00 directory
+   */
+  public static final String SOURCE_DATA_PATH_IDENTIFIER_KEY = "source.dataPathIdentifier";
+
+
+  /** Table name of the destination table. */
+  private final String destinationTableName;
+  /** Table name of the staging table. */
+  private final String destinationStagingTableName;
+  /** Name of the database for the destination table. */
+  private final String destinationDbName;
+  /** Path where files of the destination table should be located. */
+  private final String destinationDataPath;
+  /** Table properties of destination table. */
+  private final Properties destinationTableProperties;
+  /** List of columns to cluster by. */
+  private final List<String> clusterBy;
+  /** Number of buckets in destination table. */
+  private final Optional<Integer> numBuckets;
+  private final Properties hiveRuntimeProperties;
+  private final boolean evolutionEnabled;
+  private final Optional<Integer> rowLimit;
+  private final List<String> sourceDataPathIdentifier;
+
+  public StageableTableMetadata(Config config, @Nullable Table referenceTable) {
+    Preconditions.checkArgument(config.hasPath(DESTINATION_TABLE_KEY), String.format("Key %s is not specified", DESTINATION_TABLE_KEY));
+    Preconditions.checkArgument(config.hasPath(DESTINATION_DB_KEY), String.format("Key %s is not specified", DESTINATION_DB_KEY));
+    Preconditions.checkArgument(config.hasPath(DESTINATION_DATA_PATH_KEY),
+        String.format("Key %s is not specified", DESTINATION_DATA_PATH_KEY));
+
+    // Required
+    this.destinationTableName = referenceTable == null ? config.getString(DESTINATION_TABLE_KEY)
+        : HiveDataset.resolveTemplate(config.getString(DESTINATION_TABLE_KEY), referenceTable);
+    this.destinationStagingTableName = String.format("%s_%s", this.destinationTableName, "staging"); // Fixed and non-configurable
+    this.destinationDbName = referenceTable == null ? config.getString(DESTINATION_DB_KEY)
+        : HiveDataset.resolveTemplate(config.getString(DESTINATION_DB_KEY), referenceTable);
+    this.destinationDataPath = referenceTable == null ? config.getString(DESTINATION_DATA_PATH_KEY)
+        : HiveDataset.resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), referenceTable);
+
+    // Optional
+    this.destinationTableProperties =
+        convertKeyValueListToProperties(ConfigUtils.getStringList(config, DESTINATION_TABLE_PROPERTIES_LIST_KEY));
+    this.clusterBy = ConfigUtils.getStringList(config, CLUSTER_BY_KEY);
+    this.numBuckets = Optional.fromNullable(ConfigUtils.getInt(config, NUM_BUCKETS_KEY, null));
+
+    this.hiveRuntimeProperties =
+        convertKeyValueListToProperties(ConfigUtils.getStringList(config, HIVE_RUNTIME_PROPERTIES_LIST_KEY));
+    this.evolutionEnabled = ConfigUtils.getBoolean(config, EVOLUTION_ENABLED, false);
+    this.rowLimit = Optional.fromNullable(ConfigUtils.getInt(config, ROW_LIMIT_KEY, null));
+    this.sourceDataPathIdentifier = ConfigUtils.getStringList(config, SOURCE_DATA_PATH_IDENTIFIER_KEY);
+  }
+
+  private Properties convertKeyValueListToProperties(List<String> keyValueList) {
+    Preconditions.checkArgument(keyValueList.size() % 2 == 0, String.format(
+        "The list %s does not have equal number of keys and values. Size %s", keyValueList, keyValueList.size()));
+    Properties props = new Properties();
+    for (int i = 0; i < keyValueList.size(); i += 2) {
+      String key = keyValueList.get(i);
+      String value = keyValueList.get(i + 1);
+      props.put(key, value);
+    }
+    return props;
+  }
+}

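The config-driven constructor can be exercised directly. A sketch with hypothetical destination names, passing a null reference table so the destination.* values are taken verbatim (with a reference table they go through HiveDataset.resolveTemplate):

    import com.typesafe.config.Config;
    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata;

    public class StageableTableMetadataSketch {
      public static void main(String[] args) {
        Config config = ConfigFactory.parseString(
            "destination.tableName = events_orc\n"
                + "destination.dbName = mydb_orc\n"
                + "destination.dataPath = \"/data/mydb_orc/events_orc\"\n");
        StageableTableMetadata meta = new StageableTableMetadata(config, null);
        System.out.println(meta.getDestinationTableName()); // events_orc
        // The staging name is fixed and non-configurable: "<table>_staging".
        System.out.println(meta.getDestinationStagingTableName()); // events_orc_staging
      }
    }
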
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
new file mode 100644
index 0000000..b4fe9d4
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/entities/TableLikeStageableTableMetadata.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.entities;
+
+import java.util.ArrayList;
+import java.util.Properties;
+
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.hadoop.hive.ql.metadata.Table;
+
+import com.google.common.base.Optional;
+import com.typesafe.config.Config;
+
+
+/**
+ * A {@link StageableTableMetadata} that copies most metadata from a reference table.
+ */
+public class TableLikeStageableTableMetadata extends StageableTableMetadata {
+
+  public TableLikeStageableTableMetadata(Table referenceTable, String destinationDB, String destinationTableName, String targetDataPath) {
+    super(destinationTableName, destinationTableName + "_STAGING", destinationDB, targetDataPath,
+        getTableProperties(referenceTable), new ArrayList<>(), Optional.of(referenceTable.getNumBuckets()), new Properties(), false, Optional.absent(),
+        new ArrayList<>());
+  }
+
+  public TableLikeStageableTableMetadata(Table referenceTable, Config config) {
+    super(HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_TABLE_KEY), referenceTable),
+        HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_TABLE_KEY), referenceTable) + "_STAGING",
+        HiveDataset.resolveTemplate(config.getString(StageableTableMetadata.DESTINATION_DB_KEY), referenceTable),
+        HiveDataset.resolveTemplate(config.getString(DESTINATION_DATA_PATH_KEY), referenceTable),
+        getTableProperties(referenceTable), new ArrayList<>(), Optional.of(referenceTable.getNumBuckets()),
+        new Properties(), false, Optional.absent(), new ArrayList<>());
+  }
+
+  private static Properties getTableProperties(Table table) {
+    Properties properties = new Properties();
+    properties.putAll(table.getParameters());
+    return properties;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
new file mode 100644
index 0000000..8ff0913
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/CopyTableQueryGenerator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} that generates queries to exactly
+ * copy an input table / partition.
+ */
+@Slf4j
+public class CopyTableQueryGenerator extends HiveMaterializerFromEntityQueryGenerator {
+
+  public CopyTableQueryGenerator(WorkUnitState workUnitState) throws IOException {
+    super(workUnitState, true);
+  }
+
+  /**
+   * Returns the Hive queries to be run as part of a Hive task.
+   * This does not include publish queries.
+   * @return the list of Hive queries to execute
+   */
+  @Override
+  public List<String> generateQueries() {
+
+    List<String> hiveQueries = Lists.newArrayList();
+    /*
+     * Setting the dynamic partition mode to 'nonstrict' keeps the generated queries simple.
+     * Without it, we would have to write partition values explicitly, and because Hive treats
+     * a partition as a virtual column, we would also have to list each column name in the query
+     * (in place of *) to match source and target columns.
+     */
+    hiveQueries.add("SET hive.exec.dynamic.partition.mode=nonstrict");
+
+    Preconditions.checkNotNull(this.workUnit, "Workunit must not be null");
+    EventWorkunitUtils.setBeginDDLBuildTimeMetadata(this.workUnit, System.currentTimeMillis());
+
+    HiveConverterUtils.createStagingDirectory(fs, outputTableMetadata.getDestinationDataPath(),
+        conversionEntity, this.workUnitState);
+
+    // Create DDL statement for table
+    String createStagingTableDDL =
+        HiveConverterUtils.generateCreateDuplicateTableDDL(
+            inputDbName,
+            inputTableName,
+            stagingTableName,
+            stagingDataLocation,
+            Optional.of(outputDatabaseName));
+    hiveQueries.add(createStagingTableDDL);
+    log.debug("Create staging table DDL:\n" + createStagingTableDDL);
+
+    String insertInStagingTableDML =
+        HiveConverterUtils
+            .generateTableCopy(
+                inputTableName,
+                stagingTableName,
+                conversionEntity.getTable().getDbName(),
+                outputDatabaseName,
+                Optional.of(partitionsDMLInfo));
+    hiveQueries.add(insertInStagingTableDML);
+    log.debug("Conversion staging DML: " + insertInStagingTableDML);
+
+    log.info("Conversion Queries {}\n",  hiveQueries);
+
+    EventWorkunitUtils.setEndDDLBuildTimeMetadata(workUnit, System.currentTimeMillis());
+    return hiveQueries;
+  }
+
+}

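The staging DDL above comes from the five-argument helper visible at the call site. A hedged sketch of a direct call; all names are hypothetical and the exact DDL text depends on HiveConverterUtils, though roughly a CREATE TABLE ... LIKE ... LOCATION statement is expected:

    import com.google.common.base.Optional;

    import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;

    public class CopyTableDDLSketch {
      public static void main(String[] args) {
        // Mirrors the call in CopyTableQueryGenerator.generateQueries(); names are hypothetical.
        String createStagingTableDDL = HiveConverterUtils.generateCreateDuplicateTableDDL(
            "mydb", // input database
            "events", // input table
            "events_copy_staging", // staging table to create
            "/tmp/staging/events_copy", // staging data location
            Optional.of("mydb_orc")); // output database
        System.out.println(createStagingTableDDL);
      }
    }
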
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java
new file mode 100644
index 0000000..d4b3ca4
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializer.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveTask;
+import org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.TaskUtils;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import com.google.common.base.Strings;
+
+import javax.annotation.Nullable;
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A simple {@link HiveTask} for Hive view materialization.
+ */
+@Slf4j
+public class HiveMaterializer extends HiveTask {
+
+  protected static final String STAGEABLE_TABLE_METADATA_KEY = "internal.hiveMaterializer.stageableTableMetadata";
+  protected static final String MATERIALIZER_MODE_KEY = "internal.hiveMaterializer.materializerMode";
+  protected static final String STORAGE_FORMAT_KEY = "internal.hiveMaterializer.storageFormat";
+  protected static final String QUERY_RESULT_TO_MATERIALIZE_KEY = "internal.hiveMaterializer.queryResultToMaterialize";
+
+  /**
+   * Create a work unit to copy a source table to a target table using a staging table in between.
+   * @param dataset {@link HiveDataset} for the source table.
+   * @param destinationTable {@link StageableTableMetadata} specifying staging and target tables metadata.
+   */
+  public static HiveWorkUnit tableCopyWorkUnit(HiveDataset dataset, StageableTableMetadata destinationTable,
+      @Nullable String partitionName) {
+    HiveWorkUnit workUnit = new HiveWorkUnit(dataset);
+    workUnit.setProp(MATERIALIZER_MODE_KEY, MaterializerMode.TABLE_COPY.name());
+    workUnit.setProp(STAGEABLE_TABLE_METADATA_KEY, HiveSource.GENERICS_AWARE_GSON.toJson(destinationTable));
+    if (!Strings.isNullOrEmpty(partitionName)) {
+      workUnit.setPartitionName(partitionName);
+    }
+    TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class);
+    return workUnit;
+  }
+
+  /**
+   * Create a work unit to materialize a table / view to a target table using a staging table in between.
+   * @param dataset {@link HiveDataset} for the source table.
+   * @param storageFormat format in which target table should be written.
+   * @param destinationTable {@link StageableTableMetadata} specifying staging and target tables metadata.
+   */
+  public static HiveWorkUnit viewMaterializationWorkUnit(HiveDataset dataset, HiveConverterUtils.StorageFormat storageFormat,
+      StageableTableMetadata destinationTable, @Nullable String partitionName) {
+    HiveWorkUnit workUnit = new HiveWorkUnit(dataset);
+    workUnit.setProp(MATERIALIZER_MODE_KEY, MaterializerMode.TABLE_MATERIALIZATION.name());
+    workUnit.setProp(STORAGE_FORMAT_KEY, storageFormat.name());
+    workUnit.setProp(STAGEABLE_TABLE_METADATA_KEY, HiveSource.GENERICS_AWARE_GSON.toJson(destinationTable));
+    if (!Strings.isNullOrEmpty(partitionName)) {
+      workUnit.setPartitionName(partitionName);
+    }
+    TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class);
+    return workUnit;
+  }
+
+  /**
+   * Create a work unit to materialize a query to a target table using a staging table in between.
+   * @param query the query to materialize.
+   * @param storageFormat format in which target table should be written.
+   * @param destinationTable {@link StageableTableMetadata} specifying staging and target tables metadata.
+   */
+  public static WorkUnit queryResultMaterializationWorkUnit(String query, HiveConverterUtils.StorageFormat storageFormat,
+      StageableTableMetadata destinationTable) {
+    WorkUnit workUnit = new WorkUnit();
+    workUnit.setProp(MATERIALIZER_MODE_KEY, MaterializerMode.QUERY_RESULT_MATERIALIZATION.name());
+    workUnit.setProp(STORAGE_FORMAT_KEY, storageFormat.name());
+    workUnit.setProp(QUERY_RESULT_TO_MATERIALIZE_KEY, query);
+    workUnit.setProp(STAGEABLE_TABLE_METADATA_KEY, HiveSource.GENERICS_AWARE_GSON.toJson(destinationTable));
+    TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class);
+    HiveTask.disableHiveWatermarker(workUnit);
+    return workUnit;
+  }
+
+  public static StageableTableMetadata parseStageableTableMetadata(WorkUnit workUnit) {
+    return HiveSource.GENERICS_AWARE_GSON.fromJson(workUnit.getProp(STAGEABLE_TABLE_METADATA_KEY), StageableTableMetadata.class);
+  }
+
+  private enum MaterializerMode {
+
+    /** Materialize a table or view into a new table possibly with a new storage format. */
+    TABLE_MATERIALIZATION {
+      @Override
+      public QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException {
+        return new MaterializeTableQueryGenerator(state);
+      }
+    },
+    /** Copy a table into a new table with the same properties. */
+    TABLE_COPY {
+      @Override
+      public QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException {
+        return new CopyTableQueryGenerator(state);
+      }
+    },
+    /** Materialize a query into a table. */
+    QUERY_RESULT_MATERIALIZATION {
+      @Override
+      public QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException {
+        return new QueryBasedMaterializerQueryGenerator(state);
+      }
+    };
+
+    public abstract QueryGenerator createQueryGenerator(WorkUnitState state) throws IOException;
+  }
+
+  private final QueryGenerator queryGenerator;
+
+  public HiveMaterializer(TaskContext taskContext) throws IOException {
+    super(taskContext);
+    MaterializerMode materializerMode = MaterializerMode.valueOf(this.workUnitState.getProp(MATERIALIZER_MODE_KEY));
+    this.queryGenerator = materializerMode.createQueryGenerator(this.workUnitState);
+  }
+
+  @Override
+  public List<String> generateHiveQueries() {
+    return queryGenerator.generateQueries();
+  }
+
+  @Override
+  public QueryBasedHivePublishEntity generatePublishQueries() throws Exception {
+    return queryGenerator.generatePublishQueries();
+  }
+}

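Of the three MaterializerMode entry points, query-result materialization is the easiest to sketch because it needs no HiveDataset. A minimal example with hypothetical names, assuming the StorageFormat enum has an ORC constant (its values are not shown in this diff):

    import com.typesafe.config.ConfigFactory;

    import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata;
    import org.apache.gobblin.data.management.conversion.hive.materializer.HiveMaterializer;
    import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
    import org.apache.gobblin.source.workunit.WorkUnit;

    public class QueryMaterializationSketch {
      public static void main(String[] args) {
        StageableTableMetadata destination = new StageableTableMetadata(ConfigFactory.parseString(
            "destination.tableName = recent_events\n"
                + "destination.dbName = mydb_orc\n"
                + "destination.dataPath = \"/data/mydb_orc/recent_events\"\n"), null);
        // Builds a plain WorkUnit carrying the query, storage format, and serialized
        // destination metadata; HiveMaterializerTaskFactory is set as the task factory.
        WorkUnit workUnit = HiveMaterializer.queryResultMaterializationWorkUnit(
            "SELECT * FROM mydb.events WHERE datepartition >= '2016-01-01-00'",
            HiveConverterUtils.StorageFormat.ORC, // assumed constant
            destination);
        System.out.println(workUnit.getProp("internal.hiveMaterializer.materializerMode")); // QUERY_RESULT_MATERIALIZATION
      }
    }
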
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
new file mode 100644
index 0000000..872a3f4
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerFromEntityQueryGenerator.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
+import org.apache.gobblin.data.management.conversion.hive.entities.HiveProcessingEntity;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.copy.hive.HiveDataset;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.util.AutoReturnableObject;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.thrift.TException;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Maps;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * An abstract {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} containing common methods
+ * for materializing existing tables / partitions / views.
+ */
+@Slf4j
+public abstract class HiveMaterializerFromEntityQueryGenerator extends HiveMaterializerQueryGenerator {
+
+  protected final String inputDbName;
+  protected final String inputTableName;
+
+  protected final List<String> sourceDataPathIdentifier;
+  protected final String stagingDataPartitionDirName;
+  protected final String stagingDataPartitionLocation;
+  protected final Map<String, String> partitionsDDLInfo;
+  protected final Map<String, String> partitionsDMLInfo;
+  protected final HiveProcessingEntity conversionEntity;
+  protected final Table sourceTable;
+  protected final boolean supportTargetPartitioning;
+
+  public HiveMaterializerFromEntityQueryGenerator(WorkUnitState workUnitState, boolean supportTargetPartitioning)
+      throws IOException {
+    super(workUnitState);
+
+    try {
+      this.conversionEntity = getConversionEntity(this.workUnit);
+    } catch (TException | HiveException ex) {
+      throw new IOException(ex);
+    }
+    this.sourceTable = this.conversionEntity.getTable();
+    this.inputDbName = this.sourceTable.getDbName();
+    this.inputTableName = this.sourceTable.getTableName();
+
+    this.sourceDataPathIdentifier = this.outputTableMetadata.getSourceDataPathIdentifier();
+    this.stagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier);
+    this.stagingDataPartitionLocation = stagingDataLocation + Path.SEPARATOR + stagingDataPartitionDirName;
+    this.partitionsDDLInfo = Maps.newHashMap();
+    this.partitionsDMLInfo = Maps.newHashMap();
+    HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo);
+    this.supportTargetPartitioning = supportTargetPartitioning;
+  }
+
+  private HiveProcessingEntity getConversionEntity(HiveWorkUnit hiveWorkUnit) throws IOException, TException,
+                                                                                     HiveException {
+
+    try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) {
+
+      HiveDataset dataset = hiveWorkUnit.getHiveDataset();
+      HiveDatasetFinder.DbAndTable dbAndTable = dataset.getDbAndTable();
+
+      Table table = new Table(client.get().getTable(dbAndTable.getDb(), dbAndTable.getTable()));
+
+      Partition partition = null;
+      if (hiveWorkUnit.getPartitionName().isPresent()) {
+        partition = new Partition(table, client.get()
+            .getPartition(dbAndTable.getDb(), dbAndTable.getTable(), hiveWorkUnit.getPartitionName().get()));
+      }
+      return new HiveProcessingEntity(dataset, table, Optional.fromNullable(partition));
+    }
+  }
+
+  /**
+   * Returns a QueryBasedHivePublishEntity which includes publish-level queries and cleanup commands.
+   * @return QueryBasedHivePublishEntity
+   * @throws DataConversionException
+   */
+  public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException {
+
+    QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
+    List<String> publishQueries = publishEntity.getPublishQueries();
+    Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
+    List<String> cleanupQueries = publishEntity.getCleanupQueries();
+    List<String> cleanupDirectories = publishEntity.getCleanupDirectories();
+
+    String createFinalTableDDL =
+        HiveConverterUtils.generateCreateDuplicateTableDDL(outputDatabaseName, stagingTableName, outputTableName,
+            outputDataLocation, Optional.of(outputDatabaseName));
+    publishQueries.add(createFinalTableDDL);
+    log.debug("Create final table DDL:\n" + createFinalTableDDL);
+
+    if (!this.supportTargetPartitioning || partitionsDDLInfo.size() == 0) {
+      log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation);
+      publishDirectories.put(stagingDataLocation, outputDataLocation);
+
+      String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);
+
+      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
+      cleanupQueries.add(dropStagingTableDDL);
+
+      log.debug("Staging table directory to delete: " + stagingDataLocation);
+      cleanupDirectories.add(stagingDataLocation);
+    } else {
+      String finalDataPartitionLocation = outputDataLocation + Path.SEPARATOR + stagingDataPartitionDirName;
+      Optional<Path> destPartitionLocation =
+          HiveConverterUtils.getDestinationPartitionLocation(destinationTableMeta, this.workUnitState,
+              conversionEntity.getPartition().get().getName());
+      finalDataPartitionLocation = HiveConverterUtils.updatePartitionLocation(finalDataPartitionLocation, this.workUnitState,
+          destPartitionLocation);
+
+      log.debug("Partition directory to move: " + stagingDataPartitionLocation + " to: " + finalDataPartitionLocation);
+      publishDirectories.put(stagingDataPartitionLocation, finalDataPartitionLocation);
+      List<String> dropPartitionsDDL =
+          HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, partitionsDMLInfo);
+      log.debug("Drop partitions if they exist in the final table: " + dropPartitionsDDL);
+      publishQueries.addAll(dropPartitionsDDL);
+      List<String> createFinalPartitionDDL =
+          HiveAvroORCQueryGenerator.generateCreatePartitionDDL(outputDatabaseName, outputTableName,
+              finalDataPartitionLocation, partitionsDMLInfo, Optional.<String>absent());
+
+      log.debug("Create final partition DDL: " + createFinalPartitionDDL);
+      publishQueries.addAll(createFinalPartitionDDL);
+
+      String dropStagingTableDDL =
+          HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);
+
+      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
+      cleanupQueries.add(dropStagingTableDDL);
+
+      log.debug("Staging table directory to delete: " + stagingDataLocation);
+      cleanupDirectories.add(stagingDataLocation);
+
+      publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName,
+          AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(conversionEntity)));
+    }
+
+    log.info("Publish partition entity: " + publishEntity);
+    return publishEntity;
+  }
+}
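
To make the partitioned publish path concrete, the queries assembled above take roughly this shape; a sketch with invented database, table, and partition names (the real DDL comes from HiveConverterUtils and HiveAvroORCQueryGenerator as invoked above):

    List<String> publishQueries = Lists.newArrayList(
        "CREATE EXTERNAL TABLE IF NOT EXISTS db.out LIKE db.out_staging LOCATION '/data/out'",
        "ALTER TABLE db.out DROP IF EXISTS PARTITION (datepartition='2017-09-21')",
        "ALTER TABLE db.out ADD IF NOT EXISTS PARTITION (datepartition='2017-09-21')"
            + " LOCATION '/data/out/datepartition=2017-09-21'");
    List<String> cleanupQueries = Lists.newArrayList(
        "DROP TABLE IF EXISTS db.out_staging");
    // plus one directory move registered in publishDirectories:
    //   /data/out_staging/datepartition=2017-09-21 -> /data/out/datepartition=2017-09-21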

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
new file mode 100644
index 0000000..803e043
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerQueryGenerator.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.data.management.conversion.hive.entities.StageableTableMetadata;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator;
+import org.apache.hadoop.fs.FileSystem;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * A base abstract query generator for {@link HiveMaterializer}.
+ */
+@Slf4j
+public abstract class HiveMaterializerQueryGenerator implements QueryGenerator {
+  protected final FileSystem fs;
+  protected final StageableTableMetadata outputTableMetadata;
+
+  protected final String outputDatabaseName;
+  protected final String outputTableName;
+  protected final String outputDataLocation;
+
+  protected final String stagingTableName;
+  protected final String stagingDataLocation;
+
+  protected final Optional<org.apache.hadoop.hive.metastore.api.Table> destinationTableMeta;
+  protected final HiveWorkUnit workUnit;
+  protected final HiveMetastoreClientPool pool;
+  protected final WorkUnitState workUnitState;
+
+  public HiveMaterializerQueryGenerator(WorkUnitState workUnitState) throws IOException {
+    this.fs = HiveSource.getSourceFs(workUnitState);
+    this.pool = HiveMetastoreClientPool.get(workUnitState.getJobState().getProperties(),
+        Optional.fromNullable(workUnitState.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
+
+    this.workUnitState = workUnitState;
+    this.workUnit = new HiveWorkUnit(workUnitState.getWorkunit());
+
+    this.outputTableMetadata = HiveMaterializer.parseStageableTableMetadata(this.workUnit);
+    this.outputDatabaseName = outputTableMetadata.getDestinationDbName();
+    this.outputTableName = outputTableMetadata.getDestinationTableName();
+    this.outputDataLocation = HiveConverterUtils.getOutputDataLocation(outputTableMetadata.getDestinationDataPath());
+
+    this.destinationTableMeta = HiveConverterUtils.getDestinationTableMeta(this.outputTableMetadata.getDestinationDbName(),
+        this.outputTableMetadata.getDestinationTableName(), workUnitState.getProperties()).getLeft();
+
+    this.stagingTableName = HiveConverterUtils.getStagingTableName(this.outputTableMetadata.getDestinationStagingTableName());
+    this.stagingDataLocation = HiveConverterUtils.getStagingDataLocation(this.outputTableMetadata.getDestinationDataPath(), this.stagingTableName);
+  }
+
+  /**
+   * Returns the Hive queries to be run as part of a Hive task.
+   * This does not include publish queries.
+   * @return the list of Hive queries to execute
+   */
+  @Override
+  public abstract List<String> generateQueries();
+
+  /**
+   * Returns a QueryBasedHivePublishEntity which includes publish-level queries and cleanup commands.
+   * @return QueryBasedHivePublishEntity
+   * @throws DataConversionException
+   */
+  public abstract QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException;
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java
new file mode 100644
index 0000000..9de376b
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/HiveMaterializerTaskFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.TaskFactory;
+import org.apache.gobblin.runtime.task.TaskIFace;
+
+/**
+ * A {@link TaskFactory} that runs a {@link HiveMaterializer} task.
+ * This factory is intended for tasks that publish their data directly, and
+ * therefore uses a {@link NoopPublisher}.
+ */
+public class HiveMaterializerTaskFactory implements TaskFactory {
+  @Override
+  public TaskIFace createTask(TaskContext taskContext) {
+    try {
+      return new HiveMaterializer(taskContext);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public DataPublisher createDataPublisher(JobState.DatasetState datasetState) {
+    return new NoopPublisher(datasetState);
+  }
+}
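
A source routes its work units through this factory by tagging them with the factory class; a sketch, assuming gobblin-runtime's TaskUtils helper (the HiveMaterializerSource added under gobblin-example presumably does something along these lines):

    // Sketch: mark a work unit to be executed by HiveMaterializerTaskFactory.
    WorkUnit workUnit = new WorkUnit();
    TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class);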

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
new file mode 100644
index 0000000..fa91d15
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/MaterializeTableQueryGenerator.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+
+import com.google.common.collect.Lists;
+
+
+/**
+ * A {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} to materialize a copy of an existing
+ * Hive table / partition.
+ */
+public class MaterializeTableQueryGenerator extends HiveMaterializerFromEntityQueryGenerator {
+
+  private final HiveConverterUtils.StorageFormat storageFormat;
+
+  public MaterializeTableQueryGenerator(WorkUnitState workUnitState) throws IOException {
+    super(workUnitState, false);
+
+    this.storageFormat = HiveConverterUtils.StorageFormat.valueOf(workUnitState.getProp(HiveMaterializer.STORAGE_FORMAT_KEY));
+  }
+
+  @Override
+  public List<String> generateQueries() {
+    return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatementFromSelectStar(
+        new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName),
+        new HiveDatasetFinder.DbAndTable(this.inputDbName, this.inputTableName),
+        this.partitionsDMLInfo, this.storageFormat,
+        this.stagingDataLocation));
+  }
+}
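
The resulting staging CTAS looks roughly like this; a sketch with invented names, assuming an ORC target format and a single partition filter:

    String stagingCtas =
        "CREATE TABLE db.out_staging STORED AS ORC LOCATION '/data/out_staging' "
            + "AS SELECT * FROM inputDb.inputTable WHERE datepartition='2017-09-21'";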

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
new file mode 100644
index 0000000..37a50b3
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/materializer/QueryBasedMaterializerQueryGenerator.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.materializer;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+
+import com.google.common.base.Optional;
+import com.google.common.collect.Lists;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link org.apache.gobblin.data.management.conversion.hive.task.QueryGenerator} to materialize the result of a Hive
+ * query.
+ */
+@Slf4j
+public class QueryBasedMaterializerQueryGenerator extends HiveMaterializerQueryGenerator {
+
+  private final String sourceQuery;
+  private final HiveConverterUtils.StorageFormat storageFormat;
+
+  public QueryBasedMaterializerQueryGenerator(WorkUnitState workUnitState) throws IOException {
+    super(workUnitState);
+
+    this.sourceQuery = workUnitState.getProp(HiveMaterializer.QUERY_RESULT_TO_MATERIALIZE_KEY);
+    this.storageFormat = HiveConverterUtils.StorageFormat.valueOf(workUnitState.getProp(HiveMaterializer.STORAGE_FORMAT_KEY));
+  }
+
+  @Override
+  public List<String> generateQueries() {
+    return Lists.newArrayList(HiveConverterUtils.generateStagingCTASStatement(
+        new HiveDatasetFinder.DbAndTable(this.outputDatabaseName, this.stagingTableName),
+        this.sourceQuery,
+        this.storageFormat,
+        this.stagingDataLocation));
+  }
+
+  @Override
+  public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException {
+    QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
+    List<String> publishQueries = publishEntity.getPublishQueries();
+    Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
+    List<String> cleanupQueries = publishEntity.getCleanupQueries();
+    List<String> cleanupDirectories = publishEntity.getCleanupDirectories();
+
+    String createFinalTableDDL =
+        HiveConverterUtils.generateCreateDuplicateTableDDL(outputDatabaseName, stagingTableName, outputTableName,
+            outputDataLocation, Optional.of(outputDatabaseName));
+    publishQueries.add(createFinalTableDDL);
+    log.debug("Create final table DDL:\n" + createFinalTableDDL);
+
+    log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation);
+    publishDirectories.put(stagingDataLocation, outputDataLocation);
+
+    String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);
+
+    log.debug("Drop staging table DDL: " + dropStagingTableDDL);
+    cleanupQueries.add(dropStagingTableDDL);
+
+    log.debug("Staging table directory to delete: " + stagingDataLocation);
+    cleanupDirectories.add(stagingDataLocation);
+
+    publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName,
+        new HashMap<>()));
+
+    log.info("Publish partition entity: " + publishEntity);
+    return publishEntity;
+  }
+}
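
Here the staging CTAS wraps an arbitrary source query rather than a SELECT * over an input table; a sketch with invented names:

    String stagingCtas =
        "CREATE TABLE db.out_staging STORED AS TEXTFILE LOCATION '/data/out_staging' "
            + "AS SELECT browser, COUNT(*) AS hits FROM inputDb.events GROUP BY browser";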

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 74a8b3b..b71d6b6 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.apache.log4j.Level;
 import org.apache.log4j.Logger;
 import org.joda.time.DateTime;
@@ -243,15 +244,15 @@ public class HiveSource implements Source {
             "Creating workunit for table %s as updateTime %s or createTime %s is greater than low watermark %s",
             hiveDataset.getTable().getCompleteName(), updateTime, hiveDataset.getTable().getTTable().getCreateTime(),
             lowWatermark.getValue()));
+        HiveWorkUnit hiveWorkUnit = workUnitForTable(hiveDataset);
+
         LongWatermark expectedDatasetHighWatermark =
             this.watermarker.getExpectedHighWatermark(hiveDataset.getTable(), tableProcessTime);
-        HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
-        hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
-        hiveWorkUnit.setTableLocation(hiveDataset.getTable().getSd().getLocation());
         hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedDatasetHighWatermark));
 
         EventWorkunitUtils.setTableSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), updateTime, lowWatermark.getValue(),
             this.beginGetWorkunitsTime);
+
         this.workunits.add(hiveWorkUnit);
         log.debug(String.format("Workunit added for table: %s", hiveWorkUnit));
 
@@ -264,11 +265,19 @@ public class HiveSource implements Source {
       }
     } catch (UpdateNotFoundException e) {
       log.error(String.format("Not Creating workunit for %s as update time was not found. %s", hiveDataset.getTable()
-          .getCompleteName(), e.getMessage()));
+          .getCompleteName(), e.getMessage()), e);
     } catch (SchemaNotFoundException e) {
       log.error(String.format("Not Creating workunit for %s as schema was not found. %s", hiveDataset.getTable()
-          .getCompleteName(), e.getMessage()));
+          .getCompleteName(), e.getMessage()), e);
+    }
+  }
+
+  protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException {
+    HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
+    if (isAvro(hiveDataset.getTable())) {
+      hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
     }
+    return hiveWorkUnit;
   }
 
   private void createWorkunitsForPartitionedTable(HiveDataset hiveDataset, AutoReturnableObject<IMetaStoreClient> client) throws IOException {
@@ -315,17 +324,12 @@ public class HiveSource implements Source {
           LongWatermark expectedPartitionHighWatermark = this.watermarker.getExpectedHighWatermark(sourcePartition,
               tableProcessTime, partitionProcessTime);
 
-          HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset);
-          hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
-          hiveWorkUnit.setTableLocation(hiveDataset.getTable().getSd().getLocation());
-          hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(sourcePartition));
-          hiveWorkUnit.setPartitionName(sourcePartition.getName());
-          hiveWorkUnit.setPartitionLocation(sourcePartition.getLocation());
-          hiveWorkUnit.setPartitionKeys(sourcePartition.getTable().getPartitionKeys());
+          HiveWorkUnit hiveWorkUnit = workUnitForPartition(hiveDataset, sourcePartition);
           hiveWorkUnit.setWatermarkInterval(new WatermarkInterval(lowWatermark, expectedPartitionHighWatermark));
 
           EventWorkunitUtils.setPartitionSlaEventMetadata(hiveWorkUnit, hiveDataset.getTable(), sourcePartition, updateTime,
               lowWatermark.getValue(), this.beginGetWorkunitsTime);
+
           workunits.add(hiveWorkUnit);
           log.info(String.format("Creating workunit for partition %s as updateTime %s is greater than low watermark %s",
               sourcePartition.getCompleteName(), updateTime, lowWatermark.getValue()));
@@ -348,6 +352,15 @@ public class HiveSource implements Source {
     }
   }
 
+  protected HiveWorkUnit workUnitForPartition(HiveDataset hiveDataset, Partition partition) throws IOException {
+    HiveWorkUnit hiveWorkUnit = new HiveWorkUnit(hiveDataset, partition);
+    if (isAvro(hiveDataset.getTable())) {
+      hiveWorkUnit.setTableSchemaUrl(this.avroSchemaManager.getSchemaUrl(hiveDataset.getTable()));
+      hiveWorkUnit.setPartitionSchemaUrl(this.avroSchemaManager.getSchemaUrl(partition));
+    }
+    return hiveWorkUnit;
+  }
+
   /***
    * Check if path of Hive entity (table / partition) contains location token that should be ignored. If so, ignore
    * the partition.
@@ -447,4 +460,8 @@ public class HiveSource implements Source {
       }
     }
   }
+
+  private boolean isAvro(Table table) {
+    return AvroSerDe.class.getName().equals(table.getSd().getSerdeInfo().getSerializationLib());
+  }
 }
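
With workUnitForTable and workUnitForPartition now protected, a subclass can decorate work units without duplicating the watermark logic; a minimal sketch (the subclass and property key are invented):

    public class CustomHiveSource extends HiveSource {
      @Override
      protected HiveWorkUnit workUnitForTable(HiveDataset hiveDataset) throws IOException {
        HiveWorkUnit workUnit = super.workUnitForTable(hiveDataset);
        workUnit.setProp("custom.example.key", "true"); // invented property
        return workUnit;
      }
    }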

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/5fa98326/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java
index e5d1a2e..326fdd8 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveWorkUnit.java
@@ -20,6 +20,7 @@ import java.lang.reflect.Type;
 import java.util.List;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.TableType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 
 import com.google.common.base.Optional;
@@ -29,6 +30,7 @@ import com.google.gson.Gson;
 import org.apache.gobblin.configuration.ConfigurationKeys;
 import org.apache.gobblin.data.management.copy.hive.HiveDataset;
 import org.apache.gobblin.source.workunit.WorkUnit;
+import org.apache.hadoop.hive.ql.metadata.Partition;
 
 
 /**
@@ -68,6 +70,16 @@ public class HiveWorkUnit extends WorkUnit {
   public HiveWorkUnit(HiveDataset hiveDataset) {
     super();
     setHiveDataset(hiveDataset);
+    if (hiveDataset.getTable().getTableType() != TableType.VIRTUAL_VIEW) {
+      setTableLocation(hiveDataset.getTable().getSd().getLocation());
+    }
+  }
+
+  public HiveWorkUnit(HiveDataset hiveDataset, Partition partition) {
+    this(hiveDataset);
+    setPartitionName(partition.getName());
+    setPartitionLocation(partition.getLocation());
+    setPartitionKeys(partition.getTable().getPartitionKeys());
   }
 
   /**

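The new two-argument constructor bundles the partition metadata that callers previously set field by field; a sketch:

    // Replaces the old pattern of new HiveWorkUnit(dataset) followed by
    // setPartitionName / setPartitionLocation / setPartitionKeys calls.
    HiveWorkUnit workUnit = new HiveWorkUnit(hiveDataset, sourcePartition);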
