spark-reviews mailing list archives

From cloud-fan <...@git.apache.org>
Subject [GitHub] spark pull request #13932: [SPARK-15354] [CORE] Topology aware block replica...
Date Wed, 22 Mar 2017 01:42:57 GMT
Github user cloud-fan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/13932#discussion_r107316905
  
    --- Diff: core/src/main/scala/org/apache/spark/storage/BlockReplicationPolicy.scala ---
    @@ -88,26 +131,94 @@ class RandomBlockReplicationPolicy
         logDebug(s"Prioritized peers : ${prioritizedPeers.mkString(", ")}")
         prioritizedPeers
       }
    +}
    +
    +@DeveloperApi
    +class BasicBlockReplicationPolicy
    +  extends BlockReplicationPolicy
    +    with Logging {
     
    -  // scalastyle:off line.size.limit
       /**
    -   * Uses sampling algorithm by Robert Floyd. Finds a random sample in O(n) while
    -   * minimizing space usage. Please see <a href="http://math.stackexchange.com/questions/178690/whats-the-proof-of-correctness-for-robert-floyds-algorithm-for-selecting-a-sin">
    -   * here</a>.
    +   * Method to prioritize a bunch of candidate peers of a block manager. This implementation
    +   * replicates the behavior of block replication in HDFS, a peer is chosen within the rack,
    +   * one outside and that's it. This works best with a total replication factor of 3.
        *
    -   * @param n total number of indices
    -   * @param m number of samples needed
    -   * @param r random number generator
    -   * @return list of m random unique indices
    +   * @param blockManagerId    Id of the current BlockManager for self identification
    +   * @param peers             A list of peers of a BlockManager
    +   * @param peersReplicatedTo Set of peers already replicated to
    +   * @param blockId           BlockId of the block being replicated. This can be used as a source of
    +   *                          randomness if needed.
    +   * @param numReplicas Number of peers we need to replicate to
    +   * @return A prioritized list of peers. Lower the index of a peer, higher its priority
        */
    -  // scalastyle:on line.size.limit
    -  private def getSampleIds(n: Int, m: Int, r: Random): List[Int] = {
    -    val indices = (n - m + 1 to n).foldLeft(Set.empty[Int]) {case (set, i) =>
    -      val t = r.nextInt(i) + 1
    -      if (set.contains(t)) set + i else set + t
    +  override def prioritize(
    +      blockManagerId: BlockManagerId,
    +      peers: Seq[BlockManagerId],
    +      peersReplicatedTo: mutable.HashSet[BlockManagerId],
    +      blockId: BlockId,
    +      numReplicas: Int): List[BlockManagerId] = {
    +
    +    logDebug(s"Input peers : $peers")
    +    logDebug(s"BlockManagerId : $blockManagerId")
    +
    +    val random = new Random(blockId.hashCode)
    +
    +    // if block doesn't have topology info, we can't do much, so we randomly shuffle
    +    // if there is, we see what's needed from peersReplicatedTo and based on numReplicas,
    +    // we choose what's needed
    +    if (blockManagerId.topologyInfo.isEmpty || numReplicas == 0) {
    +      // no topology info for the block. The best we can do is randomly choose peers
    +      BlockReplicationUtils.getRandomSample(peers, numReplicas, random)
    +    } else {
    +      // we have topology information, we see what is left to be done from peersReplicatedTo
    +      val doneWithinRack = peersReplicatedTo.exists(_.topologyInfo == blockManagerId.topologyInfo)
    +      val doneOutsideRack = peersReplicatedTo.exists { p =>
    +        p.topologyInfo.isDefined && p.topologyInfo != blockManagerId.topologyInfo
    +      }
    +
    +      if (doneOutsideRack && doneWithinRack) {
    +        // we are done, we just return a random sample
    --- End diff --
    
    What? I think this branch is where we should do smart replication.
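
    To make the branches under discussion concrete, here is a minimal, self-contained sketch of the "one peer within the rack, one outside" idea from the doc comment in the diff. It is not the code in this pull request: `Peer`, `RackAwareSketch`, the `seed` parameter, and the choice to skip peers already replicated to are all assumptions made for illustration, and only the two early checks (`doneWithinRack`, `doneOutsideRack`) mirror lines visible in the diff.

```scala
import scala.util.Random

// Hypothetical stand-in for BlockManagerId: a host plus optional rack info.
final case class Peer(host: String, rack: Option[String])

object RackAwareSketch {
  // Returns up to numReplicas peers, highest priority first.
  def prioritize(
      self: Peer,
      peers: Seq[Peer],
      replicatedTo: Set[Peer],
      seed: Int,
      numReplicas: Int): List[Peer] = {
    val random = new Random(seed)
    // Assumption: peers that already hold a replica are not candidates again.
    val candidates = random.shuffle(peers.filter(p => !replicatedTo.contains(p)).toList)

    if (self.rack.isEmpty || numReplicas <= 0) {
      // No topology info (or nothing requested): plain random sample.
      candidates.take(math.max(numReplicas, 0))
    } else {
      val doneWithinRack = replicatedTo.exists(_.rack == self.rack)
      val doneOutsideRack =
        replicatedTo.exists(p => p.rack.isDefined && p.rack != self.rack)

      // Assumption: pick at most one peer for each placement goal still unmet.
      val inRack =
        if (doneWithinRack) None else candidates.find(_.rack == self.rack)
      val offRack =
        if (doneOutsideRack) None
        else candidates.find(p => p.rack.isDefined && p.rack != self.rack)

      // Preferred peers first, then the shuffled remainder.
      val preferred = inRack.toList ++ offRack.toList
      val rest = candidates.filter(p => !preferred.contains(p))
      (preferred ++ rest).take(numReplicas)
    }
  }
}
```

    In this sketch, when both placement goals are already met, `preferred` is empty and the result is just the shuffled remainder, which corresponds to the "we just return a random sample" branch that the comment above questions.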

