carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [16/50] [abbrv] incubator-carbondata git commit: fix comments and rebase 716 (#798)
Date Wed, 20 Jul 2016 10:13:44 GMT
fix comments and rebase 716 (#798)



Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/6a092bbe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/6a092bbe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/6a092bbe

Branch: refs/heads/master
Commit: 6a092bbea5550665050e70c97b9790c5de48664d
Parents: afa5d5d
Author: Zhangshunyu <zhangshunyu@huawei.com>
Authored: Sat Jul 16 00:58:59 2016 +0800
Committer: Ravindra Pesala <ravi.pesala@gmail.com>
Committed: Fri Jul 15 22:28:59 2016 +0530

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |   8 +
 .../core/util/CarbonLoadStatisticsDummy.java    | 114 +++++
 .../core/util/CarbonLoadStatisticsImpl.java     | 426 +++++++++++++++++++
 .../core/util/CarbonTimeStatisticsFactory.java  |  52 +++
 .../carbondata/core/util/LoadStatistics.java    |  67 +++
 .../spark/rdd/CarbonDataLoadRDD.scala           |  15 +-
 .../spark/rdd/CarbonGlobalDictionaryRDD.scala   |  16 +-
 .../processing/csvreaderstep/CsvInput.java      |   7 +
 .../processing/csvreaderstep/CsvInputMeta.java  |  18 +-
 .../graphgenerator/GraphGenerator.java          |   1 +
 .../processing/mdkeygen/MDKeyGenStep.java       |  14 +-
 .../sortdata/IntermediateFileMerger.java        |   6 +
 .../sortandgroupby/sortdata/SortDataRows.java   |   1 -
 .../sortdatastep/SortKeyStep.java               |   7 +
 .../csvbased/CarbonCSVBasedSeqGenStep.java      |   9 +-
 .../FileStoreSurrogateKeyGenForCSV.java         |   4 +
 16 files changed, 753 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
index 6fec471..5ca24b4 100644
--- a/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
+++ b/core/src/main/java/org/carbondata/core/constants/CarbonCommonConstants.java
@@ -415,6 +415,14 @@ public final class CarbonCommonConstants {
    */
   public static final String AGGREAGATE_COLUMNAR_KEY_BLOCK_DEFAULTVALUE = "true";
   /**
+   * Key to enable/disable recording of data loading time statistics
+   */
+  public static final String ENABLE_DATA_LOADING_STATISTICS = "enable.data.loading.statistics";
+  /**
+   * Default value for ENABLE_DATA_LOADING_STATISTICS
+   */
+  public static final String ENABLE_DATA_LOADING_STATISTICS_DEFAULT = "false";
+  /**
    * IS_INT_BASED_INDEXER
    */
   public static final String HIGH_CARDINALITY_VALUE = "high.cardinality.value";
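
The two new constants hook the feature into CarbonData's existing property mechanism, which CarbonTimeStatisticsFactory (added below) reads at class-load time. A minimal sketch of enabling it programmatically; the key can equally be set in the carbon.properties file, and this snippet is illustrative rather than part of the commit:

    // Must run before CarbonTimeStatisticsFactory is first loaded.
    CarbonProperties.getInstance().addProperty(
        CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS, "true");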

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
new file mode 100644
index 0000000..1561efa
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsDummy.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.core.util;
+
+public class CarbonLoadStatisticsDummy implements LoadStatistics {
+  private CarbonLoadStatisticsDummy() {
+
+  }
+
+  private static CarbonLoadStatisticsDummy carbonLoadStatisticsDummyInstance =
+      new CarbonLoadStatisticsDummy();
+
+  public static CarbonLoadStatisticsDummy getInstance() {
+    return carbonLoadStatisticsDummyInstance;
+  }
+
+  @Override
+  public void initPartitionInfo(String partitionId) {
+
+  }
+
+  @Override
+  public void recordGlobalDicGenTotalTime(Long glblDicTimePoint) {
+
+  }
+
+  @Override
+  public void recordLoadCsvfilesToDfTime() {
+
+  }
+
+  @Override
+  public void recordCsvlDicShuffleMaxTime(Long csvlDicShuffleTimePart) {
+
+  }
+
+  @Override
+  public void recordDicWriteFileMaxTime(Long dicWriteFileTimePart) {
+
+  }
+
+  @Override
+  public void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimeTimePoint) {
+
+  }
+
+  @Override
+  public void recordCsvInputStepTime(String partitionID, Long csvInputStepTimePoint) {
+
+  }
+
+  @Override
+  public void recordLruCacheLoadTime(double lruCacheLoadTime) {
+
+  }
+
+  @Override
+  public void recordGeneratingDictionaryValuesTime(String partitionID,
+      Long generatingDictionaryValuesTimePoint) {
+
+  }
+
+  @Override
+  public void recordSortRowsStepTotalTime(String partitionID, Long sortRowsStepTotalTimePoint) {
+
+  }
+
+  @Override
+  public void recordMdkGenerateTotalTime(String partitionID, Long mdkGenerateTotalTimePoint) {
+
+  }
+
+  @Override
+  public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+      Long dictionaryValue2MdkAdd2FileTimePoint) {
+
+  }
+
+  @Override
+  public void recordTotalRecords(long totalRecords) {
+
+  }
+
+  @Override
+  public void recordHostBlockMap(String host, Integer numBlocks) {
+
+  }
+
+  @Override
+  public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
+
+  }
+
+  @Override
+  public void printStatisticsInfo(String partitionID) {
+
+  }
+}
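
CarbonLoadStatisticsDummy is a null-object implementation: a no-op singleton that the factory returns when statistics are disabled, so the recording calls wired into the load path below never need to branch on the flag. A sketch of the intended call shape (rowCount is a hypothetical local):

    long rowCount = 1000000L; // hypothetical record count
    // With statistics disabled this resolves to the dummy and does nothing.
    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCount);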

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
new file mode 100644
index 0000000..e55179e
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/util/CarbonLoadStatisticsImpl.java
@@ -0,0 +1,426 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.core.util;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.carbondata.common.logging.LogService;
+import org.carbondata.common.logging.LogServiceFactory;
+
+/**
+ * A utility that provides methods to record time information during data loading.
+ */
+public class CarbonLoadStatisticsImpl implements LoadStatistics {
+  private CarbonLoadStatisticsImpl() {
+
+  }
+
+  private static CarbonLoadStatisticsImpl carbonLoadStatisticsImplInstance =
+          new CarbonLoadStatisticsImpl();
+
+  public static CarbonLoadStatisticsImpl getInstance() {
+    return carbonLoadStatisticsImplInstance;
+  }
+
+  private final LogService LOGGER =
+          LogServiceFactory.getLogService(CarbonLoadStatisticsImpl.class.getName());
+
+  /*
+   * We only care about the earliest start time (EST) and the latest end time (LET) across the
+   * threads doing the same task; LET - EST is the elapsed time of that task when executed by
+   * multiple threads.
+   */
+  private long loadCsvfilesToDfStartTime = 0;
+  private long loadCsvfilesToDfCostTime = 0;
+  private long dicShuffleAndWriteFileTotalStartTime = 0;
+  private long dicShuffleAndWriteFileTotalCostTime = 0;
+
+  //Due to thread blocking in each task, we only record the max
+  //csvlDicShuffle time of any single thread
+  private long csvlDicShuffleCostTime = 0;
+  //Due to thread blocking in each task, we only record the max
+  //dicWriteFile time of any single thread
+  private long dicWriteFileCostTime = 0;
+
+  //LRU cache load one time
+  private double lruCacheLoadTime = 0;
+
+  //Generate surrogate keys total time for each partition:
+  private ConcurrentHashMap<String, Long[]> parDictionaryValuesTotalTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+  private ConcurrentHashMap<String, Long[]> parCsvInputStepTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+  private ConcurrentHashMap<String, Long[]> parGeneratingDictionaryValuesTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+
+  //Sort rows step total time for each partition:
+  private ConcurrentHashMap<String, Long[]> parSortRowsStepTotalTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+
+  //MDK generate total time for each partition:
+  private ConcurrentHashMap<String, Long[]> parMdkGenerateTotalTimeMap =
+          new ConcurrentHashMap<String, Long[]>();
+  private ConcurrentHashMap<String, Long[]> parDictionaryValue2MdkAdd2FileTime =
+          new ConcurrentHashMap<String, Long[]>();
+
+  //Node block process information
+  private ConcurrentHashMap<String, Integer> hostBlockMap =
+          new ConcurrentHashMap<String, Integer>();
+
+  //Partition block process information
+  private ConcurrentHashMap<String, Integer> partitionBlockMap =
+          new ConcurrentHashMap<String, Integer>();
+
+  private long totalRecords = 0;
+  private double totalTime = 0;
+
+  @Override
+  public void initPartitionInfo(String partitionId) {
+    parDictionaryValuesTotalTimeMap.put(partitionId, new Long[2]);
+    parCsvInputStepTimeMap.put(partitionId, new Long[2]);
+    parSortRowsStepTotalTimeMap.put(partitionId, new Long[2]);
+    parGeneratingDictionaryValuesTimeMap.put(partitionId, new Long[2]);
+    parMdkGenerateTotalTimeMap.put(partitionId, new Long[2]);
+    parDictionaryValue2MdkAdd2FileTime.put(partitionId, new Long[2]);
+  }
+
+  //Record the time
+  public void recordGlobalDicGenTotalTime(Long glblDicTimePoint) {
+    if (0 == dicShuffleAndWriteFileTotalStartTime) {
+      dicShuffleAndWriteFileTotalStartTime = glblDicTimePoint;
+    }
+    if (glblDicTimePoint - dicShuffleAndWriteFileTotalStartTime >
+            dicShuffleAndWriteFileTotalCostTime) {
+      dicShuffleAndWriteFileTotalCostTime = glblDicTimePoint - dicShuffleAndWriteFileTotalStartTime;
+    }
+  }
+
+  public void recordLoadCsvfilesToDfTime() {
+    Long loadCsvfilesToDfTimePoint = System.currentTimeMillis();
+    if (0 == loadCsvfilesToDfStartTime) {
+      loadCsvfilesToDfStartTime = loadCsvfilesToDfTimePoint;
+    }
+    if (loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime > loadCsvfilesToDfCostTime) {
+      loadCsvfilesToDfCostTime = loadCsvfilesToDfTimePoint - loadCsvfilesToDfStartTime;
+    }
+  }
+
+  public void recordCsvlDicShuffleMaxTime(Long csvlDicShuffleTimePart) {
+    if (csvlDicShuffleTimePart > csvlDicShuffleCostTime) {
+      csvlDicShuffleCostTime = csvlDicShuffleTimePart;
+    }
+  }
+
+  public void recordDicWriteFileMaxTime(Long dicWriteFileTimePart) {
+    if (dicWriteFileTimePart > dicWriteFileCostTime) {
+      dicWriteFileCostTime = dicWriteFileTimePart;
+    }
+  }
+
+
+  public double getLruCacheLoadTime() {
+    return lruCacheLoadTime;
+  }
+
+  public void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimeTimePoint) {
+    if (null != parDictionaryValuesTotalTimeMap.get(partitionID)) {
+      if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[0]) {
+        parDictionaryValuesTotalTimeMap.get(partitionID)[0] = dictionaryValuesTotalTimeTimePoint;
+      }
+      if (null == parDictionaryValuesTotalTimeMap.get(partitionID)[1] ||
+          dictionaryValuesTotalTimeTimePoint - parDictionaryValuesTotalTimeMap.get(partitionID)[0] >
+              parDictionaryValuesTotalTimeMap.get(partitionID)[1]) {
+        parDictionaryValuesTotalTimeMap.get(partitionID)[1] = dictionaryValuesTotalTimeTimePoint -
+            parDictionaryValuesTotalTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordCsvInputStepTime(String partitionID,
+      Long csvInputStepTimePoint) {
+    if (null != parCsvInputStepTimeMap.get(partitionID)) {
+      if (null == parCsvInputStepTimeMap.get(partitionID)[0]) {
+        parCsvInputStepTimeMap.get(partitionID)[0] = csvInputStepTimePoint;
+      }
+      if (null == parCsvInputStepTimeMap.get(partitionID)[1] ||
+              csvInputStepTimePoint - parCsvInputStepTimeMap.get(partitionID)[0] >
+                      parCsvInputStepTimeMap.get(partitionID)[1]) {
+        parCsvInputStepTimeMap.get(partitionID)[1] = csvInputStepTimePoint -
+                parCsvInputStepTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordLruCacheLoadTime(double lruCacheLoadTime) {
+    this.lruCacheLoadTime = lruCacheLoadTime;
+  }
+
+  public void recordGeneratingDictionaryValuesTime(String partitionID,
+      Long generatingDictionaryValuesTimePoint) {
+    if (null != parGeneratingDictionaryValuesTimeMap.get(partitionID)) {
+      if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[0]) {
+        parGeneratingDictionaryValuesTimeMap.get(partitionID)[0] =
+                generatingDictionaryValuesTimePoint;
+      }
+      if (null == parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] ||
+              generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
+                      .get(partitionID)[0] > parGeneratingDictionaryValuesTimeMap
+                      .get(partitionID)[1]) {
+        parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] =
+                generatingDictionaryValuesTimePoint - parGeneratingDictionaryValuesTimeMap
+                        .get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordSortRowsStepTotalTime(String partitionID,
+                                          Long sortRowsStepTotalTimePoint) {
+    if (null != parSortRowsStepTotalTimeMap.get(partitionID)) {
+      if (null == parSortRowsStepTotalTimeMap.get(partitionID)[0]) {
+        parSortRowsStepTotalTimeMap.get(partitionID)[0] = sortRowsStepTotalTimePoint;
+      }
+      if (null == parSortRowsStepTotalTimeMap.get(partitionID)[1] ||
+              sortRowsStepTotalTimePoint - parSortRowsStepTotalTimeMap.get(partitionID)[0] >
+                      parSortRowsStepTotalTimeMap.get(partitionID)[1]) {
+        parSortRowsStepTotalTimeMap.get(partitionID)[1] = sortRowsStepTotalTimePoint -
+                parSortRowsStepTotalTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordMdkGenerateTotalTime(String partitionID,
+                                         Long mdkGenerateTotalTimePoint) {
+    if (null != parMdkGenerateTotalTimeMap.get(partitionID)) {
+      if (null == parMdkGenerateTotalTimeMap.get(partitionID)[0]) {
+        parMdkGenerateTotalTimeMap.get(partitionID)[0] = mdkGenerateTotalTimePoint;
+      }
+      if (null == parMdkGenerateTotalTimeMap.get(partitionID)[1] ||
+              mdkGenerateTotalTimePoint - parMdkGenerateTotalTimeMap.get(partitionID)[0] >
+                      parMdkGenerateTotalTimeMap.get(partitionID)[1]) {
+        parMdkGenerateTotalTimeMap.get(partitionID)[1] = mdkGenerateTotalTimePoint -
+                parMdkGenerateTotalTimeMap.get(partitionID)[0];
+      }
+    }
+  }
+
+  public void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+      Long dictionaryValue2MdkAdd2FileTimePoint) {
+    if (null != parDictionaryValue2MdkAdd2FileTime.get(partitionID)) {
+      if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0]) {
+        parDictionaryValue2MdkAdd2FileTime.get(partitionID)[0] =
+                dictionaryValue2MdkAdd2FileTimePoint;
+      }
+      if (null == parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] ||
+              dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
+                      .get(partitionID)[0] > parDictionaryValue2MdkAdd2FileTime
+                      .get(partitionID)[1]) {
+        parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] =
+                dictionaryValue2MdkAdd2FileTimePoint - parDictionaryValue2MdkAdd2FileTime
+                        .get(partitionID)[0];
+      }
+    }
+  }
+
+  //Record the node blocks information map
+  public void recordHostBlockMap(String host, Integer numBlocks) {
+    hostBlockMap.put(host, numBlocks);
+  }
+
+  //Record the partition blocks information map
+  public void recordPartitionBlockMap(String partitionID, Integer numBlocks) {
+    partitionBlockMap.put(partitionID, numBlocks);
+  }
+
+  public void recordTotalRecords(long totalRecords) {
+    this.totalRecords = totalRecords;
+  }
+
+  //Get the time
+  private double getDicShuffleAndWriteFileTotalTime() {
+    return dicShuffleAndWriteFileTotalCostTime / 1000.0;
+  }
+
+  private double getLoadCsvfilesToDfTime() {
+    return loadCsvfilesToDfCostTime / 1000.0;
+  }
+
+  private double getCsvlDicShuffleMaxTime() {
+    return csvlDicShuffleCostTime / 1000.0;
+  }
+
+  private double getDicWriteFileMaxTime() {
+    return dicWriteFileCostTime / 1000.0;
+  }
+
+  private double getDictionaryValuesTotalTime(String partitionID) {
+    return parDictionaryValuesTotalTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getCsvInputStepTime(String partitionID) {
+    return parCsvInputStepTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getGeneratingDictionaryValuesTime(String partitionID) {
+    return parGeneratingDictionaryValuesTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getSortRowsStepTotalTime(String partitionID) {
+    return parSortRowsStepTotalTimeMap.get(partitionID)[1] / 1000.0;
+  }
+
+  private double getDictionaryValue2MdkAdd2FileTime(String partitionID) {
+    return parDictionaryValue2MdkAdd2FileTime.get(partitionID)[1] / 1000.0;
+  }
+
+  //Get the hostBlockMap
+  private ConcurrentHashMap<String, Integer> getHostBlockMap() {
+    return hostBlockMap;
+  }
+
+  //Get the partitionBlockMap
+  private ConcurrentHashMap<String, Integer> getPartitionBlockMap() {
+    return partitionBlockMap;
+  }
+
+  //Speed calculate
+  private long getTotalRecords() {
+    return this.totalRecords;
+  }
+
+  private int getLoadSpeed() {
+    return (int)(totalRecords / totalTime);
+  }
+
+  private int getGenDicSpeed() {
+    return (int)(totalRecords / (getLoadCsvfilesToDfTime() + getDicShuffleAndWriteFileTotalTime()));
+  }
+
+  private int getReadCSVSpeed(String partitionID) {
+    return (int)(totalRecords / getCsvInputStepTime(partitionID));
+  }
+
+  private int getGenSurKeySpeed(String partitionID) {
+    return (int)(totalRecords / getGeneratingDictionaryValuesTime(partitionID));
+  }
+
+  private int getSortKeySpeed(String partitionID) {
+    return (int)(totalRecords / getSortRowsStepTotalTime(partitionID));
+  }
+
+  private int getMDKSpeed(String partitionID) {
+    return (int)(totalRecords / getDictionaryValue2MdkAdd2FileTime(partitionID));
+  }
+
+  private double getTotalTime(String partitionID) {
+    this.totalTime = getLoadCsvfilesToDfTime() + getDicShuffleAndWriteFileTotalTime() +
+        getLruCacheLoadTime() + getDictionaryValuesTotalTime(partitionID) +
+        getDictionaryValue2MdkAdd2FileTime(partitionID);
+    return totalTime;
+  }
+
+  //Print the statistics information
+  private void printDicGenStatisticsInfo() {
+    double loadCsvfilesToDfTime = getLoadCsvfilesToDfTime();
+    LOGGER.audit("STAGE 1 ->Load csv to DataFrame and generate" +
+            " block distinct values: " + loadCsvfilesToDfTime + "(s)");
+    double dicShuffleAndWriteFileTotalTime = getDicShuffleAndWriteFileTotalTime();
+    LOGGER.audit("STAGE 2 ->Global dict shuffle and write dict file: " +
+            + dicShuffleAndWriteFileTotalTime + "(s)");
+    double csvShuffleMaxTime = getCsvlDicShuffleMaxTime();
+    LOGGER.audit("STAGE 2.1 ->  |_maximum distinct column shuffle time: "
+            + csvShuffleMaxTime + "(s)");
+    double dicWriteFileMaxTime = getDicWriteFileMaxTime();
+    LOGGER.audit("STAGE 2.2 ->  |_maximum distinct column write dict file time: "
+            + dicWriteFileMaxTime + "(s)");
+  }
+
+  private void printLruCacheLoadTimeInfo() {
+    LOGGER.audit("STAGE 3 ->LRU cache load: " + getLruCacheLoadTime() + "(s)");
+  }
+
+  private void printDictionaryValuesGenStatisticsInfo(String partitionID) {
+    double dictionaryValuesTotalTime = getDictionaryValuesTotalTime(partitionID);
+    LOGGER.audit("STAGE 4 ->Total cost of gen dictionary values, sort and write to temp files: "
+            + dictionaryValuesTotalTime + "(s)");
+    double csvInputStepTime = getCsvInputStepTime(partitionID);
+    double generatingDictionaryValuesTime = getGeneratingDictionaryValuesTime(partitionID);
+    LOGGER.audit("STAGE 4.1 ->  |_read csv file: " + csvInputStepTime + "(s)");
+    LOGGER.audit("STAGE 4.2 ->  |_transform to surrogate key: "
+            + generatingDictionaryValuesTime + "(s)");
+  }
+
+  private void printSortRowsStepStatisticsInfo(String partitionID) {
+    double sortRowsStepTotalTime = getSortRowsStepTotalTime(partitionID);
+    LOGGER.audit("STAGE 4.3 ->  |_sort rows and write to temp file: "
+            + sortRowsStepTotalTime + "(s)");
+  }
+
+  private void printGenMdkStatisticsInfo(String partitionID) {
+    double dictionaryValue2MdkAdd2FileTime = getDictionaryValue2MdkAdd2FileTime(partitionID);
+    LOGGER.audit("STAGE 5 ->Transform to MDK, compress and write fact files: "
+            + dictionaryValue2MdkAdd2FileTime + "(s)");
+  }
+
+  //Print the node blocks information
+  private void printHostBlockMapInfo() {
+    LOGGER.audit("========== BLOCK_INFO ==========");
+    if (getHostBlockMap().size() > 0) {
+      for (String host: getHostBlockMap().keySet()) {
+        LOGGER.audit("BLOCK_INFO ->Node host: " + host);
+        LOGGER.audit("BLOCK_INFO ->The block count in this node: " + getHostBlockMap().get(host));
+      }
+    } else if (getPartitionBlockMap().size() > 0) {
+      for (String parID: getPartitionBlockMap().keySet()) {
+        LOGGER.audit("BLOCK_INFO ->Partition ID: " + parID);
+        LOGGER.audit("BLOCK_INFO ->The block count in this partition: " +
+                getPartitionBlockMap().get(parID));
+      }
+    }
+  }
+
+  //Print the speed information
+  private void printLoadSpeedInfo(String partitionID) {
+    LOGGER.audit("===============Load_Speed_Info===============");
+    LOGGER.audit("Total Num of Records Processed: " + getTotalRecords());
+    LOGGER.audit("Total Time Cost: " + getTotalTime(partitionID) + "(s)");
+    LOGGER.audit("Total Load Speed: " + getLoadSpeed() + "records/s");
+    LOGGER.audit("Generate Dictionaries Speed: " + getGenDicSpeed() + "records/s");
+    LOGGER.audit("Read CSV Speed: " + getReadCSVSpeed(partitionID) + " records/s");
+    LOGGER.audit("Generate Surrogate Key Speed: " + getGenSurKeySpeed(partitionID) + " records/s");
+    LOGGER.audit("Sort Key/Write Temp Files Speed: " + getSortKeySpeed(partitionID) + " records/s");
+    LOGGER.audit("MDK Step Speed: " + getMDKSpeed(partitionID) + " records/s");
+    LOGGER.audit("=============================================");
+  }
+
+  public void printStatisticsInfo(String partitionID) {
+    try {
+      LOGGER.audit("========== TIME_STATISTICS PartitionID: " + partitionID + "==========");
+      printDicGenStatisticsInfo();
+      printLruCacheLoadTimeInfo();
+      printDictionaryValuesGenStatisticsInfo(partitionID);
+      printSortRowsStepStatisticsInfo(partitionID);
+      printGenMdkStatisticsInfo(partitionID);
+      printHostBlockMapInfo();
+      printLoadSpeedInfo(partitionID);
+    } catch (Exception e) {
+      LOGGER.audit("Can't print Statistics Information");
+    }
+  }
+
+}
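
Each per-partition map above holds a Long[2] in which slot 0 keeps the earliest recorded time point (EST) and slot 1 the largest elapsed span seen so far (LET - EST). A short walk-through with made-up time points:

    LoadStatistics stats = CarbonLoadStatisticsImpl.getInstance();
    stats.initPartitionInfo("0");
    stats.recordCsvInputStepTime("0", 1000L); // slot 0 = 1000, slot 1 = 0
    stats.recordCsvInputStepTime("0", 1500L); // slot 1 = 500 (1500 - 1000)
    stats.recordCsvInputStepTime("0", 1400L); // 400 < 500, slot 1 unchanged
    // getCsvInputStepTime("0") would now report 0.5 seconds.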

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/core/src/main/java/org/carbondata/core/util/CarbonTimeStatisticsFactory.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/CarbonTimeStatisticsFactory.java b/core/src/main/java/org/carbondata/core/util/CarbonTimeStatisticsFactory.java
new file mode 100644
index 0000000..a5c67ff
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/util/CarbonTimeStatisticsFactory.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.carbondata.core.util;
+
+import org.carbondata.core.constants.CarbonCommonConstants;
+
+public class CarbonTimeStatisticsFactory {
+  private static String loadStatisticsInstanceType;
+  private static LoadStatistics loadStatisticsInstance;
+
+  static {
+    CarbonTimeStatisticsFactory.updateTimeStatisticsUtilStatus();
+    loadStatisticsInstance = genLoadStatisticsInstance();
+  }
+
+  private static void updateTimeStatisticsUtilStatus() {
+    loadStatisticsInstanceType = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS,
+            CarbonCommonConstants.ENABLE_DATA_LOADING_STATISTICS_DEFAULT);
+  }
+
+  private static LoadStatistics genLoadStatisticsInstance() {
+    switch (loadStatisticsInstanceType.toLowerCase()) {
+      case "false":
+        return CarbonLoadStatisticsDummy.getInstance();
+      case "true":
+        return CarbonLoadStatisticsImpl.getInstance();
+      default:
+        return CarbonLoadStatisticsDummy.getInstance();
+    }
+  }
+
+  public static LoadStatistics getLoadStatisticsInstance() {
+    return loadStatisticsInstance;
+  }
+
+}
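
Because the implementation is chosen in a static initializer, the property is read exactly once, when CarbonTimeStatisticsFactory is first loaded; any value other than "true" (case-insensitive) falls back to the dummy. A minimal sketch, assuming the property is left unset:

    // Default is "false", so the no-op dummy is returned.
    LoadStatistics stats = CarbonTimeStatisticsFactory.getLoadStatisticsInstance();
    assert stats == CarbonLoadStatisticsDummy.getInstance();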

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/carbondata/core/util/LoadStatistics.java b/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
new file mode 100644
index 0000000..c353f0c
--- /dev/null
+++ b/core/src/main/java/org/carbondata/core/util/LoadStatistics.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.carbondata.core.util;
+
+public interface LoadStatistics {
+  //Init partition info
+  void initPartitionInfo(String partitionId);
+
+  //Record the time
+  void recordGlobalDicGenTotalTime(Long glblDicTimePoint);
+
+  void recordLoadCsvfilesToDfTime();
+
+  void recordCsvlDicShuffleMaxTime(Long csvlDicShuffleTimePart);
+
+  void recordDicWriteFileMaxTime(Long dicWriteFileTimePart);
+
+  void recordDictionaryValuesTotalTime(String partitionID,
+      Long dictionaryValuesTotalTimeTimePoint);
+
+  void recordCsvInputStepTime(String partitionID,
+      Long csvInputStepTimePoint);
+
+  void recordLruCacheLoadTime(double lruCacheLoadTime);
+
+  void recordGeneratingDictionaryValuesTime(String partitionID,
+      Long generatingDictionaryValuesTimePoint);
+
+  void recordSortRowsStepTotalTime(String partitionID,
+      Long sortRowsStepTotalTimePoint);
+
+  void recordMdkGenerateTotalTime(String partitionID,
+      Long mdkGenerateTotalTimePoint);
+
+  void recordDictionaryValue2MdkAdd2FileTime(String partitionID,
+      Long dictionaryValue2MdkAdd2FileTimePoint);
+
+  //Record the node blocks information map
+  void recordHostBlockMap(String host, Integer numBlocks);
+
+  //Record the partition blocks information map
+  void recordPartitionBlockMap(String partitionID, Integer numBlocks);
+
+  //Record total num of records processed
+  void recordTotalRecords(long totalRecords);
+
+  //Print the statistics information
+  void printStatisticsInfo(String partitionID);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
index 3add8f0..06bd12f 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonDataLoadRDD.scala
@@ -18,13 +18,18 @@
 
 package org.carbondata.spark.rdd
 
+import java.lang.Long
 import java.util.UUID
 
 import scala.collection.JavaConverters._
 import scala.util.Random
 
-import org.apache.spark.{Logging, Partition, SerializableWritable, SparkContext, TaskContext}
+import org.apache.spark.Logging
+import org.apache.spark.Partition
+import org.apache.spark.SerializableWritable
+import org.apache.spark.SparkContext
 import org.apache.spark.SparkEnv
+import org.apache.spark.TaskContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.sql.execution.command.Partitioner
 
@@ -33,6 +38,7 @@ import org.carbondata.common.logging.impl.StandardLogService
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.load.{BlockDetails, LoadMetadataDetails}
 import org.carbondata.core.util.CarbonProperties
+import org.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.carbondata.processing.constants.DataProcessorConstants
 import org.carbondata.processing.etl.DataLoadingException
 import org.carbondata.processing.graphgenerator.GraphGenerator
@@ -159,6 +165,7 @@ class CarbonDataLoadRDD[K, V](
         }
         carbonLoadModel.setSegmentId(String.valueOf(loadCount))
         setModelAndBlocksInfo()
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitionInfo(model.getPartitionId)
         CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true")
         CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1")
         CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true")
@@ -219,6 +226,8 @@ class CarbonDataLoadRDD[K, V](
               } else {
                 logInfo("DataLoad complete")
                 logInfo("Data Loaded successfully with LoadCount:" + loadCount)
+                CarbonTimeStatisticsFactory.getLoadStatisticsInstance.printStatisticsInfo(
+                  model.getPartitionId)
               }
             } else {
               logInfo("DataLoad failure")
@@ -253,11 +262,15 @@ class CarbonDataLoadRDD[K, V](
             // get this partition data blocks and put it to global static map
             GraphGenerator.blockInfo.put(blocksID, split.partitionBlocksDetail)
             StandardLogService.setThreadName(partitionID, null)
+            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordPartitionBlockMap(
+              partitionID, split.partitionBlocksDetail.length)
           case false =>
             // for node partition
             val split = theSplit.asInstanceOf[CarbonNodePartition]
             logInfo("Input split: " + split.serializableHadoopSplit)
             logInfo("The Block Count in this node :" + split.nodeBlocksDetail.length)
+            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordHostBlockMap(
+              split.serializableHadoopSplit, split.nodeBlocksDetail.length)
             val blocksID = gernerateBlocksID
             carbonLoadModel.setBlocksID(blocksID)
             carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 8cc6d55..b734257 100644
--- a/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark/src/main/scala/org/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -37,7 +37,7 @@ import org.carbondata.core.carbon.metadata.encoder.Encoding
 import org.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension
 import org.carbondata.core.constants.CarbonCommonConstants
 import org.carbondata.core.datastorage.store.impl.FileFactory
-import org.carbondata.processing.etl.DataLoadingException
+import org.carbondata.core.util.CarbonTimeStatisticsFactory
 import org.carbondata.spark.load.{CarbonLoaderUtil, CarbonLoadModel}
 import org.carbondata.spark.partition.reader.{CSVParser, CSVReader}
 import org.carbondata.spark.util.GlobalDictionaryUtil
@@ -170,7 +170,7 @@ class CarbonBlockDistinctValuesCombineRDD(
   override def compute(split: Partition,
       context: TaskContext): Iterator[(Int, ColumnDistinctValues)] = {
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
-
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
     val distinctValuesList = new ArrayBuffer[(Int, HashSet[String])]
     var rowCount = 0L
     try {
@@ -203,6 +203,7 @@ class CarbonBlockDistinctValuesCombineRDD(
           }
         }
       }
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLoadCsvfilesToDfTime()
     } catch {
       case ex: Exception =>
         LOGGER.error(ex)
@@ -252,6 +253,9 @@ class CarbonGlobalDictionaryGenerateRDD(
         val valuesBuffer = new mutable.HashSet[String]
         val rddIter = firstParent[(Int, ColumnDistinctValues)].iterator(split, context)
         var rowCount = 0L
+        val dicShuffleStartTime = System.currentTimeMillis()
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordGlobalDicGenTotalTime(
+          dicShuffleStartTime)
         breakable {
           while (rddIter.hasNext) {
             val distinctValueList = rddIter.next()._2
@@ -291,6 +295,14 @@ class CarbonGlobalDictionaryGenerateRDD(
             GlobalDictionaryUtil.writeGlobalDictionaryColumnSortInfo(model, split.index,
               dictionaryForSortIndexWriting)
             val t5 = System.currentTimeMillis
+            val dicWriteStartTime = t4
+            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvlDicShuffleMaxTime(
+              dicWriteStartTime - dicShuffleStartTime)
+            val dicWriteEndTime = System.currentTimeMillis()
+            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDicWriteFileMaxTime(
+              dicWriteEndTime - dicWriteStartTime)
+            CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordGlobalDicGenTotalTime(
+              dicWriteEndTime)
             LOGGER.info("\n columnName:" + model.primDimensions(split.index).getColName +
               "\n columnId:" + model.primDimensions(split.index).getColumnId +
               "\n new distinct values count:" + distinctValueCount +

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
index 0bcdce5..3b69b4a 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInput.java
@@ -32,6 +32,7 @@ import org.carbondata.common.logging.impl.StandardLogService;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.load.BlockDetails;
 import org.carbondata.core.util.CarbonProperties;
+import org.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.carbondata.processing.graphgenerator.GraphGenerator;
 
 import org.pentaho.di.core.Const;
@@ -291,6 +292,10 @@ public class CsvInput extends BaseStep implements StepInterface {
     data = (CsvInputData) sdi;
 
     if (first) {
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValuesTotalTime(
+          meta.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
+          meta.getPartitionID(), System.currentTimeMillis());
       first = false;
       data.outputRowMeta = new RowMeta();
       meta.getFields(data.outputRowMeta, getStepname(), null, null, this);
@@ -364,6 +369,8 @@ public class CsvInput extends BaseStep implements StepInterface {
     LOGGER.info("*****************Started ALL ALL csv reading***********");
     startProcess(numberOfNodes);
     LOGGER.info("*****************Completed ALL ALL csv reading***********");
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordCsvInputStepTime(
+        meta.getPartitionID(), System.currentTimeMillis());
     setOutputDone();
     return false;
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
index 0e4c2e7..764944b 100644
--- a/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
+++ b/processing/src/main/java/org/carbondata/processing/csvreaderstep/CsvInputMeta.java
@@ -97,6 +97,8 @@ public class CsvInputMeta extends BaseStepMeta
 
   private String blocksID;
 
+  private String partitionID;
+
   private String escapeCharacter;
 
   public CsvInputMeta() {
@@ -118,6 +120,7 @@ public class CsvInputMeta extends BaseStepMeta
     bufferSize = "50000";
     currentRestructNumber = -1;
     blocksID = "";
+    partitionID = "";
     escapeCharacter ="\\";
   }
 
@@ -155,6 +158,7 @@ public class CsvInputMeta extends BaseStepMeta
       currentRestructNumber =
           Integer.parseInt(XMLHandler.getTagValue(stepnode, "currentRestructNumber"));
       blocksID = XMLHandler.getTagValue(stepnode, "blocksID");
+      partitionID = XMLHandler.getTagValue(stepnode, "partitionID");
       escapeCharacter = XMLHandler.getTagValue(stepnode, "escapeCharacter");
       Node fields = XMLHandler.getSubNode(stepnode, getXmlCode("FIELDS"));
       int nrfields = XMLHandler.countNodes(fields, getXmlCode("FIELD"));
@@ -218,6 +222,7 @@ public class CsvInputMeta extends BaseStepMeta
     retval.append("    ")
         .append(XMLHandler.addTagValue("currentRestructNumber", currentRestructNumber));
     retval.append("    ").append(XMLHandler.addTagValue("blocksID", blocksID));
+    retval.append("    ").append(XMLHandler.addTagValue("partitionID", partitionID));
     retval.append("    ").append(XMLHandler.addTagValue("escapeCharacter", escapeCharacter));
     retval.append("    ").append(XMLHandler.openTag(getXmlCode("FIELDS"))).append(Const.CR);
     for (int i = 0; i < inputFields.length; i++) {
@@ -270,6 +275,7 @@ public class CsvInputMeta extends BaseStepMeta
       encoding = rep.getStepAttributeString(idStep, getRepCode("ENCODING"));
       currentRestructNumber = (int) rep.getStepAttributeInteger(idStep, "currentRestructNumber");
       blocksID = rep.getStepAttributeString(idStep, getRepCode("blocksID"));
+      partitionID = rep.getStepAttributeString(idStep, getRepCode("partitionID"));
       escapeCharacter = rep.getStepAttributeString(idStep, getRepCode("escapeCharacter"));
       int nrfields = rep.countNrStepAttributes(idStep, getRepCode("FIELD_NAME"));
 
@@ -325,6 +331,7 @@ public class CsvInputMeta extends BaseStepMeta
       rep.saveStepAttribute(idTransformation, idStep, "currentRestructNumber",
           currentRestructNumber);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("blocksID"), blocksID);
+      rep.saveStepAttribute(idTransformation, idStep, getRepCode("partitionID"), partitionID);
       rep.saveStepAttribute(idTransformation, idStep, getRepCode("escapeCharacter"),
           escapeCharacter);
       for (int i = 0; i < inputFields.length; i++) {
@@ -822,6 +829,8 @@ public class CsvInputMeta extends BaseStepMeta
           currentRestructNumber = (Integer) entry.getValue();
         } else if ("blocksID".equals(attributeKey)) {
           blocksID = (String) entry.getValue();
+        } else if ("partitionID".equals(attributeKey)) {
+          partitionID = (String) entry.getValue();
         } else if ("escapeCharacter".equals(attributeKey)) {
           escapeCharacter = (String) entry.getValue();
         } else {
@@ -918,4 +927,11 @@ public class CsvInputMeta extends BaseStepMeta
     this.currentRestructNumber = currentRestructNum;
   }
 
-}
\ No newline at end of file
+  public void setPartitionID(String partitionID) {
+    this.partitionID = partitionID;
+  }
+
+  public String getPartitionID() {
+    return this.partitionID;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
index f61e5dd..92c00b5 100644
--- a/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
+++ b/processing/src/main/java/org/carbondata/processing/graphgenerator/GraphGenerator.java
@@ -439,6 +439,7 @@ public class GraphGenerator {
         CarbonCommonConstants.CSV_READ_BUFFER_SIZE_DEFAULT));
     //set blocks info id
     csvInputMeta.setBlocksID(this.blocksID);
+    csvInputMeta.setPartitionID(this.partitionID);
     csvInputMeta.setEscapeCharacter(this.escapeCharacter);
     csvDataStep.setDraw(true);
     csvDataStep.setDescription("Read raw data from " + GraphGeneratorConstants.CSV_INPUT);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
index e76b681..8ddb429 100644
--- a/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/mdkeygen/MDKeyGenStep.java
@@ -43,10 +43,7 @@ import org.carbondata.core.file.manager.composite.FileData;
 import org.carbondata.core.file.manager.composite.FileManager;
 import org.carbondata.core.file.manager.composite.IFileManagerComposite;
 import org.carbondata.core.keygenerator.KeyGenException;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.core.util.CarbonUtilException;
-import org.carbondata.core.util.DataTypeUtil;
+import org.carbondata.core.util.*;
 import org.carbondata.processing.datatypes.GenericDataType;
 import org.carbondata.processing.store.CarbonDataFileAttributes;
 import org.carbondata.processing.store.CarbonFactDataHandlerColumnar;
@@ -177,6 +174,8 @@ public class MDKeyGenStep extends BaseStep {
     meta.initialize();
     Object[] row = getRow();
     if (first) {
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordMdkGenerateTotalTime(
+          meta.getPartitionID(), System.currentTimeMillis());
       first = false;
 
       data.outputRowMeta = new RowMeta();
@@ -198,6 +197,8 @@ public class MDKeyGenStep extends BaseStep {
       initDataHandler();
       dataHandler.initialise();
       finalMerger.startFinalMerge();
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValue2MdkAdd2FileTime(
+              meta.getPartitionID(), System.currentTimeMillis());
       while (finalMerger.hasNext()) {
         Object[] r = finalMerger.next();
         readCounter++;
@@ -224,7 +225,12 @@ public class MDKeyGenStep extends BaseStep {
     String logMessage =
         "Finished Carbon Mdkey Generation Step: Read: " + readCounter + ": Write: " + writeCounter;
     LOGGER.info(logMessage);
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
     processingComplete();
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValue2MdkAdd2FileTime(
+        meta.getPartitionID(), System.currentTimeMillis());
+    CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordMdkGenerateTotalTime(
+        meta.getPartitionID(), System.currentTimeMillis());
     return false;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
index 2874577..779b4e8 100644
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/IntermediateFileMerger.java
@@ -97,6 +97,8 @@ public class IntermediateFileMerger implements Callable<Void> {
   }
 
   @Override public Void call() throws Exception {
+    long intermediateMergeStartTime = System.currentTimeMillis();
+    int fileCounterConst = fileCounter;
     boolean isFailed = false;
     try {
       startSorting();
@@ -117,6 +119,10 @@ public class IntermediateFileMerger implements Callable<Void> {
           }
         }
       }
+      double intermediateMergeCostTime = (System.currentTimeMillis() -
+          intermediateMergeStartTime) / 1000.0;
+      LOGGER.info("============================== Intermediate Merge of " + fileCounterConst +
+          " Sort Temp Files Cost Time: " + intermediateMergeCostTime + "(s)");
     } catch (Exception e) {
       LOGGER.error(e, "Problem while intermediate merging");
       isFailed = true;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
index 0f68ff3..1b3de04 100644
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdata/SortDataRows.java
@@ -305,7 +305,6 @@ public class SortDataRows {
         }
 
         LOGGER.debug("Sumitting request for intermediate merging no of files: " + fileList.length);
-
         startIntermediateMerging(fileList);
       }
       Object[][] recordHolderListLocal = recordHolderList;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
index c7b847d..77b106d 100644
--- a/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
+++ b/processing/src/main/java/org/carbondata/processing/sortandgroupby/sortdatastep/SortKeyStep.java
@@ -24,6 +24,7 @@ import org.carbondata.common.logging.LogServiceFactory;
 import org.carbondata.common.logging.impl.StandardLogService;
 import org.carbondata.core.constants.CarbonCommonConstants;
 import org.carbondata.core.util.CarbonProperties;
+import org.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.carbondata.processing.schema.metadata.SortObserver;
 import org.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException;
 import org.carbondata.processing.sortandgroupby.sortdata.SortDataRows;
@@ -154,6 +155,8 @@ public class SortKeyStep extends BaseStep {
 
     // if first
     if (first) {
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(
+          meta.getPartitionID(), System.currentTimeMillis());
       first = false;
 
       // clone out row meta
@@ -232,6 +235,10 @@ public class SortKeyStep extends BaseStep {
       LOGGER.info(logMessage);
       putRow(data.getOutputRowMeta(), new Object[0]);
       setOutputDone();
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordSortRowsStepTotalTime(
+          meta.getPartitionID(), System.currentTimeMillis());
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordDictionaryValuesTotalTime(
+          meta.getPartitionID(), System.currentTimeMillis());
       return false;
     } catch (CarbonSortKeyAndGroupByException e) {
       throw new KettleException(e);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
index f4e333e..57dc8a8 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/CarbonCSVBasedSeqGenStep.java
@@ -57,9 +57,7 @@ import org.carbondata.core.keygenerator.KeyGenerator;
 import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryGenerator;
 import org.carbondata.core.keygenerator.directdictionary.DirectDictionaryKeyGeneratorFactory;
 import org.carbondata.core.keygenerator.factory.KeyGeneratorFactory;
-import org.carbondata.core.util.CarbonProperties;
-import org.carbondata.core.util.CarbonUtil;
-import org.carbondata.core.util.DataTypeUtil;
+import org.carbondata.core.util.*;
 import org.carbondata.core.writer.ByteArrayHolder;
 import org.carbondata.core.writer.HierarchyValueWriterForCSV;
 import org.carbondata.processing.dataprocessor.manager.CarbonDataProcessorManager;
@@ -290,6 +288,9 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
 
       Object[] r = getRow();  // get row, blocks when needed!
       if (first) {
+        CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
+                .recordGeneratingDictionaryValuesTime(meta.getPartitionID(),
+                        System.currentTimeMillis());
         first = false;
         meta.initialize();
         final Object dataProcessingLockObject = CarbonDataProcessorManager.getInstance()
@@ -471,6 +472,8 @@ public class CarbonCSVBasedSeqGenStep extends BaseStep {
       String logMessage =
           "Summary: Carbon CSV Based Seq Gen Step : " + readCounter + ": Write: " + writeCounter;
       LOGGER.info(logMessage);
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordGeneratingDictionaryValuesTime(
+          meta.getPartitionID(), System.currentTimeMillis());
       setOutputDone();
 
     } catch (RuntimeException ex) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/6a092bbe/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
index 196d677..2af5c5e 100644
--- a/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
+++ b/processing/src/main/java/org/carbondata/processing/surrogatekeysgenerator/csvbased/FileStoreSurrogateKeyGenForCSV.java
@@ -46,6 +46,7 @@ import org.carbondata.core.file.manager.composite.IFileManagerComposite;
 import org.carbondata.core.keygenerator.KeyGenException;
 import org.carbondata.core.keygenerator.KeyGenerator;
 import org.carbondata.core.util.CarbonProperties;
+import org.carbondata.core.util.CarbonTimeStatisticsFactory;
 import org.carbondata.core.util.CarbonUtilException;
 import org.carbondata.core.writer.ByteArrayHolder;
 import org.carbondata.core.writer.HierarchyValueWriterForCSV;
@@ -277,6 +278,7 @@ public class FileStoreSurrogateKeyGenForCSV extends CarbonCSVBasedDimSurrogateKe
   private void initDictionaryCacheInfo(List<String> dictionaryKeys,
       List<DictionaryColumnUniqueIdentifier> dictionaryColumnUniqueIdentifiers,
       Cache reverseDictionaryCache, String carbonStorePath) throws KettleException {
+    long lruCacheStartTime = System.currentTimeMillis();
     try {
       List reverseDictionaries = reverseDictionaryCache.getAll(dictionaryColumnUniqueIdentifiers);
       for (int i = 0; i < reverseDictionaries.size(); i++) {
@@ -284,6 +286,8 @@ public class FileStoreSurrogateKeyGenForCSV extends CarbonCSVBasedDimSurrogateKe
         getDictionaryCaches().put(dictionaryKeys.get(i), reverseDictionary);
         updateMaxKeyInfo(dictionaryKeys.get(i), reverseDictionary.getDictionaryChunks().getSize());
       }
+      CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordLruCacheLoadTime(
+          (System.currentTimeMillis() - lruCacheStartTime)/1000.0);
     } catch (CarbonUtilException e) {
       throw new KettleException(e.getMessage());
     }

