flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-5256) Extend DataSetSingleRowJoin to support Left and Right joins
Date Thu, 04 May 2017 15:24:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-5256?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15996913#comment-15996913
] 

ASF GitHub Bot commented on FLINK-5256:
---------------------------------------

Github user DmytroShkvyra commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3673#discussion_r114810408
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/batch/sql/JoinITCase.scala
---
    @@ -369,10 +363,258 @@ class JoinITCase(
         val table = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv).as('a1, 'a2,
'a3)
         tEnv.registerTable("A", table)
     
    -    val sqlQuery1 = "SELECT * FROM A CROSS JOIN (SELECT count(*) FROM A HAVING count(*)
< 0)"
    -    val result = tEnv.sql(sqlQuery1).count()
    +    val sqlQuery1 = "SELECT * FROM A CROSS JOIN " +
    +      "(SELECT count(*) FROM A HAVING count(*) < 0)"
    +    val result = tEnv.sql(sqlQuery1)
    +    val expected =Seq(
    +      "2,2,Hello,null",
    +      "1,1,Hi,null",
    +      "3,2,Hello world,null").mkString("\n")
    +
    +    val results = result.toDataSet[Row].collect()
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testLeftNullLeftJoin (): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +
    +    val sqlQuery =
    +      "SELECT a, cnt " +
    +        "FROM" +
    +        " (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) " +
    +        "LEFT JOIN A " +
    +        "ON cnt = a"
    +
    +    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd,
'e)
    +    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
    +    tEnv.registerTable("A", ds1)
    +    tEnv.registerTable("B", ds2)
    +
    +    val result = tEnv.sql(sqlQuery).collect()
    +    val resultSize = result.size
    +
    +    Assert.assertEquals(
    +      s"Expected empty result, but actual size result = $resultSize;\n[${result.mkString(",")}]",
    +      resultSize,0)
    +  }
    +
    +  @Test
    +  def testLeftNullRightJoin(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +    val sqlQuery =
    +      "SELECT a, cnt " +
    +        "FROM" +
    +        " (SELECT cnt FROM (SELECT COUNT(*) AS cnt FROM B) WHERE cnt < 0) " +
    +        "RIGHT JOIN A " +
    +        "ON a = cnt"
    +
    +    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd,
'e)
    +    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)
    +    tEnv.registerTable("A", ds1)
    +    tEnv.registerTable("B", ds2)
    +
    +
    +    val result = tEnv.sql(sqlQuery)
    +    val expected = Seq(
    +          "1,null",
    +          "2,null", "2,null",
    +          "3,null", "3,null", "3,null",
    +          "4,null", "4,null", "4,null", "4,null",
    +          "5,null", "5,null", "5,null", "5,null", "5,null").mkString("\n")
    +
    +    val results = result.toDataSet[Row].collect()
    +
    +    TestBaseUtils.compareResultAsText(results.asJava, expected)
    +  }
    +
    +  @Test
    +  def testLeftSingleLeftJoin(): Unit = {
    +    val env = ExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env, config)
    +    val sqlQuery =
    +      "SELECT a, cnt " +
    +        "FROM" +
    +        " (SELECT COUNT(*) AS cnt FROM A) " +
    +        "LEFT JOIN B " +
    +        "ON cnt = a"
    +
    +    val ds1 = CollectionDataSets.get5TupleDataSet(env).toTable(tEnv).as('a, 'b, 'c, 'd,
'e)
    +    val ds2 = CollectionDataSets.getSmall3TupleDataSet(env).toTable(tEnv)as('a, 'b, 'c)
    +    tEnv.registerTable("A", ds2)
    +    tEnv.registerTable("B", ds1)
    +
    +    val result = tEnv.sql(sqlQuery)
    +    val expected = Seq(
    --- End diff --
    
    Done


> Extend DataSetSingleRowJoin to support Left and Right joins
> -----------------------------------------------------------
>
>                 Key: FLINK-5256
>                 URL: https://issues.apache.org/jira/browse/FLINK-5256
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>    Affects Versions: 1.2.0
>            Reporter: Fabian Hueske
>            Assignee: Dmytro Shkvyra
>
> The {{DataSetSingleRowJoin}} is a broadcast-map join that supports arbitrary inner joins
where one input is a single row.
> I found that Calcite translates certain subqueries into non-equi left and right joins
with a single-row input. These cases can be handled if the {{DataSetSingleRowJoin}} is extended
to support outer joins on the non-single-row input, i.e., left joins if the right side is
the single-row input and vice versa.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Mime
View raw message