hive-commits mailing list archives

From na...@apache.org
Subject svn commit: r922404 [1/3] - in /hadoop/hive/trunk: ./ hbase-handler/ hbase-handler/lib/ hbase-handler/src/ hbase-handler/src/java/ hbase-handler/src/java/org/ hbase-handler/src/java/org/apache/ hbase-handler/src/java/org/apache/hadoop/ hbase-handler/sr...
Date Fri, 12 Mar 2010 19:53:59 GMT
Author: namit
Date: Fri Mar 12 19:53:57 2010
New Revision: 922404

URL: http://svn.apache.org/viewvc?rev=922404&view=rev
Log:
HIVE-705. Read HBase tables via Hive
(John Sichi via namit)


Added:
    hadoop/hive/trunk/hbase-handler/
    hadoop/hive/trunk/hbase-handler/README.txt
    hadoop/hive/trunk/hbase-handler/build.xml
    hadoop/hive/trunk/hbase-handler/ivy.xml
    hadoop/hive/trunk/hbase-handler/lib/
    hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar   (with props)
    hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar   (with props)
    hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar   (with props)
    hadoop/hive/trunk/hbase-handler/src/
    hadoop/hive/trunk/hbase-handler/src/java/
    hadoop/hive/trunk/hbase-handler/src/java/org/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java
    hadoop/hive/trunk/hbase-handler/src/test/
    hadoop/hive/trunk/hbase-handler/src/test/org/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
    hadoop/hive/trunk/hbase-handler/src/test/queries/
    hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_queries.q
    hadoop/hive/trunk/hbase-handler/src/test/results/
    hadoop/hive/trunk/hbase-handler/src/test/results/hbase_queries.q.out
    hadoop/hive/trunk/hbase-handler/src/test/templates/
    hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookLoader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/alter_non_native.q
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/load_non_native.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/alter_non_native.q.out
    hadoop/hive/trunk/ql/src/test/results/clientnegative/load_non_native.q.out
Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/build-common.xml
    hadoop/hive/trunk/build.xml
    hadoop/hive/trunk/metastore/if/hive_metastore.thrift
    hadoop/hive/trunk/metastore/src/gen-cpp/hive_metastore_constants.cpp
    hadoop/hive/trunk/metastore/src/gen-cpp/hive_metastore_constants.h
    hadoop/hive/trunk/metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java
    hadoop/hive/trunk/metastore/src/gen-php/hive_metastore_constants.php
    hadoop/hive/trunk/metastore/src/gen-py/hive_metastore/constants.py
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
    hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyMap.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=922404&r1=922403&r2=922404&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Mar 12 19:53:57 2010
@@ -48,8 +48,8 @@ Trunk -  Unreleased
     HIVE-1194. Add sort merge join
     (He Yongqiang via namit)
 
-    HIVE-1241. Drop the table at the beginning of tests
-    (He Yongqiang via namit)
+    HIVE-705. Read HBase tables via Hive
+    (John Sichi via namit)
 
   IMPROVEMENTS
     HIVE-983. Function from_unixtime takes long.
@@ -258,6 +258,9 @@ Trunk -  Unreleased
     HIVE-1022. desc Table should work.
     (namit via He Yongqiang)
 
+    HIVE-1241. Drop the table at the beginning of tests
+    (He Yongqiang via namit)
+
 Release 0.5.0 -  Unreleased
 
   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=922404&r1=922403&r2=922404&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Fri Mar 12 19:53:57 2010
@@ -429,6 +429,7 @@
 
   <target name="clean-test">
     <delete dir="${test.build.dir}"/>
+    <delete dir="${build.dir.hive}/ql/tmp"/>
     <delete dir="${build.dir.hive}/test"/>
   </target>
 

Modified: hadoop/hive/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build.xml?rev=922404&r1=922403&r2=922404&view=diff
==============================================================================
--- hadoop/hive/trunk/build.xml (original)
+++ hadoop/hive/trunk/build.xml Fri Mar 12 19:53:57 2010
@@ -46,6 +46,16 @@
   <property name="checkstyle.conf.dir" location="${hive.root}/checkstyle"/>
   <property name="checkstyle.build.dir" location="${build.dir.hive}/checkstyle"/>
 
+  <!-- hbase-handler is enabled by default only for Hadoop 0.20.x -->
+  <condition property="hbase.enabled" value="true" else="false">
+    <matches string="${hadoop.version.ant-internal}" pattern="^0\.20\..*" />
+  </condition>
+
+  <!-- only iterate over hbase-handler when enabled -->
+  <condition property="hbase.iterate" value=",hbase-handler/build.xml" else="">
+    <istrue value="${hbase.enabled}"/>
+  </condition>
+
   <condition property="is-offline" value="true" else="false">
     <isset property="offline"/>
   </condition>
@@ -82,7 +92,7 @@
       <subant target="@{target}">
         <property name="build.dir.hive" location="${build.dir.hive}"/>
         <property name="is-offline" value="${is-offline}"/>
-        <filelist dir="." files="ant/build.xml,shims/build.xml,common/build.xml,serde/build.xml,metastore/build.xml,ql/build.xml,cli/build.xml,contrib/build.xml,service/build.xml,jdbc/build.xml,hwi/build.xml,ant/build.xml"/>
+        <filelist dir="." files="ant/build.xml,shims/build.xml,common/build.xml,serde/build.xml,metastore/build.xml,ql/build.xml,cli/build.xml,contrib/build.xml,service/build.xml,jdbc/build.xml,hwi/build.xml${hbase.iterate},ant/build.xml"/>
       </subant>
     </sequential>
   </macrodef>
@@ -93,7 +103,7 @@
       <subant target="@{target}">
         <property name="build.dir.hive" location="${build.dir.hive}"/>
         <property name="is-offline" value="${is-offline}"/>
-        <filelist dir="." files="shims/build.xml,common/build.xml,serde/build.xml,metastore/build.xml,ql/build.xml,cli/build.xml,contrib/build.xml,service/build.xml,jdbc/build.xml,hwi/build.xml"/>
+        <filelist dir="." files="shims/build.xml,common/build.xml,serde/build.xml,metastore/build.xml,ql/build.xml,cli/build.xml,contrib/build.xml,service/build.xml,jdbc/build.xml,hwi/build.xml${hbase.iterate}"/>
       </subant>
     </sequential>
   </macrodef>
@@ -286,6 +296,7 @@
       <fileset file="${build.dir.hive}/metastore/hive-metastore-${version}.jar"/>
       <fileset file="${build.dir.hive}/hwi/hive-hwi-${version}.war"/>
       <fileset file="${build.dir.hive}/contrib/hive-contrib-${version}.jar"/>
+      <fileset file="${build.dir.hive}/contrib/hive-hbase_handler-${version}.jar"/>
     </copy>
     <copy todir="${target.example.dir}/files" preservelastmodified="true" flatten="true">
       <fileset dir="${test.data.dir}/files" includes="*.*" excludes="**/.svn"/>

Added: hadoop/hive/trunk/hbase-handler/README.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/README.txt?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/README.txt (added)
+++ hadoop/hive/trunk/hbase-handler/README.txt Fri Mar 12 19:53:57 2010
@@ -0,0 +1,2 @@
+See http://wiki.apache.org/hadoop/Hive/HBaseIntegration for
+information about the HBase storage handler.

Added: hadoop/hive/trunk/hbase-handler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/build.xml?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/build.xml (added)
+++ hadoop/hive/trunk/hbase-handler/build.xml Fri Mar 12 19:53:57 2010
@@ -0,0 +1,97 @@
+<?xml version="1.0"?>
+
+<!--
+   Licensed to the Apache Software Foundation (ASF) under one or more
+   contributor license agreements.  See the NOTICE file distributed with
+   this work for additional information regarding copyright ownership.
+   The ASF licenses this file to You under the Apache License, Version 2.0
+   (the "License"); you may not use this file except in compliance with
+   the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+
+
+<project name="hbase-handler" default="jar">
+
+  <property name="hbase-handler.lib.dir" value="${basedir}/lib"/>
+  <property name="src.dir"  location="${basedir}/src/java"/>
+  <property name="hbase-handler.test.query.dir" location="${basedir}/src/test/queries"/>
+  <property name="ql.test.template.dir" location="${basedir}/../ql/src/test/templates"/>
+  <property name="ql.hbase.test.template.dir" location="${basedir}/src/test/templates"/>
+  <property name="hbase-handler.test.results.dir" location="${basedir}/src/test/results"/>
+
+  <import file="../build-common.xml"/>
+
+  <path id="test.classpath">
+    <pathelement location="${build.dir.hive}/ql/test/classes" />
+    <pathelement location="${test.build.classes}" />
+    <pathelement location="" />
+    <pathelement location="${hadoop.conf.dir}"/>
+    <pathelement location="${test.data.dir}/conf"/>
+    <pathelement location="${hive.conf.dir}"/>
+    <pathelement location="${hive.root}/cli/lib/jline-0.9.94.jar"/>
+    <pathelement location="${hadoop.test.jar}"/>
+    <pathelement location="${jetty.test.jar}"/>
+    <pathelement location="${servlet.test.jar}"/>
+    <pathelement location="${jasper.test.jar}"/>
+    <pathelement location="${jasperc.test.jar}"/>
+    <pathelement location="${jsp.test.jar}"/>
+    <pathelement location="${common.jar}"/>
+    <fileset dir="${hive.root}" includes="testlibs/*.jar"/>
+    <fileset dir="${hadoop.root}/lib" includes="*.jar"/>
+    <fileset dir="${hadoop.root}/lib/jsp-2.1" includes="*.jar"/>
+    <path refid="classpath"/>
+  </path>
+
+  <target name="test-jar" depends="compile-test, jar">
+    <delete file="${test.build.dir}/test-udfs.jar"/>
+    <jar jarfile="${test.build.dir}/test-udfs.jar">
+        <fileset dir="${test.build.classes}" includes="**/udf/*.class"/>
+        <fileset dir="${test.build.classes}" includes="**/udf/generic/*.class"/>
+    </jar>
+  </target>
+
+  <target name="gen-test" depends="deploy-ant-tasks, test-conditions, test-init" >
+    <taskdef name="qtestgen" classname="org.apache.hadoop.hive.ant.QTestGenTask"
+             classpath="${build.dir.hive}/anttasks/hive-anttasks-${version}.jar:${hive.root}/lib/velocity-1.5.jar:${hive.root}/lib/commons-collections-3.2.1.jar:${hive.root}/lib/commons-lang-2.4.jar"/>
+    
+    <mkdir dir="${test.build.src}/org/apache/hadoop/hive/cli"/>
+    <mkdir dir="${test.log.dir}/hbase-handler"/>
+    <mkdir dir="${hbase-handler.test.results.dir}"/>
+
+    <qtestgen outputDirectory="${test.build.src}/org/apache/hadoop/hive/cli" 
+              templatePath="${ql.hbase.test.template.dir}" template="TestHBaseCliDriver.vm" 
+              queryDirectory="${hbase-handler.test.query.dir}" 
+              queryFile="${qfile}"
+              clusterMode="${clustermode}"
+              resultsDirectory="${hbase-handler.test.results.dir}" className="TestHBaseCliDriver"
+              logFile="${test.log.dir}/testhbaseclidrivergen.log"
+              logDirectory="${test.log.dir}/hbase-handler"/>
+
+  </target>
+
+  <!-- override the jar target because tests need to add the hbase-handler jar,
+       whose name needs to be constant (no version number) -->
+  <target name="jar" depends="compile">
+    <echo message="Jar: ${ant.project.name}"/>
+    <jar
+      jarfile="${build.dir}/hive_${ant.project.name}.jar"
+      basedir="${build.classes}">
+      <manifest>
+        <!-- Not putting these in their own manifest section, since that inserts
+        a new-line, which breaks the reading of the attributes. -->
+        <attribute name="Implementation-Title" value="Hive"/>
+        <attribute name="Implementation-Version" value="${version}"/>
+        <attribute name="Implementation-Vendor" value="Apache"/>
+      </manifest>
+    </jar>
+  </target>
+
+</project>

Added: hadoop/hive/trunk/hbase-handler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/ivy.xml?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/ivy.xml (added)
+++ hadoop/hive/trunk/hbase-handler/ivy.xml Fri Mar 12 19:53:57 2010
@@ -0,0 +1,8 @@
+<ivy-module version="2.0">
+    <info organisation="org.apache.hadoop.hive" module="contrib"/>
+    <dependencies>
+        <dependency org="hadoop" name="core" rev="${hadoop.version.ant-internal}">
+          <artifact name="hadoop" type="source" ext="tar.gz"/>
+        </dependency> 
+    </dependencies>
+</ivy-module>

Added: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar?rev=922404&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar?rev=922404&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar?rev=922404&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,495 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.ByteStream;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.SerDeUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category;
+import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
+import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * HBaseSerDe can be used to serialize objects into an HBase table and
+ * deserialize objects from an HBase table.
+ */
+public class HBaseSerDe implements SerDe {
+  
+  public static final String HBASE_COL_MAPPING = "hbase.columns.mapping";
+  
+  public static final String HBASE_TABLE_NAME = "hbase.table.name";
+  
+  public static final Log LOG = LogFactory.getLog(
+      HBaseSerDe.class.getName());
+
+  private ObjectInspector cachedObjectInspector;
+  private HBaseSerDeParameters hbSerDeParams;
+  private boolean useJSONSerialize;
+  private LazyHBaseRow cachedHBaseRow;
+  private BatchUpdate serializeCache;
+  private ByteStream.Output serializeStream = new ByteStream.Output();
+
+  /**
+   * HBaseSerDeParameters defines the parameters used to
+   * instantiate HBaseSerDe.
+   */
+  public static class HBaseSerDeParameters {
+    private List<String> hbaseColumnNames;
+    private SerDeParameters serdeParams;
+    
+    public List<String> getHBaseColumnNames() {
+      return hbaseColumnNames;
+    }
+    
+    public SerDeParameters getSerDeParameters() {
+      return serdeParams;
+    }
+  }
+  
+  public String toString() {
+    return getClass().toString()
+        + "["
+        + hbSerDeParams.hbaseColumnNames
+        + ":"
+        + ((StructTypeInfo) hbSerDeParams.serdeParams.getRowTypeInfo())
+            .getAllStructFieldNames()
+        + ":"
+        + ((StructTypeInfo) hbSerDeParams.serdeParams.getRowTypeInfo())
+            .getAllStructFieldTypeInfos() + "]";
+  }
+  
+  public HBaseSerDe() throws SerDeException {
+  }
+  
+  /**
+   * Initialize the SerDe given parameters.
+   * @see SerDe#initialize(Configuration, Properties)
+   */
+  public void initialize(Configuration conf, Properties tbl)
+      throws SerDeException {
+    hbSerDeParams = HBaseSerDe.initHBaseSerDeParameters(conf, tbl, 
+        getClass().getName());
+    
+    // We only use the columnNames and columnTypes parameters here
+    cachedObjectInspector = LazyFactory.createLazyStructInspector(
+        hbSerDeParams.serdeParams.getColumnNames(), 
+        hbSerDeParams.serdeParams.getColumnTypes(), 
+        hbSerDeParams.serdeParams.getSeparators(),
+        hbSerDeParams.serdeParams.getNullSequence(),
+        hbSerDeParams.serdeParams.isLastColumnTakesRest(),
+        hbSerDeParams.serdeParams.isEscaped(),
+        hbSerDeParams.serdeParams.getEscapeChar()); 
+    
+    cachedHBaseRow = new LazyHBaseRow(
+      (LazySimpleStructObjectInspector) cachedObjectInspector);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("HBaseSerDe initialized with : columnNames = "
+        + hbSerDeParams.serdeParams.getColumnNames()
+        + " columnTypes = "
+        + hbSerDeParams.serdeParams.getColumnTypes()
+        + " hbaseColumnMapping = "
+        + hbSerDeParams.hbaseColumnNames);
+    }
+  }
+  
+  public static HBaseSerDeParameters initHBaseSerDeParameters(
+      Configuration job, Properties tbl, String serdeName) 
+    throws SerDeException {
+
+    HBaseSerDeParameters serdeParams = new HBaseSerDeParameters();
+    
+    // Read Configuration Parameter
+    String hbaseColumnNameProperty =
+      tbl.getProperty(HBaseSerDe.HBASE_COL_MAPPING);
+    String columnTypeProperty =
+      tbl.getProperty(Constants.LIST_COLUMN_TYPES);
+    
+    // Initialize the hbase column list
+    if (hbaseColumnNameProperty != null
+      && hbaseColumnNameProperty.length() > 0) {
+
+      serdeParams.hbaseColumnNames =
+        Arrays.asList(hbaseColumnNameProperty.split(","));
+    } else {
+      serdeParams.hbaseColumnNames = new ArrayList<String>();
+    }
+    
+    // Add the hbase key to the columnNameList and columnTypeList
+    
+    // Build the type property string
+    if (columnTypeProperty == null) {
+      StringBuilder sb = new StringBuilder();
+      sb.append(Constants.STRING_TYPE_NAME);
+      
+      for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) {
+        String colName = serdeParams.hbaseColumnNames.get(i);
+        if (colName.endsWith(":"))  {
+          sb.append(":").append(
+            Constants.MAP_TYPE_NAME + "<"
+            + Constants.STRING_TYPE_NAME
+            + "," + Constants.STRING_TYPE_NAME + ">");
+        } else {
+          sb.append(":").append(Constants.STRING_TYPE_NAME);
+        }
+      }
+      tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString());
+    }
+    
+    serdeParams.serdeParams = LazySimpleSerDe.initSerdeParams(
+      job, tbl, serdeName);
+    
+    if (serdeParams.hbaseColumnNames.size() + 1
+      != serdeParams.serdeParams.getColumnNames().size()) {
+
+      throw new SerDeException(serdeName + ": columns has " + 
+          serdeParams.serdeParams.getColumnNames().size() + 
+          " elements while hbase.columns.mapping has " + 
+          serdeParams.hbaseColumnNames.size() + " elements!");
+    }
+    
+    // check that the mapping schema is right;
+    // all we can verify is that "columnfamily:" is mapped to MAP<String,?>
+    for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) {
+      String hbaseColName = serdeParams.hbaseColumnNames.get(i);
+      if (hbaseColName.endsWith(":")) {    
+        TypeInfo typeInfo = serdeParams.serdeParams.getColumnTypes().get(i + 1);
+        if ((typeInfo.getCategory() != Category.MAP) ||
+          (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName()
+            !=  Constants.STRING_TYPE_NAME)) {
+
+          throw new SerDeException(
+            serdeName + ": hbase column family '"
+            + hbaseColName
+            + "' should be mapped to map<string,?> but is mapped to "
+            + typeInfo.getTypeName());
+        }
+      }
+    }
+    
+    return serdeParams;
+  }
+  
+  /**
+   * Deserialize a row from the HBase RowResult writable to a LazyObject
+   * @param rowResult the HBase RowResult Writable containing a row
+   * @return the deserialized object
+   * @see SerDe#deserialize(Writable) 
+   */
+  public Object deserialize(Writable rowResult) throws SerDeException {
+    
+    if (!(rowResult instanceof RowResult)) {
+      throw new SerDeException(getClass().getName() + ": expects RowResult!");
+    }
+    
+    RowResult rr = (RowResult)rowResult;
+    cachedHBaseRow.init(rr, hbSerDeParams.hbaseColumnNames);
+    return cachedHBaseRow;
+  }
+
+  @Override
+  public ObjectInspector getObjectInspector() throws SerDeException {
+    return cachedObjectInspector;
+  }
+
+  @Override
+  public Class<? extends Writable> getSerializedClass() {
+    return BatchUpdate.class;
+  }
+
+  @Override
+  public Writable serialize(Object obj, ObjectInspector objInspector)
+      throws SerDeException {
+    if (objInspector.getCategory() != Category.STRUCT) {
+      throw new SerDeException(getClass().toString() 
+          + " can only serialize struct types, but we got: " 
+          + objInspector.getTypeName());
+    }
+
+    // Prepare the field ObjectInspectors
+    StructObjectInspector soi = (StructObjectInspector) objInspector;
+    List<? extends StructField> fields = soi.getAllStructFieldRefs();
+    List<Object> list = soi.getStructFieldsDataAsList(obj);
+    List<? extends StructField> declaredFields =
+      (hbSerDeParams.serdeParams.getRowTypeInfo() != null && 
+        ((StructTypeInfo) hbSerDeParams.serdeParams.getRowTypeInfo())
+        .getAllStructFieldNames().size() > 0) ? 
+      ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs()
+      : null;
+        
+    boolean isNotNull = false;
+    String hbaseColumn = "";
+
+    try {
+      // Serialize each field
+      for (int i = 0; i < fields.size(); i++) {
+        serializeStream.reset();
+        // Get the field objectInspector and the field object.
+        ObjectInspector foi = fields.get(i).getFieldObjectInspector();
+        Object f = (list == null ? null : list.get(i));
+
+        if (declaredFields != null && i >= declaredFields.size()) {
+          throw new SerDeException(
+              "Error: expecting " + declaredFields.size() 
+              + " but asking for field " + i + "\n" + "data=" + obj + "\n"
+              + "tableType="
+              + hbSerDeParams.serdeParams.getRowTypeInfo().toString()
+              + "\n"
+              + "dataType=" 
+              + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector));
+        }
+        
+        if (f == null) {
+          // a null object, we do not serialize it
+          continue;
+        }
+        
+        if (i > 0) {
+          hbaseColumn = hbSerDeParams.hbaseColumnNames.get(i-1);
+        }
+        
+        // If this field is mapped to an hbase column family
+        if (i > 0 && hbaseColumn.endsWith(":")) {
+          MapObjectInspector moi = (MapObjectInspector)foi;
+          ObjectInspector koi = moi.getMapKeyObjectInspector();
+          ObjectInspector voi = moi.getMapValueObjectInspector();
+
+          Map<?, ?> map = moi.getMap(f);
+          if (map == null) {
+            continue;
+          } else {
+            for (Map.Entry<?, ?> entry: map.entrySet()) {
+              // Get the Key
+              serialize(serializeStream, entry.getKey(), koi, 
+                  hbSerDeParams.serdeParams.getSeparators(), 3,
+                  hbSerDeParams.serdeParams.getNullSequence(),
+                  hbSerDeParams.serdeParams.isEscaped(),
+                  hbSerDeParams.serdeParams.getEscapeChar(),
+                  hbSerDeParams.serdeParams.getNeedsEscape());
+              
+              // generate a column name (column_family:column_name)
+              hbaseColumn += Bytes.toString(
+                serializeStream.getData(), 0, serializeStream.getCount());
+
+              // Get the Value
+              serializeStream.reset();
+
+              isNotNull = serialize(serializeStream, entry.getValue(), voi, 
+                  hbSerDeParams.serdeParams.getSeparators(), 3,
+                  hbSerDeParams.serdeParams.getNullSequence(),
+                  hbSerDeParams.serdeParams.isEscaped(),
+                  hbSerDeParams.serdeParams.getEscapeChar(),
+                  hbSerDeParams.serdeParams.getNeedsEscape());
+            }
+          }
+        } else {        
+          // If the field that is passed in is NOT a primitive, and either the 
+          // field is not declared (no schema was given at initialization), or 
+          // the field is declared as a primitive in initialization, serialize 
+          // the data to JSON string.  Otherwise serialize the data in the 
+          // delimited way.
+          if (!foi.getCategory().equals(Category.PRIMITIVE)
+              && (declaredFields == null || 
+                  declaredFields.get(i).getFieldObjectInspector().getCategory()
+                  .equals(Category.PRIMITIVE) || useJSONSerialize)) {
+            isNotNull = serialize(
+              serializeStream, SerDeUtils.getJSONString(f, foi),
+              PrimitiveObjectInspectorFactory.javaStringObjectInspector,
+              hbSerDeParams.serdeParams.getSeparators(), 1,
+              hbSerDeParams.serdeParams.getNullSequence(),
+              hbSerDeParams.serdeParams.isEscaped(),
+              hbSerDeParams.serdeParams.getEscapeChar(),
+              hbSerDeParams.serdeParams.getNeedsEscape());
+          } else {
+            isNotNull = serialize(
+              serializeStream, f, foi, 
+              hbSerDeParams.serdeParams.getSeparators(), 1,
+              hbSerDeParams.serdeParams.getNullSequence(),
+              hbSerDeParams.serdeParams.isEscaped(),
+              hbSerDeParams.serdeParams.getEscapeChar(),
+              hbSerDeParams.serdeParams.getNeedsEscape());
+          }
+        }
+        
+        byte [] key = new byte[serializeStream.getCount()];
+        System.arraycopy(
+          serializeStream.getData(), 0, key, 0, serializeStream.getCount());
+        if (i == 0) {
+          // the first column is the hbase key
+          serializeCache = new BatchUpdate(key);
+        } else {
+          if (isNotNull) {
+            serializeCache.put(hbaseColumn, key);
+          }
+        }
+      }
+    } catch (IOException e) {
+      throw new SerDeException(e);
+    }
+    
+    return serializeCache;
+  }
+  
+  /**
+   * Serialize the given field into the output byte stream.
+   * @param out  The ByteStream.Output to store the serialized data.
+   * @param obj The object for the current field.
+   * @param objInspector  The ObjectInspector for the current Object.
+   * @param separators    The separators array.
+   * @param level         The current level of separator.
+   * @param nullSequence  The byte sequence representing the NULL value.
+   * @param escaped       Whether we need to escape the data when writing out
+   * @param escapeChar    Which char to use as the escape char, e.g. '\\'     
+   * @param needsEscape   Which chars needs to be escaped.
+   *                      This array should have size of 128.
+   *                      Negative byte values (or byte values >= 128)
+   *                      are never escaped.
+   * @throws IOException 
+   * @return true, if serialize a not-null object; otherwise false.
+   */
+  public static boolean serialize(ByteStream.Output out, Object obj, 
+    ObjectInspector objInspector, byte[] separators, int level,
+    Text nullSequence, boolean escaped, byte escapeChar,
+    boolean[] needsEscape) throws IOException {
+    
+    switch (objInspector.getCategory()) {
+      case PRIMITIVE: {
+        LazyUtils.writePrimitiveUTF8(
+          out, obj,
+          (PrimitiveObjectInspector) objInspector,
+          escaped, escapeChar, needsEscape);
+        return true;
+      }
+      case LIST: {
+        char separator = (char) separators[level];
+        ListObjectInspector loi = (ListObjectInspector)objInspector;
+        List<?> list = loi.getList(obj);
+        ObjectInspector eoi = loi.getListElementObjectInspector();
+        if (list == null) {
+          return false;
+        } else {
+          for (int i = 0; i < list.size(); i++) {
+            if (i > 0) {
+              out.write(separator);
+            }
+            serialize(out, list.get(i), eoi, separators, level + 1,
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return true;
+      }
+      case MAP: {
+        char separator = (char) separators[level];
+        char keyValueSeparator = (char) separators[level+1];
+        MapObjectInspector moi = (MapObjectInspector) objInspector;
+        ObjectInspector koi = moi.getMapKeyObjectInspector();
+        ObjectInspector voi = moi.getMapValueObjectInspector();
+        
+        Map<?, ?> map = moi.getMap(obj);
+        if (map == null) {
+          return false;
+        } else {
+          boolean first = true;
+          for (Map.Entry<?, ?> entry: map.entrySet()) {
+            if (first) {
+              first = false;
+            } else {
+              out.write(separator);
+            }
+            serialize(out, entry.getKey(), koi, separators, level+2, 
+                nullSequence, escaped, escapeChar, needsEscape);
+            out.write(keyValueSeparator);
+            serialize(out, entry.getValue(), voi, separators, level+2, 
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return true;
+      }
+      case STRUCT: {
+        char separator = (char)separators[level];
+        StructObjectInspector soi = (StructObjectInspector)objInspector;
+        List<? extends StructField> fields = soi.getAllStructFieldRefs();
+        List<Object> list = soi.getStructFieldsDataAsList(obj);
+        if (list == null) {
+          return false;
+        } else {
+          for (int i = 0; i<list.size(); i++) {
+            if (i > 0) {
+              out.write(separator);
+            }
+            serialize(out, list.get(i),
+                fields.get(i).getFieldObjectInspector(), separators, level + 1,
+                nullSequence, escaped, escapeChar, needsEscape);
+          }
+        }
+        return true;
+      }
+    }
+    
+    throw new RuntimeException("Unknown category type: "
+        + objInspector.getCategory());
+  }
+    
+  
+  /**
+   * @return the useJSONSerialize
+   */
+  public boolean isUseJSONSerialize() {
+    return useJSONSerialize;
+  }
+
+  /**
+   * @param useJSONSerialize the useJSONSerialize to set
+   */
+  public void setUseJSONSerialize(boolean useJSONSerialize) {
+    this.useJSONSerialize = useJSONSerialize;
+  }
+
+}
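
As a usage note (not part of the commit): the SerDe above is configured entirely through table properties. The following minimal Java sketch shows the initialize() contract, assuming the hbase-handler, HBase 0.20.3, and Hive serde jars are on the classpath; the column names and mapping values are hypothetical.

import java.util.Properties;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.serde.Constants;

public class HBaseSerDeSketch {
  public static void main(String[] args) throws Exception {
    Properties tbl = new Properties();
    // the first Hive column is the row key; the mapping covers the remaining columns
    tbl.setProperty(Constants.LIST_COLUMNS, "key,name,attrs");
    // one qualified column plus a whole column family (trailing ':')
    tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING, "info:name,attrs:");
    // Constants.LIST_COLUMN_TYPES is optional: when absent, initHBaseSerDeParameters
    // defaults to string for plain columns and map<string,string> for families.

    HBaseSerDe serDe = new HBaseSerDe();
    serDe.initialize(new Configuration(), tbl);

    // deserialize() then accepts an HBase RowResult and returns a LazyHBaseRow
    // navigated via serDe.getObjectInspector(); serialize() produces a BatchUpdate.
    System.out.println(serDe);
  }
}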

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+
+/**
+ * HBaseSplit augments FileSplit with HBase column mapping.
+ */
+public class HBaseSplit extends FileSplit implements InputSplit {
+  private String hbaseColumnMapping;
+  private TableSplit split;
+    
+  public HBaseSplit() {
+    super((Path) null, 0, 0, (String[]) null);
+    hbaseColumnMapping = "";
+    split = new TableSplit();
+  }
+    
+  public HBaseSplit(TableSplit split, String columnsMapping, Path dummyPath) {
+    super(dummyPath, 0, 0, (String[]) null);
+    this.split = split;
+    hbaseColumnMapping = columnsMapping;
+  }
+    
+  public TableSplit getSplit() {
+    return this.split;
+  }
+    
+  public String getColumnsMapping() {
+    return this.hbaseColumnMapping;
+  }
+    
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    super.readFields(in);
+    hbaseColumnMapping = in.readUTF();
+    split.readFields(in);
+  }
+
+  @Override
+  public String toString() {
+    return "TableSplit " + split + " : " + hbaseColumnMapping;
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    super.write(out);
+    out.writeUTF(hbaseColumnMapping);
+    split.write(out);
+  }
+
+  @Override
+  public long getLength() {
+    return split.getLength();
+  }
+
+  @Override
+  public String[] getLocations() throws IOException {
+    return split.getLocations();
+  }
+}
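
For illustration only (not from the commit): HBaseSplit travels between the planner and the tasks purely through the Writable methods above, carrying a dummy path for Hive plus the column mapping and the wrapped TableSplit. A minimal round-trip sketch with hypothetical values:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.mapred.TableSplit;
import org.apache.hadoop.hive.hbase.HBaseSplit;

public class HBaseSplitSketch {
  public static void main(String[] args) throws Exception {
    // an empty TableSplit stands in for one produced by getSplits()
    HBaseSplit original =
        new HBaseSplit(new TableSplit(), "info:name,attrs:", new Path("/tmp/dummy"));

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));   // FileSplit fields, mapping, TableSplit

    HBaseSplit copy = new HBaseSplit();
    copy.readFields(new DataInputStream(new ByteArrayInputStream(buffer.toByteArray())));
    System.out.println(copy.getColumnsMapping());   // info:name,attrs:
  }
}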

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,255 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.metastore.HiveMetaHook;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.Constants;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
+import org.apache.hadoop.hive.ql.plan.TableDesc;
+import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.OutputFormat;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * HBaseStorageHandler provides a HiveStorageHandler implementation for
+ * HBase.
+ */
+public class HBaseStorageHandler
+  implements HiveStorageHandler, HiveMetaHook {
+
+  private HBaseConfiguration hbaseConf;
+  private HBaseAdmin admin;
+  
+  private HBaseAdmin getHBaseAdmin() throws MetaException {
+    try {
+      if (admin == null) {
+        admin = new HBaseAdmin(hbaseConf);
+      }
+      return admin;
+    } catch (MasterNotRunningException mnre) {
+      throw new MetaException(StringUtils.stringifyException(mnre));
+    }
+  }
+
+  private String getHBaseTableName(Table tbl) {
+    String tableName = tbl.getSd().getSerdeInfo().getParameters().get(
+      HBaseSerDe.HBASE_TABLE_NAME);
+    if (tableName == null) {
+      tableName = tbl.getTableName();
+    }
+    return tableName;
+  }
+
+  @Override
+  public void preDropTable(Table table) throws MetaException {
+    // nothing to do
+  }
+  
+  @Override
+  public void rollbackDropTable(Table table) throws MetaException {
+    // nothing to do
+  }
+  
+  @Override
+  public void commitDropTable(
+    Table tbl, boolean deleteData) throws MetaException {
+
+    try {
+      String tableName = getHBaseTableName(tbl);
+      boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+      if (deleteData && !isExternal) {
+        if (getHBaseAdmin().isTableEnabled(tableName)) {
+          getHBaseAdmin().disableTable(tableName);
+        }
+        getHBaseAdmin().deleteTable(tableName);
+      }
+    } catch (IOException ie) {
+      throw new MetaException(StringUtils.stringifyException(ie));
+    }
+  }
+
+  @Override
+  public void preCreateTable(Table tbl) throws MetaException {
+    boolean isExternal = MetaStoreUtils.isExternalTable(tbl);
+
+    // We'd like to move this to HiveMetaStore for any non-native table, but
+    // first we need to support storing NULL for location on a table
+    if (tbl.getSd().getLocation() != null) {
+      throw new MetaException("LOCATION may not be specified for HBase.");
+    }
+
+    try {
+      String tblName = getHBaseTableName(tbl);
+
+      // Build the mapping schema
+      Set<String> columnFamilies = new HashSet<String>();
+      // Check the hbase columns and get all the families
+      Map<String, String> serdeParam =
+        tbl.getSd().getSerdeInfo().getParameters();
+      String hbaseColumnStr = serdeParam.get(HBaseSerDe.HBASE_COL_MAPPING);
+      if (hbaseColumnStr == null) {
+        throw new MetaException("No hbase.columns.mapping defined in Serde.");
+      }
+      String [] hbaseColumns = hbaseColumnStr.split(",");
+      for (String hbaseColumn : hbaseColumns) {
+        int idx = hbaseColumn.indexOf(":");
+        if (idx < 0) {
+          throw new MetaException(
+            hbaseColumn + " is not a qualified hbase column.");
+        }
+        columnFamilies.add(hbaseColumn.substring(0, idx));
+      }
+  
+      // Check if the given hbase table exists
+      HTableDescriptor tblDesc;
+      
+      if (!getHBaseAdmin().tableExists(tblName)) {
+        // if it is not an external table then create one
+        if (!isExternal) {
+          // Create the all column descriptors
+          tblDesc = new HTableDescriptor(tblName);
+          for (String cf : columnFamilies) {
+            tblDesc.addFamily(new HColumnDescriptor(cf + ":"));
+          }
+  
+          getHBaseAdmin().createTable(tblDesc);
+        } else {
+          // an external table
+          throw new MetaException("HBase table " + tblName + 
+              " doesn't exist while the table is declared as an external table.");
+        }
+      
+      } else {
+        if (!isExternal) {
+          throw new MetaException("Table " + tblName + " already exists"
+            + " within HBase; use CREATE EXTERNAL TABLE instead to"
+            + " register it in Hive.");
+        }
+        // make sure the schema mapping is right
+        tblDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tblName));
+        for (String cf : columnFamilies) {
+          if (!tblDesc.hasFamily(Bytes.toBytes(cf))) {
+            throw new MetaException("Column Family " + cf
+              + " is not defined in hbase table " + tblName);
+          }
+        }
+
+      }
+      // ensure the table is online
+      new HTable(hbaseConf, tblDesc.getName());
+    } catch (MasterNotRunningException mnre) {
+      throw new MetaException(StringUtils.stringifyException(mnre));
+    } catch (IOException ie) {
+      throw new MetaException(StringUtils.stringifyException(ie));
+    }
+  }
+
+  @Override
+  public void rollbackCreateTable(Table table) throws MetaException {
+    boolean isExternal = MetaStoreUtils.isExternalTable(table);
+    String tableName = getHBaseTableName(table);
+    try {
+      if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
+        // we created an hbase table, so we delete it to roll back
+        if (getHBaseAdmin().isTableEnabled(tableName)) {
+          getHBaseAdmin().disableTable(tableName);
+        }
+        getHBaseAdmin().deleteTable(tableName);
+      }
+    } catch (IOException ie) {
+      throw new MetaException(StringUtils.stringifyException(ie));
+    }
+  }
+
+  @Override
+  public void commitCreateTable(Table table) throws MetaException {
+    // nothing to do
+  }
+
+  @Override
+  public Configuration getConf() {
+    return hbaseConf;
+  }
+
+  @Override
+  public void setConf(Configuration conf) {
+    hbaseConf = new HBaseConfiguration(conf);
+  }
+
+  @Override
+  public Class<? extends InputFormat> getInputFormatClass() {
+    return HiveHBaseTableInputFormat.class;
+  }
+  
+  @Override
+  public Class<? extends OutputFormat> getOutputFormatClass() {
+    return HiveHBaseTableOutputFormat.class;
+  }
+
+  @Override
+  public Class<? extends SerDe> getSerDeClass() {
+    return HBaseSerDe.class;
+  }
+
+  @Override
+  public HiveMetaHook getMetaHook() {
+    return this;
+  }
+
+  @Override
+  public void configureTableJobProperties(
+    TableDesc tableDesc,
+    Map<String, String> jobProperties) {
+
+    Properties tableProperties = tableDesc.getProperties();
+    
+    jobProperties.put(
+      HBaseSerDe.HBASE_COL_MAPPING,
+      tableProperties.getProperty(HBaseSerDe.HBASE_COL_MAPPING));
+
+    String tableName =
+      tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME);
+    if (tableName == null) {
+      tableName =
+        tableProperties.getProperty(Constants.META_TABLE_NAME);
+    }
+    jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName);
+  }
+}
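
As an illustrative sketch (not part of the commit) of the HiveStorageHandler contract implemented above: Hive instantiates the handler, injects the configuration, and then asks it for the SerDe, input/output format classes, the metastore hook, and the per-table job properties. The printed values follow directly from the overrides in this file.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.hbase.HBaseStorageHandler;

public class StorageHandlerSketch {
  public static void main(String[] args) {
    HBaseStorageHandler handler = new HBaseStorageHandler();
    handler.setConf(new Configuration());   // wrapped into an HBaseConfiguration internally

    System.out.println(handler.getSerDeClass());           // HBaseSerDe
    System.out.println(handler.getInputFormatClass());     // HiveHBaseTableInputFormat
    System.out.println(handler.getOutputFormatClass());    // HiveHBaseTableOutputFormat
    System.out.println(handler.getMetaHook() == handler);  // true: it is its own metastore hook

    // At plan time Hive also calls configureTableJobProperties(tableDesc, jobProperties)
    // so the column mapping and HBase table name travel with each MapReduce job;
    // a real TableDesc comes from the table's metadata and is omitted here.
  }
}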

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,179 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.mapred.TableSplit;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+
+/**
+ * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler
+ * tables, decorating an underlying HBase TableInputFormat with extra Hive logic
+ * such as column pruning.
+ */
+public class HiveHBaseTableInputFormat<K extends ImmutableBytesWritable, V extends RowResult>
+    implements InputFormat<K, V>, JobConfigurable {
+  
+  static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class);
+  
+  private HBaseExposedTableInputFormat hbaseInputFormat;
+
+  public HiveHBaseTableInputFormat() {
+    hbaseInputFormat = new HBaseExposedTableInputFormat();
+  }
+
+  @Override
+  public RecordReader<K, V> getRecordReader(
+    InputSplit split, JobConf job,
+    Reporter reporter) throws IOException {
+
+    HBaseSplit hbaseSplit = (HBaseSplit) split;
+
+    byte [] tableNameBytes;
+    String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME);
+    hbaseInputFormat.setHBaseTable(
+      new HTable(
+        new HBaseConfiguration(job),
+        Bytes.toBytes(hbaseTableName)));
+    
+    // because the hbase key is mapped to the first column of the hive table,
+    // we prepend "_key" to the columnMapping so that a hive column id
+    // maps to its hbase column one-for-one.
+    String columnMapping = "_key," + hbaseSplit.getColumnsMapping();
+    String[] columns = columnMapping.split(",");   
+    List<Integer> readColIDs =
+      ColumnProjectionUtils.getReadColumnIDs(job);
+ 
+    if (columns.length < readColIDs.size()) {
+      throw new IOException(
+        "Cannot read more columns than the given table contains.");
+    }
+    
+    byte [][] scanColumns;
+    if (readColIDs.size() == 0) {
+      scanColumns = new byte[columns.length - 1][];
+      for (int i=0; i < columns.length - 1; i++) {
+        scanColumns[i] = Bytes.toBytes(columns[i + 1]);
+      }
+    } else {
+      Collections.sort(readColIDs);
+      
+      if (readColIDs.get(0) == 0) {
+        // sql like "select key from hbasetable;"
+        // HBase cannot scan a table and return only its keys,
+        // so we also scan the second column of the hive table
+        // but ignore its value.
+        if (readColIDs.size() == 1) {
+          scanColumns = new byte[1][];
+          scanColumns[0] = Bytes.toBytes(columns[1]);
+        } else {
+          scanColumns = new byte[readColIDs.size() - 1][];
+          for (int i=0; i<scanColumns.length; i++) {
+            scanColumns[i] = Bytes.toBytes(columns[readColIDs.get(i + 1)]);
+          }
+        }
+      } else {
+        scanColumns = new byte[readColIDs.size()][];
+        for (int i=0; i<scanColumns.length; i++) {
+          scanColumns[i] = Bytes.toBytes(columns[readColIDs.get(i)]);
+        }
+      }
+    }
+    
+    hbaseInputFormat.setScanColumns(scanColumns);
+    
+    return (RecordReader<K, V>)
+      hbaseInputFormat.getRecordReader(hbaseSplit.getSplit(), job, reporter);
+  }
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
+    Path [] tableNames = FileInputFormat.getInputPaths(job);
+    String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME);
+    hbaseInputFormat.setHBaseTable(
+      new HTable(new HBaseConfiguration(job), hbaseTableName));
+    
+    String hbaseSchemaMapping = job.get(HBaseSerDe.HBASE_COL_MAPPING);
+    if (hbaseSchemaMapping == null) {
+      throw new IOException("hbase.columns.mapping required for HBase Table.");
+    }
+    
+    String [] columns = hbaseSchemaMapping.split(",");
+    byte [][] inputColumns = new byte[columns.length][];
+    for (int i=0; i < columns.length; i++) {
+      inputColumns[i] = Bytes.toBytes(columns[i]);
+    }
+    
+    hbaseInputFormat.setScanColumns(inputColumns);
+    
+    InputSplit[] splits = hbaseInputFormat.getSplits(
+      job, numSplits <= 0 ? 1 : numSplits);
+    InputSplit[] results = new InputSplit[splits.length];
+    for (int i = 0; i < splits.length; i++) {
+      results[i] = new HBaseSplit(
+        (TableSplit) splits[i], hbaseSchemaMapping, tableNames[0]);
+    }
+    return results;
+  }
+ 
+  @Override
+  public void configure(JobConf job) {
+    hbaseInputFormat.configure(job);
+  }
+
+  /**
+   * HBaseExposedTableInputFormat exposes some protected methods
+   * from the HBase TableInputFormatBase.
+   */
+  static class HBaseExposedTableInputFormat
+    extends org.apache.hadoop.hbase.mapred.TableInputFormatBase
+    implements JobConfigurable {
+
+    @Override
+    public void configure(JobConf job) {
+      // not needed for now
+    }
+    
+    public void setScanColumns(byte[][] scanColumns) {
+      setInputColumns(scanColumns);
+    }
+    
+    public void setHBaseTable(HTable table) {
+      setHTable(table);
+    }
+  }
+}
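
For illustration, here is a minimal standalone sketch (not part of the patch) of the pruning arithmetic performed in getRecordReader() above; the column names and read-column ids are invented, and the key-only query that the real code special-cases by scanning one dummy column is only noted in a comment.

import java.util.Arrays;
import java.util.List;

public class ColumnPruningSketch {
  public static void main(String[] args) {
    // "_key" is prepended so Hive column 0 (the row key) lines up with the mapping.
    String[] columns = "_key,cf:a,cf:b,cf:c".split(",");
    // Suppose Hive's column pruner asked for columns 0 (the key) and 2 (cf:b).
    List<Integer> readColIDs = Arrays.asList(0, 2);

    String[] scanColumns;
    if (readColIDs.isEmpty()) {
      // No pruning information: scan every mapped column except the key.
      scanColumns = Arrays.copyOfRange(columns, 1, columns.length);
    } else if (readColIDs.get(0) == 0) {
      // The key is requested: HBase returns it with every row anyway, so only
      // the remaining requested columns go into the scan.  (The real code
      // special-cases a key-only query by scanning one column and ignoring it.)
      scanColumns = new String[readColIDs.size() - 1];
      for (int i = 0; i < scanColumns.length; i++) {
        scanColumns[i] = columns[readColIDs.get(i + 1)];
      }
    } else {
      scanColumns = new String[readColIDs.size()];
      for (int i = 0; i < scanColumns.length; i++) {
        scanColumns[i] = columns[readColIDs.get(i)];
      }
    }
    System.out.println(Arrays.toString(scanColumns)); // prints [cf:b]
  }
}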

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.IOException;
+import java.util.Properties;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
+import org.apache.hadoop.hive.ql.io.HiveOutputFormat;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.Progressable;
+
+/**
+ * HiveHBaseTableOutputFormat extends HBase's TableOutputFormat and implements
+ * Hive's HiveOutputFormat for tables backed by HBase.
+ */
+public class HiveHBaseTableOutputFormat extends 
+    TableOutputFormat implements
+    HiveOutputFormat<ImmutableBytesWritable, BatchUpdate> {
+  
+  private final ImmutableBytesWritable key = new ImmutableBytesWritable();
+
+  /**
+   * Get a RecordWriter that writes to the target HBase table; the row key
+   * carried by each BatchUpdate is used as the output key.
+   * 
+   * @param jc
+   *          the job configuration
+   * @param finalOutPath
+   *          the final output path (unused; the HBase table name is taken
+   *          from the job configuration)
+   * @param valueClass
+   *          the class of the values to be written
+   * @param isCompressed
+   *          whether the content is compressed or not
+   * @param tableProperties
+   *          the properties of this file's corresponding table
+   * @param progress
+   *          progress used for status report
+   * @return the RecordWriter for the HBase table
+   */
+  @Override
+  public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath,
+      Class<? extends Writable> valueClass, boolean isCompressed,
+      Properties tableProperties, Progressable progress) throws IOException {
+    String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME);
+    jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName);
+
+    final org.apache.hadoop.mapred.RecordWriter<
+      ImmutableBytesWritable, BatchUpdate> tblWriter =
+      this.getRecordWriter(null, jc, null, progress);
+
+    return new RecordWriter() {
+      
+      @Override
+      public void close(boolean abort) throws IOException {
+        tblWriter.close(null);
+      }
+
+      @Override
+      public void write(Writable w) throws IOException {
+        BatchUpdate bu = (BatchUpdate) w;
+        key.set(bu.getRow());
+        tblWriter.write(key, bu);
+      }
+    };
+  }
+
+}
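
A caller-side sketch, not part of the patch, assuming a reachable HBase 0.20 cluster that already contains a table named demo_table with a column family cf; the names are hypothetical and error handling is omitted.

import java.io.IOException;
import java.util.Properties;

import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.hbase.HiveHBaseTableOutputFormat;
import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.Reporter;

public class OutputFormatUsageSketch {
  public static void main(String[] args) throws IOException {
    JobConf jc = new JobConf();
    // Hypothetical table name; an HBase cluster with this table must be reachable.
    jc.set(HBaseSerDe.HBASE_TABLE_NAME, "demo_table");

    HiveHBaseTableOutputFormat outputFormat = new HiveHBaseTableOutputFormat();
    RecordWriter writer = outputFormat.getHiveRecordWriter(
        jc, null, BatchUpdate.class, false, new Properties(), Reporter.NULL);

    // The row key is carried inside the BatchUpdate itself and is copied into
    // the output key by the wrapper returned above.
    BatchUpdate bu = new BatchUpdate("row-1".getBytes());
    bu.put("cf:col".getBytes(), "value".getBytes());
    writer.write(bu);
    writer.close(false);
  }
}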

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyMap;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
+import org.apache.hadoop.hive.serde2.lazy.LazyUtils;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
+
+/**
+ * LazyHBaseCellMap refines LazyMap to materialize a Hive map from the cells
+ * of a single HBase column family in a RowResult.
+ */
+public class LazyHBaseCellMap extends LazyMap {
+  
+  private RowResult rowResult;
+  private String hbaseColumnFamily;
+  
+  /**
+   * Construct a LazyHBaseCellMap object with the ObjectInspector.
+   * @param oi
+   */
+  public LazyHBaseCellMap(LazyMapObjectInspector oi) {
+    super(oi);
+  }
+
+  @Override
+  public void init(ByteArrayRef bytes, int start, int length) {
+    // do nothing
+  }
+  
+  public void init(RowResult rr, String columnFamily) {
+    rowResult = rr;
+    hbaseColumnFamily = columnFamily;
+    setParsed(false);
+  }
+  
+  private void parse() {
+    if (cachedMap == null) {
+      cachedMap = new LinkedHashMap<Object, Object>();
+    } else {
+      cachedMap.clear();
+    }
+    
+    Iterator<byte[]> iter = rowResult.keySet().iterator();
+    
+    byte[] columnFamily = hbaseColumnFamily.getBytes();
+    while (iter.hasNext()) {
+      byte [] columnKey = iter.next();
+      if (columnFamily.length > columnKey.length) {
+        continue;
+      }
+      
+      if (0 == LazyUtils.compare(
+          columnFamily, 0, columnFamily.length, 
+          columnKey, 0, columnFamily.length)) {
+
+        byte [] columnValue = rowResult.get(columnKey).getValue();
+        if (columnValue == null || columnValue.length == 0) {
+          // an empty object
+          continue;
+        }
+        
+        // Keys are always primitive
+        LazyPrimitive<?, ?> key = LazyFactory.createLazyPrimitiveClass(
+            (PrimitiveObjectInspector)
+            ((MapObjectInspector) getInspector()).getMapKeyObjectInspector());
+        ByteArrayRef keyRef = new ByteArrayRef();
+        keyRef.setData(columnKey);
+        key.init(
+          keyRef, columnFamily.length, columnKey.length - columnFamily.length);
+        
+        // Value
+        LazyObject value = LazyFactory.createLazyObject(
+          ((MapObjectInspector) getInspector()).getMapValueObjectInspector());
+        ByteArrayRef valueRef = new ByteArrayRef();
+        valueRef.setData(columnValue);
+        value.init(valueRef, 0, columnValue.length);
+        
+        // Put it into the map
+        cachedMap.put(key.getObject(), value.getObject());
+      }
+    }
+  }
+  
+  /**
+   * Get the value in the map for the given key.
+   * 
+   * @param key the key to look up
+   * @return the value for the key, or null if the key is not present
+   */
+  public Object getMapValueElement(Object key) {
+    if (!getParsed()) {
+      parse();
+    }
+    
+    for (Map.Entry<Object, Object> entry : cachedMap.entrySet()) {
+      LazyPrimitive<?, ?> lazyKeyI = (LazyPrimitive<?, ?>) entry.getKey();
+      // getWritableObject() will convert LazyPrimitive to actual primitive
+      // writable objects.
+      Object keyI = lazyKeyI.getWritableObject();
+      if (keyI == null) {
+        continue;
+      }
+      if (keyI.equals(key)) {
+        // Got a match, return the value
+        LazyObject v = (LazyObject) entry.getValue();
+        return v == null ? v : v.getObject();
+      }
+    }
+    
+    return null;
+  }
+  
+  public Map<Object, Object> getMap() {
+    if (!getParsed()) {
+      parse();
+    }
+    return cachedMap;
+  }
+  
+  public int getMapSize() {
+    if (!getParsed()) {
+      parse();
+    }
+    return cachedMap.size();
+  }
+
+}
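
The family-prefix test in parse() above can be illustrated with a small standalone snippet (not part of the patch): a qualified HBase column key such as "cf1:temp" belongs to the family map for "cf1:" and contributes the map key "temp". The family and qualifier names are made up.

public class CellMapKeySketch {
  public static void main(String[] args) {
    byte[] columnFamily = "cf1:".getBytes();
    byte[] columnKey = "cf1:temp".getBytes();

    // Same prefix comparison as in parse(), written out by hand here.
    boolean sameFamily = columnKey.length >= columnFamily.length;
    for (int i = 0; sameFamily && i < columnFamily.length; i++) {
      sameFamily = columnFamily[i] == columnKey[i];
    }
    if (sameFamily) {
      // The map key is the qualifier, i.e. the bytes after the family prefix.
      String mapKey = new String(
          columnKey, columnFamily.length, columnKey.length - columnFamily.length);
      System.out.println(mapKey); // prints: temp
    }
  }
}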

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,175 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.hbase;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef;
+import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
+import org.apache.hadoop.hive.serde2.lazy.LazyObject;
+import org.apache.hadoop.hive.serde2.lazy.LazyStruct;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector;
+import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+
+/**
+ * LazyObject for storing an HBase row.  The fields of an HBase row can be
+ * primitive or non-primitive.
+ */
+public class LazyHBaseRow extends LazyStruct {
+  
+  /**
+   * The HBase columns mapping of the row.
+   */
+  private List<String> hbaseColumns;
+  private RowResult rowResult;
+  private ArrayList<Object> cachedList;
+  
+  /**
+   * Construct a LazyHBaseRow object with the ObjectInspector.
+   */
+  public LazyHBaseRow(LazySimpleStructObjectInspector oi) {
+    super(oi);
+  }
+  
+  /**
+   * Set the HBase row data (a RowResult writable) and its column mapping
+   * for this LazyStruct.
+   */
+  public void init(RowResult rr, List<String> hbaseColumns) {
+    this.rowResult = rr;
+    this.hbaseColumns = hbaseColumns;
+    setParsed(false);
+  }
+
+  /**
+   * Parse the RowResult and fill each field.
+   * @see LazyStruct#parse()
+   */
+  private void parse() {
+    if (getFields() == null) {
+      List<? extends StructField> fieldRefs =
+        ((StructObjectInspector)getInspector()).getAllStructFieldRefs();
+      setFields(new LazyObject[fieldRefs.size()]);
+      for (int i = 0; i < getFields().length; i++) {
+        if (i > 0) {
+          String hbaseColumn = hbaseColumns.get(i - 1);
+          if (hbaseColumn.endsWith(":")) {
+            // a column family
+            getFields()[i] = 
+              new LazyHBaseCellMap(
+                (LazyMapObjectInspector)
+                fieldRefs.get(i).getFieldObjectInspector());
+            continue;
+          }
+        }
+        
+        getFields()[i] = LazyFactory.createLazyObject(
+          fieldRefs.get(i).getFieldObjectInspector());
+      }
+      setFieldInited(new boolean[getFields().length]);
+    }
+    Arrays.fill(getFieldInited(), false);
+    setParsed(true);
+  }
+  
+  /**
+   * Get one field out of the hbase row.
+   * 
+   * If the field is a primitive field, return the actual object.
+   * Otherwise return the LazyObject.  This is because PrimitiveObjectInspector
+   * does not have control over the object used by the user - the user simply
+   * directly uses the Object instead of going through 
+   * Object PrimitiveObjectInspector.get(Object).  
+   * 
+   * @param fieldID  The field ID
+   * @return         The value of the field, either a primitive object or a LazyObject
+   */
+  public Object getField(int fieldID) {
+    if (!getParsed()) {
+      parse();
+    }
+    return uncheckedGetField(fieldID);
+  }
+  
+  /**
+   * Get the field out of the row without checking whether parsing is needed.
+   * This is called by both getField and getFieldsAsList.
+   * @param fieldID  The id of the field starting from 0.
+   * @return  The value of the field
+   */
+  private Object uncheckedGetField(int fieldID) {
+    if (!getFieldInited()[fieldID]) {
+      getFieldInited()[fieldID] = true;
+      
+      ByteArrayRef ref = new ByteArrayRef();
+      
+      if (fieldID == 0) {
+        // the key
+        ref.setData(rowResult.getRow());
+        getFields()[fieldID].init(ref, 0, ref.getData().length);
+      } else {
+        String columnName = hbaseColumns.get(fieldID - 1);
+        if (columnName.endsWith(":")) {
+          // it is a column family
+          ((LazyHBaseCellMap) getFields()[fieldID]).init(
+            rowResult, columnName);
+        } else {
+          // it is a column
+          if (rowResult.containsKey(columnName)) {
+            ref.setData(rowResult.get(columnName).getValue());
+            getFields()[fieldID].init(ref, 0, ref.getData().length);
+          } else {
+            return null;
+          }
+        }
+      }
+    }
+    return getFields()[fieldID].getObject();
+  }
+
+  /**
+   * Get the values of the fields as an ArrayList.
+   * @return The values of the fields as an ArrayList.
+   */
+  public ArrayList<Object> getFieldsAsList() {
+    if (!getParsed()) {
+      parse();
+    }
+    if (cachedList == null) {
+      cachedList = new ArrayList<Object>();
+    } else {
+      cachedList.clear();
+    }
+    for (int i = 0; i < getFields().length; i++) {
+      cachedList.add(uncheckedGetField(i));
+    }
+    return cachedList;
+  }
+  
+  @Override
+  public Object getObject() {
+    return this;
+  }
+
+}
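
A small standalone sketch (not from the patch) of the indexing convention used by this class: Hive field 0 is the HBase row key, field i (for i > 0) maps to hbaseColumns.get(i - 1), and a mapping entry ending in ":" denotes a whole column family exposed as a Hive map. The column names here are invented.

import java.util.Arrays;
import java.util.List;

public class RowMappingSketch {
  public static void main(String[] args) {
    List<String> hbaseColumns = Arrays.asList("cf1:name", "cf2:", "cf1:age");
    for (int fieldID = 0; fieldID <= hbaseColumns.size(); fieldID++) {
      if (fieldID == 0) {
        System.out.println("field 0 -> row key");
      } else {
        String col = hbaseColumns.get(fieldID - 1);
        String kind = col.endsWith(":") ? "family map" : "single column";
        System.out.println("field " + fieldID + " -> " + col + " (" + kind + ")");
      }
    }
  }
}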

Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,21 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/** Implements an HBase storage handler for Hive. */
+package org.apache.hadoop.hive.hbase;
+

Added: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.hbase;
+
+import java.io.File;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster;
+import org.apache.hadoop.hbase.MiniZooKeeperCluster;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hive.ql.QTestUtil;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.zookeeper.Watcher;
+
+
+/**
+ * HBaseQTestUtil defines HBase-specific test fixtures.
+ */
+public class HBaseQTestUtil extends QTestUtil {
+
+  private String tmpdir;
+  
+  private MiniHBaseCluster hbase = null;
+  private MiniZooKeeperCluster zooKeeperCluster;
+  private static final int NUM_REGIONSERVERS = 1;
+  
+  public HBaseQTestUtil(
+    String outDir, String logDir, boolean miniMr) throws Exception {
+
+    super(outDir, logDir, miniMr);
+  }
+  
+  protected void preTestUtilInit() throws Exception {
+    // Set up the HBase cluster
+    boolean success = false;
+    try {
+      conf.set("hbase.master", "local");
+      tmpdir =  System.getProperty("user.dir")+"/../build/ql/tmp";
+      conf.set("hbase.rootdir", "file://" + tmpdir + "/hbase");
+      zooKeeperCluster = new MiniZooKeeperCluster();
+      int clientPort = zooKeeperCluster.startup(
+        new File(tmpdir, "zookeeper"));
+      conf.set("hbase.zookeeper.property.clientPort",
+        Integer.toString(clientPort));
+      HBaseConfiguration hbaseConf = new HBaseConfiguration(conf);
+      hbase = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS);
+      conf.set("hbase.master", hbase.getHMasterAddress().toString());
+      // opening the META table ensures that the cluster is running
+      new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME);
+      success = true;
+    } finally {
+      if (!success) {
+        if (hbase != null) {
+          hbase.shutdown();
+        }
+        if (zooKeeperCluster != null) {
+          zooKeeperCluster.shutdown();
+        }
+      }
+    }
+    
+    String auxJars = conf.getAuxJars();
+    auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://"
+      + new JobConf(conf, HBaseConfiguration.class).getJar();
+    auxJars += ",file://" + new JobConf(conf, HBaseSerDe.class).getJar();
+    auxJars += ",file://" + new JobConf(conf, Watcher.class).getJar();
+    conf.setAuxJars(auxJars);
+  }
+  
+  public void shutdown() throws Exception {
+    if (hbase != null) {
+      HConnectionManager.deleteConnectionInfo(
+        new HBaseConfiguration(conf), true);
+      hbase.shutdown();
+      hbase = null;
+    }
+    if (zooKeeperCluster != null) {
+      zooKeeperCluster.shutdown();
+      zooKeeperCluster = null;
+    }
+    
+    super.shutdown();
+  }
+
+}
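
As a side note on the aux-jar wiring above: the JobConf(Configuration, Class) constructor calls setJarByClass, so getJar() returns the jar from which the given class was loaded, which is how the HBase, hbase-handler and ZooKeeper jars get shipped with the job. A tiny standalone check (it may print null when the class comes from a plain classpath directory rather than a jar):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.mapred.JobConf;

public class AuxJarSketch {
  public static void main(String[] args) {
    // JobConf(Configuration, Class) sets the job jar from the class's code source.
    JobConf jc = new JobConf(new Configuration(), HBaseConfiguration.class);
    System.out.println(jc.getJar());
  }
}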

Added: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (added)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Fri Mar 12 19:53:57 2010
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.hbase;
+
+
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.io.ByteWritable;
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.hive.serde2.io.ShortWritable;
+import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
+import org.apache.hadoop.hive.serde2.objectinspector.StructField;
+import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests the HBaseSerDe class.
+ */
+public class TestHBaseSerDe extends TestCase {
+
+  /**
+   * Test the HBaseSerDe class.
+   */
+  public void testHBaseSerDe() throws SerDeException {
+    // Create the SerDe
+    HBaseSerDe serDe = new HBaseSerDe();
+    Configuration conf = new Configuration();
+    Properties tbl = createProperties();
+    serDe.initialize(conf, tbl);
+      
+    byte[] colabyte   = "cola:abyte".getBytes();
+    byte[] colbshort  = "colb:ashort".getBytes();
+    byte[] colcint    = "colc:aint".getBytes();
+    byte[] colalong   = "cola:along".getBytes();
+    byte[] colbdouble = "colb:adouble".getBytes();
+    byte[] colcstring = "colc:astring".getBytes();
+      
+    // Data
+    HbaseMapWritable<byte[], Cell> cells =
+      new HbaseMapWritable<byte[], Cell>();
+    cells.put(colabyte,    new Cell("123".getBytes(), 0));
+    cells.put(colbshort,   new Cell("456".getBytes(), 0));
+    cells.put(colcint,     new Cell("789".getBytes(), 0));
+    cells.put(colalong,    new Cell("1000".getBytes(), 0));
+    cells.put(colbdouble,  new Cell("5.3".getBytes(), 0));
+    cells.put(colcstring,  new Cell("hive and hadoop".getBytes(), 0));
+    RowResult rr = new RowResult("test-row1".getBytes(), cells);
+    BatchUpdate bu = new BatchUpdate("test-row1".getBytes());
+    bu.put(colabyte,    "123".getBytes());
+    bu.put(colbshort,   "456".getBytes());
+    bu.put(colcint,     "789".getBytes());
+    bu.put(colalong,    "1000".getBytes());
+    bu.put(colbdouble,  "5.3".getBytes());
+    bu.put(colcstring,  "hive and hadoop".getBytes());
+      
+    Object[] expectedFieldsData = {
+      new Text("test-row1"),
+      new ByteWritable((byte)123),
+      new ShortWritable((short)456),
+      new IntWritable(789),
+      new LongWritable(1000),
+      new DoubleWritable(5.3),
+      new Text("hive and hadoop")
+    };
+     
+    deserializeAndSerialize(serDe, rr, bu, expectedFieldsData);
+  }
+
+  private void deserializeAndSerialize(
+    HBaseSerDe serDe, RowResult rr, BatchUpdate bu,
+      Object[] expectedFieldsData) throws SerDeException {
+
+    // Get the row structure
+    StructObjectInspector oi = (StructObjectInspector)
+      serDe.getObjectInspector();
+    List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
+    assertEquals(7, fieldRefs.size());
+    
+    // Deserialize
+    Object row = serDe.deserialize(rr);
+    for (int i = 0; i < fieldRefs.size(); i++) {
+      Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i));
+      if (fieldData != null) {
+        fieldData = ((LazyPrimitive)fieldData).getWritableObject();
+      }
+      assertEquals("Field " + i, expectedFieldsData[i], fieldData);
+    }
+    // Serialize 
+    assertEquals(BatchUpdate.class, serDe.getSerializedClass());
+    BatchUpdate serializedBU = (BatchUpdate)serDe.serialize(row, oi);
+    assertEquals("Serialized data", bu.toString(), serializedBU.toString());
+  }
+
+  private Properties createProperties() {
+    Properties tbl = new Properties();
+    
+    // Set the configuration parameters
+    tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
+    tbl.setProperty("columns",
+        "key,abyte,ashort,aint,along,adouble,astring");
+    tbl.setProperty("columns.types",
+        "string,tinyint:smallint:int:bigint:double:string");
+    tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING, 
+        "cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring");
+    return tbl;
+  }
+  
+}
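
A minimal usage sketch mirroring the test above with a smaller, made-up schema (two Hive columns, one mapped HBase column); it assumes the same HBase 0.20 and Hive SerDe APIs exercised by the test and is not part of the patch.

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

public class SerDeUsageSketch {
  public static void main(String[] args) throws SerDeException {
    // Hive columns: the first is always the row key, the rest follow the mapping.
    Properties tbl = new Properties();
    tbl.setProperty("columns", "key,name");
    tbl.setProperty("columns.types", "string:string");
    tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING, "cf:name");

    HBaseSerDe serDe = new HBaseSerDe();
    serDe.initialize(new Configuration(), tbl);

    // One HBase cell in family "cf" with qualifier "name".
    HbaseMapWritable<byte[], Cell> cells = new HbaseMapWritable<byte[], Cell>();
    cells.put("cf:name".getBytes(), new Cell("alice".getBytes(), 0));
    Object row = serDe.deserialize(new RowResult("row-1".getBytes(), cells));

    StructObjectInspector oi = (StructObjectInspector) serDe.getObjectInspector();
    StructField nameRef = oi.getStructFieldRef("name");
    LazyPrimitive<?, ?> name =
        (LazyPrimitive<?, ?>) oi.getStructFieldData(row, nameRef);
    System.out.println(name.getWritableObject()); // prints: alice
  }
}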


