gobblin-commits mailing list archives

From ibuen...@apache.org
Subject incubator-gobblin git commit: [GOBBLIN-181] Create HiveTask using customized Gobblin Task
Date Tue, 05 Sep 2017 18:23:28 GMT
Repository: incubator-gobblin
Updated Branches:
  refs/heads/master a5fe06210 -> 8a374f207


[GOBBLIN-181] Create HiveTask using customized Gobblin Task

Closes #2062 from arjun4084346/materializer


Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/8a374f20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/8a374f20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/8a374f20

Branch: refs/heads/master
Commit: 8a374f207bc7838bcfb144d644b6243c78dc122d
Parents: a5fe062
Author: Arjun <abora@linkedin.com>
Authored: Tue Sep 5 11:23:29 2017 -0700
Committer: Issac Buenrostro <ibuenros@apache.org>
Committed: Tue Sep 5 11:23:29 2017 -0700

----------------------------------------------------------------------
 .../converter/AbstractAvroToOrcConverter.java   | 125 +-----
 .../conversion/hive/source/HiveSource.java      |   2 +-
 .../hive/task/HiveConverterUtils.java           | 387 +++++++++++++++++++
 .../conversion/hive/task/HiveMaterializer.java  |  54 +++
 .../task/HiveMaterializerQueryGenerator.java    | 244 ++++++++++++
 .../hive/task/HiveMaterializerSource.java       |  60 +++
 .../hive/task/HiveMaterializerTaskFactory.java  |  46 +++
 .../conversion/hive/task/HiveTask.java          | 178 +++++++++
 .../conversion/hive/task/QueryGenerator.java    |  42 ++
 .../hive/task/HiveConverterUtilsTest.java       |  49 +++
 .../hive/validation/ValidationJob.java          |  33 +-
 .../org/apache/gobblin/util/HadoopUtils.java    |  30 +-
 .../apache/gobblin/util/HadoopUtilsTest.java    |  21 +
 13 files changed, 1117 insertions(+), 154 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
index 9c4a5ec..b8495a9 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/converter/AbstractAvroToOrcConverter.java
@@ -32,6 +32,7 @@ import org.apache.avro.Schema;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -137,13 +138,6 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
   public static final String HIVE_DATASET_DESTINATION_SKIP_SETGROUP = "hive.dataset.destination.skip.setGroup";
   public static final boolean DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP = false;
 
-  /**
-   * If the property is set to true then partition dir is overwritten,
-   * else a new time-stamped partition dir is created to avoid breaking in-flight queries
-   * Check org.apache.gobblin.data.management.retention.Avro2OrcStaleDatasetCleaner to clean stale directories
-   */
-  public static final String HIVE_DATASET_PARTITION_OVERWRITE = "hive.dataset.partition.overwrite";
-  public static final boolean DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE = true;
 
   /**
    * If set to true, a set format DDL will be separate from add partition DDL
@@ -224,8 +218,8 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     String orcDataLocation = getOrcDataLocation();
     String orcStagingDataLocation = getOrcStagingDataLocation(orcStagingTableName);
     boolean isEvolutionEnabled = getConversionConfig().isEvolutionEnabled();
-    Pair<Optional<Table>, Optional<List<Partition>>> destinationMeta = getDestinationTableMeta(orcTableDatabase,
-        orcTableName, workUnit);
+    Pair<Optional<Table>, Optional<List<Partition>>> destinationMeta = HiveConverterUtils.getDestinationTableMeta(orcTableDatabase,
+        orcTableName, workUnit.getProperties());
     Optional<Table> destinationTableMeta = destinationMeta.getLeft();
 
     // Optional
@@ -263,7 +257,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     // Populate optional partition info
     Map<String, String> partitionsDDLInfo = Maps.newHashMap();
     Map<String, String> partitionsDMLInfo = Maps.newHashMap();
-    populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo);
+    HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo);
 
     /*
      * Create ORC data location with the same permissions as Avro data
@@ -334,7 +328,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     log.debug("Create staging table DDL: " + createStagingTableDDL);
 
     // Create DDL statement for partition
-    String orcStagingDataPartitionDirName = getOrcStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier);
+    String orcStagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier);
     String orcStagingDataPartitionLocation = orcStagingDataLocation + Path.SEPARATOR + orcStagingDataPartitionDirName;
     if (partitionsDMLInfo.size() > 0) {
       List<String> createStagingPartitionDDL =
@@ -476,7 +470,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
       Optional<Path> destPartitionLocation = getDestinationPartitionLocation(destinationTableMeta, workUnit,
           conversionEntity.getHivePartition().get().getName());
       orcFinalDataPartitionLocation =
-          updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation);
+          HiveConverterUtils.updatePartitionLocation(orcFinalDataPartitionLocation, workUnit, destPartitionLocation);
       log.info(
           "Partition directory to move: " + orcStagingDataPartitionLocation + " to: " + orcFinalDataPartitionLocation);
       publishDirectories.put(orcStagingDataPartitionLocation, orcFinalDataPartitionLocation);
@@ -607,32 +601,7 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier;
   }
 
-  /***
-   * Get the ORC partition directory name of the format: [hourly_][daily_]<partitionSpec1>[partitionSpec ..]
-   * @param conversionEntity Conversion entity.
-   * @param sourceDataPathIdentifier Hints to look in source partition location to prefix the partition dir name
-   *                               such as hourly or daily.
-   * @return Partition directory name.
-   */
-  private String getOrcStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity,
-      List<String> sourceDataPathIdentifier) {
-
-    if (conversionEntity.getHivePartition().isPresent()) {
-      StringBuilder dirNamePrefix = new StringBuilder();
-      String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString();
-      if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) {
-        for (String hint : sourceDataPathIdentifier) {
-          if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) {
-            dirNamePrefix.append(hint.toLowerCase()).append("_");
-          }
-        }
-      }
 
-      return dirNamePrefix + conversionEntity.getHivePartition().get().getName();
-    } else {
-      return StringUtils.EMPTY;
-    }
-  }
 
   /***
    * Get the ORC final table location of format: <ORC final table location>/final
@@ -699,88 +668,6 @@ public abstract class AbstractAvroToOrcConverter extends Converter<Schema, Schem
     return replacedPartitionsDDLInfo;
   }
 
-  private void populatePartitionInfo(QueryBasedHiveConversionEntity conversionEntity, Map<String, String> partitionsDDLInfo,
-      Map<String, String> partitionsDMLInfo) {
-    String partitionsInfoString = null;
-    String partitionsTypeString = null;
-
-    if (conversionEntity.getHivePartition().isPresent()) {
-      partitionsInfoString = conversionEntity.getHivePartition().get().getName();
-      partitionsTypeString = conversionEntity.getHivePartition().get().getSchema().getProperty("partition_columns.types");
-    }
-
-    if (StringUtils.isNotBlank(partitionsInfoString) || StringUtils.isNotBlank(partitionsTypeString)) {
-      if (StringUtils.isBlank(partitionsInfoString) || StringUtils.isBlank(partitionsTypeString)) {
-        throw new IllegalArgumentException("Both partitions info and partitions must be present, if one is specified");
-      }
-      List<String> pInfo = Splitter.on(HIVE_PARTITIONS_INFO).omitEmptyStrings().trimResults().splitToList(partitionsInfoString);
-      List<String> pType = Splitter.on(HIVE_PARTITIONS_TYPE).omitEmptyStrings().trimResults().splitToList(partitionsTypeString);
-      log.debug("PartitionsInfoString: " + partitionsInfoString);
-      log.debug("PartitionsTypeString: " + partitionsTypeString);
-
-      if (pInfo.size() != pType.size()) {
-        throw new IllegalArgumentException("partitions info and partitions type list should of same size");
-      }
-      for (int i = 0; i < pInfo.size(); i++) {
-        List<String> partitionInfoParts = Splitter.on("=").omitEmptyStrings().trimResults().splitToList(pInfo.get(i));
-        String partitionType = pType.get(i);
-        if (partitionInfoParts.size() != 2) {
-          throw new IllegalArgumentException(
-              String.format("Partition details should be of the format partitionName=partitionValue. Recieved: %s", pInfo.get(i)));
-        }
-        partitionsDDLInfo.put(partitionInfoParts.get(0), partitionType);
-        partitionsDMLInfo.put(partitionInfoParts.get(0), partitionInfoParts.get(1));
-      }
-    }
-  }
-
-  private Pair<Optional<Table>, Optional<List<Partition>>> getDestinationTableMeta(String dbName,
-      String tableName, WorkUnitState state)
-      throws DataConversionException {
-
-    Optional<Table> table = Optional.<Table>absent();
-    Optional<List<Partition>> partitions = Optional.<List<Partition>>absent();
-
-    try {
-      HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(state.getJobState().getProperties(),
-          Optional.fromNullable(state.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
-      try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
-        table = Optional.of(client.get().getTable(dbName, tableName));
-        if (table.isPresent()) {
-          org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get());
-          if (HiveUtils.isPartitioned(qlTable)) {
-            partitions = Optional.of(HiveUtils.getPartitions(client.get(), qlTable, Optional.<String>absent()));
-          }
-        }
-      }
-    } catch (NoSuchObjectException e) {
-      return ImmutablePair.of(table, partitions);
-    } catch (IOException | TException e) {
-      throw new DataConversionException("Could not fetch destination table metadata", e);
-    }
-
-    return ImmutablePair.of(table, partitions);
-  }
-
-  /**
-   * If partition already exists then new partition location will be a separate time stamp dir
-   * If partition location is /a/b/c/<oldTimeStamp> then new partition location is /a/b/c/<currentTimeStamp>
-   * If partition location is /a/b/c/ then new partition location is /a/b/c/<currentTimeStamp>
-   **/
-  private String updatePartitionLocation(String orcDataPartitionLocation, WorkUnitState workUnitState,
-      Optional<Path> destPartitionLocation)
-      throws DataConversionException {
-
-    if (workUnitState.getPropAsBoolean(HIVE_DATASET_PARTITION_OVERWRITE, DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE)) {
-      return orcDataPartitionLocation;
-    }
-    if (!destPartitionLocation.isPresent()) {
-      return orcDataPartitionLocation;
-    }
-    long timeStamp = System.currentTimeMillis();
-    return StringUtils.join(Arrays.asList(orcDataPartitionLocation, timeStamp), '/');
-  }
-
   private Optional<Path> getDestinationPartitionLocation(Optional<Table> table, WorkUnitState state,
       String partitionName)
       throws DataConversionException {

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
index 21d8ab2..74a8b3b 100644
--- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/source/HiveSource.java
@@ -427,7 +427,7 @@ public class HiveSource implements Source {
   public void shutdown(SourceState state) {
   }
 
-  private static FileSystem getSourceFs(State state) throws IOException {
+  public static FileSystem getSourceFs(State state) throws IOException {
     if (state.contains(HIVE_SOURCE_FS_URI)) {
       return FileSystem.get(URI.create(state.getProp(HIVE_SOURCE_FS_URI)), HadoopUtils.getConfFromState(state));
     }

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
new file mode 100644
index 0000000..119ef1c
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtils.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+import static java.util.stream.Collectors.joining;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.lang3.tuple.Pair;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hadoop.hive.ql.metadata.Partition;
+import org.apache.thrift.TException;
+
+import com.google.common.base.Joiner;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Throwables;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.data.management.copy.hive.HiveUtils;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.util.AutoReturnableObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+
+/**
+ * A utility class for converting hive data from one dataset to another.
+ */
+public class HiveConverterUtils {
+
+  /***
+   * Subdirectory within destination table directory to publish data
+   */
+  private static final String PUBLISHED_TABLE_SUBDIRECTORY = "final";
+
+  /***
+   * Separators used by Hive
+   */
+  private static final String HIVE_PARTITIONS_INFO = "/";
+  private static final String HIVE_PARTITIONS_TYPE = ":";
+
+  /**
+   * If the property is set to true then partition dir is overwritten,
+   * else a new time-stamped partition dir is created to avoid breaking in-flight queries
+   * Check org.apache.gobblin.data.management.retention.Avro2OrcStaleDatasetCleaner to clean stale directories
+   */
+  public static final String HIVE_DATASET_PARTITION_OVERWRITE = "hive.dataset.partition.overwrite";
+  public static final boolean DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE = true;
+
+  /**
+   * If the property is set to true then in the destination dir permissions, group won't be explicitly set.
+   */
+  public static final String HIVE_DATASET_DESTINATION_SKIP_SETGROUP = "hive.dataset.destination.skip.setGroup";
+  public static final boolean DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP = false;
+
+  public static String getStagingTableName(String stagingTableNamePrefix) {
+    int randomNumber = new Random().nextInt(100);
+    String uniqueStagingTableQualifier = String.format("%s%s", System.currentTimeMillis(), randomNumber);
+
+    return stagingTableNamePrefix + "_" + uniqueStagingTableQualifier;
+  }
+
+  /***
+   * Get the final table location of format: <final table location>/final
+   * @return final table location.
+   */
+  public static String getOutputDataLocation(String outputDataLocation) {
+    return outputDataLocation + Path.SEPARATOR + PUBLISHED_TABLE_SUBDIRECTORY;
+  }
+
+  /***
+   * Get the staging table location of format: <final table location>/<staging table name>
+   * @param outputDataLocation output table data location.
+   * @return staging table location.
+   */
+  public static String getStagingDataLocation(String outputDataLocation, String stagingTableName) {
+    return outputDataLocation + Path.SEPARATOR + stagingTableName;
+  }
+
+  /***
+   * Generate DDL query to create a duplicate Hive table
+   * @param inputDbName source DB name
+   * @param inputTblName source table name
+   * @param tblName New Hive table name
+   * @param tblLocation New hive table location
+   * @param optionalDbName Optional DB name, if not specified it defaults to 'default'
+   */
+  public static String generateCreateDuplicateTableDDL(
+      String inputDbName,
+      String inputTblName,
+      String tblName,
+      String tblLocation,
+      Optional<String> optionalDbName) {
+
+    Preconditions.checkArgument(StringUtils.isNotBlank(tblName));
+    Preconditions.checkArgument(StringUtils.isNotBlank(tblLocation));
+
+    String dbName = optionalDbName.isPresent() ? optionalDbName.get() : "default";
+
+    return String.format("CREATE EXTERNAL TABLE IF NOT EXISTS `%s`.`%s` LIKE `%s`.`%s` LOCATION %n  '%s' %n",
+        dbName, tblName, inputDbName, inputTblName, tblLocation);
+  }
+
+  /**
+   * Fills data from input table into output table.
+   * @param inputTblName input hive table name
+   * @param outputTblName output hive table name
+   * @param inputDbName input hive database name
+   * @param outputDbName output hive database name
+   * @param optionalPartitionDMLInfo input hive table's partition's name and value
+   * @return Hive query string
+   */
+  public static String generateTableCopy(
+      String inputTblName,
+      String outputTblName,
+      String inputDbName,
+      String outputDbName,
+      Optional<Map<String, String>> optionalPartitionDMLInfo) {
+    Preconditions.checkArgument(StringUtils.isNotBlank(inputTblName));
+    Preconditions.checkArgument(StringUtils.isNotBlank(outputTblName));
+    Preconditions.checkArgument(StringUtils.isNotBlank(inputDbName));
+    Preconditions.checkArgument(StringUtils.isNotBlank(outputDbName));
+
+    StringBuilder dmlQuery = new StringBuilder();
+
+    // Insert query
+    dmlQuery.append(String.format("INSERT OVERWRITE TABLE `%s`.`%s` %n", outputDbName, outputTblName));
+
+    // Partition details
+    dmlQuery.append(partitionKeyValues(optionalPartitionDMLInfo));
+
+    dmlQuery.append(String.format("SELECT * FROM `%s`.`%s`", inputDbName, inputTblName));
+    if (optionalPartitionDMLInfo.isPresent()) {
+      if (optionalPartitionDMLInfo.get().size() > 0) {
+        dmlQuery.append(" WHERE ");
+        String partitionsAndValues = optionalPartitionDMLInfo.get().entrySet().stream()
+            .map(e -> "`" + e.getKey() + "`='" + e.getValue() + "'")
+            .collect(joining(" AND "));
+        dmlQuery.append(partitionsAndValues);
+      }
+    }
+
+    return dmlQuery.toString();
+  }
+
+  protected static StringBuilder partitionKeyValues(Optional<Map<String, String>> optionalPartitionDMLInfo) {
+    if (!optionalPartitionDMLInfo.isPresent()) {
+      return new StringBuilder();
+    } else {
+      return new StringBuilder("PARTITION (").append(Joiner.on(", ")
+          .join(optionalPartitionDMLInfo.get().entrySet().stream().map(e -> "`" + e.getKey() + "`").iterator())).append(") \n");
+    }
+  }
+
+  /**
+   * It fills partitionsDDLInfo and partitionsDMLInfo with the partition information
+   * @param conversionEntity conversion entity whose partition information is read
+   * @param partitionsDDLInfo partition type information, to be filled by this method
+   * @param partitionsDMLInfo partition key-value pair, to be filled by this method
+   */
+  public static void populatePartitionInfo(QueryBasedHiveConversionEntity conversionEntity, Map<String, String> partitionsDDLInfo,
+      Map<String, String> partitionsDMLInfo) {
+
+    String partitionsInfoString = null;
+    String partitionsTypeString = null;
+
+    if (conversionEntity.getHivePartition().isPresent()) {
+      partitionsInfoString = conversionEntity.getHivePartition().get().getName();
+      partitionsTypeString = conversionEntity.getHivePartition().get().getSchema().getProperty("partition_columns.types");
+    }
+
+    if (StringUtils.isNotBlank(partitionsInfoString) || StringUtils.isNotBlank(partitionsTypeString)) {
+      if (StringUtils.isBlank(partitionsInfoString) || StringUtils.isBlank(partitionsTypeString)) {
+        throw new IllegalArgumentException("Both partitions info and partitions must be present, if one is specified");
+      }
+      List<String> pInfo = Splitter.on(HIVE_PARTITIONS_INFO).omitEmptyStrings().trimResults().splitToList(partitionsInfoString);
+      List<String> pType = Splitter.on(HIVE_PARTITIONS_TYPE).omitEmptyStrings().trimResults().splitToList(partitionsTypeString);
+      log.debug("PartitionsInfoString: " + partitionsInfoString);
+      log.debug("PartitionsTypeString: " + partitionsTypeString);
+
+      if (pInfo.size() != pType.size()) {
+        throw new IllegalArgumentException("partitions info and partitions type list should of same size");
+      }
+      for (int i = 0; i < pInfo.size(); i++) {
+        List<String> partitionInfoParts = Splitter.on("=").omitEmptyStrings().trimResults().splitToList(pInfo.get(i));
+        String partitionType = pType.get(i);
+        if (partitionInfoParts.size() != 2) {
+          throw new IllegalArgumentException(
+              String.format("Partition details should be of the format partitionName=partitionValue. Recieved: %s", pInfo.get(i)));
+        }
+        partitionsDDLInfo.put(partitionInfoParts.get(0), partitionType);
+        partitionsDMLInfo.put(partitionInfoParts.get(0), partitionInfoParts.get(1));
+      }
+    }
+  }
+
+  /**
+   * Creates a staging directory with the same permissions as the source directory.
+   * @param fs filesystem object
+   * @param destination staging directory location
+   * @param conversionEntity conversion entity used to get source directory permissions
+   * @param workUnit workunit
+   */
+  public static void createStagingDirectory(FileSystem fs, String destination, QueryBasedHiveConversionEntity conversionEntity,
+      WorkUnitState workUnit) {
+    /*
+     * Create staging data location with the same permissions as source data location
+     *
+     * Note that Hive can also automatically create non-existing directories, but it does not
+     * seem to create them with the desired permissions.
+     * According to the Hive docs, permissions for newly created directories/files can be controlled via a uMask, e.g.
+     *
+     * SET hive.warehouse.subdir.inherit.perms=false;
+     * SET fs.permissions.umask-mode=022;
+     * Upon testing, this did not work
+     */
+    Path destinationPath = new Path(destination);
+    try {
+      FileStatus sourceDataFileStatus = fs.getFileStatus(conversionEntity.getHiveTable().getDataLocation());
+      FsPermission sourceDataPermission = sourceDataFileStatus.getPermission();
+      if (!fs.mkdirs(destinationPath, sourceDataPermission)) {
+        throw new RuntimeException(String.format("Failed to create path %s with permissions %s",
+            destinationPath, sourceDataPermission));
+      } else {
+        fs.setPermission(destinationPath, sourceDataPermission);
+        // Set the same group as source
+        if (!workUnit.getPropAsBoolean(HIVE_DATASET_DESTINATION_SKIP_SETGROUP, DEFAULT_HIVE_DATASET_DESTINATION_SKIP_SETGROUP)) {
+          fs.setOwner(destinationPath, null, sourceDataFileStatus.getGroup());
+        }
+        log.info(String.format("Created %s with permissions %s and group %s", destinationPath, sourceDataPermission, sourceDataFileStatus.getGroup()));
+      }
+    } catch (IOException e) {
+      Throwables.propagate(e);
+    }
+  }
+
+  /***
+   * Get the partition directory name of the format: [hourly_][daily_]<partitionSpec1>[partitionSpec ..]
+   * @param conversionEntity Conversion entity.
+   * @param sourceDataPathIdentifier Hints to look in source partition location to prefix the partition dir name
+   *                               such as hourly or daily.
+   * @return Partition directory name.
+   */
+  public static String getStagingDataPartitionDirName(QueryBasedHiveConversionEntity conversionEntity,
+      List<String> sourceDataPathIdentifier) {
+
+    if (conversionEntity.getHivePartition().isPresent()) {
+      StringBuilder dirNamePrefix = new StringBuilder();
+      String sourceHivePartitionLocation = conversionEntity.getHivePartition().get().getDataLocation().toString();
+      if (null != sourceDataPathIdentifier && null != sourceHivePartitionLocation) {
+        for (String hint : sourceDataPathIdentifier) {
+          if (sourceHivePartitionLocation.toLowerCase().contains(hint.toLowerCase())) {
+            dirNamePrefix.append(hint.toLowerCase()).append("_");
+          }
+        }
+      }
+
+      return dirNamePrefix + conversionEntity.getHivePartition().get().getName();
+    } else {
+      return StringUtils.EMPTY;
+    }
+  }
+
+  /**
+   * Returns the partition data location of a given table and partition
+   * @param table Hive table
+   * @param state workunit state
+   * @param partitionName partition name
+   * @return partition data location
+   * @throws DataConversionException
+   */
+  public static Optional<Path> getDestinationPartitionLocation(Optional<Table> table, WorkUnitState state,
+      String partitionName)
+      throws DataConversionException {
+    Optional<org.apache.hadoop.hive.metastore.api.Partition> partitionOptional;
+    if (!table.isPresent()) {
+      return Optional.absent();
+    }
+    try {
+      HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(state.getJobState().getProperties(),
+          Optional.fromNullable(state.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
+      try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
+        partitionOptional =
+            Optional.of(client.get().getPartition(table.get().getDbName(), table.get().getTableName(), partitionName));
+      } catch (NoSuchObjectException e) {
+        return Optional.absent();
+      }
+      if (partitionOptional.isPresent()) {
+        org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get());
+        Partition qlPartition =
+            new Partition(qlTable, partitionOptional.get());
+        return Optional.of(qlPartition.getDataLocation());
+      }
+    } catch (IOException | TException | HiveException e) {
+      throw new DataConversionException("Could not fetch destination table metadata", e);
+    }
+    return Optional.absent();
+  }
+
+  /**
+   * If partition already exists then new partition location will be a separate time stamp dir
+   * If partition location is /a/b/c/<oldTimeStamp> then new partition location is /a/b/c/<currentTimeStamp>
+   * If partition location is /a/b/c/ then new partition location is /a/b/c/<currentTimeStamp>
+   **/
+  public static String updatePartitionLocation(String outputDataPartitionLocation, WorkUnitState workUnitState,
+      Optional<Path> destPartitionLocation)
+      throws DataConversionException {
+
+    if (workUnitState.getPropAsBoolean(HIVE_DATASET_PARTITION_OVERWRITE, DEFAULT_HIVE_DATASET_PARTITION_OVERWRITE)) {
+      return outputDataPartitionLocation;
+    }
+    if (!destPartitionLocation.isPresent()) {
+      return outputDataPartitionLocation;
+    }
+    long timeStamp = System.currentTimeMillis();
+    return StringUtils.join(Arrays.asList(outputDataPartitionLocation, timeStamp), '/');
+  }
+
+  /**
+   * Returns a pair of Hive table and its partitions
+   * @param dbName db name
+   * @param tableName table name
+   * @param props properties
+   * @return a pair of Hive table and its partitions
+   * @throws DataConversionException
+   */
+  public static Pair<Optional<Table>, Optional<List<Partition>>> getDestinationTableMeta(String dbName,
+      String tableName, Properties props) {
+
+    Optional<Table> table = Optional.<Table>absent();
+    Optional<List<Partition>> partitions = Optional.<List<Partition>>absent();
+
+    try {
+      HiveMetastoreClientPool pool = HiveMetastoreClientPool.get(props,
+          Optional.fromNullable(props.getProperty(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
+      try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
+        table = Optional.of(client.get().getTable(dbName, tableName));
+        if (table.isPresent()) {
+          org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get());
+          if (HiveUtils.isPartitioned(qlTable)) {
+            partitions = Optional.of(HiveUtils.getPartitions(client.get(), qlTable, Optional.<String>absent()));
+          }
+        }
+      }
+    } catch (NoSuchObjectException e) {
+      return ImmutablePair.of(table, partitions);
+    } catch (IOException | TException e) {
+      throw new RuntimeException("Could not fetch destination table metadata", e);
+    }
+
+    return ImmutablePair.of(table, partitions);
+  }
+}

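For orientation, here is a minimal sketch (not part of this commit) of how the new HiveConverterUtils methods compose into the staging-copy queries; the database, table, and location names below are hypothetical.

import java.util.Map;

import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;

import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;

public class HiveConverterUtilsSketch {
  public static void main(String[] args) {
    // Staging table names get a timestamp + random suffix, e.g. my_view_staging_1504635808123_42
    String stagingTable = HiveConverterUtils.getStagingTableName("my_view_staging");
    String stagingLocation = HiveConverterUtils.getStagingDataLocation("/data/db_out/my_view", stagingTable);

    // CREATE EXTERNAL TABLE IF NOT EXISTS `db_out`.`<stagingTable>` LIKE `db_in`.`my_view` LOCATION '<stagingLocation>'
    String createStagingDDL = HiveConverterUtils.generateCreateDuplicateTableDDL(
        "db_in", "my_view", stagingTable, stagingLocation, Optional.of("db_out"));

    // INSERT OVERWRITE TABLE `db_out`.`<stagingTable>` PARTITION (`datepartition`)
    // SELECT * FROM `db_in`.`my_view` WHERE `datepartition`='2017-09-05'
    String copyDML = HiveConverterUtils.generateTableCopy(
        "my_view", stagingTable, "db_in", "db_out",
        Optional.<Map<String, String>>of(ImmutableMap.of("datepartition", "2017-09-05")));

    System.out.println(createStagingDDL);
    System.out.println(copyDML);
  }
}
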
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
new file mode 100644
index 0000000..2e370c0
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializer.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.util.List;
+
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.runtime.TaskContext;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+
+/**
+ * A simple {@link HiveTask} for Hive view materialization.
+ */
+public class HiveMaterializer extends HiveTask {
+
+  private final QueryGenerator queryGenerator;
+
+  public HiveMaterializer(TaskContext taskContext) throws Exception {
+    super(taskContext);
+    this.queryGenerator = new HiveMaterializerQueryGenerator(this.workUnitState);
+    if (!(workUnit.getHiveDataset() instanceof ConvertibleHiveDataset)) {
+      throw new IllegalStateException("HiveConvertExtractor is only compatible with ConvertibleHiveDataset");
+    }
+  }
+
+  @Override
+  public List<String> generateHiveQueries() {
+    return queryGenerator.generateQueries();
+  }
+
+  @Override
+  public QueryBasedHivePublishEntity generatePublishQueries() throws Exception {
+    return queryGenerator.generatePublishQueries();
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
new file mode 100644
index 0000000..4eee0e0
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerQueryGenerator.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.util.Map;
+import java.util.List;
+
+import org.apache.gobblin.data.management.conversion.hive.converter.AbstractAvroToOrcConverter;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.IMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.Table;
+
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.data.management.conversion.hive.avro.AvroSchemaManager;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDataset;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHiveConversionEntity;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHivePartition;
+import org.apache.gobblin.data.management.conversion.hive.entities.SchemaAwareHiveTable;
+import org.apache.gobblin.data.management.conversion.hive.events.EventWorkunitUtils;
+import org.apache.gobblin.data.management.conversion.hive.query.HiveAvroORCQueryGenerator;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.hive.HiveMetastoreClientPool;
+import org.apache.gobblin.util.AutoReturnableObject;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+/**
+ * A simple query generator for {@link HiveMaterializer}.
+ */
+public class HiveMaterializerQueryGenerator implements QueryGenerator {
+  private final FileSystem fs;
+  private final ConvertibleHiveDataset.ConversionConfig conversionConfig;
+  private final ConvertibleHiveDataset hiveDataset;
+  private final String inputDbName;
+  private final String inputTableName;
+  private final String outputDatabaseName;
+  private final String outputTableName;
+  private final String outputDataLocation;
+  private final String stagingTableName;
+  private final String stagingDataLocation;
+  private final List<String> sourceDataPathIdentifier;
+  private final String stagingDataPartitionDirName;
+  private final String stagingDataPartitionLocation;
+  private final Map<String, String> partitionsDDLInfo;
+  private final Map<String, String> partitionsDMLInfo;
+  private final Optional<Table> destinationTableMeta;
+  private final HiveWorkUnit workUnit;
+  private final HiveMetastoreClientPool pool;
+  private final QueryBasedHiveConversionEntity conversionEntity;
+  private final WorkUnitState workUnitState;
+
+  public HiveMaterializerQueryGenerator(WorkUnitState workUnitState) throws Exception {
+    this.workUnitState = workUnitState;
+    this.workUnit = new HiveWorkUnit(workUnitState.getWorkunit());
+    this.hiveDataset = (ConvertibleHiveDataset) workUnit.getHiveDataset();
+    this.inputDbName = hiveDataset.getDbAndTable().getDb();
+    this.inputTableName = hiveDataset.getDbAndTable().getTable();
+    this.fs = HiveSource.getSourceFs(workUnitState);
+    this.conversionConfig = hiveDataset.getConversionConfigForFormat("sameAsSource").get();
+    this.outputDatabaseName = conversionConfig.getDestinationDbName();
+    this.outputTableName = conversionConfig.getDestinationTableName();
+    this.outputDataLocation = HiveConverterUtils.getOutputDataLocation(conversionConfig.getDestinationDataPath());
+    this.stagingTableName = HiveConverterUtils.getStagingTableName(conversionConfig.getDestinationStagingTableName());
+    this.stagingDataLocation = HiveConverterUtils.getStagingDataLocation(conversionConfig.getDestinationDataPath(), stagingTableName);
+    this.sourceDataPathIdentifier = conversionConfig.getSourceDataPathIdentifier();
+    this.pool = HiveMetastoreClientPool.get(workUnitState.getJobState().getProperties(),
+        Optional.fromNullable(workUnitState.getJobState().getProp(HiveDatasetFinder.HIVE_METASTORE_URI_KEY)));
+    this.conversionEntity = getConversionEntity();
+    this.stagingDataPartitionDirName = HiveConverterUtils.getStagingDataPartitionDirName(conversionEntity, sourceDataPathIdentifier);
+    this.stagingDataPartitionLocation = stagingDataLocation + Path.SEPARATOR + stagingDataPartitionDirName;
+    this.partitionsDDLInfo = Maps.newHashMap();
+    this.partitionsDMLInfo = Maps.newHashMap();
+    HiveConverterUtils.populatePartitionInfo(conversionEntity, partitionsDDLInfo, partitionsDMLInfo);
+    this.destinationTableMeta = HiveConverterUtils.getDestinationTableMeta(outputDatabaseName,
+        outputTableName, workUnitState.getProperties()).getLeft();
+  }
+
+  /**
+   * Returns Hive queries to be run as part of a Hive task.
+   * This does not include publish queries.
+   * @return a list of Hive queries
+   */
+  @Override
+  public List<String> generateQueries() {
+
+    List<String> hiveQueries = Lists.newArrayList();
+
+    Preconditions.checkNotNull(this.workUnit, "Workunit must not be null");
+    EventWorkunitUtils.setBeginDDLBuildTimeMetadata(this.workUnit, System.currentTimeMillis());
+
+    HiveConverterUtils.createStagingDirectory(fs, conversionConfig.getDestinationDataPath(),
+        conversionEntity, this.workUnitState);
+
+    // Create DDL statement for table
+    String createStagingTableDDL =
+        HiveConverterUtils.generateCreateDuplicateTableDDL(
+            inputDbName,
+            inputTableName,
+            stagingTableName,
+            stagingDataLocation,
+            Optional.of(outputDatabaseName));
+    hiveQueries.add(createStagingTableDDL);
+    log.debug("Create staging table DDL:\n" + createStagingTableDDL);
+
+    /*
+     * Setting the dynamic partition mode to 'nonstrict' keeps the generated query simple and readable.
+     * If we do not set it to nonstrict, we would also have to write out the partition values,
+     * and because Hive treats partitions as virtual columns, we would have to spell out every column
+     * name in the query (in place of *) to match source and target columns.
+     */
+    hiveQueries.add("SET hive.exec.dynamic.partition.mode=nonstrict");
+
+    String insertInStagingTableDML =
+        HiveConverterUtils
+            .generateTableCopy(
+                inputTableName,
+                stagingTableName,
+                conversionEntity.getHiveTable().getDbName(),
+                outputDatabaseName,
+                Optional.of(partitionsDMLInfo));
+    hiveQueries.add(insertInStagingTableDML);
+    log.debug("Conversion staging DML: " + insertInStagingTableDML);
+
+    log.info("Conversion Queries {}\n",  hiveQueries);
+
+    EventWorkunitUtils.setEndDDLBuildTimeMetadata(workUnit, System.currentTimeMillis());
+    return hiveQueries;
+  }
+
+  /**
+   * Returns a QueryBasedHivePublishEntity which includes publish-level queries and cleanup commands.
+   * @return QueryBasedHivePublishEntity
+   * @throws DataConversionException
+   */
+  public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException {
+
+    QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
+    List<String> publishQueries = publishEntity.getPublishQueries();
+    Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
+    List<String> cleanupQueries = publishEntity.getCleanupQueries();
+    List<String> cleanupDirectories = publishEntity.getCleanupDirectories();
+
+    String createFinalTableDDL =
+        HiveConverterUtils.generateCreateDuplicateTableDDL(inputDbName, inputTableName, outputTableName,
+            outputDataLocation, Optional.of(outputDatabaseName));
+    publishQueries.add(createFinalTableDDL);
+    log.debug("Create final table DDL:\n" + createFinalTableDDL);
+
+    if (partitionsDDLInfo.size() == 0) {
+      log.debug("Snapshot directory to move: " + stagingDataLocation + " to: " + outputDataLocation);
+      publishDirectories.put(stagingDataLocation, outputDataLocation);
+
+      String dropStagingTableDDL = HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);
+
+      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
+      cleanupQueries.add(dropStagingTableDDL);
+
+      log.debug("Staging table directory to delete: " + stagingDataLocation);
+      cleanupDirectories.add(stagingDataLocation);
+    } else {
+      String finalDataPartitionLocation = outputDataLocation + Path.SEPARATOR + stagingDataPartitionDirName;
+      Optional<Path> destPartitionLocation =
+          HiveConverterUtils.getDestinationPartitionLocation(destinationTableMeta, this.workUnitState,
+              conversionEntity.getHivePartition().get().getName());
+      finalDataPartitionLocation = HiveConverterUtils.updatePartitionLocation(finalDataPartitionLocation, this.workUnitState,
+          destPartitionLocation);
+
+      log.debug("Partition directory to move: " + stagingDataPartitionLocation + " to: " + finalDataPartitionLocation);
+      publishDirectories.put(stagingDataPartitionLocation, finalDataPartitionLocation);
+      List<String> dropPartitionsDDL =
+          HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName, partitionsDMLInfo);
+      log.debug("Drop partitions if exist in final table: " + dropPartitionsDDL);
+      publishQueries.addAll(dropPartitionsDDL);
+      List<String> createFinalPartitionDDL =
+          HiveAvroORCQueryGenerator.generateCreatePartitionDDL(outputDatabaseName, outputTableName,
+              finalDataPartitionLocation, partitionsDMLInfo, Optional.<String>absent());
+
+      log.debug("Create final partition DDL: " + createFinalPartitionDDL);
+      publishQueries.addAll(createFinalPartitionDDL);
+
+      String dropStagingTableDDL =
+          HiveAvroORCQueryGenerator.generateDropTableDDL(outputDatabaseName, stagingTableName);
+
+      log.debug("Drop staging table DDL: " + dropStagingTableDDL);
+      cleanupQueries.add(dropStagingTableDDL);
+
+      log.debug("Staging table directory to delete: " + stagingDataLocation);
+      cleanupDirectories.add(stagingDataLocation);
+    }
+
+    publishQueries.addAll(HiveAvroORCQueryGenerator.generateDropPartitionsDDL(outputDatabaseName, outputTableName,
+        AbstractAvroToOrcConverter.getDropPartitionsDDLInfo(conversionEntity)));
+
+    log.info("Publish partition entity: " + publishEntity);
+    return publishEntity;
+  }
+
+  private QueryBasedHiveConversionEntity getConversionEntity() throws Exception {
+
+
+    try (AutoReturnableObject<IMetaStoreClient> client = this.pool.getClient()) {
+
+      Table table = client.get().getTable(this.inputDbName, this.inputTableName);
+
+      SchemaAwareHiveTable schemaAwareHiveTable = new SchemaAwareHiveTable(table, AvroSchemaManager.getSchemaFromUrl(workUnit.getTableSchemaUrl(), fs));
+
+      SchemaAwareHivePartition schemaAwareHivePartition = null;
+
+      if (workUnit.getPartitionName().isPresent() && workUnit.getPartitionSchemaUrl().isPresent()) {
+        org.apache.hadoop.hive.metastore.api.Partition
+            partition = client.get().getPartition(this.inputDbName, this.inputTableName, workUnit.getPartitionName().get());
+        schemaAwareHivePartition =
+            new SchemaAwareHivePartition(table, partition, AvroSchemaManager.getSchemaFromUrl(workUnit.getPartitionSchemaUrl().get(), fs));
+      }
+      return new QueryBasedHiveConversionEntity(this.hiveDataset, schemaAwareHiveTable, Optional.fromNullable(schemaAwareHivePartition));
+    }
+  }
+}
\ No newline at end of file

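As a rough, hedged illustration (also not part of the commit), for a non-partitioned source the publish entity returned by generatePublishQueries() above amounts to something like the following; the names and paths are made up, and the exact query strings come from HiveConverterUtils and HiveAvroORCQueryGenerator.

import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;

public class PublishEntitySketch {
  public static QueryBasedHivePublishEntity example() {
    QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
    // Create the final table pointing at the /final subdirectory (generateCreateDuplicateTableDDL)
    publishEntity.getPublishQueries().add(
        "CREATE EXTERNAL TABLE IF NOT EXISTS `db_out`.`my_view` LIKE `db_in`.`my_view` LOCATION \n  '/data/db_out/my_view/final' \n");
    // Move the staging data directory into the final location
    publishEntity.getPublishDirectories().put(
        "/data/db_out/my_view/my_view_staging_1504635808123_42", "/data/db_out/my_view/final");
    // Drop the staging table and delete its directory afterwards
    publishEntity.getCleanupQueries().add("DROP TABLE IF EXISTS `db_out`.`my_view_staging_1504635808123_42`");
    publishEntity.getCleanupDirectories().add("/data/db_out/my_view/my_view_staging_1504635808123_42");
    return publishEntity;
  }
}
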
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
new file mode 100644
index 0000000..d1c9371
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerSource.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.util.List;
+
+import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.configuration.SourceState;
+import org.apache.gobblin.data.management.conversion.hive.dataset.ConvertibleHiveDatasetFinder;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.PartitionLevelWatermarker;
+import org.apache.gobblin.data.management.copy.hive.HiveDatasetFinder;
+import org.apache.gobblin.runtime.task.TaskUtils;
+import org.apache.gobblin.source.workunit.WorkUnit;
+
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+
+/**
+ * A simple HiveSource for {@link HiveMaterializer}.
+ */
+public class HiveMaterializerSource extends HiveSource {
+
+  @Override
+  public List<WorkUnit> getWorkunits(SourceState state) {
+    if (!state.contains(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY)) {
+      state.setProp(HIVE_SOURCE_DATASET_FINDER_CLASS_KEY, ConvertibleHiveDatasetFinder.class.getName());
+    }
+    if (!state.contains(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY)) {
+      state.setProp(HiveDatasetFinder.HIVE_DATASET_CONFIG_PREFIX_KEY, "hive.conversion.avro");
+    }
+
+    List<WorkUnit> workUnits = super.getWorkunits(state);
+
+    for(WorkUnit workUnit : workUnits) {
+      if (Boolean.valueOf(workUnit.getPropAsBoolean(PartitionLevelWatermarker.IS_WATERMARK_WORKUNIT_KEY))) {
+        log.info("Ignoring Watermark workunit for {}", workUnit.getProp(ConfigurationKeys.DATASET_URN_KEY));
+        continue;
+      }
+      TaskUtils.setTaskFactoryClass(workUnit, HiveMaterializerTaskFactory.class);
+    }
+    return workUnits;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
new file mode 100644
index 0000000..c05d4bf
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveMaterializerTaskFactory.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import org.apache.gobblin.publisher.DataPublisher;
+import org.apache.gobblin.publisher.NoopPublisher;
+import org.apache.gobblin.runtime.JobState;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.TaskFactory;
+import org.apache.gobblin.runtime.task.TaskIFace;
+
+/**
+ * A {@link TaskFactory} that runs a {@link HiveMaterializer} task.
+ * This factory is intended to publish data in the task directly, and
+ * uses a {@link NoopPublisher}.
+ */
+public class HiveMaterializerTaskFactory implements TaskFactory {
+  @Override
+  public TaskIFace createTask(TaskContext taskContext) {
+    try {
+      return new HiveMaterializer(taskContext);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public DataPublisher createDataPublisher(JobState.DatasetState datasetState) {
+    return new NoopPublisher(datasetState);
+  }
+}

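HiveTask.java below defines the customized-Task contract that the classes above build on. As a hedged sketch (not part of this commit), a custom task extending it would look roughly like this; the class name and query strings are made up:

import java.util.List;

import com.google.common.collect.Lists;

import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
import org.apache.gobblin.data.management.conversion.hive.task.HiveTask;
import org.apache.gobblin.runtime.TaskContext;

public class ExampleHiveTask extends HiveTask {

  public ExampleHiveTask(TaskContext taskContext) {
    super(taskContext);
  }

  @Override
  public List<String> generateHiveQueries() {
    // Extract/write-level queries executed by the task itself.
    return Lists.newArrayList(
        "INSERT OVERWRITE TABLE `db_out`.`staging_tbl` SELECT * FROM `db_in`.`src_tbl`");
  }

  @Override
  public QueryBasedHivePublishEntity generatePublishQueries() {
    // Publish- and cleanup-level queries applied after the write succeeds.
    QueryBasedHivePublishEntity publishEntity = new QueryBasedHivePublishEntity();
    publishEntity.getPublishQueries().add(
        "CREATE EXTERNAL TABLE IF NOT EXISTS `db_out`.`final_tbl` LIKE `db_out`.`staging_tbl` LOCATION '/data/db_out/final_tbl'");
    publishEntity.getCleanupQueries().add("DROP TABLE IF EXISTS `db_out`.`staging_tbl`");
    return publishEntity;
  }
}

A matching TaskFactory, analogous to HiveMaterializerTaskFactory above, would then hand instances of such a task to the runtime.
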
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
new file mode 100644
index 0000000..aabbb6e
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/HiveTask.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.sql.SQLException;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveSource;
+import org.apache.gobblin.data.management.conversion.hive.source.HiveWorkUnit;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarker;
+import org.apache.gobblin.data.management.conversion.hive.watermarker.HiveSourceWatermarkerFactory;
+import org.apache.gobblin.metrics.event.EventSubmitter;
+import org.apache.gobblin.runtime.TaskContext;
+import org.apache.gobblin.runtime.task.BaseAbstractTask;
+import org.apache.gobblin.util.HadoopUtils;
+import org.apache.gobblin.util.HiveJdbcConnector;
+import org.apache.gobblin.util.reflection.GobblinConstructorUtils;
+
+import lombok.extern.slf4j.Slf4j;
+
+/**
+ * An abstract Task that runs a Hive job by executing Hive queries.
+ * Implementation classes should implement the abstract methods generateHiveQueries() and generatePublishQueries(),
+ * which create the extract/write level queries and the publish level queries respectively.
+ */
+@Slf4j
+public abstract class HiveTask extends BaseAbstractTask {
+  protected final TaskContext taskContext;
+  protected final WorkUnitState workUnitState;
+  protected final HiveWorkUnit workUnit;
+  protected final EventSubmitter eventSubmitter;
+  protected final List<String> hiveExecutionQueries;
+  protected final QueryBasedHivePublishEntity publishEntity;
+  protected final HiveJdbcConnector hiveJdbcConnector;
+
+  public HiveTask(TaskContext taskContext) {
+    super(taskContext);
+    this.taskContext = taskContext;
+    this.workUnitState = taskContext.getTaskState();
+    this.workUnit = new HiveWorkUnit(this.workUnitState.getWorkunit());
+    this.eventSubmitter = new EventSubmitter.Builder(this.metricContext, "gobblin.HiveTask")
+        .build();
+    this.hiveExecutionQueries = Lists.newArrayList();
+    this.publishEntity = new QueryBasedHivePublishEntity();
+    try {
+      this.hiveJdbcConnector = HiveJdbcConnector.newConnectorWithProps(this.workUnitState.getProperties());
+    } catch (SQLException se) {
+      throw new RuntimeException("Error in creating JDBC Connector", se);
+    }
+  }
+
+  /**
+   * Generate hive queries to extract data
+   * @return list of hive queries
+   * @throws Exception
+   */
+  public abstract List<String> generateHiveQueries() throws Exception;
+
+  /**
+   * Generate publish and cleanup queries for hive datasets/partitions
+   * @return QueryBasedHivePublishEntity having cleanup and publish queries
+   * @throws Exception
+   */
+  public abstract QueryBasedHivePublishEntity generatePublishQueries() throws Exception;
+
+  protected void executePublishQueries(QueryBasedHivePublishEntity publishEntity) {
+    Set<String> cleanUpQueries = Sets.newLinkedHashSet();
+    Set<String> publishQueries = Sets.newLinkedHashSet();
+    List<String> directoriesToDelete = Lists.newArrayList();
+    FileSystem fs = null;
+
+    try {
+      fs = HiveSource.getSourceFs(workUnitState);
+
+      if (publishEntity.getCleanupQueries() != null) {
+        cleanUpQueries.addAll(publishEntity.getCleanupQueries());
+      }
+
+      if (publishEntity.getCleanupDirectories() != null) {
+        directoriesToDelete.addAll(publishEntity.getCleanupDirectories());
+      }
+
+      if (publishEntity.getPublishDirectories() != null) {
+        // Publish snapshot / partition directories
+        Map<String, String> publishDirectories = publishEntity.getPublishDirectories();
+        try {
+          for (Map.Entry<String, String> publishDir : publishDirectories.entrySet()) {
+            HadoopUtils.renamePath(fs, new Path(publishDir.getKey()), new Path(publishDir.getValue()), true);
+          }
+        } catch (RuntimeException re) {
+          throw re;
+        } catch (Exception e) {
+          log.error("Error while moving publish directories.", e);
+        }
+      }
+
+      if (publishEntity.getPublishQueries() != null) {
+        publishQueries.addAll(publishEntity.getPublishQueries());
+      }
+
+      WorkUnitState wus = this.workUnitState;
+
+      this.hiveJdbcConnector.executeStatements(publishQueries.toArray(new String[publishQueries.size()]));
+
+      wus.setWorkingState(WorkUnitState.WorkingState.COMMITTED);
+
+      HiveSourceWatermarker watermarker = GobblinConstructorUtils.invokeConstructor(
+          HiveSourceWatermarkerFactory.class, wus.getProp(HiveSource.HIVE_SOURCE_WATERMARKER_FACTORY_CLASS_KEY,
+              HiveSource.DEFAULT_HIVE_SOURCE_WATERMARKER_FACTORY_CLASS)).createFromState(wus);
+
+      watermarker.setActualHighWatermark(wus);
+    } catch (RuntimeException re) {
+      throw re;
+    } catch (Exception e) {
+      log.error("Error in HiveMaterializer generate publish queries", e);
+    } finally {
+      try {
+        this.hiveJdbcConnector.executeStatements(cleanUpQueries.toArray(new String[cleanUpQueries.size()]));
+        HadoopUtils.deleteDirectories(fs, directoriesToDelete, true, true);
+      } catch(RuntimeException re) {
+        throw re;
+      } catch (Exception e) {
+        log.error("Failed to cleanup staging entities.", e);
+      }
+    }
+  }
+
+  @Override
+  public void run() {
+    try {
+      List<String> queries = generateHiveQueries();
+      this.hiveJdbcConnector.executeStatements(queries.toArray(new String[queries.size()]));
+      super.run();
+    } catch (Exception e) {
+      this.workingState = WorkUnitState.WorkingState.FAILED;
+      log.error("Exception in HiveTask generateHiveQueries ", e);
+    }
+  }
+
+  @Override
+  public void commit() {
+    try {
+      executePublishQueries(generatePublishQueries());
+      super.commit();
+    } catch (Exception e) {
+      this.workingState = WorkUnitState.WorkingState.FAILED;
+      log.error("Exception in HiveTask generate publish HiveQueries ", e);
+    }
+  }
+}
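
For illustration, a minimal sketch of a HiveTask subclass (not part of this commit). The database and table names are hypothetical, and Lombok-style setters on QueryBasedHivePublishEntity are assumed; generateHiveQueries() is executed during run(), while generatePublishQueries() feeds executePublishQueries() during commit().

    package org.apache.gobblin.data.management.conversion.hive.task;

    import java.util.List;

    import com.google.common.collect.Lists;

    import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
    import org.apache.gobblin.runtime.TaskContext;

    public class ExampleCopyHiveTask extends HiveTask {

      public ExampleCopyHiveTask(TaskContext taskContext) {
        super(taskContext);
      }

      @Override
      public List<String> generateHiveQueries() {
        // Extract/write-level queries, executed by HiveTask.run().
        return Lists.newArrayList(
            "INSERT OVERWRITE TABLE `example_db`.`example_table_staging` "
                + "SELECT * FROM `example_db`.`example_table_source`");
      }

      @Override
      public QueryBasedHivePublishEntity generatePublishQueries() {
        // Publish and cleanup queries, executed by HiveTask.commit().
        QueryBasedHivePublishEntity entity = new QueryBasedHivePublishEntity();
        // Assumes Lombok-generated setters on QueryBasedHivePublishEntity.
        entity.setPublishQueries(Lists.newArrayList(
            "INSERT OVERWRITE TABLE `example_db`.`example_table` "
                + "SELECT * FROM `example_db`.`example_table_staging`"));
        entity.setCleanupQueries(Lists.newArrayList(
            "DROP TABLE IF EXISTS `example_db`.`example_table_staging`"));
        return entity;
      }
    }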

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
new file mode 100644
index 0000000..1502b06
--- /dev/null
+++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/task/QueryGenerator.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.util.List;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
+
+
+/**
+ * An interface for generating queries.
+ */
+interface QueryGenerator {
+
+  /**
+   * Generates queries to extract/convert/write data
+   * @return list of queries
+   */
+  List<String> generateQueries();
+
+  /**
+   * Generates queries to publish the data
+   * @return QueryBasedHivePublishEntity containing cleanup and publish queries
+   * @throws DataConversionException
+   */
+  QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException;
+}
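
For illustration, a sketch (not part of this commit) of how a HiveTask subclass can delegate both query-generation steps to a QueryGenerator. Since the interface is package-private, the class is assumed to live in the same package; the class name is illustrative.

    package org.apache.gobblin.data.management.conversion.hive.task;

    import java.util.List;

    import org.apache.gobblin.converter.DataConversionException;
    import org.apache.gobblin.data.management.conversion.hive.entities.QueryBasedHivePublishEntity;
    import org.apache.gobblin.runtime.TaskContext;

    public class GeneratorBackedHiveTask extends HiveTask {
      private final QueryGenerator queryGenerator;

      public GeneratorBackedHiveTask(TaskContext taskContext, QueryGenerator queryGenerator) {
        super(taskContext);
        this.queryGenerator = queryGenerator;
      }

      @Override
      public List<String> generateHiveQueries() {
        // Extract/convert/write queries come straight from the generator.
        return this.queryGenerator.generateQueries();
      }

      @Override
      public QueryBasedHivePublishEntity generatePublishQueries() throws DataConversionException {
        // Publish and cleanup queries likewise come from the generator.
        return this.queryGenerator.generatePublishQueries();
      }
    }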

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
new file mode 100644
index 0000000..a2a3dbe
--- /dev/null
+++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/task/HiveConverterUtilsTest.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.data.management.conversion.hive.task;
+
+import java.util.Map;
+
+import org.testng.annotations.Test;
+import org.testng.Assert;
+import org.testng.collections.Maps;
+
+import com.google.common.base.Optional;
+
+public class HiveConverterUtilsTest {
+  private final String inputDbName = "testdb";
+  private final String inputTableName = "testtable";
+  private final String outputDatabaseName = "testdb2";
+  private final String outputTableName = "testtable2";
+
+  @Test
+  public void copyTableQueryTest() throws Exception {
+    Map<String, String> partitionsDMLInfo = Maps.newHashMap();
+    String partitionName = "datepartition";
+    String partitionValue = "2017-07-15-08";
+
+    partitionsDMLInfo.put(partitionName, partitionValue);
+    String expectedQuery = "INSERT OVERWRITE TABLE `" + outputDatabaseName + "`.`" + outputTableName + "` \n"
+        + "PARTITION (`" + partitionName + "`) \n" + "SELECT * FROM `" + inputDbName + "`.`" + inputTableName + "` WHERE "
+        + "`" + partitionName + "`='" + partitionsDMLInfo.get(partitionName) + "'";
+
+    String actualQuery = HiveConverterUtils.generateTableCopy(inputTableName,
+        outputTableName, inputDbName, outputDatabaseName, Optional.of(partitionsDMLInfo));
+    Assert.assertEquals(actualQuery, expectedQuery);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
index 8bf2c21..9222d48 100644
--- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
+++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/data/management/conversion/hive/validation/ValidationJob.java
@@ -19,6 +19,7 @@ package org.apache.gobblin.data.management.conversion.hive.validation;
 import org.apache.gobblin.config.client.ConfigClient;
 import org.apache.gobblin.config.client.api.VersionStabilityPolicy;
 import org.apache.gobblin.configuration.ConfigurationKeys;
+import org.apache.gobblin.data.management.conversion.hive.task.HiveConverterUtils;
 import org.apache.gobblin.util.PathUtils;
 import java.io.IOException;
 import java.io.InputStreamReader;
@@ -45,18 +46,15 @@ import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.tuple.ImmutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.metastore.IMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
 import org.apache.hadoop.hive.ql.metadata.Partition;
 import org.apache.hadoop.hive.ql.metadata.Table;
 import org.apache.log4j.Logger;
-import org.apache.thrift.TException;
 import org.joda.time.DateTime;
 import org.slf4j.LoggerFactory;
 
@@ -374,7 +372,7 @@ public class ValidationJob extends AbstractJob {
           String orcTableName = conversionConfig.getDestinationTableName();
           String orcTableDatabase = conversionConfig.getDestinationDbName();
           Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> destinationMeta =
-              getDestinationTableMeta(orcTableDatabase, orcTableName, this.props);
+              HiveConverterUtils.getDestinationTableMeta(orcTableDatabase, orcTableName, this.props);
 
           // Generate validation queries
           final List<String> validationQueries =
@@ -433,7 +431,7 @@ public class ValidationJob extends AbstractJob {
         String orcTableName = conversionConfig.getDestinationTableName();
         String orcTableDatabase = conversionConfig.getDestinationDbName();
         Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> destinationMeta =
-            getDestinationTableMeta(orcTableDatabase, orcTableName, this.props);
+            HiveConverterUtils.getDestinationTableMeta(orcTableDatabase, orcTableName, this.props);
 
         // Validate each partition
         for (final Partition sourcePartition : sourcePartitions) {
@@ -704,31 +702,6 @@ public class ValidationJob extends AbstractJob {
     DateFormat dateFormat = new SimpleDateFormat(DATE_FORMAT);
     return dateFormat.parse(dateString).getTime();
   }
-
-  private Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> getDestinationTableMeta(String dbName, String tableName,
-      Properties props) {
-
-    Optional<org.apache.hadoop.hive.metastore.api.Table> table = Optional.absent();
-    Optional<List<Partition>> partitions = Optional.absent();
-
-    try {
-      try (AutoReturnableObject<IMetaStoreClient> client = pool.getClient()) {
-        table = Optional.of(client.get().getTable(dbName, tableName));
-        if (table.isPresent()) {
-          org.apache.hadoop.hive.ql.metadata.Table qlTable = new org.apache.hadoop.hive.ql.metadata.Table(table.get());
-          if (HiveUtils.isPartitioned(qlTable)) {
-            partitions = Optional.of(HiveUtils.getPartitions(client.get(), qlTable, Optional.<String> absent()));
-          }
-        }
-      }
-    } catch (NoSuchObjectException e) {
-      return ImmutablePair.of(table, partitions);
-    } catch (IOException | TException e) {
-      throw new RuntimeException("Could not fetch destination table metadata", e);
-    }
-
-    return ImmutablePair.of(table, partitions);
-  }
 }
 
 enum ValidationType {

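For illustration, the relocated helper can be invoked as sketched below (not part of this diff). Only the call signature visible in the hunks above is relied on; the database and table names are hypothetical, and props would normally carry the Hive metastore connection settings.

    package org.apache.gobblin.data.management.conversion.hive.task;

    import java.util.List;
    import java.util.Properties;

    import org.apache.commons.lang3.tuple.Pair;
    import org.apache.hadoop.hive.ql.metadata.Partition;

    import com.google.common.base.Optional;

    public class DestinationTableMetaSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        Pair<Optional<org.apache.hadoop.hive.metastore.api.Table>, Optional<List<Partition>>> destinationMeta =
            HiveConverterUtils.getDestinationTableMeta("example_db", "example_table_orc", props);
        if (destinationMeta.getLeft().isPresent()) {
          // The destination table exists; the right element holds its partitions
          // (present only when the table is partitioned).
          System.out.println("Destination table found: " + destinationMeta.getLeft().get().getTableName());
        }
      }
    }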
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
index 8d186a6..27ec5cd 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/HadoopUtils.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.RawLocalFileSystem;
+import org.apache.hadoop.fs.Trash;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Writable;
@@ -147,6 +148,20 @@ public class HadoopUtils {
   }
 
   /**
+   * Deletes each directory in the given list by calling deletePath().
+   * If moveToTrash is true, each directory is instead moved to the trash according to the file system trash policy.
+   */
+  public static void deleteDirectories(FileSystem fs, List<String> directoriesToDelete, boolean recursive, boolean moveToTrash) throws IOException {
+    for (String directory : directoriesToDelete) {
+      if (moveToTrash) {
+        moveToTrash(fs, new Path(directory));
+      } else {
+        deletePath(fs, new Path(directory), recursive);
+      }
+    }
+  }
+
+  /**
    * A wrapper around {@link FileSystem#delete(Path, boolean)} that only deletes a given {@link Path} if it is present
    * on the given {@link FileSystem}.
    */
@@ -170,6 +185,16 @@ public class HadoopUtils {
   }
 
   /**
+   * Moves the object to the filesystem trash according to the file system policy.
+   * @param fs FileSystem object
+   * @param path Path to the object to be moved to trash.
+   * @throws IOException
+   */
+  public static void moveToTrash(FileSystem fs, Path path) throws IOException {
+    Trash trash = new Trash(fs, new Configuration());
+    trash.moveToTrash(path);
+  }
+
+  /**
    * Renames a src {@link Path} on fs {@link FileSystem} to a dst {@link Path}. If fs is a {@link LocalFileSystem} and
    * src is a directory then {@link File#renameTo} is called directly to avoid a directory rename race condition where
    * {@link org.apache.hadoop.fs.RawLocalFileSystem#rename} copies the conflicting src directory into dst resulting in
@@ -212,10 +237,7 @@ public class HadoopUtils {
     }
     if (fs.exists(newName)) {
       if (overwrite) {
-        if (!fs.delete(newName, true)) {
-          throw new IOException(
-              String.format("Failed to delete %s while renaming %s to %s", newName, oldName, newName));
-        }
+        HadoopUtils.moveToTrash(fs, newName);
       } else {
         throw new FileAlreadyExistsException(
             String.format("Failed to rename %s to %s: dst already exists", oldName, newName));

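For illustration, the two new utilities added above can be used as follows (a sketch, not part of this diff; the staging paths are hypothetical).

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    import org.apache.gobblin.util.HadoopUtils;

    public class TrashCleanupSketch {
      public static void main(String[] args) throws Exception {
        FileSystem fs = FileSystem.get(new Configuration());
        List<String> stagingDirs = Arrays.asList("/tmp/hive-staging/run-1", "/tmp/hive-staging/run-2");
        // With moveToTrash = true each directory is moved to the FileSystem trash
        // (honoring fs.trash.interval) rather than being deleted outright.
        HadoopUtils.deleteDirectories(fs, stagingDirs, true /* recursive */, true /* moveToTrash */);
        // A single path can also be sent to the trash directly.
        HadoopUtils.moveToTrash(fs, new Path("/tmp/hive-staging/run-3"));
      }
    }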
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/8a374f20/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java b/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java
index 768dbdd..4dc3ba6 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/HadoopUtilsTest.java
@@ -29,6 +29,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.Trash;
+import org.apache.hadoop.fs.TrashPolicy;
 import org.apache.hadoop.fs.permission.FsPermission;
 import org.mockito.Mockito;
 import org.mockito.invocation.InvocationOnMock;
@@ -281,4 +283,23 @@ public class HadoopUtilsTest {
       Assert.assertNotNull(configuration.get(entry.getKey())); //Verify key with child path exist as decryption is unit tested in ConfigUtil.
     }
   }
+
+  @Test
+  public void testMoveToTrash() throws IOException {
+    Path hadoopUtilsTestDir = new Path(Files.createTempDir().getAbsolutePath(), "HadoopUtilsTestDir");
+    Configuration conf = new Configuration();
+    // Keep trashed items for 10 minutes before they are permanently removed.
+    // A value of 0 disables the trash feature entirely.
+    conf.set("fs.trash.interval", "10");
+    FileSystem fs = FileSystem.getLocal(conf);
+    Trash trash = new Trash(fs, conf);
+    TrashPolicy trashPolicy = TrashPolicy.getInstance(conf, fs, fs.getHomeDirectory());
+    Path trashPath = trashPolicy.getCurrentTrashDir();
+
+    fs.mkdirs(hadoopUtilsTestDir);
+    Assert.assertTrue(fs.exists(hadoopUtilsTestDir));
+    trash.moveToTrash(hadoopUtilsTestDir.getParent());
+    Assert.assertFalse(fs.exists(hadoopUtilsTestDir));
+    Assert.assertTrue(fs.exists(trashPath));
+  }
 }

