carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject carbondata git commit: [CARBONDATA-2826] support select using distributed carbon store
Date Fri, 17 Aug 2018 03:29:04 GMT
Repository: carbondata
Updated Branches:
  refs/heads/store [created] 94d0b54ac


[CARBONDATA-2826] support select using distributed carbon store

Provides select support with select columns pruning and filter pushdown using new RDD for
distributed carbon store

This closes #2631


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/94d0b54a
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/94d0b54a
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/94d0b54a

Branch: refs/heads/store
Commit: 94d0b54aca5e17dcd4c3683fbf64de8fbdf60abb
Parents: 9f10122
Author: Ajith <ajith2489@gmail.com>
Authored: Sun Aug 12 16:54:27 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Fri Aug 17 11:28:36 2018 +0800

----------------------------------------------------------------------
 integration/spark2-sourcev2/pom.xml             | 183 +++++++++++++++++++
 .../store/CarbonResultBatchIterator.scala       |  57 ++++++
 .../apache/carbondata/store/CarbonSource.scala  |  58 ++++++
 .../carbondata/store/CarbonSourceRelation.scala | 113 ++++++++++++
 .../carbondata/store/CarbonStorePartition.scala |  29 +++
 .../carbondata/store/CarbonStoreScanRDD.scala   |  59 ++++++
 ....apache.spark.sql.sources.DataSourceRegister |   2 +
 .../carbondata/store/TestCarbonStoreScan.scala  | 110 +++++++++++
 .../spark/sql/hive/CarbonFileMetastore.scala    |   2 +-
 pom.xml                                         |   6 +
 .../apache/carbondata/store/devapi/Pruner.java  |   3 +-
 .../carbondata/store/impl/MetaOperation.java    |   2 +-
 .../carbondata/store/impl/Schedulable.java      |   3 +-
 .../store/impl/master/RegistryServiceImpl.java  |   2 +-
 14 files changed, 624 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/pom.xml b/integration/spark2-sourcev2/pom.xml
