carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ravipes...@apache.org
Subject [16/19] carbondata git commit: [CARBONDATA-937] Data loading for partition table(12-dev) This closes #842
Date Mon, 15 May 2017 05:06:29 GMT
[CARBONDATA-937] Data loading for partition table(12-dev)  This closes #842

load data for partition table

fix comments

fix comments


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/b6b93f07
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/b6b93f07
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/b6b93f07

Branch: refs/heads/master
Commit: b6b93f076d53a4ba87d1f0a2123c04189f2a76e9
Parents: 86f6a81
Author: QiangCai <qiangcai@qq.com>
Authored: Mon Apr 24 11:28:28 2017 +0800
Committer: jackylk <jacky.likun@huawei.com>
Committed: Sun May 14 20:40:14 2017 +0800

----------------------------------------------------------------------
 .../core/scan/partition/HashPartitioner.java    |  41 +++
 .../core/scan/partition/ListPartitioner.java    |  65 +++++
 .../core/scan/partition/PartitionUtil.java      | 109 +++++++
 .../core/scan/partition/Partitioner.java        |  27 ++
 .../core/scan/partition/RangePartitioner.java   | 137 +++++++++
 .../carbondata/hadoop/CarbonInputSplit.java     |   6 +-
 .../TestDataLoadingForPartitionTable.scala      | 289 +++++++++++++++++++
 .../carbondata/spark/PartitionFactory.scala     |  65 +++++
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  79 +++++
 .../spark/rdd/CarbonDataRDDFactory.scala        | 117 +++++++-
 .../spark/rdd/CarbonDataRDDFactory.scala        | 126 +++++++-
 11 files changed, 1039 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java
