carbondata-commits mailing list archives

From chenliang...@apache.org
Subject [3/5] carbondata git commit: add hive integration for carbon
Date Thu, 25 May 2017 03:35:33 GMT
add hive integration for carbon

add hive integration to assembly

alter CarbonInputFormat to implement mapred.InputFormat

add a hive serde for carbon

add hive integration to assembly

fix error in getQueryModel

add debug info

add debug info

add debug info

add debug info

fix error in CarbonRecordReader

use ArrayWritable for CarbonRecordReader

fix error in initializing CarbonRecordReader

fix error in initializing CarbonRecordReader

fix error in initializing CarbonRecordReader

fix error in initializing CarbonRecordReader

change the return value of InputFormat

set the columns to be queried in carbon

fix null pointer exception

add catalyst dependency

add catalyst dependency

add catalyst dependency

fix error in initializing carbon

add a new hive carbon recordreader

add code to serialize an object into ArrayWritable

data types such as short/int are actually of type Long in Carbon

use right inspector

use right inspector

fix long can't cast int error

fix decimal cast error

column size is not equal to column type

column size is not equal to column type

column size is not equal to column type

column size is not equal to column type

fix ObjInspector error

fix ObjInspector error

fix ObjInspector error

add a new hive input split

should not combine path

add support for timestamp

clean codes

remove unused codes

support Date and TimeStamp type

add basic hive integration

alter code style

alter code style

alter code style

alter code style

change create table statement

alter CarbonSerde test case

alter CarbonSerde test case

add carbondata-hive to test classpath

add carbondata-hive to test classpath

use hive compatible schema

exclude kryo

exclude kryo

make a new profile for hive 1.2.1

remove carbon-hive from parent and assembly pom

use groupId to apache hive in pom.xml

remove hadoop-yarn-api, but HadoopFileExample will throw exception when debugging in IDEA

change profile name

add quick start guide for basic hive integration module

add private for properties

add some params for hive to read subdirectories recursively


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/a700f83e
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/a700f83e
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/a700f83e

Branch: refs/heads/master
Commit: a700f83e0844619616a1c66e22b70dcc9c3e1b46
Parents: 9669c0b
Author: cenyuhai <cenyuhai@didichuxing.com>
Authored: Sun Mar 12 23:17:40 2017 +0800
Committer: chenliang613 <chenliang613@apache.org>
Committed: Thu May 25 11:34:37 2017 +0800

----------------------------------------------------------------------
 dev/java-code-format-template.xml               |   2 +-
 .../carbondata/hadoop/CarbonInputFormat.java    |   2 +-
 .../carbondata/hadoop/CarbonRecordReader.java   |   8 +-
 integration/hive/hive-guide.md                  | 106 +++++++
 integration/hive/pom.xml                        | 114 ++++++++
 .../carbondata/hive/CarbonArrayInspector.java   | 192 ++++++++++++
 .../carbondata/hive/CarbonHiveInputSplit.java   | 290 +++++++++++++++++++
 .../carbondata/hive/CarbonHiveRecordReader.java | 249 ++++++++++++++++
 .../apache/carbondata/hive/CarbonHiveSerDe.java | 231 +++++++++++++++
 .../carbondata/hive/CarbonObjectInspector.java  | 221 ++++++++++++++
 .../hive/CarbonStorageFormatDescriptor.java     |  47 +++
 .../hive/MapredCarbonInputFormat.java           |  99 +++++++
 .../hive/MapredCarbonOutputFormat.java          |  49 ++++
 ...he.hadoop.hive.ql.io.StorageFormatDescriptor |   1 +
 .../apache/carbondata/hive/TestCarbonSerde.java | 133 +++++++++
 pom.xml                                         |  17 ++
 16 files changed, 1755 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/dev/java-code-format-template.xml
----------------------------------------------------------------------
diff --git a/dev/java-code-format-template.xml b/dev/java-code-format-template.xml
index d117313..b39ef1e 100644
--- a/dev/java-code-format-template.xml
+++ b/dev/java-code-format-template.xml
@@ -34,8 +34,8 @@
   <option name="IMPORT_LAYOUT_TABLE">
     <value>
       <emptyLine />
-      <package name="javax" withSubpackages="true" static="false" />
       <package name="java" withSubpackages="true" static="false" />
+      <package name="javax" withSubpackages="true" static="false" />
       <emptyLine />
       <package name="org.apache.carbondata" withSubpackages="true" static="false" />
       <emptyLine />

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
index b63e8b8..b760158 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonInputFormat.java
@@ -355,7 +355,7 @@ public class CarbonInputFormat<T> extends FileInputFormat<Void, T> {
     return result;
   }
 
-  private Expression getFilterPredicates(Configuration configuration) {
+  protected Expression getFilterPredicates(Configuration configuration) {
     try {
       String filterExprString = configuration.get(FILTER_PREDICATE);
       if (filterExprString == null) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
----------------------------------------------------------------------
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
index 26b269a..5af4c30 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/CarbonRecordReader.java
@@ -40,13 +40,13 @@ import org.apache.hadoop.mapreduce.TaskAttemptContext;
  */
 public class CarbonRecordReader<T> extends AbstractRecordReader<T> {
 
-  private QueryModel queryModel;
+  protected QueryModel queryModel;
 
-  private CarbonReadSupport<T> readSupport;
+  protected CarbonReadSupport<T> readSupport;
 
-  private CarbonIterator<Object[]> carbonIterator;
+  protected CarbonIterator<Object[]> carbonIterator;
 
-  private QueryExecutor queryExecutor;
+  protected QueryExecutor queryExecutor;
 
   public CarbonRecordReader(QueryModel queryModel, CarbonReadSupport<T> readSupport) {
     this.queryModel = queryModel;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/hive-guide.md
----------------------------------------------------------------------
diff --git a/integration/hive/hive-guide.md b/integration/hive/hive-guide.md
new file mode 100644
index 0000000..202b2b2
--- /dev/null
+++ b/integration/hive/hive-guide.md
@@ -0,0 +1,106 @@
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+-->
+
+# Quick Start
+This tutorial provides a quick introduction to using the current integration/hive module.
+
+## Prerequisites
+## Spark Version 2.1
+* Build integration/hive
+  ```
+  mvn -DskipTests -Pspark-2.1 -Dspark.version=2.1.0 clean package -Phadoop-2.7.2 -Phive-1.2
+  ```
+* Create a sample.csv file using the following commands. The CSV file is required for loading data into CarbonData.
+
+  ```
+  cd carbondata
+  cat > sample.csv << EOF
+  id,name,scale,country,salary
+  1,yuhai,1.77,china,33000.1
+  2,runlin,1.70,china,33000.2
+  EOF
+  $HADOOP_HOME/bin/hadoop fs -put sample.csv /user/hadoop/sample.csv
+  ```
+
+## Create hive carbon table in spark shell
+
+Start Spark shell by running the following command in the Spark directory:
+
+```
+./bin/spark-shell --jars <carbondata assembly jar path, carbondata hive jar path>
+```
+
+```
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.CarbonSession._
+val rootPath = "hdfs:///user/hadoop/carbon"
+val storeLocation = s"$rootPath/store"
+val warehouse = s"$rootPath/warehouse"
+val metastoredb = s"$rootPath/metastore_db"
+
+val carbon = SparkSession.builder().enableHiveSupport().config("spark.sql.warehouse.dir", warehouse).config(org.apache.carbondata.core.constants.CarbonCommonConstants.STORE_LOCATION, storeLocation).getOrCreateCarbonSession(storeLocation, metastoredb)
+
+carbon.sql("create table hive_carbon(id int, name string, scale decimal, country string, salary double) STORED BY 'carbondata'")
+carbon.sql("LOAD DATA INPATH 'hdfs://mycluster/user/hadoop/sample.csv' INTO TABLE hive_carbon")
+
+```
+
+## Query Data from a Table
+
+```
+scala>carbon.sql("SELECT * FROM hive_carbon").show()
+```
+
+## Query Data in Hive
+
+### Configure hive classpath
+```
+mkdir hive/auxlibs/
+cp incubator-carbondata/assembly/target/scala-2.11/carbondata_2.11*.jar hive/auxlibs/
+cp incubator-carbondata/integration/hive/target/carbondata-hive-*.jar hive/auxlibs/
+cp $SPARK_HOME/jars/spark-catalyst*.jar hive/auxlibs/
+export HIVE_AUX_JARS_PATH=hive/auxlibs/
+```
+
+### Alter schema in Hive
+$HIVE_HOME/bin/hive
+
+```
+alter table hive_carbon set FILEFORMAT
+INPUTFORMAT "org.apache.carbondata.hive.MapredCarbonInputFormat"
+OUTPUTFORMAT "org.apache.carbondata.hive.MapredCarbonOutputFormat"
+SERDE "org.apache.carbondata.hive.CarbonHiveSerDe";
+
+alter table hive_carbon set LOCATION 'hdfs://mycluster-tj/user/hadoop/carbon/store/default/hive_carbon';
+alter table hive_carbon change col id INT;
+alter table hive_carbon add columns(name string, scale decimal(10, 2), country string, salary double);
+
+```
+
+### Query data from hive table
+```
+set hive.mapred.supports.subdirectories=true;
+set mapreduce.input.fileinputformat.input.dir.recursive=true;
+
+select * from hive_carbon;
+
+select * from hive_carbon order by id;
+```
+
+

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/pom.xml
----------------------------------------------------------------------
diff --git a/integration/hive/pom.xml b/integration/hive/pom.xml
new file mode 100644
index 0000000..714245e
--- /dev/null
+++ b/integration/hive/pom.xml
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.carbondata</groupId>
+        <artifactId>carbondata-parent</artifactId>
+        <version>1.1.0-incubating-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>carbondata-hive</artifactId>
+    <name>Apache CarbonData :: Hive</name>
+
+    <properties>
+        <hive.version>1.2.1</hive.version>
+        <dev.path>${basedir}/../../dev</dev.path>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.hive</groupId>
+            <artifactId>hive-exec</artifactId>
+            <version>${hive.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-metastore</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-shims</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>hive-ant</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.hive</groupId>
+                    <artifactId>spark-client</artifactId>
+                </exclusion>
+                <exclusion>
+                    <artifactId>kryo</artifactId>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                </exclusion>
+            </exclusions>
+            <scope>compile</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.carbondata</groupId>
+            <artifactId>carbondata-hadoop</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/resources</directory>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.7</source>
+                    <target>1.7</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <version>2.18</version>
+                <!-- Note config is repeated in scalatest config -->
+                <configuration>
+                    <includes>
+                        <include>**/Test*.java</include>
+                        <include>**/*Test.java</include>
+                        <include>**/*TestCase.java</include>
+                        <include>**/*Suite.java</include>
+                    </includes>
+                    <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+                    <argLine>-Xmx3g -XX:MaxPermSize=512m -XX:ReservedCodeCacheSize=512m</argLine>
+                    <systemProperties>
+                        <java.awt.headless>true</java.awt.headless>
+                    </systemProperties>
+                    <failIfNoTests>false</failIfNoTests>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
new file mode 100644
index 0000000..424dc5a
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonArrayInspector.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * The CarbonArrayInspector inspects an ArrayWritable, treating it as a Hive array.
+ * It can also inspect a List if Hive decides to inspect the result of an inspection.
+ */
+public class CarbonArrayInspector implements SettableListObjectInspector {
+
+  private ObjectInspector arrayElementInspector;
+
+  public CarbonArrayInspector(final ObjectInspector arrayElementInspector) {
+    this.arrayElementInspector = arrayElementInspector;
+  }
+
+  @Override
+  public String getTypeName() {
+    return "array<" + arrayElementInspector.getTypeName() + ">";
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.LIST;
+  }
+
+  @Override
+  public ObjectInspector getListElementObjectInspector() {
+    return arrayElementInspector;
+  }
+
+  @Override
+  public Object getListElement(final Object data, final int index) {
+    if (data == null) {
+      return null;
+    }
+
+    if (data instanceof ArrayWritable) {
+      final Writable[] listContainer = ((ArrayWritable) data).get();
+
+      if (listContainer == null || listContainer.length == 0) {
+        return null;
+      }
+
+      final Writable subObj = listContainer[0];
+
+      if (subObj == null) {
+        return null;
+      }
+
+      if (index >= 0 && index < ((ArrayWritable) subObj).get().length) {
+        return ((ArrayWritable) subObj).get()[index];
+      } else {
+        return null;
+      }
+    }
+
+    throw new UnsupportedOperationException("Cannot inspect "
+      + data.getClass().getCanonicalName());
+  }
+
+  @Override
+  public int getListLength(final Object data) {
+    if (data == null) {
+      return -1;
+    }
+
+    if (data instanceof ArrayWritable) {
+      final Writable[] listContainer = ((ArrayWritable) data).get();
+
+      if (listContainer == null || listContainer.length == 0) {
+        return -1;
+      }
+
+      final Writable subObj = listContainer[0];
+
+      if (subObj == null) {
+        return 0;
+      }
+
+      return ((ArrayWritable) subObj).get().length;
+    }
+
+    throw new UnsupportedOperationException("Cannot inspect "
+      + data.getClass().getCanonicalName());
+  }
+
+  @Override
+  public List<?> getList(final Object data) {
+    if (data == null) {
+      return null;
+    }
+
+    if (data instanceof ArrayWritable) {
+      final Writable[] listContainer = ((ArrayWritable) data).get();
+
+      if (listContainer == null || listContainer.length == 0) {
+        return null;
+      }
+
+      final Writable subObj = listContainer[0];
+
+      if (subObj == null) {
+        return null;
+      }
+
+      final Writable[] array = ((ArrayWritable) subObj).get();
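+
+      // Arrays.asList is a fixed-size view whose add() throws
+      // UnsupportedOperationException, so copy the elements into a
+      // growable ArrayList instead of appending to the view.
+      final List<Writable> list = new ArrayList<Writable>(Arrays.asList(array));
+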
+      return list;
+    }
+
+    throw new UnsupportedOperationException("Cannot inspect "
+      + data.getClass().getCanonicalName());
+  }
+
+  @Override
+  public Object create(final int size) {
+    final List<Object> result = new ArrayList<Object>(size);
+    for (int i = 0; i < size; ++i) {
+      result.add(null);
+    }
+    return result;
+  }
+
+  @Override
+  public Object set(final Object list, final int index, final Object element) {
+    final ArrayList<Object> l = (ArrayList<Object>) list;
+    l.set(index, element);
+    return list;
+  }
+
+  @Override
+  public Object resize(final Object list, final int newSize) {
+    final ArrayList<Object> l = (ArrayList<Object>) list;
+    l.ensureCapacity(newSize);
+    while (l.size() < newSize) {
+      l.add(null);
+    }
+    while (l.size() > newSize) {
+      l.remove(l.size() - 1);
+    }
+    return list;
+  }
+
+  @Override
+  public boolean equals(final Object o) {
+    if (o == null || o.getClass() != getClass()) {
+      return false;
+    } else if (o == this) {
+      return true;
+    } else {
+      final ObjectInspector other = ((CarbonArrayInspector) o).arrayElementInspector;
+      return other.equals(arrayElementInspector);
+    }
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 3;
+    hash = 29 * hash + (this.arrayElementInspector != null ?
+      this.arrayElementInspector.hashCode() : 0);
+    return hash;
+  }
+}
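
The list methods in `CarbonArrayInspector` need lists that can grow: `set` and `resize` cast their argument to `ArrayList` and call `add` or `remove` on it. The following is a minimal, stdlib-only sketch of how a fixed-size `Arrays.asList` view differs from a growable `ArrayList` copy. The class name `GrowableListSketch` and both helpers are hypothetical and are not part of this commit.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class GrowableListSketch {

  // Hypothetical helper: copies an array into a list that supports add/remove.
  static <T> List<T> toGrowableList(T[] array) {
    return new ArrayList<T>(Arrays.asList(array));
  }

  // Returns true when the list rejects structural modification.
  // A successful probe is removed again, so the list is left unchanged.
  static boolean isFixedSize(List<String> list) {
    try {
      list.add("probe");
      list.remove(list.size() - 1);
      return false;
    } catch (UnsupportedOperationException e) {
      return true;
    }
  }

  public static void main(String[] args) {
    String[] values = {"a", "b"};
    System.out.println(isFixedSize(Arrays.asList(values)));  // prints true
    System.out.println(isFixedSize(toGrowableList(values))); // prints false
  }
}
```

A list returned by `Arrays.asList` is also not an instance of `java.util.ArrayList`, so a cast like the one in `set` and `resize` would fail on it with a `ClassCastException`.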

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
new file mode 100644
index 0000000..16b859b
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveInputSplit.java
@@ -0,0 +1,290 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.block.BlockletInfos;
+import org.apache.carbondata.core.datastore.block.Distributable;
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
+import org.apache.carbondata.core.mutate.UpdateVO;
+import org.apache.carbondata.core.util.CarbonProperties;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+import org.apache.carbondata.hadoop.internal.index.Block;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileSplit;
+
+public class CarbonHiveInputSplit extends FileSplit
+    implements Distributable, Serializable, Writable, Block {
+
+  private static final long serialVersionUID = 3520344046772190208L;
+  private String taskId;
+
+  private String segmentId;
+
+  private String bucketId;
+  /*
+   * Invalid segments that need to be removed in task side index
+   */
+  private List<String> invalidSegments;
+
+  /*
+   * Number of BlockLets in a block
+   */
+  private int numberOfBlocklets;
+
+  private ColumnarFormatVersion version;
+
+  /**
+   * map of blocklocation and storage id
+   */
+  private Map<String, String> blockStorageIdMap =
+      new HashMap<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE);
+
+  private List<UpdateVO> invalidTimestampsList;
+
+  public CarbonHiveInputSplit() {
+    segmentId = null;
+    taskId = "0";
+    bucketId = "0";
+    numberOfBlocklets = 0;
+    invalidSegments = new ArrayList<>();
+    version = CarbonProperties.getInstance().getFormatVersion();
+  }
+
+  public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
+      String[] locations, ColumnarFormatVersion version) {
+    super(path, start, length, locations);
+    this.segmentId = segmentId;
+    this.taskId = CarbonTablePath.DataFileUtil.getTaskNo(path.getName());
+    this.bucketId = CarbonTablePath.DataFileUtil.getBucketNo(path.getName());
+    this.invalidSegments = new ArrayList<>();
+    this.version = version;
+  }
+
+  public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
+      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version) {
+    this(segmentId, path, start, length, locations, version);
+    this.numberOfBlocklets = numberOfBlocklets;
+  }
+
+  /**
+   * Constructor to initialize the CarbonHiveInputSplit with blockStorageIdMap
+   *
+   * @param segmentId
+   * @param path
+   * @param start
+   * @param length
+   * @param locations
+   * @param numberOfBlocklets
+   * @param version
+   * @param blockStorageIdMap
+   */
+  public CarbonHiveInputSplit(String segmentId, Path path, long start, long length,
+      String[] locations, int numberOfBlocklets, ColumnarFormatVersion version,
+      Map<String, String> blockStorageIdMap) {
+    this(segmentId, path, start, length, locations, numberOfBlocklets, version);
+    this.blockStorageIdMap = blockStorageIdMap;
+  }
+
+  public static CarbonHiveInputSplit from(String segmentId, FileSplit split,
+                                          ColumnarFormatVersion version)
+    throws IOException {
+    return new CarbonHiveInputSplit(segmentId, split.getPath(), split.getStart(), split.getLength(),
+      split.getLocations(), version);
+  }
+
+  public static List<TableBlockInfo> createBlocks(List<CarbonHiveInputSplit> splitList) {
+    List<TableBlockInfo> tableBlockInfoList = new ArrayList<>();
+    for (CarbonHiveInputSplit split : splitList) {
+      BlockletInfos blockletInfos =
+          new BlockletInfos(split.getNumberOfBlocklets(), 0, split.getNumberOfBlocklets());
+      try {
+        tableBlockInfoList.add(
+            new TableBlockInfo(split.getPath().toString(), split.getStart(), split.getSegmentId(),
+            split.getLocations(), split.getLength(), blockletInfos, split.getVersion()));
+      } catch (IOException e) {
+        throw new RuntimeException("fail to get location of split: " + split, e);
+      }
+    }
+    return tableBlockInfoList;
+  }
+
+  public static TableBlockInfo getTableBlockInfo(CarbonHiveInputSplit inputSplit) {
+    BlockletInfos blockletInfos =
+        new BlockletInfos(inputSplit.getNumberOfBlocklets(), 0, inputSplit.getNumberOfBlocklets());
+    try {
+      return new TableBlockInfo(inputSplit.getPath().toString(), inputSplit.getStart(),
+        inputSplit.getSegmentId(), inputSplit.getLocations(), inputSplit.getLength(),
+        blockletInfos, inputSplit.getVersion());
+    } catch (IOException e) {
+      throw new RuntimeException("fail to get location of split: " + inputSplit, e);
+    }
+  }
+
+  public String getSegmentId() {
+    return segmentId;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    this.segmentId = in.readUTF();
+    this.version = ColumnarFormatVersion.valueOf(in.readShort());
+    this.bucketId = in.readUTF();
+    int numInvalidSegment = in.readInt();
+    invalidSegments = new ArrayList<>(numInvalidSegment);
+    for (int i = 0; i < numInvalidSegment; i++) {
+      invalidSegments.add(in.readUTF());
+    }
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeUTF(segmentId);
+    out.writeShort(version.number());
+    out.writeUTF(bucketId);
+    out.writeInt(invalidSegments.size());
+    for (String invalidSegment : invalidSegments) {
+      out.writeUTF(invalidSegment);
+    }
+  }
+
+  public List<String> getInvalidSegments() {
+    return invalidSegments;
+  }
+
+  public void setInvalidSegments(List<String> invalidSegments) {
+    this.invalidSegments = invalidSegments;
+  }
+
+  public void setInvalidTimestampRange(List<UpdateVO> invalidTimestamps) {
+    invalidTimestampsList = invalidTimestamps;
+  }
+
+  public List<UpdateVO> getInvalidTimestampRange() {
+    return invalidTimestampsList;
+  }
+
+  /**
+   * returns the number of blocklets
+   *
+   * @return
+   */
+  public int getNumberOfBlocklets() {
+    return numberOfBlocklets;
+  }
+
+  public ColumnarFormatVersion getVersion() {
+    return version;
+  }
+
+  public void setVersion(ColumnarFormatVersion version) {
+    this.version = version;
+  }
+
+  public String getBucketId() {
+    return bucketId;
+  }
+
+  @Override
+  public int compareTo(Distributable o) {
+    if (o == null) {
+      return -1;
+    }
+    CarbonHiveInputSplit other = (CarbonHiveInputSplit) o;
+    int compareResult = 0;
+    // Compare the segment ids first,
+    // converting each segment id to a double.
+
+    double seg1 = Double.parseDouble(segmentId);
+    double seg2 = Double.parseDouble(other.getSegmentId());
+    if (seg1 - seg2 < 0) {
+      return -1;
+    }
+    if (seg1 - seg2 > 0) {
+      return 1;
+    }
+
+    // The segment ids are equal, so compare the task id taken from the
+    // file name, then the bucket number, then the part number.
+    // File names that are not carbon data files are compared
+    // lexicographically.
+    String filePath1 = this.getPath().getName();
+    String filePath2 = other.getPath().getName();
+    if (CarbonTablePath.isCarbonDataFile(filePath1)) {
+      int firstTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath1));
+      int otherTaskId = Integer.parseInt(CarbonTablePath.DataFileUtil.getTaskNo(filePath2));
+      if (firstTaskId != otherTaskId) {
+        return firstTaskId - otherTaskId;
+      }
+
+      int firstBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath1));
+      int otherBucketNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getBucketNo(filePath2));
+      if (firstBucketNo != otherBucketNo) {
+        return firstBucketNo - otherBucketNo;
+      }
+
+      // compare the part no of both block info
+      int firstPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath1));
+      int SecondPartNo = Integer.parseInt(CarbonTablePath.DataFileUtil.getPartNo(filePath2));
+      compareResult = firstPartNo - SecondPartNo;
+    } else {
+      compareResult = filePath1.compareTo(filePath2);
+    }
+    if (compareResult != 0) {
+      return compareResult;
+    }
+    return 0;
+  }
+
+  @Override
+  public String getBlockPath() {
+    return getPath().getName();
+  }
+
+  @Override
+  public List<Long> getMatchedBlocklets() {
+    return null;
+  }
+
+  @Override
+  public boolean fullScan() {
+    return true;
+  }
+
+  /**
+   * Returns the map of block location to storage id.
+   *
+   * @return map of block location to storage id
+   */
+  public Map<String, String> getBlockStorageIdMap() {
+    return blockStorageIdMap;
+  }
+}
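
The ordering that compareTo applies above (segment id first, then task number, bucket number and part number) can be sketched with a plain Comparator. This is only an illustration under assumptions: SplitKey and its fields are hypothetical stand-ins for the values read from the split and its file name, not CarbonData classes. The chained comparator uses Double.compare and Integer.compare rather than subtraction, so the result cannot overflow.

```java
import java.util.Comparator;

// Hypothetical stand-in for the values compareTo reads; not a CarbonData class.
final class SplitKey {
  final String segmentId;
  final int taskNo;
  final int bucketNo;
  final int partNo;

  SplitKey(String segmentId, int taskNo, int bucketNo, int partNo) {
    this.segmentId = segmentId;
    this.taskNo = taskNo;
    this.bucketNo = bucketNo;
    this.partNo = partNo;
  }

  // Same precedence as the patch: segment id parsed as a double,
  // then task number, bucket number and part number.
  static final Comparator<SplitKey> ORDER =
      Comparator.comparingDouble((SplitKey k) -> Double.parseDouble(k.segmentId))
          .thenComparingInt(k -> k.taskNo)
          .thenComparingInt(k -> k.bucketNo)
          .thenComparingInt(k -> k.partNo);
}
```

Because the segment id is parsed as a number, segment "2" sorts before segment "10", which a plain string comparison would not give.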

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
new file mode 100644
index 0000000..ba29028
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveRecordReader.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+
+import java.io.IOException;
+import java.sql.Date;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.carbondata.core.datastore.block.TableBlockInfo;
+import org.apache.carbondata.core.scan.executor.exception.QueryExecutionException;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.core.scan.result.iterator.ChunkRowIterator;
+import org.apache.carbondata.hadoop.CarbonRecordReader;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.DateWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.io.TimestampWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.*;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+
+public class CarbonHiveRecordReader extends CarbonRecordReader<ArrayWritable>
+    implements org.apache.hadoop.mapred.RecordReader<Void, ArrayWritable> {
+
+  ArrayWritable valueObj = null;
+  private CarbonObjectInspector objInspector;
+
+  public CarbonHiveRecordReader(QueryModel queryModel, CarbonReadSupport<ArrayWritable> readSupport,
+                                InputSplit inputSplit, JobConf jobConf) throws IOException {
+    super(queryModel, readSupport);
+    initialize(inputSplit, jobConf);
+  }
+
+  public void initialize(InputSplit inputSplit, Configuration conf) throws IOException {
+    // The input split can contain a single HDFS block or multiple blocks, so first get all the
+    // blocks and then set them in the query model.
+    List<CarbonHiveInputSplit> splitList;
+    if (inputSplit instanceof CarbonHiveInputSplit) {
+      splitList = new ArrayList<>(1);
+      splitList.add((CarbonHiveInputSplit) inputSplit);
+    } else {
+      throw new RuntimeException("unsupported input split type: " + inputSplit);
+    }
+    List<TableBlockInfo> tableBlockInfoList = CarbonHiveInputSplit.createBlocks(splitList);
+    queryModel.setTableBlockInfos(tableBlockInfoList);
+    readSupport.initialize(queryModel.getProjectionColumns(),
+        queryModel.getAbsoluteTableIdentifier());
+    try {
+      carbonIterator = new ChunkRowIterator(queryExecutor.execute(queryModel));
+    } catch (QueryExecutionException e) {
+      throw new IOException(e.getMessage(), e);
+    }
+    if (valueObj == null) {
+      valueObj = new ArrayWritable(Writable.class,
+          new Writable[queryModel.getProjectionColumns().length]);
+    }
+
+    final TypeInfo rowTypeInfo;
+    final List<String> columnNames;
+    List<TypeInfo> columnTypes;
+    // Get the projected column names and the column types
+    final String columnNameProperty = conf.get("hive.io.file.readcolumn.names");
+    final String columnTypeProperty = conf.get(serdeConstants.LIST_COLUMN_TYPES);
+
+    if (columnNameProperty == null || columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    if (columnTypeProperty == null || columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    columnTypes = columnTypes.subList(0, columnNames.size());
+    // Create row related objects
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
+  }
+
+  @Override
+  public boolean next(Void aVoid, ArrayWritable value) throws IOException {
+    if (carbonIterator.hasNext()) {
+      Object obj = readSupport.readRow(carbonIterator.next());
+      ArrayWritable tmpValue = null;
+      try {
+        tmpValue = createArrayWritable(obj);
+      } catch (SerDeException se) {
+        throw new IOException(se.getMessage(), se);
+      }
+
+      if (value != tmpValue) {
+        final Writable[] arrValue = value.get();
+        final Writable[] arrCurrent = tmpValue.get();
+        if (valueObj != null && arrValue.length == arrCurrent.length) {
+          System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length);
+        } else {
+          if (arrValue.length != arrCurrent.length) {
+            throw new IOException("CarbonHiveInput: size of object differs. Value" +
+              " size: " + arrValue.length + ", current object size: " + arrCurrent.length);
+          } else {
+            throw new IOException("CarbonHiveInput cannot support RecordReaders that" +
+              " do not return the same value object, or whose value is null");
+          }
+        }
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  public ArrayWritable createArrayWritable(Object obj) throws SerDeException {
+    return createStruct(obj, objInspector);
+  }
+
+  @Override
+  public Void createKey() {
+    return null;
+  }
+
+  @Override
+  public ArrayWritable createValue() {
+    return valueObj;
+  }
+
+  @Override
+  public long getPos() throws IOException {
+    return 0;
+  }
+
+  @Override
+  public float getProgress() throws IOException {
+    return 0;
+  }
+
+  public ArrayWritable createStruct(Object obj, StructObjectInspector inspector)
+      throws SerDeException {
+    List fields = inspector.getAllStructFieldRefs();
+    Writable[] arr = new Writable[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      StructField field = (StructField) fields.get(i);
+      Object subObj = inspector.getStructFieldData(obj, field);
+      ObjectInspector subInspector = field.getFieldObjectInspector();
+      arr[i] = createObject(subObj, subInspector);
+    }
+    return new ArrayWritable(Writable.class, arr);
+  }
+
+  private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
+    throws SerDeException {
+    List sourceArray = inspector.getList(obj);
+    ObjectInspector subInspector = inspector.getListElementObjectInspector();
+    List array = new ArrayList();
+    Iterator iterator;
+    if (sourceArray != null) {
+      for (iterator = sourceArray.iterator(); iterator.hasNext(); ) {
+        Object curObj = iterator.next();
+        Writable newObj = createObject(curObj, subInspector);
+        if (newObj != null) {
+          array.add(newObj);
+        }
+      }
+    }
+    if (array.size() > 0) {
+      ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(),
+          (Writable[]) array.toArray(new Writable[array.size()]));
+
+      return new ArrayWritable(Writable.class, new Writable[]{subArray});
+    }
+    return null;
+  }
+
+  private Writable createPrimitive(Object obj, PrimitiveObjectInspector inspector)
+      throws SerDeException {
+    if (obj == null) {
+      return null;
+    }
+    switch (inspector.getPrimitiveCategory()) {
+      case VOID:
+        return null;
+      case DOUBLE:
+        return new DoubleWritable((double) obj);
+      case INT:
+        return new IntWritable(((Long) obj).intValue());
+      case LONG:
+        return new LongWritable((long) obj);
+      case SHORT:
+        return new ShortWritable(((Long) obj).shortValue());
+      case DATE:
+        return new DateWritable(new Date(((long) obj)));
+      case TIMESTAMP:
+        return new TimestampWritable(new Timestamp((long) obj));
+      case STRING:
+        return new Text(obj.toString());
+      case DECIMAL:
+        return new HiveDecimalWritable(HiveDecimal.create(
+          ((org.apache.spark.sql.types.Decimal) obj).toJavaBigDecimal()));
+    }
+    throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
+  }
+
+  private Writable createObject(Object obj, ObjectInspector inspector) throws SerDeException {
+    switch (inspector.getCategory()) {
+      case STRUCT:
+        return createStruct(obj, (StructObjectInspector) inspector);
+      case LIST:
+        return createArray(obj, (ListObjectInspector) inspector);
+      case PRIMITIVE:
+        return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
+    }
+    throw new SerDeException("Unknown data type: " + inspector.getCategory());
+  }
+}
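
createPrimitive above casts the value to Long before building an IntWritable or a ShortWritable, which matches the commit note that short and int columns are really long values inside Carbon. A minimal sketch of that narrowing in plain Java, without any Hadoop types, could look as follows. CarbonValueNarrowing and HiveKind are hypothetical names introduced only for this illustration.

```java
// Hypothetical helper; it mirrors the SHORT/INT/LONG branches of createPrimitive
// but returns boxed Java values instead of Hadoop Writables.
final class CarbonValueNarrowing {
  // Hypothetical subset of Hive's primitive categories, for illustration only.
  enum HiveKind { SHORT, INT, LONG }

  // Narrows a Long read from Carbon to the boxed type the Hive column expects.
  // Out-of-range values are silently truncated, as Long.intValue() does.
  static Object narrow(Object carbonValue, HiveKind kind) {
    if (carbonValue == null) {
      return null;
    }
    Long value = (Long) carbonValue;
    switch (kind) {
      case SHORT:
        return Short.valueOf(value.shortValue());
      case INT:
        return Integer.valueOf(value.intValue());
      default:
        return value;
    }
  }
}
```

If silent truncation is not acceptable, Math.toIntExact is a stricter alternative for the INT case, since it throws ArithmeticException when the value does not fit.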

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
new file mode 100644
index 0000000..cbc2514
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonHiveSerDe.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+import javax.annotation.Nullable;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.AbstractSerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeSpec;
+import org.apache.hadoop.hive.serde2.SerDeStats;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+
+
+/**
+ * A serde class for Carbondata.
+ * It transparently passes the object to/from the Carbon file reader/writer.
+ */
+@SerDeSpec(schemaProps = {serdeConstants.LIST_COLUMNS, serdeConstants.LIST_COLUMN_TYPES})
+public class CarbonHiveSerDe extends AbstractSerDe {
+  private SerDeStats stats;
+  private ObjectInspector objInspector;
+
+  private enum LAST_OPERATION {
+    SERIALIZE,
+    DESERIALIZE,
+    UNKNOWN
+  }
+
+  private LAST_OPERATION status;
+  private long serializedSize;
+  private long deserializedSize;
+
+  public CarbonHiveSerDe() {
+    stats = new SerDeStats();
+  }
+
+  @Override
+  public void initialize(@Nullable Configuration configuration, Properties tbl)
+      throws SerDeException {
+
+    final TypeInfo rowTypeInfo;
+    final List<String> columnNames;
+    final List<TypeInfo> columnTypes;
+    // Get the column names and the column types
+    final String columnNameProperty = tbl.getProperty(serdeConstants.LIST_COLUMNS);
+    final String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES);
+
+    if (columnNameProperty == null || columnNameProperty.length() == 0) {
+      columnNames = new ArrayList<String>();
+    } else {
+      columnNames = Arrays.asList(columnNameProperty.split(","));
+    }
+    if (columnTypeProperty == null || columnTypeProperty.length() == 0) {
+      columnTypes = new ArrayList<TypeInfo>();
+    } else {
+      columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+    }
+    // Create row related objects
+    rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
+    this.objInspector = new CarbonObjectInspector((StructTypeInfo) rowTypeInfo);
+
+    // Stats part
+    serializedSize = 0;
+    deserializedSize = 0;
+    status = LAST_OPERATION.UNKNOWN;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return ArrayWritable.class;
+  }
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objectInspector) throws SerDeException {
+    if (!objInspector.getCategory().equals(ObjectInspector.Category.STRUCT)) {
+      throw new SerDeException("Cannot serialize " + objInspector.getCategory()
+        + ". Can only serialize a struct");
+    }
+    serializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size();
+    status = LAST_OPERATION.SERIALIZE;
+    return createStruct(obj, (StructObjectInspector) objInspector);
+  }
+
+  public ArrayWritable createStruct(Object obj, StructObjectInspector inspector)
+      throws SerDeException {
+    List fields = inspector.getAllStructFieldRefs();
+    Writable[] arr = new Writable[fields.size()];
+    for (int i = 0; i < fields.size(); i++) {
+      StructField field = (StructField) fields.get(i);
+      Object subObj = inspector.getStructFieldData(obj, field);
+      ObjectInspector subInspector = field.getFieldObjectInspector();
+      arr[i] = createObject(subObj, subInspector);
+    }
+    return new ArrayWritable(Writable.class, arr);
+  }
+
+  private ArrayWritable createArray(Object obj, ListObjectInspector inspector)
+    throws SerDeException {
+    List sourceArray = inspector.getList(obj);
+    ObjectInspector subInspector = inspector.getListElementObjectInspector();
+    List array = new ArrayList();
+    Iterator iterator;
+    if (sourceArray != null) {
+      for (iterator = sourceArray.iterator(); iterator.hasNext(); ) {
+        Object curObj = iterator.next();
+        Writable newObj = createObject(curObj, subInspector);
+        if (newObj != null) {
+          array.add(newObj);
+        }
+      }
+    }
+    if (array.size() > 0) {
+      ArrayWritable subArray = new ArrayWritable(((Writable) array.get(0)).getClass(),
+          (Writable[]) array.toArray(new Writable[array.size()]));
+
+      return new ArrayWritable(Writable.class, new Writable[]{subArray});
+    }
+    return null;
+  }
+
+  private Writable createPrimitive(Object obj, PrimitiveObjectInspector inspector)
+    throws SerDeException {
+    if (obj == null) {
+      return null;
+    }
+    switch (inspector.getPrimitiveCategory()) {
+      case VOID:
+        return null;
+      case DOUBLE:
+        return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
+      case INT:
+        return new IntWritable(((IntObjectInspector) inspector).get(obj));
+      case LONG:
+        return new LongWritable(((LongObjectInspector) inspector).get(obj));
+      case SHORT:
+        return new ShortWritable(((ShortObjectInspector) inspector).get(obj));
+      case TIMESTAMP:
+        return ((TimestampObjectInspector) inspector).getPrimitiveWritableObject(obj);
+      case DATE:
+        return ((DateObjectInspector) inspector).getPrimitiveWritableObject(obj);
+      case STRING:
+        return ((StringObjectInspector) inspector).getPrimitiveWritableObject(obj);
+      case DECIMAL:
+        return ((HiveDecimalObjectInspector) inspector).getPrimitiveWritableObject(obj);
+    }
+    throw new SerDeException("Unknown primitive : " + inspector.getPrimitiveCategory());
+  }
+
+  private Writable createObject(Object obj, ObjectInspector inspector) throws SerDeException {
+    switch (inspector.getCategory()) {
+      case STRUCT:
+        return createStruct(obj, (StructObjectInspector) inspector);
+      case LIST:
+        return createArray(obj, (ListObjectInspector) inspector);
+      case PRIMITIVE:
+        return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
+    }
+    throw new SerDeException("Unknown data type: " + inspector.getCategory());
+  }
+
+  @Override
+  public SerDeStats getSerDeStats() {
+    // status must have been set by an earlier serialize or deserialize call
+    assert (status != LAST_OPERATION.UNKNOWN);
+    if (status == LAST_OPERATION.SERIALIZE) {
+      stats.setRawDataSize(serializedSize);
+    } else {
+      stats.setRawDataSize(deserializedSize);
+    }
+    return stats;
+  }
+
+  @Override
+  public Object deserialize(Writable writable) throws SerDeException {
+    status = LAST_OPERATION.DESERIALIZE;
+    if (writable instanceof ArrayWritable) {
+      deserializedSize += ((StructObjectInspector) objInspector).getAllStructFieldRefs().size();
+      return writable;
+    } else {
+      return null;
+    }
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return objInspector;
+  }
+}
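
initialize above builds the column name list by splitting a comma-separated table property. The name half of that parsing can be sketched in plain Java as shown here. ColumnNames is a hypothetical helper, and the literal key "columns" is assumed to be the value behind serdeConstants.LIST_COLUMNS. The type string is left out because parsing it needs Hive's TypeInfoUtils.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

// Hypothetical helper that mirrors the column-name handling in initialize.
final class ColumnNames {
  // Assumed to be the value of serdeConstants.LIST_COLUMNS.
  static final String LIST_COLUMNS = "columns";

  // Returns the column names, or an empty list when the property is missing or empty.
  static List<String> parse(Properties tbl) {
    String property = tbl.getProperty(LIST_COLUMNS);
    if (property == null || property.isEmpty()) {
      return new ArrayList<String>();
    }
    return Arrays.asList(property.split(","));
  }
}
```

Treating a missing property like an empty one keeps the SerDe from failing with a NullPointerException on a table that declares no columns.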

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java
new file mode 100644
index 0000000..f6ab256
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonObjectInspector.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;
+import org.apache.hadoop.io.ArrayWritable;
+
+public class CarbonObjectInspector extends SettableStructObjectInspector {
+  private final TypeInfo typeInfo;
+  private final List<TypeInfo> fieldInfos;
+  private final List<String> fieldNames;
+  private final List<StructField> fields;
+  private final HashMap<String, StructFieldImpl> fieldsByName;
+
+  public CarbonObjectInspector(final StructTypeInfo rowTypeInfo) {
+
+    typeInfo = rowTypeInfo;
+    fieldNames = rowTypeInfo.getAllStructFieldNames();
+    fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos();
+    fields = new ArrayList<StructField>(fieldNames.size());
+    fieldsByName = new HashMap<String, StructFieldImpl>();
+
+    for (int i = 0; i < fieldNames.size(); ++i) {
+      final String name = fieldNames.get(i);
+      final TypeInfo fieldInfo = fieldInfos.get(i);
+
+      final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i);
+      fields.add(field);
+      fieldsByName.put(name, field);
+    }
+  }
+
+  public ObjectInspector getObjectInspector(final TypeInfo typeInfo) {
+    if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableLongObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableStringObjectInspector;
+    } else if (typeInfo instanceof DecimalTypeInfo) {
+      return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector(
+        (DecimalTypeInfo) typeInfo);
+    } else if (typeInfo.getCategory().equals(Category.STRUCT)) {
+      return new CarbonObjectInspector((StructTypeInfo) typeInfo);
+    } else if (typeInfo.getCategory().equals(Category.LIST)) {
+      final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo();
+      return new CarbonArrayInspector(getObjectInspector(subTypeInfo));
+    } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableShortObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector;
+    } else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) {
+      return PrimitiveObjectInspectorFactory.writableDateObjectInspector;
+    } else {
+      throw new UnsupportedOperationException("Unknown field type: " + typeInfo);
+    }
+  }
+
+  @Override
+  public Category getCategory() {
+    return Category.STRUCT;
+  }
+
+  @Override
+  public String getTypeName() {
+    return typeInfo.getTypeName();
+  }
+
+  @Override
+  public List<? extends StructField> getAllStructFieldRefs() {
+    return fields;
+  }
+
+  @Override
+  public Object getStructFieldData(final Object data, final StructField fieldRef) {
+    if (data == null) {
+      return null;
+    }
+
+    if (data instanceof ArrayWritable) {
+      final ArrayWritable arr = (ArrayWritable) data;
+      return arr.get()[((StructFieldImpl) fieldRef).getIndex()];
+    }
+
+    boolean isArray = data.getClass().isArray();
+    if (!isArray && !(data instanceof List)) {
+      return data;
+    } else {
+      int listSize = isArray ? ((Object[]) data).length : ((List) data).size();
+      int fieldID = fieldRef.getFieldID();
+      return fieldID >= listSize ? null :
+        (isArray ? ((Object[]) data)[fieldID] : ((List) data).get(fieldID));
+    }
+  }
+
+  @Override
+  public StructField getStructFieldRef(final String name) {
+    return fieldsByName.get(name);
+  }
+
+  @Override
+  public List<Object> getStructFieldsDataAsList(final Object data) {
+    if (data == null) {
+      return null;
+    }
+
+    if (data instanceof ArrayWritable) {
+      final ArrayWritable arr = (ArrayWritable) data;
+      final Object[] arrWritable = arr.get();
+      return new ArrayList<Object>(Arrays.asList(arrWritable));
+    }
+
+    throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName());
+  }
+
+  @Override
+  public Object create() {
+    final ArrayList<Object> list = new ArrayList<Object>(fields.size());
+    for (int i = 0; i < fields.size(); ++i) {
+      list.add(null);
+    }
+    return list;
+  }
+
+  @Override
+  public Object setStructFieldData(Object struct, StructField field, Object fieldValue) {
+    final ArrayList<Object> list = (ArrayList<Object>) struct;
+    list.set(((StructFieldImpl) field).getIndex(), fieldValue);
+    return list;
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+    if (getClass() != obj.getClass()) {
+      return false;
+    }
+    final CarbonObjectInspector other = (CarbonObjectInspector) obj;
+    if (this.typeInfo != other.typeInfo &&
+        (this.typeInfo == null || !this.typeInfo.equals(other.typeInfo))) {
+      return false;
+    }
+    return true;
+  }
+
+  @Override
+  public int hashCode() {
+    int hash = 5;
+    hash = 29 * hash + (this.typeInfo != null ? this.typeInfo.hashCode() : 0);
+    return hash;
+  }
+
+  class StructFieldImpl implements StructField {
+
+    private final String name;
+    private final ObjectInspector inspector;
+    private final int index;
+
+    public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) {
+      this.name = name;
+      this.inspector = inspector;
+      this.index = index;
+    }
+
+    @Override
+    public String getFieldComment() {
+      return "";
+    }
+
+    @Override
+    public String getFieldName() {
+      return name;
+    }
+
+    public int getIndex() {
+      return index;
+    }
+
+    @Override
+    public ObjectInspector getFieldObjectInspector() {
+      return inspector;
+    }
+
+    @Override
+    public int getFieldID() {
+      return index;
+    }
+  }
+}
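
getStructFieldData above accepts a row as an ArrayWritable, a List or an Object[]. The dispatch for the last two cases can be sketched without Hadoop types as follows. StructFieldAccess is a hypothetical helper written for this illustration, not part of the patch.

```java
import java.util.List;

// Hypothetical helper showing how a field is looked up by position
// in either a List-backed or an array-backed row.
final class StructFieldAccess {
  // Returns the field at fieldId, null when the row is null or too short,
  // and the value itself when it is neither a List nor an Object[].
  static Object fieldAt(Object data, int fieldId) {
    if (data == null) {
      return null;
    }
    if (data instanceof List) {
      List<?> list = (List<?>) data;
      return fieldId >= list.size() ? null : list.get(fieldId);
    }
    if (data instanceof Object[]) {
      Object[] array = (Object[]) data;
      return fieldId >= array.length ? null : array[fieldId];
    }
    return data;
  }
}
```

Checking each shape with instanceof before casting avoids a ClassCastException when a scalar value is passed where a row was expected.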

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java
new file mode 100644
index 0000000..f25342d
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/CarbonStorageFormatDescriptor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.hive;
+
+import java.util.Set;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.hadoop.hive.ql.io.AbstractStorageFormatDescriptor;
+
+public class CarbonStorageFormatDescriptor extends AbstractStorageFormatDescriptor {
+
+  @Override
+  public Set<String> getNames() {
+    return ImmutableSet.of("CARBONDATA");
+  }
+
+  @Override
+  public String getInputFormat() {
+    return MapredCarbonInputFormat.class.getName();
+  }
+
+  @Override
+  public String getOutputFormat() {
+    return MapredCarbonOutputFormat.class.getName();
+  }
+
+  @Override
+  public String getSerde() {
+    return CarbonHiveSerDe.class.getName();
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
new file mode 100644
index 0000000..5caf5a8
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonInputFormat.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.model.CarbonQueryPlan;
+import org.apache.carbondata.core.scan.model.QueryModel;
+import org.apache.carbondata.hadoop.CarbonInputFormat;
+import org.apache.carbondata.hadoop.CarbonInputSplit;
+import org.apache.carbondata.hadoop.readsupport.CarbonReadSupport;
+import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+
+
+public class MapredCarbonInputFormat extends CarbonInputFormat<ArrayWritable>
+    implements InputFormat<Void, ArrayWritable>, CombineHiveInputFormat.AvoidSplitCombination {
+
+  @Override
+  public InputSplit[] getSplits(JobConf jobConf, int numSplits) throws IOException {
+    org.apache.hadoop.mapreduce.JobContext jobContext = Job.getInstance(jobConf);
+    List<org.apache.hadoop.mapreduce.InputSplit> splitList = super.getSplits(jobContext);
+    InputSplit[] splits = new InputSplit[splitList.size()];
+    CarbonInputSplit split = null;
+    for (int i = 0; i < splitList.size(); i++) {
+      split = (CarbonInputSplit) splitList.get(i);
+      splits[i] = new CarbonHiveInputSplit(split.getSegmentId(), split.getPath(),
+          split.getStart(), split.getLength(), split.getLocations(),
+          split.getNumberOfBlocklets(), split.getVersion(), split.getBlockStorageIdMap());
+    }
+    return splits;
+  }
+
+  @Override
+  public RecordReader<Void, ArrayWritable> getRecordReader(InputSplit inputSplit, JobConf jobConf,
+                                                           Reporter reporter) throws IOException {
+    QueryModel queryModel = getQueryModel(jobConf);
+    CarbonReadSupport<ArrayWritable> readSupport = getReadSupportClass(jobConf);
+    return new CarbonHiveRecordReader(queryModel, readSupport, inputSplit, jobConf);
+  }
+
+  public QueryModel getQueryModel(Configuration configuration) throws IOException {
+    CarbonTable carbonTable = getCarbonTable(configuration);
+    // Take the AbsoluteTableIdentifier from the CarbonTable itself
+    // to avoid an unnecessary deserialization.
+    AbsoluteTableIdentifier identifier = carbonTable.getAbsoluteTableIdentifier();
+
+    // query plan includes projection column
+
+    String projection = getColumnProjection(configuration);
+    if (projection == null) {
+      projection = configuration.get("hive.io.file.readcolumn.names");
+    }
+    CarbonQueryPlan queryPlan = CarbonInputFormatUtil.createQueryPlan(carbonTable, projection);
+    QueryModel queryModel = QueryModel.createModel(identifier, queryPlan, carbonTable);
+
+    // set the filter on the query model so that blocklets are pruned before the scan
+    Expression filter = getFilterPredicates(configuration);
+    CarbonInputFormatUtil.processFilterExpression(filter, carbonTable);
+    FilterResolverIntf filterIntf = CarbonInputFormatUtil.resolveFilter(filter, identifier);
+    queryModel.setFilterExpressionResolverTree(filterIntf);
+
+    return queryModel;
+  }
+
+  @Override
+  public boolean shouldSkipCombine(Path path, Configuration conf) throws IOException {
+    return true;
+  }
+}
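
The projection lookup in getQueryModel above falls back to Hive's "hive.io.file.readcolumn.names" setting when no Carbon projection is configured. Below is a minimal, dependency-free sketch of that fallback. It is not the commit's code: the class name, the CARBON_PROJECTION_KEY constant, and the use of a plain Map in place of a Hadoop Configuration are assumptions made for illustration, and it assumes the column list is comma-separated.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for the projection fallback in getQueryModel.
final class ProjectionFallbackSketch {
  // Assumption: a made-up key standing in for wherever the Carbon projection is stored.
  static final String CARBON_PROJECTION_KEY = "example.carbon.projection";
  // The key that getQueryModel reads when no Carbon projection is set.
  static final String HIVE_READ_COLUMN_NAMES = "hive.io.file.readcolumn.names";

  // Prefer the Carbon projection; otherwise use the columns Hive asked for.
  static String resolveProjection(Map<String, String> conf) {
    String projection = conf.get(CARBON_PROJECTION_KEY);
    if (projection == null) {
      projection = conf.get(HIVE_READ_COLUMN_NAMES);
    }
    return projection;
  }

  // Split a comma-separated projection into trimmed, non-empty column names.
  static List<String> splitColumns(String projection) {
    List<String> columns = new ArrayList<>();
    if (projection == null) {
      return columns;
    }
    for (String name : projection.split(",")) {
      String trimmed = name.trim();
      if (!trimmed.isEmpty()) {
        columns.add(trimmed);
      }
    }
    return columns;
  }

  public static void main(String[] args) {
    Map<String, String> conf = new HashMap<>();
    conf.put(HIVE_READ_COLUMN_NAMES, "id,name");
    System.out.println(splitColumns(resolveProjection(conf)));
  }
}
```

If neither key is set, resolveProjection returns null, just as the projection variable in getQueryModel stays null; how createQueryPlan treats a null projection is not shown in this diff.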

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
new file mode 100644
index 0000000..83fd2d5
--- /dev/null
+++ b/integration/hive/src/main/java/org/apache/carbondata/hive/MapredCarbonOutputFormat.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.FileOutputFormat;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordWriter;
+import org.apache.hadoop.util.Progressable;
+
+
+public class MapredCarbonOutputFormat<T> extends FileOutputFormat<Void, T>
+    implements HiveOutputFormat<Void, T> {
+
+  @Override
+  public RecordWriter<Void, T> getRecordWriter(FileSystem fileSystem, JobConf jobConf, String s,
+      Progressable progressable) throws IOException {
+    return null;
+  }
+
+  @Override
+  public FileSinkOperator.RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+      Class<? extends Writable> valueClass, boolean isCompressed, Properties tableProperties,
+      Progressable progress) throws IOException {
+    return null;
+  }
+}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor
----------------------------------------------------------------------
diff --git a/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor b/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor
new file mode 100644
index 0000000..9a39d21
--- /dev/null
+++ b/integration/hive/src/main/resources/META-INF.services/org.apache.hadoop.hive.ql.io.StorageFormatDescriptor
@@ -0,0 +1 @@
+org.apache.carbondata.hive.CarbonStorageFormatDescriptor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
----------------------------------------------------------------------
diff --git a/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
new file mode 100644
index 0000000..3969914
--- /dev/null
+++ b/integration/hive/src/test/java/org/apache/carbondata/hive/TestCarbonSerde.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.carbondata.hive;
+
+import junit.framework.TestCase;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hive.common.type.HiveDecimal;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.*;
+import org.junit.Test;
+
+import java.util.Properties;
+
+public class TestCarbonSerde extends TestCase {
+  @Test
+  public void testCarbonHiveSerDe() throws Throwable {
+    try {
+      // Create the SerDe
+      System.out.println("test: testCarbonHiveSerDe");
+
+      final CarbonHiveSerDe serDe = new CarbonHiveSerDe();
+      final Configuration conf = new Configuration();
+      final Properties tbl = createProperties();
+      SerDeUtils.initializeSerDe(serDe, conf, tbl, null);
+
+      // Data
+      final Writable[] arr = new Writable[7];
+
+      //primitive types
+      arr[0] = new ShortWritable((short) 456);
+      arr[1] = new IntWritable(789);
+      arr[2] = new LongWritable(1000L);
+      arr[3] = new DoubleWritable(5.3);
+      arr[4] = new HiveDecimalWritable(HiveDecimal.create(1));
+      arr[5] = new Text("carbonSerde binary".getBytes("UTF-8"));
+
+      final Writable[] arrayContainer = new Writable[1];
+      final Writable[] array = new Writable[5];
+      for (int i = 0; i < 5; ++i) {
+        array[i] = new IntWritable(i);
+      }
+      arrayContainer[0] = new ArrayWritable(Writable.class, array);
+      arr[6] = new ArrayWritable(Writable.class, arrayContainer);
+
+      final ArrayWritable arrWritable = new ArrayWritable(Writable.class, arr);
+      // Test
+      deserializeAndSerializeLazySimple(serDe, arrWritable);
+      System.out.println("test: testCarbonHiveSerDe - OK");
+
+    } catch (final Throwable e) {
+      e.printStackTrace();
+      throw e;
+    }
+  }
+
+  private void deserializeAndSerializeLazySimple(final CarbonHiveSerDe serDe,
+      final ArrayWritable t) throws SerDeException {
+
+    // Get the row structure
+    final StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
+
+    // Deserialize
+    final Object row = serDe.deserialize(t);
+    assertEquals("deserialization gives the wrong object class", ArrayWritable.class,
+        row.getClass());
+    assertEquals("wrong raw data size after deserialization",
+        t.get().length, serDe.getSerDeStats().getRawDataSize());
+    assertEquals("deserialization gives the wrong object", t, row);
+
+    // Serialize
+    final ArrayWritable serializedArr = (ArrayWritable) serDe.serialize(row, oi);
+    assertEquals("wrong raw data size after serialization", serializedArr.get().length,
+        serDe.getSerDeStats().getRawDataSize());
+    assertTrue("serialized object should be equal to starting object",
+        arrayWritableEquals(t, serializedArr));
+  }
+
+  private Properties createProperties() {
+    final Properties tbl = new Properties();
+
+    // Set the configuration parameters
+    tbl.setProperty("columns", "ashort,aint,along,adouble,adecimal,astring,alist");
+    tbl.setProperty("columns.types",
+        "smallint:int:bigint:double:decimal:string:array<int>");
+    tbl.setProperty(org.apache.hadoop.hive.serde.serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL");
+    return tbl;
+  }
+
+  public static boolean arrayWritableEquals(final ArrayWritable a1, final ArrayWritable a2) {
+    final Writable[] a1Arr = a1.get();
+    final Writable[] a2Arr = a2.get();
+
+    if (a1Arr.length != a2Arr.length) {
+      return false;
+    }
+
+    for (int i = 0; i < a1Arr.length; ++i) {
+      if (a1Arr[i] instanceof ArrayWritable) {
+        if (!(a2Arr[i] instanceof ArrayWritable)) {
+          return false;
+        }
+        if (!arrayWritableEquals((ArrayWritable) a1Arr[i], (ArrayWritable) a2Arr[i])) {
+          return false;
+        }
+      } else {
+        if (!a1Arr[i].equals(a2Arr[i])) {
+          return false;
+        }
+      }
+
+    }
+    return true;
+  }
+}
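
The arrayWritableEquals helper above compares two rows element by element and recurses whenever an element is itself an ArrayWritable. The sketch below shows the same recursion without Hadoop on the classpath. It is an illustration only: the class and method names are hypothetical, and plain Object[] stands in for ArrayWritable.

```java
import java.util.Objects;

// Hypothetical stand-in for the recursive comparison in arrayWritableEquals.
final class NestedArrayEqualsSketch {

  // Returns true when both arrays have the same length and every pair of
  // elements is equal; nested Object[] elements are compared recursively.
  static boolean nestedEquals(Object[] a, Object[] b) {
    if (a.length != b.length) {
      return false;
    }
    for (int i = 0; i < a.length; i++) {
      if (a[i] instanceof Object[]) {
        if (!(b[i] instanceof Object[])) {
          return false;
        }
        if (!nestedEquals((Object[]) a[i], (Object[]) b[i])) {
          return false;
        }
      } else if (!Objects.equals(a[i], b[i])) {
        // Unlike the test helper, Objects.equals tolerates null elements.
        return false;
      }
    }
    return true;
  }

  public static void main(String[] args) {
    Object[] row = {1, "carbon", new Object[] {2, 3}};
    Object[] copy = {1, "carbon", new Object[] {2, 3}};
    System.out.println(nestedEquals(row, copy));
  }
}
```

For plain Java arrays, java.util.Arrays.deepEquals already provides this behaviour. The test presumably needs its own helper because ArrayWritable wraps its elements in a Writable[] and is compared here through get().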

http://git-wip-us.apache.org/repos/asf/carbondata/blob/a700f83e/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index d7fb6a8..d8626e7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -150,6 +150,12 @@
       </dependency>
       <dependency>
         <groupId>org.apache.hadoop</groupId>
+        <artifactId>hadoop-yarn-api</artifactId>
+        <version>${hadoop.version}</version>
+        <scope>${hadoop.deps.scope}</scope>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.hadoop</groupId>
         <artifactId>hadoop-mapreduce-client-core</artifactId>
         <version>${hadoop.version}</version>
         <scope>${hadoop.deps.scope}</scope>
@@ -309,6 +315,7 @@
         <scala.binary.version>2.10</scala.binary.version>
         <scala.version>2.10.4</scala.version>
         <maven.test.skip>true</maven.test.skip>
+        <hive.version>1.2.1</hive.version>
         <flink.version>1.1.4</flink.version>
       </properties>
       <modules>
@@ -316,6 +323,7 @@
         <module>integration/spark</module>
         <module>examples/spark</module>
         <module>integration/spark2</module>
+        <module>integration/hive</module>
         <module>examples/spark2</module>
         <module>examples/flink</module>
       </modules>
@@ -383,6 +391,15 @@
       </modules>
     </profile>
     <profile>
+      <id>hive-1.2</id>
+      <properties>
+        <hive.version>1.2.1</hive.version>
+      </properties>
+      <modules>
+        <module>integration/hive</module>
+      </modules>
+    </profile>
+    <profile>
       <id>findbugs</id>
       <build>
         <plugins>