new file mode 100644
index 0000000..51e77ea
--- /dev/null
+++ b/integration/spark2-sourcev2/pom.xml
@@ -0,0 +1,183 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+
+  <parent>
+    <groupId>org.apache.carbondata</groupId>
+    <artifactId>carbondata-parent</artifactId>
+    <version>1.5.0-SNAPSHOT</version>
+    <relativePath>../../pom.xml</relativePath>
+  </parent>
+
+  <artifactId>carbondata-datasourcev2</artifactId>
+  <name>Apache CarbonData :: DatasourceV2</name>
+
+  <properties>
+    <dev.path>${basedir}/../../dev</dev.path>
+  </properties>
+
+  <dependencies>
+   
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <version>4.11</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.scalatest</groupId>
+      <artifactId>scalatest_${scala.binary.version}</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark-common</artifactId>
+      <version>${project.version}</version>
+      <scope>test</scope>
+      <exclusions>
+         <exclusion>
+           <groupId>net.jpountz.lz4</groupId>
+           <artifactId>lz4</artifactId>
+         </exclusion>
+       </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-core</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+         <exclusion>
+           <groupId>net.jpountz.lz4</groupId>
+           <artifactId>lz4</artifactId>
+         </exclusion>
+       </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-store-sdk</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+         <exclusion>
+           <groupId>net.jpountz.lz4</groupId>
+           <artifactId>lz4</artifactId>
+         </exclusion>
+       </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+         <exclusion>
+           <groupId>net.jpountz.lz4</groupId>
+           <artifactId>lz4</artifactId>
+         </exclusion>
+       </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.carbondata</groupId>
+      <artifactId>carbondata-spark2</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+         <exclusion>
+           <groupId>net.jpountz.lz4</groupId>
+           <artifactId>lz4</artifactId>
+         </exclusion>
+       </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-core_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.spark</groupId>
+      <artifactId>spark-sql_${scala.binary.version}</artifactId>
+      <version>${spark.version}</version>
+      <exclusions>
+        <!-- univocity-parsers 2.5.9 is pulled in transitively by
+        org.apache.spark:spark-sql_2.11, so exclude that version here.
+        Carbon uses version 2.2.1. -->
+        <exclusion>
+          <groupId>com.univocity</groupId>
+          <artifactId>univocity-parsers</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <version>2.6.5</version>
+      <exclusions>
+        <exclusion>
+          <groupId>com.fasterxml.jackson.core</groupId>
+          <artifactId>jackson-annotations</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <version>2.6.5</version>
+      <scope>provided</scope>
+    </dependency>
+
+  </dependencies>
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.scala-tools</groupId>
+        <artifactId>maven-scala-plugin</artifactId>
+        <version>2.15.2</version>
+        <executions>
+          <execution>
+            <id>compile</id>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+            <phase>compile</phase>
+          </execution>
+          <execution>
+            <id>testCompile</id>
+            <goals>
+              <goal>testCompile</goal>
+            </goals>
+            <phase>test</phase>
+          </execution>
+          <execution>
+            <phase>process-resources</phase>
+            <goals>
+              <goal>compile</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <artifactId>maven-compiler-plugin</artifactId>
+        <configuration>
+          <source>1.8</source>
+          <target>1.8</target>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonResultBatchIterator.scala
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonResultBatchIterator.scala
b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonResultBatchIterator.scala
new file mode 100644
index 0000000..b411fc2
--- /dev/null
+++ b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonResultBatchIterator.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.util
+
+import org.apache.spark.{TaskContext, TaskKilledException}
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
+
+import org.apache.carbondata.store.devapi.ResultBatch
+
+/**
+ * Iterator that flattens a sequence of result batches into individual rows.
+ *
+ * @param context        task context, checked for interruption on each hasNext
+ * @param resultIterator underlying iterator over result batches
+ * @tparam T             element type of each result batch
+ */
+class CarbonResultBatchIterator[T](var context: TaskContext,
+                                   var resultIterator: util.Iterator[_ <: ResultBatch[T]])
+  extends Iterator[Any] {
+
+  private var resultBatch: ResultBatch[T] = null
+
+  override def hasNext: Boolean = {
+    if (context.isInterrupted) {
+      throw new TaskKilledException
+    }
+    (resultBatch != null && resultBatch.hasNext) || resultIterator.hasNext
+  }
+
+  override def next(): Any = {
+    if (!hasNext) {
+      throw new java.util.NoSuchElementException("End of result")
+    }
+    if (resultBatch == null || !resultBatch.hasNext) {
+      resultBatch = resultIterator.next()
+    }
+    Row.fromSeq(resultBatch.next().asInstanceOf[GenericInternalRow].values)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSource.scala
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSource.scala
b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSource.scala
new file mode 100644
index 0000000..9cb497b
--- /dev/null
+++ b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSource.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import org.apache.spark.sql.SQLContext
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types.StructType
+
+/**
+ * Registered as a data source format via {@link org.apache.spark.sql.sources.DataSourceRegister}
+ */
+class CarbonSource extends RelationProvider with SchemaRelationProvider
+  with DataSourceRegister {
+
+  // format short name
+  override def shortName(): String = "carbon"
+
+  /**
+   * Creates the relation, inferring the schema when none is passed.
+   * Example: df.format("carbon").option("tableName", "carbonsession_table").load()
+   *
+   * @param sqlContext SQL context of the active Spark session
+   * @param parameters data source options, e.g. tableName and dbName
+   * @return instance of {@link CarbonSourceRelation}
+   */
+  override def createRelation(sqlContext: SQLContext,
+                              parameters: Map[String, String]): BaseRelation = {
+    CarbonSourceRelation(sqlContext.sparkSession, parameters, None)
+  }
+
+  /**
+   * Creates the relation using the provided schema.
+   *
+   * @param sqlContext SQL context of the active Spark session
+   * @param parameters data source options, e.g. tableName and dbName
+   * @param schema     user-provided table schema
+   * @return instance of {@link CarbonSourceRelation}
+   */
+  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String],
+                              schema: StructType): BaseRelation = {
+    CarbonSourceRelation(sqlContext.sparkSession, parameters, Option(schema))
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSourceRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSourceRelation.scala
b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSourceRelation.scala
new file mode 100644
index 0000000..a5ddfc6
--- /dev/null
+++ b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonSourceRelation.scala
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.util
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql._
+import org.apache.spark.sql.optimizer.CarbonFilters
+import org.apache.spark.sql.sources._
+import org.apache.spark.sql.types._
+
+import org.apache.carbondata.core.datastore.row.CarbonRow
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.core.scan.expression.logical.AndExpression
+import org.apache.carbondata.sdk.store.conf.StoreConf
+import org.apache.carbondata.sdk.store.descriptor.{ScanDescriptor, TableIdentifier}
+import org.apache.carbondata.spark.readsupport.SparkRowReadSupportImpl
+import org.apache.carbondata.spark.util.SparkDataTypeConverterImpl
+import org.apache.carbondata.store.devapi.{InternalCarbonStore, InternalCarbonStoreFactory,
Scanner, ScanOption}
+
+/**
+ * Relation for reading a carbon store table through the Spark data source API.
+ *
+ * @param sparkSession active Spark session
+ * @param parameters   data source options, e.g. tableName and dbName
+ * @param tableSchema  optional user-provided schema; inferred from the table when None
+ */
+case class CarbonSourceRelation(
+                                 sparkSession: SparkSession,
+                                 parameters: Map[String, String],
+                                 tableSchema: Option[StructType])
+  extends BaseRelation with PrunedFilteredScan {
+
+  lazy val store: InternalCarbonStore = {
+    val storeConf = new StoreConf(System.getProperty("CARBON_STORE_CONF"))
+    InternalCarbonStoreFactory.getStore("GlobalStore", storeConf)
+  }
+
+  val caseInsensitiveMap: Map[String, String] = parameters.map(f => (f._1.toLowerCase,
f._2))
+
+  private val tableIdentifier = new TableIdentifier(caseInsensitiveMap("tablename"),
+    CarbonEnv.getDatabaseName(caseInsensitiveMap.get("dbname"))(sparkSession))
+
+  CarbonSession.updateSessionInfoToCurrentThread(sparkSession)
+
+  @transient lazy val carbonTable: CarbonTable = store.getCarbonTable(tableIdentifier)
+
+  override def sqlContext: SQLContext = sparkSession.sqlContext
+
+  override def schema: StructType = tableSchema.getOrElse(getSparkSchema(carbonTable))
+
+  private def getSparkSchema(sourceTable: CarbonTable): StructType = {
+    val cols = sourceTable.getTableInfo.getFactTable.getListOfColumns.asScala.toArray
+    val sortedCols = cols.filter(_.getSchemaOrdinal != -1)
+      .sortWith(_.getSchemaOrdinal < _.getSchemaOrdinal)
+    SparkDataTypeConverterImpl.convertToSparkSchema(sourceTable, sortedCols)
+  }
+
+  override def unhandledFilters(filters: Array[Filter]): Array[Filter] = new Array[Filter](0)
+
+  override def toString: String = {
+    "CarbonSourceRelation [ " + "Database name :" + tableIdentifier.getDatabaseName +
+      ", " + "Table name :" + tableIdentifier.getTableName + ", Schema :" + tableSchema +
" ]"
+  }
+
+  override def sizeInBytes: Long = 0L
+
+  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row]
= {
+    // create filter expression
+    val filterExpression: Expression = filters.flatMap { filter =>
+      CarbonFilters.createCarbonFilter(schema, filter)
+    }.reduceOption(new AndExpression(_, _)).orNull
+
+    // create scan descriptor
+    val scanDesc = ScanDescriptor.builder()
+      .table(tableIdentifier)
+      .select(requiredColumns)
+      .filter(filterExpression)
+      .create()
+
+    // create scanner
+    val scanOptions = new util.HashMap[String, String]()
+    scanOptions.put(ScanOption.REMOTE_PRUNE, "true");
+    scanOptions.put(ScanOption.OP_PUSHDOWN, "true")
+    val scanner: Scanner[CarbonRow] =
+      store.newScanner(tableIdentifier, scanDesc,
+        scanOptions, new SparkRowReadSupportImpl())
+        .asInstanceOf[Scanner[CarbonRow]]
+
+    // create RDD
+    new CarbonStoreScanRDD[Row, CarbonRow](sparkSession.sparkContext,
+      tableIdentifier, scanner, filterExpression)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStorePartition.scala
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStorePartition.scala
b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStorePartition.scala
new file mode 100644
index 0000000..ece8626
--- /dev/null
+++ b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStorePartition.scala
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import org.apache.hadoop.io.ObjectWritable
+import org.apache.spark.Partition
+
+import org.apache.carbondata.store.devapi.ScanUnit
+
+class CarbonStorePartition(var scanUnit: ScanUnit, var partitionIndex: Int)
+  extends ObjectWritable with Partition
+    with Serializable {
+  override def index: Int = partitionIndex
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStoreScanRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStoreScanRDD.scala
b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStoreScanRDD.scala
new file mode 100644
index 0000000..b03252c
--- /dev/null
+++ b/integration/spark2-sourcev2/src/main/scala/org/apache/carbondata/store/CarbonStoreScanRDD.scala
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import scala.reflect.ClassTag
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+import org.apache.carbondata.core.scan.expression.Expression
+import org.apache.carbondata.sdk.store.descriptor.TableIdentifier
+import org.apache.carbondata.spark.rdd.CarbonRDD
+import org.apache.carbondata.store.devapi.Scanner
+
+
+/**
+ * RDD that scans a distributed carbon store table with column pruning and filter pushdown.
+ * @param sc               Spark context
+ * @param tableIdentifier  identifier of the table to scan
+ * @param scanner          scanner used to prune scan units and read results
+ * @param filterExpression pushed-down filter expression; may be null
+ * @tparam T output row type
+ * @tparam U scanner result element type
+ */
+class CarbonStoreScanRDD[T: ClassTag, U](sc: SparkContext,
+                                         tableIdentifier: TableIdentifier,
+                                         scanner: Scanner[U],
+                                         filterExpression: Expression)
+  extends CarbonRDD[T](sc, Nil, sc.hadoopConfiguration) {
+
+  override def internalCompute(split: Partition, context: TaskContext): scala.Iterator[T]
= {
+    val splitUnit = split.asInstanceOf[CarbonStorePartition].scanUnit
+    val resultIterator = scanner.scan(splitUnit)
+    new CarbonResultBatchIterator[U](context, resultIterator).asInstanceOf[Iterator[T]]
+  }
+
+  override protected def getPartitions: Array[Partition] = {
+    val scanUnits = scanner.prune(tableIdentifier, filterExpression)
+    val result = new Array[Partition](scanUnits.size())
+    for (i <- 0 until scanUnits.size()) {
+      result(i) = new CarbonStorePartition(scanUnits.get(i), i)
+    }
+    result
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
b/integration/spark2-sourcev2/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
new file mode 100644
index 0000000..a771460
--- /dev/null
+++ b/integration/spark2-sourcev2/src/test/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
@@ -0,0 +1,2 @@
+org.apache.carbondata.store.CarbonSource
+org.apache.spark.sql.execution.datasources.SparkCarbonFileFormat
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2-sourcev2/src/test/scala/org/apache/carbondata/store/TestCarbonStoreScan.scala
----------------------------------------------------------------------
diff --git a/integration/spark2-sourcev2/src/test/scala/org/apache/carbondata/store/TestCarbonStoreScan.scala
b/integration/spark2-sourcev2/src/test/scala/org/apache/carbondata/store/TestCarbonStoreScan.scala
new file mode 100644
index 0000000..32ff301
--- /dev/null
+++ b/integration/spark2-sourcev2/src/test/scala/org/apache/carbondata/store/TestCarbonStoreScan.scala
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.store
+
+import java.io.File
+
+import org.apache.carbondata.sdk.store.conf.StoreConf
+import org.apache.carbondata.store.impl.worker.Worker
+import org.apache.spark.SparkConf
+import org.apache.spark.sql.{functions, SparkSession}
+
+class TestCarbonStoreScan extends org.scalatest.FunSuite {
+
+  test("Test Scan with pruning and filter") {
+
+    val projectFolder = new File(classOf[TestCarbonStoreScan].getResource("/")
+      .getPath + "../../../../").getCanonicalPath
+
+    val storeConfPath = projectFolder + "/store/conf/store.conf"
+    System.setProperty("CARBON_STORE_CONF", storeConfPath)
+    val thread = new Thread {
+      override def run {
+        org.apache.carbondata.store.impl.master.Master.main(
+          Array(projectFolder + "/store/conf/log4j.properties", storeConfPath))
+      }
+    }
+    thread.start()
+    Thread.sleep(5000)
+    val w = new Worker(new StoreConf(System.getProperty("CARBON_STORE_CONF")))
+    w.start()
+
+    val conf = new SparkConf().
+      set("hive.metastore.warehouse.dir", "/tmp/spark-warehouse")
+        .set("spark.sql.warehouse.dir", "/tmp/spark-warehouse")
+      .setAppName("carbon-store-datasource")
+
+    import org.apache.spark.sql.CarbonSession._
+    val spark = SparkSession.builder()
+      .config(conf)
+      .master("local")
+      .getOrCreateCarbonSession("/tmp/carbon.store/")
+
+
+    spark.sql("DROP TABLE IF EXISTS carbonsession_table")
+
+    // Create table
+    spark.sql(
+      s"""
+         | CREATE TABLE carbonsession_table(
+         | shortField SHORT,
+         | intField INT,
+         | bigintField LONG,
+         | doubleField DOUBLE,
+         | stringField STRING,
+         | timestampField TIMESTAMP,
+         | decimalField DECIMAL(18,2),
+         | dateField DATE,
+         | charField CHAR(5),
+         | floatField FLOAT
+         | )
+         | STORED BY 'carbondata'
+         | TBLPROPERTIES('DICTIONARY_INCLUDE'='dateField, charField')
+       """.stripMargin)
+
+    val path = projectFolder + "/examples/spark2/src/main/resources/data.csv"
+
+    // scalastyle:off
+    spark.sql(
+      s"""
+         | LOAD DATA LOCAL INPATH '$path'
+         | INTO TABLE carbonsession_table
+         | OPTIONS('HEADER'='true', 'COMPLEX_DELIMITER_LEVEL_1'='#','timestampformat'='yyyy/MM/dd
HH:mm:ss', 'dateformat'='yyyy/MM/dd')
+       """.stripMargin)
+
+    spark.sql("select * from carbonsession_table").show()
+
+    val df2 = spark.read
+      .format("carbon")
+      .option("tableName", "carbonsession_table")
+      .load()
+
+    // Step 1 (Schema verification)
+    df2.printSchema()
+    // Step 2 (Read data)
+    df2.show()
+
+    df2.selectExpr("intfield", "stringfield", "timestampfield").filter("stringfield = 'spark'").show()
+
+    assert(df2.selectExpr("intfield", "stringfield", "timestampfield", "shortfield")
+      .filter("stringfield = 'spark'")
+      .groupBy("intfield", "shortfield", "stringfield")
+      .agg(functions.collect_list("intfield"))
+      .count() == 2)
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
index dddc72c..129dacf 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/hive/CarbonFileMetastore.scala
@@ -162,7 +162,7 @@ class CarbonFileMetastore extends CarbonMetaStore {
           case _ => throw new NoSuchTableException(database, tableIdentifier.table)
         }
         val identifier: AbsoluteTableIdentifier = AbsoluteTableIdentifier.from(
-           catalogTable.location.toString, database, tableIdentifier.table)
+          catalogTable.storage.properties.get("tablePath").get, database, tableIdentifier.table)
         CarbonEnv.getInstance(sparkSession).carbonMetastore.
           createCarbonRelation(catalogTable.storage.properties, identifier, sparkSession)
       case _ => throw new NoSuchTableException(database, tableIdentifier.table)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index e10acc7..9556c82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -655,6 +655,12 @@
         <module>integration/zeppelin</module>
       </modules>
     </profile>
+    <profile>
+      <id>sourcev2</id>
+      <modules>
+        <module>integration/spark2-sourcev2</module>
+      </modules>
+    </profile>
   </profiles>
 
 </project>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
index 4a1d2e5..6934cc7 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/devapi/Pruner.java
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.store.devapi;
 
+import java.io.Serializable;
 import java.util.List;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -27,7 +28,7 @@ import org.apache.carbondata.sdk.store.exception.CarbonException;
 
 @InterfaceAudience.Developer("Integration")
 @InterfaceStability.Unstable
-public interface Pruner {
+public interface Pruner extends Serializable {
 
   /**
    * Return an array of ScanUnit which will be the input in

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
index 3ca1ffe..0c11fd2 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/MetaOperation.java
@@ -207,6 +207,6 @@ public class MetaOperation {
   public static String getTablePath(String tableName, String databaseName, StoreConf storeConf)
{
     Objects.requireNonNull(tableName);
     Objects.requireNonNull(databaseName);
-    return String.format("%s/%s", storeConf.storeLocation(), tableName);
+    return String.format("%s/%s/%s", storeConf.storeLocation(), databaseName, tableName);
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java b/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
index 6e29b48..9de2b9b 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/Schedulable.java
@@ -20,6 +20,7 @@ package org.apache.carbondata.store.impl;
 import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
+import java.io.Serializable;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.carbondata.common.annotations.InterfaceAudience;
@@ -27,7 +28,7 @@ import org.apache.carbondata.common.annotations.InterfaceAudience;
 import org.apache.hadoop.io.Writable;
 
 @InterfaceAudience.Internal
-public class Schedulable implements Writable {
+public class Schedulable implements Writable, Serializable {
 
   private String id;
   private String address;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/94d0b54a/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
----------------------------------------------------------------------
diff --git a/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
b/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
index ce2be99..73520f6 100644
--- a/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
+++ b/store/core/src/main/java/org/apache/carbondata/store/impl/master/RegistryServiceImpl.java
@@ -27,7 +27,7 @@ import org.apache.carbondata.store.impl.service.model.RegisterWorkerResponse;
 import org.apache.hadoop.ipc.ProtocolSignature;
 
 @InterfaceAudience.Internal
-class egistryServiceImpl implements RegistryService {
+class RegistryServiceImpl implements RegistryService {
 
   private Master master;
 


Mime
View raw message