pig-dev mailing list archives

From "Philip (flip) Kromer (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (PIG-3979) group all performance, garbage collection, and incremental aggregation
Date Thu, 12 Jun 2014 02:12:04 GMT

    [ https://issues.apache.org/jira/browse/PIG-3979?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14028742#comment-14028742 ]

Philip (flip) Kromer commented on PIG-3979:
-------------------------------------------

This is great. Using partial aggregation with TOP was causing constant spills and terrible performance; this patch reduced the runtime by a factor of 10. I'd prefer that most of the log messages be at DEBUG priority, but in this case they were very helpful.

However, I also have some questions about whether there's more going on than the patch currently
addresses -- there are still pathological cases that cause the SpillableMemoryManager to enter
seppuku mode.

With a large heap size set, the initial FIRST_TIER_THRESHOLD of 20,000 records is hit well before the memory-based sample threshold is, and so the code never actually adjusts that threshold. Is that behavior desirable, or should the code still include the test for (numRecsInRawMap >= NUM_RECS_TO_SAMPLE), as follows?

{code}
if (!sizeReductionChecked && ((sampleSize >= sampleMem) || (numRecsInRawMap >= NUM_RECS_TO_SAMPLE))) {
    checkSizeReduction();
    sampleSize = 0;
}
if (!estimatedMemThresholds && ((sampleSize >= sampleMem) || (numRecsInRawMap >= NUM_RECS_TO_SAMPLE))) {
    estimateMemThresholds();
}
// ...
{code}

As long as estimateMemThresholds() hasn't been called, avgTupleSize keeps its initial value of zero -- which means getMemorySize() returns zero as well. If I'm reading this correctly, with a large heap and this patch applied, getMemorySize() will always return zero.

Lastly, I'm concerned there's still an interaction between POPartialAgg and SpillableMemoryManager that this patch doesn't address. I'm not deeply familiar with what's going on, so read my questions as "I don't understand why X is" rather than "I don't think X should be".

With a large JVM heap, once the POPartialAgg actually grows to a certain size, the SpillableMemoryManager goes into a GC hell of its own creation. Here's a taste:

{code}
2014-06-11 21:07:23,248 4974  INFO  o.a.p.b.h.e.m.PigGenericMapReduce$Map     | Aliases being processed per job phase (AliasName[line,offset]): M: events[36,9],events[-1,-1],events_final_event_1[59,23],1-3[59,32] C: events_final_event_1[59,23],1-3[59,32] R: events_final_event_1[59,23]
2014-06-11 21:07:24,432 6158  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | After reduction, processed map: 126; raw map: 0
2014-06-11 21:07:24,433 6159  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Observed reduction factor: from 10000 to 126 => 79.
2014-06-11 21:07:24,714 6440  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Getting mem limits; considering 1 POPArtialAgg objects.
2014-06-11 21:07:24,721 6447  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Estimated total tuples to buffer, based on 10000 tuples that took up 14954552 bytes: 191715
2014-06-11 21:07:24,721 6447  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Setting thresholds. Primary: 189288. Secondary: 2427
2014-06-11 21:07:24,827 6553  INFO  o.a.p.i.u.SpillableMemoryManager          | first memory handler call- Usage threshold init = 5505024(5376K) used = 795475864(776831K) committed = 795607040(776960K) max = 1048576000(1024000K)
2014-06-11 21:07:24,832 6558  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
2014-06-11 21:07:24,832 6558  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating raw inputs. 13545 tuples.
2014-06-11 21:07:24,832 6558  INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 20423195 bytes from 1 objects. init = 5505024(5376K) used = 795475864(776831K) committed = 795607040(776960K) max = 1048576000(1024000K)
2014-06-11 21:07:24,852 6578  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 302 tuples.
2014-06-11 21:07:24,852 6578  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating processed inputs. 302 tuples.
2014-06-11 21:07:24,855 6581  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 301 tuples.
2014-06-11 21:07:24,928 6654  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In spillResults(), processed map is empty -- done spilling.
2014-06-11 21:07:30,052 11778 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Aggregating 189289 raw records at first level.
2014-06-11 21:07:32,416 14142 INFO  o.a.p.i.u.SpillableMemoryManager          | first memory handler call - Collection threshold init = 5505024(5376K) used = 645973328(630833K) committed = 795607040(776960K) max = 1048576000(1024000K)
2014-06-11 21:07:32,418 14144 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
2014-06-11 21:07:32,418 14144 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating raw inputs. 84343 tuples.
2014-06-11 21:07:32,418 14144 INFO  o.a.p.i.u.SpillableMemoryManager          | hard invoking GC: accumulatedFreeSize 150120425 gcActivationSize 40000000 estimatedFreed 129697230 toFree: 383829328.
2014-06-11 21:07:33,049 14775 INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 129697230 bytes from 1 objects. init = 5505024(5376K) used = 645973328(630833K) committed = 795607040(776960K) max = 1048576000(1024000K)
2014-06-11 21:07:33,051 14777 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
...
... (Repeats several hundred times, causing a GC on each)
... (Three minutes later, it becomes able to carry on...)
...
2014-06-11 21:08:58,423 100149 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
2014-06-11 21:08:58,423 100149 INFO  o.a.p.i.u.SpillableMemoryManager          | hard invoking GC: accumulatedFreeSize 129706200 gcActivationSize 40000000 estimatedFreed 129706200 toFree: 262180472.
2014-06-11 21:08:58,697 100423 INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 129706200 bytes from 1 objects. init = 5505024(5376K) used = 524324472(512035K) committed = 1048576000(1024000K) max = 1048576000(1024000K)
2014-06-11 21:08:58,761 100487 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 3481 tuples.
2014-06-11 21:08:58,762 100488 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating processed inputs. 3481 tuples.
2014-06-11 21:08:58,789 100515 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 3480 tuples.
2014-06-11 21:08:58,916 100642 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In spillResults(), processed map is empty -- done spilling.
2014-06-11 21:09:02,949 104675 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Aggregating 189289 raw records at first level.
2014-06-11 21:09:05,812 107538 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
2014-06-11 21:09:05,813 107539 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating raw inputs. 120387 tuples.
2014-06-11 21:09:05,813 107539 INFO  o.a.p.i.u.SpillableMemoryManager          | hard invoking GC: accumulatedFreeSize 183538160 gcActivationSize 40000000 estimatedFreed 183538160 toFree: 404073608.
2014-06-11 21:09:06,717 108443 INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 183538160 bytes from 1 objects. init = 5505024(5376K) used = 771075208(753003K) committed = 1048576000(1024000K) max = 1048576000(1024000K)
2014-06-11 21:09:06,717 108443 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
2014-06-11 21:09:06,717 108443 INFO  o.a.p.i.u.SpillableMemoryManager          | hard invoking GC: accumulatedFreeSize 183571050 gcActivationSize 40000000 estimatedFreed 183571050 toFree: 353267104.
2014-06-11 21:09:07,422 109148 INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 183571050 bytes from 1 objects. init = 5505024(5376K) used = 615411104(600987K) committed = 1048576000(1024000K) max = 1048576000(1024000K)
...
... (and so forth)
...
{code}

There are two thresholds -- the user-adjustable pig.spill.gc.activation.size and the hard-coded extraGCThresholdFraction -- that cause SpillableMemoryManager to force a hard GC. Somehow, in a way that SpillableMemoryManager doesn't expect, POPartialAgg either exceeds those thresholds or lands awkwardly in the sort order of spillables, or something along those lines.
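For reference, here's the shape of the spill loop as I read SpillableMemoryManager -- a paraphrased sketch, not verbatim source, so variable names are approximate:

{code}
// Paraphrase of the spill loop in SpillableMemoryManager's memory-handler
// notification (not verbatim source). Every "freed" number below is an
// *estimate* taken before the spill call -- nothing verifies how much
// memory was actually released.
long estimatedFreed = 0;
long accumulatedFreeSize = 0;
boolean invokeGC = false;
for (Spillable s : spillablesSortedBySize) {
    long toBeFreed = s.getMemorySize();  // estimate only
    s.spill();                           // for POPartialAgg, this just sets a flag
    estimatedFreed += toBeFreed;
    accumulatedFreeSize += toBeFreed;
    // hard-GC triggers: the user-adjustable activation size, plus
    // (elsewhere) the hard-coded extraGCThresholdFraction
    if (accumulatedFreeSize > gcActivationSize) {
        invokeGC = true;
    }
    if (estimatedFreed > toFree) {
        break;
    }
}
if (invokeGC) {
    System.gc();  // the "hard invoking GC" lines in the log above
}
{code}

If that reading is right, the manager credits itself the full estimated size of the POPartialAgg on every pass, even though nothing has been freed yet, which would explain the loop in the log above.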

When I disable the GC thresholds (by setting pig.spill.gc.activation.size, and by manually changing extraGCThresholdFraction in the code, each to a very large value), the job finishes in under a minute:

{code}
2014-06-11 20:53:47,951 5265  INFO  o.a.p.b.h.e.m.PigGenericMapReduce$Map     | Aliases being processed per job phase (AliasName[line,offset]): M: events[36,9],events[-1,-1],events_final_event_1[59,23],1-3[59,32] C: events_final_event_1[59,23],1-3[59,32] R: events_final_event_1[59,23]
2014-06-11 20:53:49,057 6371  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | After reduction, processed map: 126; raw map: 0
2014-06-11 20:53:49,058 6372  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Observed reduction factor: from 10000 to 126 => 79.
2014-06-11 20:53:49,256 6570  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Getting mem limits; considering 1 POPArtialAgg objects.
2014-06-11 20:53:49,266 6580  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Estimated total tuples to buffer, based on 10000 tuples that took up 14954552 bytes: 191715
2014-06-11 20:53:49,266 6580  INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Setting thresholds. Primary: 189288. Secondary: 2427
2014-06-11 20:53:53,835 11149 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Aggregating 189289 raw records at first level.
2014-06-11 20:53:54,023 11337 INFO  o.a.p.i.u.SpillableMemoryManager          | first memory handler call - Collection threshold init = 5505024(5376K) used = 661747360(646237K) committed = 795602944(776956K) max = 1048576000(1024000K)
2014-06-11 20:53:54,152 11466 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spill triggered by SpillableMemoryManager
2014-06-11 20:53:54,153 11467 INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 283175425 bytes from 1 objects. init = 5505024(5376K) used = 661747360(646237K) committed = 795602944(776956K) max = 1048576000(1024000K)
2014-06-11 20:53:54,391 11705 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Aggregating 2543 secondary records to be combined.
2014-06-11 20:53:54,435 11749 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Aggregating 2542 secondary records to be combined.
2014-06-11 20:53:54,435 11749 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Starting spill.
2014-06-11 20:53:54,435 11749 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating processed inputs. 2542 tuples.
2014-06-11 20:53:54,466 11780 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 2542 tuples.
2014-06-11 20:53:54,699 12013 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In spillResults(), processed map is empty -- done spilling.
...
... (about a dozen spills in all)
...
2014-06-11 20:54:37,085 54399 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating raw inputs. 129522 tuples.
2014-06-11 20:54:37,086 54400 INFO  o.a.p.i.u.SpillableMemoryManager          | Spilled an estimate of 193614460 bytes from 1 objects. init = 5505024(5376K) used = 568816808(555485K) committed = 1048576000(1024000K) max = 1048576000(1024000K)
2014-06-11 20:54:37,244 54558 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 1616 tuples.
2014-06-11 20:54:37,245 54559 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating processed inputs. 1616 tuples.
2014-06-11 20:54:37,251 54565 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 1616 tuples.
2014-06-11 20:54:37,260 54574 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In spillResults(), processed map is empty -- done spilling.
2014-06-11 20:54:41,159 58473 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating raw inputs. 180461 tuples.
2014-06-11 20:54:41,486 58800 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 2270 tuples.
2014-06-11 20:54:41,486 58800 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In startSpill(), aggregating processed inputs. 2270 tuples.
2014-06-11 20:54:41,502 58816 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | processed inputs: 2270 tuples.
2014-06-11 20:54:41,502 58816 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | Spilling last bits.
2014-06-11 20:54:41,523 58837 INFO  o.a.p.b.h.e.p.r.POPartialAgg              | In spillResults(), processed map is empty -- done spilling.
2014-06-11 20:54:41,528 58842 INFO  o.a.h.m.MapTask                           | Starting flush of map output
2014-06-11 20:54:41,528 58842 INFO  o.a.h.m.MapTask                           | Spilling map output
2014-06-11 20:54:41,528 58842 INFO  o.a.h.m.MapTask                           | bufstart = 0; bufend = 4240460; bufvoid = 471859200
2014-06-11 20:54:41,528 58842 INFO  o.a.h.m.MapTask                           | kvstart = 117964796(471859184); kvend = 117861448(471445792); length = 103349/29491200
2014-06-11 20:54:41,644 58958 INFO  o.a.h.i.c.CodecPool                       | Got brand-new compressor [.deflate]
2014-06-11 20:54:41,689 59003 INFO  o.a.p.b.h.e.m.PigCombiner$Combine         | Aliases being processed per job phase (AliasName[line,offset]): M: events[36,9],events[-1,-1],events_final_event_1[59,23],1-3[59,32] C: events_final_event_1[59,23],1-3[59,32] R: events_final_event_1[59,23]
2014-06-11 20:54:43,913 61227 INFO  o.a.h.m.MapTask                           | Finished spill 0
2014-06-11 20:54:43,917 61231 INFO  o.a.h.m.Task                              | Task:attempt_1402538004132_0001_m_000000_0 is done. And is in the process of committing
2014-06-11 20:54:43,922 61236 INFO  o.a.h.m.Task                              | Task 'attempt_1402538004132_0001_m_000000_0' done.
2014-06-11 20:54:44,023 61337 INFO  o.a.h.m.i.MetricsSystemImpl               | Stopping MapTask metrics system...
2014-06-11 20:54:44,023 61337 INFO  o.a.h.m.i.MetricsSystemImpl               | MapTask metrics system stopped.
2014-06-11 20:54:44,023 61337 INFO  o.a.h.m.i.MetricsSystemImpl               | MapTask metrics system shutdown complete.
{code}

* POPartialAgg's spill() method is lazy -- it only sets the doSpill flag for a later iteration (see the sketch after this list). Since the SpillableMemoryManager action is synchronized, I think there's no way for the spill to actually take place before the manager finishes its accounting. If I understand this right, the invokeGC can't ever help with a POPartialAgg spillable, as no memory could possibly have been freed.
* Why is SpillableMemoryManager looping the way it is in the pathological case?
* Manually forcing a GC seems brutish. How confident are folks that this helps compared to proper JVM GC tuning?
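
Here's the spill() behavior I mean in the first bullet, paraphrased from my reading of POPartialAgg (not verbatim source; the return value in particular is from memory):

{code}
// Paraphrase of POPartialAgg.spill() as it reads to me: the call returns
// immediately after setting a flag, and the actual aggregation/flush
// happens later on the operator's own thread.
@Override
public long spill() {
    LOG.info("Spill triggered by SpillableMemoryManager");
    doSpill = true;  // no memory is freed during this call itself
    return 0;        // yet the manager has already booked getMemorySize() as freed
}
{code}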

> group all performance, garbage collection, and incremental aggregation
> ----------------------------------------------------------------------
>
>                 Key: PIG-3979
>                 URL: https://issues.apache.org/jira/browse/PIG-3979
>             Project: Pig
>          Issue Type: Improvement
>          Components: impl
>    Affects Versions: 0.12.0, 0.11.1
>            Reporter: David Dreyfus
>            Assignee: David Dreyfus
>             Fix For: 0.14.0
>
>         Attachments: PIG-3979-v1.patch, POPartialAgg.java
>
>
> I have a PIG statement similar to:
> summary = foreach (group data ALL) generate 
> COUNT(data.col1), SUM(data.col2), SUM(data.col2)
> , Moments(col3)
> , Moments(data.col4)
> There are a couple of hundred columns.
> I set the following:
> SET pig.exec.mapPartAgg true;
> SET pig.exec.mapPartAgg.minReduction 3;
> SET pig.cachedbag.memusage 0.05;
> I found that when I ran this on a JVM with insufficient memory, the process eventually timed out because of an infinite garbage collection loop.
> The problem was invariant to the memusage setting.
> I solved the problem by making changes to:
> org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg.java
> Rather than reading in 10000 records to establish an estimate of the reduction, I make an estimate after reading in enough tuples to fill pig.cachedbag.memusage percent of Runtime.getRuntime().maxMemory().
> I also made a change to guarantee at least one record allowed in second tier storage. In the current implementation, if the reduction is very high (1000:1), space in second tier storage is zero.
> With these changes, I can summarize large data sets with small JVMs. I also find that setting pig.cachedbag.memusage to a small number such as 0.05 results in much better garbage collection performance without reducing throughput. I suppose tuning GC would also solve a problem with excessive garbage collection.
> The performance is sweet. 



--
This message was sent by Atlassian JIRA
(v6.2#6252)
