carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [04/50] [abbrv] incubator-carbondata git commit: adapt data with header for all dictionary
Date Thu, 22 Sep 2016 05:36:02 GMT
adapt data with header for all dictionary

use DEFAULT_CHARSET

remove listFiles


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/51e4c11e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/51e4c11e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/51e4c11e

Branch: refs/heads/branch-0.1
Commit: 51e4c11e40611494a456a806a86d57d9348b5b4f
Parents: 854b75e
Author: foryou2030 <foryou2030@126.com>
Authored: Mon Aug 22 18:00:00 2016 +0800
Committer: ravipesala <ravi.pesala@gmail.com>
Committed: Thu Sep 22 09:29:22 2016 +0530

----------------------------------------------------------------------
 .../apache/carbondata/core/util/CarbonUtil.java | 30 ++++++++++++
 .../src/main/resources/datawithoutheader.csv    | 10 ----
 .../examples/AllDictionaryExample.scala         | 12 ++---
 .../examples/util/AllDictionaryUtil.scala       |  2 +-
 .../spark/util/GlobalDictionaryUtil.scala       | 50 +++++++++++++-------
 .../processing/csvload/GraphExecutionUtil.java  | 29 +-----------
 6 files changed, 71 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 2460f6e..df538e0 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -20,14 +20,17 @@
 
 package org.apache.carbondata.core.util;
 
+import java.io.BufferedReader;
 import java.io.Closeable;
 import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStreamReader;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.nio.charset.Charset;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -1385,5 +1388,32 @@ public final class CarbonUtil {
     }
     return dictionaryOneChunkSize;
   }
+
+  /**
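+   * @param csvFilePath path of the CSV file whose first line is read
+   * @return the first line of the file, or null if the file is empty or could not be read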
+   */
+  public static String readHeader(String csvFilePath) {
+
+    DataInputStream fileReader = null;
+    BufferedReader bufferedReader = null;
+    String readLine = null;
+
+    try {
+      fileReader =
+          FileFactory.getDataInputStream(csvFilePath, FileFactory.getFileType(csvFilePath));
+      bufferedReader = new BufferedReader(new InputStreamReader(fileReader,
+              Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+      readLine = bufferedReader.readLine();
+
+    } catch (FileNotFoundException e) {
+      LOGGER.error(e, "CSV Input File not found  " + e.getMessage());
+    } catch (IOException e) {
+      LOGGER.error(e, "Not able to read CSV input File  " + e.getMessage());
+    } finally {
+      CarbonUtil.closeStreams(fileReader, bufferedReader);
+    }
+    return readLine;
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/examples/src/main/resources/datawithoutheader.csv
----------------------------------------------------------------------
diff --git a/examples/src/main/resources/datawithoutheader.csv b/examples/src/main/resources/datawithoutheader.csv
deleted file mode 100644
index df2b945..0000000
--- a/examples/src/main/resources/datawithoutheader.csv
+++ /dev/null
@@ -1,10 +0,0 @@
-1,2015/7/23,china,aaa1,phone197,ASD69643,15000
-2,2015/7/24,china,aaa2,phone756,ASD42892,15001
-3,2015/7/25,china,aaa3,phone1904,ASD37014,15002
-4,2015/7/26,china,aaa4,phone2435,ASD66902,15003
-5,2015/7/27,china,aaa5,phone2441,ASD90633,15004
-6,2015/7/28,china,aaa6,phone294,ASD59961,15005
-7,2015/7/29,china,aaa7,phone610,ASD14875,15006
-8,2015/7/30,china,aaa8,phone1848,ASD57308,15007
-9,2015/7/18,china,aaa9,phone706,ASD86717,15008
-10,2015/7/19,usa,aaa10,phone685,ASD30505,15009

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
index 195c7a6..a2b72e3 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/AllDictionaryExample.scala
@@ -23,11 +23,10 @@ import org.apache.carbondata.examples.util.{AllDictionaryUtil, InitForExamples}
 object AllDictionaryExample {
   def main(args: Array[String]) {
     val cc = InitForExamples.createCarbonContext("CarbonExample")
-    val testData = InitForExamples.currentPath + "/src/main/resources/datawithoutheader.csv"
-    val csvHeader = "id,date,country,name,phonetype,serialname,salary"
+    val testData = InitForExamples.currentPath + "/src/main/resources/data.csv"
+    val csvHeader = "ID,date,country,name,phonetype,serialname,salary"
     val dictCol = "|date|country|name|phonetype|serialname|"
-    val allDictFile = InitForExamples.currentPath +
-      "/src/main/resources/datawithoutheader.dictionary"
+    val allDictFile = InitForExamples.currentPath + "/src/main/resources/data.dictionary"
     // extract all dictionary files from source data
     AllDictionaryUtil.extractDictionary(cc.sparkContext,
       testData, allDictFile, csvHeader, dictCol)
@@ -41,13 +40,12 @@ object AllDictionaryExample {
            CREATE TABLE IF NOT EXISTS t3
            (ID Int, date Timestamp, country String,
            name String, phonetype String, serialname String, salary Int)
-           STORED BY 'org.apache.carbondata.format'
+           STORED BY 'carbondata'
            """)
 
     cc.sql(s"""
            LOAD DATA LOCAL INPATH '$testData' into table t3
-           options('FILEHEADER'='id,date,country,name,phonetype,serialname,salary',
-           'ALL_DICTIONARY_PATH'='$allDictFile')
+           options('ALL_DICTIONARY_PATH'='$allDictFile')
            """)
 
     cc.sql("""

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
b/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
index 3e8df71..bd625f3 100644
--- a/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
+++ b/examples/src/main/scala/org/apache/carbondata/examples/util/AllDictionaryUtil.scala
@@ -35,7 +35,7 @@ object AllDictionaryUtil extends Logging{
     val fileHeaderArr = fileHeader.split(",")
     val isDictCol = new Array[Boolean](fileHeaderArr.length)
     for (i <- 0 until fileHeaderArr.length) {
-      if (dictCol.contains("|" + fileHeaderArr(i) + "|")) {
+      if (dictCol.contains("|" + fileHeaderArr(i).toLowerCase() + "|")) {
         isDictCol(i) = true
       } else {
         isDictCol(i) = false

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
index ddb596b..b96a826 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/util/GlobalDictionaryUtil.scala
@@ -43,7 +43,7 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.reader.CarbonDictionaryReader
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
 import org.apache.carbondata.core.writer.CarbonDictionaryWriter
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.spark.CarbonSparkFactory
@@ -583,8 +583,8 @@ object GlobalDictionaryUtil extends Logging {
    * @return allDictionaryRdd
    */
   private def readAllDictionaryFiles(sqlContext: SQLContext,
-                                       csvFileColumns: Array[String],
-                                       requireColumns: Array[String],
+                                     csvFileColumns: Array[String],
+                                     requireColumns: Array[String],
                                      allDictionaryPath: String) = {
     var allDictionaryRdd: RDD[(String, Iterable[String])] = null
     try {
@@ -651,6 +651,31 @@ object GlobalDictionaryUtil extends Logging {
   }
 
   /**
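+   * get file headers from the first line of the first fact file
+   *
+   * @param carbonLoadModel load model providing the fact file path(s) and CSV delimiter
+   * @return lower-cased header names, split by the CSV delimiter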
+   */
+  private def getHeaderFormFactFile(carbonLoadModel: CarbonLoadModel): Array[String] = {
+    var headers: Array[String] = null
+    val factFile: String = carbonLoadModel.getFactFilePath.split(",")(0)
+    val readLine = CarbonUtil.readHeader(factFile)
+
+    if (null != readLine) {
+      val delimiter = if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter)) {
+        "" + CSVWriter.DEFAULT_SEPARATOR
+      } else {
+        carbonLoadModel.getCsvDelimiter
+      }
+      headers = readLine.toLowerCase().split(delimiter);
+    } else {
+      logError("Not found file header! Please set fileheader")
+      throw new IOException("Failed to get file header")
+    }
+    headers
+  }
+
+  /**
    * generate global dictionary with SQLContext and CarbonLoadModel
    *
    * @param sqlContext  sql context
@@ -736,27 +761,20 @@ object GlobalDictionaryUtil extends Logging {
         logInfo("Generate global dictionary from all dictionary files!")
         val isNonempty = validateAllDictionaryPath(allDictionaryPath)
         if(isNonempty) {
-          // fill the map[columnIndex -> columnName]
-          var fileHeaders : Array[String] = null
-          if(!StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
-            val splitColumns = carbonLoadModel.getCsvHeader.split("" + CSVWriter.DEFAULT_SEPARATOR)
-            val fileHeadersArr = new ArrayBuffer[String]()
-            for(i <- 0 until splitColumns.length) {
-              fileHeadersArr += splitColumns(i).trim.toLowerCase()
-            }
-            fileHeaders = fileHeadersArr.toArray
+          var headers = if (StringUtils.isEmpty(carbonLoadModel.getCsvHeader)) {
+            getHeaderFormFactFile(carbonLoadModel)
           } else {
-            logError("Not found file header! Please set fileheader")
-            throw new IOException("Failed to get file header")
+            carbonLoadModel.getCsvHeader.toLowerCase.split("" + CSVWriter.DEFAULT_SEPARATOR)
           }
+          headers = headers.map(headerName => headerName.trim)
           // prune columns according to the CSV file header, dimension columns
           val (requireDimension, requireColumnNames) =
-            pruneDimensions(dimensions, fileHeaders, fileHeaders)
+            pruneDimensions(dimensions, headers, headers)
           if (requireDimension.nonEmpty) {
             val model = createDictionaryLoadModel(carbonLoadModel, table, requireDimension,
               hdfsLocation, dictfolderPath, false)
             // read local dictionary file, and group by key
-            val allDictionaryRdd = readAllDictionaryFiles(sqlContext, fileHeaders,
+            val allDictionaryRdd = readAllDictionaryFiles(sqlContext, headers,
               requireColumnNames, allDictionaryPath)
             // read exist dictionary and combine
             val inputRDD = new CarbonAllDictionaryCombineRDD(allDictionaryRdd, model)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/51e4c11e/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
index 2a35002..6d82bcd 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/csvload/GraphExecutionUtil.java
@@ -254,40 +254,13 @@ public final class GraphExecutionUtil {
 
   /**
    * @param csvFilePath
-   * @return
-   */
-  private static String readCSVFile(String csvFilePath) {
-
-    DataInputStream fileReader = null;
-    BufferedReader bufferedReader = null;
-    String readLine = null;
-
-    try {
-      fileReader =
-          FileFactory.getDataInputStream(csvFilePath, FileFactory.getFileType(csvFilePath));
-      bufferedReader =
-          new BufferedReader(new InputStreamReader(fileReader, Charset.defaultCharset()));
-      readLine = bufferedReader.readLine();
-
-    } catch (FileNotFoundException e) {
-      LOGGER.error(e, "CSV Input File not found  " + e.getMessage());
-    } catch (IOException e) {
-      LOGGER.error(e, "Not able to read CSV input File  " + e.getMessage());
-    } finally {
-      CarbonUtil.closeStreams(fileReader, bufferedReader);
-    }
-    return readLine;
-  }
-
-  /**
-   * @param csvFilePath
    * @param columnNames
    * @return
    */
   public static boolean checkCSVAndRequestedTableColumns(String csvFilePath, String[] columnNames,
       String delimiter) {
 
-    String readLine = readCSVFile(csvFilePath);
+    String readLine = CarbonUtil.readHeader(csvFilePath);
 
     if (null != readLine) {
       delimiter = CarbonUtil.delimiterConverter(delimiter);


Mime
View raw message