carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject carbondata git commit: [CARBONDATA-1854] add support for implicit column filter
Date Wed, 20 Dec 2017 02:49:23 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master b8a02f391 -> 28c94183b


[CARBONDATA-1854] add support for implicit column filter

This closes #1644


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/28c94183
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/28c94183
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/28c94183

Branch: refs/heads/master
Commit: 28c94183bf5e03f6af05a270b96c30529233332d
Parents: b8a02f3
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Tue Dec 12 14:49:27 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Wed Dec 20 08:18:52 2017 +0530

----------------------------------------------------------------------
 .../scan/filter/FilterExpressionProcessor.java  | 11 ++++
 .../filter/FilterExpressionProcessorTest.java   | 68 ++++++++++++++++++++
 .../load/DataLoadProcessorStepOnSpark.scala     |  5 ++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  5 ++
 4 files changed, 89 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
index 6c804d7..5a1b7df 100644
--- a/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
+++ b/core/src/main/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessor.java
@@ -403,6 +403,17 @@ public class FilterExpressionProcessor implements FilterProcessor {
         return new TrueConditionalResolverImpl(expression, false, false, tableIdentifier);
       case EQUALS:
         currentCondExpression = (BinaryConditionalExpression) expression;
+        // check for implicit column in the expression
+        if (currentCondExpression instanceof InExpression) {
+          CarbonColumn carbonColumn =
+              currentCondExpression.getColumnList().get(0).getCarbonColumn();
+          if (carbonColumn.hasEncoding(Encoding.IMPLICIT)) {
+            return new ConditionalFilterResolverImpl(expression, isExpressionResolve, true,
+                tableIdentifier,
+                currentCondExpression.getColumnList().get(0).getCarbonColumn().isMeasure());
+          }
+        }
+
         CarbonColumn column = currentCondExpression.getColumnList().get(0).getCarbonColumn();
         if (currentCondExpression.isSingleColumn() && ! column.getDataType().isComplexType()) {
           if (column.isMeasure()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java
new file mode 100644
index 0000000..1f244a3
--- /dev/null
+++ b/core/src/test/java/org/apache/carbondata/core/scan/filter/FilterExpressionProcessorTest.java
@@ -0,0 +1,68 @@
+package org.apache.carbondata.core.scan.filter;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.UUID;
+
+import org.apache.carbondata.core.cache.dictionary.AbstractDictionaryCacheTest;
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.metadata.encoder.Encoding;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.LiteralExpression;
+import org.apache.carbondata.core.scan.expression.conditional.EqualToExpression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
+
+import org.junit.Before;
+import org.junit.Test;
+
+public class FilterExpressionProcessorTest extends AbstractDictionaryCacheTest {
+
+  private ColumnSchema columnSchema;
+
+  @Before public void setUp() throws Exception {
+    init();
+    this.databaseName = props.getProperty("database", "testSchema");
+    this.tableName = props.getProperty("tableName", "carbon");
+    this.carbonStorePath = props.getProperty("storePath", "carbonStore");
+    carbonTableIdentifier =
+        new CarbonTableIdentifier(databaseName, tableName, UUID.randomUUID().toString());
+    this.carbonStorePath = props.getProperty("storePath", "carbonStore");
+    columnSchema = new ColumnSchema();
+    columnSchema.setColumnar(true);
+    columnSchema.setColumnName("IMEI");
+    columnSchema.setColumnUniqueId(UUID.randomUUID().toString());
+    columnSchema.setDataType(DataTypes.STRING);
+    columnSchema.setDimensionColumn(true);
+    List<Encoding> encodingList = new ArrayList<>();
+    encodingList.add(Encoding.IMPLICIT);
+    columnSchema.setEncodingList(encodingList);
+  }
+
+  @Test public void testGetFilterResolverBasedOnExpressionType()
+      throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
+    CarbonColumn carbonColumn = new CarbonColumn(columnSchema, 0, 0);
+    ColumnExpression columnExpression = new ColumnExpression("IMEI", DataTypes.STRING);
+    columnExpression.setCarbonColumn(carbonColumn);
+    LiteralExpression literalExpression = new LiteralExpression("ImeiValue", DataTypes.STRING);
+    InExpression equalToExpression = new InExpression(columnExpression, literalExpression);
+    FilterExpressionProcessor filterExpressionProcessor = new FilterExpressionProcessor();
+    Method method = FilterExpressionProcessor.class
+        .getDeclaredMethod("getFilterResolverBasedOnExpressionType", ExpressionType.class,
+            boolean.class, Expression.class, AbsoluteTableIdentifier.class, Expression.class);
+    method.setAccessible(true);
+    Object result = method
+        .invoke(filterExpressionProcessor, ExpressionType.EQUALS, false, equalToExpression, null,
+            null);
+    assert (result.getClass().getName()
+        .equals("org.apache.carbondata.core.scan.filter.resolver.ConditionalFilterResolverImpl"));
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
index 6759b20..5e6ba98 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala
@@ -19,6 +19,7 @@ package org.apache.carbondata.spark.load
 
 import scala.util.Random
 
+import com.univocity.parsers.common.TextParsingException
 import org.apache.spark.{Accumulator, SparkEnv, TaskContext}
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.Row
@@ -234,6 +235,10 @@ object DataLoadProcessorStepOnSpark {
   private def wrapException(e: Throwable, model: CarbonLoadModel): Unit = {
     e match {
       case e: CarbonDataLoadingException => throw e
+      case e: TextParsingException =>
+        LOGGER.error(e, "Data Loading failed for table " + model.getTableName)
+        throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName,
+          e)
       case e: Exception =>
         LOGGER.error(e, "Data Loading failed for table " + model.getTableName)
         throw new CarbonDataLoadingException("Data Loading failed for table " + model.getTableName,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/28c94183/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 29712de..0b786b5 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
 
+import com.univocity.parsers.common.TextParsingException
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.io.NullWritable
@@ -403,6 +404,10 @@ object CarbonDataRDDFactory {
                 sparkException.getCause.isInstanceOf[CarbonDataLoadingException]) {
               executorMessage = sparkException.getCause.getMessage
               errorMessage = errorMessage + ": " + executorMessage
+            } else if (sparkException.getCause.isInstanceOf[TextParsingException]) {
+              executorMessage = CarbonDataProcessorUtil
+                .trimErrorMessage(sparkException.getCause.getMessage)
+              errorMessage = errorMessage + " : " + executorMessage
             }
           case _ =>
             if (ex.getCause != null) {


Mime
View raw message