spark-reviews mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] [spark] HeartSaVioR commented on a change in pull request #26162: [SPARK-29438][SS] Use partition ID of StateStoreAwareZipPartitionsRDD for determining partition ID of state store in stream-stream join
Date Mon, 20 Jan 2020 01:14:42 GMT
HeartSaVioR commented on a change in pull request #26162: [SPARK-29438][SS] Use partition ID
of StateStoreAwareZipPartitionsRDD for determining partition ID of state store in stream-stream
join
URL: https://github.com/apache/spark/pull/26162#discussion_r368340775
 
 

 ##########
 File path: sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala
 ##########
 @@ -933,5 +934,57 @@ class StreamingOuterJoinSuite extends StreamTest with StateStoreMetricsTest
with
     assert(e.getMessage.toLowerCase(Locale.ROOT)
       .contains("the query is using stream-stream outer join with state format version 1"))
   }
-}
 
+  test("SPARK-29438: ensure UNION doesn't lead stream-stream join to use shifted partition
IDs") {
+    def constructUnionDf(desiredPartitionsForInput1: Int)
+        : (MemoryStream[Int], MemoryStream[Int], MemoryStream[Int], DataFrame) = {
+      val input1 = MemoryStream[Int](desiredPartitionsForInput1)
+      val df1 = input1.toDF
+        .select(
+          'value as "key",
+          'value as "leftValue",
+          'value as "rightValue")
+      val (input2, df2) = setupStream("left", 2)
+      val (input3, df3) = setupStream("right", 3)
+
+      val joined = df2
+        .join(df3,
+          df2("key") === df3("key") && df2("leftTime") === df3("rightTime"),
+          "inner")
+        .select(df2("key"), 'leftValue, 'rightValue)
+
+      (input1, input2, input3, df1.union(joined))
+    }
+
+    withTempDir { tempDir =>
+      val (input1, input2, input3, unionDf) = constructUnionDf(2)
+
+      testStream(unionDf)(
+        StartStream(checkpointLocation = tempDir.getAbsolutePath),
+        AddData(input1, 11, 12, 13),
+        MultiAddData(input2, 11, 12, 13, 14, 15)(input3, 13, 14, 15, 16, 17),
 
 Review comment:
   My bad, missed the details. Thanks for the explanation of lock mechanism around MultiAddData.
Then you're right we should put all of input data altogether in same MultiAddData. Will make
a change.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


Mime
View raw message