From: namit@apache.org
To: hive-commits@hadoop.apache.org
Reply-To: hive-dev@hadoop.apache.org
Date: Fri, 12 Mar 2010 19:53:59 -0000
Message-Id: <20100312195400.8E8F923888FE@eris.apache.org>
Subject: svn commit: r922404 [1/3] - in /hadoop/hive/trunk: ./ hbase-handler/ hbase-handler/lib/ hbase-handler/src/ hbase-handler/src/java/ hbase-handler/src/java/org/ hbase-handler/src/java/org/apache/ hbase-handler/src/java/org/apache/hadoop/ hbase-handler/sr...

Author: namit
Date: Fri Mar 12 19:53:57 2010
New Revision: 922404

URL: http://svn.apache.org/viewvc?rev=922404&view=rev
Log:
HIVE-705.
Read HBase tables via Hive (John Sichi via namit)

Added:
    hadoop/hive/trunk/hbase-handler/
    hadoop/hive/trunk/hbase-handler/README.txt
    hadoop/hive/trunk/hbase-handler/build.xml
    hadoop/hive/trunk/hbase-handler/ivy.xml
    hadoop/hive/trunk/hbase-handler/lib/
    hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar   (with props)
    hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar   (with props)
    hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar   (with props)
    hadoop/hive/trunk/hbase-handler/src/
    hadoop/hive/trunk/hbase-handler/src/java/
    hadoop/hive/trunk/hbase-handler/src/java/org/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java
    hadoop/hive/trunk/hbase-handler/src/test/
    hadoop/hive/trunk/hbase-handler/src/test/org/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
    hadoop/hive/trunk/hbase-handler/src/test/queries/
    hadoop/hive/trunk/hbase-handler/src/test/queries/hbase_queries.q
    hadoop/hive/trunk/hbase-handler/src/test/results/
    hadoop/hive/trunk/hbase-handler/src/test/results/hbase_queries.q.out
    hadoop/hive/trunk/hbase-handler/src/test/templates/
    hadoop/hive/trunk/hbase-handler/src/test/templates/TestHBaseCliDriver.vm
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHook.java
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaHookLoader.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/DefaultStorageHandler.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveStorageHandler.java
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/alter_non_native.q
    hadoop/hive/trunk/ql/src/test/queries/clientnegative/load_non_native.q
    hadoop/hive/trunk/ql/src/test/results/clientnegative/alter_non_native.q.out
    hadoop/hive/trunk/ql/src/test/results/clientnegative/load_non_native.q.out

Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/build-common.xml
    hadoop/hive/trunk/build.xml
    hadoop/hive/trunk/metastore/if/hive_metastore.thrift
    hadoop/hive/trunk/metastore/src/gen-cpp/hive_metastore_constants.cpp
    hadoop/hive/trunk/metastore/src/gen-cpp/hive_metastore_constants.h
    hadoop/hive/trunk/metastore/src/gen-javabean/org/apache/hadoop/hive/metastore/api/Constants.java
    hadoop/hive/trunk/metastore/src/gen-php/hive_metastore_constants.php
    hadoop/hive/trunk/metastore/src/gen-py/hive_metastore/constants.py
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStore.java
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/HiveMetaStoreClient.java
    hadoop/hive/trunk/metastore/src/java/org/apache/hadoop/hive/metastore/MetaStoreUtils.java
    hadoop/hive/trunk/metastore/src/test/org/apache/hadoop/hive/metastore/TestHiveMetaStore.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/ExecDriver.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FetchOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Operator.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/exec/Utilities.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/io/HiveInputFormat.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Hive.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/HiveUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/metadata/Table.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/BaseSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/ErrorMsg.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/Hive.g
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/LoadSemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/AlterTableDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/MapredLocalWork.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PartitionDesc.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/PlanUtils.java
    hadoop/hive/trunk/ql/src/java/org/apache/hadoop/hive/ql/plan/TableDesc.java
    hadoop/hive/trunk/ql/src/test/org/apache/hadoop/hive/ql/QTestUtil.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyMap.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyObject.java
    hadoop/hive/trunk/serde/src/java/org/apache/hadoop/hive/serde2/lazy/LazyStruct.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=922404&r1=922403&r2=922404&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Fri Mar 12 19:53:57 2010
@@ -48,8 +48,8 @@ Trunk - Unreleased

     HIVE-1194. Add sort merge join (He Yongqiang via namit)

-    HIVE-1241. Drop the table at the beginning of tests
-    (He Yongqiang via namit)
+    HIVE-705. Read HBase tables via Hive
+    (John Sichi via namit)

   IMPROVEMENTS

     HIVE-983. Function from_unixtime takes long.
@@ -258,6 +258,9 @@ Trunk - Unreleased

     HIVE-1022. desc Table should work. (namit via He Yongqiang)

+    HIVE-1241. Drop the table at the beginning of tests
+    (He Yongqiang via namit)
+
 Release 0.5.0 - Unreleased

   INCOMPATIBLE CHANGES

Modified: hadoop/hive/trunk/build-common.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build-common.xml?rev=922404&r1=922403&r2=922404&view=diff
==============================================================================
--- hadoop/hive/trunk/build-common.xml (original)
+++ hadoop/hive/trunk/build-common.xml Fri Mar 12 19:53:57 2010
@@ -429,6 +429,7 @@
    [XML content of this hunk not preserved in this archive copy]

Modified: hadoop/hive/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/build.xml?rev=922404&r1=922403&r2=922404&view=diff
==============================================================================
--- hadoop/hive/trunk/build.xml (original)
+++ hadoop/hive/trunk/build.xml Fri Mar 12 19:53:57 2010
@@ -46,6 +46,16 @@
    [XML content of this hunk not preserved in this archive copy]
@@ -82,7 +92,7 @@
    [XML content of this hunk not preserved in this archive copy]
@@ -93,7 +103,7 @@
    [XML content of this hunk not preserved in this archive copy]
@@ -286,6 +296,7 @@
    [XML content of this hunk not preserved in this archive copy]

Added: hadoop/hive/trunk/hbase-handler/README.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/README.txt?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/README.txt (added)
+++ hadoop/hive/trunk/hbase-handler/README.txt Fri Mar 12 19:53:57 2010
@@ -0,0 +1,2 @@
+See http://wiki.apache.org/hadoop/Hive/HBaseIntegration for
+information about the HBase storage handler.

Added: hadoop/hive/trunk/hbase-handler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/build.xml?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/build.xml (added)
+++ hadoop/hive/trunk/hbase-handler/build.xml Fri Mar 12 19:53:57 2010
@@ -0,0 +1,97 @@
    [XML content of this hunk not preserved in this archive copy]

Added: hadoop/hive/trunk/hbase-handler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/ivy.xml?rev=922404&view=auto
==============================================================================
--- hadoop/hive/trunk/hbase-handler/ivy.xml (added)
+++ hadoop/hive/trunk/hbase-handler/ivy.xml Fri Mar 12 19:53:57 2010
@@ -0,0 +1,8 @@
    [XML content of this hunk not preserved in this archive copy]

Added: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar?rev=922404&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3-test.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar?rev=922404&view=auto
==============================================================================
Binary file - no diff available.

Propchange: hadoop/hive/trunk/hbase-handler/lib/hbase-0.20.3.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar?rev=922404&view=auto
==============================================================================
Binary file - no diff available.
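The new README only points at the HBaseIntegration wiki, so a brief usage sketch may help when reading the sources that follow. This is an illustration, not part of the commit: the table and column names are invented, while the handler class name and the hbase.columns.mapping / hbase.table.name properties come from HBaseSerDe and HBaseStorageHandler below, and the STORED BY clause is the grammar extension added to Hive.g by this change (see hbase_queries.q for the authoritative syntax).

    -- Managed table: preCreateTable() creates the HBase table, deriving the
    -- column families from the mapping; the first Hive column maps to the row key.
    CREATE TABLE hbase_table_1(key int, value string)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf1:val")
    TBLPROPERTIES ("hbase.table.name" = "xyz");

    -- A mapping entry ending in ":" exposes a whole column family and must be
    -- declared as a map with string keys (checked in initHBaseSerDeParameters).
    CREATE EXTERNAL TABLE hbase_table_2(key int, cf map<string, string>)
    STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
    WITH SERDEPROPERTIES ("hbase.columns.mapping" = "cf:");

An EXTERNAL declaration is required when the HBase table already exists (preCreateTable() rejects a managed CREATE in that case), and LOAD DATA into these non-native tables is rejected (see load_non_native.q), so data is written through the output format, e.g. with INSERT OVERWRITE TABLE ... SELECT.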
Propchange: hadoop/hive/trunk/hbase-handler/lib/zookeeper-3.2.2.jar ------------------------------------------------------------------------------ svn:mime-type = application/octet-stream Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,495 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.ByteStream; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; + +/** 
+ * HBaseSerDe can be used to serialize object into an HBase table and + * deserialize objects from an HBase table. + */ +public class HBaseSerDe implements SerDe { + + public static final String HBASE_COL_MAPPING = "hbase.columns.mapping"; + + public static final String HBASE_TABLE_NAME = "hbase.table.name"; + + public static final Log LOG = LogFactory.getLog( + HBaseSerDe.class.getName()); + + private ObjectInspector cachedObjectInspector; + private HBaseSerDeParameters hbSerDeParams; + private boolean useJSONSerialize; + private LazyHBaseRow cachedHBaseRow; + private BatchUpdate serializeCache; + private ByteStream.Output serializeStream = new ByteStream.Output(); + + /** + * HBaseSerDeParameters defines the parameters used to + * instantiate HBaseSerDe. + */ + public static class HBaseSerDeParameters { + private List hbaseColumnNames; + private SerDeParameters serdeParams; + + public List getHBaseColumnNames() { + return hbaseColumnNames; + } + + public SerDeParameters getSerDeParameters() { + return serdeParams; + } + } + + public String toString() { + return getClass().toString() + + "[" + + hbSerDeParams.hbaseColumnNames + + ":" + + ((StructTypeInfo) hbSerDeParams.serdeParams.getRowTypeInfo()) + .getAllStructFieldNames() + + ":" + + ((StructTypeInfo) hbSerDeParams.serdeParams.getRowTypeInfo()) + .getAllStructFieldTypeInfos() + "]"; + } + + public HBaseSerDe() throws SerDeException { + } + + /** + * Initialize the SerDe given parameters. + * @see SerDe#initialize(Configuration, Properties) + */ + public void initialize(Configuration conf, Properties tbl) + throws SerDeException { + hbSerDeParams = HBaseSerDe.initHBaseSerDeParameters(conf, tbl, + getClass().getName()); + + // We just used columnNames & columnTypes these two parameters + cachedObjectInspector = LazyFactory.createLazyStructInspector( + hbSerDeParams.serdeParams.getColumnNames(), + hbSerDeParams.serdeParams.getColumnTypes(), + hbSerDeParams.serdeParams.getSeparators(), + hbSerDeParams.serdeParams.getNullSequence(), + hbSerDeParams.serdeParams.isLastColumnTakesRest(), + hbSerDeParams.serdeParams.isEscaped(), + hbSerDeParams.serdeParams.getEscapeChar()); + + cachedHBaseRow = new LazyHBaseRow( + (LazySimpleStructObjectInspector) cachedObjectInspector); + + if (LOG.isDebugEnabled()) { + LOG.debug("HBaseSerDe initialized with : columnNames = " + + hbSerDeParams.serdeParams.getColumnNames() + + " columnTypes = " + + hbSerDeParams.serdeParams.getColumnTypes() + + " hbaseColumnMapping = " + + hbSerDeParams.hbaseColumnNames); + } + } + + public static HBaseSerDeParameters initHBaseSerDeParameters( + Configuration job, Properties tbl, String serdeName) + throws SerDeException { + + HBaseSerDeParameters serdeParams = new HBaseSerDeParameters(); + + // Read Configuration Parameter + String hbaseColumnNameProperty = + tbl.getProperty(HBaseSerDe.HBASE_COL_MAPPING); + String columnTypeProperty = + tbl.getProperty(Constants.LIST_COLUMN_TYPES); + + // Initial the hbase column list + if (hbaseColumnNameProperty != null + && hbaseColumnNameProperty.length() > 0) { + + serdeParams.hbaseColumnNames = + Arrays.asList(hbaseColumnNameProperty.split(",")); + } else { + serdeParams.hbaseColumnNames = new ArrayList(); + } + + // Add the hbase key to the columnNameList and columnTypeList + + // Build the type property string + if (columnTypeProperty == null) { + StringBuilder sb = new StringBuilder(); + sb.append(Constants.STRING_TYPE_NAME); + + for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) { + String colName = 
serdeParams.hbaseColumnNames.get(i); + if (colName.endsWith(":")) { + sb.append(":").append( + Constants.MAP_TYPE_NAME + "<" + + Constants.STRING_TYPE_NAME + + "," + Constants.STRING_TYPE_NAME + ">"); + } else { + sb.append(":").append(Constants.STRING_TYPE_NAME); + } + } + tbl.setProperty(Constants.LIST_COLUMN_TYPES, sb.toString()); + } + + serdeParams.serdeParams = LazySimpleSerDe.initSerdeParams( + job, tbl, serdeName); + + if (serdeParams.hbaseColumnNames.size() + 1 + != serdeParams.serdeParams.getColumnNames().size()) { + + throw new SerDeException(serdeName + ": columns has " + + serdeParams.serdeParams.getColumnNames().size() + + " elements while hbase.columns.mapping has " + + serdeParams.hbaseColumnNames.size() + " elements!"); + } + + // check that the mapping schema is right; + // we just can make sure that "columnfamily:" is mapped to MAP + for (int i = 0; i < serdeParams.hbaseColumnNames.size(); i++) { + String hbaseColName = serdeParams.hbaseColumnNames.get(i); + if (hbaseColName.endsWith(":")) { + TypeInfo typeInfo = serdeParams.serdeParams.getColumnTypes().get(i + 1); + if ((typeInfo.getCategory() != Category.MAP) || + (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName() + != Constants.STRING_TYPE_NAME)) { + + throw new SerDeException( + serdeName + ": hbase column family '" + + hbaseColName + + "' should be mapped to map but is mapped to " + + typeInfo.getTypeName()); + } + } + } + + return serdeParams; + } + + /** + * Deserialize a row from the HBase RowResult writable to a LazyObject + * @param rowResult the HBase RowResult Writable contain a row + * @return the deserialized object + * @see SerDe#deserialize(Writable) + */ + public Object deserialize(Writable rowResult) throws SerDeException { + + if (!(rowResult instanceof RowResult)) { + throw new SerDeException(getClass().getName() + ": expects RowResult!"); + } + + RowResult rr = (RowResult)rowResult; + cachedHBaseRow.init(rr, hbSerDeParams.hbaseColumnNames); + return cachedHBaseRow; + } + + @Override + public ObjectInspector getObjectInspector() throws SerDeException { + return cachedObjectInspector; + } + + @Override + public Class getSerializedClass() { + return BatchUpdate.class; + } + + @Override + public Writable serialize(Object obj, ObjectInspector objInspector) + throws SerDeException { + if (objInspector.getCategory() != Category.STRUCT) { + throw new SerDeException(getClass().toString() + + " can only serialize struct types, but we got: " + + objInspector.getTypeName()); + } + + // Prepare the field ObjectInspectors + StructObjectInspector soi = (StructObjectInspector) objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + List declaredFields = + (hbSerDeParams.serdeParams.getRowTypeInfo() != null && + ((StructTypeInfo) hbSerDeParams.serdeParams.getRowTypeInfo()) + .getAllStructFieldNames().size() > 0) ? + ((StructObjectInspector)getObjectInspector()).getAllStructFieldRefs() + : null; + + boolean isNotNull = false; + String hbaseColumn = ""; + + try { + // Serialize each field + for (int i = 0; i < fields.size(); i++) { + serializeStream.reset(); + // Get the field objectInspector and the field object. + ObjectInspector foi = fields.get(i).getFieldObjectInspector(); + Object f = (list == null ? 
null : list.get(i)); + + if (declaredFields != null && i >= declaredFields.size()) { + throw new SerDeException( + "Error: expecting " + declaredFields.size() + + " but asking for field " + i + "\n" + "data=" + obj + "\n" + + "tableType=" + + hbSerDeParams.serdeParams.getRowTypeInfo().toString() + + "\n" + + "dataType=" + + TypeInfoUtils.getTypeInfoFromObjectInspector(objInspector)); + } + + if (f == null) { + // a null object, we do not serialize it + continue; + } + + if (i > 0) { + hbaseColumn = hbSerDeParams.hbaseColumnNames.get(i-1); + } + + // If the field that is column family in hbase + if (i > 0 && hbaseColumn.endsWith(":")) { + MapObjectInspector moi = (MapObjectInspector)foi; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(f); + if (map == null) { + continue; + } else { + for (Map.Entry entry: map.entrySet()) { + // Get the Key + serialize(serializeStream, entry.getKey(), koi, + hbSerDeParams.serdeParams.getSeparators(), 3, + hbSerDeParams.serdeParams.getNullSequence(), + hbSerDeParams.serdeParams.isEscaped(), + hbSerDeParams.serdeParams.getEscapeChar(), + hbSerDeParams.serdeParams.getNeedsEscape()); + + // generate a column name (column_family:column_name) + hbaseColumn += Bytes.toString( + serializeStream.getData(), 0, serializeStream.getCount()); + + // Get the Value + serializeStream.reset(); + + isNotNull = serialize(serializeStream, entry.getValue(), voi, + hbSerDeParams.serdeParams.getSeparators(), 3, + hbSerDeParams.serdeParams.getNullSequence(), + hbSerDeParams.serdeParams.isEscaped(), + hbSerDeParams.serdeParams.getEscapeChar(), + hbSerDeParams.serdeParams.getNeedsEscape()); + } + } + } else { + // If the field that is passed in is NOT a primitive, and either the + // field is not declared (no schema was given at initialization), or + // the field is declared as a primitive in initialization, serialize + // the data to JSON string. Otherwise serialize the data in the + // delimited way. + if (!foi.getCategory().equals(Category.PRIMITIVE) + && (declaredFields == null || + declaredFields.get(i).getFieldObjectInspector().getCategory() + .equals(Category.PRIMITIVE) || useJSONSerialize)) { + isNotNull = serialize( + serializeStream, SerDeUtils.getJSONString(f, foi), + PrimitiveObjectInspectorFactory.javaStringObjectInspector, + hbSerDeParams.serdeParams.getSeparators(), 1, + hbSerDeParams.serdeParams.getNullSequence(), + hbSerDeParams.serdeParams.isEscaped(), + hbSerDeParams.serdeParams.getEscapeChar(), + hbSerDeParams.serdeParams.getNeedsEscape()); + } else { + isNotNull = serialize( + serializeStream, f, foi, + hbSerDeParams.serdeParams.getSeparators(), 1, + hbSerDeParams.serdeParams.getNullSequence(), + hbSerDeParams.serdeParams.isEscaped(), + hbSerDeParams.serdeParams.getEscapeChar(), + hbSerDeParams.serdeParams.getNeedsEscape()); + } + } + + byte [] key = new byte[serializeStream.getCount()]; + System.arraycopy( + serializeStream.getData(), 0, key, 0, serializeStream.getCount()); + if (i == 0) { + // the first column is the hbase key + serializeCache = new BatchUpdate(key); + } else { + if (isNotNull) { + serializeCache.put(hbaseColumn, key); + } + } + } + } catch (IOException e) { + throw new SerDeException(e); + } + + return serializeCache; + } + + /** + * Serialize the row into the StringBuilder. + * @param out The StringBuilder to store the serialized data. + * @param obj The object for the current field. + * @param objInspector The ObjectInspector for the current Object. 
+ * @param separators The separators array. + * @param level The current level of separator. + * @param nullSequence The byte sequence representing the NULL value. + * @param escaped Whether we need to escape the data when writing out + * @param escapeChar Which char to use as the escape char, e.g. '\\' + * @param needsEscape Which chars needs to be escaped. + * This array should have size of 128. + * Negative byte values (or byte values >= 128) + * are never escaped. + * @throws IOException + * @return true, if serialize a not-null object; otherwise false. + */ + public static boolean serialize(ByteStream.Output out, Object obj, + ObjectInspector objInspector, byte[] separators, int level, + Text nullSequence, boolean escaped, byte escapeChar, + boolean[] needsEscape) throws IOException { + + switch (objInspector.getCategory()) { + case PRIMITIVE: { + LazyUtils.writePrimitiveUTF8( + out, obj, + (PrimitiveObjectInspector) objInspector, + escaped, escapeChar, needsEscape); + return true; + } + case LIST: { + char separator = (char) separators[level]; + ListObjectInspector loi = (ListObjectInspector)objInspector; + List list = loi.getList(obj); + ObjectInspector eoi = loi.getListElementObjectInspector(); + if (list == null) { + return false; + } else { + for (int i = 0; i < list.size(); i++) { + if (i > 0) { + out.write(separator); + } + serialize(out, list.get(i), eoi, separators, level + 1, + nullSequence, escaped, escapeChar, needsEscape); + } + } + return true; + } + case MAP: { + char separator = (char) separators[level]; + char keyValueSeparator = (char) separators[level+1]; + MapObjectInspector moi = (MapObjectInspector) objInspector; + ObjectInspector koi = moi.getMapKeyObjectInspector(); + ObjectInspector voi = moi.getMapValueObjectInspector(); + + Map map = moi.getMap(obj); + if (map == null) { + return false; + } else { + boolean first = true; + for (Map.Entry entry: map.entrySet()) { + if (first) { + first = false; + } else { + out.write(separator); + } + serialize(out, entry.getKey(), koi, separators, level+2, + nullSequence, escaped, escapeChar, needsEscape); + out.write(keyValueSeparator); + serialize(out, entry.getValue(), voi, separators, level+2, + nullSequence, escaped, escapeChar, needsEscape); + } + } + return true; + } + case STRUCT: { + char separator = (char)separators[level]; + StructObjectInspector soi = (StructObjectInspector)objInspector; + List fields = soi.getAllStructFieldRefs(); + List list = soi.getStructFieldsDataAsList(obj); + if (list == null) { + return false; + } else { + for (int i = 0; i 0) { + out.write(separator); + } + serialize(out, list.get(i), + fields.get(i).getFieldObjectInspector(), separators, level + 1, + nullSequence, escaped, escapeChar, needsEscape); + } + } + return true; + } + } + + throw new RuntimeException("Unknown category type: " + + objInspector.getCategory()); + } + + + /** + * @return the useJSONSerialize + */ + public boolean isUseJSONSerialize() { + return useJSONSerialize; + } + + /** + * @param useJSONSerialize the useJSONSerialize to set + */ + public void setUseJSONSerialize(boolean useJSONSerialize) { + this.useJSONSerialize = useJSONSerialize; + } + +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java?rev=922404&view=auto ============================================================================== --- 
hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSplit.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,85 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileSplit; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.hbase.mapred.TableSplit; + +/** + * HBaseSplit augments FileSplit with HBase column mapping. + */ +public class HBaseSplit extends FileSplit implements InputSplit { + private String hbaseColumnMapping; + private TableSplit split; + + public HBaseSplit() { + super((Path) null, 0, 0, (String[]) null); + hbaseColumnMapping = ""; + split = new TableSplit(); + } + + public HBaseSplit(TableSplit split, String columnsMapping, Path dummyPath) { + super(dummyPath, 0, 0, (String[]) null); + this.split = split; + hbaseColumnMapping = columnsMapping; + } + + public TableSplit getSplit() { + return this.split; + } + + public String getColumnsMapping() { + return this.hbaseColumnMapping; + } + + @Override + public void readFields(DataInput in) throws IOException { + super.readFields(in); + hbaseColumnMapping = in.readUTF(); + split.readFields(in); + } + + @Override + public String toString() { + return "TableSplit " + split + " : " + hbaseColumnMapping; + } + + @Override + public void write(DataOutput out) throws IOException { + super.write(out); + out.writeUTF(hbaseColumnMapping); + split.write(out); + } + + @Override + public long getLength() { + return split.getLength(); + } + + @Override + public String[] getLocations() throws IOException { + return split.getLocations(); + } +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Map; +import java.util.Properties; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.metastore.HiveMetaHook; +import org.apache.hadoop.hive.metastore.MetaStoreUtils; +import org.apache.hadoop.hive.metastore.api.Constants; +import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.api.Partition; +import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler; +import org.apache.hadoop.hive.ql.plan.TableDesc; +import org.apache.hadoop.hive.serde2.SerDe; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.OutputFormat; +import org.apache.hadoop.util.StringUtils; + +/** + * HBaseStorageHandler provides a HiveStorageHandler implementation for + * HBase. 
+ */ +public class HBaseStorageHandler + implements HiveStorageHandler, HiveMetaHook { + + private HBaseConfiguration hbaseConf; + private HBaseAdmin admin; + + private HBaseAdmin getHBaseAdmin() throws MetaException { + try { + if (admin == null) { + admin = new HBaseAdmin(hbaseConf); + } + return admin; + } catch (MasterNotRunningException mnre) { + throw new MetaException(StringUtils.stringifyException(mnre)); + } + } + + private String getHBaseTableName(Table tbl) { + String tableName = tbl.getSd().getSerdeInfo().getParameters().get( + HBaseSerDe.HBASE_TABLE_NAME); + if (tableName == null) { + tableName = tbl.getTableName(); + } + return tableName; + } + + @Override + public void preDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void rollbackDropTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public void commitDropTable( + Table tbl, boolean deleteData) throws MetaException { + + try { + String tableName = getHBaseTableName(tbl); + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + if (deleteData && !isExternal) { + if (getHBaseAdmin().isTableEnabled(tableName)) { + getHBaseAdmin().disableTable(tableName); + } + getHBaseAdmin().deleteTable(tableName); + } + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void preCreateTable(Table tbl) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(tbl); + + // We'd like to move this to HiveMetaStore for any non-native table, but + // first we need to support storing NULL for location on a table + if (tbl.getSd().getLocation() != null) { + throw new MetaException("LOCATION may not be specified for HBase."); + } + + try { + String tblName = getHBaseTableName(tbl); + + // Build the mapping schema + Set columnFamilies = new HashSet(); + // Check the hbase columns and get all the families + Map serdeParam = + tbl.getSd().getSerdeInfo().getParameters(); + String hbaseColumnStr = serdeParam.get(HBaseSerDe.HBASE_COL_MAPPING); + if (hbaseColumnStr == null) { + throw new MetaException("No hbase.columns.mapping defined in Serde."); + } + String [] hbaseColumns = hbaseColumnStr.split(","); + for (String hbaseColumn : hbaseColumns) { + int idx = hbaseColumn.indexOf(":"); + if (idx < 0) { + throw new MetaException( + hbaseColumn + " is not a qualified hbase column."); + } + columnFamilies.add(hbaseColumn.substring(0, idx)); + } + + // Check if the given hbase table exists + HTableDescriptor tblDesc; + + if (!getHBaseAdmin().tableExists(tblName)) { + // if it is not an external table then create one + if (!isExternal) { + // Create the all column descriptors + tblDesc = new HTableDescriptor(tblName); + for (String cf : columnFamilies) { + tblDesc.addFamily(new HColumnDescriptor(cf + ":")); + } + + getHBaseAdmin().createTable(tblDesc); + } else { + // an external table + throw new MetaException("HBase table " + tblName + + " doesn't exist while the table is declared as an external table."); + } + + } else { + if (!isExternal) { + throw new MetaException("Table " + tblName + " already exists" + + " within HBase; use CREATE EXTERNAL TABLE instead to" + + " register it in Hive."); + } + // make sure the schema mapping is right + tblDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tblName)); + for (String cf : columnFamilies) { + if (!tblDesc.hasFamily(Bytes.toBytes(cf))) { + throw new MetaException("Column Family " + cf + + " is not defined in hbase table " + tblName); + } + } 
+ + } + // ensure the table is online + new HTable(hbaseConf, tblDesc.getName()); + } catch (MasterNotRunningException mnre) { + throw new MetaException(StringUtils.stringifyException(mnre)); + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void rollbackCreateTable(Table table) throws MetaException { + boolean isExternal = MetaStoreUtils.isExternalTable(table); + String tableName = getHBaseTableName(table); + try { + if (!isExternal && getHBaseAdmin().tableExists(tableName)) { + // we have create an hbase table, so we delete it to roll back; + if (getHBaseAdmin().isTableEnabled(tableName)) { + getHBaseAdmin().disableTable(tableName); + } + getHBaseAdmin().deleteTable(tableName); + } + } catch (IOException ie) { + throw new MetaException(StringUtils.stringifyException(ie)); + } + } + + @Override + public void commitCreateTable(Table table) throws MetaException { + // nothing to do + } + + @Override + public Configuration getConf() { + return hbaseConf; + } + + @Override + public void setConf(Configuration conf) { + hbaseConf = new HBaseConfiguration(conf); + } + + @Override + public Class getInputFormatClass() { + return HiveHBaseTableInputFormat.class; + } + + @Override + public Class getOutputFormatClass() { + return HiveHBaseTableOutputFormat.class; + } + + @Override + public Class getSerDeClass() { + return HBaseSerDe.class; + } + + @Override + public HiveMetaHook getMetaHook() { + return this; + } + + @Override + public void configureTableJobProperties( + TableDesc tableDesc, + Map jobProperties) { + + Properties tableProperties = tableDesc.getProperties(); + + jobProperties.put( + HBaseSerDe.HBASE_COL_MAPPING, + tableProperties.getProperty(HBaseSerDe.HBASE_COL_MAPPING)); + + String tableName = + tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_NAME); + if (tableName == null) { + tableName = + tableProperties.getProperty(Constants.META_TABLE_NAME); + } + jobProperties.put(HBaseSerDe.HBASE_TABLE_NAME, tableName); + } +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,179 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.mapred.TableSplit; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobConfigurable; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.mapred.Reporter; + +/** + * HiveHBaseTableInputFormat implements InputFormat for HBase storage handler + * tables, decorating an underlying HBase TableInputFormat with extra Hive logic + * such as column pruning. + */ +public class HiveHBaseTableInputFormat + implements InputFormat, JobConfigurable { + + static final Log LOG = LogFactory.getLog(HiveHBaseTableInputFormat.class); + + private HBaseExposedTableInputFormat hbaseInputFormat; + + public HiveHBaseTableInputFormat() { + hbaseInputFormat = new HBaseExposedTableInputFormat(); + } + + @Override + public RecordReader getRecordReader( + InputSplit split, JobConf job, + Reporter reporter) throws IOException { + + HBaseSplit hbaseSplit = (HBaseSplit) split; + + byte [] tableNameBytes; + String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME); + hbaseInputFormat.setHBaseTable( + new HTable( + new HBaseConfiguration(job), + Bytes.toBytes(hbaseTableName))); + + // because the hbase key is mapped to the first column in its hive table, + // we add the "_key" before the columnMapping that we can use the + // hive column id to find the exact hbase column one-for-one. + String columnMapping = "_key," + hbaseSplit.getColumnsMapping(); + String[] columns = columnMapping.split(","); + List readColIDs = + ColumnProjectionUtils.getReadColumnIDs(job); + + if (columns.length < readColIDs.size()) { + throw new IOException( + "Cannot read more columns than the given table contains."); + } + + byte [][] scanColumns; + if (readColIDs.size() == 0) { + scanColumns = new byte[columns.length - 1][]; + for (int i=0; i < columns.length - 1; i++) { + scanColumns[i] = Bytes.toBytes(columns[i + 1]); + } + } else { + Collections.sort(readColIDs); + + if (readColIDs.get(0) == 0) { + // sql like "select key from hbasetable;" + // As HBase can not scan a hbase table while just getting its keys, + // so we will scan out the second column of the hive table + // but ignore it. 
+ if (readColIDs.size() == 1) { + scanColumns = new byte[1][]; + scanColumns[0] = Bytes.toBytes(columns[1]); + } else { + scanColumns = new byte[readColIDs.size() - 1][]; + for (int i=0; i) + hbaseInputFormat.getRecordReader(hbaseSplit.getSplit(), job, reporter); + } + + @Override + public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException { + Path [] tableNames = FileInputFormat.getInputPaths(job); + String hbaseTableName = job.get(HBaseSerDe.HBASE_TABLE_NAME); + hbaseInputFormat.setHBaseTable( + new HTable(new HBaseConfiguration(job), hbaseTableName)); + + String hbaseSchemaMapping = job.get(HBaseSerDe.HBASE_COL_MAPPING); + if (hbaseSchemaMapping == null) { + throw new IOException("hbase.columns.mapping required for HBase Table."); + } + + String [] columns = hbaseSchemaMapping.split(","); + byte [][] inputColumns = new byte[columns.length][]; + for (int i=0; i < columns.length; i++) { + inputColumns[i] = Bytes.toBytes(columns[i]); + } + + hbaseInputFormat.setScanColumns(inputColumns); + + InputSplit[] splits = hbaseInputFormat.getSplits( + job, numSplits <= 0 ? 1 : numSplits); + InputSplit[] results = new InputSplit[splits.length]; + for (int i = 0; i < splits.length; i++) { + results[i] = new HBaseSplit( + (TableSplit) splits[i], hbaseSchemaMapping, tableNames[0]); + } + return results; + } + + @Override + public void configure(JobConf job) { + hbaseInputFormat.configure(job); + } + + /** + * HBaseExposedTableInputFormat exposes some protected methods + * from the HBase TableInputFormatBase. + */ + static class HBaseExposedTableInputFormat + extends org.apache.hadoop.hbase.mapred.TableInputFormatBase + implements JobConfigurable { + + @Override + public void configure(JobConf job) { + // not needed for now + } + + public void setScanColumns(byte[][] scanColumns) { + setInputColumns(scanColumns); + } + + public void setHBaseTable(HTable table) { + setHTable(table); + } + } +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableOutputFormat.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,87 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import java.io.IOException; +import java.util.Properties; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.hadoop.hbase.mapred.TableOutputFormat; +import org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter; +import org.apache.hadoop.hive.ql.io.HiveOutputFormat; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.util.Progressable; + +/** + * HiveHBaseTableOutputFormat implements TableOutputFormat for HBase tables. + */ +public class HiveHBaseTableOutputFormat extends + TableOutputFormat implements + HiveOutputFormat { + + private final ImmutableBytesWritable key = new ImmutableBytesWritable(); + + /** + * Update to the final out table, and output an empty key as the key. + * + * @param jc + * the job configuration file + * @param finalOutPath + * the final output table name + * @param valueClass + * the value class used for create + * @param isCompressed + * whether the content is compressed or not + * @param tableProperties + * the tableInfo of this file's corresponding table + * @param progress + * progress used for status report + * @return the RecordWriter for the output file + */ + @Override + public RecordWriter getHiveRecordWriter(JobConf jc, Path finalOutPath, + Class valueClass, boolean isCompressed, + Properties tableProperties, Progressable progress) throws IOException { + String hbaseTableName = jc.get(HBaseSerDe.HBASE_TABLE_NAME); + jc.set(TableOutputFormat.OUTPUT_TABLE, hbaseTableName); + + final org.apache.hadoop.mapred.RecordWriter< + ImmutableBytesWritable, BatchUpdate> tblWriter = + this.getRecordWriter(null, jc, null, progress); + + return new RecordWriter() { + + @Override + public void close(boolean abort) throws IOException { + tblWriter.close(null); + } + + @Override + public void write(Writable w) throws IOException { + BatchUpdate bu = (BatchUpdate) w; + key.set(bu.getRow()); + tblWriter.write(key, bu); + } + }; + } + +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hive.hbase; + +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyMap; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; + +/** + * LazyHBaseCellMap refines LazyMap with HBase column mapping. + */ +public class LazyHBaseCellMap extends LazyMap { + + private RowResult rowResult; + private String hbaseColumnFamily; + + /** + * Construct a LazyCellMap object with the ObjectInspector. + * @param oi + */ + public LazyHBaseCellMap(LazyMapObjectInspector oi) { + super(oi); + } + + @Override + public void init(ByteArrayRef bytes, int start, int length) { + // do nothing + } + + public void init(RowResult rr, String columnFamily) { + rowResult = rr; + hbaseColumnFamily = columnFamily; + setParsed(false); + } + + private void parse() { + if (cachedMap == null) { + cachedMap = new LinkedHashMap(); + } else { + cachedMap.clear(); + } + + Iterator iter = rowResult.keySet().iterator(); + + byte[] columnFamily = hbaseColumnFamily.getBytes(); + while (iter.hasNext()) { + byte [] columnKey = iter.next(); + if (columnFamily.length > columnKey.length) { + continue; + } + + if (0 == LazyUtils.compare( + columnFamily, 0, columnFamily.length, + columnKey, 0, columnFamily.length)) { + + byte [] columnValue = rowResult.get(columnKey).getValue(); + if (columnValue == null || columnValue.length == 0) { + // an empty object + continue; + } + + // Keys are always primitive + LazyPrimitive key = LazyFactory.createLazyPrimitiveClass( + (PrimitiveObjectInspector) + ((MapObjectInspector) getInspector()).getMapKeyObjectInspector()); + ByteArrayRef keyRef = new ByteArrayRef(); + keyRef.setData(columnKey); + key.init( + keyRef, columnFamily.length, columnKey.length - columnFamily.length); + + // Value + LazyObject value = LazyFactory.createLazyObject( + ((MapObjectInspector) getInspector()).getMapValueObjectInspector()); + ByteArrayRef valueRef = new ByteArrayRef(); + valueRef.setData(columnValue); + value.init(valueRef, 0, columnValue.length); + + // Put it into the map + cachedMap.put(key.getObject(), value.getObject()); + } + } + } + + /** + * Get the value in the map for the given key. + * + * @param key + * @return + */ + public Object getMapValueElement(Object key) { + if (!getParsed()) { + parse(); + } + + for (Map.Entry entry : cachedMap.entrySet()) { + LazyPrimitive lazyKeyI = (LazyPrimitive) entry.getKey(); + // getWritableObject() will convert LazyPrimitive to actual primitive + // writable objects. + Object keyI = lazyKeyI.getWritableObject(); + if (keyI == null) { + continue; + } + if (keyI.equals(key)) { + // Got a match, return the value + LazyObject v = (LazyObject) entry.getValue(); + return v == null ? 
v : v.getObject(); + } + } + + return null; + } + + public Map getMap() { + if (!getParsed()) { + parse(); + } + return cachedMap; + } + + public int getMapSize() { + if (!getParsed()) { + parse(); + } + return cachedMap.size(); + } + +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,175 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; +import org.apache.hadoop.hive.serde2.lazy.LazyFactory; +import org.apache.hadoop.hive.serde2.lazy.LazyObject; +import org.apache.hadoop.hive.serde2.lazy.LazyStruct; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazyMapObjectInspector; +import org.apache.hadoop.hive.serde2.lazy.objectinspector.LazySimpleStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; + +/** + * LazyObject for storing an HBase row. The field of an HBase row can be + * primitive or non-primitive. + */ +public class LazyHBaseRow extends LazyStruct { + + /** + * The HBase columns mapping of the row. + */ + private List hbaseColumns; + private RowResult rowResult; + private ArrayList cachedList; + + /** + * Construct a LazyHBaseRow object with the ObjectInspector. + */ + public LazyHBaseRow(LazySimpleStructObjectInspector oi) { + super(oi); + } + + /** + * Set the hbase row data(a RowResult writable) for this LazyStruct. + * @see LazyHBaseRow#init(RowResult) + */ + public void init(RowResult rr, List hbaseColumns) { + this.rowResult = rr; + this.hbaseColumns = hbaseColumns; + setParsed(false); + } + + /** + * Parse the RowResult and fill each field. 
+ * @see LazyStruct#parse() + */ + private void parse() { + if (getFields() == null) { + List fieldRefs = + ((StructObjectInspector)getInspector()).getAllStructFieldRefs(); + setFields(new LazyObject[fieldRefs.size()]); + for (int i = 0; i < getFields().length; i++) { + if (i > 0) { + String hbaseColumn = hbaseColumns.get(i - 1); + if (hbaseColumn.endsWith(":")) { + // a column family + getFields()[i] = + new LazyHBaseCellMap( + (LazyMapObjectInspector) + fieldRefs.get(i).getFieldObjectInspector()); + continue; + } + } + + getFields()[i] = LazyFactory.createLazyObject( + fieldRefs.get(i).getFieldObjectInspector()); + } + setFieldInited(new boolean[getFields().length]); + } + Arrays.fill(getFieldInited(), false); + setParsed(true); + } + + /** + * Get one field out of the hbase row. + * + * If the field is a primitive field, return the actual object. + * Otherwise return the LazyObject. This is because PrimitiveObjectInspector + * does not have control over the object used by the user - the user simply + * directly uses the Object instead of going through + * Object PrimitiveObjectInspector.get(Object). + * + * @param fieldID The field ID + * @return The field as a LazyObject + */ + public Object getField(int fieldID) { + if (!getParsed()) { + parse(); + } + return uncheckedGetField(fieldID); + } + + /** + * Get the field out of the row without checking whether parsing is needed. + * This is called by both getField and getFieldsAsList. + * @param fieldID The id of the field starting from 0. + * @param nullSequence The sequence representing NULL value. + * @return The value of the field + */ + private Object uncheckedGetField(int fieldID) { + if (!getFieldInited()[fieldID]) { + getFieldInited()[fieldID] = true; + + ByteArrayRef ref = new ByteArrayRef(); + + if (fieldID == 0) { + // the key + ref.setData(rowResult.getRow()); + getFields()[fieldID].init(ref, 0, ref.getData().length); + } else { + String columnName = hbaseColumns.get(fieldID - 1); + if (columnName.endsWith(":")) { + // it is a column family + ((LazyHBaseCellMap) getFields()[fieldID]).init( + rowResult, columnName); + } else { + // it is a column + if (rowResult.containsKey(columnName)) { + ref.setData(rowResult.get(columnName).getValue()); + getFields()[fieldID].init(ref, 0, ref.getData().length); + } else { + return null; + } + } + } + } + return getFields()[fieldID].getObject(); + } + + /** + * Get the values of the fields as an ArrayList. + * @return The values of the fields as an ArrayList. + */ + public ArrayList getFieldsAsList() { + if (!getParsed()) { + parse(); + } + if (cachedList == null) { + cachedList = new ArrayList(); + } else { + cachedList.clear(); + } + for (int i = 0; i < getFields().length; i++) { + cachedList.add(uncheckedGetField(i)); + } + return cachedList; + } + + @Override + public Object getObject() { + return this; + } + +} Added: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java (added) +++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/package-info.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,21 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** Implements an HBase storage handler for Hive. */ +package org.apache.hadoop.hive.hbase; + Added: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java (added) +++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/HBaseQTestUtil.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.hbase; + +import java.io.File; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.MiniHBaseCluster; +import org.apache.hadoop.hbase.MiniZooKeeperCluster; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hive.ql.QTestUtil; +import org.apache.hadoop.mapred.JobConf; +import org.apache.zookeeper.Watcher; + + +/** + * HBaseQTestUtil defines HBase-specific test fixtures. 
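+ * Its preTestUtilInit hook brings up a MiniZooKeeperCluster and a
+ * single-region-server MiniHBaseCluster rooted under the local build/ql/tmp
+ * directory, and registers the hbase, hbase-handler (HBaseSerDe) and
+ * zookeeper jars as Hive auxiliary jars; shutdown() tears both mini
+ * clusters back down.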
+ */ +public class HBaseQTestUtil extends QTestUtil { + + private String tmpdir; + + private MiniHBaseCluster hbase = null; + private MiniZooKeeperCluster zooKeeperCluster; + private static final int NUM_REGIONSERVERS = 1; + + public HBaseQTestUtil( + String outDir, String logDir, boolean miniMr) throws Exception { + + super(outDir, logDir, miniMr); + } + + protected void preTestUtilInit() throws Exception { + // Setup the hbase Cluster + boolean success = false; + try { + conf.set("hbase.master", "local"); + tmpdir = System.getProperty("user.dir")+"/../build/ql/tmp"; + conf.set("hbase.rootdir", "file://" + tmpdir + "/hbase"); + zooKeeperCluster = new MiniZooKeeperCluster(); + int clientPort = zooKeeperCluster.startup( + new File(tmpdir, "zookeeper")); + conf.set("hbase.zookeeper.property.clientPort", + Integer.toString(clientPort)); + HBaseConfiguration hbaseConf = new HBaseConfiguration(conf); + hbase = new MiniHBaseCluster(hbaseConf, NUM_REGIONSERVERS); + conf.set("hbase.master", hbase.getHMasterAddress().toString()); + // opening the META table ensures that cluster is running + new HTable(new HBaseConfiguration(conf), HConstants.META_TABLE_NAME); + success = true; + } finally { + if (!success) { + if (hbase != null) { + hbase.shutdown(); + } + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + } + } + } + + String auxJars = conf.getAuxJars(); + auxJars = ((auxJars == null) ? "" : (auxJars + ",")) + "file://" + + new JobConf(conf, HBaseConfiguration.class).getJar(); + auxJars += ",file://" + new JobConf(conf, HBaseSerDe.class).getJar(); + auxJars += ",file://" + new JobConf(conf, Watcher.class).getJar(); + conf.setAuxJars(auxJars); + } + + public void shutdown() throws Exception { + if (hbase != null) { + HConnectionManager.deleteConnectionInfo( + new HBaseConfiguration(conf), true); + hbase.shutdown(); + hbase = null; + } + if (zooKeeperCluster != null) { + zooKeeperCluster.shutdown(); + zooKeeperCluster = null; + } + + super.shutdown(); + } + +} Added: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=922404&view=auto ============================================================================== --- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (added) +++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Fri Mar 12 19:53:57 2010 @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hive.hbase; + + +import java.util.List; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.ByteWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; +import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; + +import junit.framework.TestCase; + +/** + * Tests the HBaseSerDe class. + */ +public class TestHBaseSerDe extends TestCase { + + /** + * Test the LazySimpleSerDe class. + */ + public void testHBaseSerDe() throws SerDeException { + // Create the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createProperties(); + serDe.initialize(conf, tbl); + + byte[] colabyte = "cola:abyte".getBytes(); + byte[] colbshort = "colb:ashort".getBytes(); + byte[] colcint = "colc:aint".getBytes(); + byte[] colalong = "cola:along".getBytes(); + byte[] colbdouble = "colb:adouble".getBytes(); + byte[] colcstring = "colc:astring".getBytes(); + + // Data + HbaseMapWritable cells = + new HbaseMapWritable(); + cells.put(colabyte, new Cell("123".getBytes(), 0)); + cells.put(colbshort, new Cell("456".getBytes(), 0)); + cells.put(colcint, new Cell("789".getBytes(), 0)); + cells.put(colalong, new Cell("1000".getBytes(), 0)); + cells.put(colbdouble, new Cell("5.3".getBytes(), 0)); + cells.put(colcstring, new Cell("hive and hadoop".getBytes(), 0)); + RowResult rr = new RowResult("test-row1".getBytes(), cells); + BatchUpdate bu = new BatchUpdate("test-row1".getBytes()); + bu.put(colabyte, "123".getBytes()); + bu.put(colbshort, "456".getBytes()); + bu.put(colcint, "789".getBytes()); + bu.put(colalong, "1000".getBytes()); + bu.put(colbdouble, "5.3".getBytes()); + bu.put(colcstring, "hive and hadoop".getBytes()); + + Object[] expectedFieldsData = { + new Text("test-row1"), + new ByteWritable((byte)123), + new ShortWritable((short)456), + new IntWritable(789), + new LongWritable(1000), + new DoubleWritable(5.3), + new Text("hive and hadoop") + }; + + deserializeAndSerialize(serDe, rr, bu, expectedFieldsData); + } + + private void deserializeAndSerialize( + HBaseSerDe serDe, RowResult rr, BatchUpdate bu, + Object[] expectedFieldsData) throws SerDeException { + + // Get the row structure + StructObjectInspector oi = (StructObjectInspector) + serDe.getObjectInspector(); + List fieldRefs = oi.getAllStructFieldRefs(); + assertEquals(7, fieldRefs.size()); + + // Deserialize + Object row = serDe.deserialize(rr); + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = oi.getStructFieldData(row, fieldRefs.get(i)); + if (fieldData != null) { + fieldData = ((LazyPrimitive)fieldData).getWritableObject(); + } + assertEquals("Field " + i, expectedFieldsData[i], fieldData); + } + // Serialize + assertEquals(BatchUpdate.class, serDe.getSerializedClass()); + BatchUpdate serializedBU = (BatchUpdate)serDe.serialize(row, oi); + assertEquals("Serialized data", 
bu.toString(), serializedBU.toString()); + } + + private Properties createProperties() { + Properties tbl = new Properties(); + + // Set the configuration parameters + tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9"); + tbl.setProperty("columns", + "key,abyte,ashort,aint,along,adouble,astring"); + tbl.setProperty("columns.types", + "string,tinyint:smallint:int:bigint:double:string"); + tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING, + "cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring"); + return tbl; + } + +}
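
[Editorial note] For orientation, a minimal, self-contained sketch (not part of this commit) of how the classes above fit together: it drives HBaseSerDe with a column-family mapping so the family is exposed as a Hive MAP column backed by LazyHBaseCellMap. Only APIs already exercised by the code and tests in this change are used; the class name, table layout and cell values are hypothetical.

package org.apache.hadoop.hive.hbase;

import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

/** Hypothetical usage sketch for the HBase SerDe added by this commit. */
public class HBaseSerDeUsageSketch {

  public static void main(String[] args) throws SerDeException {
    // Table layout: the first column is the row key, the second is a
    // MAP<string,string> backed by the whole "info:" column family
    // (note the trailing colon in the mapping).
    Properties tbl = new Properties();
    tbl.setProperty("columns", "key,info");
    tbl.setProperty("columns.types", "string:map<string,string>");
    tbl.setProperty(HBaseSerDe.HBASE_COL_MAPPING, "info:");

    HBaseSerDe serDe = new HBaseSerDe();
    serDe.initialize(new Configuration(), tbl);

    // One HBase row with two cells in the "info" family.
    HbaseMapWritable cells = new HbaseMapWritable();
    cells.put("info:name".getBytes(), new Cell("alice".getBytes(), 0));
    cells.put("info:city".getBytes(), new Cell("paris".getBytes(), 0));
    RowResult rr = new RowResult("row1".getBytes(), cells);

    // Deserialization is lazy: field 0 resolves to the row key, field 1 to a
    // LazyHBaseCellMap that strips the "info:" prefix from the cell keys.
    StructObjectInspector oi =
      (StructObjectInspector) serDe.getObjectInspector();
    Object row = serDe.deserialize(rr);
    System.out.println(oi.getStructFieldsDataAsList(row));
  }
}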