FYI - Xiangrui submitted an amazing pull request to fix a long-standing issue affecting many of the nondeterministic expressions (rand, randn, monotonically_increasing_id): https://github.com/apache/spark/pull/15567

Prior to this PR, we used TaskContext.partitionId as the partition index when initializing expressions. However, that is not a good index to use in most cases: it is the physical task's partition id, which does not always match the partition index at the time the RDD is created (or in the Spark SQL physical plan). The difference matters as soon as there is a union or coalesce operation.

The "index" given by mapPartitionsWithIndex, on the other hand, does not have this problem because it actually reflects the logical partition index at the time the RDD is created.

When is it safe to use TaskContext.partitionId? Only at the very end of a query plan (the root node), because there, in the current implementation, the logical partition index is guaranteed to be the same as the physical task partition id.


For example, prior to Xiangrui's PR, the following query would return 2 rows, whereas it should return 1:

spark.range(1).selectExpr("rand(1)").union(spark.range(1).selectExpr("rand(1)")).distinct.show()

It returned 2 rows because rand used TaskContext.partitionId as the per-partition seed, so the two sides of the union ended up with different seeds.
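To make the failure mode concrete, here is a toy model in plain Scala (no Spark involved, and not Spark's actual code). It assumes each side of the union has a single partition and that rand seeds its generator with seed + partitionIndex; the names UnionSeedDemo and randFor are hypothetical. Seeding from the physical task id gives two distinct values, seeding from the logical index gives one.

```scala
import scala.util.Random

// Toy model, NOT Spark internals: union of two one-partition plans, each
// computing rand(1). Seeding with seed + partitionIndex is an assumption.
object UnionSeedDemo {
  val seed = 1L

  // One "row" per partition: the first draw from a per-partition RNG.
  def randFor(partitionIndex: Int): Double =
    new Random(seed + partitionIndex).nextDouble()

  // Within its own side, each single partition has logical index 0 ...
  val logicalIndices = Seq(0, 0)
  // ... but after the union the two tasks run as physical partitions 0 and 1.
  val physicalIds = Seq(0, 1)

  // Old behavior: seed from the physical task id, so the two sides diverge.
  val seededByTaskId = physicalIds.map(randFor).distinct
  // New behavior: seed from the logical index, so both sides agree.
  val seededByLogicalIndex = logicalIndices.map(randFor).distinct

  def main(args: Array[String]): Unit = {
    println(s"distinct rows (TaskContext.partitionId): ${seededByTaskId.size}")
    println(s"distinct rows (logical index): ${seededByLogicalIndex.size}")
  }
}
```

The distinct counts mirror the query above: 2 with task-id seeding, 1 with logical-index seeding.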


I'm starting to think we should deprecate TaskContext.partitionId and ban its use within the project to be safe ...