Date: Tue, 2 Jul 2013 21:43:20 +0000 (UTC)
From: "Anuj Ojha (JIRA)"
To: crunch-dev@incubator.apache.org
Subject: [jira] [Updated] (CRUNCH-232) DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.

[ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Anuj Ojha updated CRUNCH-232:
-----------------------------
Description:
DoFn's initialize method gets called twice, whereas cleanup gets called only once, when a join is performed on two PTables.
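The pairing the report expects is that each initialize() call is matched by exactly one cleanup(), with process() running in between. The plain-Java sketch below models that expectation so it can be checked in isolation. LifecycleDemo, LifecycleFn, CountingFn and runTask are hypothetical names chosen for illustration. This is not org.apache.crunch.DoFn or Crunch's task runner, and it makes no claim about what Crunch guarantees. It only models the pairing that the reported behaviour violates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the lifecycle the report expects. All names here are
// hypothetical; this is not org.apache.crunch.DoFn or Crunch's task runner.
class LifecycleDemo {

  // Hypothetical stand-in for the three DoFn hooks mentioned in the report.
  interface LifecycleFn<S, T> {
    void initialize();
    void process(S input, List<T> out);
    void cleanup(List<T> out);
  }

  // Counts how often each hook runs, so the pairing can be checked.
  static class CountingFn implements LifecycleFn<String, String> {
    int initCalls = 0;
    int processCalls = 0;
    int cleanupCalls = 0;

    @Override
    public void initialize() {
      initCalls++;
    }

    @Override
    public void process(String input, List<String> out) {
      processCalls++;
      out.add(input);
    }

    @Override
    public void cleanup(List<String> out) {
      cleanupCalls++;
    }
  }

  // Runs one task: a single initialize(), process() once per input, then a
  // single cleanup(). The finally block keeps cleanup() paired with
  // initialize() even if process() throws.
  static <S, T> List<T> runTask(LifecycleFn<S, T> fn, List<S> inputs) {
    List<T> out = new ArrayList<>();
    fn.initialize();
    try {
      for (S input : inputs) {
        fn.process(input, out);
      }
    } finally {
      fn.cleanup(out);
    }
    return out;
  }

  public static void main(String[] args) {
    CountingFn fn = new CountingFn();
    runTask(fn, Arrays.asList("a", "b", "c"));
    // Prints the call counts for initialize, process and cleanup.
    System.out.println(fn.initCalls + " " + fn.processCalls + " " + fn.cleanupCalls);
  }
}
```

With three inputs, main prints `1 3 1`: one initialize(), three process() calls and one cleanup(). The report's console output corresponds to an initialize() count that ends up one higher than the cleanup() count.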
Sample Test:
{code borderStyle=solid}
final Configuration config = HBaseTest.getConf();
final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
    "/HbaseTestFile.txt").toString());

final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
    "/HbaseTestFile2.txt").toString());

final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
    Avros.tableOf(Avros.ints(), Avros.strings()));

final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
    Avros.tableOf(Avros.ints(), Avros.strings()));

// Inner join on the Integer key; each value pairs ptable1's string with ptable2's.
final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);

final PCollection<String> joinedStrings = joinedTable.parallelDo(
    new MapFn<Pair<Integer, Pair<String, String>>, String>() {
      private static final long serialVersionUID = -8796426750247480646L;

      @Override
      public String map(final Pair<Integer, Pair<String, String>> input) {
        return input.second().first() + "/" + input.second().second();
      }
    }, Avros.strings());

System.out.println(joinedStrings.materialize().iterator().hasNext());
{code}

Both DoFnCheck classes (DoFnCheck and DoFnCheck2) look roughly like this:

{code}
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;

public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
  private static final long serialVersionUID = 6780749658216132026L;

  @Override
  public void initialize() {
    System.out
        .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  }

  @Override
  public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
    System.out
        .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
  }

  @Override
  public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
    System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");

    // Every input line is emitted under the same key (1).
    final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);

    emitter.emit(pair);
  }
}
{code}

The console output looks like this:

!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

> DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
> Key: CRUNCH-232
> URL: https://issues.apache.org/jira/browse/CRUNCH-232
> Project: Crunch
> Issue Type: Bug
> Components: MapReduce Patterns
> Affects Versions: 0.6.0
> Reporter: Anuj Ojha
> Priority: Critical
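Reading the reported console output as a sequence of lifecycle events makes the imbalance explicit. The sketch below uses a hypothetical LifecycleLogCheck class that counts the marker lines DoFnCheck prints and returns the number of initialize() calls without a matching cleanup(). It only inspects text and does not use any Crunch API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for reading console output like the one in the report.
class LifecycleLogCheck {

  // Returns (initialize lines) minus (cleanup lines). A positive result means
  // some initialize() calls never got a matching cleanup(); a negative result
  // means the opposite. The marker strings are the ones DoFnCheck prints.
  static int unmatchedInitializations(List<String> lines) {
    int balance = 0;
    for (String line : lines) {
      if (line.contains("I'm initializing")) {
        balance++;
      } else if (line.contains("I'm cleaned up")) {
        balance--;
      }
    }
    return balance;
  }

  public static void main(String[] args) {
    // The four console lines from the report, with the "!" padding shortened.
    List<String> observed = Arrays.asList(
        "!!!I'm initializing!!!",
        "!!!Process!!!",
        "!!!I'm cleaned up!!!",
        "!!!I'm initializing!!!");
    System.out.println(unmatchedInitializations(observed));
  }
}
```

For the four reported lines this prints 1, because the second "I'm initializing" line is never followed by "I'm cleaned up". A plain counter cannot tell which task the unmatched call came from. That would require a task or attempt identifier in each log line, and the sample DoFn does not print one.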