crunch-user mailing list archives

From Josh Wills <jwi...@cloudera.com>
Subject Re: DoFn processing
Date Sat, 19 Oct 2013 03:32:10 GMT
There isn't a db target in contrib? There is a db source, IIRC.
On Oct 18, 2013 8:29 PM, "Gabriel Reid" <gabriel.reid@gmail.com> wrote:

> Hi Hrishikesh,
>
> About the database insertion -- if you want to have parallel writes to
> a database, probably the best way to do this within Crunch would be to
> implement a Target[1] for writing to your RDBMS. This would involve
> using a Hadoop OutputFormat, and would do the writes in parallel.
>
> There isn't currently a DatabaseTarget available in Crunch. However,
> the Sqoop project[2] deals specifically with moving data between HDFS
> and databases, and also helps to take care of details like limiting an
> import job to the amount of parallelism that an RDBMS can handle.
> Probably the best option (although slightly more involved) would be to
> use Crunch to do your data processing and output data to HDFS, and
> then use Sqoop to import the data into your RDBMS.
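If Sqoop isn't available and the output is modest, a rough sketch of a post-pipeline import step is below. It reads the pipeline's text output and batch-inserts it over JDBC from a single process. Everything here is assumed for illustration, not taken from Crunch or Sqoop: the `counts` table, the tab-separated key/value layout, and the JDBC URL are all placeholders.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

public class Main {
    // Parse one line of tab-separated pipeline output, e.g. "foo\t42".
    static String[] parse(String line) {
        return line.split("\t", 2);
    }

    public static void main(String[] args) throws Exception {
        // Stand-in for lines read back from the pipeline's HDFS output.
        List<String> lines = List.of("foo\t42", "bar\t7");

        if (args.length > 0) {
            // args[0] is a JDBC URL, e.g. "jdbc:postgresql://host/db" (assumed).
            try (Connection conn = DriverManager.getConnection(args[0]);
                 PreparedStatement ps = conn.prepareStatement(
                         "INSERT INTO counts (k, v) VALUES (?, ?)")) {
                for (String line : lines) {
                    String[] kv = parse(line);
                    ps.setString(1, kv[0]);
                    ps.setInt(2, Integer.parseInt(kv[1]));
                    ps.addBatch();
                }
                ps.executeBatch(); // one round trip for the whole batch
            }
        } else {
            // No database configured: just show the parsed keys.
            for (String line : lines) {
                System.out.println(parse(line)[0]);
            }
        }
    }
}
```

Note this runs sequentially in the client, so it only makes sense for small outputs; Sqoop remains the right tool once volume or parallelism matters.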
>
> - Gabriel
>
> [1] http://crunch.apache.org/apidocs/0.7.0/org/apache/crunch/Target.html
> [2] http://sqoop.apache.org
>
> On Sat, Oct 19, 2013 at 12:14 AM, Hrishikesh P
> <hrishi.engineer@gmail.com> wrote:
> > Hi Gabriel,
> >
> > Thanks for the reply, that definitely answered some of my questions.
> >
> > My PCollection is relatively small (30K pairs, which should be a small
> > value). My main concern was that the process method was getting called
> > twice. But after adding "table.materialize().iterator().next();" it's
> > now getting called only once, which will solve my problem for now.
> >
> > But I see your point. If I make the database insertion operation an
> > end point of the pipeline, would I still be able to do it in parallel?
> > That is one of the reasons I put the logic in the process method. If
> > not, I may have to continue with my approach.
> >
> > Thanks.
> >
> >
> > On Fri, Oct 18, 2013 at 3:27 PM, Gabriel Reid <gabriel.reid@gmail.com>
> > wrote:
> >>
> >> Hi Hrishikesh,
> >>
> >> I think the basic principle in Crunch that should answer a large
> >> portion of your questions is this: things only really start happening
> >> in Crunch if you either write data to a target (i.e.
> >> pipeline.write(...)), or materialize a PCollection. Materializing a
> >> PCollection is something you typically only want to do if it is
> >> relatively small, or if you have a specific need to get the contents
> >> of a PCollection into the client application.
> >>
> >> If you specifically want to ensure that the TestFn is only used once
> >> in your pipeline, it should be sufficient to add the following call
> >> just after the "table" PCollection is created:
> >>
> >>     table.materialize().iterator().next();
> >>
> >> Using this instead of pipeline.run will probably ensure that TestFn is
> >> only run once over the collection in this case.
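A plain-Java analogy (no Crunch involved, just stdlib streams) of why materializing once helps: force the computation a single time, keep the concrete result, and let every downstream consumer read that result instead of re-running the mapping function.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();

        // The mapping is lazy: it only runs when someone asks for the result.
        Supplier<List<Integer>> compute = () ->
                List.of(1, 2, 3).stream()
                        .map(x -> { calls.incrementAndGet(); return x * 2; })
                        .collect(Collectors.toList());

        // "Materialize" once and reuse the concrete list, roughly like
        // reading a persisted PCollection instead of recomputing it.
        List<Integer> materialized = compute.get();

        int max = materialized.stream().max(Integer::compare).get();       // first consumer
        int sum = materialized.stream().mapToInt(Integer::intValue).sum(); // second consumer

        System.out.println(calls.get() + " " + max + " " + sum);
    }
}
```

The mapping function runs three times (once per element), even though two separate consumers read the result afterwards.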
> >>
> >> However, it's actually a much better idea to not have operations that
> >> affect the state of an external application/process within a DoFn. I
> >> would recommend instead writing the data to be added to an external
> >> database as an end-point of a pipeline (or even as something that can
> >> be run on the output of a pipeline), instead of doing it within a
> >> pipeline. In this way you can become immune to the number of times a
> >> DoFn is used/re-used in a pipeline.
> >>
> >> Hope this helps, but if this isn't answering your questions then just
> >> let me know.
> >>
> >> - Gabriel
> >>
> >>
> >> On Fri, Oct 18, 2013 at 9:43 PM, Hrishikesh P
> >> <hrishi.engineer@gmail.com> wrote:
> >> > Basically, I have a need to emit a pair of a string and an int from
> >> > the DoFn's process method; the integers will later be grouped by the
> >> > string. In addition to this, I also need to emit another long from
> >> > the process method that is independent of this pair. At the end, I
> >> > want to:
> >> > 1) group the integers by string value
> >> > 2) print the max long value.
> >> >
> >> > So I came up with the following code:
> >> >
> >> > {pseudo-code}
> >> > ...
> >> > PTable<Double, Long> inputTable = read(); // internal read method
> >> >
> >> > PTable<Pair<String, Integer>, Long> table = inputTable.parallelDo("testjob",
> >> >     new TestFn(), Writables.tableOf(Writables.pairs(Writables.strings(),
> >> >     Writables.ints()), Writables.longs()));
> >> >
> >> > pipeline.run();
> >> >
> >> > final Iterator<Long> it =
> >> >     table.top(1).values().asCollection().getValue().iterator();
> >> > if (it.hasNext()) {
> >> >     System.err.println("MAX LONG = " + it.next());
> >> > }
> >> >
> >> > // PTableConverterFn just emits the input back; it is used to convert the
> >> > // PCollection to a PTable so that the values can easily be grouped by key.
> >> > PTable<String, Integer> newTable = table.keys().parallelDo(
> >> >     new PTableConverterFn(),
> >> >     Writables.tableOf(Writables.strings(), Writables.ints()));
> >> >
> >> > final Iterator<Pair<String, Integer>> mapIt =
> >> >     newTable.groupByKey().combineValues(Aggregators.SUM_INTS())
> >> >             .materialize().iterator();
> >> >
> >> > int count = 0;
> >> > while (mapIt.hasNext()) {
> >> >     Pair<String, Integer> aPair = mapIt.next();
> >> >     // Need to print these in the logs.
> >> >     System.out.println(aPair.first() + "---" + aPair.second());
> >> >     count += aPair.second();
> >> > }
> >> >
> >> > pipeline.done();
> >> > System.out.println("Count = " + count);
> >> > ...
> >> >
> >> > private static class TestFn extends DoFn<Pair<Double, Long>,
> >> >         Pair<Pair<String, Integer>, Long>> {
> >> >
> >> >     protected TestFn() {
> >> >     }
> >> >
> >> >     @Override
> >> >     public void process(Pair<Double, Long> input,
> >> >             Emitter<Pair<Pair<String, Integer>, Long>> emitter) {
> >> >         System.out.println("Entering process ...");
> >> >         ...
> >> >         // internal method, does some database insertion operations
> >> >         Pair<String, Integer> aPair = processInput(input);
> >> >         long maxLong = findMaxLong(input.first()); // internal method
> >> >         ...
> >> >         Pair<Pair<String, Integer>, Long> pair =
> >> >             new Pair<Pair<String, Integer>, Long>(aPair, maxLong);
> >> >         emitter.emit(pair);
> >> >         System.out.println("Exiting process ...");
> >> >     }
> >> > }
> >> > ...
> >> > {pseudo-code}
> >> >
> >> >
> >> > This code gives me the results I expect, but the DoFn process method
> >> > gets called twice, which I guess happens first during the
> >> > "table.top(1).values().asCollection().getValue().iterator()" call and
> >> > second during the
> >> > "newTable.groupByKey().combineValues(Aggregators.SUM_INTS()).materialize().iterator()"
> >> > call. Since I am doing database insertion operations inside the
> >> > process method, this is undesirable.
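The double execution described here can be reproduced with a plain-Java analogy using lazy stdlib streams (no Crunch involved): each time a consumer forces the lazy computation, the mapping function runs again over every element.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;
import java.util.stream.Collectors;

public class Main {
    public static void main(String[] args) {
        AtomicInteger processCalls = new AtomicInteger();
        List<Integer> source = List.of(1, 2, 3);

        // The mapping is lazy, like a Crunch parallelDo: it only runs when
        // a terminal operation ("materialization") forces it.
        Supplier<List<Integer>> pipeline = () ->
                source.stream()
                      .map(x -> { processCalls.incrementAndGet(); return x * 2; })
                      .collect(Collectors.toList());

        pipeline.get(); // first consumer, e.g. top(1)...iterator()
        pipeline.get(); // second consumer, e.g. groupByKey()...iterator()

        // 3 elements, forced twice: the mapping function ran 6 times.
        System.out.println(processCalls.get());
    }
}
```

This is only an analogy for the lazy-evaluation behaviour, not how Crunch schedules jobs, but it shows why a function with side effects (like database inserts) is risky inside the lazily-evaluated step.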
> >> >
> >> > My questions are:
> >> >
> >> > 1) Is there a better way to do what I am trying to do here?
> >> >
> >> > 2) When/how many times does the process method actually get called,
> >> > and why does it get called more than once?
> >> >
> >> > 3) If I did not have the need to print the values after the
> >> > processing, would the database insertions still have happened without
> >> > materializing the results? Is materializing necessary for processing
> >> > to happen?
> >> >
> >> > Thanks for the help/input.
> >
> >
>
