crunch-user mailing list archives

From Hrishikesh P <hrishi.engin...@gmail.com>
Subject Re: DoFn processing
Date Fri, 18 Oct 2013 22:14:09 GMT
Hi Gabriel,

Thanks for the reply, that definitely answered some of my questions.

My PCollection is relatively small (30K pairs should count as small). My
main concern was that the process method was getting called twice. But
after adding "table.materialize().iterator().next();" it's now getting
called only once, which solves my problem for now.
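
To picture why the process method ran twice and why materializing once
stopped it, here is a plain-Java analogy. It uses only java.util.stream, not
the Crunch API, and the class and method names are made up for illustration.
The assumption is that a deferred collection re-runs its function for every
downstream consumer, while a materialized one is computed once and then reused.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class LazyVsMaterialized {
    // Counts how often the side-effecting function runs (stand-in for TestFn.process).
    static final AtomicInteger calls = new AtomicInteger();

    static long process(long x) {
        calls.incrementAndGet(); // imagine a database insert happening here
        return x * 10;
    }

    // Deferred: each terminal operation rebuilds the stream and re-runs process().
    static int countCallsDeferred(List<Long> input) {
        calls.set(0);
        Supplier<Stream<Long>> table =
            () -> input.stream().map(LazyVsMaterialized::process);
        long max = table.get().max(Long::compare).orElse(0L);    // first pass
        long sum = table.get().mapToLong(Long::longValue).sum(); // second pass
        return calls.get();
    }

    // Materialized: process() runs once per element; both results read the cached list.
    static int countCallsMaterialized(List<Long> input) {
        calls.set(0);
        List<Long> table = input.stream()
            .map(LazyVsMaterialized::process)
            .collect(Collectors.toList());
        long max = table.stream().max(Long::compare).orElse(0L);
        long sum = table.stream().mapToLong(Long::longValue).sum();
        return calls.get();
    }

    public static void main(String[] args) {
        List<Long> input = Arrays.asList(1L, 2L, 3L);
        System.out.println("deferred calls = " + countCallsDeferred(input));
        System.out.println("materialized calls = " + countCallsMaterialized(input));
    }
}
```

With three inputs, the deferred version calls process() six times and the
materialized version three times. This only mirrors the behaviour I observed;
it says nothing about how Crunch plans its jobs internally.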

But I see your point. If I make the database insertion an end point of the
pipeline, would I still be able to do it in parallel? That is one of the
reasons I put the logic in the process method. If not, I may have to
continue with my approach.
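
For what it is worth, this is how I picture the "end point" idea in plain
Java. It is again an analogy and not Crunch code: fakeDb is a hypothetical
stand-in for the database, and the keyed put is an assumption that makes
repeated writes harmless. It does not show whether a particular Crunch target
writes in parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

public class WriteAtEndSketch {
    // Hypothetical stand-in for the external database (thread-safe map).
    static final Map<String, Integer> fakeDb = new ConcurrentHashMap<>();

    // Pure transform: touches no external state, so re-running it is harmless.
    static String transform(int x) {
        return "row-" + x;
    }

    // Transform first, then write the finished rows as a separate final step.
    static int writeAll(List<Integer> input) {
        List<String> rows = input.stream()
            .map(WriteAtEndSketch::transform)
            .collect(Collectors.toList());
        // The write step itself can still be spread over several threads.
        rows.parallelStream().forEach(row -> fakeDb.put(row, row.length()));
        return fakeDb.size();
    }

    public static void main(String[] args) {
        System.out.println("rows written = " + writeAll(Arrays.asList(1, 2, 3, 4)));
        // Writing the same rows again overwrites by key instead of duplicating.
        System.out.println("rows after re-run = " + writeAll(Arrays.asList(1, 2, 3, 4)));
    }
}
```

The point of the split is that the transform can run any number of times
without touching the database, and only the last step has side effects.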

Thanks.


On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid <gabriel.reid@gmail.com> wrote:

> Hi Hrishikesh,
>
> I think the basic principle in Crunch that should answer a large
> portion of your questions is this: things only really start happening
> in Crunch if you either write data to a target (i.e.
> pipeline.write(...)), or materialize a PCollection. Materializing a
> PCollection is something you typically only want to do if it is
> relatively small, or if you have a specific need to get the contents
> of a PCollection into the client application.
>
> If you specifically want to ensure that the TestFn is only used once
> in your pipeline, it should be sufficient to add the following call
> just after the "table" PCollection is created:
>
>     table.materialize().iterator().next();
>
> Using this instead of pipeline.run will probably ensure that TestFn is
> only run once over the collection in this case.
>
> However, it's actually a much better idea to not have operations that
> affect the state of an external application/process within a DoFn. I
> would recommend instead writing the data to be added to an external
> database as an end-point of a pipeline (or even as something that can
> be run on the output of a pipeline), instead of doing it within a
> pipeline. In this way you can become immune to the number of times a
> DoFn is used/re-used in a pipeline.
>
> Hope this helps, but if this isn't answering your questions then just
> let me know.
>
> - Gabriel
>
>
> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P <hrishi.engineer@gmail.com> wrote:
> > Basically, I have a need to emit a pair of a string and an int from the
> > DoFn's process method; the integers will later be grouped by the string.
> > In addition to this, I also need to emit another long from the process
> > method that is independent of this pair. At the end, I want to -
> > 1) group the integers by string value
> > 2) print the max long value.
> >
> > So I came up with the following code:
> >
> > {pseudo-code}
> > ...
> >
> > PTable<Double, Long> inputTable = read(); // internal read method
> >
> > PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
> >     new TestFn(),
> >     Writables.tableOf(Writables.pairs(Writables.strings(), Writables.ints()),
> >         Writables.longs()));
> >
> > pipeline.run();
> >
> > final Iterator<Long> it =
> >     table.top(1).values().asCollection().getValue().iterator();
> > if (it.hasNext()) {
> >     System.err.println("MAX LONG = " + it.next());
> > }
> >
> > PTable<String, Integer> newTable = table.keys().parallelDo(
> >     new PTableConverterFn(),
> >     Writables.tableOf(Writables.strings(), Writables.ints()));
> > // PTableConverterFn just emits the input back, it is used to convert
> > // PCollection to PTable so that the values can be easily grouped by.
> >
> > final Iterator<Pair<String, Integer>> mapIt =
> >     newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator();
> >
> > int count = 0;
> > while (mapIt.hasNext()) {
> >     Pair<String, Integer> aPair = mapIt.next();
> >     // Need to print these in the logs.
> >     System.out.println(aPair.first() + "---" + aPair.second());
> >     count += aPair.second();
> > }
> >
> > pipeline.done();
> >
> > System.out.println("Count = " + count);
> >
> > ...
> >
> > private static class TestFn extends DoFn<Pair<Double, Long>,
> >         Pair<Pair<String, Integer>, Long>> {
> >
> >     protected TestFn() {
> >     }
> >
> >     @Override
> >     public void process(Pair<Double, Long> input,
> >             Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
> >         System.out.println("Entering process ...");
> >         ...
> >         // internal method, does some database insertion operations.
> >         Pair<String, Integer> aPair = processInput(input);
> >         long maxLong = 0;
> >         maxLong = findMaxLong(input.first()); // internal method
> >         ...
> >         Pair<Pair<String, Integer>, Long> pair =
> >             new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
> >         emitter.emit(pair);
> >         System.out.println("Exiting process ...");
> >     }
> > }
> >
> > ...
> >
> > {pseudo-code}
> >
> >
> > This code gives me the results I expect, but the DoFn process method gets
> > called twice, which I guess happens first during the
> > "table.top(1).values().asCollection().getValue().iterator()" call and
> > second during the
> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
> > call. Since I am doing database insertion operations inside the process
> > method, this is undesirable.
> >
> > My questions are:
> >
> > 1) Is there a better way to do what I am trying to do here?
> >
> > 2) When and how many times does the process method actually get called,
> > and why does it get called more than once?
> >
> > 3) If I did not have the need to print the values after the processing,
> > would the database insertions still have happened without materializing
> > the results? Is materializing necessary for processing to happen?
> >
> > Thanks for the help/input.
>
