carbondata-commits mailing list archives

From jack...@apache.org
Subject [31/50] [abbrv] carbondata git commit: [CARBONDATA-1967][PARTITION] Fix autocompaction and auto merge index in partition tables
Date Sun, 07 Jan 2018 03:05:39 GMT
[CARBONDATA-1967][PARTITION] Fix autocompaction and auto merge index in partition tables

Auto compaction does not work for partition tables, and index files are always merged
even when merge index is configured as false.

Solution:
Auto compaction is now triggered after partition loading finishes, and the merge index
configuration is checked before index files are merged.
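
The guard added in CarbonOutputCommitter follows the pattern below. This is a minimal
Scala sketch of the Java method in the diff (shown here only for readability; the names
come straight from the patch):

    // Read the merge-index flag from CarbonProperties; on any failure fall back
    // to the default. Merge the segment's index files only when the flag is true.
    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties
    import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter

    def mergeCarbonIndexFiles(segmentPath: String): Unit = {
      val mergeIndex = try {
        CarbonProperties.getInstance.getProperty(
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean
      } catch {
        case _: Exception =>
          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT.toBoolean
      }
      if (mergeIndex) {
        // Merge all .carbonindex files of the segment into a single merged file.
        new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath)
      }
    }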

This closes #1748
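
For verification, both behaviours are driven by existing properties, which the new test
cases below set through CarbonProperties. A short usage sketch (the property values
mirror the new tests):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Enable auto compaction after loads and set the level-1/level-2 threshold.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2")
    // Keep per-task index files instead of merging them into one per segment.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")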


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d7852abe
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d7852abe
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d7852abe

Branch: refs/heads/carbonstore
Commit: d7852abeaa8915055bbad92e250662320b090bbc
Parents: 45787fb
Author: ravipesala <ravi.pesala@gmail.com>
Authored: Tue Jan 2 18:26:15 2018 +0530
Committer: Jacky Li <jacky.likun@qq.com>
Committed: Thu Jan 4 16:06:40 2018 +0800

----------------------------------------------------------------------
 .../core/util/path/CarbonTablePath.java         | 11 ++++++
 .../hadoop/api/CarbonOutputCommitter.java       | 22 +++++++++++-
 ...andardPartitionTableCompactionTestCase.scala | 33 ++++++++++++++++++
 .../StandardPartitionTableLoadingTestCase.scala | 29 ++++++++++++++++
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../management/CarbonLoadDataCommand.scala      | 35 +++++++++++++++-----
 .../processing/util/DeleteLoadFolders.java      |  3 +-
 7 files changed, 124 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index c33c0a0..9e66657 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -111,6 +111,17 @@ public class CarbonTablePath extends Path {
   }
 
   /**
+   * Return true if the fileNameWithPath ends with partition map file extension name
+   */
+  public static boolean isPartitionMapFile(String fileNameWithPath) {
+    int pos = fileNameWithPath.lastIndexOf('.');
+    if (pos != -1) {
+      return fileNameWithPath.substring(pos).startsWith(PARTITION_MAP_EXT);
+    }
+    return false;
+  }
+
+  /**
    * check if it is carbon index file matching extension
    *
    * @param fileNameWithPath

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 6f5d0e4..97d5a7f 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -24,12 +24,14 @@ import java.util.Set;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.PartitionMapFileStore;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatus;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
+import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.path.CarbonTablePath;
 import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter;
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel;
@@ -95,7 +97,7 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
         .addDataIndexSizeIntoMetaEntry(newMetaEntry, loadModel.getSegmentId(), carbonTable);
     if (segmentSize > 0) {
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, loadModel, false, overwriteSet);
-      new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
+      mergeCarbonIndexFiles(segmentPath);
       String updateTime =
           context.getConfiguration().get(CarbonTableOutputFormat.UPADTE_TIMESTAMP, null);
       if (updateTime != null) {
@@ -111,6 +113,24 @@ public class CarbonOutputCommitter extends FileOutputCommitter {
   }
 
   /**
+   * Merge index files to a new single file.
+   */
+  private void mergeCarbonIndexFiles(String segmentPath) throws IOException {
+    boolean mergeIndex = false;
+    try {
+      mergeIndex = Boolean.parseBoolean(CarbonProperties.getInstance().getProperty(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT));
+    } catch (Exception e) {
+      mergeIndex = Boolean.parseBoolean(
+          CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT);
+    }
+    if (mergeIndex) {
+      new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentPath);
+    }
+  }
+
+  /**
    * Update the tablestatus as fail if any fail happens.
    *
    * @param context

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
index 9056fea..3e6cd26 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala
@@ -161,6 +161,38 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     checkAnswer(sql(s"""select count(*) from staticpartition where deptname='finance'"""), p2)
   }
 
+  test("enable auto compaction for partition table"){
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "true")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.COMPACTION_SEGMENT_LEVEL_THRESHOLD, "4,2")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.PRESERVE_LATEST_SEGMENTS_NUMBER, "0")
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.DAYS_ALLOWED_TO_COMPACT, "0")
+
+    sql(
+      """
+        | CREATE TABLE staticpartitioncompaction (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int,workgroupcategory int, empname String, designation String)
+        | PARTITIONED BY (deptname String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+
+    for (i <- 0 until 4) {
+      sql(s"""insert into staticpartitioncompaction PARTITION(deptname='software') select
empno,doj,workgroupcategoryname,deptno,projectcode,projectjoindate,projectenddate,attendance,utilization,salary,workgroupcategory,empname,designation
from originTable""")
+    }
+    sql("CLEAN FILES FOR TABLE staticpartitioncompaction").show()
+    var segments = sql("SHOW SEGMENTS FOR TABLE staticpartitioncompaction")
+    var segmentSequenceIds = segments.collect().map { each => (each.toSeq) (0) }
+    assert(segmentSequenceIds.size==1)
+    assert(segmentSequenceIds.contains("0.1"))
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.ENABLE_AUTO_LOAD_MERGE, "false")
+  }
+
   override def afterAll = {
     dropTable
   }
@@ -173,6 +205,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA
     sql("drop table if exists partitionthree")
     sql("drop table if exists partitionmajor")
     sql("drop table if exists staticpartition")
+    sql("drop table if exists staticpartitioncompaction")
   }
 
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
index bd4252f..b0afb0f 100644
--- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala
@@ -272,6 +272,34 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     }
   }
 
+  test("merge carbon index disable data loading for partition table for three partition column")
{
+    CarbonProperties.getInstance.addProperty(
+      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "false")
+    sql(
+      """
+        | CREATE TABLE mergeindexpartitionthree (empno int, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int, empname String, designation String)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE mergeindexpartitionthree
OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree")
+    val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
+      carbonTable.getTablePath)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0")
+    val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val files = carbonFile.listFiles(new CarbonFileFilter {
+      override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName)
+    })
+    CarbonProperties.getInstance.addProperty(
+      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT)
+    assert(files.length == 10)
+  }
+
   test("load static partition table for one static partition column with load syntax issue")
{
     sql(
       """
@@ -307,6 +335,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte
     sql("drop table if exists loadstaticpartitionone")
     sql("drop table if exists loadstaticpartitiononeoverwrite")
     sql("drop table if exists streamingpartitionedtable")
+    sql("drop table if exists mergeindexpartitionthree")
     sql("drop table if exists loadstaticpartitiononeissue")
     sql("drop table if exists loadpartitionwithspecialchar")
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8a8338e..18e9181 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -712,7 +712,7 @@ object CarbonDataRDDFactory {
   /**
    * Trigger compaction after data load
    */
-  private def handleSegmentMerging(
+  def handleSegmentMerging(
       sqlContext: SQLContext,
       carbonLoadModel: CarbonLoadModel,
       carbonTable: CarbonTable,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
index 55c8769..383f272 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
@@ -373,7 +373,12 @@ case class CarbonLoadDataCommand(
 
     if (carbonTable.isHivePartitionTable) {
       try {
-        loadDataWithPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+        loadDataWithPartition(
+          sparkSession,
+          carbonLoadModel,
+          hadoopConf,
+          loadDataFrame,
+          operationContext)
       } finally {
         server match {
           case Some(dictServer) =>
@@ -428,7 +433,12 @@ case class CarbonLoadDataCommand(
         dictionaryDataFrame)
     }
     if (table.isHivePartitionTable) {
-      loadDataWithPartition(sparkSession, carbonLoadModel, hadoopConf, loadDataFrame)
+      loadDataWithPartition(
+        sparkSession,
+        carbonLoadModel,
+        hadoopConf,
+        loadDataFrame,
+        operationContext)
     } else {
       CarbonDataRDDFactory.loadCarbonData(
         sparkSession.sqlContext,
@@ -448,16 +458,12 @@ case class CarbonLoadDataCommand(
    * Loads the data in a hive partition way. This method uses InsertIntoTable command to load data
    * into partitoned data. The table relation would be converted to HadoopFSRelation to let spark
    * handling the partitioning.
-   * @param sparkSession
-   * @param carbonLoadModel
-   * @param hadoopConf
-   * @param dataFrame
-   * @return
    */
   private def loadDataWithPartition(sparkSession: SparkSession,
       carbonLoadModel: CarbonLoadModel,
       hadoopConf: Configuration,
-      dataFrame: Option[DataFrame]) = {
+      dataFrame: Option[DataFrame],
+      operationContext: OperationContext) = {
     val table = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val identifier = TableIdentifier(table.getTableName, Some(table.getDatabaseName))
     val logicalPlan =
@@ -640,6 +646,19 @@ case class CarbonLoadDataCommand(
     } else {
       Dataset.ofRows(sparkSession, convertedPlan)
     }
+    try {
+      // Trigger auto compaction
+      CarbonDataRDDFactory.handleSegmentMerging(
+        sparkSession.sqlContext,
+        carbonLoadModel,
+        table,
+        operationContext)
+    } catch {
+      case e: Exception =>
+        throw new Exception(
+          "Dataload is success. Auto-Compaction has failed. Please check logs.",
+          e)
+    }
   }
 
   private def convertToLogicalRelation(

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d7852abe/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
index a6bbe48..845f629 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java
@@ -65,7 +65,8 @@ public final class DeleteLoadFolders {
 
           @Override public boolean accept(CarbonFile file) {
             return (CarbonTablePath.isCarbonDataFile(file.getName())
-                || CarbonTablePath.isCarbonIndexFile(file.getName()));
+                || CarbonTablePath.isCarbonIndexFile(file.getName())
+                || CarbonTablePath.isPartitionMapFile(file.getName()));
           }
         });
 

