crunch-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gabriel Reid <gabriel.r...@gmail.com>
Subject Re: DoFn processing
Date Sat, 19 Oct 2013 03:37:23 GMT
Yep, there's only a JDBC Source in contrib, no Target.

On Sat, Oct 19, 2013 at 5:32 AM, Josh Wills <jwills@cloudera.com> wrote:
> There isnt a db target in contrib? There is a db source, IIRC.
>
> On Oct 18, 2013 8:29 PM, "Gabriel Reid" <gabriel.reid@gmail.com> wrote:
>>
>> Hi Hrishikesh,
>>
>> About the database insertion -- if you want to have parallel writes to
>> a database, probably the best way to do this within Crunch would be to
>> implement a Target[1] for writing to your RDBMS. This would involve
>> using a Hadoop OutputFormat, and would do the writes in parallel.
>>
>> There isn't currently a DatabaseTarget available in Crunch. However,
>> the Sqoop project[2] deals specifically with moving data between HDFS
>> and databases, and also helps to take care of details like limiting an
>> import job to the amount of parallelism that an RDBMS can handle.
>> Probably the best option (although slightly more involved) would be to
>> use Crunch to do your data processing and output data to HDFS, and
>> then use Sqoop to import the data into your RDBMS.
>>
>> - Gabriel
>>
>> [1] http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/Target.html
>> [2] http://sqoop.apache.org
>>
>> On Sat, Oct 19, 2013 at 12:14 AM, Hrishikesh P
>> <hrishi.engineer@gmail.com> wrote:
>> > Hi Gabriel,
>> >
>> > Thanks for the reply, that definitely answered some of my questions.
>> >
>> > My PCollection is relatively small (30K pairs should be a small value).
>> > My
>> > main concern was that the process method was getting called twice. But
>> > after
>> > adding "table.materialize().iterator().next();" its now getting called
>> > only
>> > once, which will solve my problem for now.
>> >
>> > But I see your point. If I make the database insertion operation an end
>> > point of the pipeline, would I still be able to do it in parallel,
>> > because
>> > that is one of the reasons I put the logic in the process method. If
>> > not, I
>> > may have to continue with my approach.
>> >
>> > Thanks.
>> >
>> >
>> > On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid <gabriel.reid@gmail.com>
>> > wrote:
>> >>
>> >> Hi Hrishikesh,
>> >>
>> >> I think the basic principle in Crunch that should answer a large
>> >> portion of your questions is this: things only really start happening
>> >> in Crunch if you either write data to a target (i.e.
>> >> pipeline.write(...)), or materialize a PCollection. Materializing a
>> >> PCollection is something you typically only want to do if it is
>> >> relatively small, or if you have a specific need to get the contents
>> >> of a PCollection into the client application.
>> >>
>> >> If you specifically want to ensure that the TestFn is only used once
>> >> in your pipeline, it should be sufficient to add the following call
>> >> just after the "table" PCollection is created:
>> >>
>> >>     table.materialize().iterator().next();
>> >>
>> >> Using this instead of pipeline.run will probably ensure that TestFn is
>> >> only run once over the collection in this case.
>> >>
>> >> However, it's actually a much better idea to not have operations that
>> >> affect the state of an external application/process within a DoFn. I
>> >> would recommend instead writing the data to be added to an external
>> >> database as an end-point of a pipeline (or even as something that can
>> >> be run on the output of a pipeline), instead of doing it within a
>> >> pipeline. In this way you can become immune to the number of times a
>> >> DoFn is used/re-used in a pipeline.
>> >>
>> >> Hope this helps, but if this isn't answering your questions then just
>> >> let me know.
>> >>
>> >> - Gabriel
>> >>
>> >>
>> >> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P
>> >> <hrishi.engineer@gmail.com>
>> >> wrote:
>> >> > Basically, I have a need to emit a pair of a string and an int from
>> >> > the
>> >> > DoFn's process method; the integers will later be grouped by the
>> >> > string.
>> >> > In
>> >> > addition to this, I also need to emit another long from the process
>> >> > method
>> >> > that is independent of this pair. At the end, I want to -
>> >> > 1) group the integers by string value
>> >> > 2) print the max long value.
>> >> >
>> >> > So I came up with the following code:
>> >> >
>> >> > {pseudo-code}
>> >> > ...
>> >> >
>> >> > PTable<Double, Long> inputTable = read() // internal read method
>> >> >
>> >> > PTable<Pair<String, Integer>, Long> table =
>> >> > inputTable.parallelDo("testjob",
>> >> > new TestFn(), Writables.tableOf(Writables.pairs(Writables.strings(),
>> >> > Writables.ints()), Writables.longs()));
>> >> >
>> >> > pipeline.run();
>> >> >
>> >> > final Iterator<Long> it =
>> >> > table.top(1).values().asCollection().getValue().iterator();
>> >> >
>> >> > if (it.hasNext()) {
>> >> >
>> >> > System.err.println("MAX LONG = " + it.next());
>> >> >
>> >> > }
>> >> >
>> >> > PTable<String, Integer> newTable = table.keys().parallelDo(new
>> >> > PTableConverterFn(), Writables.tableOf(Writables.strings(),
>> >> > Writables.ints()));
>> >> >
>> >> > // PTableConverterFn just emits the input back, it is used to convert
>> >> >
>> >> > // PCollection to PTable so that the values can be easily grouped by.
>> >> >
>> >> > final Iterator<Pair<String, Integer>> mapIt =
>> >> >
>> >> >
>> >> > newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator();
>> >> >
>> >> > int count = 0;
>> >> >
>> >> > while (mapIt.hasNext()) {
>> >> >
>> >> >                 Pair<String, Integer> aPair = mapIt.next();
>> >> >
>> >> >                 System.out.println(aPair.first() + "---" +
>> >> > aPair.second());
>> >> > // Need to print these in the logs.
>> >> >
>> >> >                 count += aPair.second();
>> >> >
>> >> > }
>> >> >
>> >> > pipeline.done();
>> >> >
>> >> > System.out.println("Count = " + count);
>> >> >
>> >> > ...
>> >> >
>> >> > private static class TestFn extends DoFn<Pair<Double, Long>,
>> >> > Pair<Pair<String, Integer>, Long>> {
>> >> >
>> >> > protected TestFn(){
>> >> >
>> >> > }
>> >> >
>> >> > @Override
>> >> >
>> >> >         public void process(Pair<Double, Long> input,
>> >> > Emitter<Pair<Pair<String, Integer>, Long>> emitter)
{
>> >> >
>> >> > System.out.println("Entering process ...");
>> >> >
>> >> > ...
>> >> >
>> >> > Pair<String, Integer> aPair = processInput(input); // internal
>> >> > method,
>> >> > does
>> >> > some database insertion operations.
>> >> >
>> >> > long maxLong = 0;
>> >> >
>> >> > maxLong = findMaxLong(input.first()); // internal method
>> >> >
>> >> > ...
>> >> >
>> >> > Pair<Pair<String, Integer>, Long> pair = new Pair<Pair<String,
>> >> > Integer>,
>> >> > Long>(aPair, maxLong);
>> >> >
>> >> >                     emitter.emit(pair);
>> >> >
>> >> > System.out.println("Exiting process ...");
>> >> >
>> >> > }
>> >> >
>> >> > }
>> >> >
>> >> > ...
>> >> >
>> >> > {pseudo-code}
>> >> >
>> >> >
>> >> > This code gives me the results I expect, but the DoFn process method
>> >> > gets
>> >> > called twice, which I guess happens first during the
>> >> > "table.top(1).values().asCollection().getValue().iterator()" call and
>> >> > second
>> >> > during the
>> >> >
>> >> >
>> >> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
>> >> > call. Since I am doing database insertion operations inside the
>> >> > process
>> >> > method, this is undesirable.
>> >> >
>> >> > My questions are:
>> >> >
>> >> > 1) Is there a better way to do what I am trying to do here?
>> >> >
>> >> > 2) when/how many times the process method actually gets called and/or
>> >> > why
>> >> > does it get called more than once?
>> >> >
>> >> > 3) If I did not have the need to print the values after the
>> >> > processing,
>> >> > would the database insertions still have happened without
>> >> > materializing
>> >> > the
>> >> > results? Is materializing necessary for processing to happen?
>> >> >
>> >> > Thanks for the help/input.
>> >
>> >

Mime
View raw message