crunch-user mailing list archives

From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: DoFn processing
Date Fri, 18 Oct 2013 20:27:57 GMT
Hi Hrishikesh,

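I think the basic principle in Crunch that should answer a large
portion of your questions is this: things only really start happening
in Crunch if you either write data to a target (i.e.
pipeline.write(...), followed by pipeline.run() or pipeline.done()), or
materialize a PCollection. Materializing a PCollection is something you
typically only want to do if it is relatively small, or if you have a
specific need to get the contents of a PCollection into the client
application. In your code, the pipeline.run() call most likely does
nothing, because no target has been written yet; the work only starts
when the top(1) results are requested, and starts again when the grouped
sums are materialized. Since "table" isn't stored in between, each of
those runs recomputes it, which is why TestFn is called twice.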
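
As a rough illustration of this principle, here is a small plain-Java toy model of deferred execution. It is not Crunch's API: LazyCollection, map, materialize and LazyDemo are hypothetical names chosen for this sketch. A map call only records work; the function runs when materialize is called, and every separate materialization re-runs the whole recorded plan from the source, which loosely resembles TestFn being called twice in your example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model of deferred execution; NOT the Crunch API.
final class LazyCollection<T> {
    private final Supplier<List<T>> plan; // recorded work, not yet executed

    private LazyCollection(Supplier<List<T>> plan) {
        this.plan = plan;
    }

    static <T> LazyCollection<T> of(List<T> source) {
        return new LazyCollection<>(() -> new ArrayList<>(source));
    }

    // Records a transformation; the function is not called here.
    <R> LazyCollection<R> map(Function<T, R> fn) {
        return new LazyCollection<>(() -> {
            List<R> out = new ArrayList<>();
            for (T item : plan.get()) { // re-executes everything upstream
                out.add(fn.apply(item));
            }
            return out;
        });
    }

    // Runs the whole recorded plan from the source on every call; nothing is cached.
    List<T> materialize() {
        return plan.get();
    }
}

final class LazyDemo {
    // Returns the number of function calls: before, after one, and after two materializations.
    static int[] countCalls() {
        AtomicInteger calls = new AtomicInteger();
        LazyCollection<Integer> table = LazyCollection.of(List.of(1, 2, 3))
                .map(x -> {
                    calls.incrementAndGet();
                    return x * 10;
                });
        int before = calls.get();
        table.materialize();
        int afterFirst = calls.get();
        table.materialize();
        int afterSecond = calls.get();
        return new int[] {before, afterFirst, afterSecond};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countCalls())); // [0, 3, 6]
    }
}
```

LazyDemo.countCalls() returns [0, 3, 6]: no calls before anything is materialized, then one call per element for each separate materialization.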

If you specifically want to ensure that the TestFn is only used once
in your pipeline, it should be sufficient to add the following call
just after the "table" PCollection is created:

    table.materialize().iterator().next();

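Using this instead of pipeline.run() will probably ensure that TestFn is
only run once over the collection in this case, since the later
operations can then read the materialized contents of "table" instead of
recomputing them.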
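
To sketch why materializing once can help, here is another plain-Java toy model, again not Crunch's API (CachedCollection and CachedDemo are hypothetical names). The first materialize call computes and stores the results; the later consumers, a max and a sum loosely standing in for top(1) and SUM_INTS, reuse the stored list instead of calling the function again. How a real Crunch pipeline reuses a materialized PCollection is up to its planner, so treat this only as an illustration of the idea.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

// Toy model: the first materialization is stored and reused; NOT the Crunch API.
final class CachedCollection<T, R> {
    private final List<T> source;
    private final Function<T, R> fn;
    private List<R> stored; // null until the first materialization (not thread-safe)

    CachedCollection(List<T> source, Function<T, R> fn) {
        this.source = source;
        this.fn = fn;
    }

    List<R> materialize() {
        if (stored == null) { // compute once, then keep the result
            List<R> out = new ArrayList<>();
            for (T item : source) {
                out.add(fn.apply(item));
            }
            stored = out;
        }
        return stored;
    }
}

final class CachedDemo {
    // Returns {function calls, max value, sum of values}.
    static int[] run() {
        AtomicInteger calls = new AtomicInteger();
        CachedCollection<Integer, Integer> table = new CachedCollection<>(
                List.of(1, 2, 3),
                x -> {
                    calls.incrementAndGet();
                    return x * 10;
                });
        table.materialize(); // forces the computation up front
        // Two downstream consumers, loosely standing in for top(1) and SUM_INTS.
        int max = table.materialize().stream().max(Integer::compare).orElse(0);
        int sum = table.materialize().stream().mapToInt(Integer::intValue).sum();
        return new int[] {calls.get(), max, sum};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(run())); // [3, 30, 60]
    }
}
```

Here the function runs three times in total (once per element), even though three materialize calls are made.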

However, it's actually a much better idea not to have operations that
change the state of an external application or process within a DoFn. I
would recommend instead writing the data to be added to the external
database as an end-point of a pipeline (or even as a separate step that
is run on the output of a pipeline), rather than doing it within the
pipeline. That way, you become immune to the number of times a DoFn is
used or re-used in a pipeline (a DoFn can also be re-run when a failed
task is retried).
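
The difference between the two approaches can be sketched in plain Java, with a List standing in for the external database. This is not Crunch code: SideEffectDemo, toRow and the two insert methods are hypothetical, and the step is simply executed twice to imitate a DoFn being re-run. Inserting from inside the per-record step duplicates rows on every execution, while computing the rows purely and writing the final output once at the end gives the same result however often the computation runs.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration, NOT Crunch code; the "database" is just a List.
final class SideEffectDemo {

    // Pure per-record function: computes a row and touches nothing outside itself.
    static String toRow(int input) {
        return "row-" + input;
    }

    // Anti-pattern: each execution of the step inserts into the database directly.
    static List<String> insertInsideStep(List<Integer> inputs, int executions) {
        List<String> database = new ArrayList<>();
        for (int run = 0; run < executions; run++) {
            for (int input : inputs) {
                database.add(toRow(input)); // side effect repeated on every execution
            }
        }
        return database;
    }

    // Recommended: the step only produces output; the insert happens once at the end.
    static List<String> insertAtEndPoint(List<Integer> inputs, int executions) {
        List<String> output = new ArrayList<>();
        for (int run = 0; run < executions; run++) {
            output = new ArrayList<>(); // a re-run simply recomputes the same output
            for (int input : inputs) {
                output.add(toRow(input));
            }
        }
        List<String> database = new ArrayList<>();
        database.addAll(output); // single write of the final output
        return database;
    }

    public static void main(String[] args) {
        List<Integer> inputs = List.of(1, 2, 3);
        System.out.println(insertInsideStep(inputs, 2)); // [row-1, row-2, row-3, row-1, row-2, row-3]
        System.out.println(insertAtEndPoint(inputs, 2)); // [row-1, row-2, row-3]
    }
}
```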

Hope this helps, but if this isn't answering your questions then just
let me know.

- Gabriel


On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P <hrishi.engineer@gmail.com> wrote:
> Basically, I have a need to emit a pair of a string and an int from the
> DoFn's process method; the integers will later be grouped by the string. In
> addition to this, I also need to emit another long from the process method
> that is independent of this pair. At the end, I want to -
> 1) group the integers by string value
> 2) print the max long value.
>
> So I came up with the following code:
>
> {pseudo-code}
> ...
>
> PTable<Double, Long> inputTable = read(); // internal read method
>
> PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
>     new TestFn(), Writables.tableOf(Writables.pairs(Writables.strings(),
>     Writables.ints()), Writables.longs()));
>
> pipeline.run();
>
> final Iterator<Long> it =
>     table.top(1).values().asCollection().getValue().iterator();
> if (it.hasNext()) {
>     System.err.println("MAX LONG = " + it.next());
> }
>
> // PTableConverterFn just emits the input back; it is used to convert the
> // PCollection to a PTable so that the values can easily be grouped by key.
> PTable<String, Integer> newTable = table.keys().parallelDo(
>     new PTableConverterFn(), Writables.tableOf(Writables.strings(),
>     Writables.ints()));
>
> final Iterator<Pair<String, Integer>> mapIt = newTable.groupByKey()
>     .combineValues(Aggregators.SUM_INTS()).materialize().iterator();
>
> int count = 0;
> while (mapIt.hasNext()) {
>     Pair<String, Integer> aPair = mapIt.next();
>     // Need to print these in the logs.
>     System.out.println(aPair.first() + "---" + aPair.second());
>     count += aPair.second();
> }
>
> pipeline.done();
>
> System.out.println("Count = " + count);
>
> ...
>
> private static class TestFn extends DoFn<Pair<Double, Long>,
>     Pair<Pair<String, Integer>, Long>> {
>
>     protected TestFn() {
>     }
>
>     @Override
>     public void process(Pair<Double, Long> input,
>         Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
>         System.out.println("Entering process ...");
>         ...
>         // processInput is an internal method; it does some database
>         // insertion operations.
>         Pair<String, Integer> aPair = processInput(input);
>         long maxLong = findMaxLong(input.first()); // internal method
>         ...
>         Pair<Pair<String, Integer>, Long> pair =
>             new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
>         emitter.emit(pair);
>         System.out.println("Exiting process ...");
>     }
> }
>
> ...
>
> {pseudo-code}
>
>
> This code gives me the results I expect, but the DoFn process method gets
> called twice, which I guess happens first during the
> "table.top(1).values().asCollection().getValue().iterator()" call and second
> during the
> "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
> call. Since I am doing database insertion operations inside the process
> method, this is undesirable.
>
> My questions are:
>
> 1) Is there a better way to do what I am trying to do here?
>
> 2) When, and how many times, does the process method actually get called,
> and why does it get called more than once?
>
> 3) If I did not have the need to print the values after the processing,
> would the database insertions still have happened without materializing the
> results? Is materializing necessary for processing to happen?
>
> Thanks for the help/input.
