carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [08/18] carbondata git commit: modify compare test
Date Tue, 11 Jul 2017 01:43:10 GMT
modify compare test

fix

fix style

change table


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/327b307f
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/327b307f
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/327b307f

Branch: refs/heads/encoding_override
Commit: 327b307fdddc7b0fffe5b86049d1a2d08dfb182a
Parents: d9c3b48
Author: jackylk <jacky.likun@huawei.com>
Authored: Mon Jul 3 21:54:39 2017 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Wed Jul 5 21:34:56 2017 +0800

----------------------------------------------------------------------
 .../carbondata/examples/CompareTest.scala       | 103 ++++++++++++-------
 1 file changed, 67 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/327b307f/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
----------------------------------------------------------------------
diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
index ee53c31..ffc4b22 100644
--- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
+++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala
@@ -41,6 +41,7 @@ case class Query(sqlText: String, queryType: String, desc: String)
 object CompareTest {
 
   def parquetTableName: String = "comparetest_parquet"
+  def orcTableName: String = "comparetest_orc"
   def carbonTableName(version: String): String = s"comparetest_carbonV$version"
 
   // Table schema:
@@ -63,7 +64,7 @@ object CompareTest {
   // +-------------+-----------+-------------+-------------+------------+
   // | m4          | double    | NA          | measure     | no         |
   // +-------------+-----------+-------------+-------------+------------+
-  // | m5          | double    | NA          | measure     | no         |
+  // | m5          | decimal   | NA          | measure     | no         |
   // +-------------+-----------+-------------+-------------+------------+
   private def generateDataFrame(spark: SparkSession): DataFrame = {
     val r = new Random()
@@ -71,10 +72,11 @@ object CompareTest {
         .parallelize(1 to 10 * 1000 * 1000, 4)
         .map { x =>
           ("city" + x % 8, "country" + x % 1103, "planet" + x % 10007, "IDENTIFIER" + x.toString,
-          (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13, x.toDouble / 11)
+              (x % 16).toShort, x / 2, (x << 1).toLong, x.toDouble / 13,
+              BigDecimal.valueOf(x.toDouble / 11))
         }.map { x =>
-          Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
-        }
+      Row(x._1, x._2, x._3, x._4, x._5, x._6, x._7, x._8, x._9)
+    }
 
     val schema = StructType(
       Seq(
@@ -86,7 +88,7 @@ object CompareTest {
         StructField("m2", IntegerType, nullable = false),
         StructField("m3", LongType, nullable = false),
         StructField("m4", DoubleType, nullable = false),
-        StructField("m5", DoubleType, nullable = false)
+        StructField("m5", DecimalType(30, 10), nullable = false)
       )
     )
 
@@ -142,12 +144,12 @@ object CompareTest {
     // ==                      FULL SCAN GROUP BY AGGREGATE                     ==
     // ===========================================================================
     Query(
-      "select country, sum(m1) from $table group by country",
+      "select country, sum(m1) as metric from $table group by country order by metric",
       "aggregate",
       "group by on big data, on medium card column, medium result set,"
     ),
     Query(
-      "select city, sum(m1) from $table group by city",
+      "select city, sum(m1) as metric from $table group by city order by metric",
       "aggregate",
       "group by on big data, on low card column, small result set,"
     ),
@@ -170,17 +172,20 @@ object CompareTest {
     // ==                  FILTER SCAN GROUP BY AGGREGATION                     ==
     // ===========================================================================
     Query(
-      "select country, sum(m1) from $table where city='city8' group by country ",
+      "select country, sum(m1) as metric from $table where city='city8' group by country " +
+          "order by metric",
       "filter scan and aggregate",
       "group by on large data, small result set"
     ),
     Query(
-      "select id, sum(m1) from $table where planet='planet10' group by id",
+      "select id, sum(m1) as metric from $table where planet='planet10' group by id " +
+          "order by metric",
       "filter scan and aggregate",
       "group by on medium data, large result set"
     ),
     Query(
-      "select city, sum(m1) from $table where country='country12' group by city ",
+      "select city, sum(m1) as metric from $table where country='country12' group by city " +
+          "order by metric",
       "filter scan and aggregate",
       "group by on medium data, small result set"
     ),
@@ -244,25 +249,35 @@ object CompareTest {
     )
   )
 
-  private def loadParquetTable(spark: SparkSession, input: DataFrame): Double = time {
+  private def loadParquetTable(spark: SparkSession, input: DataFrame, table: String)
+  : Double = time {
     // partitioned by last 1 digit of id column
     val dfWithPartition = input.withColumn("partitionCol", input.col("id").%(10))
     dfWithPartition.write
         .partitionBy("partitionCol")
         .mode(SaveMode.Overwrite)
-        .parquet(parquetTableName)
+        .parquet(table)
+    spark.read.parquet(table).registerTempTable(table)
+  }
+
+  private def loadOrcTable(spark: SparkSession, input: DataFrame, table: String): Double = time {
+    // partitioned by last 1 digit of id column
+    input.write
+        .mode(SaveMode.Overwrite)
+        .orc(table)
+    spark.read.orc(table).registerTempTable(table)
   }
 
-  private def loadCarbonTable(spark: SparkSession, input: DataFrame, version: String): Double = {
+  private def loadCarbonTable(spark: SparkSession, input: DataFrame, tableName: String): Double = {
     CarbonProperties.getInstance().addProperty(
       CarbonCommonConstants.CARBON_DATA_FILE_VERSION,
-      version
+      "3"
     )
-    spark.sql(s"drop table if exists ${carbonTableName(version)}")
+    spark.sql(s"drop table if exists $tableName")
     time {
       input.write
           .format("carbondata")
-          .option("tableName", carbonTableName(version))
+          .option("tableName", tableName)
           .option("tempCSV", "false")
           .option("single_pass", "true")
           .option("dictionary_exclude", "id") // id is high cardinality column
@@ -273,18 +288,23 @@ object CompareTest {
   }
 
   // load data into parquet, carbonV2, carbonV3
-  private def prepareTable(spark: SparkSession): Unit = {
+  private def prepareTable(spark: SparkSession, table1: String, table2: String): Unit = {
     val df = generateDataFrame(spark).cache
     println(s"loading ${df.count} records, schema: ${df.schema}")
-    val loadParquetTime = loadParquetTable(spark, df)
-    val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3")
-    println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time")
+    val table1Time = if (table1.endsWith("parquet")) {
+      loadParquetTable(spark, df, table1)
+    } else if (table1.endsWith("orc")) {
+      loadOrcTable(spark, df, table1)
+    } else {
+      sys.error("invalid table: " + table1)
+    }
+    val table2Time = loadCarbonTable(spark, df, table2)
+    println(s"load completed, time: $table1Time, $table2Time")
     df.unpersist()
-    spark.read.parquet(parquetTableName).registerTempTable(parquetTableName)
   }
 
   // Run all queries for the specified table
-  private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Int)] = {
+  private def runQueries(spark: SparkSession, tableName: String): Array[(Double, Array[Row])] = {
     println(s"start running queries for $tableName...")
     var result: Array[Row] = null
     queries.zipWithIndex.map { case (query, index) =>
@@ -294,37 +314,46 @@ object CompareTest {
         result = spark.sql(sqlText).collect()
       }
       println(s"=> $rt sec")
-      (rt, result.length)
+      (rt, result)
+    }
+  }
+
+  private def printErrorIfNotMatch(index: Int, table1: String, result1: Array[Row],
+      table2: String, result2: Array[Row]): Unit = {
+    if (!result1.sameElements(result2)) {
+      val num = index + 1
+      println(s"$table1 result for query $num: ")
+      println(s"""${result1.mkString(",")}""")
+      println(s"$table2 result for query $num: ")
+      println(s"""${result2.mkString(",")}""")
+      sys.error(s"result not matching for query $num (${queries(index).desc})")
     }
   }
 
   // run testcases and print comparison result
-  private def runTest(spark: SparkSession): Unit = {
+  private def runTest(spark: SparkSession, table1: String, table2: String): Unit = {
     val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
     val date = new Date
     val timestamp = date.getTime
     // run queries on parquet and carbon
-    val parquetResult: Array[(Double, Int)] = runQueries(spark, parquetTableName)
+    val table1Result: Array[(Double, Array[Row])] = runQueries(spark, table1)
     // do GC and sleep for some time before running next table
     System.gc()
     Thread.sleep(1000)
     System.gc()
     Thread.sleep(1000)
-    val carbonResult: Array[(Double, Int)] = runQueries(spark, carbonTableName("3"))
+    val table2Result: Array[(Double, Array[Row])] = runQueries(spark, table2)
     // check result by comparing output from parquet and carbon
-    parquetResult.zipWithIndex.foreach { case (result, index) =>
-      if (result._2 != carbonResult(index)._2) {
-        sys.error(s"result not matching for query ${index + 1} (${queries(index).desc}): " +
-            s"${result._2} and ${carbonResult(index)._2}")
-      }
+    table1Result.zipWithIndex.foreach { case (result, index) =>
+      printErrorIfNotMatch(index, table1, result._2, table2, table2Result(index)._2)
     }
     // print all response time in JSON format, so that it can be analyzed later
     queries.zipWithIndex.foreach { case (query, index) =>
       println("{" +
           s""""query":"${index + 1}", """ +
-          s""""parquetTime":${parquetResult(index)._1}, """ +
-          s""""carbonTime":${carbonResult(index)._1}, """ +
-          s""""fetched":${parquetResult(index)._2}, """ +
+          s""""$table1 time":${table1Result(index)._1}, """ +
+          s""""$table2 time":${table2Result(index)._1}, """ +
+          s""""fetched":${table1Result(index)._2.length}, """ +
           s""""type":"${query.queryType}", """ +
           s""""desc":"${query.desc}",  """ +
           s""""date": "${formatter.format(date)}" """ +
@@ -351,8 +380,10 @@ object CompareTest {
         .getOrCreateCarbonSession(storeLocation)
     spark.sparkContext.setLogLevel("warn")
 
-    prepareTable(spark)
-    runTest(spark)
+    val table1 = parquetTableName
+    val table2 = carbonTableName("3")
+    prepareTable(spark, table1, table2)
+    runTest(spark, table1, table2)
 
     spark.close()
   }


Mime
View raw message