hadoop-mapreduce-user mailing list archives

From Robert Evans <ev...@yahoo-inc.com>
Subject Re: Number of Reducers Set to One
Date Fri, 13 May 2011 14:33:51 GMT
You could merge the side effect files before running the second map job if you want to, or
you could just leave them as separate files and then read each one in the mapper.  If there
are lots of files the namenode may get hit too hard and slow down the entire cluster.  This
is mitigated by using the distributed cache to transfer the files, but on a large cluster it
could still cause some issues, so if there are over 20 files you probably want to concatenate
them before launching the second map job.
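
If it helps, here is a minimal (untested) sketch of the distributed cache route; the
side_effects path and the LongWritable/Text types are just placeholders for whatever the
first job actually writes:

import java.io.BufferedReader;
import java.io.FileReader;
import java.io.IOException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;

public class SideEffectCacheExample {

  // Driver side: register each reducer's side effect file with the
  // distributed cache before submitting the second job.
  public static void addSideEffectFiles(Job job, Path[] sideEffectFiles) {
    for (Path p : sideEffectFiles) {
      DistributedCache.addCacheFile(p.toUri(), job.getConfiguration());
    }
  }

  // Map side: read the locally cached copies once, before any records arrive.
  public static class Mapper2 extends Mapper<LongWritable, Text, LongWritable, Text> {
    @Override
    protected void setup(Context context) throws IOException {
      Path[] cached = DistributedCache.getLocalCacheFiles(context.getConfiguration());
      if (cached == null) return;
      for (Path p : cached) {
        BufferedReader in = new BufferedReader(new FileReader(p.toString()));
        // each cached file holds one "minTime <tab> total" line; parsing elided
        in.close();
      }
    }
  }
}

Concatenating instead would just be a hadoop fs -getmerge (or FileUtil.copyMerge()) run
before the second job is submitted.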

So, for example, if you wanted to know how many people are in Las Vegas at any point in time,
you could do the following. NOTE: this is pseudocode and has not been tested in any way.

Mapper1 {
Map: Key<Person person>, Value<Time arrived, Time departed> {
   //emit a +1 event when the person arrives and a -1 event when they leave
   output.collect(Key<arrived>, Value<person, 1>);
   output.collect(Key<departed>, Value<person, -1>);
}
}

Partitioner1 {
 Partition: Key<Time time>, Value<Person person, int count>, int numPartitions
{
   Time beginningTime = readFromConf();
   Time endingTime = readFromConf();
   long timePerBucket = (endingTime - beginningTime)/numPartitions;
   //bucket by offset from the start of the range; should also clamp
   //out-of-range times into [0, numPartitions)
   return (time - beginningTime)/timePerBucket;
}
}
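
In real (new API) Java that partitioner might look something like this; the configuration
property names and the LongWritable/Text key and value types are assumptions for the sketch:

import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// Buckets event times (epoch millis) into numPartitions equal-width time ranges.
public class TimeRangePartitioner extends Partitioner<LongWritable, Text>
    implements Configurable {

  private Configuration conf;
  private long beginningTime;
  private long endingTime;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    // property names invented for this example
    beginningTime = conf.getLong("vegas.begin.time", 0L);
    endingTime = conf.getLong("vegas.end.time", Long.MAX_VALUE);
  }

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int getPartition(LongWritable time, Text value, int numPartitions) {
    long timePerBucket = Math.max(1L, (endingTime - beginningTime) / numPartitions);
    long bucket = (time.get() - beginningTime) / timePerBucket;
    // clamp the corner cases so out-of-range times still map to a valid bucket
    return (int) Math.min(Math.max(bucket, 0L), numPartitions - 1L);
  }
}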

Reducer1 {
long seenSoFar = 0;
Time minTime = VERY_BIG_TIME;
Reduce: Key<Time time>, Value<Person person, int count> {
    //running total of arrivals (+1) and departures (-1) within this bucket
    seenSoFar += count;
    if(minTime > time) {
        minTime = time;
    }
    output.collect(Key<time>, Value<person, count>);
}

Cleanup() {
  //a MultipleOutputFormat may be better to use though
  //write one small side effect file per reducer: the earliest time this bucket
  //saw, and the bucket's total
  OutputStream file = fs.create("side effect file"+reducer_number);
  file.write(minTime+"\t"+seenSoFar+"\n");
  file.close();
}
}
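
A minimal (untested) new-API take on that reducer: writing needs FileSystem.create() rather
than open(), and the reducer number can come from the task ID.  The side_effects directory
and the tab-separated value layout are assumptions:

import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class VegasReducer extends Reducer<LongWritable, Text, LongWritable, Text> {

  private long seenSoFar = 0;
  private long minTime = Long.MAX_VALUE;

  @Override
  protected void reduce(LongWritable time, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    for (Text personAndCount : values) {
      // the +1 / -1 count is assumed to be the second tab-separated field
      long count = Long.parseLong(personAndCount.toString().split("\t")[1]);
      seenSoFar += count;
      minTime = Math.min(minTime, time.get());
      context.write(time, personAndCount);
    }
  }

  @Override
  protected void cleanup(Context context) throws IOException {
    // one small side effect file per reducer: "minTime <tab> bucket total"
    int reducerNumber = context.getTaskAttemptID().getTaskID().getId();
    Path sideEffect = new Path("side_effects/part-" + reducerNumber);
    FileSystem fs = FileSystem.get(context.getConfiguration());
    FSDataOutputStream file = fs.create(sideEffect, true);
    file.writeBytes(minTime + "\t" + seenSoFar + "\n");
    file.close();
  }
}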

//Not shown: configuration, or an InputFormat that ensures we do not split the input files
//(a sketch of that follows the pseudocode).
Mapper2 {
seenSoFar = null;
Map: Key<Time time>, Value<Person person, int count> {
  if(seenSoFar == null) {
    //first record: load every bucket's side effect file and add up the totals of
    //all buckets that come before this one
    List data<minTime, bigcount> = read in the side effect files;
    seenSoFar = 0;
    foreach<minTime, bigcount> {
        if(minTime < time) {
            seenSoFar += bigcount;
        }
    }
  }
  seenSoFar += count;
  output.collect(Key<time>, Value<person, seenSoFar>);
}
}
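
And the sketch referred to above for keeping the second job's input files whole, assuming
plain text output from the first job:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

// Gives each map task of job 2 a whole reducer output file, never a fragment,
// so the running totals stay in sorted order.
public class NonSplittableTextInputFormat extends TextInputFormat {
  @Override
  protected boolean isSplitable(JobContext context, Path file) {
    return false;
  }
}

Wired up with job.setInputFormatClass(NonSplittableTextInputFormat.class) on the second job.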

--Bobby Evans

On 5/12/11 3:11 PM, "Geoffry Roberts" <geoffry.roberts@gmail.com> wrote:

Bobby,

Thanks for such a thoughtful response.

I have a data set that represents all the people that pass through Las Vegas over a period
of time, say five years, which comes to about 175 - 200 million people.  Each record is a
person, and it contains fields for where they came from and went to, times of arrival and
departure, and infectious state for a given disease; let's say it's influenza.

Playing a what-if game, I need to compute the probable disease state of each person upon
departure.  To do this, one thing -- one of a number of things -- I must do is keep a running
total of how many people are present in Las Vegas at any moment in time.

I was thinking that running everything through a single reducer would be the best way of
accomplishing this.  Your side effect files: each reducer writes out its accumulation to its
own file, then you merge these together into one big accumulation.  Right?

On 12 May 2011 11:51, Robert Evans <evans@yahoo-inc.com> wrote:
Geoffry,

That really depends on how much data you are processing and the algorithm you need to use
to process the data.  I did something similar a while ago with a medium amount of data and
we saw a significant speed up by first assigning each record a new key based off of the
expected range of keys and just dividing them up equally into N buckets (our data was very
evenly distributed and we knew the range of the data, but TeraSort does something very
similar, and has tools to help if your data is not distributed evenly).  Each bucket was
then sent to a different reducer, and as we output the now sorted data, we also accumulated
some metrics about the data as we saw it go by.  We output these metrics to a side effect
file, one per bucket.

Because the number and size of these side effect files was rather small, we were then able
to pull all of them into all map processes as a cache archive to do a second pass over the
data, and then calculate the accumulated values for each record based off of all buckets
before the current one, and all records in this bucket before the current record (we had to
make sure the map did not split the sorted file that is the output of the previous reduce).
It does not follow a pure map/reduce paradigm, and it took a little bit of math to figure
out exactly what values we had to accumulate and convince ourselves that it would produce
the same results, but it worked for us.  I am not familiar with what you are trying to
compute, but it could work for you too.
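
On the TeraSort point, the stock sampler and total order partitioner can be wired up roughly
like this when the key distribution is not known up front (new mapreduce API; the key/value
types, sampling parameters, and partition file location are arbitrary for the sketch):

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;

public class TotalOrderSetup {
  // Sample the input keys and build a partition file so each reducer gets a
  // contiguous, roughly equal-sized key range.
  public static void configure(Job job) throws Exception {
    Path partitionFile = new Path("_partitions");
    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), partitionFile);
    job.setPartitionerClass(TotalOrderPartitioner.class);

    InputSampler.Sampler<LongWritable, Text> sampler =
        new InputSampler.RandomSampler<LongWritable, Text>(0.01, 10000, 10);
    InputSampler.writePartitionFile(job, sampler);
  }
}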

--Bobby Evans



On 5/12/11 12:44 PM, "Geoffry Roberts" <geoffry.roberts@gmail.com> wrote:

All,

I am mostly seeking confirmation as to my thinking on this matter.

I have an MR job that I believe will force me into using a single reducer.  The nature of
the process is one where calculations performed on a given record rely on certain accumulated
values, whose calculation depends on rolling values forward from all prior records.  An
ultra-simple example of this would be a balance-forward situation.  (I'm not doing accounting,
I'm doing epidemiology, but the concept is the same.)

Is a single reducer the best way to go in this?

Thanks


