Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CE835200D62 for ; Sat, 16 Dec 2017 13:41:27 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id CD3BB160C11; Sat, 16 Dec 2017 12:41:27 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id EBB04160BFB for ; Sat, 16 Dec 2017 13:41:26 +0100 (CET) Received: (qmail 41339 invoked by uid 500); 16 Dec 2017 12:41:26 -0000 Mailing-List: contact commits-help@carbondata.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.apache.org Delivered-To: mailing list commits@carbondata.apache.org Received: (qmail 41329 invoked by uid 99); 16 Dec 2017 12:41:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 16 Dec 2017 12:41:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A0E62DFC25; Sat, 16 Dec 2017 12:41:23 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: ravipesala@apache.org To: commits@carbondata.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: carbondata git commit: [CARBONDATA-1247] Block pruning not working for date type column Date: Sat, 16 Dec 2017 12:41:23 +0000 (UTC) archived-at: Sat, 16 Dec 2017 12:41:28 -0000 Repository: carbondata Updated Branches: refs/heads/master 8bf72a6e0 -> e7ec6f43c [CARBONDATA-1247] Block pruning not working for date type column Block pruning not working for date type column. 
Root cause: casting of String literals to DateType was not handled. Solution: make CastExpressionOptimization handle casting of String literals to DateType. This closes #1656 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e7ec6f43 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e7ec6f43 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e7ec6f43 Branch: refs/heads/master Commit: e7ec6f43cea9837b032be08527b7cdd221d23f3c Parents: 8bf72a6 Author: Pawan Malwal Authored: Wed Dec 13 13:01:20 2017 +0530 Committer: ravipesala Committed: Sat Dec 16 18:11:11 2017 +0530 ---------------------------------------------------------------------- .../spark/rdd/CarbonDataRDDFactory.scala | 2 + .../execution/CastExpressionOptimization.scala | 93 +++++++++++--------- 2 files changed, 52 insertions(+), 43 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7ec6f43/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index e7d10d9..36a2e09 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -19,6 +19,7 @@ package org.apache.carbondata.spark.rdd import java.text.SimpleDateFormat import java.util +import java.util.TimeZone import java.util.concurrent._ import scala.collection.JavaConverters._ @@ -844,6 +845,7 @@ object CarbonDataRDDFactory { .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT) new
SimpleDateFormat(dateFormatString) } + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")) // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/e7ec6f43/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala index 046e17d..2ff8c42 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CastExpressionOptimization.scala @@ -26,7 +26,7 @@ import scala.collection.JavaConverters._ import org.apache.spark.sql.catalyst.expressions.{Attribute, EmptyRow, EqualTo, Expression, GreaterThan, GreaterThanOrEqual, In, LessThan, LessThanOrEqual, Literal, Not} import org.apache.spark.sql.CastExpr import org.apache.spark.sql.sources -import org.apache.spark.sql.types.{DoubleType, IntegerType, ShortType, StringType, TimestampType} +import org.apache.spark.sql.types._ import org.apache.spark.sql.CarbonExpressions.{MatchCast => Cast} import org.apache.spark.sql.sources.Filter @@ -36,10 +36,20 @@ import org.apache.carbondata.core.util.CarbonProperties object CastExpressionOptimization { - def typeCastStringToLong(v: Any): Any = { - val parser: SimpleDateFormat = new SimpleDateFormat(CarbonProperties.getInstance - .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, - CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) + def typeCastStringToLong(v: Any, dataType: DataType): Any = { + var parser: SimpleDateFormat = null + if (dataType == TimestampType) { + parser = new 
SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, + CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)) + } else if (dataType == DateType) { + parser = new SimpleDateFormat(CarbonProperties.getInstance + .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT, + CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)) + parser.setTimeZone(TimeZone.getTimeZone("GMT")) + } else { + throw new UnsupportedOperationException ("Unsupported DataType being evaluated.") + } try { val value = parser.parse(v.toString).getTime() * 1000L value @@ -53,7 +63,7 @@ object CastExpressionOptimization { val gmtDay = new SimpleDateFormat("yyyy-MM-dd", Locale.US) gmtDay.setTimeZone(TimeZone.getTimeZone("GMT")) try { - gmtDay.parse(v.toString).getTime() + gmtDay.parse(v.toString).getTime() * 1000L } catch { case e: ParseException => v @@ -68,10 +78,11 @@ object CastExpressionOptimization { } } - def typeCastStringToLongList(list: Seq[Expression]): Seq[Expression] = { + + def typeCastStringToLongList(list: Seq[Expression], dataType: DataType): Seq[Expression] = { val tempList = new util.ArrayList[Expression]() list.foreach { value => - val output = typeCastStringToLong(value) + val output = typeCastStringToLong(value, dataType) if (!output.equals(value)) { tempList.add(output.asInstanceOf[Expression]) } @@ -130,8 +141,8 @@ object CastExpressionOptimization { expr match { case c@EqualTo(Cast(a: Attribute, _), Literal(v, t)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -140,8 +151,8 @@ object CastExpressionOptimization { } case c@EqualTo(Literal(v, t), Cast(a: Attribute, _)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) 
=> - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -150,8 +161,8 @@ object CastExpressionOptimization { } case c@Not(EqualTo(Cast(a: Attribute, _), Literal(v, t))) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -160,8 +171,8 @@ object CastExpressionOptimization { } case c@Not(EqualTo(Literal(v, t), Cast(a: Attribute, _))) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -170,8 +181,8 @@ object CastExpressionOptimization { } case c@Not(In(Cast(a: Attribute, _), list)) => a.dataType match { - case ts: TimestampType if list.head.dataType.sameType(StringType) => - val value = typeCastStringToLongList(list) + case ts@(_: DateType | _: TimestampType) if list.head.dataType.sameType(StringType) => + val value = typeCastStringToLongList(list, ts) if (!value.equals(list)) { val hSet = value.map(e => e.eval(EmptyRow)) Some(sources.Not(sources.In(a.name, hSet.toArray))) @@ -198,8 +209,8 @@ object CastExpressionOptimization { } case c@In(Cast(a: Attribute, _), list) => a.dataType match { - case ts: TimestampType if list.head.dataType.sameType(StringType) => - val value = typeCastStringToLongList(list) + case ts@(_: DateType | _: TimestampType) if 
list.head.dataType.sameType(StringType) => + val value = typeCastStringToLongList(list, ts) if (!value.equals(list)) { val hSet = value.map(e => e.eval(EmptyRow)) Some(sources.In(a.name, hSet.toArray)) @@ -226,8 +237,8 @@ object CastExpressionOptimization { } case c@GreaterThan(Cast(a: Attribute, _), Literal(v, t)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -236,8 +247,8 @@ object CastExpressionOptimization { } case c@GreaterThan(Literal(v, t), Cast(a: Attribute, _)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -246,8 +257,8 @@ object CastExpressionOptimization { } case c@LessThan(Cast(a: Attribute, _), Literal(v, t)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -256,8 +267,8 @@ object CastExpressionOptimization { } case c@LessThan(Literal(v, t), Cast(a: Attribute, _)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if 
t.sameType(IntegerType) => @@ -266,8 +277,8 @@ object CastExpressionOptimization { } case c@GreaterThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -276,8 +287,8 @@ object CastExpressionOptimization { } case c@GreaterThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -286,13 +297,8 @@ object CastExpressionOptimization { } case c@LessThanOrEqual(Cast(a: Attribute, _), Literal(v, t)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - val value = typeCastStringToLong(v) - if (!value.equals(v)) { - Some(sources.LessThanOrEqual(a.name, value)) - } else { - Some(CastExpr(c)) - } + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -301,8 +307,8 @@ object CastExpressionOptimization { } case c@LessThanOrEqual(Literal(v, t), Cast(a: Attribute, _)) => a.dataType match { - case ts: TimestampType if t.sameType(StringType) => - updateFilterForTimeStamp(v, c) + case ts@(_: DateType | _: TimestampType) if t.sameType(StringType) => + updateFilterForTimeStamp(v, c, ts) case i: IntegerType if t.sameType(DoubleType) => updateFilterForInt(v, c) case s: ShortType if t.sameType(IntegerType) => @@ -350,8 +356,9 
@@ object CastExpressionOptimization { * @param exp * @return */ - def updateFilterForTimeStamp(actualValue: Any, exp: Expression): Option[sources.Filter] = { - val newValue = typeCastStringToLong(actualValue) + def updateFilterForTimeStamp(actualValue: Any, exp: Expression, dt: DataType): + Option[sources.Filter] = { + val newValue = typeCastStringToLong(actualValue, dt) if (!newValue.equals(actualValue)) { updateFilterBasedOnFilterType(exp, newValue) } else {