pig-dev mailing list archives

From "Kai Londenberg (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (PIG-3212) Race Conditions in POSort and (Internal)SortedBag during Proactive Spill.
Date Tue, 26 Feb 2013 01:56:12 GMT

     [ https://issues.apache.org/jira/browse/PIG-3212?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Kai Londenberg updated PIG-3212:
--------------------------------

    Attachment: PIG-3212-p1.patch

I have attached a patch which should fix this problem. 

To clarify what caused the problems:

The root cause of all the problems here is that POSort.SortComparator is not thread safe, which
is very hard to fix in a performant way, so I didn't do that in this fix.
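
As the quoted issue description explains, the comparator attaches input tuples to operators
that every caller shares. The following is a minimal sketch of why that pattern is unsafe,
not Pig's actual code: SharedKeyExtractor, StatefulComparator and InterleavingDemo are
hypothetical names, and the "threads" are replayed by hand in a single thread so that the
bad interleaving is deterministic.

```java
import java.util.Comparator;

// Hypothetical stand-in for a plan operator that holds one attached input at a time.
class SharedKeyExtractor {
    private int[] attached; // single slot, shared by every caller

    void attachInput(int[] tuple) {
        attached = tuple;
    }

    int evaluate() {
        return attached[0]; // reads whatever happens to be attached right now
    }
}

// Hypothetical stand-in for a comparator built on shared operators, with no locking.
class StatefulComparator implements Comparator<int[]> {
    final SharedKeyExtractor left = new SharedKeyExtractor();
    final SharedKeyExtractor right = new SharedKeyExtractor();

    @Override
    public int compare(int[] a, int[] b) {
        left.attachInput(a);
        right.attachInput(b);
        // A second thread entering compare() at this point would overwrite both slots.
        return Integer.compare(left.evaluate(), right.evaluate());
    }
}

class InterleavingDemo {
    // Replays one bad interleaving by hand: "thread 1" wants to compare (1, 2),
    // "thread 2" attaches (9, 5) before thread 1 evaluates.
    static int thread1ResultUnderInterleaving() {
        StatefulComparator cmp = new StatefulComparator();
        cmp.left.attachInput(new int[] {1});  // thread 1 attaches a
        cmp.right.attachInput(new int[] {2}); // thread 1 attaches b
        cmp.left.attachInput(new int[] {9});  // thread 2 preempts and attaches its own a
        cmp.right.attachInput(new int[] {5}); // thread 2 attaches its own b
        // thread 1 resumes and evaluates thread 2's inputs instead of its own
        return Integer.compare(cmp.left.evaluate(), cmp.right.evaluate());
    }
}
```

Used from a single thread, compare(1, 2) reports "less than"; under the replayed interleaving
the same call reports "greater than", which is the kind of wrong answer that leaves a sort
silently misordered.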

Another problem is that the SpillableMemoryManager may call the spill method in its own thread
at any time, and that InternalSortedBag is not sufficiently thread safe.

The approach I took in this patch is simple: First - synchronize modifications to InternalSortedBag
correctly. Second - prevent all modifications (including spills), once a read iterator has
been started.
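
The two steps above can be sketched roughly as follows. This is an illustration under stated
assumptions, not the code in the attached patch: FreezableSortedBag is a hypothetical class,
spilled runs are kept in memory instead of being written to disk, and the final merge is
replaced by a plain sort.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for InternalSortedBag; names and structure are assumptions.
class FreezableSortedBag<T> {
    private final Comparator<T> comparator;
    private final List<T> inMemory = new ArrayList<>();
    private final List<List<T>> spilledRuns = new ArrayList<>(); // stands in for spill files
    private boolean readStarted = false; // guarded by the bag's monitor

    FreezableSortedBag(Comparator<T> comparator) {
        this.comparator = comparator;
    }

    // Step 1: every modification goes through the same lock.
    synchronized void add(T item) {
        if (readStarted) {
            throw new IllegalStateException("bag is frozen once a read iterator exists");
        }
        inMemory.add(item);
    }

    // May be called from a memory-manager thread at any time.
    // Returns the number of items spilled, or 0 if the spill was refused.
    synchronized long spill() {
        // Step 2: after reading has started, a spill is a no-op.
        if (readStarted || inMemory.isEmpty()) {
            return 0;
        }
        List<T> run = new ArrayList<>(inMemory);
        run.sort(comparator); // the comparator is only used while holding the lock
        spilledRuns.add(run);
        inMemory.clear();
        return run.size();
    }

    // Freezes the bag and returns an iterator over a sorted snapshot.
    synchronized Iterator<T> iterator() {
        readStarted = true;
        List<T> all = new ArrayList<>(inMemory);
        for (List<T> run : spilledRuns) {
            all.addAll(run);
        }
        all.sort(comparator);
        return Collections.unmodifiableList(all).iterator();
    }
}
```

Note that the lock in this sketch only serializes access within one bag. It is the freeze
after iterator() that keeps a late spill on an old bag from touching a comparator that is
shared with a newer bag.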

The last point is extremely important. POSort may create multiple InternalSortedBags one after
another, and each of these is registered with the SpillableMemoryManager. Since the WeakReferences
held by the SpillableMemoryManager are not cleared immediately when all other references to
the old InternalSortedBag have been removed, it's possible that the "spill" method gets called
on an otherwise dead instance of an InternalSortedBag. That way, two different InternalSortedBag
instances (one dead, one alive) could try to use the same POSort.SortComparator instance at the
same time.
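
A minimal sketch of how a weak-reference registry can still reach a replaced bag. All names
here (SketchSpillable, WeakSpillRegistry, CountingBag, StaleSpillDemo) are hypothetical and
only loosely modelled on the behaviour described above. Garbage collection timing cannot be
forced, so the demo keeps the old bag in a local variable to make the outcome deterministic;
in the real scenario the bag is unreachable but not yet collected, and WeakReference.get()
returns it in the same way until the collector clears the reference.

```java
import java.lang.ref.WeakReference;
import java.util.ArrayList;
import java.util.List;

// Hypothetical minimal spill interface for this sketch.
interface SketchSpillable {
    long spill();
}

// Hypothetical registry that, like the manager described above, holds only weak references.
class WeakSpillRegistry {
    private final List<WeakReference<SketchSpillable>> refs = new ArrayList<>();

    synchronized void register(SketchSpillable s) {
        refs.add(new WeakReference<>(s));
    }

    // Calls spill() on every referent that has not been cleared yet.
    // Returns how many objects were asked to spill.
    synchronized int spillAll() {
        int called = 0;
        for (WeakReference<SketchSpillable> ref : refs) {
            SketchSpillable s = ref.get(); // non-null until the collector clears it
            if (s != null) {
                s.spill();
                called++;
            }
        }
        return called;
    }
}

// Records how often it was asked to spill.
class CountingBag implements SketchSpillable {
    int spillCalls = 0;

    @Override
    public long spill() {
        spillCalls++;
        return 0;
    }
}

class StaleSpillDemo {
    // Returns {objects asked to spill, spill calls received by the replaced bag}.
    static int[] run() {
        WeakSpillRegistry registry = new WeakSpillRegistry();
        CountingBag old = new CountingBag();     // first bag created by the operator
        registry.register(old);
        CountingBag current = new CountingBag(); // the operator moves on to a new bag
        registry.register(current);
        int called = registry.spillAll();        // the old bag is still reachable here
        return new int[] {called, old.spillCalls};
    }
}
```

In this sketch the registry asks both bags to spill, including the one the operator has
already replaced. If both bags shared one stateful comparator, those two spills could run
against it concurrently with the operator's own reads.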

                
> Race Conditions in POSort and (Internal)SortedBag during Proactive Spill.
> -------------------------------------------------------------------------
>
>                 Key: PIG-3212
>                 URL: https://issues.apache.org/jira/browse/PIG-3212
>             Project: Pig
>          Issue Type: Bug
>    Affects Versions: 0.11
>            Reporter: Kai Londenberg
>            Priority: Critical
>             Fix For: 0.12, 0.11.1
>
>         Attachments: PIG-3212-p1.patch
>
>
> The following bug exists in the latest release of Pig, 0.11.0.
> While running some large jobs involving groups and sorts like this one:
> {code}
> events_by_user = GROUP events BY user_id;
> sorted_events_by_user = FOREACH events_by_user {
> 	A = ORDER events BY ts, split_idx, line_num;
> 	GENERATE group, A;
> }
> {code}
> I saw some rather strange behaviour: while this worked on small datasets, on large datasets
> the results were sometimes not sorted correctly.
> After a long debugging session, I tracked it down to at least one race condition.
> The following partial stack trace shows how a proactive spill gets triggered on an InternalSortedBag.
> A spill in turn triggers a sort of that InternalSortedBag.
> {code}
> 	at org.apache.pig.data.SortedSpillBag.proactive_spill(SortedSpillBag.java:83)
> 	at org.apache.pig.data.InternalSortedBag.spill(InternalSortedBag.java:455)
> 	at org.apache.pig.impl.util.SpillableMemoryManager.handleNotification(SpillableMemoryManager.java:243)
> 	at sun.management.NotificationEmitterSupport.sendNotification(NotificationEmitterSupport.java:138)
> 	at sun.management.MemoryImpl.createNotification(MemoryImpl.java:171)
> 	at sun.management.MemoryPoolImpl$PoolSensor.triggerAction(MemoryPoolImpl.java:272)
> 	at sun.management.Sensor.trigger(Sensor.java:120)
> {code}
> At the same time, the same InternalSortedBag might be sorted or accessed within a POSort
> operation, for example via the following code path (line numbers might be off, since I had to
> add debug statements to diagnose this):
> {code}
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort.getNext(POSort.java:346)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:492)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject.processInputBag(POProject.java:582)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject.getNext(PORelationToExprProject.java:107)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.getNext(PhysicalOperator.java:394)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.processPlan(POForEach.java:372)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach.getNext(POForEach.java:297)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator.processInput(PhysicalOperator.java:368)
> 	at org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit.getNext(POSplit.java:214)
> 	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.runPipeline(PigGenericMapReduce.java:465)
> 	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.processOnePackageOutput(PigGenericMapReduce.java:433)
> 	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:413)
> 	at org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigGenericMapReduce$Reduce.reduce(PigGenericMapReduce.java:257)
> 	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:176)
> 	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:566)
> 	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:408)
> 	at org.apache.hadoop.mapred.Child.main(Child.java:170)
> {code}
> The key here is: both operations try to compare and modify elements of the SortedBag
> simultaneously. This leads to all kinds of problems, most notably incorrectly sorted data.
> The POSort.SortComparator that is passed as a comparison function to (Internal)SortedBag is
> not thread safe, since it works by attaching single input tuples to PhysicalOperators. These
> operators in turn are part of POSort.sortPlans and are shared by every thread accessing
> the (Internal)SortedBag.
>  

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
