carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From gvram...@apache.org
Subject [1/2] incubator-carbondata git commit: [CARBONDATA-341] CarbonTableIdentifier being passed to the query flow has wrong tableid
Date Fri, 20 Jan 2017 15:20:42 GMT
Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 556a4118e -> b2eb9c744


[CARBONDATA-341] CarbonTableIdentifier being passed to the query flow has wrong tableid


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/733968b4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/733968b4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/733968b4

Branch: refs/heads/master
Commit: 733968b4e0cdba12c928554a04f836c7b36477f3
Parents: 556a411
Author: mohammadshahidkhan <mohdshahidkhan1987@gmail.com>
Authored: Thu Jan 5 18:45:51 2017 +0530
Committer: Venkata Ramana G <ramana.gollamudi@huawei.com>
Committed: Fri Jan 20 20:42:08 2017 +0530

----------------------------------------------------------------------
 .../core/metadata/AbsoluteTableIdentifier.java  |  8 +++
 .../carbondata/hadoop/CarbonInputFormat.java    | 43 +++++++----
 .../testsuite/createTable/TestTableIdTest.scala | 76 ++++++++++++++++++++
 .../sql/CarbonDatasourceHadoopRelation.scala    |  2 +-
 .../sql/CarbonDatasourceHadoopRelation.scala    |  2 +-
 .../processing/newflow/DataLoadExecutor.java    |  2 +-
 6 files changed, 117 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/733968b4/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
index aed248c..3791150 100644
--- a/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
+++ b/core/src/main/java/org/apache/carbondata/core/metadata/AbsoluteTableIdentifier.java
@@ -73,6 +73,14 @@ public class AbsoluteTableIdentifier implements Serializable {
     return new AbsoluteTableIdentifier(CarbonUtil.getCarbonStorePath(), identifier);
   }
 
+  /**
+   * By using the tablePath this method will prepare a AbsoluteTableIdentifier with
+   * dummy tableId(Long.toString(System.currentTimeMillis()).
+   * This instance could not be used to uniquely identify the table, this is just
+   * to get the database name, table name and store path to load the schema.
+   * @param tablePath
+   * @return returns AbsoluteTableIdentifier with dummy tableId
+   */
   public static AbsoluteTableIdentifier fromTablePath(String tablePath) {
     String formattedTablePath = tablePath.replace('\\', '/');
     String[] names = formattedTablePath.split("/");

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/733968b4/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index 5acd1d5..2cee974 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -119,16 +119,35 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
   public static CarbonTable getCarbonTable(Configuration configuration) throws IOException {
     String carbonTableStr = configuration.get(CARBON_TABLE);
     if (carbonTableStr == null) {
+      populateCarbonTable(configuration);
       // read it from schema file in the store
-      AbsoluteTableIdentifier absIdentifier = getAbsoluteTableIdentifier(configuration);
-      CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absIdentifier);
-      setCarbonTable(configuration, carbonTable);
-      return carbonTable;
+      carbonTableStr = configuration.get(CARBON_TABLE);
+      return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
     }
     return (CarbonTable) ObjectSerializationUtil.convertStringToObject(carbonTableStr);
   }
 
-  public static void setTablePath(Configuration configuration, String tablePath) {
+  /**
+   * this method will read the schema from the physical file and populate into CARBON_TABLE
+   * @param configuration
+   * @throws IOException
+   */
+  private static void populateCarbonTable(Configuration configuration) throws IOException {
+    String dirs = configuration.get(INPUT_DIR, "");
+    String[] inputPaths = StringUtils.split(dirs);
+    if (inputPaths.length == 0) {
+      throw new InvalidPathException("No input paths specified in job");
+    }
+    AbsoluteTableIdentifier absoluteTableIdentifier =
+        AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+    // read the schema file to get the absoluteTableIdentifier having the correct table id
+    // persisted in the schema
+    CarbonTable carbonTable = SchemaReader.readCarbonTableFromStore(absoluteTableIdentifier);
+    setCarbonTable(configuration, carbonTable);
+  }
+
+  public static void setTablePath(Configuration configuration, String tablePath)
+      throws IOException {
     configuration.set(FileInputFormat.INPUT_DIR, tablePath);
   }
 
@@ -187,13 +206,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
         .set(CarbonInputFormat.INPUT_SEGMENT_NUMBERS, CarbonUtil.getSegmentString(validSegments));
   }
 
