carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [1/2] carbondata git commit: close dictionary server on application end
Date Fri, 26 May 2017 16:38:05 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 823eb1d71 -> 175fc0db2


close dictionary server on application end


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/51d32b25
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/51d32b25
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/51d32b25

Branch: refs/heads/master
Commit: 51d32b2550cbcce5d62202989126bb6c4290173f
Parents: 823eb1d
Author: kunal642 <kunal.kapoor@knoldus.in>
Authored: Sun May 21 23:12:59 2017 +0530
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Fri May 26 22:07:16 2017 +0530

----------------------------------------------------------------------
 .../carbondata/core/dictionary/server/DictionaryServer.java   | 4 +---
 .../spark/sql/execution/command/carbonTableSchema.scala       | 6 ++++++
 .../spark/sql/execution/command/carbonTableSchema.scala       | 7 +++++++
 3 files changed, 14 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/51d32b25/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index f86cd6b..84f2a0d 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -135,11 +135,9 @@ public class DictionaryServer {
    * @throws Exception
    */
   public void shutdown() throws Exception {
+    LOGGER.info("Shutting down dictionary server");
     worker.shutdownGracefully();
     boss.shutdownGracefully();
-    // Wait until all threads are terminated.
-    boss.terminationFuture().sync();
-    worker.terminationFuture().sync();
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51d32b25/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 494beff..7258511 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -525,6 +526,11 @@ case class LoadTable(
             val dictionaryServer = DictionaryServer
               .getInstance(dictionaryServerPort.toInt)
             carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            sqlContext.sparkContext.addSparkListener(new SparkListener() {
+              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+                dictionaryServer.shutdown()
+              }
+            })
             Some(dictionaryServer)
           } else {
             None

http://git-wip-us.apache.org/repos/asf/carbondata/blob/51d32b25/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 09824d8..5dd6832 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -23,6 +23,7 @@ import scala.collection.JavaConverters._
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd}
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute
@@ -533,10 +534,16 @@ case class LoadTable(
             val dictionaryServer = DictionaryServer
               .getInstance(dictionaryServerPort.toInt)
             carbonLoadModel.setDictionaryServerPort(dictionaryServer.getPort)
+            sparkSession.sparkContext.addSparkListener(new SparkListener() {
+              override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd) {
+                dictionaryServer.shutdown()
+              }
+            })
             Some(dictionaryServer)
           } else {
             None
           }
+
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,


Mime
View raw message