carbondata-commits mailing list archives

From: manishgupt...@apache.org
Subject: [1/2] carbondata git commit: [CARBONDATA-2910] Support backward compatibility in fileformat and added tests for load with different sort orders
Date: Fri, 07 Sep 2018 14:43:44 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master b6bd90d80 -> 3894e1d05
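
This commit adds tests that write segments with different sort orders under a
single table path and read them back through the Spark "carbon" datasource. As
a minimal sketch of the read path exercised below (the path components come
from the tests in this diff; the predicate value is illustrative):

    // Read a store containing segments written with differing sort_columns;
    // filters must return correct results regardless of each segment's order.
    val df = spark.read.format("carbon")
      .load(s"$warehouse1/testdb/testtable/Fact/Part0")
    df.filter("country = 'china'").count()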


http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
index 837bc4f..dcc76d8 100644
--- a/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
+++ b/integration/spark-datasource/src/test/scala/org/apache/spark/sql/carbondata/datasource/SparkCarbonDataSourceTest.scala
@@ -17,6 +17,11 @@
 package org.apache.spark.sql.carbondata.datasource
 
 
+import java.io.File
+import java.util
+
+import scala.collection.JavaConverters._
+
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.carbondata.datasource.TestUtil._
 import org.scalatest.{BeforeAndAfterAll, FunSuite}
@@ -24,6 +29,9 @@ import org.scalatest.{BeforeAndAfterAll, FunSuite}
 import org.apache.carbondata.core.datamap.DataMapStoreManager
 import org.apache.carbondata.core.datastore.impl.FileFactory
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.datatype.DataTypes
+import org.apache.carbondata.hadoop.testutil.StoreCreator
+import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}
 
 class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
 
@@ -346,7 +354,7 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
     df.write.format("carbon").save(warehouse1 + "/test_folder/")
     if (!spark.sparkContext.version.startsWith("2.1")) {
       spark
-        .sql(s"create table test123 (c1 string, c2 string, arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18)) using carbon location '$warehouse1/test_folder/'")
+        .sql(s"create table test123 (c1 string, c2 string, shortc smallint,intc int, longc bigint,  doublec double, bigdecimalc decimal(38,18), arrayc array<int>, structc struct<_1:string, _2:decimal(38,18)>) using carbon location '$warehouse1/test_folder/'")
 
       checkAnswer(spark.sql("select * from test123"),
         spark.read.format("carbon").load(warehouse1 + "/test_folder/"))
@@ -613,6 +621,152 @@ class SparkCarbonDataSourceTest extends FunSuite with BeforeAndAfterAll {
       FileFactory.deleteAllCarbonFilesOfDir(FileFactory.getCarbonFile(warehouse1 + "/test_folder"))
     }
   }
+
+  test("test read using old data") {
+    val store = new StoreCreator(new File(warehouse1).getAbsolutePath,
+      new File(warehouse1 + "../../../../../hadoop/src/test/resources/data.csv").getCanonicalPath,
+      false)
+    store.createCarbonStore()
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/testdb/testtable/Fact/Part0/Segment_0/0"))
+    val dfread = spark.read.format("carbon").load(warehouse1+"/testdb/testtable/Fact/Part0/Segment_0")
+    dfread.show(false)
+    spark.sql("drop table if exists parquet_table")
+  }
+
+  test("test read using different sort order data") {
+    if (!spark.sparkContext.version.startsWith("2.1")) {
+      spark.sql("drop table if exists old_comp")
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb"))
+      val store = new StoreCreator(new File(warehouse1).getAbsolutePath,
+        new File(warehouse1 + "../../../../../hadoop/src/test/resources/data.csv").getCanonicalPath,
+        false)
+      store.setSortColumns(new util.ArrayList[String](Seq("name").asJava))
+      var model = store.createTableAndLoadModel(false)
+      model.setSegmentId("0")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_0/0"))
+      store.setSortColumns(new util.ArrayList[String](Seq("country,phonetype").asJava))
+      model = store.createTableAndLoadModel(false)
+      model.setSegmentId("1")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_1/0"))
+      store.setSortColumns(new util.ArrayList[String](Seq("date").asJava))
+      model = store.createTableAndLoadModel(false)
+      model.setSegmentId("2")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_2/0"))
+      store.setSortColumns(new util.ArrayList[String](Seq("serialname").asJava))
+      model = store.createTableAndLoadModel(false)
+      model.setSegmentId("3")
+      store.createCarbonStore(model)
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb/testtable/Fact/Part0/Segment_3/0"))
+      spark.sql(s"create table old_comp(id int, date string, country string, name string,
phonetype string, serialname string, salary int) using carbon options(path='$warehouse1/testdb/testtable/Fact/Part0/',
'sort_columns'='name')")
+
+      assert(spark.sql("select * from old_comp where country='china'").count() == 3396)
+      assert(spark.sql("select * from old_comp ").count() == 4000)
+      spark.sql("drop table if exists old_comp")
+
+      spark.sql(s"create table old_comp1 using carbon options(path='$warehouse1/testdb/testtable/Fact/Part0/')")
+      assert(spark.sql("select * from old_comp1 where country='china'").count() == 3396)
+      assert(spark.sql("select * from old_comp1 ").count() == 4000)
+      spark.sql("drop table if exists old_comp1")
+      FileFactory.deleteAllFilesOfDir(new File(warehouse1 + "/testdb"))
+    }
+  }
+
+
+  test("test write sdk and read with spark using different sort order data") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1+"/sdk")
+    spark.sql(s"create table sdkout using carbon options(path='$warehouse1/sdk')")
+    assert(spark.sql("select * from sdkout").collect().length == 5)
+    buildTestDataOtherDataType(5, Array("name","salary"), warehouse1+"/sdk")
+    spark.sql("refresh table sdkout")
+    assert(spark.sql("select * from sdkout where name = 'name1'").collect().length == 2)
+    assert(spark.sql("select * from sdkout where salary=100").collect().length == 2)
+    buildTestDataOtherDataType(5, Array("name","age"), warehouse1+"/sdk")
+    spark.sql("refresh table sdkout")
+    assert(spark.sql("select * from sdkout where name='name0'").collect().length == 3)
+    assert(spark.sql("select * from sdkout").collect().length == 15)
+    assert(spark.sql("select * from sdkout where salary=100").collect().length == 3)
+    assert(spark.sql("select * from sdkout where address='address1'").collect().length ==
3)
+    buildTestDataOtherDataType(5, Array("name","salary"), warehouse1+"/sdk")
+    spark.sql("refresh table sdkout")
+    assert(spark.sql("select * from sdkout where name='name0'").collect().length == 4)
+    assert(spark.sql("select * from sdkout").collect().length == 20)
+    assert(spark.sql("select * from sdkout where salary=100").collect().length == 4)
+    assert(spark.sql("select * from sdkout where address='address1'").collect().length ==
4)
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk"))
+  }
+
+  test("test write sdk with different schema and read with spark") {
+    spark.sql("drop table if exists sdkout")
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+    buildTestDataOtherDataType(5, Array("age", "address"), warehouse1+"/sdk1")
+    spark.sql(s"create table sdkout using carbon options(path='$warehouse1/sdk1')")
+    assert(spark.sql("select * from sdkout").collect().length == 5)
+    buildTestDataOtherDataType(5, null, warehouse1+"/sdk1", 2)
+    spark.sql("refresh table sdkout")
+    intercept[Exception] {
+      spark.sql("select * from sdkout").show()
+    }
+    intercept[Exception] {
+      spark.sql("select * from sdkout where salary=100").show()
+    }
+    FileFactory.deleteAllFilesOfDir(new File(warehouse1+"/sdk1"))
+  }
+
+  // prepare sdk writer output with other schema
+  def buildTestDataOtherDataType(rows: Int, sortColumns: Array[String], writerPath: String, colCount: Int = -1): Any = {
+    var fields: Array[Field] = new Array[Field](6)
+    // build a schema of mixed primitive types for the SDK writer
+    fields(0) = new Field("male", DataTypes.BOOLEAN)
+    fields(1) = new Field("age", DataTypes.INT)
+    fields(2) = new Field("height", DataTypes.DOUBLE)
+    fields(3) = new Field("name", DataTypes.STRING)
+    fields(4) = new Field("address", DataTypes.STRING)
+    fields(5) = new Field("salary", DataTypes.LONG)
+
+    if (colCount > 0) {
+      val fieldsToWrite: Array[Field] = new Array[Field](colCount)
+      var i = 0
+      while (i < colCount) {
+        fieldsToWrite(i) = fields(i)
+        i += 1
+      }
+      fields = fieldsToWrite
+    }
+
+    try {
+      val builder = CarbonWriter.builder()
+      val writer =
+        builder.outputPath(writerPath)
+          .isTransactionalTable(false)
+          .uniqueIdentifier(System.nanoTime()).withBlockSize(2).sortBy(sortColumns)
+          .buildWriterForCSVInput(new Schema(fields))
+
+      var i = 0
+      while (i < rows) {
+        val array = Array[String]("true",
+          String.valueOf(i),
+          String.valueOf(i.toDouble / 2),
+          "name" + i,
+          "address" + i,
+          (i * 100).toString)
+        if (colCount > 0) {
+          writer.write(array.slice(0, colCount))
+        } else {
+          writer.write(array)
+        }
+        i += 1
+      }
+      writer.close()
+    } catch {
+      case ex: Exception => throw new RuntimeException(ex)
+      case t: Throwable => throw t
+    }
+  }
   override protected def beforeAll(): Unit = {
     drop
   }
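
For context, the buildTestDataOtherDataType helper above drives the CarbonData
SDK's CarbonWriter builder. A condensed sketch of that write pattern on its
own, using only builder methods that appear in this diff (the output path and
schema here are illustrative):

    import org.apache.carbondata.core.metadata.datatype.DataTypes
    import org.apache.carbondata.sdk.file.{CarbonWriter, Field, Schema}

    // Two-column schema for CSV-style input; each row is an Array[String].
    val schema = new Schema(Array(
      new Field("name", DataTypes.STRING),
      new Field("salary", DataTypes.LONG)))
    val writer = CarbonWriter.builder()
      .outputPath("/tmp/sdk_out")      // illustrative output location
      .isTransactionalTable(false)
      .sortBy(Array("name"))           // sort order chosen per load
      .buildWriterForCSVInput(schema)
    writer.write(Array("name0", "100"))
    writer.close()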

http://git-wip-us.apache.org/repos/asf/carbondata/blob/3894e1d0/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
----------------------------------------------------------------------
diff --git a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
index af79483..d675973 100644
--- a/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
+++ b/streaming/src/test/java/org/apache/carbondata/streaming/CarbonStreamOutputFormatTest.java
@@ -70,7 +70,8 @@ public class CarbonStreamOutputFormatTest extends TestCase {
             tablePath,
             new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString()));
 
-    CarbonTable table = StoreCreator.createTable(identifier);
+    CarbonTable table = new StoreCreator(new File("target/store").getAbsolutePath(),
+        new File("../hadoop/src/test/resources/data.csv").getCanonicalPath()).createTable(identifier);
 
     String factFilePath = new File("../hadoop/src/test/resources/data.csv").getCanonicalPath();
     carbonLoadModel = StoreCreator.buildCarbonLoadModel(table, factFilePath, identifier);
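
The same refactor appears in both files: StoreCreator is now instantiated with
a store path and a fact CSV before use, rather than called statically. A sketch
of the call pattern seen in this commit (two-argument form here; the Scala
tests above use a three-argument form with a trailing boolean; nothing beyond
what the diff shows is assumed about the arguments):

    val creator = new StoreCreator(
      new File("target/store").getAbsolutePath,
      new File("../hadoop/src/test/resources/data.csv").getCanonicalPath)
    val table = creator.createTable(identifier)  // CarbonTableIdentifier from the test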

