incubator-hcatalog-commits mailing list archives

From hashut...@apache.org
Subject svn commit: r1205527 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date Wed, 23 Nov 2011 18:23:52 GMT
Author: hashutosh
Date: Wed Nov 23 18:23:51 2011
New Revision: 1205527

URL: http://svn.apache.org/viewvc?rev=1205527&view=rev
Log:
HCATALOG-160 : HBaseDirectOutputStorageDriver outputVersion isn't consistent within the same MR job (toffer via hashutosh)
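
For background on the fix: the symptom suggests each map task was deriving its own outputVersion instead of seeing the one resolved at job setup. The general remedy, sketched below with plain Hadoop APIs (ConsistentVersionExample and VERSION_KEY are illustrative names, not HCatalog identifiers), is to resolve the value once at submission time and ship it to every task through the Configuration:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;

    import java.io.IOException;

    public class ConsistentVersionExample {
        // Illustrative key name, not the HCatalog property.
        static final String VERSION_KEY = "example.output.version";

        public static Job createJob(Configuration conf) throws IOException {
            // Resolve the version once, on the client, at submission time...
            conf.setLong(VERSION_KEY, System.currentTimeMillis());
            return new Job(conf, "consistent-version-example");
        }

        public static class WriteMapper extends Mapper<LongWritable, Text, Text, Text> {
            private long version;

            @Override
            protected void setup(Context context) {
                // ...so every task reads the same value instead of generating
                // its own, which is what made outputVersion inconsistent.
                version = context.getConfiguration().getLong(VERSION_KEY, -1);
            }
        }
    }

The commit applies this pattern by re-serializing the fully initialized OutputJobInfo into the job configuration; see the HBaseDirectOutputStorageDriver hunk below.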

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Wed Nov 23 18:23:51 2011
@@ -79,6 +79,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-160. HBaseDirectOutputStorageDriver outputVersion isn't consistent within the same MR job (toffer via hashutosh)
+
   HCAT-155. HBase bulkOSD requires value to be Put rather than HCatRecord (toffer via hashutosh)

 
   HCAT-157. HBaseOutputFormat assumes hbase table name is hcat table name (toffer via hashutosh)


Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Wed Nov 23 18:23:51 2011
@@ -50,6 +50,14 @@ abstract  class HBaseBaseOutputStorageDr
     protected HCatSchema schema;
     protected HCatSchema outputSchema;
 
+    /**
+     *  Subclasses are required to serialize OutputJobInfo back into jobContext,
+     *  since initialize() sets some properties in OutputJobInfo that require
+     *  an update of the instance stored in jobContext.
+     * @param context the job context object
+     * @param hcatProperties the properties for the storage driver
+     * @throws IOException
+     */
     @Override
     public void initialize(JobContext context, Properties hcatProperties) throws IOException {
         hcatProperties = (Properties)hcatProperties.clone();
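
The contract described by the new Javadoc can be illustrated with a minimal sketch. MyOutputStorageDriver below is hypothetical (it is not part of this commit), and it assumes the outputJobInfo field of HBaseBaseOutputStorageDriver is visible to subclasses, as its use in the HBaseDirectOutputStorageDriver hunk below suggests:

    package org.apache.hcatalog.hbase;

    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;

    import java.io.IOException;
    import java.util.Properties;

    // Hypothetical subclass, shown only to illustrate the Javadoc contract.
    abstract class MyOutputStorageDriver extends HBaseBaseOutputStorageDriver {
        @Override
        public void initialize(JobContext context, Properties hcatProperties) throws IOException {
            super.initialize(context, hcatProperties);
            // initialize() mutates outputJobInfo, so the updated instance must
            // be written back into the job configuration for tasks to pick up.
            context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO,
                    HCatUtil.serialize(outputJobInfo));
        }
    }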

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Wed Nov 23 18:23:51 2011
@@ -22,6 +22,8 @@ import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
 
 import java.io.IOException;
 import java.util.Properties;
@@ -36,6 +38,7 @@ public class HBaseDirectOutputStorageDri
     @Override
     public void initialize(JobContext context, Properties hcatProperties) throws IOException {
         super.initialize(context, hcatProperties);
+        context.getConfiguration().set(HCatConstants.HCAT_KEY_OUTPUT_INFO, HCatUtil.serialize(outputJobInfo));
         outputFormat = new HBaseDirectOutputFormat();
         outputFormat.setConf(context.getConfiguration());
     }
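
On the task side, the re-serialized OutputJobInfo is recovered from the configuration; the MapHCatWrite change in the test below does exactly this. A minimal standalone sketch of that read-back (ReadBackMapper is an illustrative name, not part of this commit):

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hcatalog.common.HCatConstants;
    import org.apache.hcatalog.common.HCatUtil;
    import org.apache.hcatalog.data.HCatRecord;
    import org.apache.hcatalog.data.schema.HCatSchema;
    import org.apache.hcatalog.mapreduce.OutputJobInfo;

    import java.io.IOException;

    class ReadBackMapper extends Mapper<LongWritable, Text, WritableComparable<?>, HCatRecord> {
        private HCatSchema schema;

        @Override
        protected void setup(Context context) throws IOException, InterruptedException {
            // Every task deserializes the same OutputJobInfo instance that the
            // storage driver stored during job setup, so properties resolved
            // there (including the output version) are consistent job-wide.
            OutputJobInfo jobInfo = (OutputJobInfo) HCatUtil.deserialize(
                    context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
            schema = jobInfo.getOutputSchema();
        }
    }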

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1205527&r1=1205526&r2=1205527&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Wed Nov 23 18:23:51 2011
@@ -23,37 +23,27 @@ import org.apache.hadoop.fs.FSDataOutput
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.*;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hive.hbase.HBaseSerDe;
-import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.TableType;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.hive.cli.CliSessionState;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.session.SessionState;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hcatalog.cli.HCatDriver;
+import org.apache.hcatalog.cli.SemanticAnalysis.HCatSemanticAnalyzer;
 import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatException;
 import org.apache.hcatalog.common.HCatUtil;
 import org.apache.hcatalog.data.DefaultHCatRecord;
 import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatFieldSchema;
 import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.data.schema.HCatSchemaUtils;
 import org.apache.hcatalog.mapreduce.HCatOutputFormat;
 import org.apache.hcatalog.mapreduce.OutputJobInfo;
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -64,95 +54,39 @@ import static org.junit.Assert.assertTru
  */
 public class TestHBaseDirectOutputStorageDriver extends SkeletonHBaseTest {
 
-    private void registerHBaseTable(String tableName) throws Exception {
+    private final HiveConf allConf;
+    private final HCatDriver hcatDriver;
 
-        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ;
-        HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
+    public TestHBaseDirectOutputStorageDriver() {
+        allConf = getHiveConf();
+        allConf.set(HiveConf.ConfVars.SEMANTIC_ANALYZER_HOOK.varname,
+                HCatSemanticAnalyzer.class.getName());
+        allConf.set(HiveConf.ConfVars.HADOOPFS.varname, getFileSystem().getUri().toString());
+        allConf.set(HiveConf.ConfVars.METASTOREWAREHOUSE.varname, new Path(getTestDir(),"warehouse").toString());
+
+        //Add hbase properties
+        for (Map.Entry<String, String> el : getHbaseConf())
+            allConf.set(el.getKey(), el.getValue());
+        for (Map.Entry<String, String> el : getJobConf())
+            allConf.set(el.getKey(), el.getValue());
 
-        try {
-            client.dropTable(databaseName, tableName);
-        } catch(Exception e) {
-        } //can fail with NoSuchObjectException
-
-
-        Table tbl = new Table();
-        tbl.setDbName(databaseName);
-        tbl.setTableName(tableName);
-        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
-        StorageDescriptor sd = new StorageDescriptor();
-
-        sd.setCols(getTableColumns());
-        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
-
-        tbl.setSd(sd);
-
-        sd.setBucketCols(new ArrayList<String>(2));
-        sd.setSerdeInfo(new SerDeInfo());
-        sd.getSerdeInfo().setName(tbl.getTableName());
-        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
-        sd.getSerdeInfo().getParameters().put(
-                Constants.SERIALIZATION_FORMAT, "1");
-        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
-        sd.setInputFormat("fillme");
-        sd.setOutputFormat(HBaseDirectOutputFormat.class.getName());
-
-        Map<String, String> tableParams = new HashMap<String, String>();
-        tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
-        tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseDirectOutputStorageDriver.class.getName());
-        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
-        tbl.setParameters(tableParams);
-
-        client.createTable(tbl);
-    }
-
-    protected List<FieldSchema> getTableColumns() {
-        List<FieldSchema> fields = new ArrayList<FieldSchema>();
-        fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
-        fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
-        fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
-        return fields;
-    }
-
-    private static  List<HCatFieldSchema> generateDataColumns() throws HCatException {
-        List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
-        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
-        return dataColumns;
-    }
-
-    public void test() throws IOException {
-        Configuration conf = getHbaseConf();
-        String tableName = "my_table";
-        byte[] tableNameBytes = Bytes.toBytes(tableName);
-        String familyName = "my_family";
-        byte[] familyNameBytes = Bytes.toBytes(familyName);
-        createTable(tableName,new String[]{familyName});
-        HTable table = new HTable(getHbaseConf(),tableNameBytes);
-        byte[] key = Bytes.toBytes("foo");
-        byte[] qualifier = Bytes.toBytes("qualifier");
-        byte[] val = Bytes.toBytes("bar");
-        Put put = new Put(key);
-        put.add(familyNameBytes, qualifier, val);
-        table.put(put);
-        Result result = table.get(new Get(key));
-        assertTrue(Bytes.equals(val, result.getValue(familyNameBytes, qualifier)));
+        SessionState.start(new CliSessionState(allConf));
+        hcatDriver = new HCatDriver();
     }
 
     @Test
     public void directOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
-        String tableName = newTableName("mrTest");
+        String testName = "directOutputFormatTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String familyName = "my_family";
         byte[] familyNameBytes = Bytes.toBytes(familyName);
 
         //include hbase config in conf file
-        Configuration conf = new Configuration(getJobConf());
-        for(Map.Entry<String,String> el: getHbaseConf()) {
-            if(el.getKey().startsWith("hbase.")) {
-                conf.set(el.getKey(),el.getValue());
-            }
-        }
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
 
         //create table
         createTable(tableName,new String[]{familyName});
@@ -164,7 +98,7 @@ public class TestHBaseDirectOutputStorag
 
 
         // input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
         FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
         for(String line: data)
@@ -172,7 +106,8 @@ public class TestHBaseDirectOutputStorag
         os.close();
 
         //create job
-        Job job = new Job(conf, "hcat mapreduce write test");
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapWrite.class);
 
@@ -194,7 +129,7 @@ public class TestHBaseDirectOutputStorag
         //verify
         HTable table = new HTable(conf, tableName);
         Scan scan = new Scan();
-        scan.addFamily(Bytes.toBytes("my_family"));
+        scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);
         int index=0;
         for(Result result: scanner) {
@@ -209,61 +144,151 @@ public class TestHBaseDirectOutputStorag
         assertEquals(data.length,index);
     }
 
-    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+    @Test
+    public void directOutputStorageDriverTest() throws Exception {
+        String testName = "directOutputStorageDriverTest";
+        Path methodTestDir = new Path(getTestDir(),testName);
 
-        @Override
-        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
-            String vals[] = value.toString().split(",");
-            Put put = new Put(Bytes.toBytes(vals[0]));
+        String databaseName = "default";
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
+
+
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')";
+
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
+
+        // input/output settings
+        Path inputPath = new Path(methodTestDir,"mr_input");
+        getFileSystem().mkdirs(inputPath);
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
+
+        //create job
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
+        long lbTimestamp = System.currentTimeMillis();
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(BytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+        long ubTimestamp = System.currentTimeMillis();
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(familyNameBytes);
+        ResultScanner scanner = table.getScanner(scan);
+        int index = 0;
+        long prevTimestamp = -1;
+        for(Result result: scanner) {
+            String vals[] = data[index].toString().split(",");
             for(int i=1;i<vals.length;i++) {
                 String pair[] = vals[i].split(":");
-                put.add(Bytes.toBytes("my_family"),
-                        Bytes.toBytes(pair[0]),
-                        Bytes.toBytes(pair[1]));
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
+                if(prevTimestamp < 1)
+                    prevTimestamp = timestamp;
+                else
+                    assertEquals(prevTimestamp+"="+timestamp,
+                                       prevTimestamp,
+                                       timestamp);
+                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
+                           timestamp >= lbTimestamp && timestamp <= ubTimestamp);
             }
-            context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+            index++;
         }
+        assertEquals(data.length,index);
     }
 
     @Test
-    public void directOutputStorageDriverTest() throws Exception {
-        String tableName = newTableName("mrtest");
+    public void directOutputStorageDriverTestWithRevision() throws Exception {
+        String testName = "directOutputStorageDriverTestWithRevision";
+        Path methodTestDir = new Path(getTestDir(),testName);
+
+        String databaseName = "default";
+        String dbDir = new Path(methodTestDir,"DB_"+testName).toString();
+        String tableName = newTableName(testName).toLowerCase();
         byte[] tableNameBytes = Bytes.toBytes(tableName);
         String familyName = "my_family";
         byte[] familyNameBytes = Bytes.toBytes(familyName);
 
 
         //include hbase config in conf file
-        Configuration conf = new Configuration(getJobConf());
-        for(Map.Entry<String,String> el: getHbaseConf()) {
-            if(el.getKey().startsWith("hbase.")) {
-                conf.set(el.getKey(),el.getValue());
-            }
-        }
+        Configuration conf = new Configuration(allConf);
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(allConf.getAllProperties()));
 
-        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
 
-        //create table
-        createTable(tableName,new String[]{familyName});
-        registerHBaseTable(tableName);
+        String dbquery = "CREATE DATABASE IF NOT EXISTS " + databaseName + " LOCATION '" + dbDir + "'";
+        String tableQuery = "CREATE TABLE " + databaseName + "." + tableName +
+                              "(key int, english string, spanish string) STORED BY " +
+                              "'org.apache.hcatalog.hbase.HBaseHCatStorageHandler'" +
+                              "TBLPROPERTIES ('hcat.isd'='org.apache.hcatalog.hbase.HBaseInputStorageDriver', " +
+                              "'hcat.osd'='org.apache.hcatalog.hbase.HBaseOutputStorageDriver'," +
+                              "'"+HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY+"'='false',"+
+                              "'hbase.columns.mapping'=':key,"+familyName+":english,"+familyName+":spanish')";
 
+        assertEquals(0, hcatDriver.run(dbquery).getResponseCode());
+        assertEquals(0, hcatDriver.run(tableQuery).getResponseCode());
 
         String data[] = {"1,english:ONE,spanish:UNO",
-                "2,english:ONE,spanish:DOS",
-                "3,english:ONE,spanish:TRES"};
-
-
+                               "2,english:ONE,spanish:DOS",
+                               "3,english:ONE,spanish:TRES"};
 
         // input/output settings
-        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
-        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
-        for(String line: data)
-            os.write(Bytes.toBytes(line + "\n"));
-        os.close();
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
 
         //create job
-        Job job = new Job(conf, "hcat mapreduce write test");
+        Job job = new Job(conf, testName);
+        job.setWorkingDirectory(new Path(methodTestDir,"mr_work"));
         job.setJarByClass(this.getClass());
         job.setMapperClass(MapHCatWrite.class);
 
@@ -272,7 +297,7 @@ public class TestHBaseDirectOutputStorag
 
 
         job.setOutputFormatClass(HCatOutputFormat.class);
-        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
         outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
         HCatOutputFormat.setOutput(job,outputJobInfo);
 
@@ -288,7 +313,7 @@ public class TestHBaseDirectOutputStorag
         //verify
         HTable table = new HTable(conf, tableName);
         Scan scan = new Scan();
-        scan.addFamily(Bytes.toBytes("my_family"));
+        scan.addFamily(familyNameBytes);
         ResultScanner scanner = table.getScanner(scan);
         int index=0;
         for(Result result: scanner) {
@@ -308,8 +333,9 @@ public class TestHBaseDirectOutputStorag
 
         @Override
         public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
             HCatRecord record = new DefaultHCatRecord(3);
-            HCatSchema schema = new HCatSchema(generateDataColumns());
+            HCatSchema schema = jobInfo.getOutputSchema();
             String vals[] = value.toString().split(",");
             record.setInteger("key",schema,Integer.parseInt(vals[0]));
             for(int i=1;i<vals.length;i++) {
@@ -319,4 +345,20 @@ public class TestHBaseDirectOutputStorag
             context.write(null,record);
         }
     }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, BytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            context.write(new BytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+    }
 }


