hadoop-common-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hadoop Wiki] Update of "Hbase/MapReduce" by allenday
Date Wed, 03 Sep 2008 00:33:06 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.

The following page has been changed by allenday:
http://wiki.apache.org/hadoop/Hbase/MapReduce

The comment on the change is:
adding the BulkImport example

------------------------------------------------------------------------------
  
  Reading from hbase, the !TableInputFormat asks hbase for the list of regions and makes a
map-per-region.  Writing, it may make sense to avoid the reduce step and write back into hbase
from inside your map.  You'd do this when your job does not need the sort and collation that
MR does in its reduce; on insert, hbase sorts, so there is no point double-sorting (and shuffling
data around your MR cluster) unless you need to.  If you do not need the reduce, you might
just have your map emit counts of records processed so the framework can print that nice
report of records processed when the job is done.  See the example code below.  If running the
reduce step makes sense in your case, it's better to have lots of reducers so load is spread
across the hbase cluster.
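+ 
+ For instance, a reduce-less map might just write each record into hbase and bump a counter; here is a minimal sketch in the same old-style mapred API as the examples below (the class and counter names are made up for illustration, and it assumes the same imports as the !BulkImport example):
+ 
+ {{{
+ // Map-only job: configure it with conf.setNumReduceTasks(0) and
+ // conf.setOutputFormat(NullOutputFormat.class), as BulkImport does below.
+ public static class RecordCountMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
+   // Counters show up in the framework's report when the job finishes.
+   public static enum Counters { RECORDS_PROCESSED }
+ 
+   public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+     // ... parse the record and table.commit(new BatchUpdate(...)) here, as in BulkImport ...
+     // Emit nothing to the framework; just count the record.
+     reporter.incrCounter(Counters.RECORDS_PROCESSED, 1);
+   }
+ }
+ }}}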
  
+ == Example to bulk import/load a text file into an HTable ==
+ 
+ Here's a sample program from [http://spicylogic.com/allenday/blog Allen Day] that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file into the table.
+ 
+ {{{
+ package com.spicylogic.hbase;
+ import java.io.IOException;
+ import java.util.Iterator;
+ import java.util.Map;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.HBaseConfiguration;
+ import org.apache.hadoop.hbase.client.HTable;
+ import org.apache.hadoop.hbase.io.BatchUpdate;
+ import org.apache.hadoop.hbase.mapred.TableOutputFormat;
+ import org.apache.hadoop.hbase.mapred.TableReduce;
+ import org.apache.hadoop.io.LongWritable;
+ import org.apache.hadoop.io.MapWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapred.JobClient;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.hadoop.mapred.MapReduceBase;
+ import org.apache.hadoop.mapred.Mapper;
+ import org.apache.hadoop.mapred.OutputCollector;
+ import org.apache.hadoop.mapred.Reducer;
+ import org.apache.hadoop.mapred.Reporter;
+ import org.apache.hadoop.mapred.lib.NullOutputFormat;
+ import org.apache.hadoop.util.Tool;
+ import org.apache.hadoop.util.ToolRunner;
+ 
+ public class BulkImport implements Tool {
+   private static final String NAME = "BulkImport";
+   private Configuration conf;
+ 
+   public static class InnerMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
+     private HTable table;
+     private HBaseConfiguration HBconf;
+ 
+     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
+       if ( table == null )
+         throw new IOException("table is null");
+       
+       String [] splits = value.toString().split("\t");
+       if ( splits.length != 4 )
+         return;
+ 
+       String rowID     = splits[0];
+       long timestamp   = Long.parseLong( splits[1] );
+       String colID     = splits[2];
+       String cellValue = splits[3];
+ 
+       reporter.setStatus("Map emitting cell for row='" + rowID + "', column='" + colID + "', time='" + timestamp + "'");
+ 
+       BatchUpdate bu = new BatchUpdate( rowID );
+       if ( timestamp > 0 )
+         bu.setTimestamp( timestamp );
+ 
+       bu.put(colID, cellValue.getBytes());      
+       table.commit( bu );      
+     }
+     public void configure(JobConf job) {
+       HBconf = new HBaseConfiguration();
+       try {
+         table = new HTable( HBconf, job.get("input.table") );
+       } catch (IOException e) {
+         // Fail the task right away if the table can't be opened.
+         throw new RuntimeException("could not open table " + job.get("input.table"), e);
+       }
+     }
+   }
+   
+   
+   public JobConf createSubmittableJob(String[] args) {
+     JobConf c = new JobConf(getConf(), BulkImport.class);
+     c.setJobName(NAME);
+     c.setInputPath(new Path(args[0]));
+ 
+     c.set("input.table", args[1]);
+     c.setMapperClass(InnerMap.class);
+     c.setNumReduceTasks(0);
+     c.setOutputFormat(NullOutputFormat.class);
+     return c;
+   }
+   
+   static int printUsage() {
+     System.err.println("Usage: " + NAME + " <input> <table_name>");
+     System.err.println("\twhere <input> is a tab-delimited text file with 4 columns.");
+     System.err.println("\t\tcolumn 1 = row ID");
+     System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
+     System.err.println("\t\tcolumn 3 = column ID");
+     System.err.println("\t\tcolumn 4 = cell value");
+     return -1;
+   } 
+ 
+   public int run(String[] args) throws Exception {
+     // Make sure there are exactly 2 parameters left.
+     if (args.length != 2) {
+       return printUsage();
+     }
+     JobClient.runJob(createSubmittableJob(args));
+     return 0;
+   }
+ 
+   public Configuration getConf() {
+     return this.conf;
+   } 
+ 
+   public void setConf(final Configuration c) {
+     this.conf = c;
+   }
+ 
+   public static void main(String[] args) throws Exception {
+     int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
+     System.exit(errCode);
+   }
+ }
+ }}}
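+ 
+ To try it out, you'd package the class into a jar and run it with the hbase jars on the classpath; the file contents, jar name, input path, and table name below are placeholders only.  Note that the third column has to be a full family:qualifier column name, and a negative timestamp means "use the current time":
+ 
+ {{{
+ $ cat rows.tsv
+ row1	-1	family1:col1	value1
+ row2	-1	family1:col1	value2
+ $ bin/hadoop jar bulkimport.jar com.spicylogic.hbase.BulkImport /user/allen/rows.tsv MyTable
+ }}}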
+ 
  == Example to map rows/column families between two HTables ==
  
- Here's some sample code from [http://spicylogic.com/allenday/blog Allen Day] that will iterate
over all rows in one table for specified column families and insert those rows/columns to
a second table.
+ Here is another sample program from [http://spicylogic.com/allenday/blog Allen Day] that iterates over all rows in one table for the specified column families and inserts those rows/columns into a second table.
  
  {{{
+ package com.spicylogic.hbase;
  import java.io.IOException;
  
  public class BulkCopy extends TableMap<Text, Text> implements Tool {
