hadoop-hive-commits mailing list archives

From j..@apache.org
Subject svn commit: r988211 - in /hadoop/hive/trunk: ./ hbase-handler/src/java/org/apache/hadoop/hive/hbase/ hbase-handler/src/test/org/apache/hadoop/hive/hbase/
Date Mon, 23 Aug 2010 17:09:42 GMT
Author: jvs
Date: Mon Aug 23 17:09:41 2010
New Revision: 988211

URL: http://svn.apache.org/viewvc?rev=988211&view=rev
Log:
HIVE-1512. Need to get hive_hbase-handler to work with hbase
versions 0.20.4 0.20.5 and cloudera CDH3 version
(Basab Maulik via jvs)


Modified:
    hadoop/hive/trunk/CHANGES.txt
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
    hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
    hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java

Modified: hadoop/hive/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/CHANGES.txt?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/CHANGES.txt (original)
+++ hadoop/hive/trunk/CHANGES.txt Mon Aug 23 17:09:41 2010
@@ -92,6 +92,10 @@ Trunk -  Unreleased
     index rebuild
     (He Yongqiang via jvs)
 
+    HIVE-1512. Need to get hive_hbase-handler to work with hbase
+    versions 0.20.4 0.20.5 and cloudera CDH3 version
+    (Basab Maulik via jvs)
+
   OPTIMIZATIONS
 
   BUG FIXES
@@ -178,7 +182,7 @@ Trunk -  Unreleased
     HIVE-1561. test bug: smb_mapjoin*.q should use bucketizedhiveinputformat
     (He Yongqiang via namit)
 
-    HIVE-1510. HiveCombineInputFormat should not use prefix matching to find 
+    HIVE-1510. HiveCombineInputFormat should not use prefix matching to find
     the partitionDesc for a given path
     (He Yongqiang via Ning Zhang)
 

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseSerDe.java Mon Aug 23 17:09:41 2010
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -66,8 +65,11 @@ public class HBaseSerDe implements SerDe
   public static final Log LOG = LogFactory.getLog(HBaseSerDe.class);
 
   private ObjectInspector cachedObjectInspector;
-  private List<String> hbaseColumnNames;
-  private List<byte []> hbaseColumnNamesBytes;
+  private String hbaseColumnsMapping;
+  private List<String> hbaseColumnFamilies;
+  private List<byte []> hbaseColumnFamiliesBytes;
+  private List<String> hbaseColumnQualifiers;
+  private List<byte []> hbaseColumnQualifiersBytes;
   private SerDeParameters serdeParams;
   private boolean useJSONSerialize;
   private LazyHBaseRow cachedHBaseRow;
@@ -85,7 +87,7 @@ public class HBaseSerDe implements SerDe
   public String toString() {
     return getClass().toString()
         + "["
-        + hbaseColumnNames
+        + hbaseColumnsMapping
         + ":"
         + ((StructTypeInfo) serdeParams.getRowTypeInfo())
             .getAllStructFieldNames()
@@ -126,36 +128,117 @@ public class HBaseSerDe implements SerDe
         + " columnTypes = "
         + serdeParams.getColumnTypes()
         + " hbaseColumnMapping = "
-        + hbaseColumnNames);
+        + hbaseColumnsMapping);
     }
   }
 
-  public static List<String> parseColumnMapping(String columnMapping) {
-    String [] columnArray = columnMapping.split(",");
-    List<String> columnList = Arrays.asList(columnArray);
-    int iKey = columnList.indexOf(HBASE_KEY_COL);
-    if (iKey == -1) {
-      columnList = new ArrayList<String>(columnList);
-      columnList.add(0, HBASE_KEY_COL);
+  /**
+   * Parses the HBase columns mapping to identify the column families and
+   * qualifiers, and caches the byte arrays corresponding to them. One of the Hive table
+   * columns maps to the HBase row key, by default the first column.
+   *
+   * @param columnMapping - the column mapping specification to be parsed
+   * @param colFamilies - the list of HBase column family names
+   * @param colFamiliesBytes - the corresponding byte arrays
+   * @param colQualifiers - the list of HBase column qualifier names
+   * @param colQualifiersBytes - the corresponding byte arrays
+   * @return the row key index in the column names list
+   * @throws SerDeException
+   */
+  public static int parseColumnMapping(
+      String columnMapping,
+      List<String> colFamilies,
+      List<byte []> colFamiliesBytes,
+      List<String> colQualifiers,
+      List<byte []> colQualifiersBytes) throws SerDeException {
+
+    int rowKeyIndex = -1;
+
+    if (colFamilies == null || colQualifiers == null) {
+      throw new SerDeException("Error: caller must pass in lists for the column families " +
+          "and qualifiers.");
     }
-    return columnList;
-  }
 
-  public static List<byte []> initColumnNamesBytes(List<String> columnNames) {
-    List<byte []> columnBytes = new ArrayList<byte []>();
-    String column = null;
+    colFamilies.clear();
+    colQualifiers.clear();
+
+    if (columnMapping == null) {
+      throw new SerDeException("Error: hbase.columns.mapping missing for this HBase table.");
+    }
 
-    for (int i = 0; i < columnNames.size(); i++) {
-      column = columnNames.get(i);
+    if (columnMapping.equals("") || columnMapping.equals(HBASE_KEY_COL)) {
+      throw new SerDeException("Error: hbase.columns.mapping specifies only the HBase table"
+          + " row key. A valid Hive-HBase table must specify at least one additional column.");
+    }
 
-      if (column.endsWith(":")) {
-        columnBytes.add(Bytes.toBytes(column.split(":")[0]));
+    String [] mapping = columnMapping.split(",");
+
+    for (int i = 0; i < mapping.length; i++) {
+      String elem = mapping[i];
+      int idxFirst = elem.indexOf(":");
+      int idxLast = elem.lastIndexOf(":");
+
+      if (idxFirst < 0 || idxFirst != idxLast) {
+        throw new SerDeException("Error: the HBase columns mapping contains a badly formed " +
+            "column family or column qualifier specification.");
+      }
+
+      if (elem.equals(HBASE_KEY_COL)) {
+        rowKeyIndex = i;
+        colFamilies.add(elem);
+        colQualifiers.add(null);
       } else {
-        columnBytes.add(Bytes.toBytes(column));
+        String [] parts = elem.split(":");
+        assert(parts.length > 0 && parts.length <= 2);
+        colFamilies.add(parts[0]);
+
+        if (parts.length == 2) {
+          colQualifiers.add(parts[1]);
+        } else {
+          colQualifiers.add(null);
+        }
+      }
+    }
+
+    if (rowKeyIndex == -1) {
+      colFamilies.add(0, HBASE_KEY_COL);
+      colQualifiers.add(0, null);
+      rowKeyIndex = 0;
+    }
+
+    if (colFamilies.size() != colQualifiers.size()) {
+      throw new SerDeException("Error in parsing the hbase columns mapping.");
+    }
+
+    // populate the corresponding byte [] if the client has passed in a non-null list
+    if (colFamiliesBytes != null) {
+      colFamiliesBytes.clear();
+
+      for (String fam : colFamilies) {
+        colFamiliesBytes.add(Bytes.toBytes(fam));
       }
     }
 
-    return columnBytes;
+    if (colQualifiersBytes != null) {
+      colQualifiersBytes.clear();
+
+      for (String qual : colQualifiers) {
+        if (qual == null) {
+          colQualifiersBytes.add(null);
+        } else {
+          colQualifiersBytes.add(Bytes.toBytes(qual));
+        }
+      }
+    }
+
+    if (colFamiliesBytes != null && colQualifiersBytes != null) {
+      if (colFamiliesBytes.size() != colQualifiersBytes.size()) {
+        throw new SerDeException("Error in caching the bytes for the hbase column families " +
+            "and qualifiers.");
+      }
+    }
+
+    return rowKeyIndex;
   }
 
   public static boolean isSpecialColumn(String hbaseColumnName) {
@@ -167,31 +250,31 @@ public class HBaseSerDe implements SerDe
     throws SerDeException {
 
     // Read configuration parameters
-    String hbaseColumnNameProperty =
-      tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-    String columnTypeProperty =
-      tbl.getProperty(Constants.LIST_COLUMN_TYPES);
-
-    // Initialize the HBase column list
-    hbaseColumnNames = parseColumnMapping(hbaseColumnNameProperty);
-    iKey = hbaseColumnNames.indexOf(HBASE_KEY_COL);
+    hbaseColumnsMapping = tbl.getProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+    String columnTypeProperty = tbl.getProperty(Constants.LIST_COLUMN_TYPES);
 
-    // initialize the byte [] corresponding to each column name
-    hbaseColumnNamesBytes = initColumnNamesBytes(hbaseColumnNames);
+    // Parse the HBase columns mapping and initialize the col family & qualifiers
+    hbaseColumnFamilies = new ArrayList<String>();
+    hbaseColumnFamiliesBytes = new ArrayList<byte []>();
+    hbaseColumnQualifiers = new ArrayList<String>();
+    hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+    iKey = parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
+        hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
 
     // Build the type property string if not supplied
     if (columnTypeProperty == null) {
       StringBuilder sb = new StringBuilder();
 
-      for (int i = 0; i < hbaseColumnNames.size(); i++) {
+      for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
         if (sb.length() > 0) {
           sb.append(":");
         }
-        String colName = hbaseColumnNames.get(i);
-        if (isSpecialColumn(colName)) {
-            // a special column becomes a STRING
+        String colFamily = hbaseColumnFamilies.get(i);
+        String colQualifier = hbaseColumnQualifiers.get(i);
+        if (isSpecialColumn(colFamily)) {
+            // the row key column becomes a STRING
             sb.append(Constants.STRING_TYPE_NAME);
-        } else if (colName.endsWith(":"))  {
+        } else if (colQualifier == null) {
          // a column family becomes a MAP
           sb.append(
             Constants.MAP_TYPE_NAME + "<"
@@ -207,11 +290,11 @@ public class HBaseSerDe implements SerDe
 
     serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, serdeName);
 
-    if (hbaseColumnNames.size() != serdeParams.getColumnNames().size()) {
+    if (hbaseColumnFamilies.size() != serdeParams.getColumnNames().size()) {
       throw new SerDeException(serdeName + ": columns has " +
         serdeParams.getColumnNames().size() +
         " elements while hbase.columns.mapping has " +
-        hbaseColumnNames.size() + " elements" +
+        hbaseColumnFamilies.size() + " elements" +
         " (counting the key if implicit)");
     }
 
@@ -221,10 +304,11 @@ public class HBaseSerDe implements SerDe
     needsEscape = serdeParams.getNeedsEscape();
 
     // check that the mapping schema is right;
-    // we just can make sure that "column-family:" is mapped to MAP<String,?>
-    for (int i = 0; i < hbaseColumnNames.size(); i++) {
-      String hbaseColName = hbaseColumnNames.get(i);
-      if (hbaseColName.endsWith(":")) {
+    // check that the "column-family:" is mapped to MAP<String,?>
+    for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+      String colFamily = hbaseColumnFamilies.get(i);
+      String colQualifier = hbaseColumnQualifiers.get(i);
+      if (colQualifier == null && !isSpecialColumn(colFamily)) {
         TypeInfo typeInfo = serdeParams.getColumnTypes().get(i);
         if ((typeInfo.getCategory() != Category.MAP) ||
           (((MapTypeInfo) typeInfo).getMapKeyTypeInfo().getTypeName()
@@ -232,8 +316,8 @@ public class HBaseSerDe implements SerDe
 
           throw new SerDeException(
             serdeName + ": hbase column family '"
-            + hbaseColName
-            + "' should be mapped to map<string,?> but is mapped to "
+            + colFamily
+            + "' should be mapped to Map<String,?> but is mapped to "
             + typeInfo.getTypeName());
         }
       }
@@ -253,8 +337,9 @@ public class HBaseSerDe implements SerDe
       throw new SerDeException(getClass().getName() + ": expects Result!");
     }
 
-    Result r = (Result)result;
-    cachedHBaseRow.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
+    cachedHBaseRow.init((Result) result, hbaseColumnFamilies, hbaseColumnFamiliesBytes,
+        hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+
     return cachedHBaseRow;
   }
 
@@ -322,7 +407,8 @@ public class HBaseSerDe implements SerDe
     List<? extends StructField> declaredFields) throws IOException {
 
     // column name
-    String hbaseColumn = hbaseColumnNames.get(i);
+    String hbaseColumnFamily = hbaseColumnFamilies.get(i);
+    String hbaseColumnQualifier = hbaseColumnQualifiers.get(i);
 
     // Get the field objectInspector and the field object.
     ObjectInspector foi = fields.get(i).getFieldObjectInspector();
@@ -333,8 +419,8 @@ public class HBaseSerDe implements SerDe
       return null;
     }
 
-    // If the field corresponds to a column family in hbase
-    if (hbaseColumn.endsWith(":")) {
+    // If the field corresponds to a column family in HBase
+    if (hbaseColumnQualifier == null && !isSpecialColumn(hbaseColumnFamily)) {
       MapObjectInspector moi = (MapObjectInspector)foi;
       ObjectInspector koi = moi.getMapKeyObjectInspector();
       ObjectInspector voi = moi.getMapValueObjectInspector();
@@ -349,8 +435,9 @@ public class HBaseSerDe implements SerDe
           serialize(entry.getKey(), koi, 3);
 
           // Get the column-qualifier
-          byte [] columnQualifier = new byte[serializeStream.getCount()];
-          System.arraycopy(serializeStream.getData(), 0, columnQualifier, 0, serializeStream.getCount());
+          byte [] columnQualifierBytes = new byte[serializeStream.getCount()];
+          System.arraycopy(
+              serializeStream.getData(), 0, columnQualifierBytes, 0, serializeStream.getCount());
 
           // Get the Value
           serializeStream.reset();
@@ -360,7 +447,7 @@ public class HBaseSerDe implements SerDe
           }
           byte [] value = new byte[serializeStream.getCount()];
           System.arraycopy(serializeStream.getData(), 0, value, 0, serializeStream.getCount());
-          put.add(hbaseColumnNamesBytes.get(i), columnQualifier, value);
+          put.add(hbaseColumnFamiliesBytes.get(i), columnQualifierBytes, value);
         }
       }
     } else {
@@ -391,7 +478,7 @@ public class HBaseSerDe implements SerDe
       if (i == iKey) {
         return key;
       }
-      put.add(hbaseColumnNamesBytes.get(i), 0, key);
+      put.add(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i), key);
     }
 
     return null;
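
As a quick orientation to the reworked API above, here is a minimal, hypothetical
usage sketch of HBaseSerDe.parseColumnMapping; the driver class, mapping string,
and variable names are illustrative and not part of the patch:

import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.hive.hbase.HBaseSerDe;
import org.apache.hadoop.hive.serde2.SerDeException;

// Hypothetical driver, not part of the patch.
public class ParseMappingSketch {
  public static void main(String[] args) throws SerDeException {
    List<String> families = new ArrayList<String>();
    List<byte []> familiesBytes = new ArrayList<byte []>();
    List<String> qualifiers = new ArrayList<String>();
    List<byte []> qualifiersBytes = new ArrayList<byte []>();

    // ":key" marks the row key, "cfa:q1" a single column, and the bare
    // family "cfb:" a whole column family (a MAP-typed Hive column).
    int iKey = HBaseSerDe.parseColumnMapping(
        ":key,cfa:q1,cfb:", families, familiesBytes, qualifiers, qualifiersBytes);

    // Expected: iKey == 0, families == [":key", "cfa", "cfb"],
    // qualifiers == [null, "q1", null]; the byte lists mirror the string
    // lists, with null entries where no qualifier was given.
    System.out.println(iKey + " " + families + " " + qualifiers);
  }
}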

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HBaseStorageHandler.java Mon Aug 23 17:09:41 2010
@@ -19,6 +19,7 @@
 package org.apache.hadoop.hive.hbase;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -41,6 +42,7 @@ import org.apache.hadoop.hive.metastore.
 import org.apache.hadoop.hive.ql.metadata.HiveStorageHandler;
 import org.apache.hadoop.hive.ql.plan.TableDesc;
 import org.apache.hadoop.hive.serde2.SerDe;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.OutputFormat;
 import org.apache.hadoop.util.StringUtils;
@@ -49,8 +51,7 @@ import org.apache.hadoop.util.StringUtil
  * HBaseStorageHandler provides a HiveStorageHandler implementation for
  * HBase.
  */
-public class HBaseStorageHandler
-  implements HiveStorageHandler, HiveMetaHook {
+public class HBaseStorageHandler implements HiveStorageHandler, HiveMetaHook {
 
   private HBaseConfiguration hbaseConf;
   private HBaseAdmin admin;
@@ -120,77 +121,71 @@ public class HBaseStorageHandler
     }
 
     try {
-      String tblName = getHBaseTableName(tbl);
-
-      // Build the mapping schema
-      Set<String> columnFamilies = new HashSet<String>();
-      // Check the hbase columns and get all the families
-      Map<String, String> serdeParam =
-        tbl.getSd().getSerdeInfo().getParameters();
+      String tableName = getHBaseTableName(tbl);
+      Map<String, String> serdeParam = tbl.getSd().getSerdeInfo().getParameters();
       String hbaseColumnsMapping = serdeParam.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+
       if (hbaseColumnsMapping == null) {
         throw new MetaException("No hbase.columns.mapping defined in Serde.");
       }
-      List<String> hbaseColumns =
-        HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
-      int iKeyFirst = hbaseColumns.indexOf(HBaseSerDe.HBASE_KEY_COL);
-      int iKeyLast = hbaseColumns.lastIndexOf(HBaseSerDe.HBASE_KEY_COL);
-      if (iKeyFirst != iKeyLast) {
-        throw new MetaException("Multiple key columns defined in hbase.columns.mapping.");
-      }
-      for (String hbaseColumn : hbaseColumns) {
-        if (HBaseSerDe.isSpecialColumn(hbaseColumn)) {
-          continue;
-        }
-        int idx = hbaseColumn.indexOf(":");
-        if (idx < 0) {
-          throw new MetaException(
-            hbaseColumn + " is not a qualified hbase column.");
-        }
-        columnFamilies.add(hbaseColumn.substring(0, idx));
-      }
 
-      // Check if the given hbase table exists
-      HTableDescriptor tblDesc;
+      List<String> hbaseColumnFamilies = new ArrayList<String>();
+      List<String> hbaseColumnQualifiers = new ArrayList<String>();
+      List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
+      List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+      int iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
+          hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
 
-      if (!getHBaseAdmin().tableExists(tblName)) {
+      HTableDescriptor tableDesc;
+
+      if (!getHBaseAdmin().tableExists(tableName)) {
         // if it is not an external table then create one
         if (!isExternal) {
-          // Create the all column descriptors
-          tblDesc = new HTableDescriptor(tblName);
-          for (String cf : columnFamilies) {
-            tblDesc.addFamily(new HColumnDescriptor(cf + ":"));
+          // Create the column descriptors
+          tableDesc = new HTableDescriptor(tableName);
+          Set<String> uniqueColumnFamilies = new HashSet<String>(hbaseColumnFamilies);
+          uniqueColumnFamilies.remove(hbaseColumnFamilies.get(iKey));
+
+          for (String columnFamily : uniqueColumnFamilies) {
+            tableDesc.addFamily(new HColumnDescriptor(Bytes.toBytes(columnFamily)));
           }
 
-          getHBaseAdmin().createTable(tblDesc);
+          getHBaseAdmin().createTable(tableDesc);
         } else {
           // an external table
-          throw new MetaException("HBase table " + tblName +
+          throw new MetaException("HBase table " + tableName +
               " doesn't exist while the table is declared as an external table.");
         }
 
       } else {
         if (!isExternal) {
-          throw new MetaException("Table " + tblName + " already exists"
+          throw new MetaException("Table " + tableName + " already exists"
             + " within HBase; use CREATE EXTERNAL TABLE instead to"
             + " register it in Hive.");
         }
         // make sure the schema mapping is right
-        tblDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tblName));
-        for (String cf : columnFamilies) {
-          if (!tblDesc.hasFamily(Bytes.toBytes(cf))) {
-            throw new MetaException("Column Family " + cf
-              + " is not defined in hbase table " + tblName);
+        tableDesc = getHBaseAdmin().getTableDescriptor(Bytes.toBytes(tableName));
+
+        for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+          if (i == iKey) {
+            continue;
           }
-        }
 
+          if (!tableDesc.hasFamily(hbaseColumnFamiliesBytes.get(i))) {
+            throw new MetaException("Column Family " + hbaseColumnFamilies.get(i)
+                + " is not defined in hbase table " + tableName);
+          }
+        }
       }
+
       // ensure the table is online
-      new HTable(hbaseConf, tblDesc.getName());
+      new HTable(hbaseConf, tableDesc.getName());
     } catch (MasterNotRunningException mnre) {
       throw new MetaException(StringUtils.stringifyException(mnre));
     } catch (IOException ie) {
       throw new MetaException(StringUtils.stringifyException(ie));
+    } catch (SerDeException se) {
+      throw new MetaException(StringUtils.stringifyException(se));
     }
   }
 
@@ -200,7 +195,7 @@ public class HBaseStorageHandler
     String tableName = getHBaseTableName(table);
     try {
       if (!isExternal && getHBaseAdmin().tableExists(tableName)) {
-        // we have create an hbase table, so we delete it to roll back;
+        // we have created an HBase table, so we delete it to roll back;
         if (getHBaseAdmin().isTableEnabled(tableName)) {
           getHBaseAdmin().disableTable(tableName);
         }
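
In short, preCreateTable now derives the set of families to create from the parsed
mapping instead of splitting strings on ':'. A hypothetical condensation of that
step (the helper name is illustrative and the HBaseAdmin plumbing is elided):

import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not in the patch: for a managed table, one
// HColumnDescriptor is created per entry of the returned set.
final class FamilySetSketch {
  static Set<String> familiesToCreate(List<String> families, int iKey) {
    Set<String> unique = new HashSet<String>(families);
    // The ":key" entry marks the row key and is never a real HBase family.
    unique.remove(families.get(iKey));
    return unique;
  }
}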

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java Mon Aug 23 17:09:41 2010
@@ -35,6 +35,7 @@ import org.apache.hadoop.hbase.mapreduce
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
 import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.mapred.InputFormat;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
@@ -65,39 +66,69 @@ public class HiveHBaseTableInputFormat e
     HBaseSplit hbaseSplit = (HBaseSplit) split;
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
-
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
-    List<String> columns = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
+    List<String> hbaseColumnFamilies = new ArrayList<String>();
+    List<String> hbaseColumnQualifiers = new ArrayList<String>();
+    List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
+    List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+
+    int iKey;
+    try {
+      iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
+          hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+    } catch (SerDeException se) {
+      throw new IOException(se);
+    }
     List<Integer> readColIDs = ColumnProjectionUtils.getReadColumnIDs(jobConf);
 
-    if (columns.size() < readColIDs.size()) {
+    if (hbaseColumnFamilies.size() < readColIDs.size()) {
       throw new IOException("Cannot read more columns than the given table contains.");
     }
 
-    List<byte []> scanColumns = new ArrayList<byte []>();
     boolean addAll = (readColIDs.size() == 0);
+    Scan scan = new Scan();
+    boolean empty = true;
+
     if (!addAll) {
-      for (int iColumn : readColIDs) {
-        String column = columns.get(iColumn);
-        if (HBaseSerDe.isSpecialColumn(column)) {
+      for (int i : readColIDs) {
+        if (i == iKey) {
           continue;
         }
-        scanColumns.add(Bytes.toBytes(column));
+
+        if (hbaseColumnQualifiers.get(i) == null) {
+          scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+        } else {
+          scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+        }
+
+        empty = false;
       }
     }
-    if (scanColumns.isEmpty()) {
-      for (String column : columns) {
-        if (HBaseSerDe.isSpecialColumn(column)) {
+
+    // The HBase table's row key maps to a Hive table column. In the corner case where only the
+    // row key column is selected in Hive, the HBase Scan will be empty, i.e. no column family or
+    // column qualifier will have been added to the scan. We arbitrarily add at least one column
+    // to the HBase scan so that we can retrieve all of the row keys and return them as the Hive
+    // table's column projection.
+    if (empty) {
+      for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+        if (i == iKey) {
           continue;
         }
-        scanColumns.add(Bytes.toBytes(column));
+
+        if (hbaseColumnQualifiers.get(i) == null) {
+          scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+        } else {
+          scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+        }
+
         if (!addAll) {
           break;
         }
       }
     }
 
-    setScan(new Scan().addColumns(scanColumns.toArray(new byte[0][])));
+    setScan(scan);
     org.apache.hadoop.hbase.mapreduce.TableSplit tableSplit = hbaseSplit.getSplit();
 
     Job job = new Job(jobConf);
@@ -175,22 +206,41 @@ public class HiveHBaseTableInputFormat e
     String hbaseTableName = jobConf.get(HBaseSerDe.HBASE_TABLE_NAME);
     setHTable(new HTable(new HBaseConfiguration(jobConf), Bytes.toBytes(hbaseTableName)));
     String hbaseColumnsMapping = jobConf.get(HBaseSerDe.HBASE_COLUMNS_MAPPING);
+
     if (hbaseColumnsMapping == null) {
       throw new IOException("hbase.columns.mapping required for HBase Table.");
     }
 
+    List<String> hbaseColumnFamilies = new ArrayList<String>();
+    List<String> hbaseColumnQualifiers = new ArrayList<String>();
+    List<byte []> hbaseColumnFamiliesBytes = new ArrayList<byte []>();
+    List<byte []> hbaseColumnQualifiersBytes = new ArrayList<byte []>();
+
+    int iKey;
+    try {
+      iKey = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping, hbaseColumnFamilies,
+          hbaseColumnFamiliesBytes, hbaseColumnQualifiers, hbaseColumnQualifiersBytes);
+    } catch (SerDeException se) {
+      throw new IOException(se);
+    }
+
+    Scan scan = new Scan();
+
     // REVIEW:  are we supposed to be applying the getReadColumnIDs
     // same as in getRecordReader?
-    List<String> columns = HBaseSerDe.parseColumnMapping(hbaseColumnsMapping);
-    List<byte []> inputColumns = new ArrayList<byte []>();
-    for (String column : columns) {
-      if (HBaseSerDe.isSpecialColumn(column)) {
+    for (int i = 0; i < hbaseColumnFamilies.size(); i++) {
+      if (i == iKey) {
         continue;
       }
-      inputColumns.add(Bytes.toBytes(column));
+
+      if (hbaseColumnQualifiers.get(i) == null) {
+        scan.addFamily(hbaseColumnFamiliesBytes.get(i));
+      } else {
+        scan.addColumn(hbaseColumnFamiliesBytes.get(i), hbaseColumnQualifiersBytes.get(i));
+      }
     }
 
-    setScan(new Scan().addColumns(inputColumns.toArray(new byte[0][])));
+    setScan(scan);
     Job job = new Job(jobConf);
     JobContext jobContext = new JobContext(job.getConfiguration(), job.getJobID());
     Path [] tablePaths = FileInputFormat.getInputPaths(jobContext);
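
The projection rule applied in both getRecordReader and getSplits above boils down
to: skip the row key, add the whole family when the qualifier is null, otherwise add
a single cell. A hypothetical condensation (class and method names are illustrative):

import java.util.List;

import org.apache.hadoop.hbase.client.Scan;

// Hypothetical sketch, not in the patch.
final class ScanSketch {
  static Scan projectScan(List<byte []> familiesBytes, List<byte []> qualifiersBytes, int iKey) {
    Scan scan = new Scan();
    for (int i = 0; i < familiesBytes.size(); i++) {
      if (i == iKey) {
        continue; // the row key is returned with every Result anyway
      }
      if (qualifiersBytes.get(i) == null) {
        // MAP-typed Hive column: fetch the entire column family
        scan.addFamily(familiesBytes.get(i));
      } else {
        // scalar Hive column: fetch one family/qualifier cell
        scan.addColumn(familiesBytes.get(i), qualifiersBytes.get(i));
      }
    }
    return scan;
  }
}

Replacing the single Scan.addColumns(byte [][]) call, which expected "family:qualifier"
byte strings, with per-column addFamily/addColumn calls is what lets the scan setup
work against the newer HBase client APIs this patch targets.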

Modified: hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/java/org/apache/hadoop/hive/hbase/LazyHBaseRow.java Mon Aug 23 17:09:41 2010
@@ -42,8 +42,10 @@ public class LazyHBaseRow extends LazySt
    * The HBase columns mapping of the row.
    */
   private Result result;
-  private List<String> hbaseColumns;
-  private List<byte []> hbaseColumnsBytes;
+  private List<String> hbaseColumnFamilies;
+  private List<byte []> hbaseColumnFamiliesBytes;
+  private List<String> hbaseColumnQualifiers;
+  private List<byte []> hbaseColumnQualifiersBytes;
   private ArrayList<Object> cachedList;
 
   /**
@@ -57,10 +59,18 @@ public class LazyHBaseRow extends LazySt
    * Set the HBase row data(a Result writable) for this LazyStruct.
    * @see LazyHBaseRow#init(Result)
    */
-  public void init(Result r, List<String> hbaseColumns, List<byte []> hbaseColumnsBytes) {
+  public void init(
+      Result r,
+      List<String> hbaseColumnFamilies,
+      List<byte []> hbaseColumnFamiliesBytes,
+      List<String> hbaseColumnQualifiers,
+      List<byte []> hbaseColumnQualifiersBytes) {
+
     result = r;
-    this.hbaseColumns = hbaseColumns;
-    this.hbaseColumnsBytes = hbaseColumnsBytes;
+    this.hbaseColumnFamilies = hbaseColumnFamilies;
+    this.hbaseColumnFamiliesBytes = hbaseColumnFamiliesBytes;
+    this.hbaseColumnQualifiers = hbaseColumnQualifiers;
+    this.hbaseColumnQualifiersBytes = hbaseColumnQualifiersBytes;
     setParsed(false);
   }
 
@@ -74,18 +84,17 @@ public class LazyHBaseRow extends LazySt
         ((StructObjectInspector)getInspector()).getAllStructFieldRefs();
       setFields(new LazyObject[fieldRefs.size()]);
       for (int i = 0; i < getFields().length; i++) {
-        String hbaseColumn = hbaseColumns.get(i);
-        if (hbaseColumn.endsWith(":")) {
+        String hbaseColumnFamily = hbaseColumnFamilies.get(i);
+        String hbaseColumnQualifier = hbaseColumnQualifiers.get(i);
+
+        if (hbaseColumnQualifier == null && !HBaseSerDe.isSpecialColumn(hbaseColumnFamily)) {
           // a column family
-          getFields()[i] =
-            new LazyHBaseCellMap(
-              (LazyMapObjectInspector)
-              fieldRefs.get(i).getFieldObjectInspector());
+          getFields()[i] = new LazyHBaseCellMap(
+              (LazyMapObjectInspector) fieldRefs.get(i).getFieldObjectInspector());
           continue;
         }
 
-        getFields()[i] = LazyFactory.createLazyObject(
-          fieldRefs.get(i).getFieldObjectInspector());
+        getFields()[i] = LazyFactory.createLazyObject(fieldRefs.get(i).getFieldObjectInspector());
       }
       setFieldInited(new boolean[getFields().length]);
     }
@@ -124,19 +133,23 @@ public class LazyHBaseRow extends LazySt
     if (!getFieldInited()[fieldID]) {
       getFieldInited()[fieldID] = true;
       ByteArrayRef ref = null;
-      String columnName = hbaseColumns.get(fieldID);
-      byte [] columnNameBytes = hbaseColumnsBytes.get(fieldID);
+      String columnFamily = hbaseColumnFamilies.get(fieldID);
+      String columnQualifier = hbaseColumnQualifiers.get(fieldID);
+      byte [] columnFamilyBytes = hbaseColumnFamiliesBytes.get(fieldID);
+      byte [] columnQualifierBytes = hbaseColumnQualifiersBytes.get(fieldID);
 
-      if (columnName.equals(HBaseSerDe.HBASE_KEY_COL)) {
+      if (HBaseSerDe.isSpecialColumn(columnFamily)) {
+        assert(columnQualifier == null);
         ref = new ByteArrayRef();
         ref.setData(result.getRow());
       } else {
-        if (columnName.endsWith(":")) {
+        if (columnQualifier == null) {
           // it is a column family
-          ((LazyHBaseCellMap) getFields()[fieldID]).init(result, columnNameBytes);
+          ((LazyHBaseCellMap) getFields()[fieldID]).init(result, columnFamilyBytes);
         } else {
-          // it is a column
-          byte [] res = result.getValue(columnNameBytes);
+          // it is a column, i.e. a column family with a column qualifier
+          byte [] res = result.getValue(columnFamilyBytes, columnQualifierBytes);
+
           if (res == null) {
             return null;
           } else {
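
The field lookup now resolves one of three ways: the ":key" pseudo-column reads the
row key, a null qualifier initializes a LazyHBaseCellMap over the family, and a
family/qualifier pair fetches a single cell. A hypothetical sketch of just the
byte-level reads (the lazy-object wiring is elided; names are illustrative):

import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hive.hbase.HBaseSerDe;

// Hypothetical condensation, not in the patch: returns the raw bytes backing
// a scalar field, or null for a family-mapped field (that case is handled by
// LazyHBaseCellMap.init(result, familyBytes) instead).
final class RowFieldSketch {
  static byte [] rawFieldBytes(Result result, String family, String qualifier,
      byte [] familyBytes, byte [] qualifierBytes) {
    if (HBaseSerDe.isSpecialColumn(family)) {
      return result.getRow();                    // the ":key" pseudo-column
    }
    if (qualifier == null) {
      return null;                               // whole column family (MAP)
    }
    return result.getValue(familyBytes, qualifierBytes); // a single cell
  }
}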

Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestHBaseSerDe.java Mon Aug 23 17:09:41 2010
@@ -37,6 +37,8 @@ import org.apache.hadoop.hive.serde2.io.
 import org.apache.hadoop.hive.serde2.lazy.LazyPrimitive;
 import org.apache.hadoop.hive.serde2.objectinspector.StructField;
 import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
 import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -56,37 +58,45 @@ public class TestHBaseSerDe extends Test
     Properties tbl = createProperties();
     serDe.initialize(conf, tbl);
 
-    byte[] colabyte   = "cola:abyte".getBytes();
-    byte[] colbshort  = "colb:ashort".getBytes();
-    byte[] colcint    = "colc:aint".getBytes();
-    byte[] colalong   = "cola:along".getBytes();
-    byte[] colbdouble = "colb:adouble".getBytes();
-    byte[] colcstring = "colc:astring".getBytes();
+    byte [] cfa = "cola".getBytes();
+    byte [] cfb = "colb".getBytes();
+    byte [] cfc = "colc".getBytes();
+
+    byte [] qualByte = "byte".getBytes();
+    byte [] qualShort = "short".getBytes();
+    byte [] qualInt = "int".getBytes();
+    byte [] qualLong = "long".getBytes();
+    byte [] qualFloat = "float".getBytes();
+    byte [] qualDouble = "double".getBytes();
+    byte [] qualString = "string".getBytes();
+    byte [] qualBool = "boolean".getBytes();
+
+    byte [] rowKey = Bytes.toBytes("test-row1");
 
     // Data
     List<KeyValue> kvs = new ArrayList<KeyValue>();
-    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
-        colabyte, 0, Bytes.toBytes("123")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
-        colbshort, 0, Bytes.toBytes("456")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
-        colcint, 0, Bytes.toBytes("789")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
-        colalong, 0, Bytes.toBytes("1000")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
-        colbdouble, 0, Bytes.toBytes("5.3")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row1"),
-        colcstring, 0, Bytes.toBytes("hive and hadoop")));
+
+    kvs.add(new KeyValue(rowKey, cfa, qualByte, Bytes.toBytes("123")));
+    kvs.add(new KeyValue(rowKey, cfb, qualShort, Bytes.toBytes("456")));
+    kvs.add(new KeyValue(rowKey, cfc, qualInt, Bytes.toBytes("789")));
+    kvs.add(new KeyValue(rowKey, cfa, qualLong, Bytes.toBytes("1000")));
+    kvs.add(new KeyValue(rowKey, cfb, qualFloat, Bytes.toBytes("-0.01")));
+    kvs.add(new KeyValue(rowKey, cfc, qualDouble, Bytes.toBytes("5.3")));
+    kvs.add(new KeyValue(rowKey, cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive")));
+    kvs.add(new KeyValue(rowKey, cfb, qualBool, Bytes.toBytes("true")));
+
     Result r = new Result(kvs);
 
-    Put p = new Put(Bytes.toBytes("test-row1"));
+    Put p = new Put(rowKey);
 
-    p.add(colabyte, 0, Bytes.toBytes("123"));
-    p.add(colbshort, 0, Bytes.toBytes("456"));
-    p.add(colcint, 0, Bytes.toBytes("789"));
-    p.add(colalong, 0, Bytes.toBytes("1000"));
-    p.add(colbdouble, 0, Bytes.toBytes("5.3"));
-    p.add(colcstring, 0, Bytes.toBytes("hive and hadoop"));
+    p.add(cfa, qualByte, Bytes.toBytes("123"));
+    p.add(cfb, qualShort, Bytes.toBytes("456"));
+    p.add(cfc, qualInt, Bytes.toBytes("789"));
+    p.add(cfa, qualLong, Bytes.toBytes("1000"));
+    p.add(cfb, qualFloat, Bytes.toBytes("-0.01"));
+    p.add(cfc, qualDouble, Bytes.toBytes("5.3"));
+    p.add(cfa, qualString, Bytes.toBytes("Hadoop, HBase, and Hive"));
+    p.add(cfb, qualBool, Bytes.toBytes("true"));
 
     Object[] expectedFieldsData = {
       new Text("test-row1"),
@@ -94,8 +104,10 @@ public class TestHBaseSerDe extends Test
       new ShortWritable((short)456),
       new IntWritable(789),
       new LongWritable(1000),
+      new FloatWritable(-0.01F),
       new DoubleWritable(5.3),
-      new Text("hive and hadoop")
+      new Text("Hadoop, HBase, and Hive"),
+      new BooleanWritable(true)
     };
 
     deserializeAndSerialize(serDe, r, p, expectedFieldsData);
@@ -109,7 +121,7 @@ public class TestHBaseSerDe extends Test
     StructObjectInspector oi = (StructObjectInspector)
       serDe.getObjectInspector();
     List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs();
-    assertEquals(7, fieldRefs.size());
+    assertEquals(9, fieldRefs.size());
 
     // Deserialize
     Object row = serDe.deserialize(r);
@@ -131,12 +143,11 @@ public class TestHBaseSerDe extends Test
 
     // Set the configuration parameters
     tbl.setProperty(Constants.SERIALIZATION_FORMAT, "9");
-    tbl.setProperty("columns",
-        "key,abyte,ashort,aint,along,adouble,astring");
+    tbl.setProperty("columns", "key,abyte,ashort,aint,along,afloat,adouble,astring,abool");
     tbl.setProperty("columns.types",
-        "string,tinyint:smallint:int:bigint:double:string");
+        "string,tinyint:smallint:int:bigint:float:double:string:boolean");
     tbl.setProperty(HBaseSerDe.HBASE_COLUMNS_MAPPING,
-        "cola:abyte,colb:ashort,colc:aint,cola:along,colb:adouble,colc:astring");
+        "cola:byte,colb:short,colc:int,cola:long,colb:float,colc:double,cola:string,colb:boolean");
     return tbl;
   }
 }

Modified: hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java
URL: http://svn.apache.org/viewvc/hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java?rev=988211&r1=988210&r2=988211&view=diff
==============================================================================
--- hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java (original)
+++ hadoop/hive/trunk/hbase-handler/src/test/org/apache/hadoop/hive/hbase/TestLazyHBaseObject.java Mon Aug 23 17:09:41 2010
@@ -27,6 +27,7 @@ import junit.framework.TestCase;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.serde2.SerDeException;
 import org.apache.hadoop.hive.serde2.SerDeUtils;
 import org.apache.hadoop.hive.serde2.lazy.LazyFactory;
 import org.apache.hadoop.hive.serde2.lazy.LazyString;
@@ -57,20 +58,20 @@ public class TestLazyHBaseObject extends
     // Initialize a result
     List<KeyValue> kvs = new ArrayList<KeyValue>();
 
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa:col1"), 0, Bytes.toBytes("cfacol1")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa:col2"), 0, Bytes.toBytes("cfacol2")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb:2"), 0, Bytes.toBytes("def")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb:-1"), 0, Bytes.toBytes("")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb:0"), 0, Bytes.toBytes("0")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb:8"), 0, Bytes.toBytes("abc")));
-    kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfc:col3"), 0, Bytes.toBytes("cfccol3")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"),
+        Bytes.toBytes("col1"), Bytes.toBytes("cfacol1")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfa"),
+        Bytes.toBytes("col2"), Bytes.toBytes("cfacol2")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"),
+        Bytes.toBytes("2"), Bytes.toBytes("def")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"),
+        Bytes.toBytes("-1"), Bytes.toBytes("")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"),
+        Bytes.toBytes("0"), Bytes.toBytes("0")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfb"),
+        Bytes.toBytes("8"), Bytes.toBytes("abc")));
+    kvs.add(new KeyValue(Bytes.toBytes("test-row"), Bytes.toBytes("cfc"),
+        Bytes.toBytes("col3"), Bytes.toBytes("cfccol3")));
 
     Result r = new Result(kvs);
 
@@ -115,19 +116,19 @@ public class TestLazyHBaseObject extends
     List<KeyValue> kvs = new ArrayList<KeyValue>();
 
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("col1"), 0, Bytes.toBytes("cfacol1")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("col1"), Bytes.toBytes("cfacol1")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("col2"), 0, Bytes.toBytes("cfacol2")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("col2"), Bytes.toBytes("cfacol2")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("2"), 0, Bytes.toBytes("d\tf")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("2"), Bytes.toBytes("d\tf")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("-1"), 0, Bytes.toBytes("")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("-1"), Bytes.toBytes("")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("0"), 0, Bytes.toBytes("0")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("0"), Bytes.toBytes("0")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("8"), 0, Bytes.toBytes("abc")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("8"), Bytes.toBytes("abc")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfc"), Bytes.toBytes("col3"), 0, Bytes.toBytes("cfccol3")));
+        Bytes.toBytes("cfc"), Bytes.toBytes("col3"), Bytes.toBytes("cfccol3")));
 
     Result r = new Result(kvs);
     b.init(r, "cfb".getBytes());
@@ -168,9 +169,22 @@ public class TestLazyHBaseObject extends
       new String[]{"key", "a", "b", "c", "d"});
     Text nullSequence = new Text("\\N");
 
-    List<String> hbaseColumnNames =
-      Arrays.asList(new String[]{":key", "cfa:a", "cfa:b", "cfb:c", "cfb:d"});
-    List<byte []> hbaseColumnNamesBytes = HBaseSerDe.initColumnNamesBytes(hbaseColumnNames);
+    String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:c,cfb:d";
+    List<String> colFamily = new ArrayList<String>();
+    List<String> colQual = new ArrayList<String>();
+    List<byte []> colFamilyBytes = new ArrayList<byte []>();
+    List<byte []> colQualBytes = new ArrayList<byte []>();
+
+    int iKey = -1;
+
+    try {
+      iKey = HBaseSerDe.parseColumnMapping(
+          hbaseColsMapping, colFamily, colFamilyBytes, colQual, colQualBytes);
+    } catch (SerDeException e) {
+      fail(e.toString());
+    }
+
+    assertEquals(0, iKey);
 
     ObjectInspector oi = LazyFactory.createLazyStructInspector(fieldNames,
       fieldTypeInfos, new byte[] {' ', ':', '='},
@@ -180,17 +194,17 @@ public class TestLazyHBaseObject extends
     List<KeyValue> kvs = new ArrayList<KeyValue>();
 
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a:b:c")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes("a:b:c")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=e:f=g")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("hi")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("hi")));
 
     Result r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':123,'b':['a','b','c'],"
         + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
@@ -198,12 +212,13 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=e:f=g")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=e:f=g")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
         ("{'key':'test-row','a':123,'b':null,"
           + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
@@ -211,14 +226,15 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes("a")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("d=\\N:f=g:h")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("d=\\N:f=g:h")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
         ("{'key':'test-row','a':null,'b':['a'],"
           + "'c':{'d':null,'f':'g','h':null},'d':'no'}").replace("'", "\""),
@@ -226,12 +242,13 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes(":a::")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes(":a::")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("no")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':null,'b':['','a','',''],"
         + "'c':null,'d':'no'}").replace("'", "\""),
@@ -239,16 +256,17 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes("")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("c"), 0, Bytes.toBytes("")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("c"), Bytes.toBytes("")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));
@@ -267,9 +285,19 @@ public class TestLazyHBaseObject extends
       new String[]{"key", "a", "b", "c", "d"});
     Text nullSequence = new Text("\\N");
 
-    List<String> hbaseColumnNames =
-      Arrays.asList(new String[]{":key", "cfa:a", "cfa:b", "cfb:", "cfc:d"});
-    List<byte []> hbaseColumnNamesBytes = HBaseSerDe.initColumnNamesBytes(hbaseColumnNames);
+    String hbaseColsMapping = ":key,cfa:a,cfa:b,cfb:,cfc:d";
+    List<String> colFamily = new ArrayList<String>();
+    List<String> colQual = new ArrayList<String>();
+    List<byte []> colFamilyBytes = new ArrayList<byte []>();
+    List<byte []> colQualBytes = new ArrayList<byte []>();
+    int iKey = -1;
+    try {
+      iKey = HBaseSerDe.parseColumnMapping(
+          hbaseColsMapping, colFamily, colFamilyBytes, colQual, colQualBytes);
+    } catch (SerDeException e) {
+      fail(e.toString());
+    }
+    assertEquals(0, iKey);
 
     ObjectInspector oi = LazyFactory.createLazyStructInspector(
       fieldNames,
@@ -280,19 +308,19 @@ public class TestLazyHBaseObject extends
 
     List<KeyValue> kvs = new ArrayList<KeyValue>();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a:b:c")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes("a:b:c")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("e")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("e")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("hi")));
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("hi")));
 
     Result r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':123,'b':['a','b','c'],"
         + "'c':{'d':'e','f':'g'},'d':'hi'}").replace("'", "\""),
@@ -300,14 +328,15 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("d"), 0, Bytes.toBytes("e")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("d"), Bytes.toBytes("e")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':123,'b':null,"
         + "'c':{'d':'e','f':'g'},'d':null}").replace("'", "\""),
@@ -315,14 +344,15 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("a")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes("a")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfb"), Bytes.toBytes("f"), 0, Bytes.toBytes("g")));
+        Bytes.toBytes("cfb"), Bytes.toBytes("f"), Bytes.toBytes("g")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':null,'b':['a'],"
         + "'c':{'f':'g'},'d':'no'}").replace("'", "\""),
@@ -330,12 +360,13 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes(":a::")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes(":a::")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("no")));
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("no")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       ("{'key':'test-row','a':null,'b':['','a','',''],"
         + "'c':{},'d':'no'}").replace("'", "\""),
@@ -343,14 +374,15 @@ public class TestLazyHBaseObject extends
 
     kvs.clear();
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("a"), 0, Bytes.toBytes("123")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("a"), Bytes.toBytes("123")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfa"), Bytes.toBytes("b"), 0, Bytes.toBytes("")));
+        Bytes.toBytes("cfa"), Bytes.toBytes("b"), Bytes.toBytes("")));
     kvs.add(new KeyValue(Bytes.toBytes("test-row"),
-        Bytes.toBytes("cfc"), Bytes.toBytes("d"), 0, Bytes.toBytes("")));
+        Bytes.toBytes("cfc"), Bytes.toBytes("d"), Bytes.toBytes("")));
+
     r = new Result(kvs);
+    o.init(r, colFamily, colFamilyBytes, colQual, colQualBytes);
 
-    o.init(r, hbaseColumnNames, hbaseColumnNamesBytes);
     assertEquals(
       "{'key':'test-row','a':123,'b':[],'c':{},'d':''}".replace("'", "\""),
       SerDeUtils.getJSONString(o, oi));


