Return-Path: X-Original-To: apmail-hive-commits-archive@www.apache.org Delivered-To: apmail-hive-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CD8BB10567 for ; Wed, 24 Jul 2013 14:04:49 +0000 (UTC) Received: (qmail 95457 invoked by uid 500); 24 Jul 2013 14:04:49 -0000 Delivered-To: apmail-hive-commits-archive@hive.apache.org Received: (qmail 95300 invoked by uid 500); 24 Jul 2013 14:04:48 -0000 Mailing-List: contact commits-help@hive.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hive-dev@hive.apache.org Delivered-To: mailing list commits@hive.apache.org Received: (qmail 95292 invoked by uid 99); 24 Jul 2013 14:04:47 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jul 2013 14:04:47 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Jul 2013 14:04:45 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E041223888CD; Wed, 24 Jul 2013 14:04:24 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1506563 - in /hive/trunk/hbase-handler/src: java/org/apache/hadoop/hive/hbase/ test/org/apache/hadoop/hive/hbase/ Date: Wed, 24 Jul 2013 14:04:24 -0000 To: commits@hive.apache.org From: brock@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20130724140424.E041223888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: brock Date: Wed Jul 24 14:04:24 2013 New Revision: 1506563 URL: http://svn.apache.org/r1506563 Log: HIVE-3725: Add support for pulling HBase columns with prefixes (Swarnim Kulkarni via Brock Noland) Modified: 
hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=1506563&r1=1506562&r2=1506563&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Wed Jul 24 14:04:24 2013 @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.util.Byte import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.ByteStream; -import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeStats; import org.apache.hadoop.hive.serde2.SerDeUtils; @@ -69,11 +68,16 @@ public class HBaseSerDe extends Abstract public static final String HBASE_SCAN_CACHE = "hbase.scan.cache"; public static final String HBASE_SCAN_CACHEBLOCKS = "hbase.scan.cacheblock"; public static final String HBASE_SCAN_BATCH = "hbase.scan.batch"; + + /** Determines whether a regex matching should be done on the columns or not. Defaults to true. 
+ * WARNING: Note that currently this only supports the suffix wildcard .* **/ + public static final String HBASE_COLUMNS_REGEX_MATCHING = "hbase.columns.mapping.regex.matching"; public static final Log LOG = LogFactory.getLog(HBaseSerDe.class); private ObjectInspector cachedObjectInspector; private String hbaseColumnsMapping; + private boolean doColumnRegexMatching; private List columnsMapping; private SerDeParameters serdeParams; private boolean useJSONSerialize; @@ -148,6 +152,21 @@ public class HBaseSerDe extends Abstract */ public static List parseColumnsMapping(String columnsMappingSpec) throws SerDeException { + return parseColumnsMapping(columnsMappingSpec, true); + } + + /** + * Parses the HBase columns mapping specifier to identify the column families, qualifiers + * and also caches the byte arrays corresponding to them. One of the Hive table + * columns maps to the HBase row key, by default the first column. + * + * @param columnsMappingSpec string hbase.columns.mapping specified when creating table + * @param doColumnRegexMatching whether to do a regex matching on the columns or not + * @return List which contains the column mapping information by position + * @throws SerDeException + */ + public static List parseColumnsMapping(String columnsMappingSpec, boolean doColumnRegexMatching) + throws SerDeException { if (columnsMappingSpec == null) { throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table."); @@ -193,8 +212,21 @@ public class HBaseSerDe extends Abstract columnMapping.hbaseRowKey = false; if (parts.length == 2) { - columnMapping.qualifierName = parts[1]; - columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]); + + if (doColumnRegexMatching && parts[1].endsWith(".*")) { + // we have a prefix with a wildcard + columnMapping.qualifierPrefix = parts[1].substring(0, parts[1].length() - 2); + columnMapping.qualifierPrefixBytes = Bytes.toBytes(columnMapping.qualifierPrefix); + // we weren't provided any actual 
qualifier name. Set these to + // null. + columnMapping.qualifierName = null; + columnMapping.qualifierNameBytes = null; + } else { + // set the regular provided qualifier names + columnMapping.qualifierName = parts[1]; + columnMapping.qualifierNameBytes = Bytes.toBytes(parts[1]); + ; + } } else { columnMapping.qualifierName = null; columnMapping.qualifierNameBytes = null; @@ -413,6 +445,8 @@ public class HBaseSerDe extends Abstract List binaryStorage; boolean hbaseRowKey; String mappingSpec; + String qualifierPrefix; + byte[] qualifierPrefixBytes; } private void initHBaseSerDeParameters( @@ -424,8 +458,10 @@ public class HBaseSerDe extends Abstract String columnTypeProperty = tbl.getProperty(serdeConstants.LIST_COLUMN_TYPES); putTimestamp = Long.valueOf(tbl.getProperty(HBaseSerDe.HBASE_PUT_TIMESTAMP,"-1")); + doColumnRegexMatching = Boolean.valueOf(tbl.getProperty(HBASE_COLUMNS_REGEX_MATCHING, "true")); + // Parse and initialize the HBase columns mapping - columnsMapping = parseColumnsMapping(hbaseColumnsMapping); + columnsMapping = parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); // Build the type property string if not supplied if (columnTypeProperty == null) { @@ -798,6 +834,7 @@ public class HBaseSerDe extends Abstract return columnsMapping.get(colPos).binaryStorage; } + @Override public SerDeStats getSerDeStats() { // no support for statistics return null; Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=1506563&r1=1506562&r2=1506563&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Wed Jul 24 14:04:24 2013 @@ -39,9 +39,9 @@ import 
org.apache.hadoop.hbase.util.Byte import org.apache.hadoop.hive.hbase.HBaseSerDe.ColumnMapping; import org.apache.hadoop.hive.metastore.HiveMetaHook; import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; import org.apache.hadoop.hive.ql.index.IndexPredicateAnalyzer; import org.apache.hadoop.hive.ql.index.IndexSearchCondition; import org.apache.hadoop.hive.ql.metadata.DefaultStorageHandler; @@ -279,6 +279,8 @@ public class HBaseStorageHandler extends jobProperties.put( HBaseSerDe.HBASE_COLUMNS_MAPPING, tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING)); + jobProperties.put(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, + tableProperties.getProperty(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, "true")); jobProperties.put(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE, tableProperties.getProperty(HBaseSerDe.HBASE_TABLE_DEFAULT_STORAGE_TYPE,"string")); String scanCache = tableProperties.getProperty(HBaseSerDe.HBASE_SCAN_CACHE); Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=1506563&r1=1506562&r2=1506563&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Wed Jul 24 14:04:24 2013 @@ -91,11 +91,12 @@ public class HiveHBaseTableInputFormat e String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); String 
hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); List readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf); List columnsMapping = null; try { - columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping); + columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping, doColumnRegexMatching); } catch (SerDeException e) { throw new IOException(e); } @@ -434,6 +435,7 @@ public class HiveHBaseTableInputFormat e String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME); setHTable(new HTable(HBaseConfiguration.create(jobConf), Bytes.toBytes(hbaseTableName))); String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING); + boolean doColumnRegexMatching = jobConf.getBoolean(HBaseSerDe.HBASE_COLUMNS_REGEX_MATCHING, true); if (hbaseColumnsMapping == null) { throw new IOException("hbase.columns.mapping required for HBase Table."); @@ -441,7 +443,7 @@ public class HiveHBaseTableInputFormat e List columnsMapping = null; try { - columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping); + columnsMapping = HBaseSerDe.parseColumnsMapping(hbaseColumnsMapping,doColumnRegexMatching); } catch (SerDeException e) { throw new IOException(e); } Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java?rev=1506563&r1=1506562&r2=1506563&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseCellMap.java Wed Jul 24 14:04:24 2013 @@ -21,10 +21,11 @@ package org.apache.hadoop.hive.hbase; import java.util.LinkedHashMap; import java.util.List; import 
java.util.Map; -import java.util.NavigableMap; import java.util.Map.Entry; +import java.util.NavigableMap; import org.apache.hadoop.hbase.client.Result; +import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyFactory; import org.apache.hadoop.hive.serde2.lazy.LazyMap; @@ -42,6 +43,7 @@ public class LazyHBaseCellMap extends La private Result result; private byte [] columnFamilyBytes; + private byte[] qualPrefix; private List binaryStorage; /** @@ -54,12 +56,21 @@ public class LazyHBaseCellMap extends La public void init( Result r, - byte [] columnFamilyBytes, + byte[] columnFamilyBytes, List binaryStorage) { + init(r, columnFamilyBytes, binaryStorage, null); + } + + public void init( + Result r, + byte [] columnFamilyBytes, + List binaryStorage, byte[] qualPrefix) { + result = r; this.columnFamilyBytes = columnFamilyBytes; this.binaryStorage = binaryStorage; + this.qualPrefix = qualPrefix; setParsed(false); } @@ -80,6 +91,12 @@ public class LazyHBaseCellMap extends La continue; } + if (qualPrefix != null && !Bytes.startsWith(e.getKey(), qualPrefix)) { + // since we were provided a qualifier prefix, only accept qualifiers that start with this + // prefix + continue; + } + LazyMapObjectInspector lazyMoi = getInspector(); // Keys are always primitive Modified: hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=1506563&r1=1506562&r2=1506563&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original) +++ hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Wed Jul 24 14:04:24 2013 @@ -142,9 +142,11 @@ public class LazyHBaseRow extends LazySt } else { if (colMap.qualifierName == null) { 
// it is a column family - // primitive type for Map can be stored in binary format + // primitive type for Map can be stored in binary format. Pass in the + // qualifier prefix to cherry pick the qualifiers that match the prefix instead of picking + // up everything ((LazyHBaseCellMap) fields[fieldID]).init( - result, colMap.familyNameBytes, colMap.binaryStorage); + result, colMap.familyNameBytes, colMap.binaryStorage, colMap.qualifierPrefixBytes); } else { // it is a column i.e. a column-family with column-qualifier byte [] res = result.getValue(colMap.familyNameBytes, colMap.qualifierNameBytes); Modified: hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java URL: http://svn.apache.org/viewvc/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=1506563&r1=1506562&r2=1506563&view=diff ============================================================================== --- hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original) +++ hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Wed Jul 24 14:04:24 2013 @@ -18,6 +18,7 @@ package org.apache.hadoop.hive.hbase; +import java.io.IOException; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -32,6 +33,7 @@ import org.apache.hadoop.hbase.client.Re import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.SerDeUtils; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.ShortWritable; @@ -688,4 +690,123 @@ public class TestHBaseSerDe extends Test Put serializedPut = (Put) hbaseSerDe.serialize(row, soi); assertEquals("Serialized data: ", p.toString(), serializedPut.toString()); } + + public void testHBaseSerDeWithColumnPrefixes() + throws 
Exception { + byte[] cfa = "cola".getBytes(); + + byte[] qualA = "prefixA_col1".getBytes(); + byte[] qualB = "prefixB_col2".getBytes(); + byte[] qualC = "prefixB_col3".getBytes(); + byte[] qualD = "unwanted_col".getBytes(); + + List qualifiers = new ArrayList(); + qualifiers.add(new Text("prefixA_col1")); + qualifiers.add(new Text("prefixB_col2")); + qualifiers.add(new Text("prefixB_col3")); + qualifiers.add(new Text("unwanted_col")); + + List expectedQualifiers = new ArrayList(); + expectedQualifiers.add(new Text("prefixA_col1")); + expectedQualifiers.add(new Text("prefixB_col2")); + expectedQualifiers.add(new Text("prefixB_col3")); + + byte[] rowKey = Bytes.toBytes("test-row1"); + + // Data + List kvs = new ArrayList(); + + byte[] dataA = "This is first test data".getBytes(); + byte[] dataB = "This is second test data".getBytes(); + byte[] dataC = "This is third test data".getBytes(); + byte[] dataD = "Unwanted data".getBytes(); + + kvs.add(new KeyValue(rowKey, cfa, qualA, dataA)); + kvs.add(new KeyValue(rowKey, cfa, qualB, dataB)); + kvs.add(new KeyValue(rowKey, cfa, qualC, dataC)); + kvs.add(new KeyValue(rowKey, cfa, qualD, dataD)); + + Result r = new Result(kvs); + + Put p = new Put(rowKey); + + p.add(new KeyValue(rowKey, cfa, qualA, dataA)); + p.add(new KeyValue(rowKey, cfa, qualB, dataB)); + p.add(new KeyValue(rowKey, cfa, qualC, dataC)); + + Object[] expectedFieldsData = { + new Text("test-row1"), + new String("This is first test data"), + new String("This is second test data"), + new String("This is third test data")}; + + int[] expectedMapSize = new int[] {1, 2}; + + // Create, initialize, and test the SerDe + HBaseSerDe serDe = new HBaseSerDe(); + Configuration conf = new Configuration(); + Properties tbl = createPropertiesForColumnPrefixes(); + serDe.initialize(conf, tbl); + + Object notPresentKey = new Text("unwanted_col"); + + deserializeAndSerializeHivePrefixColumnFamily(serDe, r, p, expectedFieldsData, expectedMapSize, + expectedQualifiers, + 
notPresentKey); + } + + private Properties createPropertiesForColumnPrefixes() { + Properties tbl = new Properties(); + tbl.setProperty(serdeConstants.LIST_COLUMNS, + "key,astring,along"); + tbl.setProperty(serdeConstants.LIST_COLUMN_TYPES, + "string:map<string,string>:map<string,string>"); + tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING, + ":key,cola:prefixA_.*,cola:prefixB_.*"); + + return tbl; + } + + private void deserializeAndSerializeHivePrefixColumnFamily(HBaseSerDe serDe, Result r, Put p, + Object[] expectedFieldsData, int[] expectedMapSize, List expectedQualifiers, + Object notPresentKey) + throws SerDeException, IOException { + StructObjectInspector soi = (StructObjectInspector) serDe.getObjectInspector(); + + List fieldRefs = soi.getAllStructFieldRefs(); + + Object row = serDe.deserialize(r); + + int j = 0; + + for (int i = 0; i < fieldRefs.size(); i++) { + Object fieldData = soi.getStructFieldData(row, fieldRefs.get(i)); + assertNotNull(fieldData); + + if (fieldData instanceof LazyPrimitive) { + assertEquals(expectedFieldsData[i], ((LazyPrimitive) fieldData).getWritableObject()); + } else if (fieldData instanceof LazyHBaseCellMap) { + assertEquals(expectedFieldsData[i], ((LazyHBaseCellMap) fieldData) + .getMapValueElement(expectedQualifiers.get(j)).toString().trim()); + + assertEquals(expectedMapSize[j], ((LazyHBaseCellMap) fieldData).getMapSize()); + // Make sure that the unwanted key is not present in the map + assertNull(((LazyHBaseCellMap) fieldData).getMapValueElement(notPresentKey)); + + j++; + + } else { + fail("Error: field data not an instance of LazyPrimitive or LazyHBaseCellMap"); + } + } + + SerDeUtils.getJSONString(row, soi); + + // Now serialize + Put put = (Put) serDe.serialize(row, soi); + + if (p != null) { + assertEquals("Serialized put:", p.toString(), put.toString()); + } + } }