From hcatalog-commits-return-558-apmail-incubator-hcatalog-commits-archive=incubator.apache.org@incubator.apache.org Wed Nov 23 18:24:15 2011 Return-Path: X-Original-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-hcatalog-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1EED47E45 for ; Wed, 23 Nov 2011 18:24:15 +0000 (UTC) Received: (qmail 93898 invoked by uid 500); 23 Nov 2011 18:24:15 -0000 Delivered-To: apmail-incubator-hcatalog-commits-archive@incubator.apache.org Received: (qmail 93856 invoked by uid 500); 23 Nov 2011 18:24:14 -0000 Mailing-List: contact hcatalog-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: hcatalog-dev@incubator.apache.org Delivered-To: mailing list hcatalog-commits@incubator.apache.org Received: (qmail 93849 invoked by uid 99); 23 Nov 2011 18:24:14 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Nov 2011 18:24:14 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 23 Nov 2011 18:24:12 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 88C7C238889B; Wed, 23 Nov 2011 18:23:52 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1205527 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/ Date: Wed, 23 Nov 2011 18:23:52 -0000 To: hcatalog-commits@incubator.apache.org From: hashutosh@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: 
<20111123182352.88C7C238889B@eris.apache.org> Author: hashutosh Date: Wed Nov 23 18:23:51 2011 New Revision: 1205527 URL: http://svn.apache.org/viewvc?rev=1205527&view=rev Log: HCATALOG-160 : HBaseDirectOutputStorageDriver outputVersion isn't consistent within the same MR job (toffer via hashutosh) Modified: incubator/hcatalog/trunk/CHANGES.txt incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Modified: incubator/hcatalog/trunk/CHANGES.txt URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205527&r1=1205526&r2=1205527&view=diff ============================================================================== --- incubator/hcatalog/trunk/CHANGES.txt (original) +++ incubator/hcatalog/trunk/CHANGES.txt Wed Nov 23 18:23:51 2011 @@ -79,6 +79,8 @@ Trunk (unreleased changes) OPTIMIZATIONS BUG FIXES + HCAT-160. HBaseDirectOutputStorageDriver outputVersion isn't consistent within the same MR job (toffer via hashutosh) + HCAT-155. HBase bulkOSD requires value to be Put rather than HCatRecord (toffer via hashutosh) HCAT-157. 
HBaseOutputFormat assumes hbase table name is hcat table name (toffer via hashutosh) Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original) +++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Wed Nov 23 18:23:51 2011 @@ -50,6 +50,14 @@ abstract class HBaseBaseOutputStorageDr protected HCatSchema schema; protected HCatSchema outputSchema; + /** + * Subclasses are required to serialize OutputJobInfo back into jobContext. + * Since initialize() sets some properties in OutputJobInfo, requiring + * an update of the instance stored in jobContext. 
+ * @param context the job context object + * @param hcatProperties the properties for the storage driver + * @throws IOException + */ @Override public void initialize(JobContext context, Properties hcatProperties) throws IOException { hcatProperties = (Properties)hcatProperties.clone(); Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (original) +++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Wed Nov 23 18:23:51 2011 @@ -22,6 +22,8 @@ import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hcatalog.common.HCatConstants; +import org.apache.hcatalog.common.HCatUtil; import java.io.IOException; import java.util.Properties; @@ -36,6 +38,7 @@ public class HBaseDirectOutputStorageDri @Override public void initialize(JobContext context, Properties hcatProperties) throws IOException { super.initialize(context, hcatProperties); + context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo)); outputFormat = new HBaseDirectOutputFormat(); outputFormat.setConf(context.getConfiguration()); } Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java URL: 
http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff ============================================================================== --- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (original) +++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Wed Nov 23 18:23:51 2011 @@ -23,37 +23,27 @@ import org.apache.hadoop.fs.FSDataOutput import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.client.*; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hive.hbase.HBaseSerDe; -import org.apache.hadoop.hive.metastore.HiveMetaStoreClient; -import org.apache.hadoop.hive.metastore.MetaStoreUtils; -import org.apache.hadoop.hive.metastore.TableType; -import org.apache.hadoop.hive.metastore.api.FieldSchema; -import org.apache.hadoop.hive.metastore.api.SerDeInfo; -import org.apache.hadoop.hive.metastore.api.StorageDescriptor; -import org.apache.hadoop.hive.metastore.api.Table; -import org.apache.hadoop.hive.serde.Constants; +import org.apache.hadoop.hive.cli.CliSessionState; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.session.SessionState; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hcatalog.cli.HCatDriver; +import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer; import org.apache.hcatalog.common.HCatConstants; -import org.apache.hcatalog.common.HCatException; import org.apache.hcatalog.common.HCatUtil; import org.apache.hcatalog.data.DefaultHCatRecord; import 
org.apache.hcatalog.data.HCatRecord; -import org.apache.hcatalog.data.schema.HCatFieldSchema; import org.apache.hcatalog.data.schema.HCatSchema; -import org.apache.hcatalog.data.schema.HCatSchemaUtils; import org.apache.hcatalog.mapreduce.HCatOutputFormat; import org.apache.hcatalog.mapreduce.OutputJobInfo; import org.junit.Test; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; import java.util.Map; import static org.junit.Assert.assertEquals; @@ -64,95 +54,39 @@ import static org.junit.Assert.assertTru */ public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest { - private void registerHBaseTable(String tableName) throws Exception { + private final HiveConf allConf; + private final HCatDriver hcatDriver; - String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ; - HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf()); + public TestHBaseDirectOutputStorageDriver() { + allConf = getHiveConf(); + allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname, + HCatSemanticAnalyzer.class.getName()); + allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString()); + allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString()); + + //Add hbase properties + for (Map.Entry el : getHbaseConf()) + allConf.set(el.getKey(), el.getValue()); + for (Map.Entry el : getJobConf()) + allConf.set(el.getKey(), el.getValue()); - try { - client.dropTable(databaseName, tableName); - } catch(Exception e) { - } //can fail with NoSuchObjectException - - - Table tbl = new Table(); - tbl.setDbName(databaseName); - tbl.setTableName(tableName); - tbl.setTableType(TableType.EXTERNAL_TABLE.toString()); - StorageDescriptor sd = new StorageDescriptor(); - - sd.setCols(getTableColumns()); - tbl.setPartitionKeys(new ArrayList()); - - tbl.setSd(sd); - - sd.setBucketCols(new ArrayList(2)); - sd.setSerdeInfo(new SerDeInfo()); - 
sd.getSerdeInfo().setName(tbl.getTableName()); - sd.getSerdeInfo().setParameters(new HashMap()); - sd.getSerdeInfo().getParameters().put( - Constants.SERIALIZATION_FORMAT, "1"); - sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName()); - sd.setInputFormat("fillme"); - sd.setOutputFormat(HBaseDirectOutputFormat.class.getName()); - - Map tableParams = new HashMap(); - tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme"); - tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseDirectOutputStorageDriver.class.getName()); - tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish"); - tbl.setParameters(tableParams); - - client.createTable(tbl); - } - - protected List getTableColumns() { - List fields = new ArrayList(); - fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, "")); - fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")); - fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")); - return fields; - } - - private static List generateDataColumns() throws HCatException { - List dataColumns = new ArrayList(); - dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, ""))); - dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""))); - dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""))); - return dataColumns; - } - - public void test() throws IOException { - Configuration conf = getHbaseConf(); - String tableName = "my_table"; - byte[] tableNameBytes = Bytes.toBytes(tableName); - String familyName = "my_family"; - byte[] familyNameBytes = Bytes.toBytes(familyName); - createTable(tableName,new String[]{familyName}); - HTable table = new HTable(getHbaseConf(),tableNameBytes); - byte[] key = Bytes.toBytes("foo"); - byte[] qualifier = Bytes.toBytes("qualifier"); - byte[] val = Bytes.toBytes("bar"); - Put put = new 
Put(key); - put.add(familyNameBytes, qualifier, val); - table.put(put); - Result result = table.get(new Get(key)); - assertTrue(Bytes.equals(val, result.getValue(familyNameBytes, qualifier))); + SessionState.start(new CliSessionState(allConf)); + hcatDriver = new HCatDriver(); } @Test public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException { - String tableName = newTableName("mrTest"); + String testName = "directOutputFormatTest"; + Path methodTestDir = new Path(getTestDir(),testName); + + String tableName = newTableName(testName).toLowerCase(); byte[] tableNameBytes = Bytes.toBytes(tableName); String familyName = "my_family"; byte[] familyNameBytes = Bytes.toBytes(familyName); //include hbase config in conf file - Configuration conf = new Configuration(getJobConf()); - for(Map.Entry el: getHbaseConf()) { - if(el.getKey().startsWith("hbase.")) { - conf.set(el.getKey(),el.getValue()); - } - } + Configuration conf = new Configuration(allConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties())); //create table createTable(tableName,new String[]{familyName}); @@ -164,7 +98,7 @@ public class TestHBaseDirectOutputStorag // input/output settings - Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/"); + Path inputPath = new Path(methodTestDir,"mr_input"); getFileSystem().mkdirs(inputPath); FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); for(String line: data) @@ -172,7 +106,8 @@ public class TestHBaseDirectOutputStorag os.close(); //create job - Job job = new Job(conf, "hcat mapreduce write test"); + Job job = new Job(conf, testName); + job.setWorkingDirectory(new Path(methodTestDir,"mr_work")); job.setJarByClass(this.getClass()); job.setMapperClass(MapWrite.class); @@ -194,7 +129,7 @@ public class TestHBaseDirectOutputStorag //verify HTable table = new HTable(conf, tableName); Scan scan = new Scan(); - 
scan.addFamily(Bytes.toBytes("my_family")); + scan.addFamily(familyNameBytes); ResultScanner scanner = table.getScanner(scan); int index=0; for(Result result: scanner) { @@ -209,61 +144,151 @@ public class TestHBaseDirectOutputStorag assertEquals(data.length,index); } - public static class MapWrite extends Mapper { + @Test + public void directOutputStorageDriverTest() throws Exception { + String testName = "directOutputStorageDriverTest"; + Path methodTestDir = new Path(getTestDir(),testName); - @Override - public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { - String vals[] = value.toString().split(","); - Put put = new Put(Bytes.toBytes(vals[0])); + String databaseName = "default"; + String dbDir = new Path(methodTestDir,"DB_"+testName).toString(); + String tableName = newTableName(testName).toLowerCase(); + byte[] tableNameBytes = Bytes.toBytes(tableName); + String familyName = "my_family"; + byte[] familyNameBytes = Bytes.toBytes(familyName); + + + //include hbase config in conf file + Configuration conf = new Configuration(allConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties())); + + + String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'"; + String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName + + "(key int, english string, spanish string) STORED BY " + + "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" + + "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " + + "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," + + "'"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+ + "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ; + + assertEquals(0, hcatDriver.run(dbquery).getResponseCode()); + assertEquals(0, hcatDriver.run(tableQuery).getResponseCode()); + + String data[] = {"1,english:ONE,spanish:UNO", + "2,english:ONE,spanish:DOS", + "3,english:ONE,spanish:TRES"}; + + // input/output settings + Path inputPath = new Path(methodTestDir,"mr_input"); + getFileSystem().mkdirs(inputPath); + //create multiple files so we can test with multiple mappers + for(int i=0;i el: getHbaseConf()) { - if(el.getKey().startsWith("hbase.")) { - conf.set(el.getKey(),el.getValue()); - } - } + Configuration conf = new Configuration(allConf); + conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties())); - conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties())); - //create table - createTable(tableName,new String[]{familyName}); - registerHBaseTable(tableName); + String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'"; + String tableQuery = "CREATE TABLE " + databaseName + "." 
+ tableName + + "(key int, english string, spanish string) STORED BY " + + "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" + + "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " + + "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," + + "'"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+ + "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')" ; + assertEquals(0, hcatDriver.run(dbquery).getResponseCode()); + assertEquals(0, hcatDriver.run(tableQuery).getResponseCode()); String data[] = {"1,english:ONE,spanish:UNO", - "2,english:ONE,spanish:DOS", - "3,english:ONE,spanish:TRES"}; - - + "2,english:ONE,spanish:DOS", + "3,english:ONE,spanish:TRES"}; // input/output settings - Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/"); + Path inputPath = new Path(methodTestDir,"mr_input"); getFileSystem().mkdirs(inputPath); - FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt")); - for(String line: data) - os.write(Bytes.toBytes(line + "\n")); - os.close(); + //create multiple files so we can test with multiple mappers + for(int i=0;i { + + @Override + public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { + String vals[] = value.toString().split(","); + Put put = new Put(Bytes.toBytes(vals[0])); + for(int i=1;i