crunch-user mailing list archives

From Hrishikesh P <hrishi.engin...@gmail.com>
Subject DoFn processing
Date Fri, 18 Oct 2013 19:43:28 GMT
I need to emit a pair of a string and an int from the DoFn's process
method; the integers will later be grouped by the string. In addition, I
need to emit another long from the process method that is independent of
this pair. At the end, I want to:
1) group the integers by string value
2) print the max long value.
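For reference, the two end results can be written as a plain, in-memory Java computation over the values the DoFn emits. This is only a sketch of the intended semantics, not Crunch code. The `Emitted` record, the method names, and the sample values are hypothetical. "Group" is taken to mean summing per key, because the `SUM_INTS()` call in the code below does that.

```java
import java.util.List;
import java.util.Map;
import java.util.OptionalLong;
import java.util.TreeMap;

public class DesiredOutput {
    // Hypothetical stand-in for one value emitted by TestFn: the
    // (string, int) pair plus the independent long.
    record Emitted(String key, int count, long value) {}

    // Requirement 1: group the integers by their string and sum them,
    // mirroring groupByKey().combineValues(Aggregators.SUM_INTS()).
    static Map<String, Integer> sumByKey(List<Emitted> rows) {
        Map<String, Integer> sums = new TreeMap<>();
        for (Emitted r : rows) {
            sums.merge(r.key(), r.count(), Integer::sum);
        }
        return sums;
    }

    // Requirement 2: the largest independent long, or empty if there are
    // no rows (the post checks it.hasNext() for the same reason).
    static OptionalLong maxLong(List<Emitted> rows) {
        return rows.stream().mapToLong(Emitted::value).max();
    }

    public static void main(String[] args) {
        List<Emitted> rows = List.of(
                new Emitted("a", 2, 10L),
                new Emitted("b", 5, 42L),
                new Emitted("a", 3, 7L));
        System.out.println(sumByKey(rows)); // {a=5, b=5}
        maxLong(rows).ifPresent(m -> System.out.println("MAX LONG = " + m)); // MAX LONG = 42
    }
}
```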

So I came up with the following code:

{pseudo-code}
...

PTable<Double, Long> inputTable = read(); // internal read method

PTable<Pair<String, Integer>, Long> table =
    inputTable.parallelDo("testjob", new TestFn(),
        Writables.tableOf(
            Writables.pairs(Writables.strings(), Writables.ints()),
            Writables.longs()));

pipeline.run();

final Iterator<Long> it =
    table.top(1).values().asCollection().getValue().iterator();
if (it.hasNext()) {
    System.err.println("MAX LONG = " + it.next());
}

// PTableConverterFn just emits its input unchanged; it is used to convert
// the PCollection into a PTable so that the values can be grouped by key.
PTable<String, Integer> newTable =
    table.keys().parallelDo(new PTableConverterFn(),
        Writables.tableOf(Writables.strings(), Writables.ints()));

final Iterator<Pair<String, Integer>> mapIt =
    newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator();
int count = 0;
while (mapIt.hasNext()) {
    Pair<String, Integer> aPair = mapIt.next();
    // Need to print these in the logs.
    System.out.println(aPair.first() + "---" + aPair.second());
    count += aPair.second();
}

pipeline.done();

System.out.println("Count = " + count);

...

private static class TestFn
    extends DoFn<Pair<Double, Long>, Pair<Pair<String, Integer>, Long>> {

    protected TestFn() {
    }

    @Override
    public void process(Pair<Double, Long> input,
        Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
        System.out.println("Entering process ...");
        ...
        // internal method, does some database insertion operations
        Pair<String, Integer> aPair = processInput(input);
        long maxLong = findMaxLong(input.first()); // internal method
        ...
        Pair<Pair<String, Integer>, Long> pair =
            new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
        emitter.emit(pair);
        System.out.println("Exiting process ...");
    }
}

...

{pseudo-code}


This code gives me the results I expect, but the DoFn's process method gets
called twice. My guess is that it runs once during the
"table.top(1).values().asCollection().getValue().iterator()" call and again
during the
"newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
call. Since I am doing database insertions inside the process method, this
is undesirable.
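That guess fits a common pattern in lazily evaluated pipelines: each output that is read back may trigger its own execution, and an intermediate collection that was never persisted gets recomputed for each one. The toy model below is a plain-Java sketch of that assumption, not Crunch's planner. `LazyCollection`, `read()` and the `cached` flag are hypothetical; `cached` stands in for persisting `table` once so that later consumers reuse the stored result.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class RecomputeDemo {
    // Toy lazily evaluated collection: it stores its source and a function,
    // and only applies the function when a consumer reads it.
    static final class LazyCollection<S, T> {
        private final List<S> source;
        private final Function<S, T> fn;
        private final boolean cached;
        private List<T> memo; // filled on the first read when caching is on

        LazyCollection(List<S> source, Function<S, T> fn, boolean cached) {
            this.source = source;
            this.fn = fn;
            this.cached = cached;
        }

        // Each consumer triggers a read; without caching, fn runs again.
        List<T> read() {
            if (memo != null) {
                return memo;
            }
            List<T> out = new ArrayList<>();
            for (S s : source) {
                out.add(fn.apply(s));
            }
            if (cached) {
                memo = out;
            }
            return out;
        }
    }

    // Returns how many times the "process" function ran for two consumers.
    static int processCalls(boolean cached) {
        int[] calls = {0};
        LazyCollection<Integer, Integer> table = new LazyCollection<>(
                List.of(1, 2, 3),
                x -> { calls[0]++; return x * 10; }, // side effect, like a DB insert
                cached);
        // Consumer 1: the maximum value (analogous to table.top(1)).
        int max = table.read().stream().mapToInt(Integer::intValue).max().orElse(0);
        // Consumer 2: the total (analogous to the grouped SUM_INTS()).
        int sum = table.read().stream().mapToInt(Integer::intValue).sum();
        System.out.println("max=" + max + ", sum=" + sum);
        return calls[0];
    }

    public static void main(String[] args) {
        System.out.println(processCalls(false)); // 6: each element processed twice
        System.out.println(processCalls(true));  // 3: each element processed once
    }
}
```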

My questions are:

1) Is there a better way to do what I am trying to do here?

2) When, and how many times, does the process method actually get called,
and why does it get called more than once?

3) If I did not need to print the values after the processing, would the
database insertions still happen without materializing the results? Is
materializing necessary for processing to happen?
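Question 3 comes down to deferred execution: Crunch evaluates pipelines lazily. Below is a minimal plain-Java model of it, assuming that a DoFn only runs when its output feeds some target (a write or a materialize) that the pipeline is asked to run. `ToyPipeline` and `addTarget` are hypothetical names for illustration, not Crunch API, and the counter stands in for the database inserts.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredDemo {
    // Toy pipeline: registering a target only records work; nothing runs
    // until run() is called, and unregistered work never runs at all.
    static final class ToyPipeline {
        private final List<Runnable> targets = new ArrayList<>();

        void addTarget(Runnable job) {
            targets.add(job);
        }

        void run() {
            for (Runnable job : targets) {
                job.run();
            }
            targets.clear();
        }
    }

    // Returns how many simulated "database inserts" happened.
    static int inserts(boolean registerTarget) {
        int[] inserted = {0};
        List<Integer> input = List.of(1, 2, 3);
        // The per-record work with a side effect, like TestFn.process().
        Runnable process = () -> input.forEach(x -> inserted[0]++);
        ToyPipeline pipeline = new ToyPipeline();
        if (registerTarget) {
            pipeline.addTarget(process);
        }
        pipeline.run();
        return inserted[0];
    }

    public static void main(String[] args) {
        System.out.println(inserts(false)); // 0: nothing depended on the work
        System.out.println(inserts(true));  // 3: one insert per input record
    }
}
```

In this model the side effects are tied to execution, not to reading results back: registering a write-style target would be enough, even if nothing is printed afterwards.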

Thanks for the help/input.
