carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [04/42] carbondata git commit: added check for starting dictionary server
Date Thu, 15 Jun 2017 11:50:09 GMT
added check for starting dictionary server

moved single pass test suite to common module


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/6f554504
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/6f554504
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/6f554504

Branch: refs/heads/branch-1.1
Commit: 6f55450482601488a4762a01cb4316f51d0f9025
Parents: 38a5144
Author: kunal642 <kunal.kapoor@knoldus.in>
Authored: Wed May 17 13:07:12 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Jun 15 12:44:11 2017 +0530

----------------------------------------------------------------------
 .../generator/ServerDictionaryGenerator.java    |   4 +-
 .../dataload/TestLoadDataWithSinglePass.scala   | 129 +++++++++++++++++++
 .../execution/command/carbonTableSchema.scala   |  21 ++-
 .../dataload/TestLoadDataWithSinglePass.scala   | 111 ----------------
 .../spark/rdd/CarbonDataRDDFactory.scala        |   4 +-
 .../execution/command/carbonTableSchema.scala   |  21 ++-
 6 files changed, 166 insertions(+), 124 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f554504/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
index cd168b8..456e885 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -73,7 +73,9 @@ public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, D
 
   public void writeTableDictionaryData(String tableUniqueName) throws Exception {
     TableDictionaryGenerator generator = tableMap.get(tableUniqueName);
-    generator.writeDictionaryData(tableUniqueName);
+    if (generator != null) {
+      generator.writeDictionaryData(tableUniqueName);
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f554504/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
new file mode 100644
index 0000000..3bb16f2
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.integration.spark.testsuite.dataload
+
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+/**
+  * Test Class for data loading use one pass
+  *
+  */
+class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll {
+    sql("DROP TABLE IF EXISTS table_two_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass_2")
+
+    sql(
+      """
+        |CREATE TABLE table_two_pass (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      s"""
+        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_two_pass
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
+      """.stripMargin)
+
+    sql(
+      """
+        |CREATE TABLE table_one_pass (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    sql(
+      s"""
+        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
+      """.stripMargin)
+  }
+
+  test("test data loading use one pass") {
+    checkAnswer(
+      sql("select * from table_one_pass"),
+      sql("select * from table_two_pass")
+    )
+  }
+
+  test("test data loading use one pass when offer column dictionary file") {
+    sql(
+      """
+        |CREATE TABLE table_one_pass_2 (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(
+      s"""
+        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass_2
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true', 'COLUMNDICT'=
+        |'country:$resourcesPath/columndictionary/country.csv, name:$resourcesPath/columndictionary/name.csv')
+      """.stripMargin)
+
+    checkAnswer(
+      sql("select * from table_one_pass_2"),
+      sql("select * from table_two_pass")
+    )
+  }
+
+  test("test data loading use one pass when do incremental load") {
+    sql(
+      s"""
+        |LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_two_pass
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
+      """.stripMargin)
+    sql(
+      s"""
+        |LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_one_pass
+        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
+      """.stripMargin)
+
+    checkAnswer(
+      sql("select * from table_one_pass"),
+      sql("select * from table_two_pass")
+    )
+  }
+
+  test("test data loading with dctionary exclude") {
+    sql("DROP TABLE IF EXISTS dict_exclude")
+    sql(
+      """
+        |CREATE TABLE dict_exclude (ID int, date Timestamp, country String,
+        |name String, phonetype String, serialname String, salary int)
+        |STORED BY 'org.apache.carbondata.format' TBLPROPERTIES('DICTIONARY_EXCLUDE'='country,name,serialname,phonetype')
+      """.stripMargin)
+    sql(
+      s"""
+         |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE dict_exclude
+         |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='FALSE')
+      """.stripMargin)
+    checkAnswer(sql("select name from dict_exclude limit 1"),Row("aaa1"))
+  }
+
+  override def afterAll {
+    sql("DROP TABLE IF EXISTS table_two_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass")
+    sql("DROP TABLE IF EXISTS table_one_pass_2")
+    sql("DROP TABLE IF EXISTS dict_exclude")
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f554504/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index c770e1b..9745ddd 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -495,16 +495,27 @@ case class LoadTable(
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
           val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
           carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-          // start dictionary server when use one pass load.
-          val server: DictionaryServer = DictionaryServer
-            .getInstance(dictionaryServerPort.toInt)
-          carbonLoadModel.setDictionaryServerPort(server.getPort)
+          // start dictionary server when use one pass load and dimension with DICTIONARY
+          // encoding is present.
+          val allDimensions = table.getAllDimensions.asScala.toList
+          val createDictionary = allDimensions.exists {
+            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+              !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+          }
+          val server: Option[DictionaryServer] = if (createDictionary) {
+            val dictionaryServer = DictionaryServer
+              .getInstance(dictionaryServerPort.toInt)
+            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            Some(dictionaryServer)
+          } else {
+            None
+          }
           CarbonDataRDDFactory.loadCarbonData(sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
             columnar,
             partitionStatus,
-            Some(server),
+            server,
             dataFrame,
             updateModel)
         } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f554504/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
deleted file mode 100644
index 1d456d3..0000000
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataWithSinglePass.scala
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.spark.testsuite.dataload
-
-import org.apache.spark.sql.common.util.QueryTest
-import org.scalatest.BeforeAndAfterAll
-
-/**
-  * Test Class for data loading use one pass
-  *
-  */
-class TestLoadDataWithSinglePass extends QueryTest with BeforeAndAfterAll {
-
-  override def beforeAll {
-    sql("DROP TABLE IF EXISTS table_two_pass")
-    sql("DROP TABLE IF EXISTS table_one_pass")
-    sql("DROP TABLE IF EXISTS table_one_pass_2")
-
-    sql(
-      """
-        |CREATE TABLE table_two_pass (ID int, date Timestamp, country String,
-        |name String, phonetype String, serialname String, salary int)
-        |STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(
-      s"""
-        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_two_pass
-        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
-      """.stripMargin)
-
-    sql(
-      """
-        |CREATE TABLE table_one_pass (ID int, date Timestamp, country String,
-        |name String, phonetype String, serialname String, salary int)
-        |STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-
-    sql(
-      s"""
-        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass
-        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
-      """.stripMargin)
-  }
-
-  test("test data loading use one pass") {
-    checkAnswer(
-      sql("select * from table_one_pass"),
-      sql("select * from table_two_pass")
-    )
-  }
-
-  test("test data loading use one pass when offer column dictionary file") {
-    sql(
-      """
-        |CREATE TABLE table_one_pass_2 (ID int, date Timestamp, country String,
-        |name String, phonetype String, serialname String, salary int)
-        |STORED BY 'org.apache.carbondata.format'
-      """.stripMargin)
-    sql(
-      s"""
-        |LOAD DATA local inpath '$resourcesPath/source.csv' INTO TABLE table_one_pass_2
-        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true', 'COLUMNDICT'=
-        |'country:$resourcesPath/columndictionary/country.csv, name:$resourcesPath/columndictionary/name.csv')
-      """.stripMargin)
-
-    checkAnswer(
-      sql("select * from table_one_pass_2"),
-      sql("select * from table_two_pass")
-    )
-  }
-
-  test("test data loading use one pass when do incremental load") {
-    sql(
-      s"""
-        |LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_two_pass
-        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='false')
-      """.stripMargin)
-    sql(
-      s"""
-        |LOAD DATA local inpath '$resourcesPath/dataIncrement.csv' INTO TABLE table_one_pass
-        |OPTIONS('DELIMITER'= ',', 'SINGLE_PASS'='true')
-      """.stripMargin)
-
-    checkAnswer(
-      sql("select * from table_one_pass"),
-      sql("select * from table_two_pass")
-    )
-  }
-
-  override def afterAll {
-    sql("DROP TABLE IF EXISTS table_two_pass")
-    sql("DROP TABLE IF EXISTS table_one_pass")
-    sql("DROP TABLE IF EXISTS table_one_pass_2")
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f554504/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 835af35..ede63ec 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -861,7 +861,7 @@ object CarbonDataRDDFactory {
 
   private def writeDictionary(carbonLoadModel: CarbonLoadModel,
       result: Option[DictionaryServer], writeAll: Boolean) = {
-    // write dictionary file and shutdown dictionary server
+    // write dictionary file
     val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
       carbonLoadModel.getTableName
     }"
@@ -874,7 +874,7 @@ object CarbonDataRDDFactory {
             server.writeTableDictionary(uniqueTableName)
           }
         } catch {
-          case ex: Exception =>
+          case _: Exception =>
             LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
             throw new Exception("Dataload failed due to error while writing dictionary file!")
         }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/6f554504/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 11b3115..94a95fd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -504,16 +504,27 @@ case class LoadTable(
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.
             getConf.get("spark.driver.host")
           carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
-          // start dictionary server when use one pass load.
-          val server: DictionaryServer = DictionaryServer
-            .getInstance(dictionaryServerPort.toInt)
-          carbonLoadModel.setDictionaryServerPort(server.getPort)
+          // start dictionary server when use one pass load and dimension with DICTIONARY
+          // encoding is present.
+          val allDimensions = table.getAllDimensions.asScala.toList
+          val createDictionary = allDimensions.exists {
+            carbonDimension => carbonDimension.hasEncoding(Encoding.DICTIONARY) &&
+                               !carbonDimension.hasEncoding(Encoding.DIRECT_DICTIONARY)
+          }
+          val server: Option[DictionaryServer] = if (createDictionary) {
+            val dictionaryServer = DictionaryServer
+              .getInstance(dictionaryServerPort.toInt)
+            carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            Some(dictionaryServer)
+          } else {
+            None
+          }
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
             columnar,
             partitionStatus,
-            Some(server),
+            server,
             dataFrame,
             updateModel)
         }


Mime
View raw message