spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Claus Stadler (JIRA)" <>
Subject [jira] [Created] (SPARK-21417) Detect transitive join conditions via expressions
Date Fri, 14 Jul 2017 11:39:00 GMT
Claus Stadler created SPARK-21417:

             Summary: Detect transitive join conditions via expressions
                 Key: SPARK-21417
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: Claus Stadler

Disclaimer: The nature of this report is similar to that of
- yet, as SPARK (to my understanding) uses its own SQL implementation, the requested improvement
has to be treated as a separate issue.

Given table aliases ta, tb column names ca, cb, and an arbitrary (deterministic) expression
expr then calcite should be capable to infer join conditions by transitivity:

{noformat} = expr AND tb.cb = expr -> = tb.cb

The use case for us stems from SPARQL to SQL rewriting, where SPARQL queries such as

  dbr:Leipzig a ?type .
  dbr:Leipzig dbo:mayor ?mayor
result in an SQL query similar to

SELECT s.rdf a, s.rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'

A consequence of the join condition not being recognized is, that Apache Flink does not find
an executable plan to process the query.

Self contained example:
package my.package;

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest._

class TestSparkSqlJoin extends FlatSpec {

  "SPARK SQL processor" should "be capable of handling transitive join conditions" in {

    val spark = SparkSession
      .appName("Spark SQL parser bug")

    import spark.implicits._

    // The schema is encoded in a string
    val schemaString = "s p o"

    // Generate the schema based on the string of schema
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    val data = List(("s1", "p1", "o1"))
    val dataRDD = spark.sparkContext.parallelize(data).map(attributes => Row(attributes._1,
attributes._2, attributes._3))
    val df = spark.createDataFrame(dataRDD, schema).as("TRIPLES")

    println("First Query")
    spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 'dbr:Leipzig'").show(10)

    println("Second Query")
    spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'").show(10)


Output (excerpt):
First Query
|  s|

Second Query
- should be capable of handling transitive join conditions *** FAILED ***
  org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between
logical plans
Project [s#3]
+- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
   +- LogicalRDD [s#3, p#4, o#5]
+- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
   +- LogicalRDD [s#20, p#21, o#22]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
Run completed in 6 seconds, 833 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***



A correctly working, executable, query plan for the second query (ideally equivalent to that
of the first query)

This message was sent by Atlassian JIRA

To unsubscribe, e-mail:
For additional commands, e-mail:

View raw message