carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jack...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2270] Write segment file in loading for non-partition table
Date Fri, 30 Mar 2018 03:19:04 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master e43be5e74 -> 7e0803fec


http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
index a987127..15ae30f 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonTableCompactor.scala
@@ -35,7 +35,6 @@ import org.apache.carbondata.events._
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.spark.MergeResultImpl
-import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * This class is used to perform compaction on carbon table.
@@ -201,6 +200,7 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
 
     if (finalMergeStatus) {
       val mergedLoadNumber = CarbonDataMergerUtil.getLoadNumberFromLoadName(mergedLoadName)
+      var segmentFilesForIUDCompact = new util.ArrayList[Segment]()
       var segmentFileName: String = null
       if (carbonTable.isHivePartitionTable) {
         val readPath =
@@ -220,6 +220,23 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
               carbonLoadModel.getTablePath)
         }
         segmentFileName = segmentFileName + CarbonTablePath.SEGMENT_EXT
+      } else {
+        // Get the segment files for each updated segment in case of IUD compaction
+        if (compactionType == CompactionType.IUD_UPDDEL_DELTA) {
+          val segmentFilesList = loadsToMerge.asScala.map{seg =>
+            val file = SegmentFileStore.writeSegmentFile(
+              carbonTable.getTablePath,
+              seg.getLoadName,
+              carbonLoadModel.getFactTimeStamp.toString)
+            new Segment(seg.getLoadName, file)
+          }.filter(_.getSegmentFileName != null).asJava
+          segmentFilesForIUDCompact = new util.ArrayList[Segment](segmentFilesList)
+        } else {
+          segmentFileName = SegmentFileStore.writeSegmentFile(
+            carbonTable.getTablePath,
+            mergedLoadNumber,
+            carbonLoadModel.getFactTimeStamp.toString)
+        }
       }
       // trigger event for compaction
       val alterTableCompactionPreStatusUpdateEvent =
@@ -238,11 +255,12 @@ class CarbonTableCompactor(carbonLoadModel: CarbonLoadModel,
          CarbonDataMergerUtil
            .updateLoadMetadataIUDUpdateDeltaMergeStatus(loadsToMerge,
              carbonTable.getMetadataPath,
-             carbonLoadModel)) ||
+             carbonLoadModel,
+             segmentFilesForIUDCompact)) ||
         CarbonDataMergerUtil.updateLoadMetadataWithMergeStatus(
           loadsToMerge,
-            carbonTable.getMetadataPath,
-            mergedLoadNumber,
+          carbonTable.getMetadataPath,
+          mergedLoadNumber,
           carbonLoadModel,
           compactionType,
           segmentFileName)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
