Thanks to everyone who gave me useful suggestions on this problem. I
wanted to send out a message summarizing the solution I ended up with.
I too used the general three step approach advocated by most people
who responded, but there were a couple of tricky points that I'll
highlight here. The first problem was that in all the documentation
and writing about this, it would have been useful to have an example
of chaining two mapreduce operations.
The first two map/reduce pairs are similar to the "word count"
example, so I won't elaborate on them except to describe what they did
in the context of my problem:
1 Compute counts:
map: raw data => <<A,B>,1>
red: <<A,B>,1> => <<A,B>,Count(A,B)>
2 Compute marginals
map: <<A,B>,C(A,B)> => <A,C(A,B)>
red: <A,C(A,B)> => <A,C(A)>
The third and final step was also really fairly simple. It had only
two things that were tricky. The first was figuring out that I could
use
conf.setInputPath(countsPath);
conf.addInputPath(marginalsPath);
to read the output from the first two steps.
The map thus had to deal with two different input formats:
a) <<A,B>,Count(A,B)> => <<A,B>,<B,Count(A,B)>>
b) <A,Count(A)> => <<A,*>, <*, Count(A)>>
The second tricky thing was knowing to use
setOutputValueGroupingComparator(...) to sort on just the first half
of the pair and used setOutputKeyComparatorClass(...) to sort on the
entire key, putting <x,*> before any y <x,y>.
With these comparators, the reducer always received <<A,*>,C(A)> as
the first of the <A,x> pairs, which I iterated over. Note, however,
that I had to duplicate the secondary key into the value in the
mapper the reducer only receives a single key, and it always has *
as its secondary key, thus since I wanted to emit <<A,B>,C(A,B)/C(A)>
I needed to be able to get B from the value since it wasn't available
from the key. The reducer did not have a problem having
output.collect called many times from one key.
Everything works fairly well for the most part. And it's really
pretty neatcomputations that usually take a day or so now take me a
few minutes (I'm lucky to have access to a fairly large cluster). One
fairly serious remaining problem is that I get out of memory errors if
my settings for the number of map and reduce jobs are too large/small.
I'm not doing anything memory intensive in my map/reduce jobs, so
this seems like a framework problem. (Does anyone have experience
with this?)
Anyway, thanks for your help
Chris
On 9/27/07, Ted Dunning <tdunning@veoh.com> wrote:
>
>
> I work on very similar problems (except for a pronounced dislike of MLE's).
>
> The easiest way to approach your problem is as a sequence of three
> mapreduce steps:
>
> Step 1, count pairs
> map: <A, B> => <<A,B>, 1>
> combine and reduce: <key, counts> => <key, sum(counts)>
> output name: pairs
>
> Step 2, summarize by row, input from step 1
> map: <A, B, k> => <A, k>
> combine and reduce: <key, counts> => <key, sum(counts)>
> output name: rows
>
> This is all just the way that you described. You have counts of <A,B> pairs
> and counts of <A,*> rows. You want to combine these into MLE's. They way
> that I go from here to ratios is to read the table of <A,*> counts into
> memory in the configure method for the mapper.
>
> Step 3, compute conditional probs
> configure: rowCounts = new Map("rows")
> map: <A, B, k1> => A, B, k1 / rowCounts[A]
> Note: no reduce ... set number of reducers to 0
>
> For many problems of this sort, the cardinality of A is sufficiently small
> so that this inmemory table is no problem (< a few million). For some
> problems like trigram language models this won't work as well. In that
> case, you can arrange the third step so that it gets the "rows" output
> followed by the "pairs" output and that this ordering carries through to the
> reduce step. For example, take a look at these alternative forms for steps
> 1 and 2:
>
> Step 1, count pairs
> map: <A, B> => <<A,B>, 1>
> combine and reduce: <key, counts> => <key, sum(counts)>
> output name: pairs
>
> Step 2, summarize by row, input from step 1
> map: <A, B, k> => <A, k>
> combine and reduce: <key, counts> => <key, "*", sum(counts)>
> output name: rows
>
> Now step 3 can be rewritten so that it puts the row sum at the beginning of
> the values for the reduce:
>
> Step 3, compute conditional probabilities, inputs are "rows" and "pairs" in
> that order
> map: <A, B, k> => <A, <B k>>
> combine: none
> reduce: function (A, values) {
> assert(first(first(values)) == "*")
> rowTotal = second(first(values))
> for (<B, k> in tail(values)) {
> output(A, B, k / rowTotal)
> }
> }
>
> This approach is obviously better in terms of memory use, but it depends on
> confidence that the reduced values won't get reordered. You will probably
> have to do something to enforce that constraint like tagging the value
> output of map and asking Hadoop to sort the reduce values accordingly. I
> didn't want to figure that out so I took the easy way out for my stuff.
>
> Let me know how you proceed.
>
> On 9/27/07 12:22 PM, "Chris Dyer" <redpony@umd.edu> wrote:
>
> > Hi all
> > I'm new to using Hadoop so I'm hoping to get a little guidance on what
> > the best way to solve a particular class of problems would be.
> >
> > The general use case is this: from a very small set of data, I will
> > generate a massive set of pairs of values, ie, <A,B>. I would like to
> > compute the maximum likelihood estimate (MLE) of the conditional
> > probability P(AB). However, it is very obvious to me how to compute
> > the counts of C(<A,B>) and equally obvious how to compute the counts
> > C(<A,*>) or C(<*,B>), but what I need is: C(<A,B>)/C(<*,B>).
> >
> > My approach:
> >
> > My initial approach to the decomposition of this problem is to use a
> > mapper to go from my input data to <A,B> pairs, and then a reducer to
> > go for <A,B> pairs to counts C(A,B). However, at that point, I'd like
> > a second reducer like thing (call it Normalize) to run which
> > aggregates all the C(*,B) pairs and returns a value P(AB) for each A
> > that occurs with B. This is where things get fuzzy for me. How do I
> > do this? A reducer can only return a single value (for example, if I
> > make B the key for Normalize it could return C(B) very easily). What
> > I need is a value type that reduce can return that is essential a list
> > of (key,value) pairs. Does such a thing exist? Am I approaching this
> > the wrong way?
> >
> > Thanks for any assistance!
> > Chris
> >
> > 
> > Chris Dyer
> > Dept. of Linguistics
> > University of Maryland
> > 1401 Marie Mount Hall
> > College Park MD 20742
>
>
