incubator-hcatalog-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hashut...@apache.org
Subject svn commit: r1205212 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date Tue, 22 Nov 2011 22:48:39 GMT
Author: hashutosh
Date: Tue Nov 22 22:48:38 2011
New Revision: 1205212

URL: http://svn.apache.org/viewvc?rev=1205212&view=rev
Log:
HCATALOG-155. HBase bulkOSD requires value to be Put rather than HCatRecord (toffer via hashutosh)

Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Nov 22 22:48:38 2011
@@ -79,6 +79,8 @@ Trunk (unreleased changes)
   OPTIMIZATIONS
 
   BUG FIXES
+  HCAT-155. HBase bulkOSD requires value to be Put rather than HCatRecord (toffer via hashutosh)

+
   HCAT-157. HBaseOutputFormat assumes hbase table name is hcat table name (toffer via hashutosh)

  
   HCAT-154. HBase bulkOSD and directOSD return inconsistent path for getOutputLocation()
(toffer via hashutosh)

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Tue Nov 22 22:48:38 2011
@@ -5,11 +5,13 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.JobStatus;
 import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 
@@ -20,24 +22,60 @@ import java.io.IOException;
  * are created by the MR job using HFileOutputFormat and then later "moved" into
  * the appropriate region server.
  */
-class HBaseBulkOutputFormat extends SequenceFileOutputFormat<ImmutableBytesWritable,Put>
{
+class HBaseBulkOutputFormat extends OutputFormat<WritableComparable<?>,Put> {
+    private final static ImmutableBytesWritable EMPTY_LIST = new ImmutableBytesWritable(new
byte[0]);
+    private SequenceFileOutputFormat<WritableComparable<?>,Put> baseOutputFormat;
+
+    public HBaseBulkOutputFormat() {
+        baseOutputFormat = new SequenceFileOutputFormat<WritableComparable<?>,Put>();
+    }
+
+    @Override
+    public void checkOutputSpecs(JobContext context) throws IOException, InterruptedException
{
+        baseOutputFormat.checkOutputSpecs(context);
+    }
+
+    @Override
+    public RecordWriter<WritableComparable<?>, Put> getRecordWriter(TaskAttemptContext
context) throws IOException, InterruptedException {
+        //TODO use a constant/static setter when available
+        context.getConfiguration().setClass("mapred.output.key.class",ImmutableBytesWritable.class,Object.class);
+        context.getConfiguration().setClass("mapred.output.value.class",Put.class,Object.class);
+        return new HBaseBulkRecordWriter(baseOutputFormat.getRecordWriter(context));
+    }
 
     @Override
     public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException
{
-        return new HBaseBulkOutputCommitter(FileOutputFormat.getOutputPath(context),context,(FileOutputCommitter)super.getOutputCommitter(context));
+        return new HBaseBulkOutputCommitter(baseOutputFormat.getOutputCommitter(context));
     }
 
-    private static class HBaseBulkOutputCommitter extends FileOutputCommitter {
-        FileOutputCommitter baseOutputCommitter;
+    private static class HBaseBulkRecordWriter extends  RecordWriter<WritableComparable<?>,Put>
{
+        private RecordWriter<WritableComparable<?>,Put> baseWriter;
 
-        public HBaseBulkOutputCommitter(Path outputPath, TaskAttemptContext taskAttemptContext,
-                                                           FileOutputCommitter baseOutputCommitter)
throws IOException {
-            super(outputPath, taskAttemptContext);
+        public HBaseBulkRecordWriter(RecordWriter<WritableComparable<?>,Put>
baseWriter)  {
+            this.baseWriter = baseWriter;
+        }
+
+        @Override
+        public void write(WritableComparable<?> key, Put value) throws IOException,
InterruptedException {
+            //we ignore the key
+            baseWriter.write(EMPTY_LIST, value);
+        }
+
+        @Override
+        public void close(TaskAttemptContext context) throws IOException, InterruptedException
{
+            baseWriter.close(context);
+        }
+    }
+
+    private static class HBaseBulkOutputCommitter extends OutputCommitter {
+        private OutputCommitter baseOutputCommitter;
+
+        public HBaseBulkOutputCommitter(OutputCommitter baseOutputCommitter) throws IOException
{
             this.baseOutputCommitter = baseOutputCommitter;
         }
 
         @Override
-        public void abortTask(TaskAttemptContext context) {
+        public void abortTask(TaskAttemptContext context) throws IOException {
             baseOutputCommitter.abortTask(context);
         }
 
@@ -47,11 +85,6 @@ class HBaseBulkOutputFormat extends Sequ
         }
 
         @Override
-        public Path getWorkPath() throws IOException {
-            return baseOutputCommitter.getWorkPath();
-        }
-
-        @Override
         public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
             return baseOutputCommitter.needsTaskCommit(context);
         }

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Tue Nov 22 22:48:38 2011
@@ -14,11 +14,10 @@ import org.apache.hadoop.hbase.util.Byte
 import org.apache.hadoop.hive.cli.CliSessionState;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.ql.session.SessionState;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.JobContext;
-import org.apache.hadoop.mapreduce.JobID;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -34,8 +33,6 @@ import org.apache.hcatalog.mapreduce.Out
 import org.junit.Test;
 
 import java.io.IOException;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
@@ -82,7 +79,7 @@ public class TestHBaseBulkOutputStorageD
         }
     }
 
-    public static class MapHCatWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable,
HCatRecord> {
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, BytesWritable,
HCatRecord> {
         @Override
         public void map(LongWritable key, Text value, Context context) throws IOException,
InterruptedException {
             OutputJobInfo jobInfo = (OutputJobInfo)HCatUtil.deserialize(context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO));
@@ -143,10 +140,10 @@ public class TestHBaseBulkOutputStorageD
         SequenceFileOutputFormat.setOutputPath(job,interPath);
 
         job.setMapOutputKeyClass(ImmutableBytesWritable.class);
-        job.setMapOutputValueClass(Put.class);
+        job.setMapOutputValueClass(HCatRecord.class);
 
         job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
 
@@ -287,10 +284,12 @@ public class TestHBaseBulkOutputStorageD
         // input/output settings
         Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
-        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
-        for(String line: data)
-            os.write(Bytes.toBytes(line + "\n"));
-        os.close();
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
 
         //create job
         Job job = new Job(conf,testName);
@@ -306,11 +305,11 @@ public class TestHBaseBulkOutputStorageD
         OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
         HCatOutputFormat.setOutput(job,outputJobInfo);
 
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
 
@@ -390,10 +389,12 @@ public class TestHBaseBulkOutputStorageD
         // input/output settings
         Path inputPath = new Path(methodTestDir,"mr_input");
         getFileSystem().mkdirs(inputPath);
-        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
-        for(String line: data)
-            os.write(Bytes.toBytes(line + "\n"));
-        os.close();
+        //create multiple files so we can test with multiple mappers
+        for(int i=0;i<data.length;i++) {
+            FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile"+i+".txt"));
+            os.write(Bytes.toBytes(data[i] + "\n"));
+            os.close();
+        }
 
         //create job
         Job job = new Job(conf,testName);
@@ -410,11 +411,11 @@ public class TestHBaseBulkOutputStorageD
         outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
         HCatOutputFormat.setOutput(job,outputJobInfo);
 
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
 
@@ -495,11 +496,11 @@ public class TestHBaseBulkOutputStorageD
         OutputJobInfo outputJobInfo = OutputJobInfo.create(databaseName,tableName,null,null,null);
         HCatOutputFormat.setOutput(job,outputJobInfo);
 
-        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputKeyClass(BytesWritable.class);
         job.setMapOutputValueClass(HCatRecord.class);
 
-        job.setOutputKeyClass(ImmutableBytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputKeyClass(BytesWritable.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
 

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java?rev=1205212&r1=1205211&r2=1205212&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseDirectOutputStorageDriver.java Tue Nov 22 22:48:38 2011
@@ -183,10 +183,10 @@ public class TestHBaseDirectOutputStorag
         job.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
 
         job.setMapOutputKeyClass(BytesWritable.class);
-        job.setMapOutputValueClass(Put.class);
+        job.setMapOutputValueClass(HCatRecord.class);
 
         job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
         assertTrue(job.waitForCompletion(true));
@@ -280,7 +280,7 @@ public class TestHBaseDirectOutputStorag
         job.setMapOutputValueClass(HCatRecord.class);
 
         job.setOutputKeyClass(BytesWritable.class);
-        job.setOutputValueClass(Put.class);
+        job.setOutputValueClass(HCatRecord.class);
 
         job.setNumReduceTasks(0);
         assertTrue(job.waitForCompletion(true));



Mime
View raw message