hbase-user mailing list archives

From "S. Zhou" <myx...@yahoo.com>
Subject Re: MapReduce job with mixed data sources: HBase table and HDFS files
Date Fri, 12 Jul 2013 04:49:10 GMT
Thanks Ted & Azuryy. Your hint helped me solve that particular issue.

But now I run into a new problem with MultipleInputs. This time I add an HBase table and an HDFS
file as inputs (see the new code below). The problem is: whichever data source is added later
overrides the one added earlier. For example, if I add the HBase table to MultipleInputs first
and then the HDFS file, the final result from the reducer only contains the results from the HDFS
file. On the other hand, if I add the HDFS file first and then the HBase table, the final result
from the reducer only contains the results from the HBase table.

package com.shopping.test.dealstore;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableInputFormat;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableMapper;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.MultipleInputs;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class MixMR {

    // Mapper for the HDFS text input: expects lines of the form "key,value"
    public static class Map extends Mapper<Object, Text, Text, Text> {

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            String s = value.toString();
            String[] sa = s.split(",");
            if (sa.length == 2) {
                context.write(new Text(sa[0]), new Text(sa[1]));
            }
        }
    }

    // Mapper for the HBase table input: emits the row key and the cf:c1 value
    public static class TableMap extends TableMapper<Text, Text> {
        public static final byte[] CF = "cf".getBytes();
        public static final byte[] ATTR1 = "c1".getBytes();

        public void map(ImmutableBytesWritable row, Result value, Context context)
                throws IOException, InterruptedException {
            String key = Bytes.toString(row.get());
            String val = new String(value.getValue(CF, ATTR1));
            context.write(new Text(key), new Text(val));
        }
    }

    public static class Reduce extends Reducer<Object, Text, Object, Text> {
        public void reduce(Object key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String ks = key.toString();
            for (Text val : values) {
                context.write(new Text(ks), val);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Path inputPath1 = new Path(args[0]);
        Path outputPath = new Path(args[1]);

        String tableName1 = "sezhou-test";

        Configuration config1 = HBaseConfiguration.create();
        Job job1 = new Job(config1, "ExampleRead");
        job1.setJarByClass(com.shopping.test.dealstore.MixMR.class);   // class that contains mapper

        Scan scan1 = new Scan();
        scan1.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
        scan1.setCacheBlocks(false);  // don't set to true for MR jobs
        scan1.addFamily(Bytes.toBytes("cf"));

        TableMapReduceUtil.initTableMapperJob(
                tableName1,       // input HBase table name
                scan1,            // Scan instance to control CF and attribute selection
                TableMap.class,   // mapper
                Text.class,       // mapper output key
                Text.class,       // mapper output value
                job1);

        job1.setReducerClass(Reduce.class);    // reducer class
        job1.setOutputFormatClass(TextOutputFormat.class);

        // the result from the reducer only contains the HBase table
        MultipleInputs.addInputPath(job1, inputPath1, TextInputFormat.class, Map.class);
        MultipleInputs.addInputPath(job1, inputPath1, TableInputFormat.class, TableMap.class); // inputPath1 here has no effect for the HBase table

        FileOutputFormat.setOutputPath(job1, outputPath);

        job1.waitForCompletion(true);
    }
}
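
For reference, below is a minimal, untested sketch of a variation on the driver above. It follows Azuryy's earlier suggestion in this thread of registering the HBase input under its own path, on the assumption that MultipleInputs keeps one InputFormat/mapper entry per path, so registering both sources under the same inputPath1 would leave only the last registration in effect. The dummy table path, the argument layout, and the job name below are illustrative placeholders, not a confirmed fix.

    // Untested sketch: reuses the Map, TableMap and Reduce classes defined above,
    // but registers the HDFS input and the HBase input under two different paths.
    public static void main(String[] args) throws Exception {
        Path hdfsInputPath = new Path(args[0]);   // real HDFS input directory
        Path tableDummyPath = new Path(args[1]);  // placeholder path for the HBase input (never read as a file)
        Path outputPath = new Path(args[2]);

        Configuration config = HBaseConfiguration.create();
        Job job = new Job(config, "MixMR-sketch");
        job.setJarByClass(MixMR.class);

        Scan scan = new Scan();
        scan.setCaching(500);
        scan.setCacheBlocks(false);
        scan.addFamily(Bytes.toBytes("cf"));

        // Puts the serialized Scan and the table name into the job configuration
        // so that TableInputFormat can pick them up.
        TableMapReduceUtil.initTableMapperJob(
                "sezhou-test", scan, TableMap.class, Text.class, Text.class, job);

        job.setReducerClass(Reduce.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        // Distinct paths, so each data source keeps its own InputFormat/mapper entry.
        MultipleInputs.addInputPath(job, hdfsInputPath, TextInputFormat.class, Map.class);
        MultipleInputs.addInputPath(job, tableDummyPath, TableInputFormat.class, TableMap.class);

        FileOutputFormat.setOutputPath(job, outputPath);
        job.waitForCompletion(true);
    }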






________________________________
 From: Ted Yu <yuzhihong@gmail.com>
To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com> 
Sent: Thursday, July 11, 2013 3:51 PM
Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files
 


TextInputFormat wouldn't work:
public class TextInputFormat extends FileInputFormat<LongWritable, Text> {
Take a look at TableInputFormatBase or the class(es) which extend it:
public abstract class TableInputFormatBase implements InputFormat<ImmutableBytesWritable, Result> {

Cheers

On Thu, Jul 11, 2013 at 3:44 PM, S. Zhou <myxjtu@yahoo.com> wrote:

>Thanks very much for the help, Ted & Azuryy. I wrote a very simple MR program which takes an HBase table as input and outputs to an HDFS file. Unfortunately, I run into the following error:
>
>java.lang.ClassCastException: org.apache.hadoop.io.LongWritable cannot be cast to org.apache.hadoop.hbase.io.ImmutableBytesWritable
>
>I run on pseudo-distributed Hadoop (1.2.0) and pseudo-distributed HBase (0.95.1-hadoop1).
>
>Here is the complete source code. An interesting thing is: if I comment out the MultipleInputs line "MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);", the MR job runs fine.
>
>public class MixMR {
>
>    public static class TableMap extends TableMapper<Text, Text> {
>        public static final byte[] CF = "cf".getBytes();
>        public static final byte[] ATTR1 = "c1".getBytes();
>
>        public void map(ImmutableBytesWritable row, Result value, Context context)
>                throws IOException, InterruptedException {
>            String key = Bytes.toString(row.get());
>            String val = new String(value.getValue(CF, ATTR1));
>            context.write(new Text(key), new Text(val));
>        }
>    }
>
>    public static class Reduce extends Reducer<Object, Text, Object, Text> {
>        public void reduce(Object key, Iterable<Text> values, Context context)
>                throws IOException, InterruptedException {
>            String ks = key.toString();
>            for (Text val : values){
>                context.write(new Text(ks), val);
>            }
>        }
>    }
>
>    public static void main(String[] args) throws Exception {
>        Path inputPath1 = new Path(args[0]);
>        Path outputPath = new Path(args[1]);
>
>        String tableName1 = "test";
>
>        Configuration config = HBaseConfiguration.create();
>        Job job = new Job(config, "ExampleRead");
>        job.setJarByClass(MixMR.class);     // class that contains mapper
>
>        Scan scan = new Scan();
>        scan.setCaching(500);        // 1 is the default in Scan, which will be bad for MapReduce jobs
>        scan.setCacheBlocks(false);  // don't set to true for MR jobs
>        scan.addFamily(Bytes.toBytes("cf"));
>
>        TableMapReduceUtil.initTableMapperJob(
>                tableName1,        // input HBase table name
>                scan,              // Scan instance to control CF and attribute selection
>                TableMap.class,    // mapper
>                Text.class,        // mapper output key
>                Text.class,        // mapper output value
>                job);
>        job.setReducerClass(Reduce.class);    // reducer class
>        job.setOutputFormatClass(TextOutputFormat.class);
>
>        // inputPath1 here has no effect for the HBase table
>        MultipleInputs.addInputPath(job, inputPath1, TextInputFormat.class, TableMap.class);
>
>        FileOutputFormat.setOutputPath(job, outputPath);
>
>        job.waitForCompletion(true);
>    }
>}
>
>
>
>
>
>________________________________
> From: Ted Yu <yuzhihong@gmail.com>
>To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
>Sent: Wednesday, July 10, 2013 11:21 AM
>
>Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS files
>
>
>    conf.set(TableInputFormat.SCAN, convertScanToString(scan));
>
>is called by initTableMapperJob().
>
>Looking at the source would make it clear for you.
>
>Cheers
>
>On Wed, Jul 10, 2013 at 10:55 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>
>> Thanks Ted. I will try that. But at this time I am not sure how to call "conf.set()" after calling "initTableMapperJob()".
>> The approach suggested by Azuryy is: conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
>>
>>
>>
>> ________________________________
>>  From: Ted Yu <yuzhihong@gmail.com>
>> To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
>> Sent: Wednesday, July 10, 2013 10:21 AM
>> Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
>> files
>>
>>
>> Can you utilize initTableMapperJob() (which calls TableMapReduceUtil.convertScanToString() underneath)?
>>
>> On Wed, Jul 10, 2013 at 10:15 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>>
>> > Hi Azuryy, I am testing the way you suggested. Now I am facing a compilation error for the following statement:
>> > conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
>> >
>> > The error is: "method convertScanToString is not visible in TableMapReduceUtil". Could you help? It blocks me.
>> >
>> > BTW, I am using the hbase-server jar file version 0.95.1-hadoop1. I tried other versions as well, like 0.94.9, and got the same error.
>> >
>> > Thanks!
>> >
>> >
>> > ________________________________
>> >  From: Azuryy Yu <azuryyyu@gmail.com>
>> > To: user@hbase.apache.org
>> > Sent: Wednesday, July 3, 2013 6:02 PM
>> > Subject: Re: MapReduce job with mixed data sources: HBase table and HDFS
>> > files
>> >
>> >
>> > Hi,
>> > 1) You cannot feed data from two different clusters into one MR job.
>> > 2) If your data is located in the same cluster, then:
>> >
>> >     conf.set(TableInputFormat.SCAN, TableMapReduceUtil.convertScanToString(new Scan()));
>> >     conf.set(TableInputFormat.INPUT_TABLE, tableName);
>> >
>> >     MultipleInputs.addInputPath(conf, new Path(input_on_hdfs), TextInputFormat.class, MapperForHdfs.class);
>> >     MultipleInputs.addInputPath(conf, new Path(input_on_hbase), TableInputFormat.class, MapperForHBase.class);
>> >
>> > but new Path(input_on_hbase) can be any path; it has no real meaning.
>> >
>> > Please refer to org.apache.hadoop.hbase.mapreduce.IndexBuilder under $HBASE_HOME/src/example for how to read a table in the MR job.
>> >
>> >
>> > On Thu, Jul 4, 2013 at 5:19 AM, Michael Segel <michael_segel@hotmail.com> wrote:
>> >
>> > > You may want to pull your data from your HBase first in a separate map
>> > > only job and then use its output along with other HDFS input.
>> > > There is a significant disparity between the reads from HDFS and from
>> > > HBase.
>> > >
>> > >
>> > > On Jul 3, 2013, at 10:34 AM, S. Zhou <myxjtu@yahoo.com> wrote:
>> > >
>> > > > Azuryy, I am looking at the MultipleInputs doc. But I could not figure out how to add an HBase table as a Path to the input. Do you have some sample code? Thanks!
>> > > >
>> > > >
>> > > >
>> > > >
>> > > > ________________________________
>> > > > From: Azuryy Yu <azuryyyu@gmail.com>
>> > > > To: user@hbase.apache.org; S. Zhou <myxjtu@yahoo.com>
>> > > > Sent: Tuesday, July 2, 2013 10:06 PM
>> > > > Subject: Re: MapReduce job with mixed data sources: HBase table and
>> > HDFS
>> > > files
>> > > >
>> > > >
>> > > > Hi ,
>> > > >
>> > > > Use MultipleInputs, which can solve your problem.
>> > > >
>> > > >
>> > > > On Wed, Jul 3, 2013 at 12:34 PM, S. Zhou <myxjtu@yahoo.com> wrote:
>> > > >
>> > > >> Hi there,
>> > > >>
>> > > >> I know how to create a MapReduce job with an HBase data source only or an HDFS file as the data source. Now I need to create a MapReduce job with mixed data sources, that is, this MR job needs to read data from both HBase and HDFS files. Is it possible? If yes, could you share some sample code?
>> > > >>
>> > > >> Thanks!
>> > > >> Senqiang
>> > >
>> > >
>> >
>>