incubator-hcatalog-commits mailing list archives

From khorg...@apache.org
Subject svn commit: r1181302 - in /incubator/hcatalog/trunk: ./ storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/
Date Tue, 11 Oct 2011 00:05:45 GMT
Author: khorgath
Date: Tue Oct 11 00:05:44 2011
New Revision: 1181302

URL: http://svn.apache.org/viewvc?rev=1181302&view=rev
Log:
HCATALOG-119 Output Storage Driver for HBase (Bulk) and HBaseStorageDriver Composite class (toffer via khorgath)

Added:
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
Modified:
    incubator/hcatalog/trunk/CHANGES.txt
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
    incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java

Modified: incubator/hcatalog/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/CHANGES.txt?rev=1181302&r1=1181301&r2=1181302&view=diff
==============================================================================
--- incubator/hcatalog/trunk/CHANGES.txt (original)
+++ incubator/hcatalog/trunk/CHANGES.txt Tue Oct 11 00:05:44 2011
@@ -23,6 +23,8 @@ Trunk (unreleased changes)
   INCOMPATIBLE CHANGES
 
   NEW FEATURES
+  HCAT-119. Output Storage Driver for HBase (Bulk) and HBaseStorageDriver Composite class (toffer via khorgath)
+
   HCAT-75. Input storage driver for HBase (avandana via khorgath)
 
   HCAT-73. Output Storage Driver for HBase (Direct PUTs) (toffer via khorgath)

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java?rev=1181302&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBaseOutputStorageDriver.java Tue Oct 11 00:05:44 2011
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+import org.apache.hcatalog.mapreduce.HCatTableInfo;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+/**
+ * Base class shared by both {@link HBaseBulkOutputStorageDriver} and {@link HBaseDirectOutputStorageDriver}
+ */
+abstract class HBaseBaseOutputStorageDriver extends HCatOutputStorageDriver {
+    protected HCatTableInfo tableInfo;
+    protected ResultConverter converter;
+    protected OutputJobInfo outputJobInfo;
+    protected HCatSchema schema;
+    protected HCatSchema outputSchema;
+
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+        hcatProperties = (Properties)hcatProperties.clone();
+        super.initialize(context, hcatProperties);
+
+
+        String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
+        if( jobString == null ) {
+            throw new IOException("OutputJobInfo information not found in JobContext. HCatInputFormat.setOutput() not called?");
+        }
+
+        outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
+        //override table properties with user defined ones
+        //TODO in the future we should be more selective on what to override
+        hcatProperties.putAll(outputJobInfo.getProperties());
+        outputJobInfo.getProperties().putAll(hcatProperties);
+        hcatProperties = outputJobInfo.getProperties();
+
+
+        String revision = outputJobInfo.getProperties().getProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY);
+        if(revision == null) {
+            outputJobInfo.getProperties()
+                         .setProperty(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY,
+                                      new Path(outputJobInfo.getLocation()).getName());
+        }
+
+        tableInfo = outputJobInfo.getTableInfo();
+        schema = tableInfo.getDataColumns();
+
+        List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
+        hcatProperties.setProperty(Constants.LIST_COLUMNS,
+                MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
+        hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
+                MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
+
+        //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called
+        converter = new HBaseSerDeResultConverter(schema,
+                                                  outputSchema,
+                                                  hcatProperties);
+        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName());
+    }
+
+    @Override
+    public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException {
+        this.outputSchema = schema;
+    }
+
+    @Override
+    public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
+        //HBase doesn't use KEY as part of output
+        return null;
+    }
+
+    @Override
+    public Writable convertValue(HCatRecord value) throws IOException {
+        return converter.convert(value);
+    }
+
+    @Override
+    public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
+        //no partitions for this driver
+    }
+
+    @Override
+    public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException {
+        return null;
+    }
+
+    @Override
+    public void setOutputPath(JobContext jobContext, String location) throws IOException {
+        //no output path
+    }
+
+    @Override
+    public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+        //TODO figure out a way to include user specified revision number as part of dir
+        return new Path(tableLocation, Long.toString(System.currentTimeMillis())).toString();
+    }
+}
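
For reference, a minimal sketch of the lifecycle this base driver expects; the jobContext, outputSchema, tableProperties and record variables are placeholders, and in practice HCatOutputFormat drives these calls rather than user code:

    //setSchema() must run before initialize(), since initialize() reads outputSchema
    HCatOutputStorageDriver driver = new HBaseBulkOutputStorageDriver();
    driver.setSchema(jobContext, outputSchema);
    driver.initialize(jobContext, tableProperties);
    //each HCatRecord is converted to an HBase Put by the HBaseSerDeResultConverter
    Writable put = driver.convertValue(record);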

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java?rev=1181302&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputFormat.java Tue Oct 11 00:05:44 2011
@@ -0,0 +1,108 @@
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+
+import java.io.IOException;
+
+/**
+ * Class which imports data into HBase via its "bulk load" feature, wherein regions
+ * are created by the MR job using HFileOutputFormat and then later "moved" into
+ * the appropriate region server.
+ */
+class HBaseBulkOutputFormat extends SequenceFileOutputFormat<ImmutableBytesWritable,Put> {
+
+    @Override
+    public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+        return new HBaseBulkOutputCommitter(FileOutputFormat.getOutputPath(context),context,(FileOutputCommitter)super.getOutputCommitter(context));
+    }
+
+    private static class HBaseBulkOutputCommitter extends FileOutputCommitter {
+        FileOutputCommitter baseOutputCommitter;
+
+        public HBaseBulkOutputCommitter(Path outputPath, TaskAttemptContext taskAttemptContext,
+                                                           FileOutputCommitter baseOutputCommitter) throws IOException {
+            super(outputPath, taskAttemptContext);
+            this.baseOutputCommitter = baseOutputCommitter;
+        }
+
+        @Override
+        public void abortTask(TaskAttemptContext context) {
+            baseOutputCommitter.abortTask(context);
+        }
+
+        @Override
+        public void commitTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.commitTask(context);
+        }
+
+        @Override
+        public Path getWorkPath() throws IOException {
+            return baseOutputCommitter.getWorkPath();
+        }
+
+        @Override
+        public boolean needsTaskCommit(TaskAttemptContext context) throws IOException {
+            return baseOutputCommitter.needsTaskCommit(context);
+        }
+
+        @Override
+        public void setupJob(JobContext context) throws IOException {
+            baseOutputCommitter.setupJob(context);
+        }
+
+        @Override
+        public void setupTask(TaskAttemptContext context) throws IOException {
+            baseOutputCommitter.setupTask(context);
+        }
+
+        @Override
+        public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+            try {
+                baseOutputCommitter.abortJob(jobContext,state);
+            } finally {
+                cleanIntermediate(jobContext);
+            }
+        }
+
+        @Override
+        public void cleanupJob(JobContext context) throws IOException {
+            try {
+                baseOutputCommitter.cleanupJob(context);
+            } finally {
+                cleanIntermediate(context);
+            }
+        }
+
+        @Override
+        public void commitJob(JobContext jobContext) throws IOException {
+            try {
+                baseOutputCommitter.commitJob(jobContext);
+                Configuration conf = jobContext.getConfiguration();
+                Path srcPath = FileOutputFormat.getOutputPath(jobContext);
+                Path destPath = new Path(srcPath.getParent(),srcPath.getName()+"_hfiles");
+                ImportSequenceFile.runJob(conf,
+                                                        conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY),
+                                                        srcPath,
+                                                        destPath);
+            } finally {
+                cleanIntermediate(jobContext);
+            }
+        }
+
+        public void cleanIntermediate(JobContext jobContext) throws IOException {
+            FileSystem fs = FileSystem.get(jobContext.getConfiguration());
+            fs.delete(FileOutputFormat.getOutputPath(jobContext),true);
+        }
+    }
+}
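
A minimal sketch of wiring this format into a plain MapReduce job, mirroring hbaseBulkOutputFormatTest below (same-package code, since HBaseConstants and HBaseBulkOutputFormat are package-private); the table name and intermediate path are placeholders:

    Configuration conf = HBaseConfiguration.create(); //must carry the hbase.* client settings
    conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, "my_table"); //placeholder
    Job job = new Job(conf, "bulk write");
    Path interPath = new Path("/tmp/bulkload/inter"); //placeholder intermediate SequenceFile dir
    job.setOutputFormatClass(HBaseBulkOutputFormat.class);
    SequenceFileOutputFormat.setOutputPath(job, interPath);
    job.setOutputKeyClass(ImmutableBytesWritable.class);
    job.setOutputValueClass(Put.class);
    //on successful completion, commitJob() runs ImportSequenceFile against the
    //intermediate data and then deletes the intermediate directory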

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java?rev=1181302&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseBulkOutputStorageDriver.java Tue Oct 11 00:05:44 2011
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hcatalog.data.HCatRecord;
+
+import java.io.IOException;
+import java.util.Properties;
+
+
+/**
+ * Storage driver which works with {@link HBaseBulkOutputFormat} and makes use
+ * of HBase's "bulk load" feature to get data into HBase. This should be more
+ * efficient than HBaseDirectOutputStorageDriver for large batch writes.
+ */
+public class HBaseBulkOutputStorageDriver extends HBaseBaseOutputStorageDriver {
+    private OutputFormat outputFormat;
+    private final static ImmutableBytesWritable EMPTY_KEY = new ImmutableBytesWritable(new byte[0]);
+
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+        super.initialize(context, hcatProperties);
+        Path outputDir = new Path(outputJobInfo.getLocation());
+        context.getConfiguration().set("mapred.output.dir", outputDir.toString());
+        outputFormat = new HBaseBulkOutputFormat();
+    }
+
+    @Override
+    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+        return outputFormat;
+    }
+
+    @Override
+    public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
+        return EMPTY_KEY;
+    }
+
+}

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java?rev=1181302&r1=1181301&r2=1181302&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseConstants.java Tue Oct 11 00:05:44 2011
@@ -22,7 +22,7 @@ import org.apache.hadoop.hive.hbase.HBas
 import org.apache.hcatalog.common.HCatConstants;
 
 /**
- * Constants class for constants used in the HBase Storage Drivers module
+ * Constants class for constants used in the HCatalog HBase storage drivers
  */
 class HBaseConstants {
 
@@ -35,4 +35,7 @@ class HBaseConstants {
     /** key used to define the column mapping of hbase to hcatalog schema */
     public static final String PROPERTY_COLUMN_MAPPING_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+"."+ HBaseSerDe.HBASE_COLUMNS_MAPPING;
 
+    /** key used to define whether the bulk storage driver will be used or not */
+    public static final String PROPERTY_OSD_BULK_MODE_KEY = HCatConstants.HCAT_DEFAULT_TOPIC_PREFIX+".hbase.output.bulkMode";
+
 }

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java?rev=1181302&r1=1181301&r2=1181302&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseDirectOutputStorageDriver.java Tue Oct 11 00:05:44 2011
@@ -18,64 +18,24 @@
 
 package org.apache.hcatalog.hbase;
 
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.metastore.MetaStoreUtils;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.serde.Constants;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableComparable;
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.OutputFormat;
-import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hcatalog.common.HCatConstants;
-import org.apache.hcatalog.common.HCatUtil;
-import org.apache.hcatalog.data.HCatRecord;
-import org.apache.hcatalog.data.schema.HCatSchema;
-import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
-import org.apache.hcatalog.mapreduce.HCatTableInfo;
-import org.apache.hcatalog.mapreduce.OutputJobInfo;
 
 import java.io.IOException;
-import java.util.List;
-import java.util.Map;
 import java.util.Properties;
 
 /**
  * HBase Storage driver implementation which uses "direct" writes to hbase for writing out records.
  */
-public class HBaseDirectOutputStorageDriver extends HCatOutputStorageDriver {
-    private HCatTableInfo tableInfo;
+public class HBaseDirectOutputStorageDriver extends HBaseBaseOutputStorageDriver {
+
     private HBaseDirectOutputFormat outputFormat;
-    private ResultConverter converter;
-    private OutputJobInfo outputJobInfo;
-    private HCatSchema schema;
-    private HCatSchema outputSchema;
 
     @Override
     public void initialize(JobContext context, Properties hcatProperties) throws IOException {
         super.initialize(context, hcatProperties);
-        String jobString = context.getConfiguration().get(HCatConstants.HCAT_KEY_OUTPUT_INFO);
-        if( jobString == null ) {
-            throw new IOException("OutputJobInfo information not found in JobContext. HCatInputFormat.setOutput() not called?");
-        }
-        outputJobInfo = (OutputJobInfo) HCatUtil.deserialize(jobString);
-        tableInfo = outputJobInfo.getTableInfo();
-        schema = tableInfo.getDataColumns();
-
-        List<FieldSchema> fields = HCatUtil.getFieldSchemaList(outputSchema.getFields());
-        hcatProperties.setProperty(Constants.LIST_COLUMNS,
-                MetaStoreUtils.getColumnNamesFromFieldSchema(fields));
-        hcatProperties.setProperty(Constants.LIST_COLUMN_TYPES,
-                MetaStoreUtils.getColumnTypesFromFieldSchema(fields));
-
-        //override table properties with user defined ones
-        //in the future we should be more selective on what to override
-        hcatProperties.putAll(outputJobInfo.getProperties());
-        //outputSchema should be set by HCatOutputFormat calling setSchema, prior to initialize being called
-        converter = new HBaseSerDeResultConverter(schema,
-                outputSchema,
-                hcatProperties);
-        context.getConfiguration().set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY,tableInfo.getTableName());
         outputFormat = new HBaseDirectOutputFormat();
         outputFormat.setConf(context.getConfiguration());
     }
@@ -85,39 +45,4 @@ public class HBaseDirectOutputStorageDri
         return outputFormat;
     }
 
-    @Override
-    public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException {
-        this.outputSchema = schema;
-    }
-
-    @Override
-    public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
-        //HBase doesn't use KEY as part of output
-        return null;
-    }
-
-    @Override
-    public Writable convertValue(HCatRecord value) throws IOException {
-        return converter.convert(value);
-    }
-
-    @Override
-    public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
-        //no partitions for this driver
-    }
-
-    @Override
-    public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException {
-        return null;
-    }
-
-    @Override
-    public void setOutputPath(JobContext jobContext, String location) throws IOException {
-        //no output path
-    }
-
-    @Override
-    public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
-        return null;
-    }
 }

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java?rev=1181302&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseOutputStorageDriver.java Tue Oct 11 00:05:44 2011
@@ -0,0 +1,102 @@
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.OutputFormat;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.mapreduce.HCatOutputStorageDriver;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Forwarding HBaseOutputStorageDriver; the actual implementation is decided by the configuration
+ * property {@link HBaseConstants#PROPERTY_OSD_BULK_MODE_KEY}, which defaults to HBaseBulkOutputStorageDriver
+ */
+public class HBaseOutputStorageDriver extends HCatOutputStorageDriver {
+
+    private HBaseBulkOutputStorageDriver bulkOSD = new HBaseBulkOutputStorageDriver();
+    private HBaseDirectOutputStorageDriver directOSD = new HBaseDirectOutputStorageDriver();
+    private HBaseBaseOutputStorageDriver activeOSD;
+
+    @Override
+    public void initialize(JobContext context, Properties hcatProperties) throws IOException {
+        super.initialize(context, hcatProperties);
+        determineOSD(context.getConfiguration(),hcatProperties);
+        activeOSD.initialize(context,hcatProperties);
+    }
+
+    @Override
+    public WritableComparable<?> generateKey(HCatRecord value) throws IOException {
+        return activeOSD.generateKey(value);
+    }
+
+    @Override
+    public Writable convertValue(HCatRecord value) throws IOException {
+        return activeOSD.convertValue(value);
+    }
+
+    @Override
+    public String getOutputLocation(JobContext jobContext, String tableLocation, List<String> partitionCols, Map<String, String> partitionValues, String dynHash) throws IOException {
+        //sanity check since we can't determine which will be used till initialize
+        //and this method gets called before that
+        String location = bulkOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash);
+        if(!location.equals(directOSD.getOutputLocation(jobContext, tableLocation, partitionCols, partitionValues, dynHash))) {
+            throw new IOException("bulkOSD and directOSD return inconsistent path for getOutputLocation()");
+        }
+        return location;
+    }
+
+    @Override
+    public Path getWorkFilePath(TaskAttemptContext context, String outputLoc) throws IOException {
+        return activeOSD.getWorkFilePath(context,outputLoc);
+    }
+
+    @Override
+    public OutputFormat<? super WritableComparable<?>, ? super Writable> getOutputFormat() throws IOException {
+        return activeOSD.getOutputFormat();
+    }
+
+    @Override
+    public void setOutputPath(JobContext jobContext, String location) throws IOException {
+        directOSD.setOutputPath(jobContext, location);
+        bulkOSD.setOutputPath(jobContext, location);
+    }
+
+    @Override
+    public void setSchema(JobContext jobContext, HCatSchema schema) throws IOException {
+        directOSD.setSchema(jobContext,schema);
+        bulkOSD.setSchema(jobContext,schema);
+    }
+
+    @Override
+    public void setPartitionValues(JobContext jobContext, Map<String, String> partitionValues) throws IOException {
+        directOSD.setPartitionValues(jobContext,partitionValues);
+        bulkOSD.setPartitionValues(jobContext,partitionValues);
+    }
+
+    private void determineOSD(Configuration conf, Properties prop) {
+        if(activeOSD != null)
+            return;
+
+        String bulkMode = conf.get(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY);
+        if(bulkMode == null && prop != null)
+            bulkMode = prop.getProperty(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY);
+
+        if(bulkMode != null && !Boolean.valueOf(bulkMode)) {
+            activeOSD = directOSD;
+            bulkOSD = null;
+        }
+        else {
+            activeOSD = bulkOSD;
+            directOSD = null;
+        }
+    }
+}
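
HBaseConstants is package-private, so code outside this package would set the literal key; a sketch, assuming same-package code, of forcing the direct driver (the bulk driver is used when the key is unset or parses as true):

    //determineOSD() consults the job Configuration first, then the hcat properties
    job.getConfiguration().set(HBaseConstants.PROPERTY_OSD_BULK_MODE_KEY, "false");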

Modified: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java?rev=1181302&r1=1181301&r2=1181302&view=diff
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java (original)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/HBaseSerDeResultConverter.java Tue Oct 11 00:05:44 2011
@@ -71,7 +71,7 @@ class HBaseSerDeResultConverter implemen
      * @param hcatProperties table properties
      * @throws IOException thrown if hive's HBaseSerDe couldn't be initialized
      */
-    public HBaseSerDeResultConverter(HCatSchema schema,
+    HBaseSerDeResultConverter(HCatSchema schema,
                                      HCatSchema outputSchema,
                                      Properties hcatProperties) throws IOException {
 

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java?rev=1181302&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/java/org/apache/hcatalog/hbase/ImportSequenceFile.java Tue Oct 11 00:05:44 2011
@@ -0,0 +1,232 @@
+/**
+ * Copyright 2010 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hcatalog.hbase;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.filecache.DistributedCache;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.hbase.mapreduce.HFileOutputFormat;
+import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
+import org.apache.hadoop.hbase.mapreduce.PutSortReducer;
+import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
+import org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.JobStatus;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.OutputCommitter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+
+import static org.apache.hadoop.hbase.mapreduce.hadoopbackport.TotalOrderPartitioner.*;
+
+
+/**
+ * MapReduce job which reads a series of Puts stored in a sequence file
+ * and imports the data into HBase. It needs to create the necessary HBase
+ * regions using HFileOutputFormat and then notify the correct region servers
+ * to doBulkLoad(). This will be used after an MR job has written the SequenceFile
+ * and the data needs to be bulk loaded into HBase.
+ */
+class ImportSequenceFile {
+    private final static Log LOG = LogFactory.getLog(ImportSequenceFile.class);
+    private final static String NAME = "HCatImportSequenceFile";
+    private final static String IMPORTER_WORK_DIR = "_IMPORTER_MR_WORK_DIR";
+
+
+    private static class SequenceFileImporter  extends Mapper<ImmutableBytesWritable, Put, ImmutableBytesWritable, Put> {
+
+        @Override
+        public void map(ImmutableBytesWritable rowKey, Put value,
+                        Context context)
+                throws IOException, InterruptedException {
+            //re-key on the Put's row so downstream sorting sees the actual row key
+            context.write(new ImmutableBytesWritable(value.getRow()), value);
+        }
+    }
+
+    private static class ImporterOutputFormat extends HFileOutputFormat {
+        @Override
+        public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException {
+            final OutputCommitter baseOutputCommitter = super.getOutputCommitter(context);
+
+            return new OutputCommitter() {
+                @Override
+                public void setupJob(JobContext jobContext) throws IOException {
+                    baseOutputCommitter.setupJob(jobContext);
+                }
+
+                @Override
+                public void setupTask(TaskAttemptContext taskContext) throws IOException {
+                    baseOutputCommitter.setupTask(taskContext);
+                }
+
+                @Override
+                public boolean needsTaskCommit(TaskAttemptContext taskContext) throws IOException {
+                    return baseOutputCommitter.needsTaskCommit(taskContext);
+                }
+
+                @Override
+                public void commitTask(TaskAttemptContext taskContext) throws IOException {
+                    baseOutputCommitter.commitTask(taskContext);
+                }
+
+                @Override
+                public void abortTask(TaskAttemptContext taskContext) throws IOException {
+                    baseOutputCommitter.abortTask(taskContext);
+                }
+
+                @Override
+                public void abortJob(JobContext jobContext, JobStatus.State state) throws IOException {
+                    try {
+                        baseOutputCommitter.abortJob(jobContext,state);
+                    } finally {
+                        cleanupScratch(jobContext);
+                    }
+                }
+
+                @Override
+                public void commitJob(JobContext jobContext) throws IOException {
+                    try {
+                        baseOutputCommitter.commitJob(jobContext);
+                        Configuration conf = jobContext.getConfiguration();
+                        //import hfiles
+                        new LoadIncrementalHFiles(conf)
+                                .doBulkLoad(HFileOutputFormat.getOutputPath(jobContext),
+                                                   new HTable(conf,
+                                                                      conf.get(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY)));
+                    } finally {
+                        cleanupScratch(jobContext);
+                    }
+                }
+
+                @Override
+                public void cleanupJob(JobContext context) throws IOException {
+                    try {
+                        baseOutputCommitter.cleanupJob(context);
+                    } finally {
+                        cleanupScratch(context);
+                    }
+                }
+
+                private void cleanupScratch(JobContext context) throws IOException{
+                    FileSystem fs = FileSystem.get(context.getConfiguration());
+                    fs.delete(HFileOutputFormat.getOutputPath(context),true);
+                }
+            };
+        }
+    }
+
+    private static Job createSubmittableJob(Configuration conf, String tableName, Path inputDir, Path scratchDir, boolean localMode)
+            throws IOException {
+        Job job = new Job(conf, NAME + "_" + tableName);
+        job.setJarByClass(SequenceFileImporter.class);
+        FileInputFormat.setInputPaths(job, inputDir);
+        job.setInputFormatClass(SequenceFileInputFormat.class);
+        job.setMapperClass(SequenceFileImporter.class);
+
+        HTable table = new HTable(conf, tableName);
+        job.setReducerClass(PutSortReducer.class);
+        FileOutputFormat.setOutputPath(job, scratchDir);
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+        HFileOutputFormat.configureIncrementalLoad(job, table);
+        //override OutputFormatClass with our own so we can include cleanup in the committer
+        job.setOutputFormatClass(ImporterOutputFormat.class);
+
+        //local mode doesn't support symbolic links so we have to manually set the actual path
+        if(localMode) {
+            String partitionFile = null;
+            for(URI uri: DistributedCache.getCacheFiles(job.getConfiguration())) {
+                if(DEFAULT_PATH.equals(uri.getFragment())) {
+                    partitionFile = uri.toString();
+                    break;
+                }
+            }
+            if(partitionFile == null)
+                throw new IOException("Partition file not found in the DistributedCache");
+            partitionFile = partitionFile.substring(0,partitionFile.lastIndexOf("#"));
+            job.getConfiguration().set(TotalOrderPartitioner.PARTITIONER_PATH,partitionFile);
+        }
+
+        //add hbase dependency jars
+        TableMapReduceUtil.addDependencyJars(job);
+        TableMapReduceUtil.addDependencyJars(job.getConfiguration());
+        return job;
+    }
+
+    /**
+     * Method to run the Importer MapReduce Job. Normally will be called by another MR job
+     * during OutputCommitter.commitJob().
+     * @param otherConf configuration of the parent job
+     * @param tableName name of table to bulk load data into
+     * @param inputDir path of SequenceFile formatted data to read
+     * @param scratchDir temporary path for the Importer MR job to build the HFiles which will be imported
+     * @return true if the Importer job completed successfully, false otherwise
+     */
+    static boolean runJob(Configuration otherConf, String tableName, Path inputDir, Path scratchDir) {
+        Configuration conf = HBaseConfiguration.create();
+        for(Map.Entry<String,String> el: otherConf) {
+            if(el.getKey().startsWith("hbase."))
+                conf.set(el.getKey(),el.getValue());
+        }
+        conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+
+        boolean localMode = "local".equals(conf.get("mapred.job.tracker"));
+        boolean success = false;
+        try {
+            FileSystem fs = FileSystem.get(conf);
+            Path workDir = new Path(new Job(otherConf).getWorkingDirectory(),IMPORTER_WORK_DIR);
+            if(!fs.mkdirs(workDir))
+                throw new IOException("Importer work directory already exists: "+workDir);
+            Job job = createSubmittableJob(conf, tableName, inputDir, scratchDir, localMode);
+            job.setWorkingDirectory(workDir);
+            success = job.waitForCompletion(true);
+            fs.delete(workDir, true);
+            //We only cleanup on success because failure might've been caused by existence of target directory
+            if(localMode && success)
+                new ImporterOutputFormat().getOutputCommitter(new TaskAttemptContext(conf,new TaskAttemptID())).commitJob(job);
+        } catch (InterruptedException e) {
+            LOG.error("ImportSequenceFile Failed", e);
+        } catch (ClassNotFoundException e) {
+            LOG.error("ImportSequenceFile Failed",e);
+        } catch (IOException e) {
+            LOG.error("ImportSequenceFile Failed",e);
+        }
+        return success;
+    }
+
+}
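
For reference, the intended invocation, as exercised by importSequenceFileTest below; the table name and paths are placeholders:

    //run after the parent MR job has written its Puts to a SequenceFile at interPath
    Path interPath = new Path("/tmp/bulkload/inter");     //placeholder
    Path scratchPath = new Path("/tmp/bulkload/scratch"); //placeholder
    boolean ok = ImportSequenceFile.runJob(job.getConfiguration(), "my_table", interPath, scratchPath);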

Added: incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java
URL: http://svn.apache.org/viewvc/incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java?rev=1181302&view=auto
==============================================================================
--- incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java (added)
+++ incubator/hcatalog/trunk/storage-drivers/hbase/src/test/org/apache/hcatalog/hbase/TestHBaseBulkOutputStorageDriver.java Tue Oct 11 00:05:44 2011
@@ -0,0 +1,483 @@
+package org.apache.hcatalog.hbase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hive.hbase.HBaseSerDe;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.MetaStoreUtils;
+import org.apache.hadoop.hive.metastore.TableType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.serde.Constants;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.Mapper;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hcatalog.common.HCatConstants;
+import org.apache.hcatalog.common.HCatException;
+import org.apache.hcatalog.common.HCatUtil;
+import org.apache.hcatalog.data.DefaultHCatRecord;
+import org.apache.hcatalog.data.HCatRecord;
+import org.apache.hcatalog.data.schema.HCatFieldSchema;
+import org.apache.hcatalog.data.schema.HCatSchema;
+import org.apache.hcatalog.data.schema.HCatSchemaUtils;
+import org.apache.hcatalog.mapreduce.HCatOutputFormat;
+import org.apache.hcatalog.mapreduce.OutputJobInfo;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests components of HBaseBulkOutputStorageDriver using ManyMiniCluster,
+ * including ImportSequenceFile, HBaseOutputStorageDriver and HBaseBulkOutputFormat.
+ */
+public class TestHBaseBulkOutputStorageDriver extends SkeletonHBaseTest {
+    private final String suiteName = "TestHBaseBulkOutputStorageDriver";
+
+    private void registerHBaseTable(String tableName) throws Exception {
+
+        String databaseName = MetaStoreUtils.DEFAULT_DATABASE_NAME ;
+        HiveMetaStoreClient client = new HiveMetaStoreClient(getHiveConf());
+
+        try {
+            client.dropTable(databaseName, tableName);
+        } catch(Exception e) {
+            //ignore: dropTable throws NoSuchObjectException when the table doesn't exist
+        }
+
+
+        Table tbl = new Table();
+        tbl.setDbName(databaseName);
+        tbl.setTableName(tableName);
+        tbl.setTableType(TableType.EXTERNAL_TABLE.toString());
+        StorageDescriptor sd = new StorageDescriptor();
+        sd.setLocation(getTestDir()+"/"+suiteName+"/"+tableName);
+        sd.setCols(getTableColumns());
+        tbl.setPartitionKeys(new ArrayList<FieldSchema>());
+
+        tbl.setSd(sd);
+
+        sd.setBucketCols(new ArrayList<String>(2));
+        sd.setSerdeInfo(new SerDeInfo());
+        sd.getSerdeInfo().setName(tbl.getTableName());
+        sd.getSerdeInfo().setParameters(new HashMap<String, String>());
+        sd.getSerdeInfo().getParameters().put(
+                Constants.SERIALIZATION_FORMAT, "1");
+        sd.getSerdeInfo().setSerializationLib(HBaseSerDe.class.getName());
+        sd.setInputFormat("fillme");
+        sd.setOutputFormat(HBaseBulkOutputFormat.class.getName());
+
+        Map<String, String> tableParams = new HashMap<String, String>();
+        tableParams.put(HCatConstants.HCAT_ISD_CLASS, "fillme");
+        tableParams.put(HCatConstants.HCAT_OSD_CLASS, HBaseOutputStorageDriver.class.getName());
+        tableParams.put(HBaseConstants.PROPERTY_COLUMN_MAPPING_KEY,":key,my_family:english,my_family:spanish");
+        tbl.setParameters(tableParams);
+
+        client.createTable(tbl);
+    }
+
+    protected List<FieldSchema> getTableColumns() {
+        List<FieldSchema> fields = new ArrayList<FieldSchema>();
+        fields.add(new FieldSchema("key", Constants.INT_TYPE_NAME, ""));
+        fields.add(new FieldSchema("english", Constants.STRING_TYPE_NAME, ""));
+        fields.add(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, ""));
+        return fields;
+    }
+
+    private static  List<HCatFieldSchema> generateDataColumns() throws HCatException {
+        List<HCatFieldSchema> dataColumns = new ArrayList<HCatFieldSchema>();
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("key", Constants.INT_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("english", Constants.STRING_TYPE_NAME, "")));
+        dataColumns.add(HCatSchemaUtils.getHCatFieldSchema(new FieldSchema("spanish", Constants.STRING_TYPE_NAME, "")));
+        return dataColumns;
+    }
+
+    public static class MapWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, Put> {
+
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            String vals[] = value.toString().split(",");
+            Put put = new Put(Bytes.toBytes(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                put.add(Bytes.toBytes("my_family"),
+                        Bytes.toBytes(pair[0]),
+                        Bytes.toBytes(pair[1]));
+            }
+            context.write(new ImmutableBytesWritable(Bytes.toBytes(vals[0])),put);
+        }
+    }
+
+    public static class MapHCatWrite extends Mapper<LongWritable, Text, ImmutableBytesWritable, HCatRecord> {
+        @Override
+        public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+            HCatRecord record = new DefaultHCatRecord(3);
+            HCatSchema schema = new HCatSchema(generateDataColumns());
+            String vals[] = value.toString().split(",");
+            record.setInteger("key",schema,Integer.parseInt(vals[0]));
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                record.set(pair[0],schema,pair[1]);
+            }
+            context.write(null,record);
+        }
+    }
+
+    @Test
+    public void hbaseBulkOutputFormatTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String tableName = newTableName("hbaseBulkOutputFormatTest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String,String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(),el.getValue());
+            }
+        }
+
+        //create table
+        conf.set(HBaseConstants.PROPERTY_OUTPUT_TABLE_NAME_KEY, tableName);
+        createTable(tableName, new String[]{familyName});
+
+        String data[] = {"1,english:one,spanish:uno",
+                               "2,english:two,spanish:dos",
+                               "3,english:three,spanish:tres"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+        Path interPath = new Path(getTestDir()+"/hbaseBulkOutputFormatTest/inter");
+
+        //create job
+        Job job = new Job(conf, "bulk write");
+        job.setWorkingDirectory(new Path(getTestDir(),"hbaseBulkOutputFormatTest_MR"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(HBaseBulkOutputFormat.class);
+        SequenceFileOutputFormat.setOutputPath(job,interPath);
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+        //test if scratch directory was erased
+        assertFalse(FileSystem.get(job.getConfiguration()).exists(interPath));
+    }
+
+    @Test
+    public void importSequenceFileTest() throws IOException, ClassNotFoundException, InterruptedException {
+        String tableName = newTableName("importSequenceFileTest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String,String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(),el.getValue());
+            }
+        }
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+
+        String data[] = {"1,english:one,spanish:uno",
+                               "2,english:two,spanish:dos",
+                               "3,english:three,spanish:tres"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+        Path interPath = new Path(getTestDir()+"/ImportSequenceFileTest/inter");
+        Path scratchPath = new Path(getTestDir()+"/ImportSequenceFileTest/scratch");
+
+
+        //create job
+        Job job = new Job(conf, "sequence file write");
+        job.setWorkingDirectory(new Path(getTestDir(),"importSequenceFileTest_MR"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        SequenceFileOutputFormat.setOutputPath(job,interPath);
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(Put.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        assertTrue(ImportSequenceFile.runJob(job.getConfiguration(),tableName,interPath,scratchPath));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+        //test if scratch directory was erased
+        assertFalse(FileSystem.get(job.getConfiguration()).exists(scratchPath));
+    }
+
+    @Test
+    public void hbaseOutputStorageDriverTestWithRevision() throws Exception {
+        String tableName = newTableName("mrtest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String,String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(),el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+        registerHBaseTable(tableName);
+
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        //create job
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        outputJobInfo.getProperties().put(HBaseConstants.PROPERTY_OUTPUT_VERSION_KEY, "1");
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        for(Result result: scanner) {
+            String vals[] = data[index].split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                assertEquals(1L,result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp());
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
+
+    @Test
+    public void hbaseOutputStorageDriverTest() throws Exception {
+        String tableName = newTableName("mrtest");
+        byte[] tableNameBytes = Bytes.toBytes(tableName);
+        String familyName = "my_family";
+        byte[] familyNameBytes = Bytes.toBytes(familyName);
+
+
+        //include hbase config in conf file
+        Configuration conf = new Configuration(getJobConf());
+        for(Map.Entry<String,String> el: getHbaseConf()) {
+            if(el.getKey().startsWith("hbase.")) {
+                conf.set(el.getKey(),el.getValue());
+            }
+        }
+
+        conf.set(HCatConstants.HCAT_KEY_HIVE_CONF, HCatUtil.serialize(getHiveConf().getAllProperties()));
+
+        //create table
+        createTable(tableName,new String[]{familyName});
+        registerHBaseTable(tableName);
+
+
+        String data[] = {"1,english:ONE,spanish:UNO",
+                "2,english:ONE,spanish:DOS",
+                "3,english:ONE,spanish:TRES"};
+
+
+
+        // input/output settings
+        Path inputPath = new Path(getTestDir(), "mapred/testHCatMapReduceInput/");
+        getFileSystem().mkdirs(inputPath);
+        FSDataOutputStream os = getFileSystem().create(new Path(inputPath,"inputFile.txt"));
+        for(String line: data)
+            os.write(Bytes.toBytes(line + "\n"));
+        os.close();
+
+        long ubTimestamp = System.currentTimeMillis();
+        long lbTimestamp = ubTimestamp;
+        //create job
+        Job job = new Job(conf, "hcat mapreduce write test");
+        job.setWorkingDirectory(new Path(getTestDir(),"hbaseOutputStorageDriverTest_MR"));
+        job.setJarByClass(this.getClass());
+        job.setMapperClass(MapHCatWrite.class);
+
+        job.setInputFormatClass(TextInputFormat.class);
+        TextInputFormat.setInputPaths(job, inputPath);
+
+
+        job.setOutputFormatClass(HCatOutputFormat.class);
+        OutputJobInfo outputJobInfo = OutputJobInfo.create(null,tableName,null,null,null);
+        HCatOutputFormat.setOutput(job,outputJobInfo);
+        ubTimestamp = System.currentTimeMillis();
+        System.out.println("ub: "+ubTimestamp);
+
+
+        job.setMapOutputKeyClass(ImmutableBytesWritable.class);
+        job.setMapOutputValueClass(HCatRecord.class);
+
+        job.setOutputKeyClass(ImmutableBytesWritable.class);
+        job.setOutputValueClass(Put.class);
+
+        job.setNumReduceTasks(0);
+
+        assertTrue(job.waitForCompletion(true));
+
+        //verify
+        HTable table = new HTable(conf, tableName);
+        Scan scan = new Scan();
+        scan.addFamily(Bytes.toBytes("my_family"));
+        ResultScanner scanner = table.getScanner(scan);
+        int index=0;
+        Long prevTimestamp = null;
+        for(Result result: scanner) {
+            String vals[] = data[index].split(",");
+            for(int i=1;i<vals.length;i++) {
+                String pair[] = vals[i].split(":");
+                assertTrue(result.containsColumn(familyNameBytes,Bytes.toBytes(pair[0])));
+                assertEquals(pair[1],Bytes.toString(result.getValue(familyNameBytes,Bytes.toBytes(pair[0]))));
+                Long timestamp = result.getColumn(familyNameBytes,Bytes.toBytes(pair[0])).get(0).getTimestamp();
+                if(prevTimestamp == null)
+                    prevTimestamp = timestamp;
+                else
+                    assertEquals(prevTimestamp+"="+timestamp,
+                                 prevTimestamp,
+                                 timestamp);
+                assertTrue(lbTimestamp+"<="+timestamp+"<="+ubTimestamp,
+                           timestamp >= lbTimestamp && timestamp <= ubTimestamp);
+            }
+            index++;
+        }
+        //test if load count is the same
+        assertEquals(data.length,index);
+    }
+
+}


