carbondata-commits mailing list archives

From ravipes...@apache.org
Subject [10/50] [abbrv] carbondata git commit: [CARBONDATA-1937][PARTITION] Fix partition fetch fail if null partition value present in integral columns
Date Tue, 09 Jan 2018 04:01:38 GMT
[CARBONDATA-1937][PARTITION] Fix partition fetch fail if null partition value present in integral columns

There appears to be an issue in Hive when querying partitions from the metastore if any
integral partition column contains a null value.

As a workaround, we now fetch the full list of partitions from Hive and then apply the
filters on the client side.
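
In outline, the fallback works like the sketch below (hypothetical helper names for
illustration only; the real implementation is in CarbonFilters.getPartitions and
CarbonSessionCatalog.getPartitionsAlternate in the diff that follows):

    // Hypothetical sketch of the fallback strategy, not the actual API:
    // try the filtered metastore call first; if Hive rejects it (e.g. a null
    // value in an integral partition column), list everything and filter locally.
    def listPartitionsWithFallback[P](
        listByFilter: () => Seq[P],   // direct metastore call with filters pushed down
        listAll: () => Seq[P],        // plain listing, always safe
        matches: P => Boolean): Seq[P] = {
      try {
        listByFilter()
      } catch {
        case _: Exception => listAll().filter(matches)
      }
    }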

This closes #1730


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7f3c374b
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7f3c374b
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7f3c374b

Branch: refs/heads/branch-1.3
Commit: 7f3c374bacdf54c9eac91a908701e3ea8dd369e0
Parents: 03ddcc8
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Wed Dec 27 22:32:46 2017 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Tue Jan 2 16:56:06 2018 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  9 +++++
 .../StandardPartitionTableQueryTestCase.scala   | 16 ++++++++
 .../spark/sql/optimizer/CarbonFilters.scala     | 39 +++++++++++++++++++-
 .../src/main/spark2.1/CarbonSessionState.scala  | 31 +++++++++++++++-
 .../src/main/spark2.2/CarbonSessionState.scala  | 21 ++++++++++-
 5 files changed, 111 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 2021222..a05d023 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1501,6 +1501,15 @@ public final class CarbonCommonConstants {
 
   public static final String TIMESERIES_HIERARCHY = "timeseries.hierarchy";
 
+  /**
+   * If true, partitions are queried directly from the hive metastore along with the filter
+   * information; otherwise all partitions are first fetched from hive and the filters are
+   * applied on them.
+   */
+  @CarbonProperty
+  public static final String CARBON_READ_PARTITION_HIVE_DIRECT =
+      "carbon.read.partition.hive.direct";
+  public static final String CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT = "true";
+
   private CarbonCommonConstants() {
   }
 }
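
The new property defaults to "true" (query the metastore directly with filters). As the
tests below also exercise, it can be switched to the fallback path programmatically; a
small usage sketch based on the constants added above:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Force the fallback path: fetch all partitions first, then filter locally.
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, "false")

    // Restore the default behaviour (direct metastore reads with filters).
    CarbonProperties.getInstance().addProperty(
      CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT,
      CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT)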

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
index b3c91ae..8a09093 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableQueryTestCase.scala
@@ -208,6 +208,20 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
     checkAnswer(sql("select count(*) cnt from badrecordsPartitionignore where intfield2 is not null"), Seq(Row(2)))
   }
 
+  test("test partition fails on int null partition") {
+    sql("create table badrecordsPartitionintnull(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnull options('bad_records_action'='force')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnull where intfield2 = 13"), Seq(Row(1)))
+  }
+
+  test("test partition fails on int null partition read alternate") {
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, "false")
+    sql("create table badrecordsPartitionintnullalt(intField1 int, stringField1 string) partitioned by (intField2 int) stored by 'carbondata'")
+    sql(s"load data local inpath '$resourcesPath/data_partition_badrecords.csv' into table badrecordsPartitionintnullalt options('bad_records_action'='force')")
+    checkAnswer(sql("select count(*) cnt from badrecordsPartitionintnullalt where intfield2 = 13"), Seq(Row(1)))
+    CarbonProperties.getInstance().addProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT)
+  }
+
   test("static column partition with load command") {
     sql(
       """
@@ -249,6 +263,8 @@ class StandardPartitionTableQueryTestCase extends QueryTest with BeforeAndAfterA
     sql("drop table if exists staticpartitionload")
     sql("drop table if exists badrecordsPartitionignore")
     sql("drop table if exists badrecordsPartitionfail")
+    sql("drop table if exists badrecordsPartitionintnull")
+    sql("drop table if exists badrecordsPartitionintnullalt")
   }
 
 }
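
One note on the second new test: the property is restored by a trailing addProperty call,
which would be skipped if an assertion failed mid-test. A more defensive variant (a sketch,
not what this commit does) would restore the default in a finally block:

    try {
      CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT, "false")
      // ... create table, load data, run the checkAnswer assertions ...
    } finally {
      CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT,
        CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT)
    }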

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
index 24fd732..09546cd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/optimizer/CarbonFilters.scala
@@ -23,12 +23,15 @@ import org.apache.spark.sql.execution.CastExpressionOptimization
 import org.apache.spark.sql.types._
 import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast}
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.hive.CarbonSessionCatalog
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.datatype.{DataTypes => CarbonDataTypes}
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.scan.expression.{ColumnExpression => CarbonColumnExpression, Expression => CarbonExpression, LiteralExpression => CarbonLiteralExpression}
 import org.apache.carbondata.core.scan.expression.conditional._
 import org.apache.carbondata.core.scan.expression.logical.{AndExpression, FalseExpression, OrExpression}
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.util.CarbonScalaUtil
 
@@ -405,13 +408,45 @@ object CarbonFilters {
     }
   }
 
+  /**
+   * Fetches partition information from the hive metastore. Reads directly from hive with
+   * the given filters when carbon.read.partition.hive.direct is enabled; otherwise, or if
+   * the direct read fails, fetches all partitions and filters them on the client side.
+   * @param partitionFilters filters to prune the partitions with
+   * @param sparkSession current spark session
+   * @param identifier table identifier
+   * @return partition specs flattened into column=value strings
+   */
   def getPartitions(partitionFilters: Seq[Expression],
       sparkSession: SparkSession,
       identifier: TableIdentifier): Seq[String] = {
-    val partitions =
-      sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
+    val partitions = {
+      try {
+        if (CarbonProperties.getInstance().
+          getProperty(CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT,
+          CarbonCommonConstants.CARBON_READ_PARTITION_HIVE_DIRECT_DEFAULT).toBoolean) {
+          // read partitions directly from hive metastore using filters
+          sparkSession.sessionState.catalog.listPartitionsByFilter(identifier, partitionFilters)
+        } else {
+          // Alternatively, read partitions by first fetching all of them and then filtering
+          sparkSession.sessionState.catalog.
+            asInstanceOf[CarbonSessionCatalog].getPartitionsAlternate(
+            partitionFilters,
+            sparkSession,
+            identifier)
+        }
+      } catch {
+        case e: Exception =>
+          // The direct read failed (e.g. a null value in an integral partition column),
+          // so fall back to fetching all partitions and filtering them locally.
+          sparkSession.sessionState.catalog.
+            asInstanceOf[CarbonSessionCatalog].getPartitionsAlternate(
+            partitionFilters,
+            sparkSession,
+            identifier)
+      }
+    }
     partitions.toList.flatMap { partition =>
       partition.spec.seq.map{case (column, value) => column + "=" + value}
     }.toSet.toSeq
   }
+
+
 }
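
For illustration, the final mapping step above flattens each partition spec into
column=value strings and removes duplicates; a standalone sketch of that step on
made-up specs:

    // Hypothetical partition specs, as (column -> value) maps.
    val specs = Seq(
      Map("intField2" -> "13"),
      Map("intField2" -> "13"),   // duplicate spec
      Map("intField2" -> "7"))
    val result = specs.flatMap { spec =>
      spec.map { case (column, value) => column + "=" + value }
    }.toSet.toSeq
    // result contains exactly "intField2=13" and "intField2=7" (order unspecified)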

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
index a6f28c9..dae6249 100644
--- a/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.1/CarbonSessionState.scala
@@ -18,8 +18,8 @@ package org.apache.spark.sql.hive
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
-import org.apache.spark.sql.catalyst.catalog.{FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
-import org.apache.spark.sql.catalyst.expressions.{PredicateSubquery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.catalog.{ExternalCatalogUtils, FunctionResourceLoader, GlobalTempViewManager, SessionCatalog}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Expression, InterpretedPredicate, PredicateSubquery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.parser.ParserUtils._
@@ -134,6 +134,33 @@ class CarbonSessionCatalog(
   def getClient(): org.apache.spark.sql.hive.client.HiveClient = {
     sparkSession.sessionState.asInstanceOf[CarbonSessionState].metadataHive
   }
+
+  /**
+   * Alternate way of getting partition information. It first fetches all partitions from
+   * hive and then applies the filters on them, instead of querying hive along with the
+   * filters.
+   * @param partitionFilters filters to prune the partitions with
+   * @param sparkSession current spark session
+   * @param identifier table identifier
+   * @return matching catalog partitions
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    val catalogTable = sparkSession.sessionState.catalog.getTableMetadata(identifier)
+    val partitionSchema = catalogTable.partitionSchema
+    if (partitionFilters.nonEmpty) {
+      val boundPredicate =
+        InterpretedPredicate.create(partitionFilters.reduce(And).transform {
+          case att: AttributeReference =>
+            val index = partitionSchema.indexWhere(_.name == att.name)
+            BoundReference(index, partitionSchema(index).dataType, nullable = true)
+        })
+      allPartitions.filter { p => boundPredicate(p.toRow(partitionSchema)) }
+    } else {
+      allPartitions
+    }
+  }
 }
 
 /**
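
The binding above replaces each AttributeReference in the filter with a BoundReference at
that attribute's ordinal in the partition schema, so the resulting predicate can be
evaluated directly against partition rows. A minimal standalone illustration of such a
bound predicate (hypothetical values, assuming the Spark 2.1 catalyst APIs used in the
diff above):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{BoundReference, EqualTo, InterpretedPredicate, Literal}
    import org.apache.spark.sql.types.IntegerType

    // A predicate equivalent to `intField2 = 13`, bound to ordinal 0 of a
    // single-column partition schema (intField2 int).
    val boundPredicate = InterpretedPredicate.create(
      EqualTo(BoundReference(0, IntegerType, nullable = true), Literal(13)))

    boundPredicate(InternalRow(13))  // true  -> partition kept
    boundPredicate(InternalRow(7))   // false -> partition pruned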

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7f3c374b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
index a722cbf..c8ea275 100644
--- a/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
+++ b/integration/spark2/src/main/spark2.2/CarbonSessionState.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.{Analyzer, FunctionRegistry}
 import org.apache.spark.sql.catalyst.catalog._
-import org.apache.spark.sql.catalyst.expressions.{Exists, In, ListQuery, ScalarSubquery}
+import org.apache.spark.sql.catalyst.expressions.{And, AttributeReference, BoundReference, Exists, Expression, In, InterpretedPredicate, ListQuery, ScalarSubquery}
 import org.apache.spark.sql.catalyst.optimizer.Optimizer
 import org.apache.spark.sql.catalyst.parser.ParserInterface
 import org.apache.spark.sql.catalyst.parser.ParserUtils.string
@@ -141,6 +141,25 @@ class CarbonSessionCatalog(
     sparkSession.asInstanceOf[CarbonSession].sharedState.externalCatalog
       .asInstanceOf[HiveExternalCatalog].client
   }
+
+  /**
+   * Alternate way of getting partition information. It first fetches all partitions from
+   * hive and then applies the filters on them, instead of querying hive along with the
+   * filters.
+   * @param partitionFilters filters to prune the partitions with
+   * @param sparkSession current spark session
+   * @param identifier table identifier
+   * @return matching catalog partitions
+   */
+  def getPartitionsAlternate(partitionFilters: Seq[Expression],
+      sparkSession: SparkSession,
+      identifier: TableIdentifier) = {
+    val allPartitions = sparkSession.sessionState.catalog.listPartitions(identifier)
+    ExternalCatalogUtils.prunePartitionsByFilter(
+      sparkSession.sessionState.catalog.getTableMetadata(identifier),
+      allPartitions,
+      partitionFilters,
+      sparkSession.sessionState.conf.sessionLocalTimeZone)
+  }
 }
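
On Spark 2.2 the manual binding from the 2.1 variant is unnecessary:
ExternalCatalogUtils.prunePartitionsByFilter already binds the filter expressions to the
partition schema and evaluates them against each partition, taking the session-local time
zone into account for partition values, which is why this version of
getPartitionsAlternate is shorter.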
 
 

