spark-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-21052) Add hash map metrics to join
Date Mon, 10 Dec 2018 17:59:02 GMT

    [ https://issues.apache.org/jira/browse/SPARK-21052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16715245#comment-16715245 ]

ASF GitHub Bot commented on SPARK-21052:
----------------------------------------

dongjoon-hyun closed pull request #23204: Revert "[SPARK-21052][SQL] Add hash map metrics to join"
URL: https://github.com/apache/spark/pull/23204

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is displayed below for the sake of provenance:
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
index dab873bf9b9a0..4a98fa75a67c1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashJoin.scala
@@ -213,10 +213,6 @@ trait HashJoin {
           s"BroadcastHashJoin should not take $x as the JoinType")
     }
 
-    // At the end of the task, we update the avg hash probe.
-    TaskContext.get().addTaskCompletionListener[Unit](_ =>
-      avgHashProbe.set(hashed.getAverageProbesPerLookup))
-
     val resultProj = createResultProjection
     joinedIter.map { r =>
       numOutputRows += 1
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
index b1ff6e83acc24..7c21062c4cec3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/joins/HashedRelation.scala
@@ -80,11 +80,6 @@ private[execution] sealed trait HashedRelation extends KnownSizeEstimation {
    * Release any used resources.
    */
   def close(): Unit
-
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double
 }
 
 private[execution] object HashedRelation {
@@ -279,8 +274,6 @@ private[joins] class UnsafeHashedRelation(
   override def read(kryo: Kryo, in: Input): Unit = Utils.tryOrIOException {
     read(() => in.readInt(), () => in.readLong(), in.readBytes)
   }
-
-  override def getAverageProbesPerLookup: Double = binaryMap.getAverageProbesPerLookup
 }
 
 private[joins] object UnsafeHashedRelation {
@@ -395,10 +388,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   // The number of unique keys.
   private var numKeys = 0L
 
-  // Tracking average number of probes per key lookup.
-  private var numKeyLookups = 0L
-  private var numProbes = 0L
-
   // needed by serializer
   def this() = {
     this(
@@ -483,8 +472,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def getValue(key: Long, resultRow: UnsafeRow): UnsafeRow = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -493,14 +480,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return getRow(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -528,8 +512,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
    */
   def get(key: Long, resultRow: UnsafeRow): Iterator[UnsafeRow] = {
     if (isDense) {
-      numKeyLookups += 1
-      numProbes += 1
       if (key >= minKey && key <= maxKey) {
         val value = array((key - minKey).toInt)
         if (value > 0) {
@@ -538,14 +520,11 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
       }
     } else {
       var pos = firstSlot(key)
-      numKeyLookups += 1
-      numProbes += 1
       while (array(pos + 1) != 0) {
         if (array(pos) == key) {
           return valueIter(array(pos + 1), resultRow)
         }
         pos = nextSlot(pos)
-        numProbes += 1
       }
     }
     null
@@ -585,11 +564,8 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   private def updateIndex(key: Long, address: Long): Unit = {
     var pos = firstSlot(key)
     assert(numKeys < array.length / 2)
-    numKeyLookups += 1
-    numProbes += 1
     while (array(pos) != key && array(pos + 1) != 0) {
       pos = nextSlot(pos)
-      numProbes += 1
     }
     if (array(pos + 1) == 0) {
       // this is the first value for this key, put the address in array.
@@ -721,8 +697,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     writeLong(maxKey)
     writeLong(numKeys)
     writeLong(numValues)
-    writeLong(numKeyLookups)
-    writeLong(numProbes)
 
     writeLong(array.length)
     writeLongArray(writeBuffer, array, array.length)
@@ -764,8 +738,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
     maxKey = readLong()
     numKeys = readLong()
     numValues = readLong()
-    numKeyLookups = readLong()
-    numProbes = readLong()
 
     val length = readLong().toInt
     mask = length - 2
@@ -783,11 +755,6 @@ private[execution] final class LongToUnsafeRowMap(val mm: TaskMemoryManager, cap
   override def read(kryo: Kryo, in: Input): Unit = {
     read(() => in.readBoolean(), () => in.readLong(), in.readBytes)
   }
-
-  /**
-   * Returns the average number of probes per key lookup.
-   */
-  def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
 }
 
 private[joins] class LongHashedRelation(
@@ -839,8 +806,6 @@ private[joins] class LongHashedRelation(
     resultRow = new UnsafeRow(nFields)
     map = in.readObject().asInstanceOf[LongToUnsafeRowMap]
   }
-
-  override def getAverageProbesPerLookup: Double = map.getAverageProbesPerLookup
 }
 
 /**
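
For context, the counters this revert deletes implement a standard probe
metric for an open-addressing hash map: each key lookup counts one lookup
and one probe for the first slot inspected, plus one additional probe for
every extra slot visited while resolving collisions, so numProbes.toDouble
/ numKeyLookups is the average probe chain length (1.0 means no
collisions). The following is a minimal, self-contained Scala sketch of
that pattern; it is an illustration only, not Spark's LongToUnsafeRowMap,
and every name in it is invented for the example.

object ProbeCountingSketch {

  final class OpenAddressingMap(capacity: Int) {
    require((capacity & (capacity - 1)) == 0, "capacity must be a power of two")
    private val keys     = new Array[Long](capacity)
    private val values   = new Array[Long](capacity)
    private val occupied = new Array[Boolean](capacity)
    private val mask     = capacity - 1
    private var numKeys  = 0

    // The two counters mirroring the fields removed from LongToUnsafeRowMap.
    private var numKeyLookups = 0L
    private var numProbes     = 0L

    private def firstSlot(key: Long): Int = java.lang.Long.hashCode(key) & mask
    private def nextSlot(pos: Int): Int   = (pos + 1) & mask

    def put(key: Long, value: Long): Unit = {
      require(numKeys < capacity / 2, "keep the load factor low so probing terminates")
      var pos = firstSlot(key)
      while (occupied(pos) && keys(pos) != key) pos = nextSlot(pos)
      if (!occupied(pos)) numKeys += 1
      keys(pos) = key; values(pos) = value; occupied(pos) = true
    }

    def get(key: Long): Option[Long] = {
      numKeyLookups += 1
      numProbes += 1                 // the first slot inspected is one probe
      var pos = firstSlot(key)
      while (occupied(pos)) {
        if (keys(pos) == key) return Some(values(pos))
        pos = nextSlot(pos)
        numProbes += 1               // each collision costs one more probe
      }
      None                           // hit an empty slot: key is absent
    }

    // The ratio the join operator reported to the UI before the revert.
    def getAverageProbesPerLookup: Double = numProbes.toDouble / numKeyLookups
  }

  def main(args: Array[String]): Unit = {
    val map = new OpenAddressingMap(16)
    (0L until 8L).foreach(k => map.put(k, k * 10))
    (0L until 8L).foreach(map.get)
    println(f"avg probes per lookup = ${map.getAverageProbesPerLookup}%.2f")
  }
}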


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Add hash map metrics to join
> ----------------------------
>
>                 Key: SPARK-21052
>                 URL: https://issues.apache.org/jira/browse/SPARK-21052
>             Project: Spark
>          Issue Type: Sub-task
>          Components: SQL
>    Affects Versions: 2.2.0
>            Reporter: Liang-Chi Hsieh
>            Assignee: Liang-Chi Hsieh
>            Priority: Major
>             Fix For: 2.3.0
>
>
> We should add an average hash map probe metric to the join operator and report it on the UI.
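
That metric was wired up in two places, as the HashJoin.scala hunk above
shows. A hedged sketch of both ends follows: the task-completion listener
is taken from the removed lines themselves, while
SQLMetrics.createAverageMetric is assumed to be the declaration-side
helper, and sparkContext, hashed, and avgHashProbe are assumed to be in
scope inside the join operator.

import org.apache.spark.TaskContext
import org.apache.spark.sql.execution.metric.SQLMetrics

// Declared once on the operator; assumed helper for an average-valued
// SQLMetric surfaced on the SQL tab of the web UI.
val avgHashProbe = SQLMetrics.createAverageMetric(sparkContext, "avg hash probe")

// At the end of the task, publish the relation's running ratio; this is
// the listener the revert deletes from HashJoin.
TaskContext.get().addTaskCompletionListener[Unit] { _ =>
  avgHashProbe.set(hashed.getAverageProbesPerLookup)
}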



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

