crunch-dev mailing list archives

From "Josh Wills (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (CRUNCH-232) DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
Date Wed, 03 Jul 2013 14:17:20 GMT

    [ https://issues.apache.org/jira/browse/CRUNCH-232?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13699009#comment-13699009 ]

Josh Wills commented on CRUNCH-232:
-----------------------------------

For joins, the CrunchMapper has two different types of input splits to process, and there
are two separate sequences of DoFn calls (called RTNodes in the runtime system), one for each
type of split. During setup, we always initialized _all_ of the RTNodes (and thus their child
DoFns), regardless of which input split the task was running on, but we only called cleanup
for the RTNode (and DoFns) that corresponded to the split actually being processed in the
current task. The fix was primarily to ensure that setup only calls initialize for the one
RTNode sequence we are about to process, not all of them.
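
To make the mismatch concrete, here is a minimal, self-contained sketch of the lifecycle
described above. It is not the Crunch source: Node, runTask, and unbalanced are hypothetical
names made up for illustration, and Node only stands in for an RTNode by counting its
initialize and cleanup calls.

```java
import java.util.ArrayList;
import java.util.List;

public class LifecycleSketch {

    /** Hypothetical stand-in for an RTNode and its DoFns; it only counts lifecycle calls. */
    public static final class Node {
        final String name;
        int initCalls = 0;
        int cleanupCalls = 0;

        Node(String name) {
            this.name = name;
        }

        void initialize() {
            initCalls++;
        }

        void cleanup() {
            cleanupCalls++;
        }
    }

    /**
     * Simulates one map task of a join. The task sees two node sequences
     * (one per join input) but only processes the split at splitIndex.
     */
    public static List<Node> runTask(int splitIndex, boolean initializeAll) {
        List<Node> nodes = new ArrayList<>();
        nodes.add(new Node("left input"));
        nodes.add(new Node("right input"));

        // Setup phase.
        if (initializeAll) {
            // Behaviour described in the report: every node is initialized.
            for (Node node : nodes) {
                node.initialize();
            }
        } else {
            // Behaviour after the fix: only the node for the current split.
            nodes.get(splitIndex).initialize();
        }

        // Record processing is elided; it does not affect the counts.

        // Cleanup phase: only the node for the current split, in both variants.
        nodes.get(splitIndex).cleanup();
        return nodes;
    }

    /** Counts nodes whose initialize and cleanup calls do not match. */
    public static int unbalanced(List<Node> nodes) {
        int count = 0;
        for (Node node : nodes) {
            if (node.initCalls != node.cleanupCalls) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        System.out.println("initialize all:     " + unbalanced(runTask(0, true)) + " unbalanced node(s)");
        System.out.println("initialize current: " + unbalanced(runTask(0, false)) + " unbalanced node(s)");
    }
}
```

With the first variant, the node for the split that is not being processed is initialized
but never cleaned up, so one node is left unbalanced per task; with the second variant none
are.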
                
> DoFn initialize method gets called twice where as cleanup gets called only once when join is performed on two PTables.
> ----------------------------------------------------------------------------------------------------------------------
>
>                 Key: CRUNCH-232
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-232
>             Project: Crunch
>          Issue Type: Bug
>          Components: MapReduce Patterns
>    Affects Versions: 0.6.0
>            Reporter: Anuj Ojha
>            Priority: Critical
>         Attachments: CRUNCH-232.patch
>
>
> DoFn's initialize method gets called twice, whereas cleanup gets called only once, when a join is performed on two PTables.
>  
> Sample Test:
> {code} 
>         final Configuration config = HBaseTest.getConf();
>         final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
>         final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
>                 "/HbaseTestFile.txt").toString());
>  
>         final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
>                 "/HbaseTestFile2.txt").toString());
>  
>         final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>  
>         final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
>                 Avros.tableOf(Avros.ints(), Avros.strings()));
>  
>         final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
>  
>         final PCollection<String> joinedStrings = joinedTable.parallelDo(
>                 new MapFn<Pair<Integer, Pair<String, String>>, String>() {
>                     private static final long serialVersionUID = -8796426750247480646L;
>  
>                     @Override
>                     public String map(final Pair<Integer, Pair<String, String>> input) {
>                         return input.second().first() + "/" + input.second().second();
>                     }
>                 }, Avros.strings());
>  
>         System.out.println(joinedStrings.materialize().iterator().hasNext());
>  {code}
>  
> The two DoFnCheck classes look something like this:
>  
> {code} 
> public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
>     /**
>      * 
>      */
>     private static final long serialVersionUID = 6780749658216132026L;
>  
>     @Override
>     public void initialize() {
>         System.out
>                 .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>  
>     @Override
>     public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
>         System.out
>                 .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>     }
>  
>     @Override
>     public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
>         // TODO Auto-generated method stub
>         System.out.println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
>  
>         final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
>  
>         emitter.emit(pair);
>     }
> }
> {code} 
>  
> The console looks like this:
>  
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!Process!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
> !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
