carbondata-commits mailing list archives

From jack...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-1617] Merging carbonindex files within segment
Date Wed, 01 Nov 2017 15:44:54 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master b49160935 -> 0586146a8


http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
new file mode 100644
index 0000000..6e8b000
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergeFilesRDD.scala
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.rdd
+
+import org.apache.spark.{Partition, SparkContext, TaskContext}
+
+import org.apache.carbondata.core.util.path.CarbonTablePath
+import org.apache.carbondata.core.writer.CarbonIndexFileMergeWriter
+
+case class CarbonMergeFilePartition(rddId: Int, idx: Int, segmentPath: String)
+  extends Partition {
+
+  override val index: Int = idx
+
+  override def hashCode(): Int = 41 * (41 + rddId) + idx
+}
+
+/**
+ * RDD that merges all carbonindex files of a segment into a single
+ * carbonindexmerge file inside that same segment.
+ *
+ * @param sc the SparkContext used to launch one merge task per segment
+ * @param tablePath the path of the carbon table
+ * @param segments segments whose index files are to be merged
+ */
+class CarbonMergeFilesRDD(
+    sc: SparkContext,
+    tablePath: String,
+    segments: Seq[String])
+  extends CarbonRDD[String](sc, Nil) {
+
+  override def getPartitions: Array[Partition] = {
+    segments.zipWithIndex.map { s =>
+      CarbonMergeFilePartition(id, s._2, CarbonTablePath.getSegmentPath(tablePath, s._1))
+    }.toArray
+  }
+
+  override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[String] = {
+    val iter = new Iterator[String] {
+      val split = theSplit.asInstanceOf[CarbonMergeFilePartition]
+      logInfo("Merging carbon index files of segment : " + split.segmentPath)
+
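+      // The merge itself runs eagerly when this iterator is constructed;
+      // the iterator then yields a single empty element per partition.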
+      new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(split.segmentPath)
+
+      var havePair = false
+      var finished = false
+
+      override def hasNext: Boolean = {
+        if (!finished && !havePair) {
+          havePair = true
+        }
+        havePair
+      }
+
+      override def next(): String = {
+        if (!hasNext) {
+          throw new java.util.NoSuchElementException("End of stream")
+        }
+        havePair = false
+        finished = true
+        ""
+      }
+
+    }
+    iter
+  }
+
+}
+
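
A minimal sketch of how this RDD is driven from the driver side, mirroring the CommonUtil.mergeIndexFiles call added later in this patch (the table path and segment ids below are hypothetical):

    import org.apache.spark.SparkContext
    import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD

    // One task per segment; each task rewrites that segment's .carbonindex
    // files into a single merged index file, and collect() makes the driver
    // wait for every partition to finish.
    def mergeSegmentIndexes(sc: SparkContext): Unit = {
      val tablePath = "/store/default/t1" // hypothetical table path
      val segments = Seq("0", "1", "2")   // hypothetical segment ids
      new CarbonMergeFilesRDD(sc, tablePath, segments).collect()
    }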

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
index fb610c1..cb25756 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/Compactor.scala
@@ -22,10 +22,12 @@ import scala.collection.JavaConverters._
 import org.apache.spark.sql.execution.command.{CarbonMergerMapping, CompactionCallableModel}
 
 import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Compactor class which handled the compaction cases.
@@ -106,6 +108,8 @@ object Compactor {
     }
 
     if (finalMergeStatus) {
+      val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+      CommonUtil.mergeIndexFiles(sc.sparkContext, Seq(mergedLoadNumber), storePath, carbonTable)
       val endTime = System.nanoTime()
       logger.info(s"time taken to merge $mergedLoadName is ${ endTime - startTime }")
       val statusFileUpdation =
@@ -116,7 +120,7 @@ object Compactor {
               carbonLoadModel))) ||
          (CarbonDataMergerUtil
            .updateLoadMetadataWithMergeStatus(loadsToMerge, carbonTable.getMetaDataFilepath(),
-             mergedLoadName, carbonLoadModel, mergeLoadStartTime, compactionType))
+             mergedLoadNumber, carbonLoadModel, mergeLoadStartTime, compactionType))
           )
 
       if (!statusFileUpdation) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index bc24c12..27ebf42 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -28,7 +28,7 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat
 import org.apache.spark.SparkContext
-import org.apache.spark.sql.{Row, RowFactory}
+import org.apache.spark.sql.{Row, RowFactory, SQLContext}
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.execution.command.{ColumnProperty, Field, PartitionerField}
 import org.apache.spark.sql.types.{MetadataBuilder, StringType}
@@ -44,6 +44,7 @@ import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
 import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
 import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatusManager}
@@ -55,6 +56,7 @@ import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil}
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
+import org.apache.carbondata.spark.rdd.CarbonMergeFilesRDD
 
 object CommonUtil {
   private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
@@ -831,4 +833,22 @@
         LOGGER.error(s)
     }
   }
+
+  /**
+   * Merge the carbonindex files within each segment into a single
+   * carbonindexmerge file inside that same segment.
+   */
+  def mergeIndexFiles(sparkContext: SparkContext,
+      segmentIds: Seq[String],
+      storePath: String,
+      carbonTable: CarbonTable): Unit = {
+    if (CarbonProperties.getInstance().getProperty(
+      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT,
+      CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT_DEFAULT).toBoolean) {
+      val tablePath = AbsoluteTableIdentifier
+        .from(storePath, carbonTable.getDatabaseName, carbonTable.getFactTableName)
+        .getTablePath
+      new CarbonMergeFilesRDD(sparkContext, tablePath, segmentIds).collect()
+    }
+  }
 }
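
Note that mergeIndexFiles is a no-op unless the CARBON_MERGE_INDEX_IN_SEGMENT property evaluates to true. A hedged sketch of enabling it programmatically (the .toBoolean call above implies "true"/"false" string values):

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // Enable carbonindex-file merging for subsequent loads and compactions.
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_MERGE_INDEX_IN_SEGMENT, "true")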

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
index 4649082..1163b3f 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/catalyst/CarbonDDLSqlParser.scala
@@ -35,12 +35,11 @@ import org.apache.spark.util.PartitionUtils
 import org.apache.carbondata.common.constants.LoggerAction
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
-import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes}
+import org.apache.carbondata.core.metadata.datatype.DataTypes
 import org.apache.carbondata.core.metadata.schema.PartitionInfo
 import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil}
-import org.apache.carbondata.processing.loading.sort.SortScopeOptions
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.util.{CommonUtil, DataTypeConverterUtil}
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 87de8ae..628d444 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -98,8 +98,10 @@ object CarbonDataRDDFactory {
           .setLoadMetadataDetails(alterTableModel.segmentUpdateStatusManager.get
             .getLoadMetadataDetails.toList.asJava)
       }
-    }
-    else {
+    } else if (alterTableModel.compactionType.
+      equalsIgnoreCase(CompactionType.SEGMENT_INDEX_COMPACTION.toString)) {
+      compactionType = CompactionType.SEGMENT_INDEX_COMPACTION
+    } else {
       compactionType = CompactionType.MINOR_COMPACTION
     }
 
@@ -110,6 +112,14 @@ object CarbonDataRDDFactory {
     if (null == carbonLoadModel.getLoadMetadataDetails) {
       CommonUtil.readLoadMetadataDetails(carbonLoadModel)
     }
+    if (compactionType == CompactionType.SEGMENT_INDEX_COMPACTION) {
+      // Just launch job to merge index and return
+      CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+        carbonLoadModel.getLoadMetadataDetails.asScala.map(_.getLoadName),
+        carbonLoadModel.getStorePath,
+        carbonTable)
+      return
+    }
     // reading the start time of data load.
     val loadStartTime : Long =
     if (alterTableModel.factTimeStamp.isEmpty) {
@@ -959,9 +969,10 @@ object CarbonDataRDDFactory {
             }
           ))
 
-        }
-        else {
-        val newStatusMap = scala.collection.mutable.Map.empty[String, String]
+        } else {
+          CommonUtil.mergeIndexFiles(sqlContext.sparkContext,
+            Seq(carbonLoadModel.getSegmentId), storePath, carbonTable)
+          val newStatusMap = scala.collection.mutable.Map.empty[String, String]
           if (status.nonEmpty) {
             status.foreach { eachLoadStatus =>
               val state = newStatusMap.get(eachLoadStatus._1)
@@ -1142,8 +1153,10 @@ object CarbonDataRDDFactory {
 
   }
 
+
   /**
    * repartition the input data for partition table.
+   *
    * @param sqlContext
    * @param dataFrame
    * @param carbonLoadModel

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index 715af1d..bdfaa5a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -87,7 +87,8 @@ class DDLStrategy(sparkSession: SparkSession) extends SparkStrategy {
             altertablemodel.dbName))(sparkSession)
         if (isCarbonTable) {
           if (altertablemodel.compactionType.equalsIgnoreCase("minor") ||
-              altertablemodel.compactionType.equalsIgnoreCase("major")) {
+              altertablemodel.compactionType.equalsIgnoreCase("major") ||
+              altertablemodel.compactionType.equalsIgnoreCase("SEGMENT_INDEX_COMPACTION"))
{
             ExecutedCommandExec(alterTable) :: Nil
           } else {
             throw new MalformedCarbonCommandException(
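
With this check in place, an index-only merge can be triggered through the existing compaction DDL. A sketch, assuming a SparkSession in scope and a table named t1; the statement follows the same shape as 'minor'/'major' compaction:

    // Launches only the index-merge job and returns, per the
    // SEGMENT_INDEX_COMPACTION early exit added in CarbonDataRDDFactory.
    sparkSession.sql("ALTER TABLE t1 COMPACT 'SEGMENT_INDEX_COMPACTION'")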

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index 13972c8..53add22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -131,24 +131,6 @@ public final class CarbonDataMergerUtil {
 
 
   /**
-   * Form the Name of the New Merge Folder
-   *
-   * @param segmentToBeMerged
-   * @return
-   */
-  public static String getMergedLoadName(final String segmentToBeMerged) {
-    if (segmentToBeMerged.contains(".")) {
-      String beforeDecimal = segmentToBeMerged.substring(0, segmentToBeMerged.indexOf("."));
-      String afterDecimal = segmentToBeMerged.substring(segmentToBeMerged.indexOf(".") +
1);
-      int fraction = Integer.parseInt(afterDecimal) + 1;
-      return beforeDecimal + "." + fraction;
-    } else {
-      return segmentToBeMerged + "." + 1;
-    }
-
-  }
-
-  /**
    * Update Both Segment Update Status and Table Status for the case of IUD Delete
    * delta compaction.
    *
@@ -294,13 +276,13 @@ public final class CarbonDataMergerUtil {
    * method to update table status in case of IUD Update Delta Compaction.
    * @param loadsToMerge
    * @param metaDataFilepath
-   * @param MergedLoadName
+   * @param mergedLoadNumber
    * @param carbonLoadModel
    * @param compactionType
    * @return
    */
  public static boolean updateLoadMetadataWithMergeStatus(List<LoadMetadataDetails> loadsToMerge,
-      String metaDataFilepath, String MergedLoadName, CarbonLoadModel carbonLoadModel,
+      String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel,
       long mergeLoadStartTime, CompactionType compactionType) {
 
     boolean tableStatusUpdationStatus = false;
@@ -323,10 +305,6 @@ public final class CarbonDataMergerUtil {
 
         LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
-        String mergedLoadNumber = MergedLoadName.substring(
-            MergedLoadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER)
-                + CarbonCommonConstants.LOAD_FOLDER.length(), MergedLoadName.length());
-
         long modificationOrDeletionTimeStamp = CarbonUpdateUtil.readCurrentTime();
         for (LoadMetadataDetails loadDetail : loadDetails) {
           // check if this segment is merged.
@@ -391,6 +369,17 @@ public final class CarbonDataMergerUtil {
   }
 
   /**
+   * Extract the load number from a load name.
+   * @param loadName the load (segment folder) name
+   * @return the load number portion of the load name
+   */
+  public static String getLoadNumberFromLoadName(String loadName) {
+    return loadName.substring(
+        loadName.lastIndexOf(CarbonCommonConstants.LOAD_FOLDER) + CarbonCommonConstants.LOAD_FOLDER
+            .length(), loadName.length());
+  }
+
+  /**
    * To identify which all segments can be merged.
    *
    * @param carbonLoadModel
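
For reference, the new helper strips everything up to and including the load-folder prefix. A sketch, assuming CarbonCommonConstants.LOAD_FOLDER is the "Segment_" prefix used in segment directory names:

    // "Segment_2"   -> "2"
    // "Segment_0.1" -> "0.1"  (a load produced by an earlier compaction)
    val loadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName("Segment_0.1")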

http://git-wip-us.apache.org/repos/asf/carbondata/blob/0586146a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
index 6b9c80a..863257c 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionType.java
@@ -27,5 +27,6 @@ public enum CompactionType {
     MAJOR_COMPACTION,
     IUD_UPDDEL_DELTA_COMPACTION,
     IUD_DELETE_DELTA_COMPACTION,
+    SEGMENT_INDEX_COMPACTION,
     NONE
 }

