hadoop-common-commits mailing list archives

From Apache Wiki <wikidi...@apache.org>
Subject [Hadoop Wiki] Update of "Hbase/MapReduce" by Misty
Date Fri, 23 Oct 2015 00:07:16 GMT
Dear Wiki user,

You have subscribed to a wiki page or wiki category on "Hadoop Wiki" for change notification.

The "Hbase/MapReduce" page has been changed by Misty:
https://wiki.apache.org/hadoop/Hbase/MapReduce?action=diff&rev1=23&rev2=24

- = Hbase, MapReduce and the CLASSPATH =
- 
- 
- 
- 
  ----
  '''DEPRECATED!!!  Instead see the new [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapreduce/package-summary.html#package_description|mapreduce package description]] which supersedes the old [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/package-summary.html#package_description|mapred package description]]'''
  ----
  
- 
- 
- 
- 
- !MapReduce jobs deployed to a mapreduce cluster do not, by default, have access to the hbase configuration under ''$HBASE_CONF_DIR'', nor to the hbase classes.
- 
- You could add ''hbase-site.xml'' to $HADOOP_HOME/conf and hbase.jar to $HADOOP_HOME/lib, then copy those changes across your cluster, but the cleanest way to add the hbase configuration and classes to the cluster ''CLASSPATH'' is to uncomment ''HADOOP_CLASSPATH'' in ''$HADOOP_HOME/conf/hadoop-env.sh'' and add the path to the hbase jar and the ''$HBASE_CONF_DIR'' directory.  Then copy the amended configuration around the cluster.  You'll probably need to restart the mapreduce cluster for it to notice the new configuration (though you may not have to).
- 
- Below is an example of how you would amend $HADOOP_HOME/conf/hadoop-env.sh to add the hbase jar and conf directory.  This example assumes you are using the hbase-0.2.0 release, with additional commented export commands for other releases/builds:
- 
- {{{
- # Extra Java CLASSPATH elements.  Optional.
- # export HADOOP_CLASSPATH=
- # for hbase-0.2.0 release
- export HADOOP_CLASSPATH=$HBASE_HOME/build/hbase-0.2.0.jar:$HBASE_HOME/conf
- # for 0.16.0 release
- #export HADOOP_CLASSPATH=$HBASE_HOME/hadoop-0.16.0-hbase.jar:$HBASE_HOME/conf
- # for 0.16.0 developer build
- #export HADOOP_CLASSPATH=$HBASE_HOME/build/test:$HBASE_HOME/build/hadoop-0.16.0-hbase.jar:$HBASE_HOME/conf
- }}}
- 
- Expand $HBASE_HOME in accordance with your local environment.  To use the developer versions of HADOOP_CLASSPATH, you first need to run "ant" or "ant build" in $HBASE_HOME.  This creates the .jar files in the $HBASE_HOME/build directory.  To use the !PerformanceEvaluation class from the hbase test classes, you must use a developer build.  Then, this is how you would run the !PerformanceEvaluation MR job with 4 clients:
- 
- {{{ > $HADOOP_HOME/bin/hadoop org.apache.hadoop.hbase.PerformanceEvaluation sequentialWrite 4}}}
- 
- The !PerformanceEvaluation class will be found on the CLASSPATH because you added $HBASE_HOME/build/test to HADOOP_CLASSPATH.
- 
- NOTE: While it used to be possible to bundle hbase.jar inside the job jar you submit to hadoop, as of 0.2.0RC2 this is no longer the case.  See [[https://issues.apache.org/jira/browse/HBASE-797|HBASE-797]].
- 
- = Hbase as MapReduce job data source and sink =
- 
- Hbase can be used as a data source, [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/TableInputFormat.html|TableInputFormat]], and as a data sink, [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/TableOutputFormat.html|TableOutputFormat]], for mapreduce jobs.  When writing mapreduce jobs that read or write hbase, you'll probably want to subclass [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/TableMap.html|TableMap]] and/or [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/TableReduce.html|TableReduce]].  See the do-nothing passthrough classes [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/IdentityTableMap.html|IdentityTableMap]] and [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/IdentityTableReduce.html|IdentityTableReduce]] for basic usage.  For a more involved example, see [[http://hadoop.apache.org/hbase/docs/current/api/org/apache/hadoop/hbase/mapred/BuildTableIndex.html|BuildTableIndex]] from the same package, or review the org.apache.hadoop.hbase.mapred.!TestTableMapReduce unit test.
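- 
- For orientation, here is a minimal sketch of a !TableMap subclass against the old mapred API used by the examples on this page; the class and package names are hypothetical, and the map simply forwards each row key with a marker value, much as !IdentityTableMap passes rows through:
- 
- {{{
- package com.example.hbase;  // hypothetical package for this sketch
- 
- import java.io.IOException;
- 
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.io.RowResult;
- import org.apache.hadoop.hbase.mapred.TableMap;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reporter;
- 
- /**
-  * Do-nothing map: emits the row key and a fixed marker for every row that
-  * TableInputFormat hands it.  Replace the body with whatever massaging your
-  * job needs.
-  */
- public class ExampleTableMap extends TableMap<Text, Text> {
-   public void map(ImmutableBytesWritable row, RowResult value,
-       OutputCollector<Text, Text> output, Reporter reporter)
-   throws IOException {
-     // The row key arrives as raw bytes; emit it as text.
-     output.collect(new Text(row.get()), new Text("seen"));
-   }
- }
- }}}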
- 
- When running mapreduce jobs that have hbase as a source or sink, you'll need to specify the source/sink table and column names in your job configuration.
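- 
- As a sketch of that wiring with the API used by the examples below, the fragment here points the hypothetical ExampleTableMap above at a source table and column list, and a (likewise hypothetical) ExampleTableReduce subclass at a sink table; all of the names are placeholders:
- 
- {{{
- // Driver fragment; "source_table", "target_table", the column names and the
- // ExampleTableReduce class are placeholders.
- JobConf c = new JobConf(new HBaseConfiguration());
- c.setJobName("example-scan");
- 
- // Source side: table to scan plus a space-delimited list of columns/column
- // families to hand to the map.
- TableMap.initJob("source_table", "colfam1: colfam2:", ExampleTableMap.class,
-     Text.class, Text.class, c);
- 
- // Sink side: table that the reduce writes into.
- TableReduce.initJob("target_table", ExampleTableReduce.class, c);
- }}}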
- 
- Reading from hbase, !TableInputFormat asks hbase for the list of regions and makes a map per region.  When writing, it may make sense to avoid the reduce step and write back into hbase from inside your map.  You'd do this when your job does not need the sort and collation that MR does in its reduce step; on insert, hbase sorts anyway, so there is no point double-sorting (and shuffling data around your MR cluster) unless you need to.  If you do not need the reduce, you might just have your map emit counts of records processed so the framework can print that nice report of records processed when the job is done.  See the example code below.  If running the reduce step does make sense in your case, it's better to have lots of reducers so the load is spread across the hbase cluster.
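- 
- As a small sketch of those two choices (c is the job's JobConf, and the reducer count is illustrative, not a recommendation):
- 
- {{{
- // Map-only: the map writes into hbase itself, so skip the reduce and
- // discard the framework's own output.
- c.setNumReduceTasks(0);
- c.setOutputFormat(NullOutputFormat.class);
- 
- // Or, if you do need the reduce, spread the hbase write load widely:
- // c.setNumReduceTasks(20);
- }}}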
- 
- == Example to bulk import/load a text file into an HTable ==
- 
- Here's a sample program from [[http://spicylogic.com/allenday/blog|Allen Day]] that takes an HDFS text file path and an HBase table name as inputs, and loads the contents of the text file into the table.
- 
- {{{
- package com.spicylogic.hbase;
- import java.io.IOException;
- import java.util.Iterator;
- import java.util.Map;
- 
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.io.BatchUpdate;
- import org.apache.hadoop.hbase.mapred.TableOutputFormat;
- import org.apache.hadoop.hbase.mapred.TableReduce;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.MapWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reducer;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.lib.NullOutputFormat;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- 
- public class BulkImport implements Tool {
-   private static final String NAME = "BulkImport";
-   private Configuration conf;
- 
-   public static class InnerMap extends MapReduceBase implements Mapper<LongWritable, Text, Text, Text> {
-     private HTable table;
-     private HBaseConfiguration HBconf;
- 
-     public void map(LongWritable key, Text value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-       if ( table == null )
-         throw new IOException("table is null");
-       
-       String [] splits = value.toString().split("\t");
-       if ( splits.length != 4 )
-         return;
- 
-       String rowID     = splits[0];
-       int timestamp    = Integer.parseInt( splits[1] );
-       String colID     = splits[2];
-       String cellValue = splits[3];
- 
-       reporter.setStatus("Map emitting cell for row='" + rowID + "', column='" + colID + "', time='" + timestamp + "'");
- 
-       BatchUpdate bu = new BatchUpdate( rowID );
-       if ( timestamp > 0 )
-         bu.setTimestamp( timestamp );
- 
-       bu.put(colID, cellValue.getBytes());      
-       table.commit( bu );      
-     }
-     public void configure(JobConf job) {
-       HBconf = new HBaseConfiguration();
-       try {
-         table = new HTable( HBconf, job.get("input.table") );
-       } catch (IOException e) {
-         // Could not open the table; leave it null so map() fails with a clear error.
-         e.printStackTrace();
-       }
-     }
-   }
-   
-   
-   public JobConf createSubmittableJob(String[] args) {
-     JobConf c = new JobConf(getConf(), BulkImport.class);
-     c.setJobName(NAME);
-     c.setInputPath(new Path(args[0]));
- 
-     c.set("input.table", args[1]);
-     c.setMapperClass(InnerMap.class);
-     c.setNumReduceTasks(0);
-     c.setOutputFormat(NullOutputFormat.class);
-     return c;
-   }
-   
-   static int printUsage() {
-     System.err.println("Usage: " + NAME + " <input> <table_name>");
-     System.err.println("\twhere <input> is a tab-delimited text file with 4 columns.");
-     System.err.println("\t\tcolumn 1 = row ID");
-     System.err.println("\t\tcolumn 2 = timestamp (use a negative value for current time)");
-     System.err.println("\t\tcolumn 3 = column ID");
-     System.err.println("\t\tcolumn 4 = cell value");
-     return -1;
-   } 
- 
-   public int run(@SuppressWarnings("unused") String[] args) throws Exception {
-     // Make sure there are exactly 2 parameters left.
-     if (args.length != 2) {
-       return printUsage();
-     }
-     JobClient.runJob(createSubmittableJob(args));
-     return 0;
-   }
- 
-   public Configuration getConf() {
-     return this.conf;
-   } 
- 
-   public void setConf(final Configuration c) {
-     this.conf = c;
-   }
- 
-   public static void main(String[] args) throws Exception {
-     int errCode = ToolRunner.run(new Configuration(), new BulkImport(), args);
-     System.exit(errCode);
-   }
- }
- }}}
- 
- == Example to map rows/column families between two HTables ==
- 
- Here's another sample program from [[http://spicylogic.com/allenday/blog|Allen Day]] that iterates over all rows in one table for the specified column families and inserts those rows/columns into a second table.
- 
- {{{
- package com.spicylogic.hbase;
- import java.io.IOException;
- import java.util.Map;
- 
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.HBaseConfiguration;
- import org.apache.hadoop.hbase.client.HTable;
- import org.apache.hadoop.hbase.io.BatchUpdate;
- import org.apache.hadoop.hbase.io.Cell;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.hbase.io.RowResult;
- import org.apache.hadoop.hbase.mapred.TableMap;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.mapred.lib.IdentityReducer;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- 
- public class BulkCopy extends TableMap<Text, Text> implements Tool {
-   static final String NAME = "bulkcopy";  
-   private Configuration conf;
-   
-   public void map(ImmutableBytesWritable row, RowResult value, OutputCollector<Text, Text> output, Reporter reporter) throws IOException {
-     // Note: this opens the output table on every map() call; opening it once
-     // in configure(), as BulkImport does above, would be cheaper.
-     HTable table = new HTable(new HBaseConfiguration(), conf.get("output.table"));
-     if ( table == null ) {
-       throw new IOException("output table is null");
-     }
- 
-     BatchUpdate bu = new BatchUpdate( row.get() );
- 
-     for (Map.Entry<byte [], Cell> e: value.entrySet()) {
-       Cell cell = e.getValue();
-       if (cell != null && cell.getValue().length > 0) {
-         bu.put(e.getKey(), cell.getValue());
-       }
-     }
-     table.commit( bu );
-   }
- 
-   public JobConf createSubmittableJob(String[] args) throws IOException {
-     JobConf c = new JobConf(getConf(), BulkCopy.class);
-     //table = new HTable(new HBaseConfiguration(), args[2]);
-     c.set("output.table", args[2]);
-     c.setJobName(NAME);
-     // Columns are space delimited
-     StringBuilder sb = new StringBuilder();
-     final int columnoffset = 3;
-     for (int i = columnoffset; i < args.length; i++) {
-       if (i > columnoffset) {
-         sb.append(" ");
-       }
-       sb.append(args[i]);
-     }
-     // Second argument is the table name.
-     TableMap.initJob(args[1], sb.toString(), this.getClass(),
-     Text.class, Text.class, c);
-     c.setReducerClass(IdentityReducer.class);
-     // First arg is the output directory.
-     c.setOutputPath(new Path(args[0]));
-     return c;
-   }
-   
-   static int printUsage() {
-     System.out.println(NAME +" <outputdir> <input tablename> <output tablename> <column1> [<column2>...]");
-     return -1;
-   }
-   
-   public int run(final String[] args) throws Exception {
-     // Make sure there are at least 3 parameters
-     if (args.length < 3) {
-       System.err.println("ERROR: Wrong number of parameters: " + args.length);
-       return printUsage();
-     }
-     JobClient.runJob(createSubmittableJob(args));
-     return 0;
-   }
- 
-   public Configuration getConf() {
-     return this.conf;
-   }
- 
-   public void setConf(final Configuration c) {
-     this.conf = c;
-   }
- 
-   public static void main(String[] args) throws Exception {
-     //String[] aa = {"/tmp/foobar", "M2", "M3", "R:"};
-     int errCode = ToolRunner.run(new HBaseConfiguration(), new BulkCopy(), args);
-     System.exit(errCode);
-   }
- }
- }}}
- 
- 
- == Sample running HBase inserts out of Map Task ==
- Here's sample code from Andrew Purtell that does the HBase insert inside the mapper rather than via TableReduce.
- {{{
- public class MyMap 
-   extends TableMap<ImmutableBytesWritable,MapWritable> // or whatever
- {
-   private HTable table;
- 
-   public void configure(JobConf job) {
-     super.configure(job);
-     try {
-       HBaseConfiguration conf = new HBaseConfiguration(job);
-       table = new HTable(conf, "mytable");
-     } catch (Exception e) {
-       // can't do anything about this now
-     }
-   }
- 
-   public void map(ImmutableBytesWritable key, RowResult value,
-     OutputCollector<ImmutableBytesWritable,MapWritable> output,
-     Reporter reporter) throws IOException
-   {
-     // now we can report an exception opening the table
-     if (table == null)
-       throw new IOException("could not open mytable");
- 
-     // ...
- 
-     // commit the result
-     BatchUpdate update = new BatchUpdate();
-     // ...
-     table.commit(update);
-   }
- }
- }}}
- This assumes that you do this when setting up your job: {{{JobConf conf = new JobConf(new HBaseConfiguration());}}}
- 
- Or maybe something like this:
- 
- {{{
- JobConf conf = new JobConf(new Configuration());
- conf.set("hbase.master", myMaster);
- }}}
- 
- 
- = Sample MR+HBase Jobs =
- A [[http://www.nabble.com/Re%3A-Map-Reduce-over-HBase---sample-code-p18253120.html|students/classes example]] by Naama Kraus.
- 
- == Sample MR Bulk Uploader ==
- Read the class comment below for the specification of inputs, prerequisites, etc.  In particular, note that the class comment says this code is for hbase 0.1.x.
- {{{
- package org.apache.hadoop.hbase.mapred;
- 
- import java.io.IOException;
- import java.util.Iterator;
- 
- import org.apache.hadoop.conf.Configuration;
- import org.apache.hadoop.fs.Path;
- import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
- import org.apache.hadoop.io.LongWritable;
- import org.apache.hadoop.io.MapWritable;
- import org.apache.hadoop.io.Text;
- import org.apache.hadoop.mapred.JobClient;
- import org.apache.hadoop.mapred.JobConf;
- import org.apache.hadoop.mapred.MapReduceBase;
- import org.apache.hadoop.mapred.Mapper;
- import org.apache.hadoop.mapred.OutputCollector;
- import org.apache.hadoop.mapred.Reporter;
- import org.apache.hadoop.util.Tool;
- import org.apache.hadoop.util.ToolRunner;
- 
- /**
-  * Sample uploader.
-  * 
-  * This is EXAMPLE code.  You will need to change it to work for your context.
-  * 
-  * Uses TableReduce to put the data into hbase. Change the InputFormat to suit
-  * your data. Use the map to massage the input so it fits hbase.  Currently its
-  * just a pass-through map.  In the reduce, you need to output a row and a
-  * map of columns to cells.  Change map and reduce to suit your input.
-  * 
-  * <p>The below is wired up to handle an input whose format is a text file
-  * which has a line format as follow:
-  * <pre>
-  * row columnname columndata
-  * </pre>
-  * 
-  * <p>The table and columnfamily we're to insert into must preexist.
-  * 
-  * <p> To run, edit your hadoop-env.sh and add hbase classes and conf to your
-  * HADOOP_CLASSPATH.  For example:
-  * <pre>
-  * export HADOOP_CLASSPATH=/Users/stack/Documents/checkouts/hbase/branches/0.1/build/classes:/Users/stack/Documents/checkouts/hbase/branches/0.1/conf
-  * </pre>
-  * <p>Restart your MR cluster after making the following change (You need to 
-  * be running in pseudo-distributed mode at a minimum for the hadoop to see
-  * the above additions to your CLASSPATH).
-  * 
-  * <p>Start up your hbase cluster.
-  * 
-  * <p>Next do the following to start the MR job:
-  * <pre>
-  * ./bin/hadoop org.apache.hadoop.hbase.mapred.SampleUploader /tmp/input.txt TABLE_NAME
-  * </pre>
-  * 
-  * <p>This code was written against hbase 0.1 branch.
-  */
- public class SampleUploader extends MapReduceBase
- implements Mapper<LongWritable, Text, Text, MapWritable>, Tool {
-   private static final String NAME = "SampleUploader";
-   private Configuration conf;
- 
-   public JobConf createSubmittableJob(String[] args) {
-     JobConf c = new JobConf(getConf(), SampleUploader.class);
-     c.setJobName(NAME);
-     c.setInputPath(new Path(args[0]));
-     c.setMapperClass(this.getClass());
-     c.setMapOutputKeyClass(Text.class);
-     c.setMapOutputValueClass(MapWritable.class);
-     c.setReducerClass(TableUploader.class);
-     TableReduce.initJob(args[1], TableUploader.class, c);
-     return c;
-   } 
- 
-   public void map(LongWritable k, Text v,
-     OutputCollector<Text, MapWritable> output, Reporter r)
-   throws IOException {
-     // Lines are space-delimited; first item is row, next the columnname and
-     // then the third the cell value.
-     String tmp = v.toString();
-     if (tmp.length() == 0) {
-       return;
-     }
-     String [] splits = v.toString().split(" ");
-     MapWritable mw = new MapWritable();
-     mw.put(new Text(splits[1]),
-       new ImmutableBytesWritable(splits[2].getBytes()));
-     String row = splits[0];
-     r.setStatus("Map emitting " + row + " for record " + k.toString());
-     output.collect(new Text(row), mw);
-   }
-   
-   public static class TableUploader
-   extends TableReduce<Text, MapWritable> {
-     @Override
-     public void reduce(Text k, Iterator<MapWritable> v,
-       OutputCollector<Text, MapWritable> output, Reporter r)
-     throws IOException {
-       while (v.hasNext()) {
-         r.setStatus("Reducer committing " + k);
-         output.collect(k, v.next());
-       }
-     }
-   }
-   
-   static int printUsage() {
-     System.out.println(NAME + " <input> <table_name>");
-     return -1;
-   } 
-     
-   public int run(@SuppressWarnings("unused") String[] args) throws Exception {
-     // Make sure there are exactly 2 parameters left.
-     if (args.length != 2) {
-       System.out.println("ERROR: Wrong number of parameters: " +
-         args.length + " instead of 2.");
-       return printUsage();
-     }
-     JobClient.runJob(createSubmittableJob(args));
-     return 0;
-   }
-     
-   public Configuration getConf() {
-     return this.conf;
-   } 
-   
-   public void setConf(final Configuration c) {
-     this.conf = c;
-   }
- 
-   public static void main(String[] args) throws Exception {
-     int errCode = ToolRunner.run(new Configuration(), new SampleUploader(),
-       args);
-     System.exit(errCode);
-   }
- }
- }}}
- 