new file mode 100644
index 0000000..240c449
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/HashPartitioner.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+/**
+ * Hash Partitioner: picks the partition of a key from the key's hash code.
+ */
+public class HashPartitioner implements Partitioner {
+
+  private final int numPartitions;
+
+  /**
+   * @param numPartitions number of hash partitions, must be positive
+   * @throws IllegalArgumentException if numPartitions is not positive
+   */
+  public HashPartitioner(int numPartitions) {
+    // fail fast: a zero count would otherwise only surface as a division by zero
+    // when the first non-null key is partitioned
+    if (numPartitions <= 0) {
+      throw new IllegalArgumentException(
+          "number of hash partitions must be positive: " + numPartitions);
+    }
+    this.numPartitions = numPartitions;
+  }
+
+  /**
+   * @return the number of hash partitions given at construction
+   */
+  @Override public int numPartitions() {
+    return numPartitions;
+  }
+
+  /**
+   * @param key partition column value, may be null
+   * @return 0 for a null key, otherwise a partition id in [0, numPartitions)
+   */
+  @Override public int getPartition(Object key) {
+    if (key == null) {
+      return 0;
+    }
+    // clearing the sign bit keeps the remainder non-negative for negative hash codes
+    return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
new file mode 100644
index 0000000..bab2ede
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/ListPartitioner.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+
+/**
+ * List Partitioner: every listed value belongs to the partition of the list it appears in;
+ * any other key goes to one extra default partition.
+ */
+public class ListPartitioner implements Partitioner {
+
+  /**
+   * map the value of ListPartition to partition id.
+   */
+  private final Map<Object, Integer> map = new java.util.HashMap<Object, Integer>();
+
+  private final int numPartitions;
+
+  /**
+   * @param partitionInfo supplies the value lists (one list per partition) and the partition
+   *                      column; the data type of the first column in its schema list decides
+   *                      how the listed values are converted
+   */
+  public ListPartitioner(PartitionInfo partitionInfo) {
+    List<List<String>> values = partitionInfo.getListInfo();
+    DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
+    numPartitions = values.size();
+    for (int i = 0; i < numPartitions; i++) {
+      for (String value : values.get(i)) {
+        Object parsed = PartitionUtil.getDataBasedOnDataType(value, partitionColumnDataType);
+        // A value that cannot be converted comes back as null. Registering null as a key
+        // would send every null key to this list partition; skip it so that null keys
+        // fall into the default partition, as they do in RangePartitioner.
+        if (parsed != null) {
+          map.put(parsed, i);
+        }
+      }
+    }
+  }
+
+  /**
+   * number of partitions
+   * add extra default partition
+   * @return number of value lists plus one for the default partition
+   */
+  @Override public int numPartitions() {
+    return numPartitions + 1;
+  }
+
+  /**
+   * @param key partition column value, may be null
+   * @return id of the list partition containing the key, or the default partition id
+   *         (the number of value lists) when the key is not listed
+   */
+  @Override public int getPartition(Object key) {
+    Integer partition = map.get(key);
+    if (partition == null) {
+      return numPartitions;
+    }
+    return partition;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
new file mode 100644
index 0000000..a5b3a9f
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/PartitionUtil.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.math.BigDecimal;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.BitSet;
+
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.commons.lang.StringUtils;
+
+public class PartitionUtil {
+
+  private static LogService LOGGER = LogServiceFactory.getLogService(PartitionUtil.class.getName());
+
+  private static final ThreadLocal<DateFormat> timestampFormatter = new ThreadLocal<DateFormat>() {
+    @Override protected DateFormat initialValue() {
+      return new SimpleDateFormat(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+              CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT));
+    }
+  };
+
+  private static final ThreadLocal<DateFormat> dateFormatter = new ThreadLocal<DateFormat>() {
+    @Override protected DateFormat initialValue() {
+      return new SimpleDateFormat(CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+              CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT));
+    }
+  };
+
+  public static Partitioner getPartitioner(PartitionInfo partitionInfo) {
+    switch (partitionInfo.getPartitionType()) {
+      case HASH:
+        return new HashPartitioner(partitionInfo.getNumPartitions());
+      case LIST:
+        return new ListPartitioner(partitionInfo);
+      case RANGE:
+        return new RangePartitioner(partitionInfo);
+      default:
+        throw new UnsupportedOperationException(
+            "unsupport partition type: " + partitionInfo.getPartitionType().name());
+    }
+  }
+
+  public static Object getDataBasedOnDataType(String data, DataType actualDataType) {
+    if (data == null) {
+      return null;
+    }
+    if (actualDataType != DataType.STRING && StringUtils.isEmpty(data)) {
+      return null;
+    }
+    try {
+      switch (actualDataType) {
+        case STRING:
+          return data;
+        case INT:
+          return Integer.parseInt(data);
+        case SHORT:
+          return Short.parseShort(data);
+        case DOUBLE:
+          return Double.parseDouble(data);
+        case LONG:
+          return Long.parseLong(data);
+        case DATE:
+          return PartitionUtil.dateFormatter.get().parse(data).getTime();
+        case TIMESTAMP:
+          return PartitionUtil.timestampFormatter.get().parse(data).getTime();
+        case DECIMAL:
+          return new BigDecimal(data);
+        default:
+          return data;
+      }
+    } catch (Exception ex) {
+      return null;
+    }
+  }
+
+  public static BitSet generateBitSetBySize(int size, boolean isContainAll) {
+    BitSet bitSet = new BitSet(size);
+    if (isContainAll) {
+      bitSet.set(0, size);
+    }
+    return bitSet;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java
new file mode 100644
index 0000000..772a98e
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/Partitioner.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.io.Serializable;
+
+/**
+ * Maps a partition column value to the id of the partition it belongs to.
+ * Serializable so that an instance can be shipped along with the code that uses it.
+ */
+public interface Partitioner extends Serializable {
+
+  /**
+   * @return total number of partitions; the list and range implementations in this package
+   *         count one extra default partition
+   */
+  int numPartitions();
+
+  /**
+   * @param key partition column value; the implementations in this package accept null
+   * @return id of the partition the key belongs to
+   */
+  int getPartition(Object key);
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
new file mode 100644
index 0000000..c73f0b5
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/scan/partition/RangePartitioner.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.scan.partition;
+
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.datatype.DataType;
+import org.apache.carbondata.core.metadata.schema.PartitionInfo;
+import org.apache.carbondata.core.util.ByteUtil;
+
+/**
+ * Range Partitioner
+ */
+public class RangePartitioner implements Partitioner {
+
+  private int numPartitions;
+  private Object[] bounds;
+  private SerializableComparator comparator;
+
+  public RangePartitioner(PartitionInfo partitionInfo) {
+    List<String> values = partitionInfo.getRangeInfo();
+    DataType partitionColumnDataType = partitionInfo.getColumnSchemaList().get(0).getDataType();
+    numPartitions = values.size();
+    bounds = new Object[numPartitions];
+    if (partitionColumnDataType == DataType.STRING) {
+      for (int i = 0; i < numPartitions; i++) {
+        bounds[i] = ByteUtil.toBytes(values.get(i));
+      }
+    } else {
+      for (int i = 0; i < numPartitions; i++) {
+        bounds[i] = PartitionUtil.getDataBasedOnDataType(values.get(i), partitionColumnDataType);
+      }
+    }
+
+    switch (partitionColumnDataType) {
+      case INT:
+        comparator = new IntSerializableComparator();
+        break;
+      case SHORT:
+        comparator = new ShortSerializableComparator();
+        break;
+      case DOUBLE:
+        comparator = new DoubleSerializableComparator();
+        break;
+      case LONG:
+      case DATE:
+      case TIMESTAMP:
+        comparator = new LongSerializableComparator();
+        break;
+      case DECIMAL:
+        comparator = new BigDecimalSerializableComparator();
+        break;
+      default:
+        comparator = new ByteArraySerializableComparator();
+    }
+  }
+
+  /**
+   * number of partitions
+   * add extra default partition
+   * @return
+   */
+  @Override public int numPartitions() {
+    return numPartitions + 1;
+  }
+
+  @Override public int getPartition(Object key) {
+    if (key == null) {
+      return numPartitions;
+    } else {
+      for (int i = 0; i < numPartitions; i++) {
+        if (comparator.compareTo(key, bounds[i])) {
+          return i;
+        }
+      }
+      return numPartitions;
+    }
+  }
+
+  interface SerializableComparator extends Serializable {
+    boolean compareTo(Object key1, Object key2);
+  }
+
+  class ByteArraySerializableComparator implements SerializableComparator {
+    @Override public boolean compareTo(Object key1, Object key2) {
+      return ByteUtil.compare((byte[]) key1, (byte[]) key2) < 0;
+    }
+  }
+
+  class IntSerializableComparator implements SerializableComparator {
+    @Override public boolean compareTo(Object key1, Object key2) {
+      return (int) key1 - (int) key2 < 0;
+    }
+  }
+
+  class ShortSerializableComparator implements SerializableComparator {
+    @Override public boolean compareTo(Object key1, Object key2) {
+      return (short) key1 - (short) key2 < 0;
+    }
+  }
+
+  class DoubleSerializableComparator implements SerializableComparator {
+    @Override public boolean compareTo(Object key1, Object key2) {
+      return (double) key1 - (double) key2 < 0;
+    }
+  }
+
+  class LongSerializableComparator implements SerializableComparator {
+    @Override public boolean compareTo(Object key1, Object key2) {
+      return (long) key1 - (long) key2 < 0;
+    }
+  }
+
+  class BigDecimalSerializableComparator implements SerializableComparator {
+    @Override public boolean compareTo(Object key1, Object key2) {
+      return ((BigDecimal) key1).compareTo((BigDecimal) key2) < 0;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
index 0dcaba2..1a75bf9 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputSplit.java
@@ -237,8 +237,10 @@ public class CarbonInputSplit extends FileSplit
     String filePath1 = this.getPath().getName();
     String filePath2 = other.getPath().getName();
     if (CarbonTablePath.isCarbonDataFile(filePath1)) {
-      int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
-      int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+      int firstTaskId =
+          Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1).split("_")[0]);
+      int otherTaskId =
+          Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2).split("_")[0]);
       if (firstTaskId != otherTaskId) {
         return firstTaskId - otherTaskId;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
new file mode 100644
index 0000000..c0ee4f2
--- /dev/null
+++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala
@@ -0,0 +1,289 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.spark.testsuite.partition
+
+import org.apache.spark.sql.common.util.QueryTest
+import org.scalatest.BeforeAndAfterAll
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.metadata.CarbonMetadata
+import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll {
+
+  // Value of the CARBON_TIMESTAMP_FORMAT property at the time the suite is constructed,
+  // i.e. before beforeAll replaces it with "dd-MM-yyyy".
+  // NOTE(review): presumably used to restore the property in a teardown that is outside
+  // this view; getProperty may yield null when the property was never set - confirm.
+  val defaultTimestampFormat = CarbonProperties.getInstance()
+    .getProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT)
+
+  override def beforeAll {
+    dropTable
+
+    CarbonProperties.getInstance()
+      .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "dd-MM-yyyy")
+
+    // DDL of a non-partitioned table; both baseline tables share this schema.
+    def createTableDdl(tableName: String): String =
+      s"""
+        | CREATE TABLE $tableName (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | STORED BY 'org.apache.carbondata.format'
+      """.stripMargin
+
+    // Statement loading data.csv from the test resources into the given table.
+    def loadCsvSql(tableName: String): String =
+      s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE $tableName OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""
+
+    // baseline for the single-load tests: one load of data.csv
+    sql(createTableDdl("originTable"))
+    sql(loadCsvSql("originTable"))
+
+    // baseline for the multiple-load tests: the same csv loaded three times
+    sql(createTableDdl("originMultiLoads"))
+    (1 to 3).foreach { _ =>
+      sql(loadCsvSql("originMultiLoads"))
+    }
+  }
+
+  /**
+   * Checks the .carbondata files found in one segment directory of a table: their number
+   * must equal the number of expected ids, and the leading number of every file's task no
+   * (the part before the first "_") must be one of the expected ids.
+   *
+   * @param tableUniqueName name used to look the table up in CarbonMetadata
+   * @param sgementId       id of the segment whose directory is inspected
+   * @param partitions      expected ids
+   */
+  def validateDataFiles(tableUniqueName: String, sgementId: String, partitions: Seq[Int]): Unit = {
+    val table = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName)
+    val segmentDir = new CarbonTablePath(table.getStorePath, table.getDatabaseName,
+      table.getFactTableName).getCarbonDataDirectoryPath("0", sgementId)
+    val segmentFolder = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
+    val dataFiles = segmentFolder.listFiles(new CarbonFileFilter() {
+      override def accept(file: CarbonFile): Boolean = file.getName.endsWith(".carbondata")
+    })
+
+    assert(dataFiles.size == partitions.size)
+
+    for (dataFile <- dataFiles) {
+      val taskNo = CarbonTablePath.DataFileUtil.getTaskNo(dataFile.getName)
+      val taskId = taskNo.split("_")(0).toInt
+      assert(partitions.contains(taskId))
+    }
+  }
+
+  test("data loading for partition table: hash partition") {
+    // empno is the partition column, hashed into 3 partitions
+    val createTable =
+      """
+        | CREATE TABLE hashTable (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin
+    sql(createTable)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE hashTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    // segment 0 must contain three data files whose task ids are among 0, 1 and 2
+    validateDataFiles("default_hashTable", "0", Seq(0, 1, 2))
+
+    // the partitioned table must return the same rows as the non-partitioned baseline
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTable order by empno")
+    val expected = sql("select  empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("data loading for partition table: range partition") {
+    // doj is the partition column; RANGE_INFO lists four bounds
+    val createTable =
+      """
+        | CREATE TABLE rangeTable (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        |  'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+      """.stripMargin
+    sql(createTable)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE rangeTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    // segment 0 must contain four data files whose task ids are among 0, 1, 3 and 4
+    validateDataFiles("default_rangeTable", "0", Seq(0, 1, 3, 4))
+
+    // the partitioned table must return the same rows as the non-partitioned baseline
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTable order by empno")
+    val expected = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("data loading for partition table: list partition") {
+    // workgroupcategory is the partition column; LIST_INFO names 0, 1 and the group (2, 3)
+    val createTable =
+      """
+        | CREATE TABLE listTable (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+        |  'LIST_INFO'='0, 1, (2, 3)')
+      """.stripMargin
+    sql(createTable)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE listTable OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    // segment 0 must contain two data files whose task ids are among 1 and 2
+    validateDataFiles("default_listTable", "0", Seq(1, 2))
+
+    // the partitioned table must return the same rows as the non-partitioned baseline
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTable order by empno")
+    val expected = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("Insert into for partition table: hash partition") {
+    // empno is the partition column, hashed into 3 partitions
+    val createTable =
+      """
+        | CREATE TABLE hashTableForInsert (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin
+    sql(createTable)
+    // the select lists the partition column (empno) last
+    sql("insert into hashTableForInsert select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+
+    // segment 0 must contain three data files whose task ids are among 0, 1 and 2
+    validateDataFiles("default_hashTableForInsert", "0", Seq(0, 1, 2))
+
+    // the partitioned table must return the same rows as the table it was filled from
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from hashTableForInsert order by empno")
+    val expected = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("Insert into for partition table: range partition") {
+    // doj is the partition column; RANGE_INFO lists four bounds
+    val createTable =
+      """
+        | CREATE TABLE rangeTableForInsert (empno int, empname String, designation String,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (doj Timestamp)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='RANGE',
+        |  'RANGE_INFO'='01-01-2010, 01-01-2015, 01-04-2015, 01-07-2015')
+      """.stripMargin
+    sql(createTable)
+    // the select lists the partition column (doj) last
+    sql("insert into rangeTableForInsert select empno, empname, designation, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, doj from originTable")
+
+    // segment 0 must contain four data files whose task ids are among 0, 1, 3 and 4
+    validateDataFiles("default_rangeTableForInsert", "0", Seq(0, 1, 3, 4))
+
+    // the partitioned table must return the same rows as the table it was filled from
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from rangeTableForInsert order by empno")
+    val expected = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("Insert into partition table: list partition") {
+    // workgroupcategory is the partition column; LIST_INFO names 0, 1 and the group (2, 3)
+    val createTable =
+      """
+        | CREATE TABLE listTableForInsert (empno int, empname String, designation String, doj Timestamp,
+        |  workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (workgroupcategory int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='LIST',
+        |  'LIST_INFO'='0, 1, (2, 3)')
+      """.stripMargin
+    sql(createTable)
+    // the select lists the partition column (workgroupcategory) last
+    sql("insert into listTableForInsert select empno, empname, designation, doj, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, workgroupcategory from originTable")
+
+    // segment 0 must contain two data files whose task ids are among 1 and 2
+    validateDataFiles("default_listTableForInsert", "0", Seq(1, 2))
+
+    // the partitioned table must return the same rows as the table it was filled from
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from listTableForInsert order by empno")
+    val expected = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originTable order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("multiple data loading for partition table") {
+    // empno is the partition column, hashed into 3 partitions
+    val createTable =
+      """
+        | CREATE TABLE multiLoads (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin
+    sql(createTable)
+
+    // load the same csv three times, as beforeAll does for originMultiLoads
+    val loadCsv = s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE multiLoads OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')"""
+    (1 to 3).foreach { _ =>
+      sql(loadCsv)
+    }
+
+    // the partitioned table must return the same rows as the non-partitioned baseline
+    val actual = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from multiLoads order by empno")
+    val expected = sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno")
+    checkAnswer(actual, expected)
+  }
+
+  test("multiple insertInto for partition table") {
+    sql(
+      """
+        | CREATE TABLE multiInserts (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin)
+    sql("insert into multiInserts select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+    sql("insert into multiInserts select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+    sql("insert into multiInserts select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from multiInserts order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+  }
+
+  test("multiple data loading and insertInto for partition table") {
+    sql(
+      """
+        | CREATE TABLE loadAndInsert (empname String, designation String, doj Timestamp,
+        |  workgroupcategory int, workgroupcategoryname String, deptno int, deptname String,
+        |  projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int,
+        |  utilization int,salary int)
+        | PARTITIONED BY (empno int)
+        | STORED BY 'org.apache.carbondata.format'
+        | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3')
+      """.stripMargin)
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadAndInsert OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+    sql("insert into loadAndInsert select empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary, empno from originTable")
+    sql(s"""LOAD DATA local inpath '$resourcesPath/data.csv' INTO TABLE loadAndInsert OPTIONS('DELIMITER'= ',', 'QUOTECHAR'= '"')""")
+
+    checkAnswer(sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from loadAndInsert order by empno"),
+      sql("select empno, empname, designation, doj, workgroupcategory, workgroupcategoryname, deptno, deptname, projectcode, projectjoindate, projectenddate, attendance, utilization, salary from originMultiLoads order by empno"))
+  }
+
+  override def afterAll = {
+    dropTable
+    if (defaultTimestampFormat == null) {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+          CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+    } else {
+      CarbonProperties.getInstance()
+        .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, defaultTimestampFormat)
+    }
+  }
+
+  def dropTable = {
+    sql("drop table if exists originTable")
+    sql("drop table if exists hashTable")
+    sql("drop table if exists rangeTable")
+    sql("drop table if exists listTable")
+    sql("drop table if exists hashTableForInsert")
+    sql("drop table if exists rangeTableForInsert")
+    sql("drop table if exists listTableForInsert")
+    sql("drop table if exists originMultiLoads")
+    sql("drop table if exists multiLoads")
+    sql("drop table if exists multiInserts")
+    sql("drop table if exists loadAndInsert")
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
new file mode 100644
index 0000000..f7758a6
--- /dev/null
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/PartitionFactory.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.spark
+
+import org.apache.spark.Partitioner
+
+import org.apache.carbondata.core.metadata.schema.PartitionInfo
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
+import org.apache.carbondata.core.scan.partition.{HashPartitioner => JavaHashPartitioner, ListPartitioner => JavaListPartitioner, RangePartitioner => JavaRangePartitioner}
+import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
+
+object PartitionFactory {
+
+  def getPartitioner(partitionInfo: PartitionInfo): Partitioner = {
+    partitionInfo.getPartitionType match {
+      case PartitionType.HASH => new HashPartitioner(partitionInfo.getNumPartitions)
+      case PartitionType.LIST => new ListPartitioner(partitionInfo)
+      case PartitionType.RANGE => new RangePartitioner(partitionInfo)
+      case partitionType =>
+        throw new CarbonDataLoadingException(s"Unsupport partition type: ${partitionType}")
+    }
+  }
+}
+
+class HashPartitioner(partitions: Int) extends Partitioner {
+
+  private val partitioner = new JavaHashPartitioner(partitions)
+
+  override def numPartitions: Int = partitioner.numPartitions()
+
+  override def getPartition(key: Any): Int = partitioner.getPartition(key)
+}
+
+class ListPartitioner(partitionInfo: PartitionInfo) extends Partitioner {
+
+  private val partitioner = new JavaListPartitioner(partitionInfo)
+
+  override def numPartitions: Int = partitioner.numPartitions()
+
+  override def getPartition(key: Any): Int = partitioner.getPartition(key)
+}
+
+class RangePartitioner(partitionInfo: PartitionInfo) extends Partitioner {
+
+  private val partitioner = new JavaRangePartitioner(partitionInfo)
+
+  override def numPartitions: Int = partitioner.numPartitions()
+
+  override def getPartition(key: Any): Int = partitioner.getPartition(key)
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
index a6d231d..4d0770d 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala
@@ -566,3 +566,82 @@ class LazyRddIterator(serializer: SerializerInstance,
   }
 
 }
+
+/*
+ *  It loads the data from an RDD into carbon for a partition table
+ *  @see org.apache.carbondata.processing.newflow.DataLoadExecutor
+ */
+class PartitionTableDataLoaderRDD[K, V](
+    sc: SparkContext,
+    result: DataLoadResult[K, V],
+    carbonLoadModel: CarbonLoadModel,
+    loadCount: Integer,
+    tableCreationTime: Long,
+    schemaLastUpdatedTime: Long,
+    prev: RDD[Row]) extends RDD[(K, V)](prev) {
+
+
+  override def compute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = {
+    val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
+    val iter = new Iterator[(K, V)] {
+      val partitionID = "0"
+      val loadMetadataDetails = new LoadMetadataDetails()
+      val model: CarbonLoadModel = carbonLoadModel
+      val uniqueLoadStatusId =
+        carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index
+      try {
+
+        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS)
+        carbonLoadModel.setPartitionId(partitionID)
+        carbonLoadModel.setSegmentId(String.valueOf(loadCount))
+        carbonLoadModel.setTaskNo(String.valueOf(theSplit.index))
+        carbonLoadModel.setPreFetch(false)
+
+        val recordReaders = Array[CarbonIterator[Array[AnyRef]]] {
+          new NewRddIterator(firstParent[Row].iterator(theSplit, context), carbonLoadModel, context)
+        }
+
+        val loader = new SparkPartitionLoader(model,
+          theSplit.index,
+          null,
+          String.valueOf(loadCount),
+          loadMetadataDetails)
+        // Initialize to set carbon properties
+        loader.initialize()
+        new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders)
+      } catch {
+        case e: BadRecordFoundException =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS)
+          logInfo("Bad Record Found")
+        case e: Exception =>
+          loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE)
+          logInfo("DataLoad For Partition Table failure", e)
+          LOGGER.error(e)
+          throw e
+      } finally {
+        // clean up the folders and files created locally for data load operation
+        CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false)
+        // in case of failure the same operation will be re-tried several times.
+        // So print the data load statistics only in case of non failure case
+        if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE
+          .equals(loadMetadataDetails.getLoadStatus)) {
+          CarbonTimeStatisticsFactory.getLoadStatisticsInstance
+            .printStatisticsInfo(model.getPartitionId)
+        }
+      }
+      var finished = false
+
+      override def hasNext: Boolean = !finished
+
+      override def next(): (K, V) = {
+        finished = true
+        result.getKey(uniqueLoadStatusId, loadMetadataDetails)
+      }
+    }
+    iter
+  }
+
+  override protected def getPartitions: Array[Partition] = firstParent[Row].partitions
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8f4727a..9685871 100644
--- a/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -17,6 +17,7 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.text.SimpleDateFormat
 import java.util
 import java.util.UUID
 import java.util.concurrent._
@@ -28,10 +29,11 @@ import scala.util.control.Breaks._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD, UpdateCoalescedRDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
@@ -43,12 +45,15 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
 import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
@@ -56,7 +61,7 @@ import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingExcep
 import org.apache.carbondata.spark._
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -732,6 +737,23 @@ object CarbonDataRDDFactory {
 
       }
 
+      def loadDataForPartitionTable(): Unit = {
+        try {
+          val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
+          status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            rdd).collect()
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex, "load data failed for partition table")
+            throw ex
+        }
+      }
+
       if (!updateModel.isDefined) {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -742,10 +764,11 @@ object CarbonDataRDDFactory {
       try {
         if (updateModel.isDefined) {
           loadDataFrameForUpdate()
+        } else if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+          loadDataForPartitionTable()
         } else if (dataFrame.isDefined) {
           loadDataFrame()
-        }
-        else {
+        } else {
           loadDataFile()
         }
         if (updateModel.isDefined) {
@@ -932,8 +955,86 @@ object CarbonDataRDDFactory {
 
   }
 
-  private def writeDictionary(carbonLoadModel: CarbonLoadModel,
-      result: Option[DictionaryServer], writeAll: Boolean) = {
+  /**
+   * repartition the input data for partition table.
+   * @param sqlContext
+   * @param dataFrame
+   * @param carbonLoadModel
+   * @return
+   */
+  private def repartitionInputData(sqlContext: SQLContext,
+      dataFrame: Option[DataFrame],
+      carbonLoadModel: CarbonLoadModel): RDD[Row] = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
+    val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
+    val columns = carbonLoadModel.getCsvHeaderColumns
+    var partitionColumnIndex = -1
+    for (i <- 0 until columns.length) {
+      if (partitionColumn.equals(columns(i))) {
+        partitionColumnIndex = i
+      }
+    }
+    if (partitionColumnIndex == -1) {
+      throw new DataLoadingException("Partition column not found.")
+    }
+    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
+    val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
+      // input data from DataFrame
+      val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      val timeStampFormat = new SimpleDateFormat(timestampFormatString)
+      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+      val dateFormat = new SimpleDateFormat(dateFormatString)
+      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+      val serializationNullFormat =
+        carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+      dataFrame.get.rdd.map { row =>
+        (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+          delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+      }
+    } else {
+      // input data from csv files
+      val hadoopConfiguration = new Configuration()
+      CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val columnCount = columns.length
+      new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sqlContext.sparkContext,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        hadoopConfiguration
+      ).map { currentRow =>
+        val row = new StringArrayRow(new Array[String](columnCount))
+        (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+      }
+    }
+
+    val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+    if (partitionColumnDataType == DataType.STRING) {
+      if (partitionInfo.getPartitionType == PartitionType.RANGE) {
+        inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
+          .partitionBy(partitioner)
+          .map(_._2)
+      } else {
+        inputRDD.partitionBy(partitioner)
+          .map(_._2)
+      }
+    } else {
+      inputRDD.map { row =>
+        (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType), row._2)
+      }
+        .partitionBy(partitioner)
+        .map(_._2)
+    }
+  }
+
+    private def writeDictionary(carbonLoadModel: CarbonLoadModel,
+        result: Option[DictionaryServer], writeAll: Boolean) = {
     // write dictionary file and shutdown dictionary server
     val uniqueTableName: String = s"${ carbonLoadModel.getDatabaseName }_${
       carbonLoadModel.getTableName }"

http://git-wip-us.apache.org/repos/asf/carbondata/blob/b6b93f07/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 835af35..812151e 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -17,8 +17,8 @@
 
 package org.apache.carbondata.spark.rdd
 
+import java.text.SimpleDateFormat
 import java.util
-import java.util.UUID
 import java.util.concurrent._
 
 import scala.collection.JavaConverters._
@@ -28,10 +28,11 @@ import scala.util.control.Breaks._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.NullWritable
 import org.apache.hadoop.mapreduce.Job
 import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit}
 import org.apache.spark.{SparkEnv, SparkException}
-import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, UpdateCoalescedRDD}
+import org.apache.spark.rdd.{DataLoadCoalescedRDD, DataLoadPartitionCoalescer, NewHadoopRDD, RDD}
 import org.apache.spark.sql.{CarbonEnv, DataFrame, Row, SQLContext}
 import org.apache.spark.sql.execution.command.{AlterTableModel, CompactionModel, ExecutionErrors, UpdateTableModel}
 import org.apache.spark.sql.hive.DistributionUtil
@@ -43,20 +44,22 @@ import org.apache.carbondata.core.datastore.block.{Distributable, TableBlockInfo
 import org.apache.carbondata.core.dictionary.server.DictionaryServer
 import org.apache.carbondata.core.locks.{CarbonLockFactory, ICarbonLock, LockUsage}
 import org.apache.carbondata.core.metadata.{CarbonTableIdentifier, ColumnarFormatVersion}
+import org.apache.carbondata.core.metadata.datatype.DataType
+import org.apache.carbondata.core.metadata.schema.partition.PartitionType
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
 import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.scan.partition.PartitionUtil
 import org.apache.carbondata.core.statusmanager.LoadMetadataDetails
-import org.apache.carbondata.core.util.CarbonProperties
-import org.apache.carbondata.core.util.path.CarbonStorePath
-import org.apache.carbondata.processing.csvload.BlockDetails
+import org.apache.carbondata.core.util.{ByteUtil, CarbonProperties}
+import org.apache.carbondata.processing.csvload.{BlockDetails, CSVInputFormat, StringArrayWritable}
 import org.apache.carbondata.processing.etl.DataLoadingException
 import org.apache.carbondata.processing.merger.{CarbonCompactionUtil, CarbonDataMergerUtil, CompactionType}
 import org.apache.carbondata.processing.model.CarbonLoadModel
 import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException
-import org.apache.carbondata.spark._
+import org.apache.carbondata.spark.{DataLoadResultImpl, PartitionFactory}
 import org.apache.carbondata.spark.load._
 import org.apache.carbondata.spark.splits.TableSplit
-import org.apache.carbondata.spark.util.{CarbonQueryUtil, CommonUtil}
+import org.apache.carbondata.spark.util.{CarbonQueryUtil, CarbonScalaUtil, CommonUtil}
 
 /**
  * This is the factory class which can create different RDD depends on user needs.
@@ -653,6 +656,23 @@ object CarbonDataRDDFactory {
         }
       }
 
+      def loadDataForPartitionTable(): Unit = {
+        try {
+          val rdd = repartitionInputData(sqlContext, dataFrame, carbonLoadModel)
+          status = new PartitionTableDataLoaderRDD(sqlContext.sparkContext,
+            new DataLoadResultImpl(),
+            carbonLoadModel,
+            currentLoadCount,
+            tableCreationTime,
+            schemaLastUpdatedTime,
+            rdd).collect()
+        } catch {
+          case ex: Exception =>
+            LOGGER.error(ex, "load data failed for partition table")
+            throw ex
+        }
+      }
+
       if (!updateModel.isDefined) {
       CarbonLoaderUtil.checkAndCreateCarbonDataLocation(storePath,
         carbonLoadModel.getDatabaseName, carbonLoadModel.getTableName, currentLoadCount.toString)
@@ -661,11 +681,15 @@ object CarbonDataRDDFactory {
       var errorMessage: String = "DataLoad failure"
       var executorMessage: String = ""
       try {
-        if (dataFrame.isDefined) {
-          loadDataFrame()
-        }
-        else {
-          loadDataFile()
+        if (carbonTable.getPartitionInfo(carbonTable.getFactTableName) != null) {
+          loadDataForPartitionTable()
+        } else {
+          if (dataFrame.isDefined) {
+            loadDataFrame()
+          }
+          else {
+            loadDataFile()
+          }
         }
         if (updateModel.isDefined) {
 
@@ -859,6 +883,84 @@ object CarbonDataRDDFactory {
 
   }
 
+  /**
+   * repartition the input data for partition table.
+   * @param sqlContext
+   * @param dataFrame
+   * @param carbonLoadModel
+   * @return
+   */
+  private def repartitionInputData(sqlContext: SQLContext,
+      dataFrame: Option[DataFrame],
+      carbonLoadModel: CarbonLoadModel): RDD[Row] = {
+    val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+    val partitionInfo = carbonTable.getPartitionInfo(carbonTable.getFactTableName)
+    val partitionColumn = partitionInfo.getColumnSchemaList.get(0).getColumnName
+    val partitionColumnDataType = partitionInfo.getColumnSchemaList.get(0).getDataType
+    val columns = carbonLoadModel.getCsvHeaderColumns
+    var partitionColumnIndex = -1
+    for (i <- 0 until columns.length) {
+      if (partitionColumn.equals(columns(i))) {
+        partitionColumnIndex = i
+      }
+    }
+    if (partitionColumnIndex == -1) {
+      throw new DataLoadingException("Partition column not found.")
+    }
+    // generate RDD[(K, V)] to use the partitionBy method of PairRDDFunctions
+    val inputRDD: RDD[(String, Row)] = if (dataFrame.isDefined) {
+      // input data from DataFrame
+      val timestampFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_TIMESTAMP_FORMAT, CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+      val timeStampFormat = new SimpleDateFormat(timestampFormatString)
+      val dateFormatString = CarbonProperties.getInstance().getProperty(CarbonCommonConstants
+        .CARBON_DATE_FORMAT, CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+      val dateFormat = new SimpleDateFormat(dateFormatString)
+      val delimiterLevel1 = carbonLoadModel.getComplexDelimiterLevel1
+      val delimiterLevel2 = carbonLoadModel.getComplexDelimiterLevel2
+      val serializationNullFormat =
+        carbonLoadModel.getSerializationNullFormat.split(CarbonCommonConstants.COMMA, 2)(1)
+      dataFrame.get.rdd.map { row =>
+        (CarbonScalaUtil.getString(row.get(partitionColumnIndex), serializationNullFormat,
+          delimiterLevel1, delimiterLevel2, timeStampFormat, dateFormat), row)
+      }
+    } else {
+      // input data from csv files
+      val hadoopConfiguration = new Configuration()
+      CommonUtil.configureCSVInputFormat(hadoopConfiguration, carbonLoadModel)
+      hadoopConfiguration.set(FileInputFormat.INPUT_DIR, carbonLoadModel.getFactFilePath)
+      val columnCount = columns.length
+      new NewHadoopRDD[NullWritable, StringArrayWritable](
+        sqlContext.sparkContext,
+        classOf[CSVInputFormat],
+        classOf[NullWritable],
+        classOf[StringArrayWritable],
+        hadoopConfiguration
+      ).map { currentRow =>
+        val row = new StringArrayRow(new Array[String](columnCount))
+        (currentRow._2.get()(partitionColumnIndex), row.setValues(currentRow._2.get()))
+      }
+    }
+
+    val partitioner = PartitionFactory.getPartitioner(partitionInfo)
+    if (partitionColumnDataType == DataType.STRING) {
+      if (partitionInfo.getPartitionType == PartitionType.RANGE) {
+        inputRDD.map { row => (ByteUtil.toBytes(row._1), row._2) }
+          .partitionBy(partitioner)
+          .map(_._2)
+      } else {
+        inputRDD.partitionBy(partitioner)
+          .map(_._2)
+      }
+    } else {
+      inputRDD.map { row =>
+        (PartitionUtil.getDataBasedOnDataType(row._1, partitionColumnDataType), row._2)
+      }
+        .partitionBy(partitioner)
+        .map(_._2)
+    }
+  }
+
   private def writeDictionary(carbonLoadModel: CarbonLoadModel,
       result: Option[DictionaryServer], writeAll: Boolean) = {
     // write dictionary file and shutdown dictionary server


Mime
View raw message