spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nishkamravi2 <...@git.apache.org>
Subject [GitHub] spark pull request: [SPARK-7988][STREAMING] Round-robin scheduling...
Date Fri, 05 Jun 2015 19:03:33 GMT
Github user nishkamravi2 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/6607#discussion_r31841859
  
    --- Diff: streaming/src/main/scala/org/apache/spark/streaming/scheduler/ReceiverTracker.scala
---
    @@ -271,27 +272,64 @@ class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch:
Boolean = false
         }
     
         /**
    -     * Get the receivers from the ReceiverInputDStreams, distributes them to the
    -     * worker nodes as a parallel collection, and runs them.
    +     * Get the list of executors excluding driver
          */
    -    private def startReceivers() {
    -      val receivers = receiverInputStreams.map(nis => {
    -        val rcvr = nis.getReceiver()
    -        rcvr.setReceiverId(nis.id)
    -        rcvr
    -      })
    +    private def getExecutors(ssc: StreamingContext): List[String] = {
    +      val executors = ssc.sparkContext.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
    +      val driver = ssc.sparkContext.getConf.get("spark.driver.host")
    +      executors.diff(List(driver))
    +    }
     
    -      // Right now, we only honor preferences if all receivers have them
    +    /* Schedule receivers using preferredLocation if specified
    +     * and round-robin otherwise
    +     */
    +    private def scheduleReceivers(receivers: Seq[Receiver[_]]): RDD[Receiver[_]] = {
    +      // Location preferences are honored if all receivers have them
           val hasLocationPreferences = receivers.map(_.preferredLocation.isDefined).reduce(_
&& _)
     
    -      // Create the parallel collection of receivers to distribute them on the worker
nodes
    +      // If no location preferences are specified, set host location for each receiver
    +      // so as to distribute them evenly over executors in a round-robin fashion
    +      // If num_executors > num_receivers, distribute executors among receivers
    +      val locations = new Array[ArrayBuffer[String]](receivers.length)
    +      if (!hasLocationPreferences && !ssc.sparkContext.isLocal) {
    +        val executors = getExecutors(ssc)
    +        var i = 0
    +        for (i <- 0 to (receivers.length - 1)) {
    +          locations(i) = new ArrayBuffer[String]()
    +        }
    +        if (receivers.length >= executors.length) {
    --- End diff --
    
    This won't be enough to cover both cases.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message