incubator-hcatalog-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1205202 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date Tue, 22 Nov 2011 22:35:37 GMT
Author: hashutosh
Date: Tue Nov 22 22:35:37 2011
New Revision: 1205202

URL: http://svn.apache.org/viewvc?rev=1205202&view=rev
Log:
HCATALOG-154 && HCATALOG-157. (toffer via hashutosh)

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Nov 22 22:35:37 2011
@@ -79,6 +79,10 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-157. HBaseOutputFormat assumes hbase table name is hcat table name (toffer via hashutosh)
+
+  HCAT-154. HBase bulkOSD and directOSD return inconsistent path for getOutputLocation() (toffer via hashutosh)
+
   HCAT-124. null pointer execption on 'use no_such_db' (hashutosh)
 
   HCAT-125. HCat doesn't support hive's describe database DDL (hashutosh)

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -73,7 +73,7 @@ abstract  class HBaseBaseOutputStorageDr
         if(revision == null) {
             outputJobInfo.getProperties()
                          .setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
-                                      new Path(outputJobInfo.getLocation()).getName());
+                                            Long.toString(System.currentTimeMillis()));
         }
 
         tableInfo = outputJobInfo.getTableInfo();
@@ -89,7 +89,7 @@ abstract  class HBaseBaseOutputStorageDr
         converter = new HBaseSerDeResultConverter(schema,
                                                   outputSchema,
                                                   hcatProperties);
-        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName());
+        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, HBaseHCatStorageHandler.getFullyQualifiedName(tableInfo));
     }
 
     @Override
@@ -125,7 +125,6 @@ abstract  class HBaseBaseOutputStorageDr
 
     @Override
     public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
-        //TODO figure out a way to include user specified revision number as part of dir
-        return new Path(tableLocation, Long.toString(System.currentTimeMillis())).toString();
+        return null;
     }
 }

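For readers following HCAT-154: with this hunk, a job that does not set an explicit output revision now gets a wall-clock default (Long.toString(System.currentTimeMillis())) in initialize(), instead of a value derived from the output location. A client that needs a deterministic revision can pin one up front; a minimal sketch in the style of the tests below (the database and table names are placeholders, not part of this commit):

    // Sketch only: pin an explicit output revision before submitting the job.
    // "exampledb" and "exampletable" are hypothetical names.
    Job job = new Job(conf, "hcat hbase write");
    OutputJobInfo outputJobInfo = OutputJobInfo.create("exampledb", "exampletable", null, null, null);
    // Without this property, the driver now defaults the revision to the
    // current time in milliseconds during initialize().
    outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
    HCatOutputFormat.setOutput(job, outputJobInfo);
    job.setOutputFormatClass(HCatOutputFormat.class);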
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -24,9 +24,13 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.HCatRecord;
 
 import java.io.IOException;
+import java.util.List;
+import java.util.Map;
 import java.util.Properties;
 
 
@@ -36,15 +40,29 @@ import java.util.Properties;
  * efficient for large batch writes in comparison to HBaseDirectOutputStorageDriver.
  */
 public class HBaseBulkOutputStorageDriver extends HBaseBaseOutputStorageDriver {
+    private String PROPERTY_TABLE_LOCATION = "hcat.hbase.mapreduce.table.location";
+    private String PROPERTY_INT_OUTPUT_LOCATION = "hcat.hbase.mapreduce.intermediateOutputLocation";
     private OutputFormat outputFormat;
     private final static ImmutableBytesWritable EMPTY_KEY = new ImmutableBytesWritable(new byte[0]);
 
     @Override
     public void initialize(JobContext context, Properties hcatProperties) throws IOException {
         super.initialize(context, hcatProperties);
-        Path outputDir = new Path(outputJobInfo.getLocation());
-        context.getConfiguration().set("mapred.output.dir", outputDir.toString());
+
+        //initialize() gets called multiple times in the lifecycle of an MR job: client, mapper, reducer, etc.
+        //depending on the case, we have to make sure the context variables we set here don't get set again
+        if(!outputJobInfo.getProperties().containsKey(PROPERTY_INT_OUTPUT_LOCATION)) {
+            String location = new Path(context.getConfiguration().get(PROPERTY_TABLE_LOCATION),
+                                       "REVISION_"+outputJobInfo.getProperties()
+                                                                .getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY)).toString();
+            outputJobInfo.getProperties().setProperty(PROPERTY_INT_OUTPUT_LOCATION, location);
+            //We are writing out an intermediate sequenceFile, hence the location is not passed in OutputJobInfo.getLocation()
+            //TODO replace this with a mapreduce constant when available
+            context.getConfiguration().set("mapred.output.dir", location);
+        }
+
         outputFormat = new HBaseBulkOutputFormat();
+        context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
     }
 
     @Override
@@ -57,4 +75,11 @@ public class HBaseBulkOutputStorageDrive
         return EMPTY_KEY;
     }
 
+    @Override
+    public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+        //TODO have HCatalog common objects expose more information
+        //this is the only way to pickup table location for storageDrivers
+        jobContext.getConfiguration().set(PROPERTY_TABLE_LOCATION, tableLocation);
+        return null;
+    }
 }

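To make the new intermediate-location logic concrete: the bulk driver stages its sequence file under the table location in a REVISION_<revision> subdirectory, guarded so that the repeated initialize() calls (client, mapper, reducer) do not recompute it. A rough sketch of the path derivation, using the property keys from the diff above (the sample URI and revision value are made up):

    // Sketch only: how the intermediate output location is derived.
    // Assume getOutputLocation() stashed the table location, e.g.
    //   hcat.hbase.mapreduce.table.location = hdfs://nn/warehouse/exampledb.db/exampletable
    // and the revision property defaulted to, say, "1322001337000".
    Configuration conf = context.getConfiguration();
    String tableLocation = conf.get("hcat.hbase.mapreduce.table.location");
    String revision = outputJobInfo.getProperties()
                                   .getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY);
    Path intermediate = new Path(tableLocation, "REVISION_" + revision);
    // => hdfs://nn/warehouse/exampledb.db/exampletable/REVISION_1322001337000
    conf.set("mapred.output.dir", intermediate.toString());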
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -47,11 +47,12 @@ public class HBaseOutputStorageDriver ex
     public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
         //sanity check since we can't determine which will be used till initialize
         //and this method gets called before that
-        String location = bulkOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
-        if(!location.equals(directOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash))) {
-            throw new IOException("bulkOSD and directOSD return inconsistent path for getOutputLocation()");
+        String l1 = bulkOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
+        String l2 = directOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
+        if(l1 != null || l2 != null) {
+            throw new IOException("bulkOSD or directOSD returns a non-null path for getOutputLocation()");
         }
-        return location;
+        return null;
     }
 
     @Override

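The contract change here is that getOutputLocation() must now return null from both delegates; a non-null path from either is treated as a bug, and the bulk delegate only uses the call to capture the table location. A hypothetical JUnit-style fragment illustrating the expected behavior (wrapperOSD and ctx stand for an HBaseOutputStorageDriver and a JobContext set up elsewhere; the path is a placeholder):

    // Sketch only: the wrapper now returns null, and the bulk delegate
    // records the table location in the job configuration as a side effect.
    String loc = wrapperOSD.getOutputLocation(ctx, "hdfs://nn/warehouse/t", null, null, null);
    assertNull(loc);
    assertEquals("hdfs://nn/warehouse/t",
                 ctx.getConfiguration().get("hcat.hbase.mapreduce.table.location"));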
Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1205202&r1=1205201&r2=1205202&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Tue Nov 22 22:35:37 2011
@@ -11,32 +11,24 @@ import org.apache.hadoop.hbase.client.Re
 import org.apache.hadoop.hbase.client.Scan;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde.Constants;
-import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
 import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.junit.Test;
@@ -44,7 +36,6 @@ import org.junit.Test;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -56,67 +47,27 @@ import static org.junit.Assert.assertTru
  * Including ImportSequenceFile, HBaseOutputStorageDrivers and HBaseBulkOutputFormat
  */
 public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
-    private final String suiteName = "TestHBaseBulkOutputStorageDriver";
+    private final HiveConf allConf;
+    private final HCatDriver hcatDriver;
 
-    private void registerHBaseTable(String tableName) throws Exception {
+    public TestHBaseBulkOutputStorageDriver() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
 
-        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ;
-        HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
-
-        try {
-            client.dropTable(databaseName, tableName);
-        } catch(Exception e) {
-        } //can fail with NoSuchObjectException
-
-
-        Table tbl = new Table();
-        tbl.setDbName(databaseName);
-        tbl.setTableName(tableName);
-        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
-        StorageDescriptor sd = new StorageDescriptor();
-        sd.setLocation(getTestDir()+"/"+suiteName+"/"+tableName);
-        sd.setCols(getTableColumns());
-        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
-
-        tbl.setSd(sd);
-
-        sd.setBucketCols(new ArrayList<String>(2));
-        sd.setSerdeInfo(new SerDeInfo());
-        sd.getSerdeInfo().setName(tbl.getTableName());
-        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-        sd.getSerdeInfo().getParameters().put(
-                Constants.SERIALIZATION_FORMAT, "1");
-        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
-        sd.setInputFormat("fillme");
-        sd.setOutputFormat(HBaseBulkOutputFormat.class.getName());
-
-        Map<String, String> tableParams = new HashMap<String, String>();
-        tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
-        tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseOutputStorageDriver.class.getName());
-        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
-        tbl.setParameters(tableParams);
-
-        client.createTable(tbl);
-    }
-
-    protected List<FieldSchema> getTableColumns() {
-        List<FieldSchema> fields = new ArrayList<FieldSchema>();
-        fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
-        fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
-        fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
-        return fields;
-    }
-
-    private static  List<HCatFieldSchema> generateDataColumns() throws HCatException {
-        List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
-        return dataColumns;
+        SessionState.start(new CliSessionState(allConf));
+        hcatDriver = new HCatDriver();
     }
 
     public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
-
         @Override
         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
             String vals[] = value.toString().split(",");
@@ -134,8 +85,9 @@ public class TestHBaseBulkOutputStorageD
     public static class MapHCatWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, HCatRecord> {
         @Override
         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
             HCatRecord record = new DefaultHCatRecord(3);
-            HCatSchema schema = new HCatSchema(generateDataColumns());
+            HCatSchema schema = jobInfo.getOutputSchema();
             String vals[] = value.toString().split(",");
             record.setInteger("key",schema,Integer.parseInt(vals[0]));
             for(int i=1;i<vals.length;i++) {
@@ -148,18 +100,16 @@ public class TestHBaseBulkOutputStorageD
 
     @Test
     public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
-        String tableName = newTableName("hbaseBulkOutputFormatTest");
+        String testName = "hbaseBulkOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String familyName = "my_family";
         byte[] familyNameBytes = Bytes.toBytes(familyName);
 
         //include hbase config in conf file
-        Configuration conf = new Configuration(getJobConf());
-        for(Map.Entry<String,String> el: getHbaseConf()) {
-            if(el.getKey().startsWith("hbase.")) {
-                conf.set(el.getKey(),el.getValue());
-            }
-        }
+        Configuration conf = new Configuration(allConf);
 
         //create table
         conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
@@ -172,17 +122,17 @@ public class TestHBaseBulkOutputStorageD
 
 
         // input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
             os.write(Bytes.toBytes(line + "\n"));
         os.close();
-        Path interPath = new Path(getTestDir()+"/hbaseBulkOutputFormatTest/inter");
+        Path interPath = new Path(methodTestDir,"inter");
 
         //create job
-        Job job = new Job(conf, "bulk write");
-        job.setWorkingDirectory(new Path(getTestDir(),"hbaseBulkOutputFormatTest_MR"));
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapWrite.class);
 
@@ -205,7 +155,7 @@ public class TestHBaseBulkOutputStorageD
         //verify
         HTable table = new HTable(conf, tableName);
         Scan scan = new Scan();
-        scan.addFamily(Bytes.toBytes("my_family"));
+        scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);
         int index=0;
         for(Result result: scanner) {
@@ -225,18 +175,16 @@ public class TestHBaseBulkOutputStorageD
 
     @Test
     public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
-        String tableName = newTableName("importSequenceFileTest");
+        String testName = "importSequenceFileTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String familyName = "my_family";
         byte[] familyNameBytes = Bytes.toBytes(familyName);
 
         //include hbase config in conf file
-        Configuration conf = new Configuration(getJobConf());
-        for(Map.Entry<String,String> el: getHbaseConf()) {
-            if(el.getKey().startsWith("hbase.")) {
-                conf.set(el.getKey(),el.getValue());
-            }
-        }
+        Configuration conf = new Configuration(allConf);
 
         //create table
         createTable(tableName,new String[]{familyName});
@@ -248,19 +196,19 @@ public class TestHBaseBulkOutputStorageD
 
 
         // input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
             os.write(Bytes.toBytes(line + "\n"));
         os.close();
-        Path interPath = new Path(getTestDir()+"/ImportSequenceFileTest/inter");
-        Path scratchPath = new Path(getTestDir()+"/ImportSequenceFileTest/scratch");
+        Path interPath = new Path(methodTestDir,"inter");
+        Path scratchPath = new Path(methodTestDir,"scratch");
 
 
         //create job
-        Job job = new Job(conf, "sequence file write");
-        job.setWorkingDirectory(new Path(getTestDir(),"importSequenceFileTest_MR"));
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapWrite.class);
 
@@ -285,7 +233,7 @@ public class TestHBaseBulkOutputStorageD
         //verify
         HTable table = new HTable(conf, tableName);
         Scan scan = new Scan();
-        scan.addFamily(Bytes.toBytes("my_family"));
+        scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);
         int index=0;
         for(Result result: scanner) {
@@ -304,36 +252,143 @@ public class TestHBaseBulkOutputStorageD
     }
 
     @Test
-    public void hbaseOutputStorageDriverTestWithRevision() throws Exception {
-        String tableName = newTableName("mrtest");
+    public void hbaseBulkOutputStorageDriverTest() throws Exception {
+        String testName = "hbaseBulkOutputStorageDriverTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String familyName = "my_family";
         byte[] familyNameBytes = Bytes.toBytes(familyName);
 
 
         //include hbase config in conf file
-        Configuration conf = new Configuration(getJobConf());
-        for(Map.Entry<String,String> el: getHbaseConf()) {
-            if(el.getKey().startsWith("hbase.")) {
-                conf.set(el.getKey(),el.getValue());
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')";
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf,testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        long ubTimestamp = System.currentTimeMillis();
+        long lbTimestamp = ubTimestamp;
+
+        assertTrue(job.waitForCompletion(true));
+
+        ubTimestamp = System.currentTimeMillis();
+
+        //verify
+        HTable table = new HTable(conf, databaseName+"."+tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        long prevTimestamp = -1;
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+
+                //verify revision
+                long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
+                if(prevTimestamp == -1) {
+                    prevTimestamp = timestamp;
+                }
+                else {
+                    assertEquals(prevTimestamp+"="+timestamp,
+                                       prevTimestamp,
+                                       timestamp);
+                }
+                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
+                                 timestamp >= lbTimestamp && timestamp <= ubTimestamp);
             }
+            index++;
         }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
 
-        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+    @Test
+    public void hbaseBulkOutputStorageDriverTestWithRevision() throws Exception {
+        String testName = "hbaseBulkOutputStorageDriverTestWithRevision";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = testName.toLowerCase();
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
 
-        //create table
-        createTable(tableName,new String[]{familyName});
-        registerHBaseTable(tableName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
 
 
-        String data[] = {"1,english:ONE,spanish:UNO",
-                "2,english:ONE,spanish:DOS",
-                "3,english:ONE,spanish:TRES"};
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')";
 
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
 
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
 
         // input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
@@ -341,8 +396,8 @@ public class TestHBaseBulkOutputStorageD
         os.close();
 
         //create job
-        Job job = new Job(conf, "hcat mapreduce write test");
-        job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR"));
+        Job job = new Job(conf,testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapHCatWrite.class);
 
@@ -351,7 +406,7 @@ public class TestHBaseBulkOutputStorageD
 
 
         job.setOutputFormatClass(HCatOutputFormat.class);
-        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
         outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
         HCatOutputFormat.setOutput(job,outputJobInfo);
 
@@ -366,9 +421,9 @@ public class TestHBaseBulkOutputStorageD
         assertTrue(job.waitForCompletion(true));
 
         //verify
-        HTable table = new HTable(conf, tableName);
+        HTable table = new HTable(conf, databaseName+"."+tableName);
         Scan scan = new Scan();
-        scan.addFamily(Bytes.toBytes("my_family"));
+        scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);
         int index=0;
         for(Result result: scanner) {
@@ -386,47 +441,49 @@ public class TestHBaseBulkOutputStorageD
     }
 
     @Test
-    public void hbaseOutputStorageDriverTest() throws Exception {
-        String tableName = newTableName("mrtest");
+    public void hbaseBulkOutputStorageDriverTestWithDefaultDB() throws Exception {
+        String testName = "hbaseBulkOutputStorageDriverTestWithDefaultDB";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = "default";
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String familyName = "my_family";
         byte[] familyNameBytes = Bytes.toBytes(familyName);
 
 
         //include hbase config in conf file
-        Configuration conf = new Configuration(getJobConf());
-        for(Map.Entry<String,String> el: getHbaseConf()) {
-            if(el.getKey().startsWith("hbase.")) {
-                conf.set(el.getKey(),el.getValue());
-            }
-        }
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
 
-        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
 
-        //create table
-        createTable(tableName,new String[]{familyName});
-        registerHBaseTable(tableName);
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')";
 
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
 
         String data[] = {"1,english:ONE,spanish:UNO",
-                "2,english:ONE,spanish:DOS",
-                "3,english:ONE,spanish:TRES"};
-
-
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
 
         // input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
             os.write(Bytes.toBytes(line + "\n"));
         os.close();
 
-        long ubTimestamp = System.currentTimeMillis();
-        long lbTimestamp = ubTimestamp;
         //create job
-        Job job = new Job(conf, "hcat mapreduce write test");
-        job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR"));
+        Job job = new Job(conf,testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapHCatWrite.class);
 
@@ -435,11 +492,8 @@ public class TestHBaseBulkOutputStorageD
 
 
         job.setOutputFormatClass(HCatOutputFormat.class);
-        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
         HCatOutputFormat.setOutput(job,outputJobInfo);
-        ubTimestamp = System.currentTimeMillis();
-        System.out.println("ub: "+ubTimestamp);
-
 
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
@@ -454,25 +508,15 @@ public class TestHBaseBulkOutputStorageD
         //verify
         HTable table = new HTable(conf, tableName);
         Scan scan = new Scan();
-        scan.addFamily(Bytes.toBytes("my_family"));
+        scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);
         int index=0;
-        Long prevTimestamp = null;
         for(Result result: scanner) {
             String vals[] = data[index].toString().split(",");
             for(int i=1;i<vals.length;i++) {
                 String pair[] = vals[i].split(":");
                 assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
                 assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
-                Long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
-                if(prevTimestamp == null)
-                    prevTimestamp = timestamp;
-                else
-                    assertEquals(prevTimestamp+"="+timestamp,
-                                 prevTimestamp,
-                                 timestamp);
-                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
-                           timestamp >= lbTimestamp && timestamp <= ubTimestamp);
             }
             index++;
         }

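All three rewritten driver tests register their table through HCatDriver with the same DDL shape, assembled from the string constants above. For reference, an equivalent standalone fragment (database and table names are placeholders):

    // Sketch only: register an HBase-backed table so both storage drivers are wired up.
    String ddl = "CREATE TABLE exampledb.exampletable " +
                 "(key int, english string, spanish string) STORED BY " +
                 "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler' " +
                 "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
                 "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver', " +
                 "'hbase.columns.mapping'=':key,my_family:english,my_family:spanish')";
    assertEquals(0, hcatDriver.run(ddl).getResponseCode());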

