spark-issues mailing list archives

From "Claus Stadler (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-21417) Detect transitive join conditions via expressions
Date Fri, 14 Jul 2017 11:39:00 GMT
Claus Stadler created SPARK-21417:
-------------------------------------

             Summary: Detect transitive join conditions via expressions
                 Key: SPARK-21417
                 URL: https://issues.apache.org/jira/browse/SPARK-21417
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 2.2.0
            Reporter: Claus Stadler


Disclaimer: The nature of this report is similar to that of https://issues.apache.org/jira/browse/CALCITE-1887;
however, as Spark (to my understanding) uses its own SQL implementation, the requested
improvement has to be treated as a separate issue.

Given table aliases ta and tb, column names ca and cb, and an arbitrary (deterministic)
expression expr, Spark should be capable of inferring join conditions by transitivity:

{noformat}
ta.ca = expr AND tb.cb = expr -> ta.ca = tb.cb
{noformat}
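
For instance, taking lower('Leipzig') as a concrete (and here arbitrarily chosen) deterministic
expression:

{noformat}
ta.ca = lower('Leipzig') AND tb.cb = lower('Leipzig') -> ta.ca = tb.cb
{noformat}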

The use case for us stems from SPARQL-to-SQL rewriting, where SPARQL queries such as

{code}
SELECT ?type ?mayor WHERE {
  dbr:Leipzig a ?type .
  dbr:Leipzig dbo:mayor ?mayor
}
{code}
result in an SQL query similar to

{noformat}
SELECT a.o, b.o FROM rdf a, rdf b WHERE a.s = 'dbr:Leipzig' AND b.s = 'dbr:Leipzig'
{noformat}
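
With the requested inference in place, the condition could be rewritten into an equi-join of
the same shape as the first query in the self-contained example below:

{noformat}
SELECT a.o, b.o FROM rdf a, rdf b WHERE a.s = b.s AND a.s = 'dbr:Leipzig'
{noformat}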

A consequence of the join condition not being recognized is that Apache Spark does not find
an executable plan to process the query.

Self-contained example:
{code:java}
package my.pkg

import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.{StringType, StructField, StructType}
import org.scalatest._

class TestSparkSqlJoin extends FlatSpec {

  "SPARK SQL processor" should "be capable of handling transitive join conditions" in {

    val spark = SparkSession
      .builder()
      .master("local[2]")
      .appName("Spark SQL parser bug")
      .getOrCreate()

    // The schema is encoded in a string
    val schemaString = "s p o"

    // Generate the schema based on the schema string
    val fields = schemaString.split(" ")
      .map(fieldName => StructField(fieldName, StringType, nullable = true))
    val schema = StructType(fields)

    // A single (subject, predicate, object) triple suffices to trigger the issue
    val data = List(("s1", "p1", "o1"))
    val dataRDD = spark.sparkContext.parallelize(data)
      .map(attributes => Row(attributes._1, attributes._2, attributes._3))
    val df = spark.createDataFrame(dataRDD, schema)
    df.createOrReplaceTempView("TRIPLES")

    // Explicit equi-join condition: works
    println("First Query")
    spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = B.s AND A.s = 'dbr:Leipzig'").show(10)

    // Same condition only given transitively: fails with a cartesian product error
    println("Second Query")
    spark.sql("SELECT A.s FROM TRIPLES A, TRIPLES B WHERE A.s = 'dbr:Leipzig' AND B.s = 'dbr:Leipzig'").show(10)
  }

}
{code}
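
The example can be run with any ScalaTest runner, for instance with sbt (assuming the class
sits in the test sources of a project with spark-sql and scalatest as dependencies):

{noformat}
sbt "testOnly my.pkg.TestSparkSqlJoin"
{noformat}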

Output (excerpt):
{noformat}
First Query
...
+---+
|  s|
+---+
+---+

Second Query
- should be capable of handling transitive join conditions *** FAILED ***
  org.apache.spark.sql.AnalysisException: Detected cartesian product for INNER join between
logical plans
Project [s#3]
+- Filter (isnotnull(s#3) && (s#3 = dbr:Leipzig))
   +- LogicalRDD [s#3, p#4, o#5]
and
Project
+- Filter (isnotnull(s#20) && (s#20 = dbr:Leipzig))
   +- LogicalRDD [s#20, p#21, o#22]
Join condition is missing or trivial.
Use the CROSS JOIN syntax to allow cartesian products between these relations.;
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1080)
  at org.apache.spark.sql.catalyst.optimizer.CheckCartesianProducts$$anonfun$apply$20.applyOrElse(Optimizer.scala:1077)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$2.apply(TreeNode.scala:267)
  at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:70)
  at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:266)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformDown$1.apply(TreeNode.scala:272)
  at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:306)
  at org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:187)
  ...
Run completed in 6 seconds, 833 milliseconds.
Total number of tests run: 1
Suites: completed 1, aborted 0
Tests: succeeded 0, failed 1, canceled 0, ignored 0, pending 0
*** 1 TEST FAILED ***

{noformat}
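
As a side note, the error message's suggestion can also be followed at the session level:
enabling {{spark.sql.crossJoin.enabled}} lets the second query execute, but only as a filtered
cross join, so it is a workaround rather than the requested inference:

{code:java}
// Workaround sketch (not the requested fix): permit cartesian products.
// The second query then executes, but still without the inferred
// A.s = B.s equi-join condition.
val spark = SparkSession
  .builder()
  .master("local[2]")
  .appName("Spark SQL parser bug")
  .config("spark.sql.crossJoin.enabled", "true")
  .getOrCreate()
{code}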

Expected:

A correctly working, executable query plan for the second query (ideally equivalent to the
plan of the first query).
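
To make the request more concrete, a rough sketch of how such an inference might look as a
Catalyst optimizer rule is given below. The rule name, the helper method, and the way
intermediate operators such as Project are looked through are my own assumptions for
illustration; only the imported classes are existing Spark API:

{code:java}
import org.apache.spark.sql.catalyst.expressions.{And, Attribute, EqualTo, Expression}
import org.apache.spark.sql.catalyst.plans.logical.{Filter, Join, LogicalPlan, Project}
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical rule sketch: if both join inputs filter an attribute against
// the same deterministic expression, add the implied equi-join condition.
object InferTransitiveJoinConditions extends Rule[LogicalPlan] {

  // Collect `attribute = expr` predicates directly below a join input,
  // looking through Project for simplicity. A production-grade rule would
  // additionally have to check attribute visibility and null semantics.
  private def equalities(plan: LogicalPlan): Seq[(Attribute, Expression)] = plan match {
    case Project(_, child) => equalities(child)
    case Filter(cond, _) =>
      cond.collect {
        case EqualTo(a: Attribute, e) if e.deterministic && !e.isInstanceOf[Attribute] => (a, e)
        case EqualTo(e, a: Attribute) if e.deterministic && !e.isInstanceOf[Attribute] => (a, e)
      }
    case _ => Seq.empty
  }

  override def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case j @ Join(left, right, _, condition) =>
      val inferred = for {
        (la, le) <- equalities(left)
        (ra, re) <- equalities(right)
        if le.semanticEquals(re) // la = expr AND ra = expr => la = ra
      } yield (EqualTo(la, ra): Expression)
      if (inferred.isEmpty) j
      else {
        // Conjoin the inferred predicates with any existing join condition.
        val extra = inferred.reduce(And(_, _))
        j.copy(condition = Some(condition.map(And(_, extra)).getOrElse(extra)))
      }
  }
}
{code}

Applied to the second query, such a rule would turn the two filter-only inputs into an inner
join on A.s = B.s, matching the plan of the first query.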



