drill-commits mailing list archives

From meh...@apache.org
Subject drill git commit: DRILL-3739: (part 2) Fix issues in reading Hive tables with StorageHandler configuration (eg. Hive-HBase tables)
Date Wed, 30 Dec 2015 19:40:26 GMT
Repository: drill
Updated Branches:
  refs/heads/master 6dea42994 -> 76f41e182


DRILL-3739: (part 2) Fix issues in reading Hive tables with StorageHandler configuration (eg. Hive-HBase tables)


Project: http://git-wip-us.apache.org/repos/asf/drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/76f41e18
Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/76f41e18
Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/76f41e18

Branch: refs/heads/master
Commit: 76f41e18207e3e3e987fef56ee7f1695dd6ddd7a
Parents: 6dea429
Author: vkorukanti <venki@stealthsec.com>
Authored: Tue Dec 29 01:29:05 2015 -0800
Committer: vkorukanti <venki@stealthsec.com>
Committed: Tue Dec 29 01:29:21 2015 -0800

----------------------------------------------------------------------
 ...onvertHiveParquetScanToDrillParquetScan.java | 23 +++++++++++++++-----
 .../exec/store/hive/HiveMetadataProvider.java   |  2 +-
 .../drill/exec/store/hive/HiveRecordReader.java | 19 ++++++++--------
 .../drill/exec/store/hive/HiveUtilities.java    | 18 ++++++++-------
 .../apache/drill/exec/hive/TestHiveStorage.java | 16 ++++++++++++++
 5 files changed, 54 insertions(+), 24 deletions(-)
----------------------------------------------------------------------
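
Across these five files the change has one shape: the InputFormat of a StorageHandler-based table (e.g. a Hive-HBase table) can only be resolved once the table/partition properties and Hive config overrides are present in the JobConf, and the helper that used to set the class on the JobConf now returns it to the caller. A minimal caller-side sketch of that contract, using only the two HiveUtilities helpers whose signatures appear in the hunks below (the class name here is illustrative, not Drill code):

import java.util.Map;
import java.util.Properties;

import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;

class InputFormatCallerSketch {
  // Resolve the InputFormat for a table or partition. Ordering matters:
  // properties and overrides go into the JobConf *before* resolution, so a
  // StorageHandler-based table can find its handler configuration.
  static Class<? extends InputFormat> resolve(final Properties properties,
      final Map<String, String> hiveConfigOverride, final StorageDescriptor sd,
      final Table table) throws Exception {
    final JobConf job = new JobConf();
    HiveUtilities.addConfToJob(job, properties, hiveConfigOverride);
    return HiveUtilities.getInputFormatClass(job, sd, table);
  }
}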


http://git-wip-us.apache.org/repos/asf/drill/blob/76f41e18/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
index 7f42336..722776b 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/planner/sql/logical/ConvertHiveParquetScanToDrillParquetScan.java
@@ -36,9 +36,11 @@ import org.apache.drill.exec.planner.physical.PrelUtil;
 import org.apache.drill.exec.planner.sql.DrillSqlOperator;
 import org.apache.drill.exec.store.StoragePluginOptimizerRule;
 import org.apache.drill.exec.store.hive.HiveDrillNativeParquetScan;
+import org.apache.drill.exec.store.hive.HiveReadEntry;
 import org.apache.drill.exec.store.hive.HiveScan;
 import org.apache.drill.exec.store.hive.HiveTable.HivePartition;
 import org.apache.drill.exec.store.hive.HiveUtilities;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
@@ -46,9 +48,11 @@ import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.JobConf;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 
 /**
 * Convert Hive scan to use Drill's native parquet reader instead of Hive's native reader. It also adds a
@@ -91,7 +95,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     final HiveScan hiveScan = (HiveScan) scanRel.getGroupScan();
     final Table hiveTable = hiveScan.hiveReadEntry.getTable();
 
-    final Class<? extends InputFormat> tableInputFormat = getInputFormatFromSD(hiveTable, hiveTable.getSd());
+    final Class<? extends InputFormat> tableInputFormat =
+        getInputFormatFromSD(MetaStoreUtils.getTableMetadata(hiveTable), hiveScan.hiveReadEntry, hiveTable.getSd());
     if (tableInputFormat == null || !tableInputFormat.equals(MapredParquetInputFormat.class)) {
       return false;
     }
@@ -105,7 +110,8 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
     // Make sure all partitions have the same input format as the table input format
     for (HivePartition partition : partitions) {
       final StorageDescriptor partitionSD = partition.getPartition().getSd();
-      Class<? extends InputFormat> inputFormat = getInputFormatFromSD(hiveTable, partitionSD);
+      Class<? extends InputFormat> inputFormat = getInputFormatFromSD(
+          HiveUtilities.getPartitionMetadata(partition.getPartition(), hiveTable), hiveScan.hiveReadEntry, partitionSD);
       if (inputFormat == null || !inputFormat.equals(tableInputFormat)) {
         return false;
       }
@@ -127,14 +133,19 @@ public class ConvertHiveParquetScanToDrillParquetScan extends StoragePluginOptim
 
   /**
    * Get the input format from given {@link StorageDescriptor}
-   * @param hiveTable
+   * @param properties
+   * @param hiveReadEntry
    * @param sd
   * @return {@link InputFormat} class or null if a failure has occurred. Failure is logged as warning.
    */
-  private Class<? extends InputFormat> getInputFormatFromSD(final Table hiveTable, final StorageDescriptor sd) {
+  private Class<? extends InputFormat> getInputFormatFromSD(final Properties properties,
+      final HiveReadEntry hiveReadEntry, final StorageDescriptor sd) {
+    final Table hiveTable = hiveReadEntry.getTable();
     try {
-      return (Class<? extends InputFormat>) Class.forName(sd.getInputFormat());
-    } catch (ReflectiveOperationException e) {
+      final JobConf job = new JobConf();
+      HiveUtilities.addConfToJob(job, properties, hiveReadEntry.hiveConfigOverride);
+      return HiveUtilities.getInputFormatClass(job, sd, hiveTable);
+    } catch (final Exception e) {
       logger.warn("Failed to get InputFormat class from Hive table '{}.{}'. StorageDescriptor [{}]",
           hiveTable.getDbName(), hiveTable.getTableName(), sd.toString(), e);
       return null;

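The rule above now passes real table/partition Properties into getInputFormatFromSD instead of just the Table object. A small sketch of where each Properties object comes from, assuming the two metastore helpers referenced in the hunk (illustrative class name; throws Exception is a hedge, since the helper's full signature is not shown in this diff):

import java.util.Properties;

import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;

class RulePropertiesSketch {
  // The table-level check uses the table metadata...
  static Properties forTable(final Table table) {
    return MetaStoreUtils.getTableMetadata(table);
  }

  // ...while each partition check uses that partition's own metadata, so a
  // partition whose InputFormat differs from the table's is detected.
  static Properties forPartition(final Partition partition, final Table table) throws Exception {
    return HiveUtilities.getPartitionMetadata(partition, table);
  }
}
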
http://git-wip-us.apache.org/repos/asf/drill/blob/76f41e18/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
index 7b36796..c1aa9fa 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveMetadataProvider.java
@@ -240,7 +240,7 @@ public class HiveMetadataProvider {
           final List<InputSplitWrapper> splits = Lists.newArrayList();
           final JobConf job = new JobConf();
           HiveUtilities.addConfToJob(job, properties, hiveReadEntry.hiveConfigOverride);
-          HiveUtilities.setInputFormatClass(job, sd, hiveReadEntry.getTable());
+          job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, hiveReadEntry.getTable()));
           final Path path = new Path(sd.getLocation());
           final FileSystem fs = path.getFileSystem(job);
 

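With the helper now returning the class, this split-enumeration path composes the two calls itself. A sketch of the equivalent setup (illustrative class name; signatures as used in the hunk above):

import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;

class SplitSetupSketch {
  // The caller now decides what to do with the resolved class; here, as in
  // HiveMetadataProvider, it is set on the JobConf before listing splits.
  static void prepare(final JobConf job, final StorageDescriptor sd, final Table table)
      throws Exception {
    job.setInputFormat(HiveUtilities.getInputFormatClass(job, sd, table));
  }
}
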
http://git-wip-us.apache.org/repos/asf/drill/blob/76f41e18/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index f50f331..73c126c 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -126,30 +126,31 @@ public class HiveRecordReader extends AbstractRecordReader {
     defaultPartitionValue = HiveUtilities.getDefaultPartitionValue(hiveConfigOverride);
 
     try {
-      Properties properties = MetaStoreUtils.getTableMetadata(table);
-      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), properties);
+      final Properties tableProperties = MetaStoreUtils.getTableMetadata(table);
+      final Properties partitionProperties =
+          (partition == null) ?  tableProperties :
+              HiveUtilities.getPartitionMetadata(partition, table);
+      HiveUtilities.addConfToJob(job, partitionProperties, hiveConfigOverride);
+
+      final SerDe tableSerDe = createSerDe(job, table.getSd().getSerdeInfo().getSerializationLib(), tableProperties);
       final StructObjectInspector tableOI = getStructOI(tableSerDe);
 
       if (partition != null) {
-        properties = HiveUtilities.getPartitionMetadata(partition, table);
-
-        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), properties);
+        partitionSerDe = createSerDe(job, partition.getSd().getSerdeInfo().getSerializationLib(), partitionProperties);
         partitionOI = getStructOI(partitionSerDe);
 
         finalOI = (StructObjectInspector)ObjectInspectorConverters.getConvertedOI(partitionOI, tableOI);
         partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(partitionOI, finalOI);
-        HiveUtilities.setInputFormatClass(job, partition.getSd(), table);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, partition.getSd(), table));
       } else {
         // For non-partitioned tables, there is no need to create converter as there are no schema changes expected.
         partitionSerDe = tableSerDe;
         partitionOI = tableOI;
         partTblObjectInspectorConverter = null;
         finalOI = tableOI;
-        HiveUtilities.setInputFormatClass(job, table.getSd(), table);
+        job.setInputFormat(HiveUtilities.getInputFormatClass(job, table.getSd(), table));
       }
 
-      HiveUtilities.addConfToJob(job, properties, hiveConfigOverride);
-
       // Get list of partition column names
       final List<String> partitionNames = Lists.newArrayList();
       for (FieldSchema field : table.getPartitionKeys()) {

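Two fixes are interleaved in this hunk: configuration is now pushed into the JobConf before any SerDe is created (a StorageHandler's SerDe may depend on those properties at construction time), and the table and partition SerDes get their own Properties instead of sharing one variable that was mutated in between. A condensed sketch of the resulting order (illustrative class name; SerDe creation elided):

import java.util.Map;
import java.util.Properties;

import org.apache.drill.exec.store.hive.HiveUtilities;
import org.apache.hadoop.hive.metastore.MetaStoreUtils;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.mapred.JobConf;

class ReaderSetupSketch {
  static void configure(final JobConf job, final Table table, final Partition partition,
      final Map<String, String> hiveConfigOverride) throws Exception {
    final Properties tableProperties = MetaStoreUtils.getTableMetadata(table);
    final Properties partitionProperties = (partition == null)
        ? tableProperties
        : HiveUtilities.getPartitionMetadata(partition, table);
    // 1) Configuration first, so SerDe and InputFormat resolution can see it.
    HiveUtilities.addConfToJob(job, partitionProperties, hiveConfigOverride);
    // 2) Then create the table SerDe from tableProperties and, if partitioned,
    //    the partition SerDe from partitionProperties (elided here).
    job.setInputFormat(HiveUtilities.getInputFormatClass(job,
        (partition == null) ? table.getSd() : partition.getSd(), table));
  }
}
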
http://git-wip-us.apache.org/repos/asf/drill/blob/76f41e18/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
index 9bf4213..00597d8 100644
--- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
+++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveUtilities.java
@@ -368,16 +368,16 @@ public class HiveUtilities {
   }
 
   /**
-   * Utility method which sets table or partition {@link InputFormat} class in given {@link JobConf} object. First it
+   * Utility method which gets table or partition {@link InputFormat} class. First it
   * tries to get the class name from given StorageDescriptor object. If it doesn't contain it tries to get it from
    * StorageHandler class set in table properties. If not found throws an exception.
-   * @param job {@link JobConf} instance where InputFormat class is set.
+   * @param job {@link JobConf} instance needed incase the table is StorageHandler based table.
   * @param sd {@link StorageDescriptor} instance of currently reading partition or table (for non-partitioned tables).
    * @param table Table object
    * @throws Exception
    */
-  public static void setInputFormatClass(final JobConf job, final StorageDescriptor sd, final Table table)
-      throws Exception {
+  public static Class<? extends InputFormat> getInputFormatClass(final JobConf job, final StorageDescriptor sd,
+      final Table table) throws Exception {
     final String inputFormatName = sd.getInputFormat();
     if (Strings.isNullOrEmpty(inputFormatName)) {
       final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
@@ -386,9 +386,9 @@ public class HiveUtilities {
             "InputFormat class explicitly specified nor StorageHandler class");
       }
       final HiveStorageHandler storageHandler = HiveUtils.getStorageHandler(job, storageHandlerClass);
-      job.setInputFormat(storageHandler.getInputFormatClass());
+      return storageHandler.getInputFormatClass();
     } else {
-      job.setInputFormat((Class<? extends InputFormat>) Class.forName(inputFormatName));
+      return (Class<? extends InputFormat>) Class.forName(inputFormatName);
     }
   }
 
@@ -401,12 +401,14 @@ public class HiveUtilities {
    */
   public static void addConfToJob(final JobConf job, final Properties properties,
       final Map<String, String> hiveConfigOverride) {
+    final HiveConf hiveConf = new HiveConf();
     for (Object obj : properties.keySet()) {
-      job.set((String) obj, (String) properties.get(obj));
+      hiveConf.set((String) obj, (String) properties.get(obj));
     }
     for(Map.Entry<String, String> entry : hiveConfigOverride.entrySet()) {
-      job.set(entry.getKey(), entry.getValue());
+      hiveConf.set(entry.getKey(), entry.getValue());
     }
+    job.addResource(hiveConf);
   }
 
   /**

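Taken together, the two HiveUtilities hunks are what make StorageHandler tables readable: getInputFormatClass falls back to the handler named by the table's storage_handler parameter when the StorageDescriptor carries no InputFormat, and addConfToJob now routes the properties through a HiveConf added as a resource (which also carries Hive's own configuration) instead of writing them key-by-key onto the JobConf. A sketch of the fallback path assembled from the hunk above (illustrative class name; error handling trimmed, the real code throws ExecutionSetupException):

import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
import org.apache.hadoop.hive.ql.metadata.HiveUtils;
import org.apache.hadoop.mapred.InputFormat;
import org.apache.hadoop.mapred.JobConf;

import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE;

class InputFormatFallbackSketch {
  @SuppressWarnings("unchecked")
  static Class<? extends InputFormat> getInputFormatClass(final JobConf job,
      final StorageDescriptor sd, final Table table) throws Exception {
    final String inputFormatName = sd.getInputFormat();
    if (inputFormatName == null || inputFormatName.isEmpty()) {
      // StorageHandler-based table (e.g. Hive-HBase): only the handler knows
      // the InputFormat, and it reads its settings from the JobConf.
      final String storageHandlerClass = table.getParameters().get(META_TABLE_STORAGE);
      if (storageHandlerClass == null) {
        throw new IllegalStateException("Neither InputFormat nor StorageHandler is set");
      }
      final HiveStorageHandler storageHandler =
          HiveUtils.getStorageHandler(job, storageHandlerClass);
      return storageHandler.getInputFormatClass();
    }
    return (Class<? extends InputFormat>) Class.forName(inputFormatName);
  }
}
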
http://git-wip-us.apache.org/repos/asf/drill/blob/76f41e18/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
index 69d7c8a..c2e367d 100644
--- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
+++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java
@@ -386,6 +386,22 @@ public class TestHiveStorage extends HiveTestBase {
         .go();
   }
 
+  @Test // DRILL-3739
+  public void readingFromStorageHandleBasedTable2() throws Exception {
+    try {
+      test(String.format("alter session set `%s` = true", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+
+      testBuilder()
+          .sqlQuery("SELECT * FROM hive.kv_sh ORDER BY key LIMIT 2")
+          .ordered()
+          .baselineColumns("key", "value")
+          .expectsEmptyResultSet()
+          .go();
+    } finally {
+      test(String.format("alter session set `%s` = false", ExecConstants.HIVE_OPTIMIZE_SCAN_WITH_NATIVE_READERS));
+    }
+  }
+
   @AfterClass
   public static void shutdownOptions() throws Exception {
     test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY));

