hadoop-common-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hadoop Wiki] Trivial Update of "Hbase/MapReduce" by stack
Date Wed, 26 Mar 2008 18:35:55 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.

The following page has been changed by stack:
http://wiki.apache.org/hadoop/Hbase/MapReduce

The comment on the change is:
Added sample MR Bulk Uploader

------------------------------------------------------------------------------
  
  Reading from hbase, the !TableInputFormat asks hbase for the list of regions and makes a
map-per-region.  When writing, it's better to have lots of reducers so the load is spread
across the hbase cluster.
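For example, the createSubmittableJob method of the sample uploader below could pin the reduce count explicitly. This is only a sketch of that advice, not part of the sample itself; the figure of 16 is an arbitrary placeholder, so size it to your own cluster.
{{{
// Sketch only: an explicit reducer count for the write-heavy upload job.
// 16 is an arbitrary placeholder -- pick a count suited to your cluster.
JobConf c = new JobConf(getConf(), SampleUploader.class);
c.setNumReduceTasks(16);  // more reducers => writes spread across the hbase cluster
TableReduce.initJob(args[1], TableUploader.class, c);
}}}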
  
+ = Sample MR Bulk Uploader =
+ Read the class comment below for specification of inputs, prerequisites, etc.
+ {{{
+ package org.apache.hadoop.hbase.mapred;
+ 
+ import java.io.IOException;
+ import java.util.Iterator;
+ 
+ import org.apache.hadoop.conf.Configuration;
+ import org.apache.hadoop.fs.Path;
+ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
+ import org.apache.hadoop.io.LongWritable;
+ import org.apache.hadoop.io.MapWritable;
+ import org.apache.hadoop.io.Text;
+ import org.apache.hadoop.mapred.JobClient;
+ import org.apache.hadoop.mapred.JobConf;
+ import org.apache.hadoop.mapred.MapReduceBase;
+ import org.apache.hadoop.mapred.Mapper;
+ import org.apache.hadoop.mapred.OutputCollector;
+ import org.apache.hadoop.mapred.Reporter;
+ import org.apache.hadoop.util.Tool;
+ import org.apache.hadoop.util.ToolRunner;
+ 
+ /**
+  * Sample uploader.
+  * 
+  * This is EXAMPLE code.  You will need to change it to work for your context.
+  * 
+  * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
+  * your data. Use the map to massage the input so it fits hbase.  Currently it's
+  * just a pass-through map.  In the reduce, you need to output a row and a
+  * map of columns to cells.  Change map and reduce to suit your input.
+  * 
+  * <p>The below is wired up to handle an input whose format is a text file
+  * which has a line format as follows:
+  * <pre>
+  * row columnname columndata
+  * </pre>
+  * 
+  * <p>The table and column family we are inserting into must already exist.
+  * 
+  * <p> To run, edit your hadoop-env.sh and add hbase classes and conf to your
+  * HADOOP_CLASSPATH.  For example:
+  * <pre>
+  * export HADOOP_CLASSPATH=/Users/stack/Documents/checkouts/hbase/branches/0.1/build/classes:/Users/stack/Documents/checkouts/hbase/branches/0.1/conf
+  * </pre>
+  * <p>Restart your MR cluster after making the above change (you need to be
+  * running in at least pseudo-distributed mode for hadoop to pick up the
+  * additions to your CLASSPATH).
+  * 
+  * <p>Start up your hbase cluster.
+  * 
+  * <p>Next do the following to start the MR job:
+  * <pre>
+  * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
+  * </pre>
+  * 
+  * <p>This code was written against the hbase 0.1 branch.
+  */
+ public class SampleUploader extends MapReduceBase
+ implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
+   private static final String NAME = "SampleUploader";
+   private Configuration conf;
+ 
+   public JobConf createSubmittableJob(String[] args) {
+     JobConf c = new JobConf(getConf(), SampleUploader.class);
+     c.setJobName(NAME);
+     c.setInputPath(new Path(args[0]));
+     c.setMapperClass(this.getClass());
+     c.setMapOutputKeyClass(Text.class);
+     c.setMapOutputValueClass(MapWritable.class);
+     c.setReducerClass(TableUploader.class);
+     TableReduce.initJob(args[1], TableUploader.class, c);
+     return c;
+   } 
+ 
+   public void map(LongWritable k, Text v,
+     OutputCollector<Text, MapWritable> output, Reporter r)
+   throws IOException {
+     // Lines are space-delimited; the first field is the row, the second the
+     // column name, and the third the cell value.
+     String tmp = v.toString();
+     if (tmp.length() == 0) {
+       return;
+     }
+     String [] splits = tmp.split(" ");
+     MapWritable mw = new MapWritable();
+     mw.put(new Text(splits[1]),
+       new ImmutableBytesWritable(splits[2].getBytes()));
+     String row = splits[0];
+     r.setStatus("Map emitting " + row + " for record " + k.toString());
+     output.collect(new Text(row), mw);
+   }
+   
+   public static class TableUploader
+   extends TableReduce<Text, MapWritable> {
+     @Override
+     public void reduce(Text k, Iterator<MapWritable> v,
+       OutputCollector<Text, MapWritable> output, Reporter r)
+     throws IOException {
+       while (v.hasNext()) {
+         r.setStatus("Reducer committing " + k);
+         output.collect(k, v.next());
+       }
+     }
+   }
+   
+   static int printUsage() {
+     System.out.println(NAME + " <input> <table_name>");
+     return -1;
+   } 
+     
+   public int run(String[] args) throws Exception {
+     // Make sure there are exactly 2 parameters left.
+     if (args.length != 2) {
+       System.out.println("ERROR: Wrong number of parameters: " +
+         args.length + " instead of 2.");
+       return printUsage();
+     }
+     JobClient.runJob(createSubmittableJob(args));
+     return 0;
+   }
+     
+   public Configuration getConf() {
+     return this.conf;
+   } 
+   
+   public void setConf(final Configuration c) {
+     this.conf = c;
+   }
+ 
+   public static void main(String[] args) throws Exception {
+     int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
+       args);
+     System.exit(errCode);
+   }
+ }
+ }}}
+ 
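As a usage illustration (not part of the original sample): hbase column names take the family:qualifier form, so with a target table TABLE_NAME that already has a hypothetical info column family, the input file could look like this:
{{{
row1 info:name paul
row2 info:name jane
row2 info:email jane@example.com
}}}
Each line becomes one cell: the map emits the row key with a one-entry column map, and the reduce hands that map straight to hbase through TableReduce. Kick it off with the ./bin/hadoop command shown in the class comment above.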