index 2d19fd4..b27a150 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonCountStar.scala
@@ -51,14 +51,14 @@ case class CarbonCountStar(
     val rowCount = CarbonUpdateUtil.getRowCount(
       tableInputFormat.getBlockRowCount(
         job,
-        absoluteTableIdentifier,
+        carbonTable,
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
           TableIdentifier(
             carbonTable.getTableName,
             Some(carbonTable.getDatabaseName))).map(_.asJava).orNull),
-      absoluteTableIdentifier)
+      carbonTable)
     val value = new GenericInternalRow(Seq(Long.box(rowCount)).toArray.asInstanceOf[Array[Any]])
     val unsafeProjection = UnsafeProjection.create(output.map(_.dataType).toArray)
     val row = if (outUnsafeRows) unsafeProjection(value) else value

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
index 8eaeab1..0c6d2ba 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/DeleteExecution.scala
@@ -105,12 +105,12 @@ object DeleteExecution {
     val blockMappingVO =
       carbonInputFormat.getBlockRowCount(
         job,
-        absoluteTableIdentifier,
+        carbonTable,
         CarbonFilters.getPartitions(
           Seq.empty,
           sparkSession,
           TableIdentifier(tableName, databaseNameOp)).map(_.asJava).orNull)
-    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(absoluteTableIdentifier)
+    val segmentUpdateStatusMngr = new SegmentUpdateStatusManager(carbonTable)
     CarbonUpdateUtil
       .createBlockDetailsMap(blockMappingVO, segmentUpdateStatusMngr)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
index f88e767..8c88d0e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/HorizontalCompaction.scala
@@ -73,7 +73,7 @@ object HorizontalCompaction {
     // SegmentUpdateStatusManager reads the Table Status File and Table Update Status
     // file and save the content in segmentDetails and updateDetails respectively.
     val segmentUpdateStatusManager: SegmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      absTableIdentifier)
+      carbonTable)
 
     if (isUpdateOperation) {
 
@@ -199,7 +199,7 @@ object HorizontalCompaction {
               .substring(segmentAndBlocks.lastIndexOf("/") + 1, segmentAndBlocks.length)
 
             val result = CarbonDataMergerUtil.compactBlockDeleteDeltaFiles(segment, blockName,
-              absTableIdentifier,
+              carbonTable,
               updateStatusDetails,
               timestamp)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
index b583c6a..25a0d8e 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableAddHivePartitionCommand.scala
@@ -122,7 +122,9 @@ case class CarbonAlterTableAddHivePartitionCommand(
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(loadModel, false)
         val newMetaEntry = loadModel.getCurrentLoadMetadataDetail
         val segmentFileName =
-          loadModel.getSegmentId + "_" + loadModel.getFactTimeStamp + CarbonTablePath.SEGMENT_EXT
+          SegmentFileStore.genSegmentFileName(
+            loadModel.getSegmentId, String.valueOf(loadModel.getFactTimeStamp)) +
+          CarbonTablePath.SEGMENT_EXT
         newMetaEntry.setSegmentFile(segmentFileName)
         val segmentsLoc = CarbonTablePath.getSegmentFilesLocation(table.getTablePath)
         CarbonUtil.checkAndCreateFolderWithPermission(segmentsLoc)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
index 9c2835e..756bc97 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableDropPartitionCommand.scala
@@ -210,9 +210,7 @@ case class CarbonAlterTableDropPartitionCommand(
       for (thread <- threadArray) {
         thread.join()
       }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
       refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
index 1bdf414..929de0a 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/partition/CarbonAlterTableSplitPartitionCommand.scala
@@ -231,9 +231,7 @@ case class CarbonAlterTableSplitPartitionCommand(
       threadArray.foreach {
         thread => thread.join()
       }
-      val identifier = AbsoluteTableIdentifier.from(carbonLoadModel.getTablePath,
-        carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName)
-      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(identifier)
+      val refresher = DataMapStoreManager.getInstance().getTableSegmentRefresher(carbonTable)
       refresher.refreshSegments(validSegments.map(_.getSegmentNo).asJava)
     } catch {
       case e: Exception =>

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
index 47da9a5..7123b93 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/CarbonLateDecodeStrategy.scala
@@ -96,7 +96,7 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy {
   private def driverSideCountStar(logicalRelation: LogicalRelation): Boolean = {
     val relation = logicalRelation.relation.asInstanceOf[CarbonDatasourceHadoopRelation]
     val segmentUpdateStatusManager = new SegmentUpdateStatusManager(
-      relation.carbonRelation.metaData.carbonTable.getAbsoluteTableIdentifier)
+      relation.carbonRelation.metaData.carbonTable)
     val updateDeltaMetadata = segmentUpdateStatusManager.readLoadMetadata()
     if (updateDeltaMetadata != null && updateDeltaMetadata.nonEmpty) {
       false

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
index ea5eb42..5bc85f8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java
@@ -141,7 +141,7 @@ public final class CarbonDataMergerUtil {
    */
   public static boolean updateLoadMetadataIUDUpdateDeltaMergeStatus(
       List<LoadMetadataDetails> loadsToMerge, String metaDataFilepath,
-      CarbonLoadModel carbonLoadModel) {
+      CarbonLoadModel carbonLoadModel, List<Segment> segmentFilesToBeUpdated) {
 
     boolean status = false;
     boolean updateLockStatus = false;
@@ -171,7 +171,7 @@ public final class CarbonDataMergerUtil {
         carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier();
 
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(identifier);
+        new SegmentUpdateStatusManager(carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable());
 
     SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
 
@@ -230,6 +230,13 @@ public final class CarbonDataMergerUtil {
                 loadDetail
                     .setUpdateStatusFileName(CarbonUpdateUtil.getUpdateStatusFileName(timestamp));
               }
+              // Update segment file name in the status file
+              int segmentFileIndex =
+                  segmentFilesToBeUpdated.indexOf(Segment.toSegment(loadDetail.getLoadName()));
+              if (segmentFileIndex > -1) {
+                loadDetail.setSegmentFile(
+                    segmentFilesToBeUpdated.get(segmentFileIndex).getSegmentFileName());
+              }
             }
           }
 
@@ -1135,18 +1142,17 @@ public final class CarbonDataMergerUtil {
    *
    * @param seg
    * @param blockName
-   * @param absoluteTableIdentifier
    * @param segmentUpdateDetails
    * @param timestamp
    * @return
    * @throws IOException
    */
   public static List<CarbonDataMergerUtilResult> compactBlockDeleteDeltaFiles(String
seg,
-      String blockName, AbsoluteTableIdentifier absoluteTableIdentifier,
+      String blockName, CarbonTable table,
       SegmentUpdateDetails[] segmentUpdateDetails, Long timestamp) throws IOException {
 
     SegmentUpdateStatusManager segmentUpdateStatusManager =
-        new SegmentUpdateStatusManager(absoluteTableIdentifier);
+        new SegmentUpdateStatusManager(table);
 
     List<CarbonDataMergerUtilResult> resultList = new ArrayList<CarbonDataMergerUtilResult>(1);
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/7e0803fe/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
index 65827b0..aabe91a 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java
@@ -47,7 +47,6 @@ import org.apache.carbondata.core.locks.CarbonLockUtil;
 import org.apache.carbondata.core.locks.ICarbonLock;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.ColumnIdentifier;
-import org.apache.carbondata.core.metadata.SegmentFileStore;
 import org.apache.carbondata.core.metadata.datatype.DataType;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil;
@@ -334,60 +333,6 @@ public final class CarbonLoaderUtil {
     return status;
   }
 
-  /**
-   * This API will update the segmentFile of a passed segment.
-   *
-   * @return boolean which determines whether status update is done or not.
-   * @throws IOException
-   */
-  private static boolean updateSegmentFile(String tablePath, String segmentId, String segmentFile)
-      throws IOException {
-    boolean status = false;
-    String tableStatusPath = CarbonTablePath.getTableStatusFilePath(tablePath);
-    String metadataPath = CarbonTablePath.getMetadataPath(tablePath);
-    AbsoluteTableIdentifier absoluteTableIdentifier =
-        AbsoluteTableIdentifier.from(tablePath, null, null);
-    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
-    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
-    int retryCount = CarbonLockUtil
-        .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK,
-            CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK_DEFAULT);
-    int maxTimeout = CarbonLockUtil
-        .getLockProperty(CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK,
-            CarbonCommonConstants.MAX_TIMEOUT_FOR_CONCURRENT_LOCK_DEFAULT);
-    try {
-      if (carbonLock.lockWithRetries(retryCount, maxTimeout)) {
-        LOGGER.info("Acquired lock for tablepath" + tablePath + " for table status updation");
-        LoadMetadataDetails[] listOfLoadFolderDetailsArray =
-            SegmentStatusManager.readLoadMetadata(metadataPath);
-
-        for (LoadMetadataDetails detail : listOfLoadFolderDetailsArray) {
-          // if the segments is in the list of marked for delete then update the status.
-          if (segmentId.equals(detail.getLoadName())) {
-            detail.setSegmentFile(segmentFile);
-            break;
-          }
-        }
-
-        SegmentStatusManager
-            .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray);
-        status = true;
-      } else {
-        LOGGER.error(
-            "Not able to acquire the lock for Table status updation for table path " + tablePath);
-      }
-      ;
-    } finally {
-      if (carbonLock.unlock()) {
-        LOGGER.info("Table unlocked successfully after table status updation" + tablePath);
-      } else {
-        LOGGER.error(
-            "Unable to unlock Table lock for table" + tablePath + " during table status updation");
-      }
-    }
-    return status;
-  }
-
   private static void addToStaleFolders(AbsoluteTableIdentifier identifier,
       List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException
{
     String path = CarbonTablePath.getSegmentPath(
@@ -1102,26 +1047,15 @@ public final class CarbonLoaderUtil {
   /**
    * Merge index files with in the segment of partitioned table
    * @param segmentId
-   * @param tablePath
+   * @param table
    * @return
    * @throws IOException
    */
-  public static String mergeIndexFilesinPartitionedSegment(String segmentId, String tablePath)
+  public static String mergeIndexFilesinPartitionedSegment(String segmentId, CarbonTable
table)
       throws IOException {
-    CarbonIndexFileMergeWriter.SegmentIndexFIleMergeStatus segmentIndexFIleMergeStatus =
-        new CarbonIndexFileMergeWriter().mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
-    String uniqueId = "";
-    if (segmentIndexFIleMergeStatus != null) {
-      uniqueId = System.currentTimeMillis() + "";
-      String newSegmentFileName = segmentId + "_" + uniqueId + CarbonTablePath.SEGMENT_EXT;
-      String path =
-          CarbonTablePath.getSegmentFilesLocation(tablePath) + CarbonCommonConstants.FILE_SEPARATOR
-              + newSegmentFileName;
-      SegmentFileStore.writeSegmentFile(segmentIndexFIleMergeStatus.getSegmentFile(), path);
-      updateSegmentFile(tablePath, segmentId, newSegmentFileName);
-      deleteFiles(segmentIndexFIleMergeStatus.getFilesTobeDeleted());
-    }
-    return uniqueId;
+    String tablePath = table.getTablePath();
+    return new CarbonIndexFileMergeWriter(table)
+        .mergeCarbonIndexFilesOfSegment(segmentId, tablePath);
   }
 
   private static void deleteFiles(List<String> filesToBeDeleted) throws IOException
{


Mime
View raw message