carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/3] incubator-carbondata git commit: WIP provide dictionary server/client framework
Date Thu, 29 Dec 2016 14:35:30 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 241f45f8a -> 20a0b9ec5


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
index 38f8c79..6d9be67 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datatypes/PrimitiveDataType.java
@@ -23,7 +23,9 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
@@ -34,11 +36,15 @@ import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDime
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
 import org.apache.carbondata.core.keygenerator.KeyGenException;
 import org.apache.carbondata.core.keygenerator.KeyGenerator;
 import org.apache.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 import org.apache.carbondata.core.util.DataTypeUtil;
+import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.newflow.dictionary.DirectDictionary;
 import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.CarbonCSVBasedDimSurrogateKeyGen;
@@ -116,8 +122,10 @@ public class PrimitiveDataType implements GenericDataType<Object>
{
    * @param columnId
    */
   public PrimitiveDataType(String name, String parentname, String columnId,
-      CarbonDimension carbonDimension, Cache<DictionaryColumnUniqueIdentifier, Dictionary>
cache,
-      CarbonTableIdentifier carbonTableIdentifier) {
+      CarbonDimension carbonDimension,
+      Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
+      CarbonTableIdentifier carbonTableIdentifier,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     this.name = name;
     this.parentname = parentname;
     this.columnId = columnId;
@@ -130,8 +138,31 @@ public class PrimitiveDataType implements GenericDataType<Object>
{
         dictionaryGenerator = new DirectDictionary(DirectDictionaryKeyGeneratorFactory
             .getDirectDictionaryGenerator(carbonDimension.getDataType()));
       } else {
-        Dictionary dictionary = cache.get(identifier);
-        dictionaryGenerator = new PreCreatedDictionary(dictionary);
+        Dictionary dictionary = null;
+        if (useOnePass) {
+          // one-pass load: the dictionary file may not exist yet, so load it only if present;
+          // a CarbonUtilException from cache.get is handled by the enclosing catch below
+          if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+            dictionary = cache.get(identifier);
+          }
+          String threadNo = "initial";
+          DictionaryKey dictionaryKey = new DictionaryKey();
+          dictionaryKey.setColumnName(carbonDimension.getColName());
+          dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
+          dictionaryKey.setThreadNo(threadNo);
+          // for table initialization
+          dictionaryKey.setType("TABLE_INTIALIZATION");
+          dictionaryKey.setData("0");
+          client.getDictionary(dictionaryKey);
+          Map<Object, Integer> localCache = new HashMap<>();
+          // for generate dictionary
+          dictionaryKey.setType("DICTIONARY_GENERATION");
+          dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
+              dictionaryKey, localCache);
+        } else {
+          dictionary = cache.get(identifier);
+          dictionaryGenerator = new PreCreatedDictionary(dictionary);
+        }
       }
     } catch (CarbonUtilException e) {
       throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
Review note: adds the one-pass load settings (useOnePass, dictionary server host and port) to
CarbonLoadModel, copies them in both copy methods touched below, and exposes getters/setters.
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index 9e8f5b0..b7c17dc 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -134,6 +134,21 @@ public class CarbonLoadModel implements Serializable {
   private String rddIteratorKey;
 
   /**
+   *  Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary server port
+   */
+  private int dictionaryServerPort;
+
+  /**
    * get escape char
    * @return
    */
@@ -336,6 +351,9 @@ public class CarbonLoadModel implements Serializable {
     copy.commentChar = commentChar;
     copy.maxColumns = maxColumns;
     copy.storePath = storePath;
+    copy.useOnePass = useOnePass;
+    copy.dictionaryServerHost = dictionaryServerHost;
+    copy.dictionaryServerPort = dictionaryServerPort;
     return copy;
   }
 
@@ -380,6 +398,9 @@ public class CarbonLoadModel implements Serializable {
     copyObj.dateFormat = dateFormat;
     copyObj.maxColumns = maxColumns;
     copyObj.storePath = storePath;
+    copyObj.useOnePass = useOnePass;
+    copyObj.dictionaryServerHost = dictionaryServerHost;
+    copyObj.dictionaryServerPort = dictionaryServerPort;
     return copyObj;
   }
 
@@ -609,4 +630,28 @@ public class CarbonLoadModel implements Serializable {
     this.rddIteratorKey = rddIteratorKey;
 
   }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
Review note: adds the one-pass load settings (useOnePass, dictionary server host and port)
with getters/setters to CarbonDataLoadConfiguration.
NOTE(review): the explicit empty no-arg constructor is only required if the class declares
other constructors (not visible here) -- confirm, otherwise it can be dropped.
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
index 79e17e2..7450b1f 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/CarbonDataLoadConfiguration.java
@@ -41,6 +41,24 @@ public class CarbonDataLoadConfiguration {
 
   private Map<String, Object> dataLoadProperties = new HashMap<>();
 
+  /**
+   *  Use one pass to generate dictionary
+   */
+  private boolean useOnePass;
+
+  /**
+   * dictionary server host
+   */
+  private String dictionaryServerHost;
+
+  /**
+   * dictionary server port
+   */
+  private int dictionaryServerPort;
+
+  public CarbonDataLoadConfiguration() {
+  }
+
   public int getDimensionCount() {
     int dimCount = 0;
     for (int i = 0; i < dataFields.length; i++) {
@@ -144,4 +162,28 @@ public class CarbonDataLoadConfiguration {
   public void setBucketingInfo(BucketingInfo bucketingInfo) {
     this.bucketingInfo = bucketingInfo;
   }
+
+  public boolean getUseOnePass() {
+    return useOnePass;
+  }
+
+  public void setUseOnePass(boolean useOnePass) {
+    this.useOnePass = useOnePass;
+  }
+
+  public String getDictionaryServerHost() {
+    return dictionaryServerHost;
+  }
+
+  public void setDictionaryServerHost(String dictionaryServerHost) {
+    this.dictionaryServerHost = dictionaryServerHost;
+  }
+
+  public int getDictionaryServerPort() {
+    return dictionaryServerPort;
+  }
+
+  public void setDictionaryServerPort(int dictionaryServerPort) {
+    this.dictionaryServerPort = dictionaryServerPort;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
----------------------------------------------------------------------
Review note: fails fast when the CSV header line is non-empty but blank, and copies the
one-pass settings (useOnePass, dictionary server host/port) from CarbonLoadModel into
the CarbonDataLoadConfiguration being built.
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
index 63147c9..f87b4bb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadProcessBuilder.java
@@ -43,6 +43,8 @@ import org.apache.carbondata.processing.newflow.steps.InputProcessorStepImpl;
 import org.apache.carbondata.processing.newflow.steps.SortProcessorStepImpl;
 import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 
+import org.apache.commons.lang3.StringUtils;
+
 /**
  * It builds the pipe line of steps for loading data to carbon.
  */
@@ -124,6 +126,10 @@ public final class DataLoadProcessBuilder {
           CarbonDataProcessorUtil.getCsvFileToRead(loadModel.getFactFilesToProcess().get(0));
       csvFileName = csvFile.getName();
       csvHeader = CarbonDataProcessorUtil.getFileHeader(csvFile);
+      // reject a header line that is non-empty but consists only of whitespace
+      if (StringUtils.isNotEmpty(csvHeader) && StringUtils.isBlank(csvHeader)) {
+        throw new CarbonDataLoadingException("First line of the csv is not valid.");
+      }
       configuration.setHeader(
           CarbonDataProcessorUtil.getColumnFields(csvHeader, loadModel.getCsvDelimiter()));
     }
@@ -194,6 +200,11 @@ public final class DataLoadProcessBuilder {
     }
     configuration.setDataFields(dataFields.toArray(new DataField[dataFields.size()]));
     configuration.setBucketingInfo(carbonTable.getBucketingInfo(carbonTable.getFactTableName()));
+    // configuration for one pass load: dictionary server info
+    configuration.setUseOnePass(loadModel.getUseOnePass());
+    configuration.setDictionaryServerHost(loadModel.getDictionaryServerHost());
+    configuration.setDictionaryServerPort(loadModel.getDictionaryServerPort());
+
     return configuration;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
index 3182a37..82a7bc2 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/DictionaryFieldConverterImpl.java
@@ -19,22 +19,28 @@
 
 package org.apache.carbondata.processing.newflow.converter.impl;
 
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.devapi.BiDictionary;
 import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+import org.apache.carbondata.core.util.CarbonUtil;
 import org.apache.carbondata.core.util.CarbonUtilException;
 import org.apache.carbondata.core.util.DataTypeUtil;
 import org.apache.carbondata.processing.newflow.DataField;
 import org.apache.carbondata.processing.newflow.converter.BadRecordLogHolder;
+import org.apache.carbondata.processing.newflow.dictionary.DictionaryServerClientDictionary;
 import org.apache.carbondata.processing.newflow.dictionary.PreCreatedDictionary;
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
@@ -54,19 +60,48 @@ public class DictionaryFieldConverterImpl extends AbstractDictionaryFieldConvert
 
   public DictionaryFieldConverterImpl(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index) {
+      CarbonTableIdentifier carbonTableIdentifier, String nullFormat, int index,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     this.index = index;
     this.carbonDimension = (CarbonDimension) dataField.getColumn();
     this.nullFormat = nullFormat;
     DictionaryColumnUniqueIdentifier identifier =
         new DictionaryColumnUniqueIdentifier(carbonTableIdentifier,
             dataField.getColumn().getColumnIdentifier(), dataField.getColumn().getDataType());
-    try {
-      Dictionary dictionary = cache.get(identifier);
-      dictionaryGenerator = new PreCreatedDictionary(dictionary);
-    } catch (CarbonUtilException e) {
-      LOGGER.error(e);
-      throw new RuntimeException(e);
+
+    Dictionary dictionary = null;
+    // one-pass load: values missing from the existing dictionary get keys from the server client
+    if (useOnePass) {
+      if (CarbonUtil.isFileExistsForGivenColumn(storePath, identifier)) {
+        try {
+          dictionary = cache.get(identifier);
+        } catch (CarbonUtilException e) {
+          LOGGER.error(e);
+          throw new RuntimeException(e);
+        }
+      }
+      String threadNo = "initial";
+      DictionaryKey dictionaryKey = new DictionaryKey();
+      dictionaryKey.setColumnName(dataField.getColumn().getColName());
+      dictionaryKey.setTableUniqueName(carbonTableIdentifier.getTableUniqueName());
+      dictionaryKey.setThreadNo(threadNo);
+      // for table initialization
+      dictionaryKey.setType("TABLE_INTIALIZATION");
+      dictionaryKey.setData("0");
+      client.getDictionary(dictionaryKey);
+      Map<Object, Integer> localCache = new HashMap<>();
+      // for generate dictionary
+      dictionaryKey.setType("DICTIONARY_GENERATION");
+      dictionaryGenerator = new DictionaryServerClientDictionary(dictionary, client,
+          dictionaryKey, localCache);
+    } else {
+      try {
+        dictionary = cache.get(identifier);
+        dictionaryGenerator = new PreCreatedDictionary(dictionary);
+      } catch (CarbonUtilException e) {
+        LOGGER.error(e);
+        throw new RuntimeException(e);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
----------------------------------------------------------------------
Review note: threads the one-pass arguments (DictionaryClient, useOnePass, storePath) from
createFieldEncoder through both createComplexType overloads into DictionaryFieldConverterImpl
and PrimitiveDataType. Nit: the continuation line of the ComplexFieldConverterImpl call is
indented 8 extra spaces instead of the usual 4.
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
index a46b9ba..b065b55 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/FieldEncoderFactory.java
@@ -27,6 +27,7 @@ import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.metadata.encoder.Encoding;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonColumn;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.processing.datatypes.ArrayDataType;
 import org.apache.carbondata.processing.datatypes.GenericDataType;
 import org.apache.carbondata.processing.datatypes.PrimitiveDataType;
@@ -60,7 +61,8 @@ public class FieldEncoderFactory {
    */
   public FieldConverter createFieldEncoder(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat) {
+      CarbonTableIdentifier carbonTableIdentifier, int index, String nullFormat,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     // Converters are only needed for dimensions and measures it return null.
     if (dataField.getColumn().isDimesion()) {
       if (dataField.getColumn().hasEncoding(Encoding.DIRECT_DICTIONARY) &&
@@ -69,10 +71,11 @@ public class FieldEncoderFactory {
       } else if (dataField.getColumn().hasEncoding(Encoding.DICTIONARY) &&
           !dataField.getColumn().isComplex()) {
         return new DictionaryFieldConverterImpl(dataField, cache, carbonTableIdentifier,
nullFormat,
-            index);
+            index, client, useOnePass, storePath);
       } else if (dataField.getColumn().isComplex()) {
         return new ComplexFieldConverterImpl(
-            createComplexType(dataField, cache, carbonTableIdentifier), index);
+            createComplexType(dataField, cache, carbonTableIdentifier,
+                    client, useOnePass, storePath), index);
       } else {
         return new NonDictionaryFieldConverterImpl(dataField, nullFormat, index);
       }
@@ -86,9 +89,10 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(DataField dataField,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier) {
+      CarbonTableIdentifier carbonTableIdentifier,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     return createComplexType(dataField.getColumn(), dataField.getColumn().getColName(), cache,
-        carbonTableIdentifier);
+        carbonTableIdentifier, client, useOnePass, storePath);
   }
 
   /**
@@ -98,7 +102,8 @@ public class FieldEncoderFactory {
    */
   private static GenericDataType createComplexType(CarbonColumn carbonColumn, String parentName,
       Cache<DictionaryColumnUniqueIdentifier, Dictionary> cache,
-      CarbonTableIdentifier carbonTableIdentifier) {
+      CarbonTableIdentifier carbonTableIdentifier,
+      DictionaryClient client, Boolean useOnePass, String storePath) {
     switch (carbonColumn.getDataType()) {
       case ARRAY:
         List<CarbonDimension> listOfChildDimensions =
@@ -108,7 +113,7 @@ public class FieldEncoderFactory {
             new ArrayDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
         for (CarbonDimension dimension : listOfChildDimensions) {
           arrayDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(),
cache,
-              carbonTableIdentifier));
+              carbonTableIdentifier, client, useOnePass, storePath));
         }
         return arrayDataType;
       case STRUCT:
@@ -119,7 +124,7 @@ public class FieldEncoderFactory {
             new StructDataType(carbonColumn.getColName(), parentName, carbonColumn.getColumnId());
         for (CarbonDimension dimension : dimensions) {
           structDataType.addChildren(createComplexType(dimension, carbonColumn.getColName(),
cache,
-              carbonTableIdentifier));
+              carbonTableIdentifier, client, useOnePass, storePath));
         }
         return structDataType;
       case MAP:
@@ -127,7 +132,7 @@ public class FieldEncoderFactory {
       default:
         return new PrimitiveDataType(carbonColumn.getColName(), parentName,
             carbonColumn.getColumnId(), (CarbonDimension) carbonColumn, cache,
-            carbonTableIdentifier);
+            carbonTableIdentifier, client, useOnePass, storePath);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
index e0a7ceb..e46b86a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/converter/impl/RowConverterImpl.java
@@ -20,12 +20,18 @@ package org.apache.carbondata.processing.newflow.converter.impl;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
 
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.apache.carbondata.processing.newflow.CarbonDataLoadConfiguration;
 import org.apache.carbondata.processing.newflow.DataField;
@@ -53,6 +59,10 @@ public class RowConverterImpl implements RowConverter {
 
   private BadRecordLogHolder logHolder;
 
+  private DictionaryClient dictClient;
+
+  private ExecutorService executorService;
+
   public RowConverterImpl(DataField[] fields, CarbonDataLoadConfiguration configuration,
       BadRecordsLogger badRecordLogger) {
     this.fields = fields;
@@ -73,10 +83,41 @@ public class RowConverterImpl implements RowConverter {
 
     long lruCacheStartTime = System.currentTimeMillis();
 
+    // for one pass load, start the dictionary client
+    if (configuration.getUseOnePass()) {
+      executorService = Executors.newFixedThreadPool(1);
+      Future<DictionaryClient> result = executorService.submit(new Callable<DictionaryClient>() {
+        @Override
+        public DictionaryClient call() throws Exception {
+          Thread.currentThread().setName("Dictionary client");
+          DictionaryClient dictionaryClient = new DictionaryClient();
+          dictionaryClient.startClient(configuration.getDictionaryServerHost(),
+              configuration.getDictionaryServerPort());
+          return dictionaryClient;
+        }
+      });
+
+      try {
+        // NOTE(review): fixed delay kept from the original, presumably to let the client
+        // finish connecting; an explicit readiness signal would be more reliable.
+        Thread.sleep(1000);
+        dictClient = result.get();
+      } catch (InterruptedException e) {
+        // restore the interrupt status; the load cannot continue without a dictionary client
+        Thread.currentThread().interrupt();
+        executorService.shutdownNow();
+        throw new RuntimeException("Interrupted while starting the dictionary client", e);
+      } catch (ExecutionException e) {
+        executorService.shutdownNow();
+        throw new RuntimeException("Failed to start the dictionary client", e);
+      }
+    }
     for (int i = 0; i < fields.length; i++) {
       FieldConverter fieldConverter = FieldEncoderFactory.getInstance()
           .createFieldEncoder(fields[i], cache,
-              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat);
+              configuration.getTableIdentifier().getCarbonTableIdentifier(), i, nullFormat,
+              dictClient, configuration.getUseOnePass(),
+              configuration.getTableIdentifier().getStorePath());
       fieldConverterList.add(fieldConverter);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
@@ -116,6 +157,20 @@ public class RowConverterImpl implements RowConverter {
     }
     // Set the cardinality to configuration, it will be used by further step for mdk key.
     configuration.setDataLoadProperty(DataLoadProcessorConstants.DIMENSION_LENGTHS, cardinality);
+
+    // close dictionary client when finish write
+    if (configuration.getUseOnePass()) {
+      // shut the executor down even if closing the client fails
+      try {
+        if (dictClient != null) {
+          dictClient.shutDown();
+        }
+      } finally {
+        if (executorService != null) {
+          executorService.shutdownNow();
+        }
+      }
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/05b26549/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
new file mode 100644
index 0000000..12c6e3b
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/dictionary/DictionaryServerClientDictionary.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.processing.newflow.dictionary;
+
+import java.util.Map;
+
+import org.apache.carbondata.core.cache.dictionary.Dictionary;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.devapi.BiDictionary;
+import org.apache.carbondata.core.devapi.DictionaryGenerationException;
+import org.apache.carbondata.core.dictionary.client.DictionaryClient;
+import org.apache.carbondata.core.dictionary.generator.key.DictionaryKey;
+
+/**
+ * Dictionary implementation along with dictionary server client to get new dictionary values.
+ * <p>
+ * Values found in the pre-loaded {@code dictionary} are resolved locally. Other values are
+ * requested from the dictionary server through {@code client}, remembered in
+ * {@code localCache}, and returned offset by {@code base}.
+ * <p>
+ * Access to the shared {@code dictionaryKey} request object and to {@code localCache} is
+ * guarded by {@code lock}; presumably one instance can be shared by several loader threads.
+ */
+public class DictionaryServerClientDictionary implements BiDictionary<Integer, Object> {
+
+  private static final String DICTIONARY_GENERATION = "DICTIONARY_GENERATION";
+
+  private static final String SIZE = "SIZE";
+
+  private final Dictionary dictionary;
+
+  private final DictionaryClient client;
+
+  private final Map<Object, Integer> localCache;
+
+  private final DictionaryKey dictionaryKey;
+
+  private final int base;
+
+  private final Object lock = new Object();
+
+  public DictionaryServerClientDictionary(Dictionary dictionary, DictionaryClient client,
+      DictionaryKey key, Map<Object, Integer> localCache) {
+    this.dictionary = dictionary;
+    this.client = client;
+    this.dictionaryKey = key;
+    this.localCache = localCache;
+    this.base = (dictionary == null ? 0 : dictionary.getDictionaryChunks().getSize() - 1);
+  }
+
+  @Override public Integer getOrGenerateKey(Object value) throws DictionaryGenerationException {
+    Integer key = getKey(value);
+    if (key != null) {
+      return key;
+    }
+    synchronized (lock) {
+      // another thread may have fetched this value while we were waiting for the lock
+      key = localCache.get(value);
+      if (key == null) {
+        // size() reuses the shared key object, so always set the request type here
+        dictionaryKey.setType(DICTIONARY_GENERATION);
+        dictionaryKey.setData(value);
+        dictionaryKey.setThreadNo(Thread.currentThread().getId() + "");
+        DictionaryKey dictionaryValue = client.getDictionary(dictionaryKey);
+        key = (Integer) dictionaryValue.getData();
+        localCache.put(value, key);
+      }
+    }
+    return key + base;
+  }
+
+  @Override public Integer getKey(Object value) {
+    if (dictionary != null) {
+      Integer surrogateKey = dictionary.getSurrogateKey(value.toString());
+      if (surrogateKey != CarbonCommonConstants.INVALID_SURROGATE_KEY) {
+        return surrogateKey;
+      }
+    }
+    Integer cachedKey;
+    synchronized (lock) {
+      cachedKey = localCache.get(value);
+    }
+    if (cachedKey == null) {
+      return null;
+    }
+    return cachedKey + base;
+  }
+
+  @Override public Object getValue(Integer key) {
+    throw new UnsupportedOperationException("Not supported here");
+  }
+
+  @Override public int size() {
+    synchronized (lock) {
+      dictionaryKey.setType(SIZE);
+      return (int) client.getDictionary(dictionaryKey).getData() + base;
+    }
+  }
+}


Mime
View raw message