Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 736B6200C5A for ; Tue, 18 Apr 2017 14:56:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 71EF3160BA1; Tue, 18 Apr 2017 12:56:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id BF99D160B90 for ; Tue, 18 Apr 2017 14:56:36 +0200 (CEST) Received: (qmail 88682 invoked by uid 500); 18 Apr 2017 12:56:36 -0000 Mailing-List: contact commits-help@carbondata.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@carbondata.incubator.apache.org Delivered-To: mailing list commits@carbondata.incubator.apache.org Received: (qmail 88673 invoked by uid 99); 18 Apr 2017 12:56:36 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Apr 2017 12:56:35 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 81F28C3453 for ; Tue, 18 Apr 2017 12:56:35 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id r627AcK24ncx for ; Tue, 18 Apr 2017 12:56:32 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id CE0DE5F5FD for ; Tue, 18 Apr 2017 12:56:30 +0000 (UTC) Received: (qmail 88598 invoked by uid 99); 18 Apr 2017 12:56:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 18 Apr 2017 12:56:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E6A95DFDAC; Tue, 18 Apr 2017 12:56:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jackylk@apache.org To: commits@carbondata.incubator.apache.org Date: Tue, 18 Apr 2017 12:56:29 -0000 Message-Id: <9c2b4e0e0676443cbae1ba7ad74f07dc@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] incubator-carbondata git commit: add SORT_COLUMNS option in dataframe archived-at: Tue, 18 Apr 2017 12:56:38 -0000 Repository: incubator-carbondata Updated Branches: refs/heads/12-dev 8843aecfe -> ebf70bca8 add SORT_COLUMNS option in dataframe Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/be5904f8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/be5904f8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/be5904f8 Branch: refs/heads/12-dev Commit: be5904f8ee9bab3c226cedba89d309c23f0aefae Parents: 8843aec Author: jackylk Authored: Tue Apr 18 07:26:43 2017 +0800 Committer: jackylk Committed: Tue Apr 18 07:26:43 2017 +0800 ---------------------------------------------------------------------- .../core/datastore/SegmentTaskIndexStore.java | 17 --- .../core/mutate/CarbonUpdateUtil.java | 122 +++++++++---------- .../statusmanager/SegmentStatusManager.java | 17 +-- .../carbondata/examples/CompareTest.scala | 2 + .../carbondata/hadoop/CarbonInputSplit.java | 10 +- .../testsuite/dataload/TestLoadDataFrame.scala | 28 +++++ .../apache/carbondata/spark/CarbonOption.scala | 2 + .../spark/sql/CarbonDataFrameWriter.scala | 12 +- .../processing/merger/CarbonDataMergerUtil.java | 82 ++++++------- 9 files changed, 148 insertions(+), 144 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java index 862455e..334efb4 100644 --- a/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java +++ b/core/src/main/java/org/apache/carbondata/core/datastore/SegmentTaskIndexStore.java @@ -40,7 +40,6 @@ import org.apache.carbondata.core.metadata.blocklet.DataFileFooter; import org.apache.carbondata.core.mutate.UpdateVO; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.util.path.CarbonTablePath.DataFileUtil; /** @@ -140,22 +139,6 @@ public class SegmentTaskIndexStore } /** - * returns block timestamp value from the given task - * @param taskKey - * @param listOfUpdatedFactFiles - * @return - */ - private String getTimeStampValueFromBlock(String taskKey, List listOfUpdatedFactFiles) { - for (String blockName : listOfUpdatedFactFiles) { - if (taskKey.equals(CarbonTablePath.DataFileUtil.getTaskNo(blockName))) { - blockName = blockName.substring(blockName.lastIndexOf('-') + 1, blockName.lastIndexOf('.')); - return blockName; - } - } - return null; - } - - /** * Below method will be used to load the segment of segments * One segment may have multiple task , so table segment will be loaded * based on task id and will return the map of taksId to table segment http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index af26035..72c750f 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -54,7 +54,7 @@ import org.apache.carbondata.core.util.path.CarbonTablePath; public class CarbonUpdateUtil { private static final LogService LOGGER = - LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName()); + LogServiceFactory.getLogService(CarbonUpdateUtil.class.getName()); /** * returns required filed from tuple id @@ -86,11 +86,11 @@ public class CarbonUpdateUtil { */ public static String getTableBlockPath(String tid, String factPath) { String part = - CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID)); + CarbonTablePath.addPartPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.PART_ID)); String segment = - CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID)); + CarbonTablePath.addSegmentPrefix(getRequiredFieldFromTID(tid, TupleIdEnum.SEGMENT_ID)); return factPath + CarbonCommonConstants.FILE_SEPARATOR + part - + CarbonCommonConstants.FILE_SEPARATOR + segment; + + CarbonCommonConstants.FILE_SEPARATOR + segment; } @@ -103,7 +103,7 @@ public class CarbonUpdateUtil { * @return */ public static String getDeleteDeltaFilePath(String blockPath, String blockName, - String timestamp) { + String timestamp) { return blockPath + CarbonCommonConstants.FILE_SEPARATOR + blockName + CarbonCommonConstants.HYPHEN + timestamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT; @@ -119,7 +119,7 @@ public class CarbonUpdateUtil { CarbonTable table, String updateStatusFileIdentifier, boolean isCompaction) { boolean status = false; SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); + new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); boolean lockStatus = false; @@ -130,12 +130,12 @@ public class CarbonUpdateUtil { AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); // read the existing file if present and update the same. SegmentUpdateDetails[] oldDetails = segmentUpdateStatusManager - .getUpdateStatusDetails(); + .getUpdateStatusDetails(); List oldList = new ArrayList(Arrays.asList(oldDetails)); @@ -187,9 +187,9 @@ public class CarbonUpdateUtil { * @return */ public static boolean updateTableMetadataStatus(Set updatedSegmentsList, - CarbonTable table, String updatedTimeStamp, - boolean isTimestampUpdationRequired, - List segmentsToBeDeleted) { + CarbonTable table, String updatedTimeStamp, + boolean isTimestampUpdationRequired, + List segmentsToBeDeleted) { boolean status = false; @@ -198,8 +198,8 @@ public class CarbonUpdateUtil { AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); String tableStatusPath = carbonTablePath.getTableStatusFilePath(); @@ -211,11 +211,11 @@ public class CarbonUpdateUtil { lockStatus = carbonLock.lockWithRetries(); if (lockStatus) { LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() - + " for table status updation"); + "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + segmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { @@ -223,7 +223,7 @@ public class CarbonUpdateUtil { // we are storing the link between the 2 status files in the segment 0 only. if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { loadMetadata.setUpdateStatusFileName( - CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp)); + CarbonUpdateUtil.getUpdateStatusFileName(updatedTimeStamp)); } // if the segments is in the list of marked for delete then update the status. @@ -252,7 +252,7 @@ public class CarbonUpdateUtil { try { segmentStatusManager - .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); } catch (IOException e) { return false; } @@ -260,18 +260,18 @@ public class CarbonUpdateUtil { status = true; } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getFactTableName()); } } finally { if (lockStatus) { if (carbonLock.unlock()) { LOGGER.info( - "Table unlocked successfully after table status updation" + table.getDatabaseName() - + "." + table.getFactTableName()); + "Table unlocked successfully after table status updation" + table.getDatabaseName() + + "." + table.getFactTableName()); } else { LOGGER.error( - "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during table status updation"); + "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table + .getFactTableName() + " during table status updation"); } } } @@ -287,7 +287,7 @@ public class CarbonUpdateUtil { */ public static String getUpdateStatusFileName(String updatedTimeStamp) { return CarbonCommonConstants.TABLEUPDATESTATUS_FILENAME + CarbonCommonConstants.HYPHEN - + updatedTimeStamp; + + updatedTimeStamp; } /** @@ -301,13 +301,13 @@ public class CarbonUpdateUtil { AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); // as of now considering only partition 0. String partitionId = "0"; String partitionDir = carbonTablePath.getPartitionDir(partitionId); CarbonFile file = - FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir)); + FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir)); if (!file.exists()) { return; } @@ -317,8 +317,8 @@ public class CarbonUpdateUtil { @Override public boolean accept(CarbonFile file) { String fileName = file.getName(); return (fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT) - || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) - || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT)); + || fileName.endsWith(timeStamp + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT) + || fileName.endsWith(timeStamp + CarbonCommonConstants.DELETE_DELTA_FILE_EXT)); } }); // deleting the files of a segment. @@ -390,7 +390,7 @@ public class CarbonUpdateUtil { // scan all the carbondata files and get the latest task ID. CarbonFile segment = - FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath)); + FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath)); CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile file) { @@ -420,7 +420,7 @@ public class CarbonUpdateUtil { // scan all the carbondata files and get the latest task ID. CarbonFile segment = - FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath)); + FileFactory.getCarbonFile(segmentDirPath, FileFactory.getFileType(segmentDirPath)); CarbonFile[] dataFiles = segment.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile file) { @@ -495,8 +495,8 @@ public class CarbonUpdateUtil { SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(), - table.getAbsoluteTableIdentifier().getCarbonTableIdentifier()); + .getCarbonTablePath(table.getAbsoluteTableIdentifier().getStorePath(), + table.getAbsoluteTableIdentifier().getCarbonTableIdentifier()); LoadMetadataDetails[] details = ssm.readLoadMetadata(table.getMetaDataFilepath()); @@ -513,23 +513,23 @@ public class CarbonUpdateUtil { // if the segment is mark for delete or compacted then any way it will get deleted. if (segment.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) - || segment.getLoadStatus() - .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) { + || segment.getLoadStatus() + .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)) { // take the list of files from this segment. String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName()); CarbonFile segDir = - FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); + FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); // scan through the segment and find the carbondatafiles and index files. SegmentUpdateStatusManager updateStatusManager = - new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); + new SegmentUpdateStatusManager(table.getAbsoluteTableIdentifier()); // get Invalid update delta files. CarbonFile[] invalidUpdateDeltaFiles = updateStatusManager - .getUpdateDeltaFilesList(segment.getLoadName(), false, - CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles); + .getUpdateDeltaFilesList(segment.getLoadName(), false, + CarbonCommonConstants.UPDATE_DELTA_FILE_EXT, true, allSegmentFiles); // now for each invalid delta file need to check the query execution time out // and then delete. @@ -541,8 +541,8 @@ public class CarbonUpdateUtil { // do the same for the index files. CarbonFile[] invalidIndexFiles = updateStatusManager - .getUpdateDeltaFilesList(segment.getLoadName(), false, - CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles); + .getUpdateDeltaFilesList(segment.getLoadName(), false, + CarbonCommonConstants.UPDATE_INDEX_FILE_EXT, true, allSegmentFiles); // now for each invalid index file need to check the query execution time out // and then delete. @@ -571,16 +571,16 @@ public class CarbonUpdateUtil { // case 1 if (CarbonUpdateUtil.isBlockInvalid(block.getStatus())) { completeListOfDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true, - allSegmentFiles); + .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, true, + allSegmentFiles); for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) { compareTimestampsAndDelete(invalidFile, forceDelete, false); } CarbonFile[] blockRelatedFiles = updateStatusManager - .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles, - block.getActualBlockName()); + .getAllBlockRelatedFiles(block.getBlockName(), allSegmentFiles, + block.getActualBlockName()); // now for each invalid index file need to check the query execution time out // and then delete. @@ -593,8 +593,8 @@ public class CarbonUpdateUtil { } else { invalidDeleteDeltaFiles = updateStatusManager - .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false, - allSegmentFiles); + .getDeleteDeltaInvalidFilesList(segment.getLoadName(), block, false, + allSegmentFiles); for (CarbonFile invalidFile : invalidDeleteDeltaFiles) { compareTimestampsAndDelete(invalidFile, forceDelete, false); @@ -608,10 +608,10 @@ public class CarbonUpdateUtil { if (null != validUpdateStatusFile && !validUpdateStatusFile.isEmpty()) { final String updateStatusTimestamp = validUpdateStatusFile - .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1); + .substring(validUpdateStatusFile.lastIndexOf(CarbonCommonConstants.HYPHEN) + 1); CarbonFile metaFolder = FileFactory.getCarbonFile(carbonTablePath.getMetadataDirectoryPath(), - FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath())); + FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath())); CarbonFile[] invalidUpdateStatusFiles = metaFolder.listFiles(new CarbonFileFilter() { @Override public boolean accept(CarbonFile file) { @@ -645,7 +645,7 @@ public class CarbonUpdateUtil { int maxTime; try { maxTime = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME)); + .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME)); } catch (NumberFormatException e) { maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME; } @@ -665,15 +665,15 @@ public class CarbonUpdateUtil { * @param isUpdateStatusFile if true then the parsing of file name logic changes. */ private static void compareTimestampsAndDelete(CarbonFile invalidFile, - boolean forceDelete, boolean isUpdateStatusFile) { + boolean forceDelete, boolean isUpdateStatusFile) { long fileTimestamp = 0L; if (isUpdateStatusFile) { fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong(invalidFile.getName() - .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1)); + .substring(invalidFile.getName().lastIndexOf(CarbonCommonConstants.HYPHEN) + 1)); } else { fileTimestamp = CarbonUpdateUtil.getTimeStampAsLong( - CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName())); + CarbonTablePath.DataFileUtil.getTimeStampFromFileName(invalidFile.getName())); } // if the timestamp of the file is more than the current time by query execution timeout. @@ -698,7 +698,7 @@ public class CarbonUpdateUtil { */ public static boolean isBlockInvalid(String blockStatus) { if (blockStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED) || blockStatus - .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) { + .equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) { return true; } return false; @@ -718,7 +718,7 @@ public class CarbonUpdateUtil { * @param segmentBlockCount */ public static void decrementDeletedBlockCount(SegmentUpdateDetails details, - Map segmentBlockCount) { + Map segmentBlockCount) { String segId = details.getSegmentName(); @@ -751,12 +751,12 @@ public class CarbonUpdateUtil { * @param segmentUpdateStatusManager */ public static void createBlockDetailsMap(BlockMappingVO blockMappingVO, - SegmentUpdateStatusManager segmentUpdateStatusManager) { + SegmentUpdateStatusManager segmentUpdateStatusManager) { Map blockRowCountMap = blockMappingVO.getBlockRowCountMapping(); Map outputMap = - new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); for (Map.Entry blockRowEntry : blockRowCountMap.entrySet()) { String key = blockRowEntry.getKey(); @@ -771,7 +771,7 @@ public class CarbonUpdateUtil { } RowCountDetailsVO rowCountDetailsVO = - new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount); + new RowCountDetailsVO(blockRowEntry.getValue(), alreadyDeletedCount); outputMap.put(key, rowCountDetailsVO); } @@ -789,8 +789,8 @@ public class CarbonUpdateUtil { public static String getSegmentBlockNameKey(String segID, String blockName) { String blockNameWithOutPart = blockName - .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1, - blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension())); + .substring(blockName.indexOf(CarbonCommonConstants.HYPHEN) + 1, + blockName.lastIndexOf(CarbonTablePath.getCarbonDataExtension())); return segID + CarbonCommonConstants.FILE_SEPARATOR + blockNameWithOutPart; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index c2c41e5..658ff8d 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -562,6 +562,7 @@ public class SegmentStatusManager { } } + /** * This API will return the update status file name. * @param segmentList @@ -582,22 +583,6 @@ public class SegmentStatusManager { return ""; } - /** - * getting the task numbers present in the segment. - * @param segmentId - * @return - */ - public List getUpdatedTasksDetailsForSegment(String segmentId, SegmentUpdateStatusManager - updateStatusManager) { - List taskList = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - List list = updateStatusManager.getUpdateDeltaFiles(segmentId); - for (String eachFileName : list) { - taskList.add(CarbonTablePath.DataFileUtil.getTaskNo(eachFileName)); - } - return taskList; - } - - public static class ValidAndInvalidSegmentsInfo { private final List listOfValidSegments; private final List listOfValidUpdatedSegments; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala index 82bd02a..f577fcf 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CompareTest.scala @@ -266,6 +266,7 @@ object CompareTest { .option("tempCSV", "false") .option("single_pass", "true") .option("dictionary_exclude", "id") // id is high cardinality column + .option("sort_columns", "") .mode(SaveMode.Overwrite) .save() } @@ -278,6 +279,7 @@ object CompareTest { val loadParquetTime = loadParquetTable(spark, df) val loadCarbonV3Time = loadCarbonTable(spark, df, version = "3") println(s"load completed, time: $loadParquetTime, $loadCarbonV3Time") + df.unpersist() spark.read.parquet(parquetTableName).registerTempTable(parquetTableName) } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java index 0dcaba2..e89da75 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java @@ -231,14 +231,14 @@ public class CarbonInputSplit extends FileSplit } // Comparing the time task id of the file to other - // if both the task id of the file is same then we need to compare the - // offset of - // the file + // if both the task id of the file is same then we need to compare the offset of the file String filePath1 = this.getPath().getName(); String filePath2 = other.getPath().getName(); if (CarbonTablePath.isCarbonDataFile(filePath1)) { - int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1)); - int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2)); + int firstTaskId = + Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1).split("_")[0]); + int otherTaskId = + Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2).split("_")[0]); if (firstTaskId != otherTaskId) { return firstTaskId - otherTaskId; } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala index b790131..d8c70b5 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestLoadDataFrame.scala @@ -111,6 +111,34 @@ class TestLoadDataFrame extends QueryTest with BeforeAndAfterAll { ) } + test("test load dataframe without sort") { + df.write + .format("carbondata") + .option("tableName", "carbon3") + .option("sort_columns", "") + .mode(SaveMode.Overwrite) + .save() + sql("select count(*) from carbon3 where c3 > 400").show + df.registerTempTable("temp") + sql("select count(*) from temp where c3 > 400").show + //sql("select * from carbon3 where c3 > 500").show + checkAnswer( + sql("select count(*) from carbon3 where c3 > 500"), Row(500) + ) + } + + test("test load dataframe using sort_columns") { + df.write + .format("carbondata") + .option("tableName", "carbon3") + .option("sort_columns", "c2, c3") + .mode(SaveMode.Overwrite) + .save() + checkAnswer( + sql("select count(*) from carbon3 where c3 > 500"), Row(500) + ) + } + test("test decimal values for dataframe load"){ dataFrame.write .format("carbondata") http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala index c29c1a2..93c2d18 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/CarbonOption.scala @@ -57,5 +57,7 @@ class CarbonOption(options: Map[String, String]) { def isBucketingEnabled: Boolean = options.contains("bucketcolumns") && options.contains("bucketnumber") + def sortColumns: Option[String] = options.get("sort_columns") + def toMap: Map[String, String] = options } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala index 576da58..b1e9ada 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDataFrameWriter.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql +import scala.collection.mutable + import org.apache.hadoop.fs.Path import org.apache.hadoop.io.compress.GzipCodec import org.apache.spark.sql.execution.command.LoadTable @@ -33,8 +35,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { def saveAsCarbonFile(parameters: Map[String, String] = Map()): Unit = { // create a new table using dataframe's schema and write its content into the table - sqlContext.sparkSession.sql( - makeCreateTableString(dataFrame.schema, new CarbonOption(parameters))) + val sqlString = makeCreateTableString(dataFrame.schema, new CarbonOption(parameters)) + sqlContext.sparkSession.sql(sqlString) writeToCarbonFile(parameters) } @@ -84,7 +86,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { LOGGER.info(s"temporary CSV file size: ${countSize / 1024 / 1024} MB") try { - sqlContext.sql(makeLoadString(tempCSVFolder, options)) + val sqlString = makeLoadString(tempCSVFolder, options) + sqlContext.sql(sqlString) } finally { fs.delete(tempCSVPath, true) } @@ -164,7 +167,8 @@ class CarbonDataFrameWriter(sqlContext: SQLContext, val dataFrame: DataFrame) { } val property = Map( "DICTIONARY_INCLUDE" -> options.dictionaryInclude, - "DICTIONARY_EXCLUDE" -> options.dictionaryExclude + "DICTIONARY_EXCLUDE" -> options.dictionaryExclude, + "SORT_COLUMNS" -> options.sortColumns ).filter(_._2.isDefined).map(p => s"'${p._1}' = '${p._2.get}'").mkString(",") s""" | CREATE TABLE IF NOT EXISTS ${options.dbName}.${options.tableName} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/be5904f8/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 2b81eec..a6a9b88 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -853,12 +853,12 @@ public final class CarbonDataMergerUtil { * @return */ public static List getValidSegmentList(AbsoluteTableIdentifier absoluteTableIdentifier) - throws IOException { + throws IOException { SegmentStatusManager.ValidAndInvalidSegmentsInfo validAndInvalidSegments = null; try { validAndInvalidSegments = - new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); + new SegmentStatusManager(absoluteTableIdentifier).getValidAndInvalidSegments(); } catch (IOException e) { LOGGER.error("Error while getting valid segment list for a table identifier"); throw new IOException(); @@ -913,9 +913,9 @@ public final class CarbonDataMergerUtil { private static boolean isSegmentValid(LoadMetadataDetails seg) { return seg.getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) - || seg.getLoadStatus() - .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg - .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE); + || seg.getLoadStatus() + .equalsIgnoreCase(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) || seg + .getLoadStatus().equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_UPDATE); } /** @@ -1200,16 +1200,16 @@ public final class CarbonDataMergerUtil { CarbonDeleteFilesDataReader dataReader = new CarbonDeleteFilesDataReader(); try { deleteDeltaBlockDetails = - dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName); + dataReader.getCompactedDeleteDeltaFileFromBlock(deleteDeltaFiles, blockName); } catch (Exception e) { String blockFilePath = fullBlockFilePath - .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); + .substring(0, fullBlockFilePath.lastIndexOf(CarbonCommonConstants.FILE_SEPARATOR)); LOGGER.error("Error while getting the delete delta blocks in path " + blockFilePath); throw new IOException(); } CarbonDeleteDeltaWriterImpl carbonDeleteWriter = - new CarbonDeleteDeltaWriterImpl(fullBlockFilePath, - FileFactory.getFileType(fullBlockFilePath)); + new CarbonDeleteDeltaWriterImpl(fullBlockFilePath, + FileFactory.getFileType(fullBlockFilePath)); try { carbonDeleteWriter.write(deleteDeltaBlockDetails); } catch (IOException e) { @@ -1220,11 +1220,11 @@ public final class CarbonDataMergerUtil { } public static Boolean updateStatusFile( - List updateDataMergerDetailsList, CarbonTable table, - String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) { + List updateDataMergerDetailsList, CarbonTable table, + String timestamp, SegmentUpdateStatusManager segmentUpdateStatusManager) { List segmentUpdateDetails = - new ArrayList(updateDataMergerDetailsList.size()); + new ArrayList(updateDataMergerDetailsList.size()); // Check the list output. @@ -1235,10 +1235,10 @@ public final class CarbonDataMergerUtil { tempSegmentUpdateDetails.setBlockName(carbonDataMergerUtilResult.getBlockName()); for (SegmentUpdateDetails origDetails : segmentUpdateStatusManager - .getUpdateStatusDetails()) { + .getUpdateStatusDetails()) { if (origDetails.getBlockName().equalsIgnoreCase(carbonDataMergerUtilResult.getBlockName()) - && origDetails.getSegmentName() - .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) { + && origDetails.getSegmentName() + .equalsIgnoreCase(carbonDataMergerUtilResult.getSegmentName())) { tempSegmentUpdateDetails.setDeletedRowsInBlock(origDetails.getDeletedRowsInBlock()); tempSegmentUpdateDetails.setStatus(origDetails.getStatus()); @@ -1247,9 +1247,9 @@ public final class CarbonDataMergerUtil { } tempSegmentUpdateDetails.setDeleteDeltaStartTimestamp( - carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp()); + carbonDataMergerUtilResult.getDeleteDeltaStartTimestamp()); tempSegmentUpdateDetails - .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp()); + .setDeleteDeltaEndTimestamp(carbonDataMergerUtilResult.getDeleteDeltaEndTimestamp()); segmentUpdateDetails.add(tempSegmentUpdateDetails); } else return false; @@ -1262,8 +1262,8 @@ public final class CarbonDataMergerUtil { AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); String tableStatusPath = carbonTablePath.getTableStatusFilePath(); @@ -1277,38 +1277,38 @@ public final class CarbonDataMergerUtil { lockStatus = carbonLock.lockWithRetries(); if (lockStatus) { LOGGER.info( - "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() - + " for table status updation"); + "Acquired lock for table" + table.getDatabaseName() + "." + table.getFactTableName() + + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + segmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { loadMetadata.setUpdateStatusFileName( - CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); + CarbonUpdateUtil.getUpdateStatusFileName(timestamp)); } } try { segmentStatusManager - .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); + .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); } catch (IOException e) { return false; } } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + table - .getDatabaseName() + "." + table.getFactTableName()); + .getDatabaseName() + "." + table.getFactTableName()); } } finally { if (lockStatus) { if (carbonLock.unlock()) { LOGGER.info( - "Table unlocked successfully after table status updation" + table.getDatabaseName() - + "." + table.getFactTableName()); + "Table unlocked successfully after table status updation" + table.getDatabaseName() + + "." + table.getFactTableName()); } else { LOGGER.error( - "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table - .getFactTableName() + " during table status updation"); + "Unable to unlock Table lock for table" + table.getDatabaseName() + "." + table + .getFactTableName() + " during table status updation"); } } } @@ -1326,7 +1326,7 @@ public final class CarbonDataMergerUtil { String metadataPath = model.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath(); AbsoluteTableIdentifier absoluteTableIdentifier = - model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); + model.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); LoadMetadataDetails[] details = segmentStatusManager.readLoadMetadata(metadataPath); List originalList = Arrays.asList(details); @@ -1340,24 +1340,24 @@ public final class CarbonDataMergerUtil { ICarbonLock carbonTableStatusLock = CarbonLockFactory.getCarbonLockObj( - model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(), - LockUsage.TABLE_STATUS_LOCK); + model.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(), + LockUsage.TABLE_STATUS_LOCK); try { if (carbonTableStatusLock.lockWithRetries()) { LOGGER.info( "Acquired lock for the table " + model.getDatabaseName() + "." + model.getTableName() - + " for table status updation "); + + " for table status updation "); CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); + .getCarbonTablePath(absoluteTableIdentifier.getStorePath(), + absoluteTableIdentifier.getCarbonTableIdentifier()); segmentStatusManager.writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), - originalList.toArray(new LoadMetadataDetails[originalList.size()])); + originalList.toArray(new LoadMetadataDetails[originalList.size()])); } else { LOGGER.error( - "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model - .getTableName() + "for table status updation"); + "Could not able to obtain lock for table" + model.getDatabaseName() + "." + model + .getTableName() + "for table status updation"); throw new Exception("Failed to update the MajorCompactionStatus."); } } catch (IOException e) { @@ -1366,11 +1366,11 @@ public final class CarbonDataMergerUtil { } finally { if (carbonTableStatusLock.unlock()) { LOGGER.info( - "Table unlocked successfully after table status updation" + model.getDatabaseName() - + "." + model.getTableName()); + "Table unlocked successfully after table status updation" + model.getDatabaseName() + + "." + model.getTableName()); } else { LOGGER.error("Unable to unlock Table lock for table" + model.getDatabaseName() + "." + model - .getTableName() + " during table status updation"); + .getTableName() + " during table status updation"); } }