hbase-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ivan Cores gonzalez <ivan.co...@inria.fr>
Subject Re: Processing rows in parallel with MapReduce jobs.
Date Mon, 18 Apr 2016 08:54:17 GMT


Hi Ted,
So, If I understand the behaviour of getSplits(), I can create "virtual" splits overriding
the getSplits function.
I was performing some tests, but my code crash in runtime and I cannot found the problem.

Any help? I didn't find examples.


public class SimpleRowCounter extends Configured implements Tool {

  static class RowCounterMapper extends TableMapper<ImmutableBytesWritable, Result>
{
    public static enum Counters { ROWS }
    @Override
    public void map(ImmutableBytesWritable row, Result value, Context context) {
      context.getCounter(Counters.ROWS).increment(1);
		try {
			Thread.sleep(3000); //Simulates work
		} catch (InterruptedException name) { }
    }
  }

  public class MyTableInputFormat extends TableInputFormat {
    @Override
    public List<InputSplit> getSplits(JobContext context) throws IOException {
	//Just to detect if this method is being called ...
	List<InputSplit> splits = super.getSplits(context);
	System.out.printf("    Message to log? \n" );
	return splits;
    }
  }

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 1) {
      System.err.println("Usage: SimpleRowCounter <tablename>");
      return -1;
    }
    String tableName = args[0];

    Scan scan = new Scan();
    scan.setFilter(new FirstKeyOnlyFilter());
    scan.setCaching(500);
    scan.setCacheBlocks(false);

    Job job = new Job(getConf(), getClass().getSimpleName());
    job.setJarByClass(getClass());

    TableMapReduceUtil.initTableMapperJob(tableName, scan, RowCounterMapper.class, 
		ImmutableBytesWritable.class, Result.class, job, true, MyTableInputFormat.class);

    job.setNumReduceTasks(0);
    job.setOutputFormatClass(NullOutputFormat.class);
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(HBaseConfiguration.create(),
        new SimpleRowCounter(), args);
    System.exit(exitCode);
  }
}

Thanks so much,
Iván.




----- Mensaje original -----
> De: "Ted Yu" <yuzhihong@gmail.com>
> Para: user@hbase.apache.org
> Enviados: Martes, 12 de Abril 2016 17:29:52
> Asunto: Re: Processing rows in parallel with MapReduce jobs.
> 
> Please take a look at TableInputFormatBase#getSplits() :
> 
>    * Calculates the splits that will serve as input for the map tasks. The
> 
>    * number of splits matches the number of regions in a table.
> 
> Each mapper would be reading one of the regions.
> 
> On Tue, Apr 12, 2016 at 8:18 AM, Ivan Cores gonzalez <ivan.cores@inria.fr>
> wrote:
> 
> > Hi Ted,
> > Yes, I mean same region.
> >
> > I wasn't using the getSplits() function. I'm trying to add it to my code
> > but I'm not sure how I have to do it. Is there any example in the website?
> > I can not find anything. (By the way, I'm using TableInputFormat, not
> > InputFormat)
> >
> > But just to confirm, with the getSplits() function, Are mappers processing
> > rows in the same region executed in parallel? (assuming that there are
> > empty
> > processors/cores)
> >
> > Thanks,
> > Ivan.
> >
> >
> > ----- Mensaje original -----
> > > De: "Ted Yu" <yuzhihong@gmail.com>
> > > Para: user@hbase.apache.org
> > > Enviados: Lunes, 11 de Abril 2016 15:10:29
> > > Asunto: Re: Processing rows in parallel with MapReduce jobs.
> > >
> > > bq. if they are located in the same split?
> > >
> > > Probably you meant same region.
> > >
> > > Can you show the getSplits() for the InputFormat of your MapReduce job ?
> > >
> > > Thanks
> > >
> > > On Mon, Apr 11, 2016 at 5:48 AM, Ivan Cores gonzalez <
> > ivan.cores@inria.fr>
> > > wrote:
> > >
> > > > Hi all,
> > > >
> > > > I have a small question regarding the MapReduce jobs behaviour with
> > HBase.
> > > >
> > > > I have a HBase test table with only 8 rows. I splitted the table with
> > the
> > > > hbase shell
> > > > split command into 2 splits. So now there are 4 rows in every split.
> > > >
> > > > I create a MapReduce job that only prints the row key in the log files.
> > > > When I run the MapReduce job, every row is processed by 1 mapper. But
> > the
> > > > mappers
> > > > in the same split are executed sequentially (inside the same
> > container).
> > > > That means,
> > > > the first four rows are processed sequentially by 4 mappers. The system
> > > > has cores
> > > > that are free, so is it possible to process rows in parallel if they
> > are
> > > > located
> > > > in the same split?
> > > >
> > > > The only way I found to have 8 mappers executed in parallel is split
> > the
> > > > table
> > > > in 8 splits (1 split per row). But obviously this is not the best
> > solution
> > > > for
> > > > big tables ...
> > > >
> > > > Thanks,
> > > > Ivan.
> > > >
> > >
> >
> 

Mime
View raw message