spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From xiaocai00 <...@git.apache.org>
Subject [GitHub] spark pull request: [SQL] SPARK-1800 Add broadcast hash join opera...
Date Sat, 24 May 2014 02:48:47 GMT
Github user xiaocai00 commented on a diff in the pull request:

    https://github.com/apache/spark/pull/734#discussion_r13024562
  
    --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/joins.scala ---
    @@ -142,6 +136,68 @@ case class HashJoin(
     
     /**
      * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations by first shuffling the data using the join
    + * keys.
    + */
    +@DeveloperApi
    +case class ShuffledHashJoin(
    +    leftKeys: Seq[Expression],
    +    rightKeys: Seq[Expression],
    +    buildSide: BuildSide,
    +    left: SparkPlan,
    +    right: SparkPlan) extends BinaryNode with HashJoin {
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil
    +
    +
    +  def execute() = {
    +    buildPlan.execute().zipPartitions(streamedPlan.execute()) {
    +      (buildIter, streamIter) => joinIterators(buildIter, streamIter)
    +    }
    +  }
    +}
    +
    +
    +/**
    + * :: DeveloperApi ::
    + * Performs an inner hash join of two child relations.  When the operator is constructed, a Spark
    + * job is asynchronously started to calculate the values for the broadcasted relation.  This data
    + * is then placed in a Spark broadcast variable.  The streamed relation is not shuffled.
    + */
    +@DeveloperApi
    +case class BroadcastHashJoin(
    +     leftKeys: Seq[Expression],
    +     rightKeys: Seq[Expression],
    +     buildSide: BuildSide,
    +     left: SparkPlan,
    +     right: SparkPlan)(@transient sc: SparkContext) extends BinaryNode with HashJoin {
    +
    +  override def otherCopyArgs = sc :: Nil
    +
    +  override def outputPartitioning: Partitioning = left.outputPartitioning
    +
    +  override def requiredChildDistribution =
    +    UnspecifiedDistribution :: UnspecifiedDistribution :: Nil
    +
    +  @transient
    +  lazy val broadcastFuture = future {
    +   sc.broadcast(buildPlan.executeCollect())
    --- End diff --
    
    Good to know. Thanks for the heads-up.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and would like it, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

Mime
View raw message