incubator-hcatalog-commits mailing list archives

From: khorg...@apache.org
Subject: svn commit: r1181260 - in /incubator/hcatalog/trunk: ./ src/java/org/apache/hcatalog/mapreduce/ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date: Mon, 10 Oct 2011 21:58:07 GMT
Author: khorgath
Date: Mon Oct 10 21:58:07 2011
New Revision: 1181260

URL: http://svn.apache.org/viewvc?rev=1181260&view=rev
Log:
HCATALOG-75 Input storage driver for HBase (avandana via khorgath)

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java

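For context, a minimal sketch (not part of this commit) of how a MapReduce job might read an HBase-backed HCatalog table through the new input storage driver, following the usage shown in TestHBaseInputStorageDriver below. The class ReadHBaseTableJob, the table name "mytesttable", and the output path are illustrative; the trailing null arguments to InputJobInfo.create (partition filter, metastore URI, Kerberos principal) are assumptions inferred from InitializeInput.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hcatalog.data.HCatRecord;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class ReadHBaseTableJob {

    // Emits the first column of each HCatRecord (the HBase row key under a
    // ":key,..." column mapping such as the one used in the test below).
    public static class FirstColumnMapper
            extends Mapper<ImmutableBytesWritable, HCatRecord, NullWritable, Text> {
        @Override
        public void map(ImmutableBytesWritable key, HCatRecord value, Context context)
                throws IOException, InterruptedException {
            context.write(NullWritable.get(), new Text(value.get(0).toString()));
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "hbase-hcat-read");
        job.setJarByClass(ReadHBaseTableJob.class);
        job.setMapperClass(FirstColumnMapper.class);

        // Read through HCatalog; the registered table's HCAT_ISD_CLASS parameter
        // points at HBaseInputStorageDriver, so no HBase classes appear here.
        job.setInputFormatClass(HCatInputFormat.class);
        // Arguments: database, table, partition filter, metastore URI, Kerberos
        // principal (the last three are unused in this sketch).
        InputJobInfo inputJobInfo = InputJobInfo.create("default", "mytesttable", null, null, null);
        HCatInputFormat.setInput(job, inputJobInfo);
        // The commit's test also calls HCatInputFormat.setOutputSchema(job, schema)
        // before setInput to fix the read schema; omitted here to keep the sketch short.

        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[0]));
        job.setMapOutputKeyClass(NullWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setNumReduceTasks(0);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
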
Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1181260&r1=1181259&r2=1181260&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Mon Oct 10 21:58:07 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-75. Input storage driver for HBase (avandana via khorgath)
+
   HCAT-73. Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)
 
   HCAT-74. ResultConverter for HBase Storage Drivers (avandana via khorgath)

Modified: incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java?rev=1181260&r1=1181259&r2=1181260&view=diff
==============================================================================
--- incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java (original)
+++ incubator/hcatalog/trunk/src/java/org/apache/hcatalog/mapreduce/InitializeInput.java Mon Oct 10 21:58:07 2011
@@ -26,6 +26,7 @@ import java.util.Properties;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.conf.HiveConf.ConfVars;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
@@ -48,22 +49,12 @@ public class InitializeInput {
 
   /** The prefix for keys used for storage driver arguments */
   static final String HCAT_KEY_PREFIX = "hcat.";
-  private static final HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+  private static HiveConf hiveConf;
 
   private static HiveMetaStoreClient createHiveMetaClient(Configuration conf, InputJobInfo inputJobInfo) throws Exception {
 
-	if (inputJobInfo.getServerUri() != null){
-      hiveConf.set("hive.metastore.local", "false");
-      hiveConf.set(HiveConf.ConfVars.METASTOREURIS.varname, inputJobInfo.getServerUri());
-    }
-    
-    String kerberosPrincipal = inputJobInfo.getServerKerberosPrincipal();
-    if(kerberosPrincipal != null){
-      hiveConf.setBoolean(HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
-      hiveConf.set(HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname, kerberosPrincipal);
-    }
-
-    return new HiveMetaStoreClient(hiveConf,null);
+      hiveConf = getHiveConf(inputJobInfo, conf);
+      return new HiveMetaStoreClient(hiveConf, null);
   }
 
   /**
@@ -207,4 +198,82 @@ public class InitializeInput {
     return new StorerInfo(inputSDClass, outputSDClass, hcatProperties);
   }
 
+    static HiveConf getHiveConf(InputJobInfo iInfo, Configuration conf)
+            throws IOException {
+
+        HiveConf hiveConf = new HiveConf(HCatInputFormat.class);
+
+        if (iInfo.getServerUri() != null) {
+            // User specified a thrift url
+
+            hiveConf.set("hive.metastore.local", "false");
+            hiveConf.set(ConfVars.METASTOREURIS.varname, iInfo.getServerUri());
+
+            String kerberosPrincipal = iInfo.getServerKerberosPrincipal();
+            if (kerberosPrincipal != null) {
+                hiveConf.setBoolean(
+                        HiveConf.ConfVars.METASTORE_USE_THRIFT_SASL.varname,
+                        true);
+                hiveConf.set(
+                        HiveConf.ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                        kerberosPrincipal);
+            } else {
+
+                kerberosPrincipal = conf
+                        .get(HCatConstants.HCAT_METASTORE_PRINCIPAL);
+
+                if (kerberosPrincipal == null) {
+                    kerberosPrincipal = conf
+                            .get(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname);
+                }
+                if (kerberosPrincipal != null) {
+                    hiveConf.setBoolean(
+                            ConfVars.METASTORE_USE_THRIFT_SASL.varname, true);
+                    hiveConf.set(ConfVars.METASTORE_KERBEROS_PRINCIPAL.varname,
+                            kerberosPrincipal);
+                }
+
+                if (conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE) != null) {
+                    hiveConf.set("hive.metastore.token.signature",
+                            conf.get(HCatConstants.HCAT_KEY_TOKEN_SIGNATURE));
+                }
+            }
+
+        } else {
+            // Thrift url is null, copy the hive conf into the job conf and
+            // restore it
+            // in the backend context
+
+            if (conf.get(HCatConstants.HCAT_KEY_HIVE_CONF) == null) {
+                conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                        HCatUtil.serialize(hiveConf.getAllProperties()));
+            } else {
+                // Copy configuration properties into the hive conf
+                Properties properties = (Properties) HCatUtil.deserialize(conf
+                        .get(HCatConstants.HCAT_KEY_HIVE_CONF));
+
+                for (Map.Entry<Object, Object> prop : properties.entrySet()) {
+                    if (prop.getValue() instanceof String) {
+                        hiveConf.set((String) prop.getKey(),
+                                (String) prop.getValue());
+                    } else if (prop.getValue() instanceof Integer) {
+                        hiveConf.setInt((String) prop.getKey(),
+                                (Integer) prop.getValue());
+                    } else if (prop.getValue() instanceof Boolean) {
+                        hiveConf.setBoolean((String) prop.getKey(),
+                                (Boolean) prop.getValue());
+                    } else if (prop.getValue() instanceof Long) {
+                        hiveConf.setLong((String) prop.getKey(),
+                                (Long) prop.getValue());
+                    } else if (prop.getValue() instanceof Float) {
+                        hiveConf.setFloat((String) prop.getKey(),
+                                (Float) prop.getValue());
+                    }
+                }
+            }
+
+        }
+        return hiveConf;
+    }
+
 }

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java?rev=1181260&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputFormat.java Mon Oct 10 21:58:07 2011
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+/**
+ * This class HBaseInputFormat is a wrapper class of TableInputFormat in HBase.
+ */
+class HBaseInputFormat extends InputFormat<ImmutableBytesWritable, Result> {
+    
+    private final TableInputFormat inputFormat;
+    
+    public HBaseInputFormat() {
+        inputFormat = new TableInputFormat();
+    }
+    
+    /*
+     * @param instance of InputSplit
+     * 
+     * @param instance of TaskAttemptContext
+     * 
+     * @return RecordReader
+     * 
+     * @throws IOException
+     * 
+     * @throws InterruptedException
+     * 
+     * @see
+     * org.apache.hadoop.mapreduce.InputFormat#createRecordReader(org.apache
+     * .hadoop.mapreduce.InputSplit,
+     * org.apache.hadoop.mapreduce.TaskAttemptContext)
+     */
+    @Override
+    public RecordReader<ImmutableBytesWritable, Result> createRecordReader(
+            InputSplit split, TaskAttemptContext tac) throws IOException,
+            InterruptedException {
+        return inputFormat.createRecordReader(split, tac);
+    }
+    
+    /*
+     * @param jobContext
+     * 
+     * @return List of InputSplit
+     * 
+     * @throws IOException
+     * 
+     * @throws InterruptedException
+     * 
+     * @see
+     * org.apache.hadoop.mapreduce.InputFormat#getSplits(org.apache.hadoop.mapreduce
+     * .JobContext)
+     */
+    @Override
+    public List<InputSplit> getSplits(JobContext jobContext)
+            throws IOException, InterruptedException {
+        return inputFormat.getSplits(jobContext);
+    }
+    
+    public void setConf(Configuration conf) {
+        inputFormat.setConf(conf);
+    }
+    
+    public Scan getScan() {
+        return inputFormat.getScan();
+    }
+    
+    public void setScan(Scan scan) {
+        inputFormat.setScan(scan);
+    }
+    
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java?rev=1181260&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseInputStorageDriver.java Mon Oct 10 21:58:07 2011
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+
+/**
+ * The Class HBaseInputStorageDriver enables reading of HBase tables through
+ * HCatalog.
+ */
+public class HBaseInputStorageDriver extends HCatInputStorageDriver {
+    private HCatTableInfo   tableInfo;
+    private ResultConverter converter;
+    private HCatSchema      outputColSchema;
+    private HCatSchema      dataSchema;
+    private Configuration   jobConf;
+    
+    /*
+     * @param JobContext
+     * 
+     * @param hcatProperties
+     * 
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #initialize(org.apache.hadoop.mapreduce.JobContext, java.util.Properties)
+     */
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) {
+        jobConf = context.getConfiguration();
+        try {
+            String jobString = context.getConfiguration().get(
+                    HCatConstants.HCAT_KEY_JOB_INFO);
+            if (jobString == null) {
+                throw new IOException(
+                        "InputJobInfo information not found in JobContext. "
+                                + "HCatInputFormat.setInput() not called?");
+            }
+            InputJobInfo jobInfo = (InputJobInfo) HCatUtil
+                    .deserialize(jobString);
+            tableInfo = jobInfo.getTableInfo();
+            dataSchema = tableInfo.getDataColumns();
+            List<FieldSchema> fields = HCatUtil
+                    .getFieldSchemaList(outputColSchema.getFields());
+            hcatProperties.setProperty(Constants.LIST_COLUMNS,
+                    MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+            hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+                    MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+            converter = new HBaseSerDeResultConverter(dataSchema,
+                    outputColSchema, hcatProperties);
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        
+    }
+    
+    /*
+     * @param hcatProperties
+     * 
+     * @return InputFormat
+     * 
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #getInputFormat(java.util.Properties)
+     */
+    @Override
+    public InputFormat<ImmutableBytesWritable, Result> getInputFormat(
+            Properties hcatProperties) {
+        HBaseInputFormat tableInputFormat = new HBaseInputFormat();
+        jobConf.set(TableInputFormat.INPUT_TABLE, tableInfo.getTableName());
+        tableInputFormat.setConf(jobConf);
+        // TODO: Make the caching configurable by the user
+        tableInputFormat.getScan().setCaching(200);
+        tableInputFormat.getScan().setCacheBlocks(false);
+        return tableInputFormat;
+    }
+    
+    /*
+     * @param baseKey
+     * 
+     * @param baseValue
+     * 
+     * @return HCatRecord
+     * 
+     * @throws IOException
+     * 
+     * @see
+     * org.apache.hcatalog.mapreduce.HCatInputStorageDriver#convertToHCatRecord
+     * (org.apache.hadoop.io.WritableComparable, org.apache.hadoop.io.Writable)
+     */
+    @Override
+    public HCatRecord convertToHCatRecord(WritableComparable baseKey,
+            Writable baseValue) throws IOException {
+        return this.converter.convert((Result) baseValue);
+    }
+    
+    /*
+     * @param jobContext
+     * 
+     * @param howlSchema
+     * 
+     * @throws IOException
+     * 
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver#
+     * setOutputSchema(org.apache.hadoop.mapreduce.JobContext,
+     * org.apache.hcatalog.data.schema.HCatSchema)
+     */
+    @Override
+    public void setOutputSchema(JobContext jobContext, HCatSchema howlSchema)
+            throws IOException {
+        outputColSchema = howlSchema;
+    }
+    
+    /*
+     * @param jobContext
+     * 
+     * @param partitionValues
+     * 
+     * @throws IOException
+     * 
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #setPartitionValues(org.apache.hadoop.mapreduce.JobContext,
+     * java.util.Map)
+     */
+    @Override
+    public void setPartitionValues(JobContext jobContext,
+            Map<String, String> partitionValues) throws IOException {
+    }
+    
+    /*
+     * @param jobContext
+     * 
+     * @param hcatSchema
+     * 
+     * @throws IOException
+     * 
+     * @see org.apache.hcatalog.mapreduce.HCatInputStorageDriver
+     * #setOriginalSchema(org.apache.hadoop.mapreduce.JobContext,
+     * org.apache.hcatalog.data.schema.HCatSchema)
+     */
+    @Override
+    public void setOriginalSchema(JobContext jobContext, HCatSchema hcatSchema)
+            throws IOException {
+        this.dataSchema = hcatSchema;
+    }
+}

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java?rev=1181260&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseInputStorageDriver.java Mon Oct 10 21:58:07 2011
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hcatalog.mapreduce.InputJobInfo;
+import org.junit.Test;
+
+public class TestHBaseInputStorageDriver extends SkeletonHBaseTest {
+    
+    private final byte[] FAMILY     = Bytes.toBytes("testFamily");
+    private final byte[] QUALIFIER1 = Bytes.toBytes("testQualifier1");
+    private final byte[] QUALIFIER2 = Bytes.toBytes("testQualifier2");
+    private final String tableName  = "mytesttable";
+    
+    List<Put> generatePuts(int num) {
+        List<Put> myPuts = new ArrayList<Put>();
+        for (int i = 0; i < num; i++) {
+            Put put = new Put(Bytes.toBytes("testRow" + i));
+            put.add(FAMILY, QUALIFIER1, 0,
+                    Bytes.toBytes("testQualifier1-" + "textValue-" + i));
+            put.add(FAMILY, QUALIFIER2, 0,
+                    Bytes.toBytes("testQualifier2-" + "textValue-" + i));
+            myPuts.add(put);
+        }
+        return myPuts;
+    }
+    
+    private void registerHBaseTable(String tableName) throws Exception {
+        
+        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME;
+        HiveMetaStoreClient client = getCluster().getHiveMetaStoreClient();
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch (Exception e) {
+        } // can fail with NoSuchObjectException
+        
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put(HCatConstants.HCAT_ISD_CLASS,
+                HBaseInputStorageDriver.class.getName());
+        tableParams.put(HCatConstants.HCAT_OSD_CLASS, "NotRequired");
+        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,
+                ":key,testFamily:testQualifier1,testFamily:testQualifier2");
+        tableParams.put(Constants.SERIALIZATION_FORMAT, "9");
+        tableParams.put(Constants.SERIALIZATION_NULL_FORMAT, "NULL");
+        tbl.setParameters(tableParams);
+        
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setCols(HCatUtil.getFieldSchemaList(getSchema().getFields()));
+        sd.setBucketCols(new ArrayList<String>(3));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters()
+                .put(Constants.SERIALIZATION_FORMAT, "9");
+        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+        sd.setInputFormat(HBaseInputFormat.class.getName());
+        sd.setOutputFormat("NotRequired");
+        
+        tbl.setSd(sd);
+        client.createTable(tbl);
+        
+    }
+    
+    public void populateTable() throws IOException {
+        List<Put> myPuts = generatePuts(10);
+        HTable table = new HTable(getHbaseConf(), Bytes.toBytes(tableName));
+        table.put(myPuts);
+    }
+    
+    @Test
+    public void TestHBaseTableReadMR() throws Exception {
+        
+        Configuration conf = new Configuration();
+        // include hbase config in conf file
+        for (Map.Entry<String, String> el : getHbaseConf()) {
+            if (el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(), el.getValue());
+            }
+        }
+        
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF,
+                HCatUtil.serialize(getHiveConf().getAllProperties()));
+        
+        // create Hbase table using admin
+        createTable(tableName, new String[] { "testFamily" });
+        registerHBaseTable(tableName);
+        populateTable();
+        // output settings
+        Path outputDir = new Path(getTestDir(), "mapred/testHbaseTableMRRead");
+        FileSystem fs = getFileSystem();
+        if (fs.exists(outputDir)) {
+            fs.delete(outputDir, true);
+        }
+        // create job
+        Job job = new Job(conf, "hbase-mr-read-test");
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapReadHTable.class);
+        
+        job.getConfiguration().set(TableInputFormat.INPUT_TABLE, tableName);
+        
+        job.setInputFormatClass(HCatInputFormat.class);
+        InputJobInfo inputJobInfo = InputJobInfo.create(
+                MetaStoreUtils.DEFAULT_DATABASE_NAME, tableName, null, null,
+                null);
+        HCatInputFormat.setOutputSchema(job, getSchema());
+        HCatInputFormat.setInput(job, inputJobInfo);
+        job.setOutputFormatClass(TextOutputFormat.class);
+        TextOutputFormat.setOutputPath(job, outputDir);
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(Text.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Text.class);
+        job.setNumReduceTasks(0);
+        assertTrue(job.waitForCompletion(true));
+        assertTrue(MapReadHTable.error == false);
+    }
+    
+    public static class MapReadHTable
+            extends
+            Mapper<ImmutableBytesWritable, HCatRecord, WritableComparable, Text> {
+        
+        static boolean error = false;
+        
+        @Override
+        public void map(ImmutableBytesWritable key, HCatRecord value,
+                Context context) throws IOException, InterruptedException {
+            boolean correctValues = (value.size() == 3)
+                    && (value.get(0).toString()).startsWith("testRow")
+                    && (value.get(1).toString()).startsWith("testQualifier1")
+                    && (value.get(2).toString()).startsWith("testQualifier2");
+            
+            if (correctValues == false) {
+                error = true;
+            }
+        }
+    }
+    
+    private HCatSchema getSchema() throws HCatException {
+        
+        HCatSchema schema = new HCatSchema(new ArrayList<HCatFieldSchema>());
+        schema.append(new HCatFieldSchema("key", HCatFieldSchema.Type.STRING,
+                ""));
+        schema.append(new HCatFieldSchema("testqualifier1",
+                HCatFieldSchema.Type.STRING, ""));
+        schema.append(new HCatFieldSchema("testqualifier2",
+                HCatFieldSchema.Type.STRING, ""));
+        return schema;
+    }
+}


