crunch-dev mailing list archives

From "Anuj Ojha (JIRA)"
Subject [jira] [Created] (CRUNCH-232) DoFn initialize method gets called twice whereas cleanup gets called only once when join is performed on two PTables.
Date Tue, 02 Jul 2013 21:39:20 GMT
Anuj Ojha created CRUNCH-232:

             Summary: DoFn initialize method gets called twice whereas cleanup gets called only once when join is performed on two PTables.
                 Key: CRUNCH-232
             Project: Crunch
          Issue Type: Bug
          Components: MapReduce Patterns
    Affects Versions: 0.6.0
            Reporter: Anuj Ojha
            Priority: Critical

A DoFn's initialize method is called twice, whereas its cleanup method is called only once,
when a join is performed on two PTables.
Sample Test:
        final Configuration config = HBaseTest.getConf();
        final Pipeline pipeline = new MRPipeline(MaraCheckTest.class, config);
        final PCollection<String> collectionHelper1 = pipeline.readTextFile(HBaseTest.class.getResource(
        final PCollection<String> collectionHelper2 = pipeline.readTextFile(HBaseTest.class.getResource(
        final PTable<Integer, String> ptable1 = collectionHelper2.parallelDo("Creating table", new DoFnCheck(),
                Avros.tableOf(Avros.ints(), Avros.strings()));
        final PTable<Integer, String> ptable2 = collectionHelper1.parallelDo("Creating table2", new DoFnCheck2(),
                Avros.tableOf(Avros.ints(), Avros.strings()));
        final PTable<Integer, Pair<String, String>> joinedTable = ptable1.join(ptable2);
        // Flatten each joined row into a "left/right" string.
        final PCollection<String> joinedStrings = joinedTable.parallelDo(
                new MapFn<Pair<Integer, Pair<String, String>>, String>() {
                    private static final long serialVersionUID = -8796426750247480646L;

                    @Override
                    public String map(final Pair<Integer, Pair<String, String>> input) {
                        return input.second().first() + "/" + input.second().second();
                    }
                }, Avros.strings());
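`ptable1.join(ptable2)` in the test above performs an inner join on the Integer key, producing a `PTable<Integer, Pair<String, String>>` that the MapFn flattens into "left/right" strings. Since DoFnCheck keys every line with the constant 1 (and, assuming DoFnCheck2 does the same, as "the two DoFnCheck look something like this" suggests), every line of one input pairs with every line of the other. The following is a minimal in-memory sketch of those semantics in plain Java with no Crunch dependency; `InMemoryJoinSketch` and its `Entry` class are hypothetical stand-ins, and unlike a real MapReduce run the output order here is deterministic.

```java
import java.util.ArrayList;
import java.util.List;

public class InMemoryJoinSketch {
    // Hypothetical key/value holder standing in for one row of a PTable<Integer, String>.
    static final class Entry {
        final int key;
        final String value;

        Entry(int key, String value) {
            this.key = key;
            this.value = value;
        }
    }

    // Inner join: for every pair of rows sharing a key, emit "left/right".
    // Rows whose key has no match on the other side are dropped.
    static List<String> innerJoin(List<Entry> left, List<Entry> right) {
        List<String> out = new ArrayList<>();
        for (Entry l : left) {
            for (Entry r : right) {
                if (l.key == r.key) {
                    out.add(l.value + "/" + r.value);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Entry> table1 = List.of(new Entry(1, "a"), new Entry(1, "b"));
        List<Entry> table2 = List.of(new Entry(1, "x"), new Entry(2, "y"));
        // Key 2 has no partner in table1, so "y" does not appear.
        System.out.println(innerJoin(table1, table2)); // prints [a/x, b/x]
    }
}
```

With a constant key, n lines on one side and m on the other yield n × m joined strings.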
The two DoFnCheck classes look something like this:
import org.apache.crunch.DoFn;
import org.apache.crunch.Emitter;
import org.apache.crunch.Pair;
public class DoFnCheck extends DoFn<String, Pair<Integer, String>> {
    private static final long serialVersionUID = 6780749658216132026L;
    public void initialize() {
        System.out
                .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }
    public void cleanup(final Emitter<Pair<Integer, String>> emitter) {
        System.out
                .println("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
    }
    public void process(final String input, final Emitter<Pair<Integer, String>> emitter) {
        // Key every input line with the constant 1 so all lines share one join key.
        final Pair<Integer, String> pair = new Pair<Integer, String>(1, input);
        emitter.emit(pair);
    }
}
The console output looks like this:
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm cleaned up!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!I'm initializing!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
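The console output shows the order initialize, cleanup, initialize, so one initialize() call is never matched by a cleanup() call. Below is a hedged, framework-free sketch of a tracker that turns this pairing into a checkable invariant. `LifecycleTracker`, `onInitialize`, and `onCleanup` are hypothetical names, not part of Crunch; the assumed usage is to call them from the DoFn's `initialize()` and `cleanup(Emitter)` overrides. The `main` method replays the sequence printed above.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class LifecycleTracker {
    private final AtomicInteger initCalls = new AtomicInteger();
    private final AtomicInteger cleanupCalls = new AtomicInteger();

    // Intended to be called from DoFn.initialize().
    public void onInitialize() {
        initCalls.incrementAndGet();
    }

    // Intended to be called from DoFn.cleanup(Emitter).
    public void onCleanup() {
        cleanupCalls.incrementAndGet();
    }

    // Number of initialize() calls not yet matched by a cleanup() call.
    public int unmatchedInitializations() {
        return initCalls.get() - cleanupCalls.get();
    }

    // True when every initialize() has a matching cleanup().
    public boolean isBalanced() {
        return unmatchedInitializations() == 0;
    }

    public static void main(String[] args) {
        LifecycleTracker tracker = new LifecycleTracker();
        // Replay the order reported on the console.
        tracker.onInitialize();
        tracker.onCleanup();
        tracker.onInitialize();
        System.out.println("balanced=" + tracker.isBalanced()
                + ", unmatched=" + tracker.unmatchedInitializations());
        // prints "balanced=false, unmatched=1"
    }
}
```

Because the counters live in an ordinary object field, they only see calls made on one DoFn instance in one JVM; copies of the DoFn deserialized into other tasks keep their own, separate counts.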
