crunch-dev mailing list archives

From "Josh Wills (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-232) DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
Date Tue, 02 Jul 2013 23:41:20 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13698402#comment-13698402 ]

Josh Wills commented on CRUNCH-232:
-----------------------------------

Thanks for reporting this. It looks like we don't call cleanup when there are no records
to process for an input, which probably only happens when a join runs against an empty
input. Can you confirm that one of the inputs here is empty?

I think that the right thing to do is to ensure that cleanup is always called, regardless
of whether or not the input actually has any records in it.
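
As a minimal sketch of that idea, the example below pairs every initialize with a cleanup by putting the cleanup call in a finally block. The names LifecycleSketch, Fn, RecordingFn, and run are hypothetical stand-ins written for illustration; this is not Crunch's runtime code and does not use the Crunch API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are NOT Crunch's classes.
class LifecycleSketch {

    /** Simplified function lifecycle, loosely modeled on initialize/process/cleanup. */
    interface Fn<T> {
        void initialize();
        void process(T input);
        void cleanup();
    }

    /** Records each lifecycle call so the pairing can be inspected afterwards. */
    static class RecordingFn implements Fn<String> {
        final List<String> events = new ArrayList<>();
        @Override public void initialize() { events.add("initialize"); }
        @Override public void process(String input) { events.add("process:" + input); }
        @Override public void cleanup() { events.add("cleanup"); }
    }

    /**
     * Runs fn over the input. Once initialize() has returned, cleanup() is
     * reached through the finally block, so it also runs when the input has
     * no records or when process() throws.
     */
    static <T> void run(Fn<T> fn, Iterator<T> input) {
        fn.initialize();
        try {
            while (input.hasNext()) {
                fn.process(input.next());
            }
        } finally {
            fn.cleanup();
        }
    }

    public static void main(String[] args) {
        // Empty input: the loop body never executes, but cleanup still runs.
        RecordingFn onEmpty = new RecordingFn();
        run(onEmpty, Collections.<String>emptyIterator());
        System.out.println(onEmpty.events);

        // One record: initialize, process, cleanup in that order.
        RecordingFn onOne = new RecordingFn();
        run(onOne, Collections.singletonList("a").iterator());
        System.out.println(onOne.events);
    }
}
```

With an empty iterator the recorded events are just initialize followed by cleanup, which is the pairing the reporter expected to see on the console.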
                
> DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-232
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-232
>             Project: Crunch
>          Issue Type: Bug
>          Components: MapReduce Patterns
>    Affects Versions: 0.6.0
>            Reporter: Anuj Ojha
>            Priority: Critical
>
> DoFn's initialize method gets called twice, whereas cleanup gets called only once, when a join is performed on two PTables.
>  
> Sample Test:
> {code} 
>         final Configuration config = HBaseTest.getConf();
>         final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
>         final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
>                 "/HbaseTestFile.txt").toString());
>  
>         final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
>                 "/HbaseTestFile2.txt").toString());
>  
>         final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>  
>         final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>  
>         final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
>  
>         final PCollection<String> joinedStrings = joinedTable.parallelDo(
>                 new MapFn<Pair<Integer, Pair<String, String>>, String>() {
>                     private static final long serialVersionUID = -8796426750247480646L;
>  
>                     @Override
>                     public String map(final Pair<Integer, Pair<String, String>> input) {
>                         return input.second().first() + "/" + input.second().second();
>                     }
>                 }, Avros.strings());
>  
>         System.out.println(joinedStrings.materialize().iterator().hasNext());
>  {code}
>  
> The two DoFnCheck classes look something like this:
>  
> {code} 
> public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
>     /**
>      * 
>      */
>     private static final long serialVersionUID = 6780749658216132026L;
>  
>     @Override
>     public void initialize() {
>         System.out
>                 .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>  
>     @Override
>     public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
>         System.out
>                 .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>  
>     @Override
>     public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
>         // TODO Auto-generated method stub
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>  
>         final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
>  
>         emitter.emit(pair);
>     }
> }
> {code} 
>  
> The console looks like this:
>  
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
