carbondata-commits mailing list archives

From gvram...@apache.org
Subject [1/2] carbondata git commit: NullPointerException thrown when multiple users and clients concurrently run show segments or delete segments.
Date Thu, 18 May 2017 09:34:58 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 75afd0680 -> bccedd32a


NullPointerException thrown when multiple users and clients concurrently run show segments or delete segments.
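
For context on the failure: CarbonMetadata is a JVM-wide singleton cache keyed by "dbName_tableName", so with multiple users or clients a driver can look up a table that was never registered in its own JVM and get null back. A minimal sketch of the old, failing pattern (database/table names illustrative):

    import org.apache.carbondata.core.metadata.CarbonMetadata

    // Pattern removed by this commit: resolve the table from the
    // JVM-wide singleton cache.
    val carbonTable =
      CarbonMetadata.getInstance().getCarbonTable("default" + "_" + "t1")

    // Under concurrent multi-user/multi-client use the cache may have no
    // entry for this table, so carbonTable is null and this dereference
    // throws the NullPointerException from the title:
    val metaDataPath = carbonTable.getMetaDataFilepath

The fix below resolves the table through the session's metastore instead and threads it (or its metadata folder path) into the show/delete/clean code paths.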


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b01a7315
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b01a7315
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b01a7315

Branch: refs/heads/master
Commit: b01a73158db63cc910c4bf4c93b3e662e585d7c0
Parents: 75afd06
Author: nareshpr <prnaresh.naresh@gmail.com>
Authored: Wed May 17 23:38:06 2017 +0530
Committer: Venkata Ramana Gollamudi <g.ramana.v1@gmail.com>
Committed: Thu May 18 14:58:41 2017 +0530

----------------------------------------------------------------------
 .../carbondata/spark/load/CarbonLoaderUtil.java |  7 -----
 .../carbondata/spark/util/LoadMetadataUtil.java |  7 +----
 .../org/apache/carbondata/api/CarbonStore.scala | 22 ++++++-------
 .../spark/rdd/DataManagementFunc.scala          | 18 ++++++-----
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../execution/command/carbonTableSchema.scala   | 20 +++++++++---
 .../spark/rdd/CarbonDataRDDFactory.scala        |  2 +-
 .../execution/command/carbonTableSchema.scala   | 33 ++++++++++++++++----
 .../org/apache/spark/util/CleanFiles.scala      |  4 ++-
 .../apache/spark/util/DeleteSegmentByDate.scala |  4 ++-
 .../apache/spark/util/DeleteSegmentById.scala   |  4 ++-
 .../org/apache/spark/util/ShowSegments.scala    |  5 +--
 12 files changed, 79 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
index 964c536..a4f15d2 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java
@@ -360,13 +360,6 @@ public final class CarbonLoaderUtil {
     return date;
   }
 
-  public static String extractLoadMetadataFileLocation(String dbName, String tableName) {
-    CarbonTable carbonTable =
-        org.apache.carbondata.core.metadata.CarbonMetadata.getInstance()
-            .getCarbonTable(dbName + '_' + tableName);
-    return carbonTable.getMetaDataFilepath();
-  }
-
   public static Dictionary getDictionary(DictionaryColumnUniqueIdentifier columnIdentifier,
       String carbonStorePath) throws IOException {
     Cache<DictionaryColumnUniqueIdentifier, Dictionary> dictCache =

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
index 0eec314..91a9556 100644
--- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
+++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/util/LoadMetadataUtil.java
@@ -18,8 +18,6 @@
 package org.apache.carbondata.spark.util;
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.metadata.CarbonMetadata;
-import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails;
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager;
 
@@ -31,10 +29,7 @@ public final class LoadMetadataUtil {
 
   }
 
-  public static boolean isLoadDeletionRequired(String dbName, String tableName) {
-    CarbonTable table = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName);
-
-    String metaDataLocation = table.getMetaDataFilepath();
+  public static boolean isLoadDeletionRequired(String metaDataLocation) {
     LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation);
     if (details != null && details.length != 0) {
       for (LoadMetadataDetails oneRow : details) {
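
With this change, isLoadDeletionRequired takes the metadata folder path directly instead of resolving the table from the singleton. A hedged sketch of the new call site, assuming carbonTable was already resolved from the session's metastore as in the Scala changes below:

    if (LoadMetadataUtil.isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
      // stale loads exist; clean them up under the table status lock
    }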

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
index 8e885a0..45719fc 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
@@ -29,6 +29,7 @@ import org.apache.spark.sql.types.TimestampType
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonMetadata}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.statusmanager.SegmentStatusManager
 import org.apache.carbondata.spark.exception.MalformedCarbonCommandException
 import org.apache.carbondata.spark.rdd.DataManagementFunc
@@ -39,11 +40,9 @@ object CarbonStore {
   def showSegments(
       dbName: String,
       tableName: String,
-      limit: Option[String]): Seq[Row] = {
-    val tableUniqueName = dbName + "_" + tableName
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
-    val path = carbonTable.getMetaDataFilepath
-    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(path)
+      limit: Option[String],
+      tableFolderPath: String): Seq[Row] = {
+    val loadMetadataDetailsArray = SegmentStatusManager.readLoadMetadata(tableFolderPath)
     if (loadMetadataDetailsArray.nonEmpty) {
       val parser = new SimpleDateFormat(CarbonCommonConstants.CARBON_TIMESTAMP)
       var loadMetadataDetailsSortedArray = loadMetadataDetailsArray.sortWith { (l1, l2) =>
@@ -79,10 +78,11 @@ object CarbonStore {
   def cleanFiles(
       dbName: String,
       tableName: String,
-      storePath: String): Unit = {
+      storePath: String,
+      carbonTable: CarbonTable): Unit = {
     LOGGER.audit(s"The clean files request has been received for $dbName.$tableName")
     try {
-      DataManagementFunc.cleanFiles(dbName, tableName, storePath)
+      DataManagementFunc.cleanFiles(dbName, tableName, storePath, carbonTable)
       LOGGER.audit(s"Clean files operation is success for $dbName.$tableName.")
     } catch {
       case ex: Exception =>
@@ -102,12 +102,12 @@ object CarbonStore {
   def deleteLoadById(
       loadids: Seq[String],
       dbName: String,
-      tableName: String): Unit = {
+      tableName: String,
+      carbonTable: CarbonTable): Unit = {
 
     LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
     validateLoadIds(loadids)
 
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
     val path = carbonTable.getMetaDataFilepath
 
     try {
@@ -128,11 +128,11 @@ object CarbonStore {
   def deleteLoadByDate(
       timestamp: String,
       dbName: String,
-      tableName: String): Unit = {
+      tableName: String,
+      carbonTable: CarbonTable): Unit = {
     LOGGER.audit(s"Delete segment by Id request has been received for $dbName.$tableName")
 
     val time = validateTimeFormat(timestamp)
-    val carbonTable = CarbonMetadata.getInstance().getCarbonTable(dbName + '_' + tableName)
     val path = carbonTable.getMetaDataFilepath
 
     try {
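
With these signatures, every CarbonStore entry point is handed the resolved table (or its metadata folder) by the caller. A hedged usage sketch; dbName, tableName, storePath and carbonTable are assumed to be in scope from a metastore lookup, and the segment id and date values are illustrative:

    import org.apache.carbondata.api.CarbonStore

    // SHOW SEGMENTS: the metadata folder is now passed in explicitly.
    val rows = CarbonStore.showSegments(dbName, tableName, Some("10"),
      carbonTable.getMetaDataFilepath)

    // Delete by id / by date and clean files take the resolved table.
    CarbonStore.deleteLoadById(Seq("0", "1"), dbName, tableName, carbonTable)
    CarbonStore.deleteLoadByDate("2017-05-17 00:00:00", dbName, tableName, carbonTable)
    CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable)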

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
index d6cc2e6..8039d24 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/DataManagementFunc.scala
@@ -297,11 +297,10 @@ object DataManagementFunc {
       dbName: String,
       tableName: String,
       storePath: String,
-      isForceDeletion: Boolean): Unit = {
-    if (LoadMetadataUtil.isLoadDeletionRequired(dbName, tableName)) {
-      val loadMetadataFilePath =
-        CarbonLoaderUtil.extractLoadMetadataFileLocation(dbName, tableName)
-      val details = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+      isForceDeletion: Boolean,
+      carbonTable: CarbonTable): Unit = {
+    if (LoadMetadataUtil.isLoadDeletionRequired(carbonTable.getMetaDataFilepath)) {
+      val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath)
       val carbonTableStatusLock =
         CarbonLockFactory.getCarbonLockObj(
           new CarbonTableIdentifier(dbName, tableName, ""),
@@ -325,7 +324,8 @@ object DataManagementFunc {
             LOGGER.info("Table status lock has been successfully acquired.")
 
             // read latest table status again.
-            val latestMetadata = SegmentStatusManager.readLoadMetadata(loadMetadataFilePath)
+            val latestMetadata = SegmentStatusManager
+              .readLoadMetadata(carbonTable.getMetaDataFilepath)
 
             // update the metadata details from old to new status.
             val latestStatus = CarbonLoaderUtil
@@ -351,14 +351,16 @@ object DataManagementFunc {
   def cleanFiles(
       dbName: String,
       tableName: String,
-      storePath: String): Unit = {
+      storePath: String,
+      carbonTable: CarbonTable): Unit = {
     val identifier = new CarbonTableIdentifier(dbName, tableName, "")
     val carbonCleanFilesLock =
       CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.CLEAN_FILES_LOCK)
     try {
       if (carbonCleanFilesLock.lockWithRetries()) {
         LOGGER.info("Clean files lock has been successfully acquired.")
-        deleteLoadsAndUpdateMetadata(dbName, tableName, storePath, isForceDeletion = true)
+        deleteLoadsAndUpdateMetadata(dbName, tableName, storePath,
+          isForceDeletion = true, carbonTable)
       } else {
         val errorMsg = "Clean files request is failed for " +
             s"$dbName.$tableName" +

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 9685871..7752cb3 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -441,7 +441,7 @@ object CarbonDataRDDFactory {
           s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName, storePath, isForceDeletion = false)
+        carbonLoadModel.getTableName, storePath, false, carbonTable)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index ac51fa0..1192e08 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -196,10 +196,13 @@ private[sql] case class DeleteLoadsById(
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
+      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
     CarbonStore.deleteLoadById(
       loadids,
       getDB.getDatabaseName(databaseNameOp, sqlContext),
-      tableName
+      tableName,
+      carbonTable
     )
     Seq.empty
 
@@ -225,10 +228,13 @@ private[sql] case class DeleteLoadsByLoadDate(
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
+      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sqlContext),
-      tableName
+      tableName,
+      carbonTable
     )
     Seq.empty
 
@@ -743,10 +749,13 @@ private[sql] case class ShowLoads(
 
   override def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
+      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
     CarbonStore.showSegments(
       getDB.getDatabaseName(databaseNameOp, sqlContext),
       tableName,
-      limit
+      limit,
+      carbonTable.getMetaDataFilepath
     )
   }
 }
@@ -885,10 +894,13 @@ private[sql] case class CleanFiles(
 
   def run(sqlContext: SQLContext): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sqlContext)
+    val carbonTable = CarbonEnv.get.carbonMetastore.lookupRelation1(databaseNameOp,
+      tableName)(sqlContext).asInstanceOf[CarbonRelation].tableMeta.carbonTable
     CarbonStore.cleanFiles(
       getDB.getDatabaseName(databaseNameOp, sqlContext),
       tableName,
-      sqlContext.asInstanceOf[CarbonContext].storePath
+      sqlContext.asInstanceOf[CarbonContext].storePath,
+      carbonTable
     )
     Seq.empty
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index b01602f..e4a89c4 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -452,7 +452,7 @@ object CarbonDataRDDFactory {
           s" ${ carbonLoadModel.getDatabaseName }.${ carbonLoadModel.getTableName }")
       // Check if any load need to be deleted before loading new data
       DataManagementFunc.deleteLoadsAndUpdateMetadata(carbonLoadModel.getDatabaseName,
-        carbonLoadModel.getTableName, storePath, isForceDeletion = false)
+        carbonLoadModel.getTableName, storePath, false, carbonTable)
       if (null == carbonLoadModel.getLoadMetadataDetails) {
         CommonUtil.readLoadMetadataDetails(carbonLoadModel, storePath)
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
index 6bc9e61..e2405f2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchema.scala
@@ -205,10 +205,14 @@ case class DeleteLoadsById(
   def run(sparkSession: SparkSession): Seq[Row] = {
 
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
+      .map(_.carbonTable).getOrElse(null)
     CarbonStore.deleteLoadById(
       loadids,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName
+      tableName,
+      carbonTable
     )
     Seq.empty
 
@@ -226,10 +230,14 @@ case class DeleteLoadsByLoadDate(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
+      .map(_.carbonTable).getOrElse(null)
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName
+      tableName,
+      carbonTable
     )
     Seq.empty
   }
@@ -662,10 +670,14 @@ private[sql] case class DeleteLoadByDate(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
+      .map(_.carbonTable).getOrElse(null)
     CarbonStore.deleteLoadByDate(
       loadDate,
       getDB.getDatabaseName(databaseNameOp, sparkSession),
-      tableName
+      tableName,
+      carbonTable
     )
     Seq.empty
   }
@@ -680,12 +692,17 @@ case class CleanFiles(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
-    val relation = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val catalog = CarbonEnv.getInstance(sparkSession).carbonMetastore
+    val relation = catalog
       .lookupRelation(databaseNameOp, tableName)(sparkSession).asInstanceOf[CarbonRelation]
+    val carbonTable = catalog
+      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
+      .map(_.carbonTable).getOrElse(null)
     CarbonStore.cleanFiles(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,
-      relation.asInstanceOf[CarbonRelation].tableMeta.storePath
+      relation.asInstanceOf[CarbonRelation].tableMeta.storePath,
+      carbonTable
     )
     Seq.empty
   }
@@ -699,10 +716,14 @@ case class ShowLoads(
 
   def run(sparkSession: SparkSession): Seq[Row] = {
     Checker.validateTableExists(databaseNameOp, tableName, sparkSession)
+    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
+      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
+      .map(_.carbonTable).getOrElse(null)
     CarbonStore.showSegments(
       getDB.getDatabaseName(databaseNameOp, sparkSession),
       tableName,
-      limit
+      limit,
+      carbonTable.getMetaDataFilepath
     )
   }
 }
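
The Spark 2 commands above share one lookup pattern, shown consolidated here with identifiers as in the command implementations: resolve the table through the session's carbonMetastore, which can load schemas on demand, instead of the JVM-wide CarbonMetadata cache. A null result should then mean the table genuinely does not exist rather than a cold cache:

    val carbonTable = CarbonEnv.getInstance(sparkSession).carbonMetastore
      .getTableFromMetadata(getDB.getDatabaseName(databaseNameOp, sparkSession), tableName)
      .map(_.carbonTable)
      .getOrElse(null)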

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
index b9a6708..74e11f1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/CleanFiles.scala
@@ -30,7 +30,9 @@ object CleanFiles {
   def cleanFiles(spark: SparkSession, dbName: String, tableName: String,
       storePath: String): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    CarbonStore.cleanFiles(dbName, tableName, storePath)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
+      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    CarbonStore.cleanFiles(dbName, tableName, storePath, carbonTable)
   }
 
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
index 7815417..3dffb42 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentByDate.scala
@@ -29,7 +29,9 @@ object DeleteSegmentByDate {
   def deleteSegmentByDate(spark: SparkSession, dbName: String, tableName: String,
       dateValue: String): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    CarbonStore.deleteLoadByDate(dateValue, dbName, tableName)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
+      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    CarbonStore.deleteLoadByDate(dateValue, dbName, tableName, carbonTable)
   }
 
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
index 65b76b2..35afa28 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/DeleteSegmentById.scala
@@ -33,7 +33,9 @@ object DeleteSegmentById {
   def deleteSegmentById(spark: SparkSession, dbName: String, tableName: String,
       segmentIds: Seq[String]): Unit = {
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    CarbonStore.deleteLoadById(segmentIds, dbName, tableName)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
+      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    CarbonStore.deleteLoadById(segmentIds, dbName, tableName, carbonTable)
   }
 
   def main(args: Array[String]): Unit = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b01a7315/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
index d918381..07dfcc1 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/util/ShowSegments.scala
@@ -29,9 +29,10 @@ object ShowSegments {
 
   def showSegments(spark: SparkSession, dbName: String, tableName: String,
       limit: Option[String]): Seq[Row] = {
-    //val databaseName = dbName.getOrElse(spark.catalog.currentDatabase)
     TableAPIUtil.validateTableExists(spark, dbName, tableName)
-    CarbonStore.showSegments(dbName, tableName, limit)
+    val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore
+      .getTableFromMetadata(dbName, tableName).map(_.carbonTable).getOrElse(null)
+    CarbonStore.showSegments(dbName, tableName, limit, carbonTable.getMetaDataFilepath)
   }
 
   def showString(rows: Seq[Row]): String = {
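
For completeness, a hedged sketch of driving the updated spark2 utility entry points from application code; the session, database/table names and store path are illustrative:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.{CleanFiles, ShowSegments}

    val spark: SparkSession = ??? // an existing CarbonData-enabled session
    val rows = ShowSegments.showSegments(spark, "default", "t1", None)
    println(ShowSegments.showString(rows))
    CleanFiles.cleanFiles(spark, "default", "t1", "/tmp/carbon.store")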

