hbase-user mailing list archives

From "S. Zhou" <myx...@yahoo.com>
Subject Re: MapReduce job with mixed data sources: HBase table and HDFS files
Date Fri, 12 Jul 2013 15:49:33 GMT
Ted & Azuryy, after some investigation of the log files, I figured out why it happens. In the
code, I use the same path "inputPath1" for both inputs (see below), since I thought the input
path was irrelevant for the HBase table. But it turns out that the path given for the HBase table
can affect the HDFS input: MultipleInputs keeps one InputFormat/Mapper mapping per path, so
registering the same path twice apparently lets the entry added later replace the earlier one.
I changed the input path for the HBase table to a different value, and now it works!


Ted & Azuryy, I really appreciate your help!


MultipleInputs.addInputPath(job1, inputPath1, TextInputFormat.class, Map.class);
MultipleInputs.addInputPath(job1, inputPath1,  TableInputFormat.class, TableMap.class);
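
For reference, the fix is simply to give the HBase entry its own path, roughly like this (a sketch only; the "-hbase" suffix is just an example, and any path distinct from the real HDFS input should work, since TableInputFormat never actually reads from that path):

Path hdfsInput  = new Path(args[0]);               // the real HDFS input
Path hbaseDummy = new Path(args[0] + "-hbase");    // placeholder path, used only as the MultipleInputs key

MultipleInputs.addInputPath(job1, hdfsInput,  TextInputFormat.class,  Map.class);
MultipleInputs.addInputPath(job1, hbaseDummy, TableInputFormat.class, TableMap.class);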


________________________________
 From: S. Zhou <myxjtu@yahoo.com>
To: Ted Yu <yuzhihong@gmail.com> 
Cc: "user@hbase.apache.org" <user@hbase.apache.org> 
Sent: Thursday, July 11, 2013 10:19 PM
Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files
 

I use org.apache.hadoop.mapreduce.lib.input.MultipleInputs.



I run on pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase (0.95.1-hadoop1).


________________________________
From: Ted Yu <yuzhihong@gmail.com>
To: S. Zhou <myxjtu@yahoo.com> 
Cc: "user@hbase.apache.org" <user@hbase.apache.org> 
Sent: Thursday, July 11, 2013 9:54 PM
Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files



Did you use org.apache.hadoop.mapreduce.lib.input.MultipleInputs or the one from org.apache.hadoop.mapred.lib?

Which Hadoop version do you use?

Cheers


On Thu, Jul 11, 2013 at 9:49 PM, S. Zhou <myxjtu@yahoo.com> wrote:

Thanks Ted & Azuryy. Your hint helped me solve that particular issue.
>
>But now I run into a new problem with MultipleInputs. This time I add an HBase table and an HDFS file as inputs (see the new code below). The problem is: whichever data source is added later overrides the one added before. For example, if I add the HBase table to MultipleInputs first and then the HDFS file, the final result from the reducer only contains the results of the HDFS file. On the other hand, if I add the HDFS file first and then the HBase table, the final result only contains the results of the HBase table.
>
>public class MixMR {
>    public static class Map extends Mapper<Object, Text, Text, Text> {
>
>        public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
>            String s = value.toString();
>            String[] sa = s.split(",");
>            if (sa.length == 2) {
>                context.write(new Text(sa[0]), new Text(sa[1]));
>            }
>        }
>    }
>
>    public static class TableMap extends TableMapper<Text, Text> {
>        public static final byte[] CF = "cf".getBytes();
>        public static final byte[] ATTR1 = "c1".getBytes();
>
>        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
>            String key = Bytes.toString(row.get());
>            String val = new String(value.getValue(CF, ATTR1));
>            context.write(new Text(key), new Text(val));
>        }
>    }
>
>    public static class Reduce extends Reducer<Object, Text, Object, Text> {
>        public void reduce(Object key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
>            String ks = key.toString();
>            for (Text val : values) {
>                context.write(new Text(ks), val);
>            }
>        }
>    }
>
>    public static void main(String[] args) throws Exception {
>        Path inputPath1 = new Path(args[0]);
>        Path outputPath = new Path(args[1]);
>
>        String tableName1 = "sezhou-test";
>
>        Configuration config1 = HBaseConfiguration.create();
>        Job job1 = new Job(config1, "ExampleRead");
>        job1.setJarByClass(com.shopping.test.dealstore.MixMR.class);    // class that contains mapper
>
>        Scan scan1 = new Scan();
>        scan1.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
>        scan1.setCacheBlocks(false);  // don't set to true for MR jobs
>        scan1.addFamily(Bytes.toBytes("cf"));
>
>        TableMapReduceUtil.initTableMapperJob(
>                tableName1,        // input HBase table name
>                scan1,             // Scan instance to control CF and attribute selection
>                TableMap.class,    // mapper
>                Text.class,        // mapper output key
>                Text.class,        // mapper output value
>                job1);
>
>        job1.setReducerClass(Reduce.class);    // reducer class
>        job1.setOutputFormatClass(TextOutputFormat.class);
>
>        // the result from reducer only contains the HBase table
>        MultipleInputs.addInputPath(job1, inputPath1, TextInputFormat.class, Map.class);
>        MultipleInputs.addInputPath(job1, inputPath1, TableInputFormat.class, TableMap.class); // inputPath1 here has no effect for HBase table
>
>        FileOutputFormat.setOutputPath(job1, outputPath);
>
>        job1.waitForCompletion(true);
>    }
>}
>
>
>
>
>
>
>
>
>________________________________
> From: Ted Yu <yuzhihong@gmail.com>
>To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com> 
>Sent: Thursday, July 11, 2013 3:51 PM
>
>Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files
> 
>
>
>TextInputFormat wouldn't work:
>public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
>Take a look at TableInputFormatBase or the class(es) which extend it:
>public abstract class TableInputFormatBase
>implements InputFormat<ImmutableBytesWritable, Result> {
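>
>So, roughly speaking, each InputFormat you register needs a mapper whose input key/value types match what that format produces. A minimal sketch (hdfsPath and hbasePath are placeholder Path variables; TextMap stands for a plain Mapper<LongWritable, Text, ...> for the text side, which your current code does not have yet):
>
>    // TextInputFormat emits <LongWritable, Text>, so pair it with an ordinary Mapper over those types
>    MultipleInputs.addInputPath(job, hdfsPath, TextInputFormat.class, TextMap.class);
>    // TableInputFormat emits <ImmutableBytesWritable, Result>, so pair it with your TableMapper
>    MultipleInputs.addInputPath(job, hbasePath, TableInputFormat.class, TableMap.class);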
>
>
>Cheers
>
>On Thu, Jul 11, 2013 at 3:44 PM, S. Zhou <myxjtu@yahoo.com> wrote:
>
>Thanks very much for the help, Ted & Azuryy. I wrote a very simple MR program which takes an HBase table as input and outputs to an HDFS file. Unfortunately, I ran into the following error:
>>
>>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
>>
>>I run on pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase (0.95.1-hadoop1).
>>
>>Here is the complete source code. An interesting thing is: if I comment out the MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);", the MR job runs fine.
>>
>>public class MixMR {
>>
>>    public static class TableMap extends TableMapper<Text, Text> {
>>        public static final byte[] CF = "cf".getBytes();
>>        public static final byte[] ATTR1 = "c1".getBytes();
>>
>>        public void map(ImmutableBytesWritable row, Result value, Context context) throws IOException, InterruptedException {
>>            String key = Bytes.toString(row.get());
>>            String val = new String(value.getValue(CF, ATTR1));
>>            context.write(new Text(key), new Text(val));
>>        }
>>    }
>>
>>    public static class Reduce extends Reducer<Object, Text, Object, Text> {
>>        public void reduce(Object key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
>>            String ks = key.toString();
>>            for (Text val : values) {
>>                context.write(new Text(ks), val);
>>            }
>>        }
>>    }
>>
>>    public static void main(String[] args) throws Exception {
>>        Path inputPath1 = new Path(args[0]);
>>        Path outputPath = new Path(args[1]);
>>
>>        String tableName1 = "test";
>>
>>        Configuration config = HBaseConfiguration.create();
>>        Job job = new Job(config, "ExampleRead");
>>        job.setJarByClass(MixMR.class);     // class that contains mapper
>>
>>        Scan scan = new Scan();
>>        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
>>        scan.setCacheBlocks(false);  // don't set to true for MR jobs
>>        scan.addFamily(Bytes.toBytes("cf"));
>>
>>        TableMapReduceUtil.initTableMapperJob(
>>                tableName1,        // input HBase table name
>>                scan,              // Scan instance to control CF and attribute selection
>>                TableMap.class,    // mapper
>>                Text.class,        // mapper output key
>>                Text.class,        // mapper output value
>>                job);
>>
>>        job.setReducerClass(Reduce.class);    // reducer class
>>        job.setOutputFormatClass(TextOutputFormat.class);
>>
>>        // inputPath1 here has no effect for HBase table
>>        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);
>>
>>        FileOutputFormat.setOutputPath(job, outputPath);
>>
>>        job.waitForCompletion(true);
>>    }
>>}
>>
>>
>>
>>
>>
>>________________________________
>> From: Ted Yu <yuzhihong@gmail.com>
>>To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
>>Sent: Wednesday, July 10, 2013 11:21 AM
>>
>>Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files
>>
>>
>>    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
>>
>>is called by initTableMapperJob().
>>
>>Looking at the source would make it clear for you.
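>>
>>For the signature you are using, initTableMapperJob(table, scan, mapper, keyClass, valueClass, job) does roughly the following (a paraphrase from memory; check the TableMapReduceUtil source of your HBase version for the exact behavior):
>>
>>    job.setInputFormatClass(TableInputFormat.class);
>>    job.setMapperClass(mapper);
>>    job.setMapOutputKeyClass(keyClass);
>>    job.setMapOutputValueClass(valueClass);
>>    Configuration conf = job.getConfiguration();
>>    conf.set(TableInputFormat.INPUT_TABLE, table);
>>    conf.set(TableInputFormat.SCAN, convertScanToString(scan));   // the non-public helper, called for you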
>>
>>Cheers
>>
>>On Wed, Jul 10, 2013 at 10:55 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>>
>>> Thanks Ted. I will try that. But at this time I am not sure how to call "conf.set()" after calling "initTableMapperJob()".
>>> The approach suggested by Azuryy is "conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));"
>>>
>>>
>>>
>>> ________________________________
>>>  From: Ted Yu <yuzhihong@gmail.com>
>>> To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
>>> Sent: Wednesday, July 10, 2013 10:21 AM
>>> Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
>>> files
>>>
>>>
>>> Can you utilize initTableMapperJob() (which
>>> calls TableMapReduceUtil.convertScanToString() underneath) ?
>>>
>>> On Wed, Jul 10, 2013 at 10:15 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>>>
>>> > Hi Azuryy, I am testing the way you suggested. Now I am facing a
>>> > compilation error for the following statement:
>>> > conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
>>> >
>>> >
>>> > The error is: "method convertScanToString is not visible in
>>> > TableMapReduceUtil". Could you help? It is blocking me.
>>> >
>>> >
>>> > BTW, I am using the hbase-server jar file, version 0.95.1-hadoop1. I tried
>>> > other versions as well, like 0.94.9, and got the same error.
>>> >
>>> > Thanks!
>>> >
>>> >
>>> > ________________________________
>>> >  From: Azuryy Yu <azuryyyu@gmail.com>
>>> > To: user@hbase.apache.org
>>> > Sent: Wednesday, July 3, 2013 6:02 PM
>>> > Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
>>> > files
>>> >
>>> >
>>> > Hi,
>>> > 1) A single MR job cannot take input data from two different clusters.
>>> > 2) If your data is located in the same cluster, then:
>>> >
>>> >     conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
>>> >     conf.set(TableInputFormat.INPUT_TABLE, tableName);
>>> >
>>> >     MultipleInputs.addInputPath(conf, new Path(input_on_hdfs), TextInputFormat.class, MapperForHdfs.class);
>>> >     MultipleInputs.addInputPath(conf, new Path(input_on_hbase), TableInputFormat.class, MapperForHBase.class);
>>> >
>>> > but new Path(input_on_hbase) can be any path; its value is not meaningful.
>>> >
>>> > Please refer to org.apache.hadoop.hbase.mapreduce.IndexBuilder (under $HBASE_HOME/src/example) for how to read a table in an MR job.
>>> >
>>> >
>>> > On Thu, Jul 4, 2013 at 5:19 AM, Michael Segel <michael_segel@hotmail.com> wrote:
>>> >
>>> > > You may want to pull your data from HBase first in a separate map-only job and then use its output along with the other HDFS input.
>>> > > There is a significant disparity between the reads from HDFS and from HBase.
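>>> > >
>>> > > A rough sketch of that two-pass approach (the table name, output path, and helper class names below are placeholders, not tested code):
>>> > >
>>> > >     // pass 1: map-only job that dumps the HBase table to text files on HDFS
>>> > >     Job dump = new Job(HBaseConfiguration.create(), "table-dump");
>>> > >     dump.setJarByClass(MyDriver.class);
>>> > >     TableMapReduceUtil.initTableMapperJob("mytable", new Scan(), MyTableMapper.class, Text.class, Text.class, dump);
>>> > >     dump.setNumReduceTasks(0);                             // map-only: mapper output goes straight to the output format
>>> > >     dump.setOutputFormatClass(TextOutputFormat.class);
>>> > >     FileOutputFormat.setOutputPath(dump, new Path("/tmp/table-dump"));
>>> > >     dump.waitForCompletion(true);
>>> > >
>>> > >     // pass 2: an ordinary MR job whose inputs are the dump plus the other HDFS files
>>> > >     FileInputFormat.addInputPath(job2, new Path("/tmp/table-dump"));
>>> > >     FileInputFormat.addInputPath(job2, otherHdfsInput);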
>>> > >
>>> > >
>>> > > On Jul 3, 2013, at 10:34 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>>> > >
>>> > > > Azuryy, I am looking at the MultipleInputs doc, but I could not figure out how to add an HBase table as a Path to the input. Do you have some sample code? Thanks!
>>> > > >
>>> > > >
>>> > > >
>>> > > >
>>> > > > ________________________________
>>> > > > From: Azuryy Yu <azuryyyu@gmail.com>
>>> > > > To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
>>> > > > Sent: Tuesday, July 2, 2013 10:06 PM
>>> > > > Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files
>>> > > >
>>> > > >
>>> > > > Hi ,
>>> > > >
>>> > > > Use MultipleInputs, which can solve your problem.
>>> > > >
>>> > > >
>>> > > > On Wed, Jul 3, 2013 at 12:34 PM, S. Zhou <myxjtu@yahoo.com> wrote:
>>> > > >
>>> > > >> Hi there,
>>> > > >>
>>> > > >> I know how to create a MapReduce job with only an HBase table or only HDFS files as the data source. Now I need to create a MapReduce job with mixed data sources, that is, this MR job needs to read data from both HBase and HDFS files. Is it possible? If yes, could you share some sample code?
>>> > > >>
>>> > > >> Thanks!
>>> > > >> Senqiang
>>> > >
>>> > >
>>> >
>>>
>
>
>