-  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration) {
-    String dirs = configuration.get(INPUT_DIR, "");
-    String[] inputPaths = StringUtils.split(dirs);
-    if (inputPaths.length == 0) {
-      throw new InvalidPathException("No input paths specified in job");
-    }
-    return AbsoluteTableIdentifier.fromTablePath(inputPaths[0]);
+  private static AbsoluteTableIdentifier getAbsoluteTableIdentifier(Configuration configuration)
+      throws IOException {
+    return getCarbonTable(configuration).getAbsoluteTableIdentifier();
   }
 
   /**
@@ -592,7 +607,9 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
       throws IOException {
     Configuration configuration = taskAttemptContext.getConfiguration();
     CarbonTable carbonTable = getCarbonTable(configuration);
-    AbsoluteTableIdentifier identifier = getAbsoluteTableIdentifier(configuration);
+    // getting the table absoluteTableIdentifier from the carbonTable
+    // to avoid unnecessary deserialization
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
 
     // query plan includes projection column
     String projection = getColumnProjection(configuration);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/733968b4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
new file mode 100644
index 0000000..3ccd2e3
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/createTable/TestTableIdTest.scala
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark.testsuite.createTable
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.mapred.JobConf
+import org.apache.hadoop.mapreduce.Job
+import org.apache.spark.sql.common.util.QueryTest
+import org.junit.Assert
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.hadoop.CarbonInputFormat
+
+/**
+ * test functionality related the case change for database name
+ */
+class TestTableIdTest extends QueryTest with BeforeAndAfterAll {
+
+  override def beforeAll: Unit = {
+    sql("drop table if exists carbontable")
+  }
+
+  def validateTableId: Unit = {
+    val carbonInputFormat: CarbonInputFormat[Array[Object]] = new CarbonInputFormat[Array[Object]]
+    val jobConf: JobConf = new JobConf(new Configuration)
+    val job: Job = Job.getInstance(jobConf)
+    val storePath: String = storeLocation.replaceAll("\\\\", "/")
+    job.getConfiguration
+      .set("mapreduce.input.fileinputformat.inputdir",
+        storePath + "/default/carbontable")
+    val carbonTable: CarbonTable = CarbonInputFormat.getCarbonTable(job.getConfiguration)
+    val getAbsoluteTableIdentifier = classOf[CarbonInputFormat[Array[Object]]]
+      .getDeclaredMethod("getAbsoluteTableIdentifier", classOf[Configuration])
+    getAbsoluteTableIdentifier.setAccessible(true)
+    val absoluteTableIdentifier: AbsoluteTableIdentifier = getAbsoluteTableIdentifier
+      .invoke(carbonInputFormat, job.getConfiguration).asInstanceOf[AbsoluteTableIdentifier]
+
+    Assert
+      .assertEquals(carbonTable.getCarbonTableIdentifier.getTableId,
+        absoluteTableIdentifier.getCarbonTableIdentifier.getTableId)
+  }
+
+  test("test create table with database case name change") {
+
+    try {
+      // table creation should be successful
+      sql("create table carbontable(a int, b string)stored by 'carbondata'")
+      assert(true)
+    } catch {
+      case ex: Exception =>
+        assert(false)
+    }
+    validateTableId
+  }
+
+  override def afterAll {
+    sql("drop table if exists carbontable")
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/733968b4/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 93ede38..e322fc8 100644
--- a/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -68,7 +68,7 @@ private[sql] case class CarbonDatasourceHadoopRelation(
       carbonTable.getDatabaseName,
       carbonTable.getFactTableName,
       CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable),
+      new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
       None
     )(sqlContext)
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/733968b4/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
index 7633b22..eb1730f 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/CarbonDatasourceHadoopRelation.scala
@@ -49,7 +49,7 @@ case class CarbonDatasourceHadoopRelation(
       carbonTable.getDatabaseName,
       carbonTable.getFactTableName,
       CarbonSparkUtil.createSparkMeta(carbonTable),
-      new TableMeta(absIdentifier.getCarbonTableIdentifier, paths.head, carbonTable),
+      new TableMeta(carbonTable.getCarbonTableIdentifier, paths.head, carbonTable),
       None
     )
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/733968b4/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
index b7d215d..d18c727 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/DataLoadExecutor.java
@@ -63,7 +63,7 @@ public class DataLoadExecutor {
         new CarbonTableIdentifier(loadModel.getDatabaseName(), loadModel.getTableName(), null)
             .getBadRecordLoggerKey();
     if (null != BadRecordsLogger.hasBadRecord(key)) {
-      LOGGER.error("Data Load is partcially success for table " + loadModel.getTableName());
+      LOGGER.error("Data Load is partially success for table " + loadModel.getTableName());
       throw new BadRecordFoundException("Bad records found during load");
     } else {
       LOGGER.info("Data loading is successful for table "+loadModel.getTableName());


Mime
View raw message