carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject [1/2] carbondata git commit: [CARBONDATA-2347][LUCENE_DATAMAP]load issue in lucene datamap, make multiple directory based on taskId
Date Wed, 18 Apr 2018 07:59:26 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master ceac8abf6 -> 860e144d4


http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
index be5287f..f64299c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonCreateDataMapCommand.scala
@@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.command._
 
 import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException}
 import org.apache.carbondata.common.logging.LogServiceFactory
-import org.apache.carbondata.core.datamap.DataMapProvider
+import org.apache.carbondata.core.datamap.{DataMapProvider, DataMapStoreManager}
 import org.apache.carbondata.core.datamap.status.DataMapStatusManager
 import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, DataMapSchema}
@@ -69,6 +69,29 @@ case class CarbonCreateDataMapCommand(
     }
 
     dataMapSchema = new DataMapSchema(dataMapName, dmClassName)
+    // TODO: move this if logic inside lucene module
+    if (dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.LUCENE.toString)) {
+      val datamaps = DataMapStoreManager.getInstance().getAllDataMap(mainTable).asScala
+      if (datamaps.nonEmpty) {
+        datamaps.foreach(datamap => {
+          val dmColumns = datamap.getDataMapSchema.getProperties.get("text_columns")
+          val existingColumns = dmProperties("text_columns")
+
+          def getAllSubString(columns: String): Set[String] = {
+            columns.inits.flatMap(_.tails).toSet
+          }
+
+          val existingClmSets = getAllSubString(existingColumns)
+          val dmColumnsSets = getAllSubString(dmColumns)
+          val duplicateDMColumn = existingClmSets.intersect(dmColumnsSets).maxBy(_.length)
+          if (!duplicateDMColumn.isEmpty) {
+            throw new MalformedDataMapCommandException(
+              s"Create lucene datamap $dataMapName failed, datamap already exists on column(s) " +
+              s"$duplicateDMColumn")
+          }
+        })
+      }
+    }
     if (mainTable != null &&
         mainTable.isStreamingTable &&
         !(dataMapSchema.getProviderName.equalsIgnoreCase(DataMapClassProvider.PREAGGREGATE.toString)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
index 21aba7d..613c8b2 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/datamap/CarbonDataMapShowCommand.scala
@@ -28,10 +28,12 @@ import org.apache.spark.sql.execution.command.{Checker, DataCommand}
 import org.apache.spark.sql.types.StringType
 
 import org.apache.carbondata.core.datamap.DataMapStoreManager
+import org.apache.carbondata.core.metadata.schema.datamap.DataMapClassProvider
 import org.apache.carbondata.core.metadata.schema.table.DataMapSchema
 
 /**
  * Show the datamaps on the table
+ *
  * @param tableIdentifier
  */
 case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
@@ -44,20 +46,22 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
   }
 
   override def processData(sparkSession: SparkSession): Seq[Row] = {
+    val dataMapSchemaList: util.List[DataMapSchema] = new util.ArrayList[DataMapSchema]()
     tableIdentifier match {
       case Some(table) =>
         Checker.validateTableExists(table.database, table.table, sparkSession)
         val carbonTable = CarbonEnv.getCarbonTable(table)(sparkSession)
         if (carbonTable.hasDataMapSchema) {
-          val schemaList = carbonTable.getTableInfo.getDataMapSchemaList
-          convertToRow(schemaList)
-        } else {
-          convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable))
+          dataMapSchemaList.addAll(carbonTable.getTableInfo.getDataMapSchemaList)
+        }
+        val indexSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+        if (!indexSchemas.isEmpty) {
+          dataMapSchemaList.addAll(indexSchemas)
         }
+        convertToRow(dataMapSchemaList)
       case _ =>
         convertToRow(DataMapStoreManager.getInstance().getAllDataMapSchemas)
     }
-
   }
 
   private def convertToRow(schemaList: util.List[DataMapSchema]) = {
@@ -65,9 +69,7 @@ case class CarbonDataMapShowCommand(tableIdentifier: Option[TableIdentifier])
       schemaList.asScala.map { s =>
         var table = "(NA)"
         val relationIdentifier = s.getRelationIdentifier
-        if (relationIdentifier != null) {
           table = relationIdentifier.getDatabaseName + "." + relationIdentifier.getTableName
-        }
         Row(s.getDataMapName, s.getProviderName, table)
       }
     } else {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
index ca9a6a1..1b087bd 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command.mutation
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.execution.command._
-import org.apache.spark.sql.execution.datasources.LogicalRelation
 
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.LogServiceFactory

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
index 458bc8d..07cdf7c 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala
@@ -110,9 +110,10 @@ case class CarbonDropTableCommand(
             dropCommand
           }
         childDropCommands.foreach(_.processMetadata(sparkSession))
-      } else {
-        val schemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
-        childDropDataMapCommands = schemas.asScala.map{ schema =>
+      }
+      val indexDatamapSchemas = DataMapStoreManager.getInstance().getAllDataMapSchemas(carbonTable)
+      if (!indexDatamapSchemas.isEmpty) {
+        childDropDataMapCommands = indexDatamapSchemas.asScala.map { schema =>
           val command = CarbonDropDataMapCommand(schema.getDataMapName,
             ifExistsSet,
             Some(TableIdentifier(tableName, Some(dbName))),

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
index ad3ad2e..c58d02d 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/strategy/DDLStrategy.scala
@@ -28,7 +28,6 @@ import org.apache.spark.sql.execution.command.schema._
 import org.apache.spark.sql.execution.command.table.{CarbonDescribeFormattedCommand, CarbonDropTableCommand}
 import org.apache.spark.sql.hive.execution.command.{CarbonDropDatabaseCommand, CarbonResetCommand, CarbonSetCommand}
 import org.apache.spark.sql.CarbonExpressions.{CarbonDescribeTable => DescribeTableCommand}
-import org.apache.spark.sql.execution.command.datamap.CarbonDataMapRefreshCommand
 import org.apache.spark.sql.execution.datasources.{RefreshResource, RefreshTable}
 import org.apache.spark.sql.hive.CarbonRelation
 import org.apache.spark.sql.parser.CarbonSpark2SqlParser
@@ -37,7 +36,6 @@ import org.apache.spark.util.{CarbonReflectionUtils, FileUtils}
 import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException
 import org.apache.carbondata.common.logging.{LogService, LogServiceFactory}
 import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.processing.merger.CompactionType
 
 /**
  * Carbon strategies for ddl commands

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
index 2e39c91..14950eb 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java
@@ -85,10 +85,10 @@ public class DataMapWriterListener {
     LOG.info("DataMapWriter " + writer + " added");
   }
 
-  public void onBlockStart(String blockId, String blockPath) throws IOException {
+  public void onBlockStart(String blockId, String blockPath, long taskId) throws IOException {
     for (List<DataMapWriter> writers : registry.values()) {
       for (DataMapWriter writer : writers) {
-        writer.onBlockStart(blockId);
+        writer.onBlockStart(blockId, taskId);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/860e144d/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 5783fe5..94ade87 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -250,7 +250,8 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
   private void notifyDataMapBlockStart() {
     if (listener != null) {
       try {
-        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath);
+        listener.onBlockStart(carbonDataFileName, carbonDataFileHdfsPath,
+            model.getCarbonDataFileAttributes().getTaskId());
       } catch (IOException e) {
         throw new CarbonDataWriterException("Problem while writing datamap", e);
       }


Mime
View raw message