Date: Wed, 3 Jul 2013 14:17:20 +0000 (UTC)
From: "Josh Wills (JIRA)"
To: crunch-dev@incubator.apache.org
Subject: [jira] [Commented] (CRUNCH-232) DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.

    [ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699009#comment-13699009 ]

Josh Wills commented on CRUNCH-232:
-----------------------------------

For joins, the CrunchMapper has two different types of input splits that it needs to process, and there are two separate sequences of DoFn calls (which in the runtime system are called RTNodes), one for each of the splits.
It turns out that during setup we would always initialize _all_ of the RTNodes (and thus their child DoFns), regardless of which input split we were running on at the time, but we would only call cleanup for the RTNode (and DoFns) corresponding to the split that we were actually processing in the current task. The fix was primarily to ensure that during setup we only called initialize for one of the RTNode sequences (the one that we were about to process), not for all of them.

> DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-232
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-232
>             Project: Crunch
>          Issue Type: Bug
>          Components: MapReduce Patterns
>    Affects Versions: 0.6.0
>            Reporter: Anuj Ojha
>            Priority: Critical
>         Attachments: CRUNCH-232.patch
>
>
> DoFn's initialize method gets called twice, whereas cleanup gets called only once, when a join is performed on two PTables.
>
> Sample Test:
> {code}
> final Configuration config = HBaseTest.getConf();
> final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
> final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
>     "/HbaseTestFile.txt").toString());
>
> final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
>     "/HbaseTestFile2.txt").toString());
>
> final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
>     Avros.tableOf(Avros.ints(), Avros.strings()));
>
> final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
>     Avros.tableOf(Avros.ints(), Avros.strings()));
>
> final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
>
> final PCollection<String> joinedStrings = joinedTable.parallelDo(
>     new MapFn<Pair<Integer, Pair<String, String>>, String>() {
>       private static final long serialVersionUID = -8796426750247480646L;
>
>       @Override
>       public String map(final Pair<Integer, Pair<String, String>> input) {
>         return input.second().first() + "/" + input.second().second();
>       }
>     }, Avros.strings());
>
> System.out.println(joinedStrings.materialize().iterator().hasNext());
> {code}
>
> The two DoFnCheck classes look something like this:
>
> {code}
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.Pair;
>
> public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
>   private static final long serialVersionUID = 6780749658216132026L;
>
>   @Override
>   public void initialize() {
>     System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>   }
>
>   @Override
>   public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
>     System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>   }
>
>   @Override
>   public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
>     System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>
>     // Every input line is keyed on the constant 1, so all records join with each other.
>     final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
>
>     emitter.emit(pair);
>   }
> }
> {code}
>
> The console looks like this:
>
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
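The lifecycle mismatch explained in the comment (setup initializes every RTNode sequence, cleanup only touches the one for the current split) can be modeled with a small, self-contained Java sketch. It does not use Crunch's real runtime classes: `Chain` is a hypothetical stand-in for one RTNode sequence, and `runTask` is a hypothetical simulation of a single map task's setup and cleanup. It only counts calls, so it illustrates why the old setup leaves one unmatched initialize per extra chain; it is not meant to reproduce the exact console output in the report.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model of the CRUNCH-232 lifecycle bug.
 * "Chain" stands in for one RTNode sequence; these are NOT Crunch classes.
 */
public class LifecycleSketch {

  /** One RTNode-like chain of DoFns; a join mapper has one chain per input split type. */
  static final class Chain {
    int initializeCalls = 0;
    int cleanupCalls = 0;

    void initialize() { initializeCalls++; }
    void cleanup() { cleanupCalls++; }
  }

  /**
   * Simulates one map task. splitIndex selects the input split (and so the chain)
   * this task actually processes; fixed toggles between the old and new setup logic.
   */
  static List<Chain> runTask(int chainCount, int splitIndex, boolean fixed) {
    List<Chain> chains = new ArrayList<>();
    for (int i = 0; i < chainCount; i++) {
      chains.add(new Chain());
    }

    // Setup phase.
    if (fixed) {
      // New behavior: initialize only the chain we are about to process.
      chains.get(splitIndex).initialize();
    } else {
      // Old behavior: initialize every chain, whichever split we are on.
      for (Chain c : chains) {
        c.initialize();
      }
    }

    // (Records for this split would be passed to chains.get(splitIndex) here.)

    // Cleanup phase: in both versions only the current split's chain is cleaned up.
    chains.get(splitIndex).cleanup();
    return chains;
  }

  /** Total initialize calls that have no matching cleanup call, across all chains. */
  static int unmatchedInitializes(List<Chain> chains) {
    int total = 0;
    for (Chain c : chains) {
      total += c.initializeCalls - c.cleanupCalls;
    }
    return total;
  }

  public static void main(String[] args) {
    System.out.println("buggy: " + unmatchedInitializes(runTask(2, 0, false)));
    System.out.println("fixed: " + unmatchedInitializes(runTask(2, 0, true)));
  }
}
```

With two chains, as in a two-way join, `main` prints `buggy: 1` and then `fixed: 0`: the old setup leaves the other split's chain initialized but never cleaned up, while initializing only the current split's chain keeps the calls balanced.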