Subject: Re: Chaining Multiple Reducers: Reduce -> Reduce -> Reduce
From: "Edward J. Yoon"
To: user@hadoop.apache.org
Date: Tue, 9 Oct 2012 08:03:40 +0900

Mike, just FYI, it's my '08 approach [1].

1. https://blogs.apache.org/hama/entry/how_will_hama_bsp_different

On Tue, Oct 9, 2012 at 7:50 AM, Michael Segel wrote:
> Jim,
>
> You can use the combiner as a reducer, albeit you won't get down to a single reduce file output. But you don't need that.
> As long as the output from the combiner matches the input to the next reducer, you should be OK.
>
> Without knowing the specifics, all I can say is TANSTAAFL; that is to say that in a map/reduce paradigm you need to have some sort of mapper.
>
> I would also point you to look at using HBase and temp tables. While the writes have more overhead than writing directly to HDFS, it may make things a bit more interesting.
>
> Again, the usual caveats about YMMV and things.
>
> -Mike
>
> On Oct 8, 2012, at 3:53 PM, Jim Twensky wrote:
>
>> Hi Mike,
>>
>> I'm already doing that, but the output of the reduce goes straight back
>> to HDFS to be consumed by the next Identity Mapper. Combiners just
>> reduce the amount of data between map and reduce, whereas I'm looking
>> for an optimization between reduce and map.
>>
>> Jim
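For concreteness, below is a minimal sketch of the identity-mapper job chain Jim describes: each round is a full MapReduce job whose map phase is Hadoop's built-in identity Mapper, so the real work happens in the reducer and every round's output lands back on HDFS before the next shuffle. It is written against the Hadoop 1.x "mapreduce" API; the paths, the fixed round count, and the placeholder identity reducer are illustrative only.

// Sketch: chain of reduce rounds, each driven as a full MapReduce job with an
// identity map phase. Substitute the real reduce logic for Reducer.class.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;

public class ReduceChainDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path input = new Path(args[0]);          // assumed: SequenceFile of Text/Text pairs
    int rounds = Integer.parseInt(args[1]);  // fixed count; a convergence check could replace it
    for (int i = 0; i < rounds; i++) {
      Path output = new Path(args[0] + ".round" + i);
      Job job = new Job(conf, "reduce-round-" + i);
      job.setJarByClass(ReduceChainDriver.class);
      job.setMapperClass(Mapper.class);      // identity mapper: only re-partitions and re-sorts
      job.setReducerClass(Reducer.class);    // placeholder identity reducer; real logic goes here
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(Text.class);
      job.setInputFormatClass(SequenceFileInputFormat.class);
      job.setOutputFormatClass(SequenceFileOutputFormat.class);
      FileInputFormat.addInputPath(job, input);
      FileOutputFormat.setOutputPath(job, output);
      if (!job.waitForCompletion(true)) System.exit(1);
      input = output;                        // the next round reads this round's HDFS output
    }
  }
}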
>> On Mon, Oct 8, 2012 at 2:19 PM, Michael Segel wrote:
>>> Well I was thinking ...
>>>
>>> Map -> Combiner -> Reducer -> Identity Mapper -> Combiner -> Reducer -> Identity Mapper -> Combiner -> Reducer...
>>>
>>> May make things easier.
>>>
>>> HTH
>>>
>>> -Mike
>>>
>>> On Oct 8, 2012, at 2:09 PM, Jim Twensky wrote:
>>>
>>>> Thank you for the comments. Some similar frameworks I looked at
>>>> include Haloop, Twister, Hama, Giraph and Cascading. I am also doing
>>>> large-scale graph processing, so I assumed one of them could serve the
>>>> purpose. Here is a summary of what I found out about them that is
>>>> relevant:
>>>>
>>>> 1) Haloop and Twister: They cache static data across a chain of
>>>> MapReduce jobs. The main contribution is to reduce the intermediate
>>>> data shipped from mappers to reducers. Still, the output of each
>>>> reduce goes to the file system.
>>>>
>>>> 2) Cascading: A higher-level API to create MapReduce workflows.
>>>> Anything you can do with Cascading can practically be done with more
>>>> programming effort using Hadoop only. Bypassing map and running a
>>>> chain of sort->reduce->sort->reduce jobs is not possible. Please
>>>> correct me if I'm wrong.
>>>>
>>>> 3) Giraph: Built on the BSP model and very similar to Pregel. I
>>>> couldn't find a detailed overview of its architecture, but my
>>>> understanding is that your data needs to fit in distributed memory,
>>>> which is also true for Pregel.
>>>>
>>>> 4) Hama: Also follows the BSP model. I don't know how the intermediate
>>>> data is serialized and passed to the next set of nodes, and whether it
>>>> is possible to do a performance optimization similar to what I am
>>>> asking for. If anyone who has used Hama can point to a few articles
>>>> about how the framework actually works and handles the messages passed
>>>> between vertices, I'd really appreciate that.
>>>>
>>>> Conclusion: None of the above tools can bypass the map step or do a
>>>> similar performance optimization. Of course, Giraph and Hama are built
>>>> on a different model - not really MapReduce - so it is not very
>>>> accurate to say that they don't have the required functionality.
>>>>
>>>> If I'm missing anything and/or if there are folks who have used Giraph
>>>> or Hama and think that they might serve the purpose, I'd be glad to
>>>> hear more.
>>>>
>>>> Jim
>>>>
>>>> On Mon, Oct 8, 2012 at 6:52 AM, Michael Segel wrote:
>>>>> I don't believe that Hama would suffice.
>>>>>
>>>>> In terms of M/R where you want to chain reducers...
>>>>> Can you chain combiners? (I don't think so, but you never know.)
>>>>>
>>>>> If not, you end up with a series of M/R jobs where the mappers are just identity mappers.
>>>>>
>>>>> Or you could use HBase, with a small caveat... you have to be careful not to use speculative execution, and to make sure that if a task fails, the results won't be affected when it is run a second time. Meaning that it will just overwrite the data in a column with a second cell, and that you don't care about the number of versions.
>>>>>
>>>>> Note: HBase doesn't have transactions, so you would have to think about how to tag cells so that if a task dies, you can remove the affected cells upon restart. Along with some post-job synchronization...
>>>>>
>>>>> Again, HBase may work, but there may also be additional problems that could impact your results. It will have to be evaluated on a case-by-case basis.
>>>>>
>>>>> JMHO
>>>>>
>>>>> -Mike
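To make the HBase route Mike describes concrete, here is a rough sketch of a reducer that writes one round of results into an HBase temp table, written against the HBase 0.92/0.94-era client API. The table, column family, and qualifier names are made up; the row key and cell contents depend only on the reduce key, so a re-run of a failed task simply overwrites the same cells, and speculative execution is switched off in the driver as suggested above.

// Sketch: reduce round written into an HBase "temp" table with idempotent puts.
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;

public class HBaseTempTableReducer extends TableReducer<Text, Text, ImmutableBytesWritable> {

  @Override
  protected void reduce(Text key, Iterable<Text> values, Context context)
      throws IOException, InterruptedException {
    // Merge the values for this key and write them as a single cell. Everything is
    // derived from the reduce key alone, so a retried task rewrites the same data
    // (no transactions needed, cell versions ignored).
    StringBuilder merged = new StringBuilder();
    for (Text v : values) {
      merged.append(v.toString()).append(',');
    }
    Put put = new Put(Bytes.toBytes(key.toString()));
    put.add(Bytes.toBytes("state"), Bytes.toBytes("value"), Bytes.toBytes(merged.toString()));
    context.write(new ImmutableBytesWritable(put.getRow()), put);
  }

  // Illustrative driver wiring: reduce into the temp table, speculative reduces off.
  public static Job createJob(String tempTable) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    conf.setBoolean("mapred.reduce.tasks.speculative.execution", false);
    Job job = new Job(conf, "reduce-into-" + tempTable);
    job.setJarByClass(HBaseTempTableReducer.class);
    TableMapReduceUtil.initTableReducerJob(tempTable, HBaseTempTableReducer.class, job);
    return job;
  }
}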
>>>>>
>>>>> On Oct 8, 2012, at 6:35 AM, Edward J. Yoon wrote:
>>>>>
>>>>>>> call context.write() in my mapper class)? If not, are there any other
>>>>>>> MR platforms that can do this? I've been searching around and couldn't
>>>>>>
>>>>>> You can use Hama BSP [1] instead of Map/Reduce.
>>>>>>
>>>>>> No stable release yet, but I confirmed that a large graph with billions
>>>>>> of nodes and edges can be crunched in a few minutes [2].
>>>>>>
>>>>>> 1. http://hama.apache.org
>>>>>> 2. http://wiki.apache.org/hama/Benchmarks
>>>>>>
>>>>>> On Sat, Oct 6, 2012 at 1:31 AM, Jim Twensky wrote:
>>>>>>> Hi,
>>>>>>>
>>>>>>> I have a complex Hadoop job that iterates over large graph data
>>>>>>> multiple times until some convergence condition is met. I know that
>>>>>>> the map output goes to the local disk of each particular mapper first,
>>>>>>> and is then fetched by the reducers before the reduce tasks start. I can
>>>>>>> see that this is an overhead, and in theory we could ship the data
>>>>>>> directly from mappers to reducers, without serializing to the local
>>>>>>> disk first. I understand that this step is necessary for fault
>>>>>>> tolerance and is an essential building block of MapReduce.
>>>>>>>
>>>>>>> In my application, the map process consists of identity mappers which
>>>>>>> read the input from HDFS and ship it to reducers. Essentially, what I
>>>>>>> am doing is applying chains of reduce jobs until the algorithm
>>>>>>> converges. My question is, can I bypass the serialization of the local
>>>>>>> data and ship it from mappers to reducers immediately (as soon as I
>>>>>>> call context.write() in my mapper class)? If not, are there any other
>>>>>>> MR platforms that can do this? I've been searching around and couldn't
>>>>>>> see anything similar to what I need. Hadoop Online is a prototype and
>>>>>>> has some similar functionality, but it hasn't been updated for a while.
>>>>>>>
>>>>>>> Note: I know about the ChainMapper and ChainReducer classes, but I don't
>>>>>>> want to chain multiple mappers on the same local node. I want to chain
>>>>>>> multiple reduce functions globally, so the data flow looks like:
>>>>>>> Map -> Reduce -> Reduce -> Reduce, which means each reduce operation is
>>>>>>> followed by a shuffle and sort, essentially bypassing the map
>>>>>>> operation.
>>>>>>
>>>>>> --
>>>>>> Best Regards, Edward J. Yoon
>>>>>> @eddieyoon

--
Best Regards, Edward J. Yoon
@eddieyoon
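For readers curious about the Hama route suggested above, the sketch below shows roughly the shape of an iterative BSP computation against the Hama 0.5-era API; the class and method names are from that release and may differ elsewhere, and the message routing and the fixed superstep bound are purely illustrative. The relevant point for this thread is that peers exchange messages directly and hit a barrier at each superstep, so there is no per-iteration HDFS round trip between the "reduce" steps.

// Rough sketch of an iterative Hama BSP job (Hama 0.5-era API assumed).
import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hama.bsp.BSP;
import org.apache.hama.bsp.BSPPeer;
import org.apache.hama.bsp.sync.SyncException;

public class IterativeGraphBSP extends BSP<LongWritable, Text, Text, Text, Text> {

  private static final int MAX_SUPERSTEPS = 30;  // illustrative bound; a real job checks convergence

  @Override
  public void bsp(BSPPeer<LongWritable, Text, Text, Text, Text> peer)
      throws IOException, SyncException, InterruptedException {
    for (int step = 0; step < MAX_SUPERSTEPS; step++) {
      // Drain messages sent to this peer in the previous superstep.
      int received = 0;
      Text msg;
      while ((msg = peer.getCurrentMessage()) != null) {
        received++;  // a real job would fold msg into local vertex state here
      }
      // Broadcast updated state to every peer (for illustration only; a graph job
      // would route messages to the peers owning the adjacent vertices).
      for (String other : peer.getAllPeerNames()) {
        peer.send(other, new Text(peer.getPeerName() + ":" + step + ":" + received));
      }
      peer.sync();  // barrier: messages become visible in the next superstep
    }
    // Results are written out once, after the iterations finish.
    peer.write(new Text(peer.getPeerName()), new Text("done"));
  }
}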