From: gvramana@apache.org
To: commits@carbondata.incubator.apache.org
Date: Fri, 06 Jan 2017 13:57:07 -0000
Subject: [07/22] incubator-carbondata git commit: IUD update flow support

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java new file mode 100644 index 0000000..75c6efd --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.locks; + +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.impl.FileFactory; +import org.apache.carbondata.core.util.CarbonProperties; + +import org.apache.hadoop.conf.Configuration; + +/** + * This class is used to handle HDFS file locking. + * This is achieved by acquiring the data output stream in append mode. + */ +public class HdfsFileLock extends AbstractCarbonLock { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(HdfsFileLock.class.getName()); + /** + * location is the HDFS lock file location + */ + private String location; + + private DataOutputStream dataOutputStream; + + public static String tmpPath; + + static { + Configuration conf = new Configuration(true); + String hdfsPath = conf.get(CarbonCommonConstants.FS_DEFAULT_FS); + // By default, we put the hdfs lock meta file for one table inside this table's store folder. + // If the STORE_LOCATION cannot be resolved, fall back to hadoop.tmp.dir.
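+ // Note: the fallback value is read via System.getProperty, so hadoop.tmp.dir must be set on the JVM, not only in the Hadoop Configuration above.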
+ tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION, + System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION)); + if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) { + tmpPath = hdfsPath + tmpPath; + } + } + + /** + * @param lockFileLocation + * @param lockFile + */ + public HdfsFileLock(String lockFileLocation, String lockFile) { + this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation + + CarbonCommonConstants.FILE_SEPARATOR + lockFile; + LOGGER.info("HDFS lock path:"+this.location); + initRetry(); + } + + /** + * @param tableIdentifier + * @param lockFile + */ + public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) { + this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier + .getTableName(), lockFile); + } + + /* (non-Javadoc) + * @see org.apache.carbondata.core.locks.ICarbonLock#lock() + */ + @Override public boolean lock() { + try { + if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) { + FileFactory.createNewLockFile(location, FileFactory.getFileType(location)); + } + dataOutputStream = + FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location)); + + return true; + + } catch (IOException e) { + return false; + } + } + + /* (non-Javadoc) + * @see org.apache.carbondata.core.locks.ICarbonLock#unlock() + */ + @Override + public boolean unlock() { + if (null != dataOutputStream) { + try { + dataOutputStream.close(); + } catch (IOException e) { + return false; + } finally { + if (FileFactory.getCarbonFile(location, FileFactory.getFileType(location)).delete()) { + LOGGER.info("Deleted the lock file " + location); + } else { + LOGGER.error("Not able to delete the lock file " + location); + } + } + } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java new file mode 100644 index 0000000..c2b3753 --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java @@ -0,0 +1,164 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.carbondata.locks; + +import java.io.FileOutputStream; +import java.io.IOException; +import java.nio.channels.FileChannel; +import java.nio.channels.FileLock; +import java.nio.channels.OverlappingFileLockException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.carbon.CarbonTableIdentifier; +import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.datastorage.store.impl.FileFactory; + +/** + * This class handles the file locking in the local file system. + * This will be handled using the file channel lock API. + */ +public class LocalFileLock extends AbstractCarbonLock { + /** + * location is the location of the lock file. + */ + private String location; + + /** + * fileOutputStream of the local lock file + */ + private FileOutputStream fileOutputStream; + + /** + * channel is the FileChannel of the lock file. + */ + private FileChannel channel; + + /** + * fileLock NIO FileLock Object + */ + private FileLock fileLock; + + /** + * lock file + */ + private String lockFile; + + public static final String tmpPath; + + private String lockFilePath; + + /** + * LOGGER for logging the messages. + */ + private static final LogService LOGGER = + LogServiceFactory.getLogService(LocalFileLock.class.getName()); + + static { + tmpPath = System.getProperty("java.io.tmpdir"); + } + + /** + * @param lockFileLocation + * @param lockFile + */ + public LocalFileLock(String lockFileLocation, String lockFile) { + this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation; + this.lockFile = lockFile; + initRetry(); + } + + /** + * @param tableIdentifier + * @param lockFile + */ + public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) { + this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier + .getTableName(), lockFile); + initRetry(); + } + + /** + * Lock API for locking of the file channel of the lock file. + * + * @return + */ + @Override public boolean lock() { + try { + if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) { + FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath)); + } + lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR + + lockFile; + if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) { + FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location)); + } + + fileOutputStream = new FileOutputStream(lockFilePath); + channel = fileOutputStream.getChannel(); + try { + fileLock = channel.tryLock(); + } catch (OverlappingFileLockException e) { + return false; + } + if (null != fileLock) { + return true; + } else { + return false; + } + } catch (IOException e) { + return false; + } + + } + + /** + * Unlock API for unlocking of the acquired lock. + * + * @return + */ + @Override public boolean unlock() { + boolean status; + try { + if (null != fileLock) { + fileLock.release(); + } + status = true; + } catch (IOException e) { + status = false; + } finally { + if (null != fileOutputStream) { + try { + fileOutputStream.close(); + // deleting the lock file after releasing the lock. 
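+ // Note: a failed delete below is only logged; unlock() still returns true once the stream is closed.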
+ if (FileFactory.getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath)) + .delete()) { + LOGGER.info("Successfully deleted the lock file " + lockFilePath); + } else { + LOGGER.error("Not able to delete the lock file " + lockFilePath); + } + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + } + } + return status; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java b/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java new file mode 100644 index 0000000..b70579e --- /dev/null +++ b/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.locks; + +import java.io.IOException; + +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; + +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.ZooKeeper; + +/** + * This is a singleton class for initialization of zookeeper client. 
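+ * The client is created once per JVM via double-checked locking; subsequent getInstance calls ignore the URL argument.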
+ */ +public class ZookeeperInit { + + private static final LogService LOGGER = + LogServiceFactory.getLogService(ZookeeperInit.class.getName()); + + private static ZookeeperInit zooKeeperInit; + /** + * zk is the zookeeper client instance + */ + private ZooKeeper zk; + + private ZookeeperInit(String zooKeeperUrl) { + + int sessionTimeOut = 100000; + try { + zk = new ZooKeeper(zooKeeperUrl, sessionTimeOut, new DummyWatcher()); + + } catch (IOException e) { + LOGGER.error(e.getMessage()); + } + + } + + public static ZookeeperInit getInstance(String zooKeeperUrl) { + + if (null == zooKeeperInit) { + synchronized (ZookeeperInit.class) { + if (null == zooKeeperInit) { + LOGGER.info("Initiating Zookeeper client."); + zooKeeperInit = new ZookeeperInit(zooKeeperUrl); + } + } + } + return zooKeeperInit; + + } + + public static ZookeeperInit getInstance() { + return zooKeeperInit; + } + + public ZooKeeper getZookeeper() { + return zk; + } + + private static class DummyWatcher implements Watcher { + public void process(WatchedEvent event) { + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java b/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java index 4aed38f..089b614 100644 --- a/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java +++ b/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java @@ -19,20 +19,16 @@ package org.apache.carbondata.core.load; -import java.text.ParseException; -import java.text.SimpleDateFormat; -import java.util.TimeZone; - import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; - import org.junit.Before; import org.junit.Test; -import static junit.framework.Assert.assertEquals; -import static junit.framework.Assert.assertNotSame; -import static junit.framework.Assert.assertNull; +import java.text.ParseException; +import java.text.SimpleDateFormat; + +import static junit.framework.Assert.*; public class LoadMetadataDetailsUnitTest { @@ -103,20 +99,21 @@ public class LoadMetadataDetailsUnitTest { } @Test public void testGetTimeStampWithEmptyTimeStamp() throws Exception { - loadMetadataDetails.setLoadStartTime(""); + loadMetadataDetails.setLoadStartTime(0); Long result = loadMetadataDetails.getLoadStartTimeAsLong(); assertNull(result); } @Test public void testGetTimeStampWithParserException() throws Exception { - loadMetadataDetails.setLoadStartTime("00.00.00"); + loadMetadataDetails.setLoadStartTime(0); Long result = loadMetadataDetails.getLoadStartTimeAsLong(); assertNull(result); } @Test public void testGetTimeStampWithDate() throws Exception { - String date = "01-01-2016 00:00:00"; - loadMetadataDetails.setLoadStartTime(date); + String date = "01-01-2016 00:00:00:000"; + long longVal = loadMetadataDetails.getTimeStamp(date); + loadMetadataDetails.setLoadStartTime(longVal); Long expected_result = getTime(date); Long result = loadMetadataDetails.getLoadStartTimeAsLong(); assertEquals(expected_result, result); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java 
---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index 7a88b63..b2951b4 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -47,6 +47,7 @@ import org.apache.carbondata.core.cache.CacheType; import org.apache.carbondata.core.cache.dictionary.Dictionary; import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier; import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier; +import org.apache.carbondata.core.carbon.CarbonDataLoadSchema; import org.apache.carbondata.core.carbon.CarbonTableIdentifier; import org.apache.carbondata.core.carbon.ColumnIdentifier; import org.apache.carbondata.core.carbon.datastore.block.Distributable; @@ -61,13 +62,14 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType; import org.apache.carbondata.core.load.LoadMetadataDetails; +import org.apache.carbondata.core.update.CarbonUpdateUtil; +import org.apache.carbondata.core.updatestatus.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations; -import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl; -import org.apache.carbondata.lcm.fileoperations.FileWriteOperation; -import org.apache.carbondata.lcm.locks.ICarbonLock; -import org.apache.carbondata.lcm.status.SegmentStatusManager; +import org.apache.carbondata.fileoperations.AtomicFileOperations; +import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl; +import org.apache.carbondata.fileoperations.FileWriteOperation; +import org.apache.carbondata.locks.ICarbonLock; import org.apache.carbondata.processing.api.dataloader.DataLoadModel; import org.apache.carbondata.processing.api.dataloader.SchemaInfo; import org.apache.carbondata.processing.csvload.DataGraphExecuter; @@ -76,6 +78,7 @@ import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus; import org.apache.carbondata.processing.graphgenerator.GraphGenerator; import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException; import org.apache.carbondata.processing.model.CarbonLoadModel; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.carbondata.spark.merger.NodeBlockRelation; import org.apache.carbondata.spark.merger.NodeMultiBlockRelation; @@ -83,12 +86,37 @@ import com.google.gson.Gson; import org.apache.spark.SparkConf; import org.apache.spark.util.Utils; +import java.io.*; +import java.net.InetAddress; +import java.net.UnknownHostException; +import java.nio.charset.Charset; +import java.text.SimpleDateFormat; +import java.util.*; + + public final class CarbonLoaderUtil { private static final LogService LOGGER = LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName()); + /** + * minimum no of blocklet required for distribution + */ + private static int minBlockLetsReqForDistribution = 0; + + static { + String property = CarbonProperties.getInstance() + 
.getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE); + try { + minBlockLetsReqForDistribution = Integer.parseInt(property); + } catch (NumberFormatException ne) { + LOGGER.info("Invalid configuration. Considering the default value."); + minBlockLetsReqForDistribution = + CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE; + } } private CarbonLoaderUtil() { + } private static void generateGraph(IDataProcessStatus dataProcessTaskStatus, SchemaInfo info, @@ -99,6 +127,13 @@ public final class CarbonLoaderUtil { || null != dataProcessTaskStatus.getFilesToProcess()); model.setSchemaInfo(info); model.setTableName(dataProcessTaskStatus.getTableName()); + List loadMetadataDetails = loadModel.getLoadMetadataDetails(); + if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) { + model.setLoadNames( + CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails)); + model.setModificationOrDeletionTime(CarbonDataProcessorUtil + .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails)); + } model.setBlocksID(dataProcessTaskStatus.getBlocksID()); model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter()); model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter()); @@ -128,6 +163,7 @@ public final class CarbonLoaderUtil { CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation); CarbonProperties.getInstance() .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, storePath); + // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc); CarbonProperties.getInstance().addProperty("send.signal.load", "false"); String fileNamePrefix = ""; @@ -146,6 +182,7 @@ public final class CarbonLoaderUtil { DataProcessTaskStatus dataProcessTaskStatus = new DataProcessTaskStatus(databaseName, tableName); dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath()); + dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath()); if (loadModel.isDirectLoad()) { dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess()); dataProcessTaskStatus.setDirectLoad(true); @@ -160,6 +197,7 @@ public final class CarbonLoaderUtil { dataProcessTaskStatus.setRddIteratorKey(loadModel.getRddIteratorKey()); dataProcessTaskStatus.setDateFormat(loadModel.getDateFormat()); SchemaInfo info = new SchemaInfo(); + info.setDatabaseName(databaseName); info.setTableName(tableName); info.setAutoAggregateRequest(loadModel.isAggLoadRequest()); @@ -343,7 +381,7 @@ public final class CarbonLoaderUtil { * @throws IOException */ public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails, - CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException { + CarbonLoadModel loadModel, String loadStatus, long startLoadTime) throws IOException { boolean status = false; @@ -358,7 +396,9 @@ public final class CarbonLoaderUtil { absoluteTableIdentifier.getCarbonTableIdentifier()); String tableStatusPath = carbonTablePath.getTableStatusFilePath(); - ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + + ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); try { if (carbonLock.lockWithRetries()) { @@ -369,8 +409,8 @@ public final class CarbonLoaderUtil { LoadMetadataDetails[] listOfLoadFolderDetailsArray = SegmentStatusManager.readLoadMetadata(metaDataFilepath); - String
loadEnddate = readCurrentTime(); - loadMetadataDetails.setTimestamp(loadEnddate); + long loadEnddate = CarbonUpdateUtil.readCurrentTime(); + loadMetadataDetails.setLoadEndTime(loadEnddate); loadMetadataDetails.setLoadStatus(loadStatus); loadMetadataDetails.setLoadName(String.valueOf(loadCount)); loadMetadataDetails.setLoadStartTime(startLoadTime); @@ -419,9 +459,11 @@ public final class CarbonLoaderUtil { new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation)); try { + dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE); brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream, Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET))); + String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray()); brWriter.write(metadataInstance); } finally { @@ -431,10 +473,12 @@ public final class CarbonLoaderUtil { } } catch (Exception e) { LOGGER.error("error in flushing "); + } CarbonUtil.closeStreams(brWriter); writeOperation.close(); } + } public static String readCurrentTime() { @@ -502,6 +546,17 @@ public final class CarbonLoaderUtil { } /** + * This method will divide the blocks among the nodes as per the data locality + * + * @param blockInfos + * @return + */ + public static Map> nodeBlockMapping(List blockInfos) { + // -1 if number of nodes has to be decided based on block location information + return nodeBlockMapping(blockInfos, -1); + } + + /** * the method returns the number of required executors * * @param blockInfos http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java index 9f092e7..ce19080 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java @@ -46,7 +46,8 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile; import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastorage.store.impl.FileFactory; import org.apache.carbondata.core.load.LoadMetadataDetails; -import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.update.CarbonUpdateUtil; +import org.apache.carbondata.processing.model.CarbonLoadModel; public final class DeleteLoadFolders { @@ -59,6 +60,12 @@ public final class DeleteLoadFolders { /** * returns segment path + * + * @param loadModel + * @param storeLocation + * @param partitionId + * @param oneLoad + * @return */ private static String getSegmentPath(String dbName, String tableName, String storeLocation, int partitionId, LoadMetadataDetails oneLoad) { @@ -121,38 +128,14 @@ public final class DeleteLoadFolders { private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad, boolean isForceDelete) { if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus()) - || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus())) + || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus())) && oneLoad.getVisibility().equalsIgnoreCase("true")) { if 
(isForceDelete) { return true; } - String deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); - SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP); - Date deletionDate = null; - String date = null; - Date currentTimeStamp = null; - try { - deletionDate = parser.parse(deletionTime); - date = CarbonLoaderUtil.readCurrentTime(); - currentTimeStamp = parser.parse(date); - } catch (ParseException e) { - return false; - } - - long difference = currentTimeStamp.getTime() - deletionDate.getTime(); + long deletionTime = oneLoad.getModificationOrdeletionTimesStamp(); - long minutesElapsed = (difference / (1000 * 60)); - - int maxTime; - try { - maxTime = Integer.parseInt(CarbonProperties.getInstance() - .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME)); - } catch (NumberFormatException e) { - maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME; - } - if (minutesElapsed > maxTime) { - return true; - } + return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime); } @@ -163,7 +146,9 @@ public final class DeleteLoadFolders { String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) { List deletedLoads = new ArrayList(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); + boolean isDeleted = false; + if (details != null && details.length != 0) { for (LoadMetadataDetails oneLoad : details) { if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java new file mode 100644 index 0000000..ac8e7de --- /dev/null +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.carbondata.spark.load; + +/** + * This Enum is used to determine the Reasons of Failure. 
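+ * NONE indicates success; the remaining values identify the stage of the update flow that failed.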
+ */ +public enum FailureCauses { + NONE, + BAD_RECORDS, + EXECUTOR_FAILURE, + STATUS_FILE_UPDATION_FAILURE, + MULTIPLE_INPUT_ROWS_MATCHING +} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala index 254052b..8247296 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala @@ -26,6 +26,8 @@ package org.apache.carbondata.spark import org.apache.carbondata.core.load.LoadMetadataDetails +import org.apache.carbondata.core.update.SegmentUpdateDetails +import org.apache.spark.sql.execution.command.ExecutionErrors trait Value[V] extends Serializable { def getValue(value: Array[Object]): V @@ -53,6 +55,35 @@ class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] { } } +trait updateResult[K, V] extends Serializable { + def getKey(key: String, + value: (LoadMetadataDetails, ExecutionErrors)): + (K, V) +} + +class updateResultImpl + extends updateResult[String, (LoadMetadataDetails, ExecutionErrors)] { + override def getKey(key: String, + value: (LoadMetadataDetails, ExecutionErrors)): + (String, + (LoadMetadataDetails, ExecutionErrors)) = { + (key, value) + } +} + +trait DeleteDelataResult[K, V] extends Serializable { + def getKey(key: String, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V) +} + +class DeleteDelataResultImpl + extends DeleteDelataResult[String, (SegmentUpdateDetails, ExecutionErrors)] { + override def getKey(key: String, + value: (SegmentUpdateDetails, ExecutionErrors)): (String, (SegmentUpdateDetails, + ExecutionErrors)) = { + (key, value) + } +} + trait PartitionResult[K, V] extends Serializable { def getKey(key: Int, value: Boolean): (K, V) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala index 53ebd41..1950fb7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala @@ -22,6 +22,8 @@ import java.nio.charset.Charset import java.text.SimpleDateFormat import java.util.regex.Pattern +import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory} + import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import scala.util.control.Breaks.{break, breakable} @@ -42,7 +44,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastorage.store.impl.FileFactory import org.apache.carbondata.core.service.PathService import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil} -import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage} import 
org.apache.carbondata.processing.model.CarbonLoadModel import org.apache.carbondata.spark.load.CarbonLoaderUtil import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask} http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 8285bf8..9ea9f13 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -19,6 +19,8 @@ package org.apache.carbondata.spark.util import java.util +import org.apache.carbondata.core.updatestatus.SegmentStatusManager + import scala.collection.JavaConverters._ import scala.collection.mutable.Map http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala index 71b4247..bccdeb5 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala @@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager + import scala.collection.JavaConverters._ import org.apache.spark.sql.Row @@ -30,7 +32,6 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager /** * FT for data compaction scenario. 
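Several of the test diffs that follow replace the removed static call SegmentStatusManager.getSegmentStatus(identifier) with an instance-based lookup. A minimal Java sketch of the new pattern (the database/table names are placeholders, and the object returned by getValidAndInvalidSegments is assumed to expose getValidSegments, as the hunks show):

    // Build the table identifier from the configured store location (names are placeholders).
    AbsoluteTableIdentifier identifier = new AbsoluteTableIdentifier(
        CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION),
        new CarbonTableIdentifier("default", "sometable", "1"));

    // New pattern: construct a per-table manager, then ask it for the valid segments.
    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier);
    List<String> validSegments =
        segmentStatusManager.getValidAndInvalidSegments().getValidSegments();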
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala index 97a5bdd..fad2ba2 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala @@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager + import scala.collection.JavaConverters._ import org.apache.spark.sql.common.util.CarbonHiveContext._ @@ -29,7 +31,6 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager /** * FT for data compaction Boundary condition verification. http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala index e1e1412..58d89a9 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala @@ -20,13 +20,13 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager import org.apache.spark.sql.Row import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager import org.scalatest.BeforeAndAfterAll import scala.collection.JavaConverters._ @@ -80,13 +80,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter var noOfRetries = 0 while (status && noOfRetries < 10) { - val identifier = new AbsoluteTableIdentifier( + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new + AbsoluteTableIdentifier( CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), - new CarbonTableIdentifier( - CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1") + new 
CarbonTableIdentifier("default", "cardinalityTest", "1") ) - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + ) + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList if (!segments.contains("0.1")) { // wait for 2 seconds for compaction to complete. http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala index 0b228c5..e138f62 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala @@ -20,6 +20,9 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager +import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory, ICarbonLock} + import scala.collection.JavaConverters._ import org.apache.spark.sql.common.util.CarbonHiveContext._ @@ -30,8 +33,6 @@ import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage} -import org.apache.carbondata.lcm.status.SegmentStatusManager /** * FT for data compaction Locking scenario. 
@@ -104,13 +105,18 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll { * Compaction should fail as lock is being held purposefully */ test("check if compaction is failed or not.") { - val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier) - .getValidSegments.asScala.toList - if (!segments.contains("0.1")) { - assert(true) - } else { - assert(false) - } + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager( + absoluteTableIdentifier + ) + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList + + if (!segments.contains("0.1")) { + assert(true) + } + else { + assert(false) + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala index 15ed78b..257f382 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala @@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager + import scala.collection.JavaConverters._ import org.apache.spark.sql.common.util.CarbonHiveContext._ @@ -30,8 +32,6 @@ import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath} import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage} -import org.apache.carbondata.lcm.status.SegmentStatusManager /** * FT for data compaction Minor threshold verification. 
@@ -95,8 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll sql("clean files for table minorthreshold") - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.2")) assert(!segments.contains("0.1")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala index 00d295a..42f9cfb 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala @@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager + import scala.collection.JavaConverters._ import org.apache.spark.sql.Row @@ -28,7 +30,6 @@ import org.apache.spark.sql.common.util.QueryTest import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager import org.scalatest.BeforeAndAfterAll /** @@ -41,7 +42,9 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll { val identifier = new AbsoluteTableIdentifier( CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId)) - SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList } val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../") http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala index 3c18bb7..3d2e4fe 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala @@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager + import scala.collection.JavaConverters._ 
import org.apache.spark.sql.Row @@ -30,7 +32,6 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager /** * FT for data compaction scenario. @@ -82,8 +83,10 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll { CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1") ) - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList if (!segments.contains("0.1")) { // wait for 2 seconds for compaction to complete. @@ -129,9 +132,11 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll { new CarbonTableIdentifier( CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid") ) + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + // merged segment should not be there - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(!segments.contains("0")) assert(!segments.contains("1")) assert(!segments.contains("2")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala index 4e3d2f3..7604fad 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala @@ -22,6 +22,9 @@ import java.io.File import scala.collection.JavaConverters._ +import org.apache.carbondata.core.carbon.path.CarbonStorePath +import org.apache.carbondata.core.updatestatus.SegmentStatusManager +import org.apache.spark.sql.Row import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -33,7 +36,6 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.hadoop.CacheClient -import org.apache.carbondata.lcm.status.SegmentStatusManager /** * FT for compaction scenario where major segment should not be included in minor. 
@@ -103,8 +105,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll new CarbonTableIdentifier( CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "") ) - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList segments.foreach(seg => System.out.println( "valid segment is =" + seg) ) @@ -134,9 +137,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll new CarbonTableIdentifier( CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr") ) + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + // merged segment should not be there - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.1")) assert(segments.contains("2.1")) assert(!segments.contains("2")) @@ -171,7 +175,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll val segs = SegmentStatusManager.readLoadMetadata(carbontablePath) // status should remain as compacted. - assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED)) + assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED)) } @@ -193,7 +197,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll val segs = SegmentStatusManager.readLoadMetadata(carbontablePath) // status should remain as compacted for segment 2. - assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED)) + assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED)) // for segment 0.1 . 
should get deleted assert(segs(2).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE)) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala ---------------------------------------------------------------------- diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala index f5ff191..f816c15 100644 --- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala +++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala @@ -20,12 +20,12 @@ package org.apache.carbondata.spark.testsuite.datacompaction import java.io.File +import org.apache.carbondata.core.updatestatus.SegmentStatusManager import org.apache.spark.sql.common.util.CarbonHiveContext._ import org.apache.spark.sql.common.util.QueryTest import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.lcm.status.SegmentStatusManager import org.scalatest.BeforeAndAfterAll import scala.collection.JavaConverters._ @@ -92,8 +92,10 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA new CarbonTableIdentifier( CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "") ) - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList segments.foreach(seg => System.out.println( "valid segment is =" + seg) ) @@ -122,9 +124,11 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION), new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr") ) + + val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier) + // merged segment should not be there - val segments = SegmentStatusManager.getSegmentStatus(identifier) - .getValidSegments.asScala.toList + val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList assert(segments.contains("0.1")) assert(!segments.contains("0.2")) assert(!segments.contains("0")) http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java new file mode 100644 index 0000000..129fbfc --- /dev/null +++ b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
new file mode 100644
index 0000000..129fbfc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.exception;
+
+public class MultipleMatchingException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  private long errorCode = -1;
+
+  public MultipleMatchingException() {
+    super();
+  }
+
+  public MultipleMatchingException(long errorCode, String message) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  public MultipleMatchingException(String message) {
+    super(message);
+  }
+
+  public MultipleMatchingException(Throwable cause) {
+    super(cause);
+  }
+
+  public MultipleMatchingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public long getErrorCode() {
+    return errorCode;
+  }
+
+}
\ No newline at end of file
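
The new exception carries an optional numeric error code, defaulting to -1, alongside the usual message/cause constructors. A hedged sketch of how an update-flow call site might consume it; the code value, message, and helper below are made up for illustration and are not taken from this patch:

    import org.apache.carbondata.processing.exception.MultipleMatchingException;

    public class MultipleMatchingSketch {
      // Hypothetical update step: fails when more than one row matches the predicate.
      static void updateSingleRow(int matchedRows) throws MultipleMatchingException {
        if (matchedRows > 1) {
          throw new MultipleMatchingException(1001L, matchedRows + " rows matched the update predicate");
        }
      }

      public static void main(String[] args) {
        try {
          updateSingleRow(2);
        } catch (MultipleMatchingException e) {
          // getErrorCode() returns -1 unless the (errorCode, message) constructor was used.
          System.err.println("update aborted [" + e.getErrorCode() + "]: " + e.getMessage());
        }
      }
    }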
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index b7c17dc..af97eb9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -22,13 +22,15 @@
  */
 package org.apache.carbondata.processing.model;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
 
 public class CarbonLoadModel implements Serializable {
@@ -68,6 +70,8 @@ public class CarbonLoadModel implements Serializable {
   private boolean isDirectLoad;
   private List<LoadMetadataDetails> loadMetadataDetails;
+  private transient List<SegmentUpdateDetails> segmentUpdateDetails;
+  private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
 
   private String blocksID;
@@ -133,6 +137,16 @@ public class CarbonLoadModel implements Serializable {
    */
   private String rddIteratorKey;
+
+  private String carbondataFileName = "";
+
+  public String getCarbondataFileName() {
+    return carbondataFileName;
+  }
+
+  public void setCarbondataFileName(String carbondataFileName) {
+    this.carbondataFileName = carbondataFileName;
+  }
 
   /**
    * Use one pass to generate dictionary
    */
@@ -495,6 +509,41 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
+   * getSegmentUpdateDetails
+   * @return
+   */
+  public List<SegmentUpdateDetails> getSegmentUpdateDetails() {
+    return segmentUpdateDetails;
+  }
+
+  /**
+   * setSegmentUpdateDetails
+   *
+   * @param segmentUpdateDetails
+   */
+  public void setSegmentUpdateDetails(List<SegmentUpdateDetails> segmentUpdateDetails) {
+    this.segmentUpdateDetails = segmentUpdateDetails;
+  }
+
+  /**
+   * getSegmentUpdateStatusManager
+   * @return
+   */
+  public SegmentUpdateStatusManager getSegmentUpdateStatusManager() {
+    return segmentUpdateStatusManager;
+  }
+
+  /**
+   * setSegmentUpdateStatusManager
+   *
+   * @param segmentUpdateStatusManager
+   */
+  public void setSegmentUpdateStatusManager(SegmentUpdateStatusManager segmentUpdateStatusManager) {
+    this.segmentUpdateStatusManager = segmentUpdateStatusManager;
+  }
+
+
   /**
    * @return
    */
   public String getTaskNo() {
@@ -518,8 +567,8 @@ public class CarbonLoadModel implements Serializable {
   /**
    * @param factTimeStamp
    */
-  public void setFactTimeStamp(String factTimeStamp) {
-    this.factTimeStamp = factTimeStamp;
+  public void setFactTimeStamp(long factTimeStamp) {
+    this.factTimeStamp = factTimeStamp + "";
   }
 
   public String[] getDelimiters() {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
index 63f00e1..1b0b41a 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
@@ -19,6 +19,8 @@
 package org.apache.carbondata.lcm.locks;
 
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.locks.LocalFileLock;
+import org.apache.carbondata.locks.LockUsage;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
index ba46917..a4fe655 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
@@ -18,16 +18,12 @@
  */
 package org.apache.carbondata.lcm.locks;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-
+import mockit.NonStrictExpectations;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
-
-import mockit.NonStrictExpectations;
-
+import org.apache.carbondata.locks.LockUsage;
+import org.apache.carbondata.locks.ZooKeeperLocking;
+import org.apache.carbondata.locks.ZookeeperInit;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
@@ -36,6 +32,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Properties;
+
 /**
  * @author Administrator
 */
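
Returning to the CarbonLoadModel hunks above, two details are worth noting for reviewers: setFactTimeStamp(...) now takes epoch millis as a long and stores the String form internally, and the new segment-update members are transient, so they do not survive serialization of the model. A small sketch with illustrative values; the file name below is hypothetical:

    import org.apache.carbondata.processing.model.CarbonLoadModel;

    public class LoadModelSketch {
      public static void main(String[] args) {
        CarbonLoadModel model = new CarbonLoadModel();
        // New signature: pass epoch millis; the model keeps the String form.
        model.setFactTimeStamp(System.currentTimeMillis());
        // New optional field, empty string by default; the name here is illustrative.
        model.setCarbondataFileName("part-0-0.carbondata");
        // segmentUpdateDetails and segmentUpdateStatusManager are transient, so they
        // deserialize as null on executors and must be repopulated there if needed.
        System.out.println(model.getCarbondataFileName());
      }
    }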
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
index 7e9feb6..2712e6e 100644
--- a/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
@@ -18,22 +18,7 @@
  */
 package org.apache.carbondata.test.util;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
+import com.google.gson.Gson;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -72,9 +57,9 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.fileoperations.FileWriteOperation;
 import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
 import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
 import org.apache.carbondata.processing.csvload.DataGraphExecuter;
@@ -82,10 +67,13 @@ import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
 import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
 import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
-
-import com.google.gson.Gson;
 import org.apache.hadoop.fs.Path;
 
+import java.io.*;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
 /**
  * This class will create store file based on provided schema
 *
@@ -410,10 +398,10 @@ public class StoreCreator {
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
       String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-    loadMetadataDetails.setTimestamp(readCurrentTime());
+    loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
     loadMetadataDetails.setLoadStatus("SUCCESS");
     loadMetadataDetails.setLoadName(String.valueOf(0));
-    loadMetadataDetails.setLoadStartTime(readCurrentTime());
+    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
     String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator
@@ -472,7 +460,7 @@ public class StoreCreator {
     model.setTableName(dataProcessTaskStatus.getTableName());
     model.setTaskNo("1");
     model.setBlocksID(dataProcessTaskStatus.getBlocksID());
-    model.setFactTimeStamp(readCurrentTime());
+    model.setFactTimeStamp(System.currentTimeMillis());
     model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
     model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
    model.setCommentCharacter(dataProcessTaskStatus.getCommentCharacter());
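
The StoreCreator change switches the load bookkeeping from formatted-time strings to epoch millis: setLoadEndTime(...) now records System.currentTimeMillis() directly, while the start time is still derived from the formatted readCurrentTime() value via LoadMetadataDetails.getTimeStamp(...). A self-contained sketch of that round trip; the "dd-MM-yyyy HH:mm:ss" pattern is an assumed readCurrentTime()-style format used only for illustration:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    public class TimestampSketch {
      public static void main(String[] args) throws Exception {
        long endTime = System.currentTimeMillis(); // what setLoadEndTime(...) now records
        // Assumed wall-clock format, for illustration only.
        SimpleDateFormat format = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss");
        String formatted = format.format(new Date(endTime));
        long parsedBack = format.parse(formatted).getTime(); // getTimeStamp(...)-style recovery
        // Note the round trip drops sub-second precision.
        System.out.println(endTime + " -> " + formatted + " -> " + parsedBack);
      }
    }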