carbondata-commits mailing list archives

From gvram...@apache.org
Subject [07/22] incubator-carbondata git commit: IUD update flow support
Date Fri, 06 Jan 2017 13:57:07 GMT
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java b/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java
new file mode 100644
index 0000000..75c6efd
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/HdfsFileLock.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * This class is used to handle HDFS file locking.
+ * Locking is achieved by acquiring the data output stream in append mode.
+ */
+public class HdfsFileLock extends AbstractCarbonLock {
+
+  private static final LogService LOGGER =
+             LogServiceFactory.getLogService(HdfsFileLock.class.getName());
+  /**
+   * location of the HDFS lock file
+   */
+  private String location;
+
+  private DataOutputStream dataOutputStream;
+
+  public static String tmpPath;
+
+  static {
+    Configuration conf = new Configuration(true);
+    String hdfsPath = conf.get(CarbonCommonConstants.FS_DEFAULT_FS);
+    // By default, the HDFS lock meta file for a table is put inside that table's store folder.
+    // If the STORE_LOCATION cannot be resolved, fall back to hadoop.tmp.dir.
+    tmpPath = CarbonProperties.getInstance().getProperty(CarbonCommonConstants.STORE_LOCATION,
+               System.getProperty(CarbonCommonConstants.HDFS_TEMP_LOCATION));
+    if (!tmpPath.startsWith(CarbonCommonConstants.HDFSURL_PREFIX)) {
+      tmpPath = hdfsPath + tmpPath;
+    }
+  }
+
+  /**
+   * @param lockFileLocation location of the lock file, relative to the lock root directory
+   * @param lockFile name of the lock file
+   */
+  public HdfsFileLock(String lockFileLocation, String lockFile) {
+    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation
+        + CarbonCommonConstants.FILE_SEPARATOR + lockFile;
+    LOGGER.info("HDFS lock path: " + this.location);
+    initRetry();
+  }
+
+  /**
+   * @param tableIdentifier identifier of the table to be locked
+   * @param lockFile name of the lock file
+   */
+  public HdfsFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
+        .getTableName(), lockFile);
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.carbondata.core.locks.ICarbonLock#lock()
+   */
+  @Override public boolean lock() {
+    try {
+      if (!FileFactory.isFileExist(location, FileFactory.getFileType(location))) {
+        FileFactory.createNewLockFile(location, FileFactory.getFileType(location));
+      }
+      dataOutputStream =
+          FileFactory.getDataOutputStreamUsingAppend(location, FileFactory.getFileType(location));
+
+      return true;
+
+    } catch (IOException e) {
+      return false;
+    }
+  }
+
+  /* (non-Javadoc)
+   * @see org.apache.carbondata.core.locks.ICarbonLock#unlock()
+   */
+  @Override
+  public boolean unlock() {
+    if (null != dataOutputStream) {
+      try {
+        dataOutputStream.close();
+      } catch (IOException e) {
+        return false;
+      } finally {
+        if (FileFactory.getCarbonFile(location, FileFactory.getFileType(location)).delete()) {
+          LOGGER.info("Deleted the lock file " + location);
+        } else {
+          LOGGER.error("Not able to delete the lock file " + location);
+        }
+      }
+    }
+    return true;
+  }
+
+}
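The class above relies on HDFS single-writer semantics: only one client can hold an open append stream on a file, so a successful getDataOutputStreamUsingAppend call doubles as lock acquisition, and lock() returns false when the append fails with an IOException. A minimal usage sketch (the "default/t1" path and the "tablestatus.lock" file name are illustrative, not taken from this patch):

    HdfsFileLock lock = new HdfsFileLock("default/t1", "tablestatus.lock");
    if (lock.lock()) {
      try {
        // critical section: only one process can hold the append stream,
        // so only one process gets here at a time
      } finally {
        lock.unlock(); // closes the stream and deletes the lock file
      }
    }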

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java b/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java
new file mode 100644
index 0000000..c2b3753
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/LocalFileLock.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.nio.channels.FileChannel;
+import java.nio.channels.FileLock;
+import java.nio.channels.OverlappingFileLockException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
+
+/**
+ * This class handles the file locking in the local file system.
+ * This will be handled using the file channel lock API.
+ */
+public class LocalFileLock extends AbstractCarbonLock {
+  /**
+   * location is the location of the lock file.
+   */
+  private String location;
+
+  /**
+   * fileOutputStream of the local lock file
+   */
+  private FileOutputStream fileOutputStream;
+
+  /**
+   * channel is the FileChannel of the lock file.
+   */
+  private FileChannel channel;
+
+  /**
+   * fileLock NIO FileLock Object
+   */
+  private FileLock fileLock;
+
+  /**
+   * lock file
+   */
+  private String lockFile;
+
+  public static final String tmpPath;
+
+  private String lockFilePath;
+
+  /**
+   * LOGGER for logging the messages.
+   */
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(LocalFileLock.class.getName());
+
+  static {
+    tmpPath = System.getProperty("java.io.tmpdir");
+  }
+
+  /**
+   * @param lockFileLocation location of the lock file, relative to the lock root directory
+   * @param lockFile name of the lock file
+   */
+  public LocalFileLock(String lockFileLocation, String lockFile) {
+    this.location = tmpPath + CarbonCommonConstants.FILE_SEPARATOR + lockFileLocation;
+    this.lockFile = lockFile;
+    initRetry();
+  }
+
+  /**
+   * @param tableIdentifier identifier of the table to be locked
+   * @param lockFile name of the lock file
+   */
+  public LocalFileLock(CarbonTableIdentifier tableIdentifier, String lockFile) {
+    this(tableIdentifier.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR + tableIdentifier
+        .getTableName(), lockFile);
+  }
+
+  /**
+   * Lock API for locking of the file channel of the lock file.
+   *
+   * @return true if the lock is acquired, false otherwise
+   */
+  @Override public boolean lock() {
+    try {
+      if (!FileFactory.isFileExist(location, FileFactory.getFileType(tmpPath))) {
+        FileFactory.mkdirs(location, FileFactory.getFileType(tmpPath));
+      }
+      lockFilePath = location + CarbonCommonConstants.FILE_SEPARATOR +
+          lockFile;
+      if (!FileFactory.isFileExist(lockFilePath, FileFactory.getFileType(location))) {
+        FileFactory.createNewLockFile(lockFilePath, FileFactory.getFileType(location));
+      }
+
+      fileOutputStream = new FileOutputStream(lockFilePath);
+      channel = fileOutputStream.getChannel();
+      try {
+        fileLock = channel.tryLock();
+      } catch (OverlappingFileLockException e) {
+        return false;
+      }
+      return null != fileLock;
+    } catch (IOException e) {
+      return false;
+    }
+
+  }
+
+  /**
+   * Unlock API for unlocking of the acquired lock.
+   *
+   * @return true if the lock is released, false otherwise
+   */
+  @Override public boolean unlock() {
+    boolean status;
+    try {
+      if (null != fileLock) {
+        fileLock.release();
+      }
+      status = true;
+    } catch (IOException e) {
+      status = false;
+    } finally {
+      if (null != fileOutputStream) {
+        try {
+          fileOutputStream.close();
+          // deleting the lock file after releasing the lock.
+          if (FileFactory.getCarbonFile(lockFilePath, FileFactory.getFileType(lockFilePath))
+              .delete()) {
+            LOGGER.info("Successfully deleted the lock file " + lockFilePath);
+          } else {
+            LOGGER.error("Not able to delete the lock file " + lockFilePath);
+          }
+        } catch (IOException e) {
+          LOGGER.error(e.getMessage());
+        }
+      }
+    }
+    return status;
+  }
+
+}
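LocalFileLock wraps the standard java.nio file-lock primitive: FileChannel.tryLock returns null when another process holds the lock and throws OverlappingFileLockException when the same JVM already holds it. A self-contained sketch of that primitive, independent of CarbonData (the /tmp/demo.lock path is illustrative):

    import java.io.FileOutputStream;
    import java.nio.channels.FileChannel;
    import java.nio.channels.FileLock;

    public class TryLockDemo {
      public static void main(String[] args) throws Exception {
        try (FileOutputStream out = new FileOutputStream("/tmp/demo.lock");
             FileChannel channel = out.getChannel()) {
          FileLock lock = channel.tryLock(); // non-blocking attempt
          if (lock != null) {
            System.out.println("lock acquired");
            lock.release();
          } else {
            System.out.println("lock held by another process");
          }
        }
      }
    }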

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java b/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java
new file mode 100644
index 0000000..b70579e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/locks/ZookeeperInit.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.locks;
+
+import java.io.IOException;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+
+/**
+ * This is a singleton class for initialization of the ZooKeeper client.
+ */
+public class ZookeeperInit {
+
+  private static final LogService LOGGER =
+      LogServiceFactory.getLogService(ZookeeperInit.class.getName());
+
+  private static volatile ZookeeperInit zooKeeperInit;
+  /**
+   * zk is the zookeeper client instance
+   */
+  private ZooKeeper zk;
+
+  private ZookeeperInit(String zooKeeperUrl) {
+
+    int sessionTimeOut = 100000;
+    try {
+      zk = new ZooKeeper(zooKeeperUrl, sessionTimeOut, new DummyWatcher());
+
+    } catch (IOException e) {
+      LOGGER.error(e.getMessage());
+    }
+
+  }
+
+  public static ZookeeperInit getInstance(String zooKeeperUrl) {
+
+    if (null == zooKeeperInit) {
+      synchronized (ZookeeperInit.class) {
+        if (null == zooKeeperInit) {
+          LOGGER.info("Initiating Zookeeper client.");
+          zooKeeperInit = new ZookeeperInit(zooKeeperUrl);
+        }
+      }
+    }
+    return zooKeeperInit;
+
+  }
+
+  public static ZookeeperInit getInstance() {
+    return zooKeeperInit;
+  }
+
+  public ZooKeeper getZookeeper() {
+    return zk;
+  }
+
+  private static class DummyWatcher implements Watcher {
+    public void process(WatchedEvent event) {
+    }
+  }
+}
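ZookeeperInit uses double-checked locking (hence the volatile field), so the first caller must supply the connection URL and later callers can use the no-argument getInstance(). A hedged usage sketch (the URL is an example):

    ZooKeeper zk = ZookeeperInit.getInstance("localhost:2181").getZookeeper();
    // Note: zk may be null if the connection failed, since the constructor
    // only logs the IOException instead of rethrowing it.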

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java b/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java
index 4aed38f..089b614 100644
--- a/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java
+++ b/core/src/test/java/org/apache/carbondata/core/load/LoadMetadataDetailsUnitTest.java
@@ -19,20 +19,16 @@
 
 package org.apache.carbondata.core.load;
 
-import java.text.ParseException;
-import java.text.SimpleDateFormat;
-import java.util.TimeZone;
-
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-
 import org.junit.Before;
 import org.junit.Test;
 
-import static junit.framework.Assert.assertEquals;
-import static junit.framework.Assert.assertNotSame;
-import static junit.framework.Assert.assertNull;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+
+import static junit.framework.Assert.*;
 
 public class LoadMetadataDetailsUnitTest {
 
@@ -103,20 +99,21 @@ public class LoadMetadataDetailsUnitTest {
   }
 
   @Test public void testGetTimeStampWithEmptyTimeStamp() throws Exception {
-    loadMetadataDetails.setLoadStartTime("");
+    loadMetadataDetails.setLoadStartTime(0);
     Long result = loadMetadataDetails.getLoadStartTimeAsLong();
     assertNull(result);
   }
 
   @Test public void testGetTimeStampWithParserException() throws Exception {
-    loadMetadataDetails.setLoadStartTime("00.00.00");
+    loadMetadataDetails.setLoadStartTime(0);
     Long result = loadMetadataDetails.getLoadStartTimeAsLong();
     assertNull(result);
   }
 
   @Test public void testGetTimeStampWithDate() throws Exception {
-    String date = "01-01-2016 00:00:00";
-    loadMetadataDetails.setLoadStartTime(date);
+    String date = "01-01-2016 00:00:00:000";
+    long longVal = loadMetadataDetails.getTimeStamp(date);
+    loadMetadataDetails.setLoadStartTime(longVal);
     Long expected_result = getTime(date);
     Long result = loadMetadataDetails.getLoadStartTimeAsLong();
     assertEquals(expected_result, result);
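The test now feeds setLoadStartTime a long obtained via getTimeStamp. A hedged sketch of the equivalent conversion, assuming the "dd-MM-yyyy HH:mm:ss:SSS" pattern implied by the date literal above:

    SimpleDateFormat parser = new SimpleDateFormat("dd-MM-yyyy HH:mm:ss:SSS");
    long longVal = parser.parse("01-01-2016 00:00:00:000").getTime();
    loadMetadataDetails.setLoadStartTime(longVal);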

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 7a88b63..b2951b4 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -47,6 +47,7 @@ import org.apache.carbondata.core.cache.CacheType;
 import org.apache.carbondata.core.cache.dictionary.Dictionary;
 import org.apache.carbondata.core.cache.dictionary.DictionaryColumnUniqueIdentifier;
 import org.apache.carbondata.core.carbon.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.carbon.ColumnIdentifier;
 import org.apache.carbondata.core.carbon.datastore.block.Distributable;
@@ -61,13 +62,14 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory.FileType;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonUtil;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
-import org.apache.carbondata.lcm.locks.ICarbonLock;
-import org.apache.carbondata.lcm.status.SegmentStatusManager;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.fileoperations.FileWriteOperation;
+import org.apache.carbondata.locks.ICarbonLock;
 import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
 import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
 import org.apache.carbondata.processing.csvload.DataGraphExecuter;
@@ -76,6 +78,7 @@ import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
 import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
 import org.apache.carbondata.processing.model.CarbonLoadModel;
+import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;
 import org.apache.carbondata.spark.merger.NodeBlockRelation;
 import org.apache.carbondata.spark.merger.NodeMultiBlockRelation;
 
@@ -83,12 +86,37 @@ import com.google.gson.Gson;
 import org.apache.spark.SparkConf;
 import org.apache.spark.util.Utils;
 
+import java.io.*;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
+
 public final class CarbonLoaderUtil {
 
   private static final LogService LOGGER =
       LogServiceFactory.getLogService(CarbonLoaderUtil.class.getName());
+  /**
+   * minimum no of blocklet required for distribution
+   */
+  private static int minBlockLetsReqForDistribution = 0;
+
+  static {
+    String property = CarbonProperties.getInstance()
+        .getProperty(CarbonCommonConstants.CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE);
+    try {
+      minBlockLetsReqForDistribution = Integer.parseInt(property);
+    } catch (NumberFormatException ne) {
+      LOGGER.info("Invalid configuration. Considering the default.");
+      minBlockLetsReqForDistribution =
+          CarbonCommonConstants.DEFAULT_CARBON_BLOCKLETDISTRIBUTION_MIN_REQUIRED_SIZE;
+    }
+  }
 
   private CarbonLoaderUtil() {
+
   }
 
   private static void generateGraph(IDataProcessStatus dataProcessTaskStatus, SchemaInfo info,
@@ -99,6 +127,13 @@ public final class CarbonLoaderUtil {
             || null != dataProcessTaskStatus.getFilesToProcess());
     model.setSchemaInfo(info);
     model.setTableName(dataProcessTaskStatus.getTableName());
+    List<LoadMetadataDetails> loadMetadataDetails = loadModel.getLoadMetadataDetails();
+    if (null != loadMetadataDetails && !loadMetadataDetails.isEmpty()) {
+      model.setLoadNames(
+          CarbonDataProcessorUtil.getLoadNameFromLoadMetaDataDetails(loadMetadataDetails));
+      model.setModificationOrDeletionTime(CarbonDataProcessorUtil
+          .getModificationOrDeletionTimesFromLoadMetadataDetails(loadMetadataDetails));
+    }
     model.setBlocksID(dataProcessTaskStatus.getBlocksID());
     model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
     model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
@@ -128,6 +163,7 @@ public final class CarbonLoaderUtil {
     CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation);
     CarbonProperties.getInstance()
         .addProperty(CarbonCommonConstants.STORE_LOCATION_HDFS, storePath);
+    // CarbonProperties.getInstance().addProperty("store_output_location", outPutLoc);
     CarbonProperties.getInstance().addProperty("send.signal.load", "false");
 
     String fileNamePrefix = "";
@@ -146,6 +182,7 @@ public final class CarbonLoaderUtil {
     DataProcessTaskStatus dataProcessTaskStatus
             = new DataProcessTaskStatus(databaseName, tableName);
     dataProcessTaskStatus.setCsvFilePath(loadModel.getFactFilePath());
+    dataProcessTaskStatus.setDimCSVDirLoc(loadModel.getDimFolderPath());
     if (loadModel.isDirectLoad()) {
       dataProcessTaskStatus.setFilesToProcess(loadModel.getFactFilesToProcess());
       dataProcessTaskStatus.setDirectLoad(true);
@@ -160,6 +197,7 @@ public final class CarbonLoaderUtil {
     dataProcessTaskStatus.setRddIteratorKey(loadModel.getRddIteratorKey());
     dataProcessTaskStatus.setDateFormat(loadModel.getDateFormat());
     SchemaInfo info = new SchemaInfo();
+
     info.setDatabaseName(databaseName);
     info.setTableName(tableName);
     info.setAutoAggregateRequest(loadModel.isAggLoadRequest());
@@ -343,7 +381,7 @@ public final class CarbonLoaderUtil {
    * @throws IOException
    */
   public static boolean recordLoadMetadata(int loadCount, LoadMetadataDetails loadMetadataDetails,
-      CarbonLoadModel loadModel, String loadStatus, String startLoadTime) throws IOException {
+      CarbonLoadModel loadModel, String loadStatus, long startLoadTime) throws IOException {
 
     boolean status = false;
 
@@ -358,7 +396,9 @@ public final class CarbonLoaderUtil {
             absoluteTableIdentifier.getCarbonTableIdentifier());
 
     String tableStatusPath = carbonTablePath.getTableStatusFilePath();
-    ICarbonLock carbonLock = SegmentStatusManager.getTableStatusLock(absoluteTableIdentifier);
+    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
+
+    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
 
     try {
       if (carbonLock.lockWithRetries()) {
@@ -369,8 +409,8 @@ public final class CarbonLoaderUtil {
         LoadMetadataDetails[] listOfLoadFolderDetailsArray =
             SegmentStatusManager.readLoadMetadata(metaDataFilepath);
 
-        String loadEnddate = readCurrentTime();
-        loadMetadataDetails.setTimestamp(loadEnddate);
+        long loadEnddate = CarbonUpdateUtil.readCurrentTime();
+        loadMetadataDetails.setLoadEndTime(loadEnddate);
         loadMetadataDetails.setLoadStatus(loadStatus);
         loadMetadataDetails.setLoadName(String.valueOf(loadCount));
         loadMetadataDetails.setLoadStartTime(startLoadTime);
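The hunk above switches recordLoadMetadata to the instance-based SegmentStatusManager API. The surrounding lock discipline, condensed into a sketch of the pattern (not the full method body):

    SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier);
    ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock();
    try {
      if (carbonLock.lockWithRetries()) {
        // read the existing load metadata, append the new entry,
        // and rewrite the table status file
      }
    } finally {
      carbonLock.unlock();
    }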
@@ -419,9 +459,11 @@ public final class CarbonLoaderUtil {
         new AtomicFileOperationsImpl(dataLoadLocation, FileFactory.getFileType(dataLoadLocation));
 
     try {
+
       dataOutputStream = writeOperation.openForWrite(FileWriteOperation.OVERWRITE);
       brWriter = new BufferedWriter(new OutputStreamWriter(dataOutputStream,
               Charset.forName(CarbonCommonConstants.DEFAULT_CHARSET)));
+
       String metadataInstance = gsonObjectToWrite.toJson(listOfLoadFolderDetails.toArray());
       brWriter.write(metadataInstance);
     } finally {
@@ -431,10 +473,12 @@ public final class CarbonLoaderUtil {
         }
       } catch (Exception e) {
         LOGGER.error("error in  flushing ");
+
       }
       CarbonUtil.closeStreams(brWriter);
       writeOperation.close();
     }
+
   }
 
   public static String readCurrentTime() {
@@ -502,6 +546,17 @@ public final class CarbonLoaderUtil {
   }
 
   /**
+   * This method will divide the blocks among the nodes as per the data locality
+   *
+   * @param blockInfos blocks to be distributed across nodes
+   * @return mapping of node name to the blocks assigned to it
+   */
+  public static Map<String, List<Distributable>> nodeBlockMapping(List<Distributable> blockInfos) {
+    // -1 if number of nodes has to be decided based on block location information
+    return nodeBlockMapping(blockInfos, -1);
+  }
+
+  /**
    * the method returns the number of required executors
    *
    * @param blockInfos

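The new single-argument nodeBlockMapping overload passes -1 so the node count is derived from the block location information itself. A hedged usage sketch (blockInfos is assumed to be an existing List<Distributable>):

    Map<String, List<Distributable>> nodeToBlocks =
        CarbonLoaderUtil.nodeBlockMapping(blockInfos);
    for (Map.Entry<String, List<Distributable>> entry : nodeToBlocks.entrySet()) {
      System.out.println(entry.getKey() + " -> " + entry.getValue().size() + " blocks");
    }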
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
index 9f092e7..ce19080 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/DeleteLoadFolders.java
@@ -46,7 +46,8 @@ import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFile;
 import org.apache.carbondata.core.datastorage.store.filesystem.CarbonFileFilter;
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
-import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.update.CarbonUpdateUtil;
+import org.apache.carbondata.processing.model.CarbonLoadModel;
 
 public final class DeleteLoadFolders {
 
@@ -59,6 +60,12 @@ public final class DeleteLoadFolders {
 
   /**
    * returns segment path
+   *
+   * @param dbName database name
+   * @param tableName table name
+   * @param storeLocation store location
+   * @param partitionId partition id
+   * @param oneLoad load metadata details of the segment
+   * @return segment path
    */
   private static String getSegmentPath(String dbName, String tableName, String storeLocation,
       int partitionId, LoadMetadataDetails oneLoad) {
@@ -121,38 +128,14 @@ public final class DeleteLoadFolders {
   private static boolean checkIfLoadCanBeDeleted(LoadMetadataDetails oneLoad,
       boolean isForceDelete) {
     if ((CarbonCommonConstants.MARKED_FOR_DELETE.equalsIgnoreCase(oneLoad.getLoadStatus())
-        || CarbonCommonConstants.SEGMENT_COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
+        || CarbonCommonConstants.COMPACTED.equalsIgnoreCase(oneLoad.getLoadStatus()))
         && oneLoad.getVisibility().equalsIgnoreCase("true")) {
       if (isForceDelete) {
         return true;
       }
-      String deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
-      SimpleDateFormat parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP);
-      Date deletionDate = null;
-      String date = null;
-      Date currentTimeStamp = null;
-      try {
-        deletionDate = parser.parse(deletionTime);
-        date = CarbonLoaderUtil.readCurrentTime();
-        currentTimeStamp = parser.parse(date);
-      } catch (ParseException e) {
-        return false;
-      }
-
-      long difference = currentTimeStamp.getTime() - deletionDate.getTime();
+      long deletionTime = oneLoad.getModificationOrdeletionTimesStamp();
 
-      long minutesElapsed = (difference / (1000 * 60));
-
-      int maxTime;
-      try {
-        maxTime = Integer.parseInt(CarbonProperties.getInstance()
-            .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
-      } catch (NumberFormatException e) {
-        maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
-      }
-      if (minutesElapsed > maxTime) {
-        return true;
-      }
+      return CarbonUpdateUtil.isMaxQueryTimeoutExceeded(deletionTime);
 
     }
 
@@ -163,7 +146,9 @@ public final class DeleteLoadFolders {
       String storeLocation, boolean isForceDelete, LoadMetadataDetails[] details) {
     List<LoadMetadataDetails> deletedLoads =
         new ArrayList<LoadMetadataDetails>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
     boolean isDeleted = false;
+
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneLoad : details) {
         if (checkIfLoadCanBeDeleted(oneLoad, isForceDelete)) {
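The removed date-string arithmetic is now centralized in CarbonUpdateUtil.isMaxQueryTimeoutExceeded. A hedged reconstruction of what that helper presumably does, assembled from the deleted lines above (not the actual implementation):

    public static boolean isMaxQueryTimeoutExceeded(long deletionTime) {
      long minutesElapsed = (System.currentTimeMillis() - deletionTime) / (1000 * 60);
      int maxTime;
      try {
        maxTime = Integer.parseInt(CarbonProperties.getInstance()
            .getProperty(CarbonCommonConstants.MAX_QUERY_EXECUTION_TIME));
      } catch (NumberFormatException e) {
        maxTime = CarbonCommonConstants.DEFAULT_MAX_QUERY_EXECUTION_TIME;
      }
      return minutesElapsed > maxTime;
    }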

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java
new file mode 100644
index 0000000..ac8e7de
--- /dev/null
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/FailureCauses.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.carbondata.spark.load;
+
+/**
+ * This enum is used to determine the reason of a failure.
+ */
+public enum FailureCauses {
+  NONE,
+  BAD_RECORDS,
+  EXECUTOR_FAILURE,
+  STATUS_FILE_UPDATION_FAILURE,
+  MULTIPLE_INPUT_ROWS_MATCHING
+}
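A hedged sketch of how such a cause is typically recorded during an update task (the local variable is illustrative; only FailureCauses and MultipleMatchingException come from this patch):

    FailureCauses cause = FailureCauses.NONE;
    try {
      // execute the update on the matched rows
    } catch (MultipleMatchingException e) {
      cause = FailureCauses.MULTIPLE_INPUT_ROWS_MATCHING;
    }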

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
index 254052b..8247296 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/KeyVal.scala
@@ -26,6 +26,8 @@
 package org.apache.carbondata.spark
 
 import org.apache.carbondata.core.load.LoadMetadataDetails
+import org.apache.carbondata.core.update.SegmentUpdateDetails
+import org.apache.spark.sql.execution.command.ExecutionErrors
 
 trait Value[V] extends Serializable {
   def getValue(value: Array[Object]): V
@@ -53,6 +55,35 @@ class DataLoadResultImpl extends DataLoadResult[String, LoadMetadataDetails] {
   }
 }
 
+trait updateResult[K, V] extends Serializable {
+  def getKey(key: String, value: (LoadMetadataDetails, ExecutionErrors)): (K, V)
+}
+
+class updateResultImpl
+  extends updateResult[String, (LoadMetadataDetails, ExecutionErrors)] {
+  override def getKey(key: String, value: (LoadMetadataDetails, ExecutionErrors)):
+    (String, (LoadMetadataDetails, ExecutionErrors)) = {
+    (key, value)
+  }
+}
+
+trait DeleteDelataResult[K, V] extends Serializable {
+  def getKey(key: String, value: (SegmentUpdateDetails, ExecutionErrors)): (K, V)
+}
+
+class DeleteDelataResultImpl
+  extends DeleteDelataResult[String, (SegmentUpdateDetails, ExecutionErrors)] {
+  override def getKey(key: String, value: (SegmentUpdateDetails, ExecutionErrors)):
+    (String, (SegmentUpdateDetails, ExecutionErrors)) = {
+    (key, value)
+  }
+}
+
 
 trait PartitionResult[K, V] extends Serializable {
   def getKey(key: Int, value: Boolean): (K, V)

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
index 53ebd41..1950fb7 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonGlobalDictionaryRDD.scala
@@ -22,6 +22,8 @@ import java.nio.charset.Charset
 import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 
+import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory}
+
 import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.util.control.Breaks.{break, breakable}
@@ -42,7 +44,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.datastorage.store.impl.FileFactory
 import org.apache.carbondata.core.service.PathService
 import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, CarbonUtil}
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, LockUsage}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.spark.load.CarbonLoaderUtil
 import org.apache.carbondata.spark.tasks.{DictionaryWriterTask, SortIndexWriterTask}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
index 8285bf8..9ea9f13 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala
@@ -19,6 +19,8 @@ package org.apache.carbondata.spark.util
 
 import java.util
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 import scala.collection.mutable.Map
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
index 71b4247..bccdeb5 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBlockletBoundryTest.scala
@@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.Row
@@ -30,7 +32,6 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 
 /**
   * FT for data compaction scenario.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
index 97a5bdd..fad2ba2 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionBoundaryConditionsTest.scala
@@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.common.util.CarbonHiveContext._
@@ -29,7 +31,6 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 
 /**
  * FT for data compaction Boundary condition verification.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
index e1e1412..58d89a9 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionCardinalityBoundryTest.scala
@@ -20,13 +20,13 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.scalatest.BeforeAndAfterAll
 
 import scala.collection.JavaConverters._
@@ -80,13 +80,13 @@ class DataCompactionCardinalityBoundryTest extends QueryTest with BeforeAndAfter
     var noOfRetries = 0
     while (status && noOfRetries < 10) {
 
-      val identifier = new AbsoluteTableIdentifier(
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(new
+          AbsoluteTableIdentifier(
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
-            new CarbonTableIdentifier(
-              CarbonCommonConstants.DATABASE_DEFAULT_NAME, "cardinalityTest", "1")
+            new CarbonTableIdentifier("default", "cardinalityTest", "1")
           )
-      val segments = SegmentStatusManager.getSegmentStatus(identifier)
-          .getValidSegments.asScala.toList
+      )
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
index 0b228c5..e138f62 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionLockTest.scala
@@ -20,6 +20,9 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.carbondata.locks.{LockUsage, CarbonLockFactory, ICarbonLock}
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.common.util.CarbonHiveContext._
@@ -30,8 +33,6 @@ import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 
 /**
   * FT for data compaction Locking scenario.
@@ -104,13 +105,18 @@ class DataCompactionLockTest extends QueryTest with BeforeAndAfterAll {
     * Compaction should fail as lock is being held purposefully
     */
   test("check if compaction is failed or not.") {
-      val segments = SegmentStatusManager.getSegmentStatus(absoluteTableIdentifier)
-          .getValidSegments.asScala.toList
-      if (!segments.contains("0.1")) {
-        assert(true)
-      } else {
-        assert(false)
-      }
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(
+      absoluteTableIdentifier
+    )
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
+
+    assert(!segments.contains("0.1"))
   }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
index 15ed78b..257f382 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionMinorThresholdTest.scala
@@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.common.util.CarbonHiveContext._
@@ -30,8 +32,6 @@ import org.apache.carbondata.core.carbon.path.{CarbonStorePath, CarbonTablePath}
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 
 /**
  * FT for data compaction Minor threshold verification.
@@ -95,8 +95,8 @@ class DataCompactionMinorThresholdTest extends QueryTest with BeforeAndAfterAll
 
     sql("clean files for table minorthreshold")
 
-    val segments = SegmentStatusManager.getSegmentStatus(identifier)
-        .getValidSegments.asScala.toList
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 
     assert(segments.contains("0.2"))
     assert(!segments.contains("0.1"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
index 00d295a..42f9cfb 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionNoDictionaryTest.scala
@@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.Row
@@ -28,7 +30,6 @@ import org.apache.spark.sql.common.util.QueryTest
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.scalatest.BeforeAndAfterAll
 
 /**
@@ -41,7 +42,9 @@ class DataCompactionNoDictionaryTest extends QueryTest with BeforeAndAfterAll {
     val identifier = new AbsoluteTableIdentifier(
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(databaseName, tableName.toLowerCase , tableId))
-    SegmentStatusManager.getSegmentStatus(identifier).getValidSegments.asScala.toList
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+    segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
   }
 
   val currentDirectory = new File(this.getClass.getResource("/").getPath + "/../../")

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
index 3c18bb7..3d2e4fe 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/DataCompactionTest.scala
@@ -20,6 +20,8 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+
 import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.Row
@@ -30,7 +32,6 @@ import org.scalatest.BeforeAndAfterAll
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 
 /**
   * FT for data compaction scenario.
@@ -82,8 +83,10 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
             CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
             new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "1")
           )
-      val segments = SegmentStatusManager.getSegmentStatus(identifier)
-          .getValidSegments.asScala.toList
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
 
       if (!segments.contains("0.1")) {
         // wait for 2 seconds for compaction to complete.
@@ -129,9 +132,11 @@ class DataCompactionTest extends QueryTest with BeforeAndAfterAll {
           new CarbonTableIdentifier(
             CarbonCommonConstants.DATABASE_DEFAULT_NAME, "normalcompaction", "uniqueid")
         )
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
     // merged segment should not be there
-    val segments = SegmentStatusManager.getSegmentStatus(identifier)
-        .getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(!segments.contains("0"))
     assert(!segments.contains("1"))
     assert(!segments.contains("2"))

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
index 4e3d2f3..7604fad 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionIgnoreInMinorTest.scala
@@ -22,6 +22,9 @@ import java.io.File
 
 import scala.collection.JavaConverters._
 
+import org.apache.carbondata.core.carbon.path.CarbonStorePath
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
+import org.apache.spark.sql.Row
 import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.scalatest.BeforeAndAfterAll
@@ -33,7 +36,6 @@ import org.apache.carbondata.core.carbon.path.CarbonStorePath
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.hadoop.CacheClient
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 
 /**
   * FT for compaction scenario where major segment should not be included in minor.
@@ -103,8 +105,9 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
             new CarbonTableIdentifier(
               CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", noOfRetries + "")
           )
-      val segments = SegmentStatusManager.getSegmentStatus(identifier)
-          .getValidSegments.asScala.toList
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -134,9 +137,10 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
           new CarbonTableIdentifier(
             CarbonCommonConstants.DATABASE_DEFAULT_NAME, "ignoremajor", "rrr")
         )
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
     // merged segment should not be there
-    val segments = SegmentStatusManager.getSegmentStatus(identifier)
-        .getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(segments.contains("2.1"))
     assert(!segments.contains("2"))
@@ -171,7 +175,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted.
-    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
+    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
 
   }
 
@@ -193,7 +197,7 @@ class MajorCompactionIgnoreInMinorTest extends QueryTest with BeforeAndAfterAll
     val segs = SegmentStatusManager.readLoadMetadata(carbontablePath)
 
     // status should remain as compacted for segment 2.
-    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.SEGMENT_COMPACTED))
+    assert(segs(3).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.COMPACTED))
     // for segment 0.1 . should get deleted
     assert(segs(2).getLoadStatus.equalsIgnoreCase(CarbonCommonConstants.MARKED_FOR_DELETE))
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
index f5ff191..f816c15 100644
--- a/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
+++ b/integration/spark/src/test/scala/org/apache/carbondata/spark/testsuite/datacompaction/MajorCompactionStopsAfterCompaction.scala
@@ -20,12 +20,12 @@ package org.apache.carbondata.spark.testsuite.datacompaction
 
 import java.io.File
 
+import org.apache.carbondata.core.updatestatus.SegmentStatusManager
 import org.apache.spark.sql.common.util.CarbonHiveContext._
 import org.apache.spark.sql.common.util.QueryTest
 import org.apache.carbondata.core.carbon.{AbsoluteTableIdentifier, CarbonTableIdentifier}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.lcm.status.SegmentStatusManager
 import org.scalatest.BeforeAndAfterAll
 
 import scala.collection.JavaConverters._
@@ -92,8 +92,10 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
             new CarbonTableIdentifier(
               CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", noOfRetries + "")
           )
-      val segments = SegmentStatusManager.getSegmentStatus(identifier)
-          .getValidSegments.asScala.toList
+
+      val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
+      val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
       segments.foreach(seg =>
         System.out.println( "valid segment is =" + seg)
       )
@@ -122,9 +124,11 @@ class MajorCompactionStopsAfterCompaction extends QueryTest with BeforeAndAfterA
           CarbonProperties.getInstance.getProperty(CarbonCommonConstants.STORE_LOCATION),
           new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "stopmajor", "rrr")
         )
+
+    val segmentStatusManager: SegmentStatusManager = new SegmentStatusManager(identifier)
+
     // merged segment should not be there
-    val segments = SegmentStatusManager.getSegmentStatus(identifier)
-        .getValidSegments.asScala.toList
+    val segments = segmentStatusManager.getValidAndInvalidSegments.getValidSegments.asScala.toList
     assert(segments.contains("0.1"))
     assert(!segments.contains("0.2"))
     assert(!segments.contains("0"))

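Note: the hunks above capture the API migration this commit applies at every call site: the static SegmentStatusManager.getSegmentStatus(identifier) lookup is replaced by an instance built per table. A hedged Java equivalent, assuming `identifier` is an already-constructed AbsoluteTableIdentifier:

    // Old (removed): SegmentStatusManager.getSegmentStatus(identifier)
    //                    .getValidSegments();
    // New: instantiate the manager per table, then ask for the
    // valid/invalid split and take the valid half.
    SegmentStatusManager manager = new SegmentStatusManager(identifier);
    List<String> validSegments =
        manager.getValidAndInvalidSegments().getValidSegments();
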
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
new file mode 100644
index 0000000..129fbfc
--- /dev/null
+++ b/processing/src/main/java/org/apache/carbondata/processing/exception/MultipleMatchingException.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.carbondata.processing.exception;
+
+/**
+ * Thrown when an IUD update operation matches more than one destination row
+ * for a single incoming row.
+ */
+public class MultipleMatchingException extends Exception {
+  private static final long serialVersionUID = 1L;
+
+  // Optional application error code; stays -1 when no code is supplied.
+  private long errorCode = -1;
+
+  public MultipleMatchingException() {
+    super();
+  }
+
+  public MultipleMatchingException(long errorCode, String message) {
+    super(message);
+    this.errorCode = errorCode;
+  }
+
+  public MultipleMatchingException(String message) {
+    super(message);
+  }
+
+  public MultipleMatchingException(Throwable cause) {
+    super(cause);
+  }
+
+  public MultipleMatchingException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public long getErrorCode() {
+    return errorCode;
+  }
+
+}
\ No newline at end of file

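Note: a short usage sketch for the new exception; the error code and message below are illustrative only, not taken from the commit:

    try {
      // Hypothetical IUD update step that matched several destination rows
      // for a single incoming row.
      throw new MultipleMatchingException(1001L, "update matched multiple rows");
    } catch (MultipleMatchingException e) {
      // Constructors without a code leave getErrorCode() at its -1 default.
      long code = e.getErrorCode();
    }
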
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
index b7c17dc..af97eb9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/model/CarbonLoadModel.java
@@ -22,13 +22,15 @@
  */
 package org.apache.carbondata.processing.model;
 
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.List;
-
 import org.apache.carbondata.core.carbon.CarbonDataLoadSchema;
 import org.apache.carbondata.core.carbon.metadata.schema.table.column.CarbonDimension;
 import org.apache.carbondata.core.load.LoadMetadataDetails;
+import org.apache.carbondata.core.update.SegmentUpdateDetails;
+import org.apache.carbondata.core.updatestatus.SegmentUpdateStatusManager;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
 
 public class CarbonLoadModel implements Serializable {
   /**
@@ -68,6 +70,8 @@ public class CarbonLoadModel implements Serializable {
 
   private boolean isDirectLoad;
   private List<LoadMetadataDetails> loadMetadataDetails;
+  private transient List<SegmentUpdateDetails> segmentUpdateDetails;
+  private transient SegmentUpdateStatusManager segmentUpdateStatusManager;
 
   private String blocksID;
 
@@ -133,6 +137,16 @@ public class CarbonLoadModel implements Serializable {
    */
   private String rddIteratorKey;
 
+  private String carbondataFileName = "";
+
+  public String getCarbondataFileName() {
+    return carbondataFileName;
+  }
+
+  public void setCarbondataFileName(String carbondataFileName) {
+    this.carbondataFileName = carbondataFileName;
+  }
+
   /**
    *  Use one pass to generate dictionary
    */
@@ -495,6 +509,41 @@ public class CarbonLoadModel implements Serializable {
   }
 
   /**
+   * Returns the update details of the segments touched by this load.
+   * @return the list of SegmentUpdateDetails, or null if none were set
+   */
+  public List<SegmentUpdateDetails> getSegmentUpdateDetails() {
+    return segmentUpdateDetails;
+  }
+
+  /**
+   * Sets the update details of the segments touched by this load.
+   *
+   * @param segmentUpdateDetails the update details to attach to this model
+   */
+  public void setSegmentUpdateDetails(List<SegmentUpdateDetails> segmentUpdateDetails) {
+    this.segmentUpdateDetails = segmentUpdateDetails;
+  }
+
+  /**
+   * Returns the manager that tracks segment update status for this load.
+   * @return the SegmentUpdateStatusManager, or null if not set
+   */
+  public SegmentUpdateStatusManager getSegmentUpdateStatusManager() {
+    return segmentUpdateStatusManager;
+  }
+
+  /**
+   * Sets the manager that tracks segment update status for this load.
+   *
+   * @param segmentUpdateStatusManager the manager to attach to this model
+   */
+  public void setSegmentUpdateStatusManager(SegmentUpdateStatusManager segmentUpdateStatusManager) {
+    this.segmentUpdateStatusManager = segmentUpdateStatusManager;
+  }
+
+
+  /**
    * @return
    */
   public String getTaskNo() {
@@ -518,8 +567,8 @@ public class CarbonLoadModel implements Serializable {
   /**
    * @param factTimeStamp
    */
-  public void setFactTimeStamp(String factTimeStamp) {
-    this.factTimeStamp = factTimeStamp;
+  public void setFactTimeStamp(long factTimeStamp) {
+    this.factTimeStamp = String.valueOf(factTimeStamp);
   }
 
   public String[] getDelimiters() {

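Note: taken together, the CarbonLoadModel additions give the IUD flow a place to carry update state. A hedged sketch of populating them; `updateDetails`, `updateStatusManager`, and the file name are assumed caller-side inputs, not part of this diff:

    CarbonLoadModel model = new CarbonLoadModel();
    // Both fields are transient, so they must be re-attached in every JVM
    // that deserializes the model.
    model.setSegmentUpdateDetails(updateDetails);
    model.setSegmentUpdateStatusManager(updateStatusManager);
    // Hypothetical name of the data file targeted by the update.
    model.setCarbondataFileName("part-0-0-0.carbondata");
    // The setter now takes a long and stores it as a string internally.
    model.setFactTimeStamp(System.currentTimeMillis());
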
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
index 63f00e1..1b0b41a 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/LocalFileLockTest.java
@@ -19,6 +19,8 @@
 package org.apache.carbondata.lcm.locks;
 
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
+import org.apache.carbondata.locks.LocalFileLock;
+import org.apache.carbondata.locks.LockUsage;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;

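Note: this test change is import-only; the lock classes moved from org.apache.carbondata.lcm.locks to org.apache.carbondata.locks. Call sites keep their shape, so a sketch under that assumption, with an illustrative lock directory:

    import org.apache.carbondata.locks.LocalFileLock;
    import org.apache.carbondata.locks.LockUsage;

    // Assumes the same two-argument (location, lock file) form as the
    // HdfsFileLock constructor in this commit; for existing callers only
    // the package prefix changes.
    LocalFileLock lock = new LocalFileLock("default/t1", LockUsage.METADATA_LOCK);
    if (lock.lockWithRetries()) {
      try {
        // protected metadata operation
      } finally {
        lock.unlock();
      }
    }
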
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
index ba46917..a4fe655 100644
--- a/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
+++ b/processing/src/test/java/org/apache/carbondata/lcm/locks/ZooKeeperLockingTest.java
@@ -18,16 +18,12 @@
  */
 package org.apache.carbondata.lcm.locks;
 
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.util.Properties;
-
+import mockit.NonStrictExpectations;
 import org.apache.carbondata.core.carbon.CarbonTableIdentifier;
 import org.apache.carbondata.core.util.CarbonProperties;
-
-import mockit.NonStrictExpectations;
-
+import org.apache.carbondata.locks.LockUsage;
+import org.apache.carbondata.locks.ZooKeeperLocking;
+import org.apache.carbondata.locks.ZookeeperInit;
 import org.apache.zookeeper.server.ServerConfig;
 import org.apache.zookeeper.server.ZooKeeperServerMain;
 import org.apache.zookeeper.server.quorum.QuorumPeerConfig;
@@ -36,6 +32,11 @@ import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.Properties;
+
 /**
  * @author Administrator
  */

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/0d42f52d/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
----------------------------------------------------------------------
diff --git a/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
index 7e9feb6..2712e6e 100644
--- a/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
+++ b/processing/src/test/java/org/apache/carbondata/test/util/StoreCreator.java
@@ -18,22 +18,7 @@
  */
 package org.apache.carbondata.test.util;
 
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.DataOutputStream;
-import java.io.File;
-import java.io.FileReader;
-import java.io.IOException;
-import java.io.OutputStreamWriter;
-import java.nio.charset.Charset;
-import java.text.SimpleDateFormat;
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.UUID;
-
+import com.google.gson.Gson;
 import org.apache.carbondata.core.cache.Cache;
 import org.apache.carbondata.core.cache.CacheProvider;
 import org.apache.carbondata.core.cache.CacheType;
@@ -72,9 +57,9 @@ import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWrit
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortIndexWriterImpl;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfo;
 import org.apache.carbondata.core.writer.sortindex.CarbonDictionarySortInfoPreparator;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperations;
-import org.apache.carbondata.lcm.fileoperations.AtomicFileOperationsImpl;
-import org.apache.carbondata.lcm.fileoperations.FileWriteOperation;
+import org.apache.carbondata.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.fileoperations.AtomicFileOperationsImpl;
+import org.apache.carbondata.fileoperations.FileWriteOperation;
 import org.apache.carbondata.processing.api.dataloader.DataLoadModel;
 import org.apache.carbondata.processing.api.dataloader.SchemaInfo;
 import org.apache.carbondata.processing.csvload.DataGraphExecuter;
@@ -82,10 +67,13 @@ import org.apache.carbondata.processing.dataprocessor.DataProcessTaskStatus;
 import org.apache.carbondata.processing.dataprocessor.IDataProcessStatus;
 import org.apache.carbondata.processing.graphgenerator.GraphGenerator;
 import org.apache.carbondata.processing.graphgenerator.GraphGeneratorException;
-
-import com.google.gson.Gson;
 import org.apache.hadoop.fs.Path;
 
+import java.io.*;
+import java.nio.charset.Charset;
+import java.text.SimpleDateFormat;
+import java.util.*;
+
 /**
  * This class will create store file based on provided schema
  *
@@ -410,10 +398,10 @@ public class StoreCreator {
   public static void writeLoadMetadata(CarbonDataLoadSchema schema, String databaseName,
       String tableName, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException {
     LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails();
-    loadMetadataDetails.setTimestamp(readCurrentTime());
+    loadMetadataDetails.setLoadEndTime(System.currentTimeMillis());
     loadMetadataDetails.setLoadStatus("SUCCESS");
     loadMetadataDetails.setLoadName(String.valueOf(0));
-    loadMetadataDetails.setLoadStartTime(readCurrentTime());
+    loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime()));
     listOfLoadFolderDetails.add(loadMetadataDetails);
 
     String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator
@@ -472,7 +460,7 @@ public class StoreCreator {
     model.setTableName(dataProcessTaskStatus.getTableName());
     model.setTaskNo("1");
     model.setBlocksID(dataProcessTaskStatus.getBlocksID());
-    model.setFactTimeStamp(readCurrentTime());
+    model.setFactTimeStamp(System.currentTimeMillis() + "");
     model.setEscapeCharacter(dataProcessTaskStatus.getEscapeCharacter());
     model.setQuoteCharacter(dataProcessTaskStatus.getQuoteCharacter());
     model.setCommentCharacter(dataProcessTaskStatus.getCommentCharacter());

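Note: the StoreCreator hunks track the LoadMetadataDetails API change: load timestamps are now longs rather than pre-formatted strings. A minimal sketch of the new pattern, assuming readCurrentTime() is StoreCreator's helper that returns a formatted time string:

    LoadMetadataDetails details = new LoadMetadataDetails();
    // End time is set directly from the epoch clock instead of a string.
    details.setLoadEndTime(System.currentTimeMillis());
    details.setLoadStatus("SUCCESS");
    // getTimeStamp(...) converts a formatted time string into a long.
    details.setLoadStartTime(details.getTimeStamp(readCurrentTime()));
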
