From: jackylk@apache.org
To: commits@carbondata.apache.org
Date: Tue, 20 Mar 2018 15:44:34 -0000
Subject: [01/10] carbondata git commit: [HOTFIX] Fix CI random failure [Forced Update!]

Repository: carbondata
Updated Branches:
  refs/heads/carbonfile 99766b8af -> b384b6e1f (forced update)


[HOTFIX] Fix CI random failure

This closes #2068


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/04ff3676
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/04ff3676
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/04ff3676

Branch: refs/heads/carbonfile
Commit: 04ff36764c797264f5396fa3cbf1d6fe883737e0
Parents: a386f1f
Author: Jacky Li
Authored: Thu Mar 15 19:49:07 2018 +0800
Committer: Jacky Li
Committed: Fri Mar 16 21:12:25 2018 +0800

----------------------------------------------------------------------
 .../statusmanager/SegmentStatusManager.java     | 66 ++++++++++++--------
 .../carbondata/spark/util/CommonUtil.scala      |  2 +-
 .../preaaggregate/PreAggregateTableHelper.scala | 13 +++-
 3 files changed, 53 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
index 820a5a4..f466018 100755
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
@@ -30,7 +30,6 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
-import org.apache.carbondata.common.exceptions.TableStatusLockException;
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
@@ -834,10 +833,30 @@ public class SegmentStatusManager {
     }
   }
 
-  public static void deleteLoadsAndUpdateMetadata(
+  private static class ReturnTuple {
+    LoadMetadataDetails[] details;
+    boolean isUpdateRequired;
+    ReturnTuple(LoadMetadataDetails[] details, boolean isUpdateRequired) {
+      this.details = details;
+      this.isUpdateRequired = isUpdateRequired;
+    }
+  }
+
+  private static ReturnTuple isUpdationRequired(
+      boolean isForceDeletion,
       CarbonTable carbonTable,
-      boolean isForceDeletion) throws IOException {
-    deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null);
+      AbsoluteTableIdentifier absoluteTableIdentifier) {
+    LoadMetadataDetails[] details =
+        SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
+    // Delete marked loads
+    boolean isUpdationRequired =
+        DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
+            absoluteTableIdentifier,
+            isForceDeletion,
+            details,
+            carbonTable.getMetadataPath()
+        );
+    return new ReturnTuple(details, isUpdationRequired);
   }
 
   public static void deleteLoadsAndUpdateMetadata(
@@ -845,31 +864,29 @@ public class SegmentStatusManager {
       boolean isForceDeletion,
       List<PartitionSpec> partitionSpecs) throws IOException {
     if (isLoadDeletionRequired(carbonTable.getMetadataPath())) {
-      LoadMetadataDetails[] details =
-          SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
       AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
-      ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj(
-          identifier, LockUsage.TABLE_STATUS_LOCK);
-
-      // Delete marked loads
-      boolean isUpdationRequired = DeleteLoadFolders.deleteLoadFoldersFromFileSystem(
-          identifier, isForceDeletion, details, carbonTable.getMetadataPath());
-
-      boolean updationCompletionStatus = false;
-
-      if (isUpdationRequired) {
+      ReturnTuple tuple = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+      if (tuple.isUpdateRequired) {
+        ICarbonLock carbonTableStatusLock =
+            CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.TABLE_STATUS_LOCK);
+        boolean locked = false;
        try {
           // Update load metadate file after cleaning deleted nodes
-          if (carbonTableStatusLock.lockWithRetries()) {
+          locked = carbonTableStatusLock.lockWithRetries();
+          if (locked) {
             LOG.info("Table status lock has been successfully acquired.");
-
+            // Again read status and check to verify updation required or not.
+            ReturnTuple tuple2 = isUpdationRequired(isForceDeletion, carbonTable, identifier);
+            if (!tuple2.isUpdateRequired) {
+              return;
+            }
             // read latest table status again.
             LoadMetadataDetails[] latestMetadata =
                 SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath());
 
             // update the metadata details from old to new status.
             List<LoadMetadataDetails> latestStatus =
-                updateLoadMetadataFromOldToNew(details, latestMetadata);
+                updateLoadMetadataFromOldToNew(tuple2.details, latestMetadata);
 
             writeLoadMetadata(identifier, latestStatus);
           } else {
@@ -881,14 +898,13 @@ public class SegmentStatusManager {
                 "running in the background.";
             LOG.audit(errorMsg);
             LOG.error(errorMsg);
-            throw new TableStatusLockException(errorMsg + " Please try after some time.");
+            throw new IOException(errorMsg + " Please try after some time.");
           }
-          updationCompletionStatus = true;
+          DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
+              identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
         } finally {
-          CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
-          if (updationCompletionStatus) {
-            DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion(
-                identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs);
+          if (locked) {
+            CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK);
           }
         }
       }
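
The hunk above is, at heart, a check / lock / re-check: the table status is
examined once without the lock and examined again after the lock is acquired,
so a writer that slips in between the two reads can no longer cause a stale
update (a plausible source of the random CI failures this hotfix targets).
The unlock in the finally block is also guarded now, so a failed
lockWithRetries() no longer releases a lock the thread never held. A minimal,
self-contained sketch of the same pattern, written here in Scala with a plain
java.util.concurrent lock and made-up names rather than CarbonData's lock API:

    import java.util.concurrent.locks.ReentrantLock

    // Hypothetical stand-in for the table-status cleanup above.
    class StatusCleaner {
      private val statusLock = new ReentrantLock()
      @volatile private var stalePresent = true  // "stale segments exist"

      def cleanIfRequired(): Unit = {
        // First check without the lock: cheap, but racy on its own.
        if (!stalePresent) return
        var locked = false
        try {
          locked = statusLock.tryLock()
          if (!locked) {
            throw new IllegalStateException("status lock held by another operation")
          }
          // Re-check under the lock: another thread may have finished the
          // cleanup between our first check and the lock acquisition.
          if (!stalePresent) return
          // ... update the status metadata here, still holding the lock ...
          stalePresent = false
        } finally {
          // Release only if this thread actually acquired the lock.
          if (locked) statusLock.unlock()
        }
      }
    }

Note also that the physical deletion (physicalFactAndMeasureMetadataDeletion)
moves from the finally block, where it previously ran after the lock had
already been released, to inside the try block, so it now executes while the
lock is still held.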

http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index d3093fb..77ff139 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -815,7 +815,7 @@ object CommonUtil {
       try {
         val carbonTable = CarbonMetadata.getInstance
           .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName)
-        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true)
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null)
       } catch {
         case _: Exception =>
           LOGGER.warn(s"Error while cleaning table " +


http://git-wip-us.apache.org/repos/asf/carbondata/blob/04ff3676/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
index b64c91e..d89aa5b 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/preaaggregate/PreAggregateTableHelper.scala
@@ -26,12 +26,13 @@ import org.apache.spark.sql.execution.command._
 import org.apache.spark.sql.execution.command.management.CarbonLoadDataCommand
 import org.apache.spark.sql.execution.command.table.CarbonCreateTableCommand
 import org.apache.spark.sql.execution.command.timeseries.TimeSeriesUtil
+import org.apache.spark.sql.optimizer.CarbonFilters
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
 
 import org.apache.carbondata.common.exceptions.sql.MalformedDataMapCommandException
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
-import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 
 /**
  * Below helper class will be used to create pre-aggregate table
@@ -164,7 +165,15 @@ case class PreAggregateTableHelper(
     // This will be used to check if the parent table has any segments or not. If not then no
     // need to fire load for pre-aggregate table. Therefore reading the load details for PARENT
     // table.
-    SegmentStatusManager.deleteLoadsAndUpdateMetadata(parentTable, false)
+    SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+      parentTable,
+      false,
+      CarbonFilters.getCurrentPartitions(
+        sparkSession,
+        TableIdentifier(parentTable.getTableName,
+          Some(parentTable.getDatabaseName))
+      ).map(_.asJava).orNull)
+
     if (SegmentStatusManager.isLoadInProgressInTable(parentTable)) {
       throw new UnsupportedOperationException(
         "Cannot create pre-aggregate table when insert is in progress on main table")
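
The `.map(_.asJava).orNull` at the end of the new call converts the
`Option[Seq[...]]` returned by CarbonFilters.getCurrentPartitions into the
java.util.List (or null) that the Java-side deleteLoadsAndUpdateMetadata
expects. A small stand-alone illustration of that conversion idiom, using a
made-up `Part` type rather than CarbonData's partition spec:

    import scala.collection.JavaConverters._

    object OrNullDemo extends App {
      case class Part(name: String)  // placeholder partition type

      // Some(seq) becomes a java.util.List; None becomes null.
      def toJavaOrNull(parts: Option[Seq[Part]]): java.util.List[Part] =
        parts.map(_.asJava).orNull

      println(toJavaOrNull(Some(Seq(Part("p1"), Part("p2")))))  // [Part(p1), Part(p2)]
      println(toJavaOrNull(None))                               // null
    }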