hbase-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: MapReduce job with mixed data sources: HBase table and HDFS files
Date Fri, 12 Jul 2013 04:54:23 GMT
Did you use org.apache.hadoop.mapreduce.lib.input.MultipleInputs or the one
from org.apache.hadoop.mapred.lib?

Which Hadoop version are you using?
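
The distinction matters because the two classes target different APIs. A
sketch of their signatures (from memory, so double-check against your Hadoop
version):

    // org.apache.hadoop.mapred.lib.MultipleInputs (old API, JobConf-based)
    public static void addInputPath(JobConf conf, Path path,
            Class<? extends InputFormat> inputFormatClass,
            Class<? extends Mapper> mapperClass);

    // org.apache.hadoop.mapreduce.lib.input.MultipleInputs (new API, Job-based)
    public static void addInputPath(Job job, Path path,
            Class<? extends InputFormat> inputFormatClass,
            Class<? extends Mapper> mapperClass);

Mixing the old-API MultipleInputs with new-API mappers (or the other way
around) typically will not even compile, so everything should come from the
same package family.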

Cheers

On Thu, Jul 11, 2013 at 9:49 PM, S. Zhou <myxjtu@yahoo.com> wrote:

> Thanks Ted & Azuryy. Your hint helped me solve that particular issue.
>
> But now I run into a new problem with MultipleInputs. This time I add an
> HBase table and an HDFS file as inputs (see the new code below). The
> problem is that whichever data source is added later overrides the one
> added before. For example, if I add the HBase table to MultipleInputs
> first and then the HDFS file, the final result from the reducer only
> contains the results of the HDFS file. Conversely, if I add the HDFS file
> first and then the HBase table, the final result only contains the results
> of the HBase table. (See the note and sketch right after the code.)
>
> public class MixMR {
>     public static class Map extends Mapper<Object, Text, Text, Text> {
>
>         public void map(Object key, Text value, Context context)
>                 throws IOException, InterruptedException {
>             String s = value.toString();
>             String[] sa = s.split(",");
>             if (sa.length == 2) {
>                 context.write(new Text(sa[0]), new Text(sa[1]));
>             }
>         }
>     }
>
>     public static class TableMap extends TableMapper<Text, Text> {
>         public static final byte[] CF = "cf".getBytes();
>         public static final byte[] ATTR1 = "c1".getBytes();
>
>         public void map(ImmutableBytesWritable row, Result value, Context context)
>                 throws IOException, InterruptedException {
>             String key = Bytes.toString(row.get());
>             String val = new String(value.getValue(CF, ATTR1));
>             context.write(new Text(key), new Text(val));
>         }
>     }
>
>     public static class Reduce extends Reducer<Object, Text, Object, Text> {
>         public void reduce(Object key, Iterable<Text> values, Context context)
>                 throws IOException, InterruptedException {
>             String ks = key.toString();
>             for (Text val : values) {
>                 context.write(new Text(ks), val);
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         Path inputPath1 = new Path(args[0]);
>         Path outputPath = new Path(args[1]);
>
>         String tableName1 = "sezhou-test";
>
>         Configuration config1 = HBaseConfiguration.create();
>         Job job1 = new Job(config1, "ExampleRead");
>         job1.setJarByClass(com.shopping.test.dealstore.MixMR.class); // class that contains mapper
>
>         Scan scan1 = new Scan();
>         scan1.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan1.setCacheBlocks(false);  // don't set to true for MR jobs
>         scan1.addFamily(Bytes.toBytes("cf"));
>
>         TableMapReduceUtil.initTableMapperJob(
>                 tableName1,       // input HBase table name
>                 scan1,            // Scan instance to control CF and attribute selection
>                 TableMap.class,   // mapper
>                 Text.class,       // mapper output key
>                 Text.class,       // mapper output value
>                 job1);
>
>         job1.setReducerClass(Reduce.class);    // reducer class
>         job1.setOutputFormatClass(TextOutputFormat.class);
>
>         // the result from the reducer only contains the HBase table
>         MultipleInputs.addInputPath(job1, inputPath1,
>                 TextInputFormat.class, Map.class);
>         MultipleInputs.addInputPath(job1, inputPath1,
>                 TableInputFormat.class, TableMap.class); // inputPath1 here has no effect for the HBase table
>
>         FileOutputFormat.setOutputPath(job1, outputPath);
>
>         job1.waitForCompletion(true);
>     }
> }
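>
> My current suspicion, from reading the Hadoop 1.x source: the new-API
> MultipleInputs keeps its per-input {format, mapper} registrations keyed by
> Path, so the two addInputPath() calls above collide on the same inputPath1
> and the later one silently replaces the earlier one. A minimal sketch of
> what I plan to try instead ("/tmp/hbase-dummy" is an arbitrary placeholder;
> TableInputFormat ignores the path and reads the table named in the
> configuration):
>
>         TableMapReduceUtil.initTableMapperJob(
>                 tableName1, scan1, TableMap.class, Text.class, Text.class, job1);
>         // register each source under its own Path so neither replaces the other
>         MultipleInputs.addInputPath(job1, inputPath1,
>                 TextInputFormat.class, Map.class);
>         MultipleInputs.addInputPath(job1, new Path("/tmp/hbase-dummy"),
>                 TableInputFormat.class, TableMap.class);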
>
>
>
>
> ------------------------------
> From: Ted Yu <yuzhihong@gmail.com>
> To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
> Sent: Thursday, July 11, 2013 3:51 PM
> Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
> files
>
> TextInputFormat wouldn't work:
> public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
> Take a look at TableInputFormatBase or the class(es) which extend it:
> public abstract class TableInputFormatBase
> implements InputFormat<ImmutableBytesWritable, Result> {
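>
> That type mismatch is exactly what produces the ClassCastException below:
> TextInputFormat hands the mapper LongWritable keys, while TableMap.map()
> expects ImmutableBytesWritable. As a rough pairing rule (a sketch, not tied
> to one Hadoop release):
>
>     // TextInputFormat  emits <LongWritable, Text>
>     //   -> pair with Mapper<LongWritable, Text, K, V>
>     // TableInputFormat emits <ImmutableBytesWritable, Result>
>     //   -> pair with TableMapper<K, V>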
>
> Cheers
>
> On Thu, Jul 11, 2013 at 3:44 PM, S. Zhou <myxjtu@yahoo.com> wrote:
>
> Thanks very much for the help, Ted & Azuryy. I wrote a very simple MR
> program which takes an HBase table as input and outputs to an HDFS file.
> Unfortunately, I ran into the following error:
>
> java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be
> cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
> I run pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase
> (0.95.1-hadoop1).
>
> Here is the complete source code. Interestingly, if I comment out the
> MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1,
> TextInputFormat.class, TableMap.class);", the MR job runs fine.
>
> public class MixMR {
>
>     public static class TableMap extends TableMapper<Text, Text> {
>         public static final byte[] CF = "cf".getBytes();
>         public static final byte[] ATTR1 = "c1".getBytes();
>
>         public void map(ImmutableBytesWritable row, Result value, Context context)
>                 throws IOException, InterruptedException {
>             String key = Bytes.toString(row.get());
>             String val = new String(value.getValue(CF, ATTR1));
>             context.write(new Text(key), new Text(val));
>         }
>     }
>
>     public static class Reduce extends Reducer<Object, Text, Object, Text> {
>         public void reduce(Object key, Iterable<Text> values, Context context)
>                 throws IOException, InterruptedException {
>             String ks = key.toString();
>             for (Text val : values) {
>                 context.write(new Text(ks), val);
>             }
>         }
>     }
>
>     public static void main(String[] args) throws Exception {
>         Path inputPath1 = new Path(args[0]);
>         Path outputPath = new Path(args[1]);
>
>         String tableName1 = "test";
>
>         Configuration config = HBaseConfiguration.create();
>         Job job = new Job(config, "ExampleRead");
>         job.setJarByClass(MixMR.class);     // class that contains mapper
>
>         Scan scan = new Scan();
>         scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
>         scan.setCacheBlocks(false);  // don't set to true for MR jobs
>         scan.addFamily(Bytes.toBytes("cf"));
>
>         TableMapReduceUtil.initTableMapperJob(
>                 tableName1,       // input HBase table name
>                 scan,             // Scan instance to control CF and attribute selection
>                 TableMap.class,   // mapper
>                 Text.class,       // mapper output key
>                 Text.class,       // mapper output value
>                 job);
>         job.setReducerClass(Reduce.class);    // reducer class
>         job.setOutputFormatClass(TextOutputFormat.class);
>
>         // inputPath1 here has no effect for the HBase table
>         MultipleInputs.addInputPath(job, inputPath1,
>                 TextInputFormat.class, TableMap.class);
>
>         FileOutputFormat.setOutputPath(job, outputPath);
>
>         job.waitForCompletion(true);
>     }
> }
>
>
>
>
>
> ________________________________
>  From: Ted Yu <yuzhihong@gmail.com>
> To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
> Sent: Wednesday, July 10, 2013 11:21 AM
> Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
> files
>
>
>     conf.set(TableInputFormat.SCAN, convertScanToString(scan));
>
> is called by initTableMapperJob().
>
> Looking at the source would make it clear for you.
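>
> In other words (a sketch, assuming the 0.9x mapreduce API), you can let
> initTableMapperJob() do the conf.set() for you, since convertScanToString()
> is package-private:
>
>     TableMapReduceUtil.initTableMapperJob(tableName, scan, TableMap.class,
>             Text.class, Text.class, job);
>     // initTableMapperJob() should have populated these two properties:
>     String serializedScan = job.getConfiguration().get(TableInputFormat.SCAN);
>     String table = job.getConfiguration().get(TableInputFormat.INPUT_TABLE);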
>
> Cheers
>
> On Wed, Jul 10, 2013 at 10:55 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>
> > Thanks Ted. I will try that, but at this point I am not sure how to call
> > conf.set() after calling initTableMapperJob(). The approach suggested by
> > Azuryy is: conf.set(TableInputFormat.SCAN,
> > TableMapReduceUtil.convertScanToString(new Scan()));
> >
> >
> >
> > ________________________________
> >  From: Ted Yu <yuzhihong@gmail.com>
> > To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
> > Sent: Wednesday, July 10, 2013 10:21 AM
> > Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
> > files
> >
> >
> > Can you utilize initTableMapperJob() (which
> > calls TableMapReduceUtil.convertScanToString() underneath) ?
> >
> > On Wed, Jul 10, 2013 at 10:15 AM, S. Zhou <myxjtu@yahoo.com> wrote:
> >
> > > Hi Azuryy, I am testing the way you suggested. Now I am facing a
> > > compilation error for the following statement:
> > >
> > >     conf.set(TableInputFormat.SCAN,
> > >             TableMapReduceUtil.convertScanToString(new Scan()));
> > >
> > >
> > > The error is: "method convertScanToString is not visible in
> > > TableMapReduceUtil". Could you help? It is blocking me.
> > >
> > > BTW, I am using the hbase-server jar, version 0.95.1-hadoop1. I tried
> > > other versions as well, like 0.94.9, and got the same error.
> > >
> > > Thanks!
> > >
> > >
> > > ________________________________
> > >  From: Azuryy Yu <azuryyyu@gmail.com>
> > > To: user@hbase.apache.org
> > > Sent: Wednesday, July 3, 2013 6:02 PM
> > > Subject: Re: MapReduce job with mixed data sources: HBase table and
> > > HDFS files
> > >
> > >
> > > Hi,
> > > 1) You cannot feed data from two different clusters into one MR job.
> > > 2) If your data is located in the same cluster, then:
> > >
> > >     conf.set(TableInputFormat.SCAN,
> > >             TableMapReduceUtil.convertScanToString(new Scan()));
> > >     conf.set(TableInputFormat.INPUT_TABLE, tableName);
> > >
> > >     MultipleInputs.addInputPath(conf, new Path(input_on_hdfs),
> > >             TextInputFormat.class, MapperForHdfs.class);
> > >     MultipleInputs.addInputPath(conf, new Path(input_on_hbase),
> > >             TableInputFormat.class, MapperForHBase.class);
> > >
> > > But new Path(input_on_hbase) can be any path; it carries no meaning for
> > > the table input.
> > >
> > > Please refer to org.apache.hadoop.hbase.mapreduce.IndexBuilder, under
> > > $HBASE_HOME/src/example, for how to read a table in an MR job.
> > >
> > > On Thu, Jul 4, 2013 at 5:19 AM, Michael Segel
> > > <michael_segel@hotmail.com> wrote:
> > >
> > > > You may want to pull your data from HBase first in a separate
> > > > map-only job and then use its output along with the other HDFS input.
> > > > There is a significant disparity between read performance from HDFS
> > > > and from HBase.
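> > > >
> > > > Roughly, a sketch of that two-stage approach (job and path names here
> > > > are hypothetical):
> > > >
> > > >     // Stage 1: map-only export of the table into an HDFS staging dir.
> > > >     TableMapReduceUtil.initTableMapperJob(tableName, scan,
> > > >             TableMap.class, Text.class, Text.class, exportJob);
> > > >     exportJob.setNumReduceTasks(0);   // map-only
> > > >     FileOutputFormat.setOutputPath(exportJob, stagingDir);
> > > >
> > > >     // Stage 2: an ordinary text-input job over both directories.
> > > >     FileInputFormat.addInputPath(joinJob, stagingDir);
> > > >     FileInputFormat.addInputPath(joinJob, otherHdfsInput);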
> > > >
> > > >
> > > > On Jul 3, 2013, at 10:34 AM, S. Zhou <myxjtu@yahoo.com> wrote:
> > > >
> > > > > Azuryy, I am looking at the MultipleInputs doc, but I could not
> > > > > figure out how to add an HBase table as a Path input. Do you have
> > > > > some sample code? Thanks!
> > > > >
> > > > >
> > > > >
> > > > >
> > > > > ________________________________
> > > > > From: Azuryy Yu <azuryyyu@gmail.com>
> > > > > To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
> > > > > Sent: Tuesday, July 2, 2013 10:06 PM
> > > > > Subject: Re: MapReduce job with mixed data sources: HBase table and
> > > > > HDFS files
> > > > >
> > > > >
> > > > > Hi,
> > > > >
> > > > > Use MultipleInputs, which can solve your problem.
> > > > >
> > > > >
> > > > > On Wed, Jul 3, 2013 at 12:34 PM, S. Zhou <myxjtu@yahoo.com> wrote:
> > > > >
> > > > >> Hi there,
> > > > >>
> > > > >> I know how to create a MapReduce job with an HBase table as the
> > > > >> only data source, or with HDFS files as the only data source. Now
> > > > >> I need to create a MapReduce job with mixed data sources, that is,
> > > > >> an MR job that reads from both an HBase table and HDFS files. Is
> > > > >> it possible? If yes, could you share some sample code?
> > > > >>
> > > > >> Thanks!
> > > > >> Senqiang
> > > >
> > > >
> > >
> >
>
>
>
>
>
