spark-issues mailing list archives

From "Patrick Wendell (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-2638) Improve concurrency of fetching Map outputs
Date Mon, 15 Sep 2014 23:13:39 GMT

     [ https://issues.apache.org/jira/browse/SPARK-2638?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Patrick Wendell updated SPARK-2638:
-----------------------------------
    Fix Version/s:     (was: 1.1.0)
                   1.2.0

> Improve concurrency of fetching Map outputs
> -------------------------------------------
>
>                 Key: SPARK-2638
>                 URL: https://issues.apache.org/jira/browse/SPARK-2638
>             Project: Spark
>          Issue Type: Improvement
>          Components: Spark Core
>    Affects Versions: 1.0.0
>         Environment: All
>            Reporter: Stephen Boesch
>            Assignee: Josh Rosen
>            Priority: Minor
>              Labels: MapOutput, concurrency
>             Fix For: 1.2.0
>
>   Original Estimate: 0h
>  Remaining Estimate: 0h
>
> This issue was noticed while perusing the MapOutputTracker source code. Notice that the synchronization is on the containing "fetching" collection, which makes ALL fetches wait while any single fetch is in progress.
> The fix is to synchronize instead on the shuffleId (interned as a string so that every thread in the JVM locks on the same object).
>   def getServerStatuses(shuffleId: Int, reduceId: Int): Array[(BlockManagerId, Long)] = {
>     val statuses = mapStatuses.get(shuffleId).orNull
>     if (statuses == null) {
>       logInfo("Don't have map outputs for shuffle " + shuffleId + ", fetching them")
>       var fetchedStatuses: Array[MapStatus] = null
>       fetching.synchronized {   // This is existing code
>       // shuffleId.toString.intern.synchronized {  // New code
>         if (fetching.contains(shuffleId)) {
>           // Someone else is fetching it; wait for them to be done
>           while (fetching.contains(shuffleId)) {
>             try {
>               fetching.wait()
>             } catch {
>               case e: InterruptedException =>
>             }
>           }
>
> This is only a small code change, but the test cases needed to prove (a) correct functionality and (b) an actual performance improvement are not trivial.
> For (b), it is not worthwhile to add a test case to the codebase. Instead, I have added a git project that demonstrates the concurrency/performance improvement of the fine-grained approach. The GitHub project is at
> https://github.com/javadba/scalatesting.git
> Simply run "sbt test". Note: it is unclear how/where to include this ancillary testing/verification information, which will not be part of the git PR. I am open to any suggestions, even simply removing references to it.
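
The fine-grained locking idea described in the issue can be sketched in isolation as follows. This is a minimal illustration and not the change that went into Spark: PerKeyLocks, FetchOnceCache and the fetch function are hypothetical names invented for this example. It uses one dedicated lock object per shuffle id rather than an interned string, on the assumption that a private lock is preferable because interned strings are shared across the whole JVM and unrelated code could lock on the same instance. Note also that wait() and notifyAll() must be called on the object whose monitor is currently held, so the fetching.wait() call in the excerpt would have to change together with the lock; the sketch sidesteps this by having waiting threads simply block on the per-id lock.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper (not part of Spark): one dedicated lock object per key.
class PerKeyLocks {
  private val locks = new ConcurrentHashMap[Int, AnyRef]()

  def lockFor(key: Int): AnyRef = {
    val fresh = new Object
    // putIfAbsent returns null if our lock was stored, else the existing lock.
    val existing = locks.putIfAbsent(key, fresh)
    if (existing == null) fresh else existing
  }
}

// Hypothetical cache: `fetch` stands in for the remote map-output lookup
// and must not return null.
class FetchOnceCache[V <: AnyRef](fetch: Int => V) {
  private val locks = new PerKeyLocks
  private val cache = new ConcurrentHashMap[Int, V]()

  def get(shuffleId: Int): V =
    Option(cache.get(shuffleId)).getOrElse {
      // Only threads asking for the SAME shuffleId contend on this lock.
      locks.lockFor(shuffleId).synchronized {
        // Re-check under the lock: another thread may have finished the fetch.
        Option(cache.get(shuffleId)).getOrElse {
          val fetched = fetch(shuffleId)
          cache.put(shuffleId, fetched)
          fetched
        }
      }
    }
}

object FetchOnceDemo {
  def main(args: Array[String]): Unit = {
    val calls = new AtomicInteger(0)
    val cache = new FetchOnceCache[String](id => {
      calls.incrementAndGet()
      "statuses-" + id
    })
    // Four threads request the same two ids concurrently.
    val threads = (1 to 4).map { _ =>
      new Thread(new Runnable {
        def run(): Unit = { cache.get(1); cache.get(2) }
      })
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    println("fetch calls: " + calls.get)
  }
}
```

In this sketch each id is fetched at most once, and a slow fetch for one shuffle id does not block requests for a different id, which is the behaviour the issue asks for.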



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)


