hadoop-mapreduce-user mailing list archives

From Ferdy Galema <ferdy.gal...@kalooga.com>
Subject Re: Pipelining Mappers and Reducers
Date Fri, 13 Aug 2010 10:34:23 GMT

Thank you for your efforts. We've also been trying some things, and we
have now implemented a solution that we are satisfied with. It allows us
to start merging the output of any indexing job as soon as the first
tasks complete.

It's a single class that wraps an indexing Job and submits it to the
JobTracker asynchronously. It periodically retrieves
TaskCompletionEvents and merges the task outputs that are complete.
This effectively means that the job submitter client is also the index
merger. It supports jobs both with and without reduces. It simply
copies job output to the local disk in order to merge it. When the job
has finished, it optimizes the index and moves the merged output to the
configured filesystem. For more implementation details, see the class
listing below. (Compiles against CDH2 hadoop-0.20.1+169.89.)


package mypackage;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskCompletionEvent;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriter.MaxFieldLength;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FSDirectory;

public class AsyncLuceneMergerJobWrapper {
  private static final Log LOG = LogFactory.getLog(AsyncLuceneMergerJobWrapper.class);

  private final Job job;
  private final FileSystem fs;
  private final LocalFileSystem localFs;
  private Path localTemp;
  private Path localParts;
  private Path localMerged;
  private final IndexWriter writer;

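  public AsyncLuceneMergerJobWrapper(Job job, Analyzer analyzer) throws IOException {
    this.job = job;

    Configuration conf = job.getConfiguration();

    fs = FileSystem.get(conf);
    localFs = FileSystem.getLocal(conf);

    // NOTE: the second Path argument (the temp directory name) is cut off
    // in the archived message and is left elided here.
    localTemp = new Path(conf.get("hadoop.tmp.dir"), ...);
    if (localFs.exists(localTemp)) {
      throw new IllegalStateException("Temporary directory already exists " + localTemp);
    }
    localParts = new Path(localTemp, "parts");
    localMerged = new Path(localTemp, "merged");

    // NOTE: the File argument is truncated in the archive; opening the local
    // "merged" directory is a reconstruction.
    this.writer = new IndexWriter(FSDirectory.open(new File(localMerged.toString())),
        analyzer, true, MaxFieldLength.LIMITED);
  }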
  /**
   * @return The index writer that is used for merging. Use this method to set
   * merging options.
   */
  public IndexWriter getWriter() {
    return writer;
  }

  /**
   * Start the job and merge job outputs (Lucene indices) into a single optimized segment.
   * It will begin merging from the moment the first task outputs are available.
   * It will not delete the job output. The merged output is put into the
   * "merged" subdirectory of the job output path.
   *
   * @return true if the job completed successfully
   * @throws ClassNotFoundException
   * @throws InterruptedException
   * @throws IOException
   */
  public boolean waitForCompletion() throws IOException, InterruptedException, ClassNotFoundException {
    Path jobOutput = FileOutputFormat.getOutputPath(job);

    try {
      // NOTE: the submit call is missing from the archived listing; it is
      // restored here from the description at the top of this message.
      job.submit();

      int receivedEvents = 0;
      boolean allPartsMerged = false;
      while (!allPartsMerged) {
        //merge completed job parts

        List<FSDirectory> dirs = new ArrayList<FSDirectory>();
        // NOTE: right-hand side reconstructed; fetch only events not yet received.
        TaskCompletionEvent[] taskCompletionEvents = job.getTaskCompletionEvents(receivedEvents);
        receivedEvents += taskCompletionEvents.length;
        for (TaskCompletionEvent event : taskCompletionEvents) {
          LOG.info("New event " + event + "; isMapTask=" + event.isMapTask() + " ;idWithinJob=" + event.idWithinJob());
          if (TaskCompletionEvent.Status.SUCCEEDED.equals(event.getTaskStatus())) {
            //new part!
            //we only want outputting tasks; that is mappers when no reducers or just the reducers.
            if (job.getNumReduceTasks() == 0 || !event.isMapTask()) {
              String partName = getPartname(event);
              Path partOutput = new Path(jobOutput, partName);
              if (!fs.exists(partOutput)) {
                LOG.info("Task had no output, continuing. Event="+event);
                continue;
              }
              Path localCopy = new Path(localParts, partName);
              LOG.info("Copying new part " + partOutput);
              fs.copyToLocalFile(partOutput, localCopy);
              LOG.info("Copy done.");
              // NOTE: the File argument is truncated in the archive; the local copy is assumed.
              FSDirectory dir = FSDirectory.open(new File(localCopy.toString()));
              dirs.add(dir);
            }
          }
        }

        if (dirs.size() > 0) {
          writer.addIndexesNoOptimize(dirs.toArray(new FSDirectory[0]));
          LOG.info("AddIndexesNoOptimize(dirs) done.");
          for (Directory dir : dirs) {
            // NOTE: the loop body is lost in the archive; closing each directory is an assumption.
            dir.close();
          }
        }

        //we are done when there are no more events and the job is complete
        allPartsMerged = taskCompletionEvents.length == 0 && job.isComplete();
        // NOTE: any pause between polls is not preserved in the archived listing.
      }
      LOG.info("Completed. Calling optimize()");
      writer.optimize();
      writer.close();
      LOG.info("Closed. Completing local output.");
      fs.completeLocalOutput(new Path(jobOutput, "merged"), localMerged);
    } finally {
      if (localFs.exists(localTemp)) {
        localFs.delete(localTemp, true);
      }
    }

    return job.isSuccessful();
  }

  private String getPartname(TaskCompletionEvent event) {
    //getTaskID() is deprecated because TaskCompletionEvent returns the old TaskAttemptId
    return event.getTaskAttemptId().getTaskID().toString();
  }
}
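
The polling pattern in the listing above can be isolated in a minimal,
Hadoop-free sketch. This is not the code from this message: Event,
EventSource and ScriptedSource are hypothetical stand-ins for the Job and
its TaskCompletionEvents, and no real index is merged. It only shows the
cursor idea (ask for events after the ones already received) and the
termination rule (stop when a poll returns nothing and the job is
complete).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class CursorPollingSketch {
  /** Hypothetical stand-in for a task completion event. */
  static final class Event {
    final String part;
    final boolean succeeded;
    Event(String part, boolean succeeded) { this.part = part; this.succeeded = succeeded; }
  }

  /** Hypothetical stand-in for the job handle. */
  interface EventSource {
    /** Returns the events with index >= startFrom that are available right now. */
    List<Event> eventsFrom(int startFrom);
    boolean isComplete();
  }

  /** Test double: each poll makes up to perPoll (> 0) more events visible. */
  static final class ScriptedSource implements EventSource {
    private final List<Event> all;
    private final int perPoll;
    private int visible = 0;
    ScriptedSource(List<Event> all, int perPoll) { this.all = all; this.perPoll = perPoll; }
    public List<Event> eventsFrom(int startFrom) {
      visible = Math.min(all.size(), visible + perPoll);
      return new ArrayList<Event>(all.subList(Math.min(startFrom, visible), visible));
    }
    public boolean isComplete() { return visible == all.size(); }
  }

  /** Polls until a poll is empty and the source is complete; returns the parts "merged", in order. */
  static List<String> mergeAsCompleted(EventSource source) {
    List<String> merged = new ArrayList<String>();
    int received = 0;                       // cursor: number of events seen so far
    boolean done = false;
    while (!done) {
      List<Event> batch = source.eventsFrom(received);
      received += batch.size();
      for (Event e : batch) {
        if (e.succeeded) {
          merged.add(e.part);               // a real merger would copy and merge the index here
        }
      }
      done = batch.isEmpty() && source.isComplete();
      // a real client would pause here before polling again
    }
    return merged;
  }

  public static void main(String[] args) {
    List<Event> events = Arrays.asList(
        new Event("part-0", true), new Event("part-1", false), new Event("part-2", true));
    System.out.println(mergeAsCompleted(new ScriptedSource(events, 2)));
  }
}
```

Failed attempts still advance the cursor; they are simply not merged.
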

Shai Erera wrote:
> Hi
> I've done some work and thought I'd report back the results (that are
> not too encouraging).
> Approach 1:
> * Mappers output a side effect Lucene Directory (written on-disk) and
> a <Long, Text> pair where the value is the location of the index on
> disk and the key is unimportant for now.
> * Reducer merges the on-disk indexes into a larger on-disk index and
> optimize() it. Its output is the same as the Mapper.
> Pros:
> 1) Low memory footprint on both Mapper and Reducer side.
> 2) Hadoop related output is very small - just the location of the
> index that needs to be copied around and sort/merged.
> 3) Utilizes Lucene powerful indexing capabilities by assigning each
> Mapper a large portion of text to index.
> Cons:
> 1) no pipelining can be done, because the output of all Mappers is
> very small (Hadoop-wise) and thus fits in memory and never flushed.
> I've even tried reducing the buffer on the reducer side (played w/ all
> sorts of parameters), but this never triggered the Combiner I've set.
> 2) if the cluster has complex structure, where not all nodes see the
> same HD (not in my case), then this approach may fail, because Hadoop
> is not aware of the side-effect files and thus cannot pass them to the
> Reducer(s).
> Approach 2:
> * Mappers output a DirectoryWritable, which is a Writable that
> (de)serializes a Lucene Directory.
> * Reducer merges the incoming DWs into a larger index on-disk (output
> is same as above)
> * Combiner merges the incoming DWs into a larger DW (not on-disk,
> though I've tried both)
> Pros:
> 1) Working w/ a DW is better Hadoop-wise -- the Directories can be
> passed around between machines and overcome the 2nd "Con" in Approach
> #1.
> 2) Since the output of the Mappers is large (40 MB in my case), the
> Reducer's buffer gets flushed more often, and the Combiner actually
> gets to do some work.
> Cons:
> Because all is done in-memory, I've constantly hit OOM errors on the
> Combiner side. I've tried limiting the size of its created directory
> (by letting it merge as much as N MB before it outputs anything) down to
> 128MB, but still hit those exceptions. I've even set the child's JVM
> options to -Xmx2048m and -Xmx4096m, to no avail. The machine has 64GB RAM
> (16 cores), so 4GB of RAM should be ok, but I still hit the OOM in the
> reduce/shuffle/sort phases. Only when I set it to 64MB did it work, but
> that's useless because it means each DW is written as is (as they
> already come in 40MB size from the Mapper), so not only do I not
> gain anything, I actually lose some (because of all the
> (de)serialization done by the Writable).
> It would really be great if I can have the Combiner invoked in
> Approach #1, but I wasn't able to set its buffer low enough. I don't
> mind doing all the merges on-disk (that's how it's done in the final
> phase anyway), but I cannot seem to get the Combiner working.
> So unless there's an option to tell the Combiner "kick in after N
> Mappers finished" or "kick in after VERY SMALL #bytes arrived from
> Mappers (in the order of 10/100s bytes)", I'm not sure indexing w/
> Lucene and using a Combiner is doable.
> Hope you'll find that interesting and that someone knows if/how I can
> achieve what I want :-).
> Thanks for your help and comments thus far,
> Shai
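
The arithmetic behind the 64MB observation above can be checked with a
small sketch. This is not Shai's Combiner; batchSizes is a hypothetical
helper that greedily groups equally sized inputs under a size cap,
assuming the cap applies to the sum of the input sizes. With 40MB inputs,
a 64MB cap leaves one input per batch (nothing is merged), while a 128MB
cap allows three per batch.

```java
import java.util.ArrayList;
import java.util.List;

class BatchCapSketch {
  /** Greedily fills batches up to capMb; returns how many inputs land in each batch. */
  static List<Integer> batchSizes(int inputCount, int inputMb, int capMb) {
    List<Integer> batches = new ArrayList<Integer>();
    int inBatch = 0;
    int usedMb = 0;
    for (int i = 0; i < inputCount; i++) {
      // Flush the current batch if the next input would exceed the cap.
      if (inBatch > 0 && usedMb + inputMb > capMb) {
        batches.add(inBatch);
        inBatch = 0;
        usedMb = 0;
      }
      inBatch++;
      usedMb += inputMb;
    }
    if (inBatch > 0) {
      batches.add(inBatch);
    }
    return batches;
  }

  public static void main(String[] args) {
    System.out.println(batchSizes(6, 40, 64));   // one input per batch
    System.out.println(batchSizes(6, 40, 128));  // three inputs per batch
  }
}
```
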
> On Thursday, July 29, 2010, Ferdy Galema <ferdy.galema@kalooga.com> wrote:
>> Very well. Could you keep us informed on how your instant merging plans
>> work out? We're actually running a similar indexing process.
>> It's very interesting to be able to start merging Lucene indexes once
>> the first mappers have finished, instead of waiting until ALL mappers
>> have finished.
>> Shai Erera wrote:
>> Well, the current scale does not warrant sharding up
>> the index. 13GB of data, ~8-10GB index is something a single machine
>> (even not a strong one) can handle pretty well. And as the data will
>> grow, so will the number of Mappers, and the number of sub-indexes. So
>> at some point I will need to merge indexes. Sharding is one of my
>> strategies, but for the TB scale, not what I'm training on now :).
>> Thanks,
>> Shai
>> On Thu, Jul 29, 2010 at 1:23 PM, Ferdy Galema <ferdy.galema@kalooga.com> wrote:
>> You're right about the fact that merging cannot be done by simply
>> appending. Have you thought about the possibility of actually taking
>> advantage of the fact that your final index will be split into several
>> segments? Especially if you plan to increase the scale of the input
>> data, you may eventually want some sort of scaling on the search side
>> as well, for example by sharding your index across several machines
>> (there are frameworks that will help you do this, e.g. Katta -
>> distributed Lucene). You can even do this on a single machine with a
>> ParallelMultiSearcher, preferably if the machine has several disks to
>> spread I/O. Of course, in this case you still have to make sure that
>> the number of segments won't be too high.
>> Shai Erera wrote:
>> Specifically at the moment, a Mapper output is an
>> index (Lucene), and the Reducer's job is to take all indexes and merge
>> them down to 1. Searching on an index w/ hundreds of segments is
>> inefficient. Basically, if you think of the Reducer as holding the
>> 'final' index (to be passed on in the larger pipeline I have), then as
>> soon as a Mapper finishes, I'd like to merge its index w/ the
>> Reducer's. Not all merges will actually be expensive - only every
>> several such indexes will they be truly merged together.
>> I cannot use simple merge techniques - must use Lucene's API to merge
>> the indexes. And the merge is not a simple 'append to current index'
>> either. So if I could get a hold of some object (code) which gets
>> executed when Mappers output is ready for the Reducer, then I can have
>> my merge code executed there. From what I understand so far, it can be
>> the Combiner, which is what I'm investigating now.
>> On the multi-level reduce -- if your cluster is big, and Mappers work
>> on their own hardware (even if some share the same HW), then
>> multi-level reduce can help. True - you might eventually process more
>> bytes than you would if you had one Reducer, but (1) that's not always
>> true, and (2) wall-clock time may be shortened in such a case.
>> Shai
>> On Thu, Jul 29, 2010 at 12:31 PM, Ferdy Galema <ferdy.galema@kalooga.com> wrote:
>> Could you elaborate on how you merge your data? If you have independent
>> map tasks with single key-value outputs that can be written as soon as
>> possible, I'm still curious why you need to reduce at all. Surely there
>> must be a way to merge your data 'on read'.
>> About multi-level reducing; if you merge your data in multiple steps,
>> there would only be a gain if the reducers somehow reduce (no pun
>> intended) the amount of data in the pipeline. For example running 400
>> maps and 10 reduces followed by another job with a single reducer will
>> not benefit if the single reducer has to process the same amount of
>> data that the previous reducers have been outputting. Therefore it
>> completely depends on what your reducer actually does.
>> Ferdy.
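
The trade-off described above can be put into a toy cost model. This is
only a sketch under loud assumptions: time is taken to be proportional to
the amount of data a reducer reads, first-level reducers run fully in
parallel, and shuffle and job start-up costs are ignored. The names
singleLevel, twoLevel and outputPercent are hypothetical.

```java
class MultiLevelReduceSketch {
  /** Wall-clock cost when one reducer reads all the data. */
  static long singleLevel(long totalUnits) {
    return totalUnits;
  }

  /**
   * Wall-clock cost with a first level of parallel reducers followed by one
   * final reducer. outputPercent is how much of its input each first-level
   * reducer emits (100 means no reduction at all).
   */
  static long twoLevel(long totalUnits, int firstLevelReducers, int outputPercent) {
    long firstLevel = totalUnits / firstLevelReducers;   // reducers work in parallel
    long finalLevel = totalUnits * outputPercent / 100;  // final reducer reads all first-level output
    return firstLevel + finalLevel;
  }

  public static void main(String[] args) {
    System.out.println(singleLevel(1000));        // baseline
    System.out.println(twoLevel(1000, 10, 100));  // no reduction in the first level
    System.out.println(twoLevel(1000, 10, 50));   // first level halves the data
  }
}
```

In this model the second level only pays off when the first level
actually shrinks the data: with no reduction the two-level cost exceeds
the single-reducer baseline, while halving the data brings it below.
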
>> Shai Erera wrote:
>> Yes, I'm aware of the fact that I can run w/ 0
>> reducers. However, I do need my output to be merged, and unfortunately
>> the merge is not really that simple that I can use 'getmerge'. I think
>> I'll give the Combiner a chance now - hopefully its reduce() method
>> will get called as Mappers finish their work.
>> BTW, if I'd like to have multi-level Reducers, such that each level
>> works on less data, do I need to do that using multiple Jobs? I haven't
>> seen any API to do so, unless I chain mappers/reducers and include
>> #levels in the chain. To clarify, if I need to take 400 outputs (or
>> much more) and merge them down to 1, currently I need to do that
>> seque
