crunch-user mailing list archives

From: Gabriel Reid <gabriel.r...@gmail.com>
Subject: Re: DoFn processing
Date: Sat, 19 Oct 2013 03:28:40 GMT
Hi Hrishikesh,

About the database insertion -- if you want to have parallel writes to
a database, probably the best way to do this within Crunch would be to
implement a Target[1] for writing to your RDBMS. This would involve
using a Hadoop OutputFormat, and would do the writes in parallel.
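
For illustration, here is a rough sketch of the OutputFormat wiring that
such a Target would wrap, using Hadoop's DBOutputFormat; the driver class,
connection URL, table and column names below are all placeholders:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;

    public class DbTargetSketch {
        public static Job configureJob() throws IOException {
            Configuration conf = new Configuration();
            // JDBC connection details -- all placeholders.
            DBConfiguration.configureDB(conf,
                "com.mysql.jdbc.Driver",
                "jdbc:mysql://dbhost:3306/mydb",
                "user", "password");
            Job job = Job.getInstance(conf);
            // DBOutputFormat issues one INSERT per output record; each
            // map/reduce task writes over its own connection, in parallel.
            // Note that the emitted key type must implement DBWritable.
            job.setOutputFormatClass(DBOutputFormat.class);
            DBOutputFormat.setOutput(job, "my_table", "name", "total");
            return job;
        }
    }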

There isn't currently a DatabaseTarget available in Crunch. However,
the Sqoop project[2] deals specifically with moving data between HDFS
and databases, and helps take care of details like limiting a transfer
job to the amount of parallelism that an RDBMS can handle.
Probably the best option (although slightly more involved) would be to
use Crunch to do your data processing and write the output to HDFS, and
then use Sqoop to export the data from HDFS into your RDBMS.
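
A minimal sketch of that two-step flow, assuming a Pipeline named
"pipeline" and a PCollection of output lines named "results"; the path,
table name and connection settings are placeholders:

    import org.apache.crunch.PCollection;
    import org.apache.crunch.Pipeline;

    public class HdfsThenSqoop {
        // Step 1 (Crunch): write the processed results to HDFS as text.
        public static void writeForExport(Pipeline pipeline,
                PCollection<String> results) {
            pipeline.writeTextFile(results, "/data/out/results");
            pipeline.done();
        }
        // Step 2 (shell, outside Crunch) -- export the files into the RDBMS:
        //   sqoop export --connect jdbc:mysql://dbhost/mydb \
        //       --table results --export-dir /data/out/results \
        //       --username user --password secret
    }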

- Gabriel

[1] http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/Target.html
[2] http://sqoop.apache.org

On Sat, Oct 19, 2013 at 12:14 AM, Hrishikesh P
<hrishi.engineer@gmail.com> wrote:
> Hi Gabriel,
>
> Thanks for the reply, that definitely answered some of my questions.
>
> My PCollection is relatively small (around 30K pairs, which should count
> as small). My main concern was that the process method was getting called
> twice, but after adding "table.materialize().iterator().next();" it's now
> getting called only once, which will solve my problem for now.
>
> But I see your point. If I make the database insertion an end point of
> the pipeline, would I still be able to do it in parallel? That is one of
> the reasons I put the logic in the process method. If not, I may have to
> continue with my approach.
>
> Thanks.
>
>
> On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid <gabriel.reid@gmail.com>
> wrote:
>>
>> Hi Hrishikesh,
>>
>> I think the basic principle in Crunch that should answer a large
>> portion of your questions is this: things only really start happening
>> in Crunch if you either write data to a target (e.g.
>> pipeline.write(...)) or materialize a PCollection. Materializing a
>> PCollection is something you typically only want to do if it is
>> relatively small, or if you have a specific need to get the contents
>> of a PCollection into the client application.
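>>
>> For illustration, a minimal sketch of that principle, assuming a Pipeline
>> named "pipeline", placeholder paths, and org.apache.crunch.lib.Aggregate:
>>
>>     PCollection<String> lines = pipeline.readTextFile("/in"); // no job runs yet
>>     PTable<String, Long> counts = Aggregate.count(lines);     // still only a plan
>>     pipeline.writeTextFile(counts, "/out"); // registers a target; still nothing
>>     pipeline.done(); // only now do the underlying MapReduce jobs execute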
>>
>> If you specifically want to ensure that the TestFn is only used once
>> in your pipeline, it should be sufficient to add the following call
>> just after the "table" PCollection is created:
>>
>>     table.materialize().iterator().next();
>>
>> Using this instead of pipeline.run will probably ensure that TestFn is
>> only run once over the collection in this case.
>>
>> However, it's actually a much better idea to not have operations that
>> affect the state of an external application/process within a DoFn. I
>> would recommend instead writing the data to be added to an external
>> database as an end-point of a pipeline (or even as something that can
>> be run on the output of a pipeline), instead of doing it within a
>> pipeline. In this way you can become immune to the number of times a
>> DoFn is used/re-used in a pipeline.
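>>
>> For example, a sketch of that post-pipeline approach, assuming the
>> grouped output is a PTable<String, Integer> named "sums", "conn" is an
>> open java.sql.Connection, and the table and column names are placeholders:
>>
>>     // materialize() runs the pipeline (if needed) and streams the output
>>     // to the client, where the inserts happen outside any DoFn.
>>     PreparedStatement ps = conn.prepareStatement(
>>         "INSERT INTO results (name, total) VALUES (?, ?)");
>>     for (Pair<String, Integer> p : sums.materialize()) {
>>         ps.setString(1, p.first());
>>         ps.setInt(2, p.second());
>>         ps.executeUpdate();
>>     }
>>     ps.close();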
>>
>> Hope this helps, but if this isn't answering your questions then just
>> let me know.
>>
>> - Gabriel
>>
>>
>> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P <hrishi.engineer@gmail.com>
>> wrote:
>> > Basically, I need to emit a pair of a string and an int from the DoFn's
>> > process method; the integers will later be grouped by the string. In
>> > addition, I also need to emit another long from the process method that
>> > is independent of this pair. At the end, I want to:
>> > 1) group the integers by string value
>> > 2) print the max long value.
>> >
>> > So I came up with the following code:
>> >
>> > {pseudo-code}
>> > ...
>> >
>> > PTable<Double, Long> inputTable = read(); // internal read method
>> >
>> > PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo(
>> >     "testjob", new TestFn(),
>> >     Writables.tableOf(
>> >         Writables.pairs(Writables.strings(), Writables.ints()),
>> >         Writables.longs()));
>> >
>> > pipeline.run();
>> >
>> > final Iterator<Long> it =
>> >     table.top(1).values().asCollection().getValue().iterator();
>> > if (it.hasNext()) {
>> >     System.err.println("MAX LONG = " + it.next());
>> > }
>> >
>> > // PTableConverterFn just emits its input back; it is used to convert
>> > // the PCollection of pairs into a PTable so that the values can be
>> > // easily grouped by key.
>> > PTable<String, Integer> newTable = table.keys().parallelDo(
>> >     new PTableConverterFn(),
>> >     Writables.tableOf(Writables.strings(), Writables.ints()));
>> >
>> > final Iterator<Pair<String, Integer>> mapIt = newTable.groupByKey()
>> >     .combineValues(Aggregators.SUM_INTS()).materialize().iterator();
>> >
>> > int count = 0;
>> > while (mapIt.hasNext()) {
>> >     Pair<String, Integer> aPair = mapIt.next();
>> >     // Need to print these in the logs.
>> >     System.out.println(aPair.first() + "---" + aPair.second());
>> >     count += aPair.second();
>> > }
>> >
>> > pipeline.done();
>> >
>> > System.out.println("Count = " + count);
>> >
>> > ...
>> >
>> > private static class TestFn
>> >     extends DoFn<Pair<Double, Long>, Pair<Pair<String, Integer>, Long>> {
>> >
>> >     protected TestFn() {
>> >     }
>> >
>> >     @Override
>> >     public void process(Pair<Double, Long> input,
>> >             Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
>> >         System.out.println("Entering process ...");
>> >         ...
>> >         // internal method; does some database insertion operations
>> >         Pair<String, Integer> aPair = processInput(input);
>> >         long maxLong = findMaxLong(input.first()); // internal method
>> >         ...
>> >         Pair<Pair<String, Integer>, Long> pair =
>> >             new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
>> >         emitter.emit(pair);
>> >         System.out.println("Exiting process ...");
>> >     }
>> > }
>> >
>> > ...
>> >
>> > {pseudo-code}
>> >
>> >
>> > This code gives me the results I expect, but the DoFn process method
>> > gets called twice, which I guess happens first during the
>> > "table.top(1).values().asCollection().getValue().iterator()" call and
>> > second during the
>> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
>> > call. Since I am doing database insertion operations inside the process
>> > method, this is undesirable.
>> >
>> > My questions are:
>> >
>> > 1) Is there a better way to do what I am trying to do here?
>> >
>> > 2) When and how many times does the process method actually get called,
>> > and why does it get called more than once?
>> >
>> > 3) If I did not need to print the values after the processing, would
>> > the database insertions still have happened without materializing the
>> > results? Is materializing necessary for processing to happen?
>> >
>> > Thanks for the help/input.
>
>
