hadoop-pig-dev mailing list archives

From "David Ciemiewicz (JIRA)" <j...@apache.org>
Subject [jira] Issue Comment Edited: (PIG-807) PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
Date Thu, 28 May 2009 23:37:45 GMT

    [ https://issues.apache.org/jira/browse/PIG-807?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12714227#action_12714227
] 

David Ciemiewicz edited comment on PIG-807 at 5/28/09 4:36 PM:
---------------------------------------------------------------

@Yiping

I see what you mean.  Maybe we should have FOREACH and FORALL as in B = FORALL A GENERATE
SUM(m);

Another version of this might be B = OVER A GENERATE SUM(m); or B = OVERALL A GENERATE SUM(m);


There was a hallway conversation about the situation of:

{code}
B = GROUP A BY key;
C = FOREACH B {
        SORTED = ORDER A BY value;
        GENERATE
                COUNT(SORTED) as count,
                QUANTILES(SORTED.value, 0.0, 0.5, 0.75, 0.9, 1.0) as quantiles: (p00, p50, p75, p90, p100);
        };
{code}

I was told that a ReadOnce bag would not solve this problem because we'd need to pass through
SORTED twice because there were two UDFs.

I disagree.  It is possible to pass over this data once and only once if we create a class
of Accumulating or Running functions that differs from the current DataBag Eval and AlgebraicEval
functions.

First, functions like SUM, COUNT, AVG, VAR, MIN, MAX, STDEV, ReservoirSampling, statistics.SUMMARY
can all be computed on a ReadOnce / Streaming DataBag of unknown length or size.  For each of
these functions, we simply "add" or "accumulate" the values one row at a time, we can invoke
a combiner for intermediate results across partitions, and produce a final result, all without
materializing a DataBag as implemented today.
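
As a rough illustration of the accumulate / combine / final cycle described above, here is one possible shape for an averaging function. This is only a sketch: RunningAvg and its combine() method are made-up names for illustration, not anything that exists in Pig, and the two "partitions" are plain in-memory lists.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical RunningAvg: keeps only a sum and a count, never the rows.
package RunningAvg;

sub new {
    my $class = shift;
    return bless { sum => 0, count => 0 }, $class;
}

# Fold one value into the running state.
sub accumulate {
    my ($self, $value) = @_;
    $self->{sum}   += $value;
    $self->{count} += 1;
    return $self;
}

# Combiner step: merge the partial state from another partition.
sub combine {
    my ($self, $other) = @_;
    $self->{sum}   += $other->{sum};
    $self->{count} += $other->{count};
    return $self;
}

# Final result; undef when no rows were seen.
sub final {
    my $self = shift;
    return $self->{count} ? $self->{sum} / $self->{count} : undef;
}

package main;

# Two "partitions", each scanned exactly once.
my $left  = RunningAvg->new();
my $right = RunningAvg->new();
$left->accumulate($_)  for (1, 2, 3);
$right->accumulate($_) for (4, 5);

# Merging the partial states gives the average over all five values.
print $left->combine($right)->final(), "\n";
```

The point of the sketch is that the state carried between rows (and between partitions) is a fixed-size pair, so no bag ever has to be held in memory.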

QUANTILES is a different beast.  To compute quantiles, the data must be sorted, which I prefer
to do outside the UDF at this time.  Also, the COUNT of the data is needed a priori.  Fortunately,
sorting COULD produce a ReadOnce / Streaming DataBag of KNOWN as opposed to unknown length
or size, so only two scans through the data (sorting and quantiles) are needed rather than
three scans (sort, count, quantiles).
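
To show why a known size is enough, here is a hedged sketch of quantiles computed in a single pass over already-sorted rows. RunningQuantiles is a hypothetical name, and the rank rule used (quantile q maps to zero-based row int(q * (size - 1))) is just one simple convention assumed for the example; a real QUANTILES UDF may define ranks differently.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical RunningQuantiles: one pass over sorted rows of known size.
package RunningQuantiles;

# $size is the known row count; @quantiles are fractions in ascending order.
sub new {
    my ($class, $size, @quantiles) = @_;
    # Assumed rank rule: quantile q maps to zero-based row int(q * (size - 1)).
    my @targets = map { int($_ * ($size - 1)) } @quantiles;
    return bless { targets => \@targets, seen => 0, results => [] }, $class;
}

# Called once per row, in sorted order.
sub accumulate {
    my ($self, $value) = @_;
    my $targets = $self->{targets};
    # Several quantiles may map to the same row, hence the loop.
    while (@$targets && $targets->[0] == $self->{seen}) {
        push @{ $self->{results} }, $value;
        shift @$targets;
    }
    $self->{seen}++;
    return $self;
}

# The collected quantile values, in the order they were requested.
sub final {
    my $self = shift;
    return @{ $self->{results} };
}

package main;

my @sorted = (10, 20, 30, 40, 50, 60, 70, 80, 90);   # already ordered
my $q = RunningQuantiles->new(scalar(@sorted), 0.0, 0.5, 0.75, 0.9, 1.0);
$q->accumulate($_) for @sorted;
print join("\t", $q->final()), "\n";
```

Because the target rows are fixed up front from the size, the function never needs to look back at earlier rows or count them itself.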

So, if Pig could understand two additional data types:

ReadOnceSizeUnknown -- COUNT() counts all individual rows
ReadOnceSizeKnown -- COUNT() just returns size attribute of ReadOnce data reference

And if Pig had RunningEval and RunningAlgebraicEval classes of functions which accumulate
values a row at a time, many computations in Pig could be much much more efficient.
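
To make the two bag flavors concrete, here is a small sketch of how COUNT might treat them. ReadOnceBag, count_rows and iterator_over are hypothetical names invented for this example, and a Perl closure stands in for the Hadoop values iterator.

```perl
#!/usr/bin/perl
use strict;
use warnings;

# Hypothetical read-once bag: a forward-only iterator plus an optional size.
package ReadOnceBag;

# $next is a code ref returning the next row, or undef at the end.
# Pass $size for a "size known" bag; omit it for "size unknown".
sub new {
    my ($class, $next, $size) = @_;
    return bless { next => $next, size => $size }, $class;
}

sub size     { $_[0]{size} }        # undef when the size is unknown
sub next_row { $_[0]{next}->() }    # consumes one row

package main;

# COUNT: free when the size is known, otherwise a scan that consumes the bag.
sub count_rows {
    my $bag = shift;
    return $bag->size() if defined $bag->size();
    my $n = 0;
    $n++ while defined $bag->next_row();
    return $n;
}

# Illustration-only helper: a read-once iterator over a list of rows.
sub iterator_over {
    my @rows = @_;
    return sub { return shift @rows };
}

my $unknown = ReadOnceBag->new(iterator_over(3, 1, 4));
my $known   = ReadOnceBag->new(iterator_over(3, 1, 4), 3);

print count_rows($unknown), "\n";   # has to scan all three rows
print count_rows($known), "\n";     # answers from the size attribute
```

Note that counting the size-unknown bag uses it up, which is exactly why a second function could not read it afterwards, while the size-known bag is still untouched after COUNT.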

In case anyone doesn't "get" what I mean by having running functions, here's some Perl code
that implements what I'm suggesting. I'll leave it as an exercise for the Pig development
team to figure out the RunningAlgebraicEval versions of these functions/classes. :^)

runningsums.pl
{code}
#! /usr/bin/perl

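use lib '.';    # find the modules in the current directory (Perl 5.26+ no longer searches it by default)
use RunningSum;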
use RunningCount;

$a_count = RunningCount->new();
$a_sum = RunningSum->new();
$b_sum = RunningSum->new();
$c_sum = RunningSum->new();

while (<>)
{
        s/\r*\n*//g;

        ($a, $b, $c) = split(/\t/);

        $a_count->accumulate($a);
        $a_sum->accumulate($a);
        $b_sum->accumulate($b);
        $c_sum->accumulate($c);
}

print join("\t",
        $a_count->final(),
        $a_sum->final(),
        $b_sum->final(),
        $c_sum->final()
        ), "\n";
{code}

RunningCount.pm
{code}
package RunningCount;

sub new
{
        my $class = shift;
        my $self = { 'count' => 0 };    # start at zero so final() is defined for empty input
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'count'} ++;
}

sub final
{
        my $self = shift;
        return $self->{'count'};
}

1;
{code}

RunningSum.pm
{code}
package RunningSum;

sub new
{
        my $class = shift;
        my $self = { 'sum' => 0 };      # start at zero so final() is defined for empty input
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'sum'} += $value;
}

sub final
{
        my $self = shift;
        return $self->{'sum'};
}

1;
{code}








      was (Author: ciemo):
    @Yiping

I see what you mean.  Maybe we should have FOREACH and FORALL as in B = FORALL A GENERATE
SUM(m);

Another version of this might be B = OVER A GENERATE SUM(m); or B = OVERALL A GENERATE SUM(m);


There was a hallway conversation about the situation of:

{code}
B = GROUP A BY key;
C = FOREACH B {
        SORTED = ORDER A BY value;
        GENERATE
                COUNT(SORTED) as count,
                QUANTILES(SORTED.value, 0.0, 0.5, 0.75, 0.9, 1.0) as quantiles: (p00, p50, p75, p90, p100);
        };
{code}

I was told that a ReadOnce bag would not solve this problem because we'd need to pass through
SORTED twice because there were two UDFs.

I disagree.  It is possible to pass over this data once and only once if we create a class
of Accumulating or Running functions that differs from the current DataBag and AlgebraicDataBag
functions.

First, functions like SUM, COUNT, AVG, VAR, MIN, MAX, STDEV, ReservoirSampling, statistics.SUMMARY
can all be computed on a ReadOnce / Streaming DataBag of unknown length or size.  For each of
these functions, we simply "add" or "accumulate" the values one row at a time, we can invoke
a combiner for intermediate results across partitions, and produce a final result, all without
materializing a DataBag as implemented today.

QUANTILES is a different beast.  To compute quantiles, the data must be sorted, which I prefer
to do outside the UDF at this time.  Also, the COUNT of the data is needed a priori.  Fortunately,
sorting COULD produce a ReadOnce / Streaming DataBag of KNOWN as opposed to unknown length
or size, so only two scans through the data (sorting and quantiles) are needed rather than
three scans (sort, count, quantiles).

So, if Pig could understand two additional data types:

ReadOnceSizeUnknown -- COUNT() counts all individual rows
ReadOnceSizeKnown -- COUNT() just returns size attribute of ReadOnce data reference

And if Pig had RunningEval and RunningAlgebraicEval classes of functions which accumulate
values a row at a time, many computations in Pig could be much much more efficient.

In case anyone doesn't "get" what I mean by having running functions, here's some Perl code
that implements what I'm suggesting. I'll leave it as an exercise for the Pig development
team to figure out the RunningAlgebraicEval versions of these functions/classes. :^)

runningsums.pl
{code}
#! /usr/bin/perl

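use lib '.';    # find the modules in the current directory (Perl 5.26+ no longer searches it by default)
use RunningSum;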
use RunningCount;

$a_count = RunningCount->new();
$a_sum = RunningSum->new();
$b_sum = RunningSum->new();
$c_sum = RunningSum->new();

while (<>)
{
        s/\r*\n*//g;

        ($a, $b, $c) = split(/\t/);

        $a_count->accumulate($a);
        $a_sum->accumulate($a);
        $b_sum->accumulate($b);
        $c_sum->accumulate($c);
}

print join("\t",
        $a_count->final(),
        $a_sum->final(),
        $b_sum->final(),
        $c_sum->final()
        ), "\n";
{code}

RunningCount.pm
{code}
package RunningCount;

sub new
{
        my $class = shift;
        my $self = { 'count' => 0 };    # start at zero so final() is defined for empty input
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'count'} ++;
}

sub final
{
        my $self = shift;
        return $self->{'count'};
}

1;
{code}

RunningSum.pm
{code}
package RunningSum;

sub new
{
        my $class = shift;
        my $self = { 'sum' => 0 };      # start at zero so final() is defined for empty input
        bless $self, $class;
        return $self;
}

sub accumulate
{
        my $self = shift;
        my $value = shift;

        $self->{'sum'} += $value;
}

sub final
{
        my $self = shift;
        return $self->{'sum'};
}

1;
{code}







  
> PERFORMANCE: Provide a way for UDFs to use read-once bags (backed by the Hadoop values iterator)
> ------------------------------------------------------------------------------------------------
>
>                 Key: PIG-807
>                 URL: https://issues.apache.org/jira/browse/PIG-807
>             Project: Pig
>          Issue Type: Improvement
>    Affects Versions: 0.2.1
>            Reporter: Pradeep Kamath
>             Fix For: 0.3.0
>
>
> Currently all bags resulting from a group or cogroup are materialized as bags containing
all of the contents. The issue with this is that if a particular key has many corresponding
values, all these values get stuffed in a bag which may run out of memory and hence spill,
causing a slowdown in performance and sometimes memory exceptions. In many cases, the UDFs which
use these bags coming out of a group or cogroup only need to iterate over the bag in a unidirectional,
read-once manner. This can be implemented by having the bag implement its iterator by simply
iterating over the underlying Hadoop iterator provided in the reduce. This kind of bag is
also needed in http://issues.apache.org/jira/browse/PIG-802, so the code can be reused for
this issue too. The other part of this issue is to have some way for the UDFs to communicate
to Pig that any input bags that they need are "read once" bags. This can be achieved by having
an interface, say "UsesReadOnceBags", which serves as a tag to indicate the intent to
Pig. Pig can then rewire its execution plan to use ReadOnceBags if feasible.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

