carbondata-commits mailing list archives

From gvram...@apache.org
Subject [1/2] carbondata git commit: fixed multiple dictionary server issue
Date Sun, 14 May 2017 05:59:47 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 6db1903be -> fb2441a93


Fixed the multiple dictionary server issue by making DictionaryServer a singleton that binds to the first available port.


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/8815dd58
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/8815dd58
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/8815dd58

Branch: refs/heads/master
Commit: 8815dd586c181e74400190ae04f4ec0e86d25402
Parents: 35b23c9
Author: kunal642 <kunal.kapoor@knoldus.in>
Authored: Wed Mar 15 14:38:31 2017 +0530
Committer: kunal642 <kunal.kapoor@knoldus.in>
Committed: Thu May 11 16:20:03 2017 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  10 ++
 .../dictionary/client/DictionaryClient.java     |   8 +-
 .../generator/ServerDictionaryGenerator.java    |   5 +
 .../generator/key/DictionaryMessageType.java    |   3 +-
 .../dictionary/server/DictionaryServer.java     | 113 +++++++++++++++----
 .../server/DictionaryServerHandler.java         |   3 +
 .../dictionary/client/DictionaryClientTest.java |   4 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |  64 ++++++-----
 .../execution/command/carbonTableSchema.scala   |  29 +----
 .../spark/rdd/CarbonDataRDDFactory.scala        |  41 +++----
 .../execution/command/carbonTableSchema.scala   |  29 +----
 11 files changed, 179 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index ee18321..269a75f 100644
--- a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -700,6 +700,16 @@ public final class CarbonCommonConstants {
   public static final String DICTIONARY_ONE_CHUNK_SIZE = "carbon.dictionary.chunk.size";
 
   /**
+   *  Dictionary Server Worker Threads
+   */
+  public static final String DICTIONARY_WORKER_THREADS = "dictionary.worker.threads";
+
+  /**
+   *  Dictionary Server Worker Threads default value
+   */
+  public static final String DICTIONARY_WORKER_THREADS_DEFAULT = "1";
+
+  /**
    * dictionary chunk default size
    */
   public static final String DICTIONARY_ONE_CHUNK_SIZE_DEFAULT = "10000";
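
The new property is consumed further down in this patch by DictionaryServer, which sizes its Netty worker event loop group from it. A minimal sketch of that read path (the wrapper class and method here are only for illustration; CarbonProperties and the constants are as in the diff):

import io.netty.channel.nio.NioEventLoopGroup;

import org.apache.carbondata.core.constants.CarbonCommonConstants;
import org.apache.carbondata.core.util.CarbonProperties;

public final class WorkerThreadsExample {
  public static NioEventLoopGroup newWorkerGroup() {
    // Read the configured thread count, falling back to the default of "1".
    String workerThreads = CarbonProperties.getInstance()
        .getProperty(CarbonCommonConstants.DICTIONARY_WORKER_THREADS,
            CarbonCommonConstants.DICTIONARY_WORKER_THREADS_DEFAULT);
    // Size the Netty worker group with the configured number of threads.
    return new NioEventLoopGroup(Integer.parseInt(workerThreads));
  }
}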

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
index 39f747e..7910190 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/client/DictionaryClient.java
@@ -49,8 +49,10 @@ public class DictionaryClient {
    * @param port
    */
   public void startClient(String address, int port) {
+    LOGGER.audit("Starting client on " + address + " " + port);
     long start = System.currentTimeMillis();
-    workerGroup = new NioEventLoopGroup();
+    // Create an event loop group with 1 thread.
+    workerGroup = new NioEventLoopGroup(1);
     Bootstrap clientBootstrap = new Bootstrap();
     clientBootstrap.group(workerGroup).channel(NioSocketChannel.class)
         .handler(new ChannelInitializer<SocketChannel>() {
@@ -58,7 +60,9 @@ public class DictionaryClient {
             ChannelPipeline pipeline = ch.pipeline();
             // Based on length provided at header, it collects all packets
             pipeline
-                .addLast("LengthDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 2,
0, 2));
+                .addLast("LengthDecoder",
+                    new LengthFieldBasedFrameDecoder(1048576, 0,
+                        2, 0, 2));
             pipeline.addLast("DictionaryClientHandler", dictionaryClientHandler);
           }
         });
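
The reformatted call is behaviourally identical; for readers less familiar with Netty, here is the same construction with the LengthFieldBasedFrameDecoder constructor arguments named (standard Netty signature; the helper class is illustrative only):

import io.netty.channel.ChannelPipeline;
import io.netty.handler.codec.LengthFieldBasedFrameDecoder;

final class FrameDecoderExample {
  // Same decoder configuration as the DictionaryClient/DictionaryServer pipelines.
  static void addLengthDecoder(ChannelPipeline pipeline) {
    pipeline.addLast("LengthDecoder", new LengthFieldBasedFrameDecoder(
        1048576, // maxFrameLength: reject frames larger than 1 MB
        0,       // lengthFieldOffset: length field starts at byte 0
        2,       // lengthFieldLength: 2-byte length field
        0,       // lengthAdjustment: nothing between length field and body
        2));     // initialBytesToStrip: drop the length header before decoding
  }
}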

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
index b2b9863..cd168b8 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/ServerDictionaryGenerator.java
@@ -71,4 +71,9 @@ public class ServerDictionaryGenerator implements DictionaryGenerator<Integer, D
     }
   }
 
+  public void writeTableDictionaryData(String tableUniqueName) throws Exception {
+    TableDictionaryGenerator generator = tableMap.get(tableUniqueName);
+    generator.writeDictionaryData(tableUniqueName);
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
index 608b602..b3d1f9a 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/generator/key/DictionaryMessageType.java
@@ -24,7 +24,8 @@ public enum DictionaryMessageType {
   DICT_GENERATION((byte) 1),
   TABLE_INTIALIZATION((byte) 2),
   SIZE((byte) 3),
-  WRITE_DICTIONARY((byte) 4);
+  WRITE_DICTIONARY((byte) 4),
+  WRITE_TABLE_DICTIONARY((byte) 5);
 
   final byte type;
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
index e2eaaa3..f86cd6b 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServer.java
@@ -18,8 +18,10 @@ package org.apache.carbondata.core.dictionary.server;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
 import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;
+import org.apache.carbondata.core.util.CarbonProperties;
 
 import io.netty.bootstrap.ServerBootstrap;
 import io.netty.channel.ChannelInitializer;
@@ -43,44 +45,91 @@ public class DictionaryServer {
 
   private EventLoopGroup boss;
   private EventLoopGroup worker;
+  private int port;
+  private static Object lock = new Object();
+  private static DictionaryServer INSTANCE = null;
+
+  private DictionaryServer(int port) {
+    startServer(port);
+  }
+
+  public static DictionaryServer getInstance(int port) {
+    if (INSTANCE == null) {
+      synchronized (lock) {
+        if (INSTANCE == null) {
+          INSTANCE = new DictionaryServer(port);
+        }
+      }
+    }
+    return INSTANCE;
+  }
 
   /**
    * start dictionary server
    *
    * @param port
-   * @throws Exception
    */
-  public void startServer(int port) {
-    long start = System.currentTimeMillis();
+  private void startServer(int port) {
     dictionaryServerHandler = new DictionaryServerHandler();
-    boss = new NioEventLoopGroup();
-    worker = new NioEventLoopGroup();
+    String workerThreads = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.DICTIONARY_WORKER_THREADS,
+            CarbonCommonConstants.DICTIONARY_WORKER_THREADS_DEFAULT);
+    boss = new NioEventLoopGroup(1);
+    worker = new NioEventLoopGroup(Integer.parseInt(workerThreads));
+    // Configure the server.
+    bindToPort(port);
+  }
+
+  /**
+   * Binds dictionary server to an available port.
+   *
+   * @param port
+   */
+  private void bindToPort(int port) {
+    long start = System.currentTimeMillis();
     // Configure the server.
-    try {
-      ServerBootstrap bootstrap = new ServerBootstrap();
-      bootstrap.group(boss, worker);
-      bootstrap.channel(NioServerSocketChannel.class);
-
-      bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
-        @Override public void initChannel(SocketChannel ch) throws Exception {
-          ChannelPipeline pipeline = ch.pipeline();
-          // Based on length provided at header, it collects all packets
-          pipeline.addLast("LengthDecoder", new LengthFieldBasedFrameDecoder(1048576, 0,
2, 0, 2));
-          pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
+    int i = 0;
+    while (i < 10) {
+      int newPort = port + i;
+      try {
+        ServerBootstrap bootstrap = new ServerBootstrap();
+        bootstrap.group(boss, worker);
+        bootstrap.channel(NioServerSocketChannel.class);
+        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
+          @Override public void initChannel(SocketChannel ch) throws Exception {
+            ChannelPipeline pipeline = ch.pipeline();
+            pipeline
+                .addLast("LengthDecoder",
+                    new LengthFieldBasedFrameDecoder(1048576, 0,
+                        2, 0, 2));
+            pipeline.addLast("DictionaryServerHandler", dictionaryServerHandler);
+          }
+        });
+        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
+        bootstrap.bind(newPort).sync();
+        LOGGER.audit("Dictionary Server started, Time spent " + (System.currentTimeMillis()
- start)
+            + " Listening on port " + newPort);
+        this.port = newPort;
+        break;
+      } catch (Exception e) {
+        LOGGER.error(e, "Dictionary Server Failed to bind to port:");
+        if (i == 9) {
+          throw new RuntimeException("Dictionary Server Could not bind to any port");
         }
-      });
-      bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
-      bootstrap.bind(port).sync();
-
-      LOGGER.info("Dictionary Server started, Time spent " + (System.currentTimeMillis()
- start)
-          + " Listening on port " + port);
-    } catch (Exception e) {
-      LOGGER.error(e, "Dictionary Server Start Failed");
-      throw new RuntimeException(e);
+      }
+      i++;
     }
   }
 
   /**
+   *
+   * @return Port on which the DictionaryServer has started.
+   */
+  public int getPort() {
+    return port;
+  }
+
+  /**
    * shutdown dictionary server
    *
    * @throws Exception
@@ -93,6 +142,8 @@ public class DictionaryServer {
     worker.terminationFuture().sync();
   }
 
+
+
   /**
    * Write dictionary to the store.
    * @throws Exception
@@ -102,4 +153,16 @@ public class DictionaryServer {
     key.setType(DictionaryMessageType.WRITE_DICTIONARY);
     dictionaryServerHandler.processMessage(key);
   }
+
+  /**
+   *  Write Dictionary for one table.
+   * @throws Exception
+   */
+
+  public void writeTableDictionary(String uniqueTableName) throws Exception {
+    DictionaryMessage key = new DictionaryMessage();
+    key.setTableUniqueName(uniqueTableName);
+    key.setType(DictionaryMessageType.WRITE_TABLE_DICTIONARY);
+    dictionaryServerHandler.processMessage(key);
+  }
 }
\ No newline at end of file
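
DictionaryServer is now a lazily created singleton that retries up to ten consecutive ports starting from the requested one, so callers read the actually bound port back via getPort(). A small usage sketch of that flow (the helper class is illustrative; it mirrors what the Scala LoadTable changes below do):

import org.apache.carbondata.core.dictionary.server.DictionaryServer;

final class DictionaryServerUsage {
  static int startOrReuse(int configuredPort) {
    // The first call binds the server (trying configuredPort, configuredPort + 1, ...);
    // later calls return the same instance without rebinding.
    DictionaryServer server = DictionaryServer.getInstance(configuredPort);
    // The bound port may differ from the configured one, so it is read back
    // and handed to the load model / clients.
    return server.getPort();
  }
}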

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
index a14b675..946a43d 100644
--- a/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
+++ b/core/src/main/java/org/apache/carbondata/core/dictionary/server/DictionaryServerHandler.java
@@ -101,6 +101,9 @@ public class DictionaryServerHandler extends ChannelInboundHandlerAdapter {
       case WRITE_DICTIONARY :
         generatorForServer.writeDictionaryData();
         return 0;
+      case WRITE_TABLE_DICTIONARY:
+        generatorForServer.writeTableDictionaryData(key.getTableUniqueName());
+        return 0;
       default:
         return -1;
     }
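
The new WRITE_TABLE_DICTIONARY case is the server-side half of DictionaryServer.writeTableDictionary(...) added above. A condensed sketch of that round trip (the helper class name is illustrative; message and enum names are from the diff):

import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessage;
import org.apache.carbondata.core.dictionary.generator.key.DictionaryMessageType;

final class WriteTableDictionaryExample {
  // Producer side: DictionaryServer builds a per-table write request.
  static DictionaryMessage newWriteTableRequest(String uniqueTableName) {
    DictionaryMessage key = new DictionaryMessage();
    key.setTableUniqueName(uniqueTableName);
    key.setType(DictionaryMessageType.WRITE_TABLE_DICTIONARY);
    return key;
  }
  // Consumer side: DictionaryServerHandler routes the message to
  // ServerDictionaryGenerator.writeTableDictionaryData(tableUniqueName),
  // which flushes only that table's dictionary.
}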

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
index c7989e4..a96e364 100644
--- a/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/dictionary/client/DictionaryClientTest.java
@@ -91,8 +91,7 @@ public class DictionaryClientTest {
     metadata.addCarbonTable(carbonTable);
 
     // Start the server for testing the client
-    server = new DictionaryServer();
-    server.startServer(5678);
+    server = DictionaryServer.getInstance(5678);
   }
 
   @Test public void testClient() throws Exception {
@@ -159,7 +158,6 @@ public class DictionaryClientTest {
     client.shutDown();
 
     // Shutdown the server
-    server.shutdown();
   }
 
   @After public void tearDown() {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 51a56e9..8f4727a 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -347,7 +347,7 @@ object CarbonDataRDDFactory {
       storePath: String,
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
-      result: Future[DictionaryServer],
+      result: Option[DictionaryServer],
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -890,31 +890,28 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
-        shutDownDictionaryServer(carbonLoadModel, result, false)
         throw new Exception(errorMessage)
       } else {
         val metadataDetails = status(0)._2
         if (!isAgg) {
-          val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
-            carbonLoadModel, loadStatus, loadStartTime)
-          if (!status) {
-            val errorMessage = "Dataload failed due to failure in table status updation."
-            LOGGER.audit("Data load is failed for " +
-                s"${ carbonLoadModel.getDatabaseName }.${
-                  carbonLoadModel
-                      .getTableName
-                }")
-            LOGGER.error("Dataload failed due to failure in table status updation.")
-            shutDownDictionaryServer(carbonLoadModel, result, false)
-            throw new Exception(errorMessage)
-          }
+            writeDictionary(carbonLoadModel, result, false)
+            val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
+              carbonLoadModel, loadStatus, loadStartTime)
+            if (!status) {
+              val errorMessage = "Dataload failed due to failure in table status updation."
+              LOGGER.audit("Data load is failed for " +
+                           s"${ carbonLoadModel.getDatabaseName }.${
+                             carbonLoadModel
+                               .getTableName
+                           }")
+              LOGGER.error("Dataload failed due to failure in table status updation.")
+              throw new Exception(errorMessage)
+            }
         } else if (!carbonLoadModel.isRetentionRequest) {
           // TODO : Handle it
           LOGGER.info("********Database updated**********")
         }
 
-        shutDownDictionaryServer(carbonLoadModel, result)
-
         if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
           LOGGER.audit("Data load is partially successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
@@ -935,22 +932,27 @@ object CarbonDataRDDFactory {
 
   }
 
-  private def shutDownDictionaryServer(carbonLoadModel: CarbonLoadModel,
-      result: Future[DictionaryServer], writeDictionary: Boolean = true): Unit = {
+  private def writeDictionary(carbonLoadModel: CarbonLoadModel,
+      result: Option[DictionaryServer], writeAll: Boolean) = {
     // write dictionary file and shutdown dictionary server
-    if (carbonLoadModel.getUseOnePass) {
-      try {
-        val server = result.get()
-        if (writeDictionary) {
-          server.writeDictionary()
+    val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
+      carbonLoadModel.getTableName }"
+    result match {
+      case Some(server) =>
+        try {
+          if (writeAll) {
+            server.writeDictionary()
+          }
+          else {
+            server.writeTableDictionary(uniqueTableName)
+          }
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
+            throw new Exception("Dataload failed due to error while writing dictionary file!")
         }
-        server.shutdown()
-      } catch {
-        case ex: Exception =>
-          LOGGER.error("Error while close dictionary server and write dictionary file for
" +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
-          throw new Exception("Dataload failed due to error while write dictionary file!")
-      }
+      case _ =>
     }
   }
+
 }
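
The new writeDictionary helper replaces the Future-based shutDownDictionaryServer: when a dictionary server was started (one-pass load) it writes either all dictionaries or only the current table's, and otherwise does nothing. An illustrative Java analogue of the same dispatch, using java.util.Optional in place of the Scala Option used by the patch:

import java.util.Optional;

import org.apache.carbondata.core.dictionary.server.DictionaryServer;

final class WriteDictionaryDispatch {
  static void writeDictionary(Optional<DictionaryServer> result,
      String uniqueTableName, boolean writeAll) throws Exception {
    if (!result.isPresent()) {
      // No dictionary server was started (non one-pass load): nothing to write.
      return;
    }
    DictionaryServer server = result.get();
    try {
      if (writeAll) {
        server.writeDictionary();                     // flush every table's dictionary
      } else {
        server.writeTableDictionary(uniqueTableName); // flush only this table's dictionary
      }
    } catch (Exception ex) {
      throw new Exception("Dataload failed due to error while writing dictionary file!", ex);
    }
  }
}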

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index c030321..c770e1b 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,10 +18,6 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
-import java.util.concurrent.Callable
-import java.util.concurrent.Executors
-import java.util.concurrent.ExecutorService
-import java.util.concurrent.Future
 
 import scala.collection.JavaConverters._
 import scala.language.implicitConversions
@@ -461,9 +457,6 @@ case class LoadTable(
 
       val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
 
-      var result: Future[DictionaryServer] = null
-      var executorService: ExecutorService = null
-
       try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -500,26 +493,18 @@ case class LoadTable(
           val dictionaryServerPort = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-          carbonLoadModel.setDictionaryServerPort(Integer.parseInt(dictionaryServerPort))
           val sparkDriverHost = sqlContext.sparkContext.getConf.get("spark.driver.host")
           carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
           // start dictionary server when use one pass load.
-          executorService = Executors.newFixedThreadPool(1)
-          result = executorService.submit(new Callable[DictionaryServer]() {
-            @throws[Exception]
-            def call: DictionaryServer = {
-              Thread.currentThread().setName("Dictionary server")
-              val server: DictionaryServer = new DictionaryServer
-              server.startServer(dictionaryServerPort.toInt)
-              server
-            }
-          })
+          val server: DictionaryServer = DictionaryServer
+            .getInstance(dictionaryServerPort.toInt)
+          carbonLoadModel.setDictionaryServerPort(server.getPort)
           CarbonDataRDDFactory.loadCarbonData(sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
             columnar,
             partitionStatus,
-            result,
+            Some(server),
             dataFrame,
             updateModel)
         } else {
@@ -564,7 +549,7 @@ case class LoadTable(
             relation.tableMeta.storePath,
             columnar,
             partitionStatus,
-            result,
+            None,
             loadDataFrame,
             updateModel)
         }
@@ -576,10 +561,6 @@ case class LoadTable(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
-          // shutdown dictionary server thread
-          if (carbonLoadModel.getUseOnePass) {
-            executorService.shutdownNow()
-          }
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 4656c2e..835af35 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -26,7 +26,7 @@ import scala.collection.mutable.ListBuffer
 import scala.util.Random
 import scala.util.control.Breaks._
 
-import org.apache.hadoop.conf.{Configurable, Configuration}
+import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
@@ -360,7 +360,7 @@ object CarbonDataRDDFactory {
       storePath: String,
       columnar: Boolean,
       partitionStatus: String = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS,
-      result: Future[DictionaryServer],
+      result: Option[DictionaryServer],
       dataFrame: Option[DataFrame] = None,
       updateModel: Option[UpdateTableModel] = None): Unit = {
     val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
@@ -807,7 +807,6 @@ object CarbonDataRDDFactory {
         LOGGER.audit(s"Data load is failed for " +
             s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
         LOGGER.warn("Cannot write load metadata file as data load failed")
-        shutdownDictionaryServer(carbonLoadModel, result, false)
         throw new Exception(errorMessage)
       } else {
         // if segment is empty then fail the data load
@@ -818,11 +817,11 @@ object CarbonDataRDDFactory {
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}" +
                        " as there is no data to load")
           LOGGER.warn("Cannot write load metadata file as data load failed")
-          shutdownDictionaryServer(carbonLoadModel, result, false)
           throw new Exception("No Data to load")
         }
         val metadataDetails = status(0)._2
         if (!isAgg) {
+          writeDictionary(carbonLoadModel, result, false)
           val status = CarbonLoaderUtil.recordLoadMetadata(currentLoadCount, metadataDetails,
             carbonLoadModel, loadStatus, loadStartTime)
           if (!status) {
@@ -833,7 +832,6 @@ object CarbonDataRDDFactory {
                       .getTableName
                 }")
             LOGGER.error("Dataload failed due to failure in table status updation.")
-            shutdownDictionaryServer(carbonLoadModel, result, false)
             throw new Exception(errorMessage)
           }
         } else if (!carbonLoadModel.isRetentionRequest) {
@@ -841,7 +839,6 @@ object CarbonDataRDDFactory {
           LOGGER.info("********Database updated**********")
         }
 
-        shutdownDictionaryServer(carbonLoadModel, result)
         if (CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS.equals(loadStatus)) {
           LOGGER.audit("Data load is partially successful for " +
                        s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
@@ -862,22 +859,26 @@ object CarbonDataRDDFactory {
 
   }
 
-  private def shutdownDictionaryServer(carbonLoadModel: CarbonLoadModel,
-      result: Future[DictionaryServer], writeDictionary: Boolean = true) = {
+  private def writeDictionary(carbonLoadModel: CarbonLoadModel,
+      result: Option[DictionaryServer], writeAll: Boolean) = {
     // write dictionary file and shutdown dictionary server
-    if (carbonLoadModel.getUseOnePass) {
-      try {
-        val server = result.get()
-        if (writeDictionary) {
-          server.writeDictionary()
+    val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
+      carbonLoadModel.getTableName
+    }"
+    result match {
+      case Some(server) =>
+        try {
+          if (writeAll) {
+            server.writeDictionary()
+          } else {
+            server.writeTableDictionary(uniqueTableName)
+          }
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(s"Error while writing dictionary file for $uniqueTableName")
+            throw new Exception("Dataload failed due to error while writing dictionary file!")
         }
-        server.shutdown()
-      } catch {
-        case ex: Exception =>
-          LOGGER.error("Error while close dictionary server and write dictionary file for
" +
-                       s"${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName
}")
-          throw new Exception("Dataload failed due to error while write dictionary file!")
-      }
+      case _ =>
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/8815dd58/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 593de39..11b3115 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -18,10 +18,8 @@
 package org.apache.spark.sql.execution.command
 
 import java.io.File
-import java.util.concurrent.{Callable, Executors, ExecutorService, Future}
 
 import scala.collection.JavaConverters._
-import scala.collection.mutable.ListBuffer
 import scala.language.implicitConversions
 
 import org.apache.commons.lang3.StringUtils
@@ -468,9 +466,6 @@ case class LoadTable(
       carbonLoadModel.setAllDictPath(allDictionaryPath)
 
       val partitionStatus = CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS
-      var result: Future[DictionaryServer] = null
-      var executorService: ExecutorService = null
-
       try {
         // First system has to partition the data first and then call the load data
         LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)")
@@ -506,27 +501,19 @@ case class LoadTable(
           val dictionaryServerPort = CarbonProperties.getInstance()
             .getProperty(CarbonCommonConstants.DICTIONARY_SERVER_PORT,
               CarbonCommonConstants.DICTIONARY_SERVER_PORT_DEFAULT)
-          carbonLoadModel.setDictionaryServerPort(Integer.parseInt(dictionaryServerPort))
           val sparkDriverHost = sparkSession.sqlContext.sparkContext.
             getConf.get("spark.driver.host")
           carbonLoadModel.setDictionaryServerHost(sparkDriverHost)
           // start dictionary server when use one pass load.
-          executorService = Executors.newFixedThreadPool(1)
-          result = executorService.submit(new Callable[DictionaryServer]() {
-            @throws[Exception]
-            def call: DictionaryServer = {
-              Thread.currentThread().setName("Dictionary server")
-              val server: DictionaryServer = new DictionaryServer
-              server.startServer(dictionaryServerPort.toInt)
-              server
-            }
-          })
+          val server: DictionaryServer = DictionaryServer
+            .getInstance(dictionaryServerPort.toInt)
+          carbonLoadModel.setDictionaryServerPort(server.getPort)
           CarbonDataRDDFactory.loadCarbonData(sparkSession.sqlContext,
             carbonLoadModel,
             relation.tableMeta.storePath,
             columnar,
             partitionStatus,
-            result,
+            Some(server),
             dataFrame,
             updateModel)
         }
@@ -575,7 +562,7 @@ case class LoadTable(
             relation.tableMeta.storePath,
             columnar,
             partitionStatus,
-            result,
+            None,
             loadDataFrame,
             updateModel)
         }
@@ -587,12 +574,6 @@ case class LoadTable(
       } finally {
         // Once the data load is successful delete the unwanted partition files
         try {
-
-          // shutdown dictionary server thread
-          if (carbonLoadModel.getUseOnePass) {
-            executorService.shutdownNow()
-          }
-
           val fileType = FileFactory.getFileType(partitionLocation)
           if (FileFactory.isFileExist(partitionLocation, fileType)) {
             val file = FileFactory

