flink-issues mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6091) Implement and turn on the retraction for aggregates
Date Sat, 22 Apr 2017 20:44:04 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6091?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15980128#comment-15980128 ]

ASF GitHub Bot commented on FLINK-6091:
---------------------------------------

Github user hequn8128 commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3733#discussion_r112817910
  
    --- Diff: flink-libraries/flink-table/src/test/scala/org/apache/flink/table/api/scala/stream/RetractionITCase.scala
---
    @@ -0,0 +1,310 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.table.api.scala.stream
    +
    +import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    +import org.apache.flink.table.api.{TableEnvironment, TableException}
    +import org.apache.flink.table.api.scala.stream.utils.{StreamITCase, StreamingWithStateTestBase}
    +import org.apache.flink.types.Row
    +import org.junit.Assert._
    +import org.junit.Test
    +import org.apache.flink.table.api.scala._
    +import org.apache.flink.api.scala._
    +import org.apache.flink.streaming.api.TimeCharacteristic
    +import org.apache.flink.table.utils.TableFunc0
    +
    +import scala.collection.mutable
    +
    +/**
    +  * Tests for retraction.
    +  */
    +class RetractionITCase extends StreamingWithStateTestBase {
    +  // input data
    +  val data = List(
    +    ("Hello", 1),
    +    ("word", 1),
    +    ("Hello", 1),
    +    ("bark", 1)
    +  )
    +
    +  // keyed groupby + keyed groupby
    +  @Test
    +  def testWordCount(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + non-keyed groupby
    +  @Test
    +  def testGroupByAndNonKeyedGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .select('count.sum)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1", "2", "1", "3", "4")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // non-keyed groupby + keyed groupby
    +  @Test
    +  def testNonKeyedGroupByAndGroupBy(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .select('num.sum as 'count)
    +      .groupBy('count)
    +      .select('count, 'count.count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,0", "2,1", "2,0", "3,1", "3,0", "4,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, keyed)
    +  @Test
    +  def testGroupByAndUnboundPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (PARTITION BY cnt ORDER BY ProcTime() " +
    +      "ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM " +
    +      "(SELECT word, count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,1", "bark,1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // keyed groupby + over agg(unbounded, procTime, non-keyed)
    +  @Test
    +  def testGroupByAndUnboundNonPartitionedProcessingWindowWithRow(): Unit = {
    +
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    env.setStateBackend(getStateBackend)
    +    env.setParallelism(1)
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +
    +    StreamITCase.testResults = mutable.MutableList()
    +
    +    val t1 = env.fromCollection(data).toTable(tEnv).as('word, 'number)
    +
    +    tEnv.registerTable("T1", t1)
    +
    +    val sqlQuery = "SELECT word, cnt, count(word) " +
    +      "OVER (ORDER BY ProcTime() ROWS BETWEEN UNBOUNDED preceding AND CURRENT ROW)" +
    +      "FROM (SELECT word , count(number) as cnt from T1 group by word) "
    +
    +    val result = tEnv.sql(sqlQuery).toDataStream[Row]
    +    result.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = mutable.MutableList("Hello,1,1", "word,1,2", "Hello,2,2", "bark,1,3")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // Tests the uniquify process: if the current output message of the unbounded
    +  // groupby equals the previous message, the unbounded groupby ignores the current one.
    +  @Test
    +  def testUniqueProcess(): Unit = {
    +    // data input
    +    val data = List(
    +      (1234, 2L),
    +      (1234, 0L)
    +    )
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'pk, 'value)
    +    val resultTable = table
    +      .groupBy('pk)
    +      .select('pk as 'pk, 'value.sum as 'sum)
    +      .groupBy('sum)
    +      .select('sum, 'pk.count as 'count)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("2,1")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // correlate should handle retraction messages correctly
    +  @Test
    +  def testCorrelate(): Unit = {
    +    val env = StreamExecutionEnvironment.getExecutionEnvironment
    +    val tEnv = TableEnvironment.getTableEnvironment(env)
    +    StreamITCase.testResults = mutable.MutableList()
    +    env.setParallelism(1)
    +    env.setStateBackend(getStateBackend)
    +
    +    val func0 = new TableFunc0
    +
    +    val stream = env.fromCollection(data)
    +    val table = stream.toTable(tEnv, 'word, 'num)
    +    val resultTable = table
    +      .groupBy('word)
    +      .select('word as 'word, 'num.sum as 'count)
    +      .leftOuterJoin(func0('word))
    +      .groupBy('count)
    +      .select('count, 'word.count as 'frequency)
    +
    +    val results = resultTable.toDataStream[Row]
    +    results.addSink(new StreamITCase.StringSink)
    +    env.execute()
    +
    +    val expected = Seq("1,1", "1,2", "1,1", "2,1", "1,2")
    +    assertEquals(expected.sorted, StreamITCase.testResults.sorted)
    +  }
    +
    +  // groupby + window agg
    +  @Test(expected = classOf[TableException])
    +  def testGroupByAndProcessingTimeSlidingGroupWindow(): Unit = {
    --- End diff --
    
    Hi, these failures are thrown during `translateToPlan`, so `TableTestBase` cannot help,
right?


> Implement and turn on the retraction for aggregates
> ---------------------------------------------------
>
>                 Key: FLINK-6091
>                 URL: https://issues.apache.org/jira/browse/FLINK-6091
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Shaoxuan Wang
>            Assignee: Hequn Cheng
>
> Implement functions for generating and consuming retract messages for different aggregates.

> 1. add delete/add property to Row
> 2. implement functions for generating retract messages for unbounded groupBy
> 3. implement functions for handling retract messages for different aggregates.
> 4. handle retraction messages in CommonCorrelate and CommonCalc (retain Delete property).
> Note: Currently, only the unbounded groupBy generates retractions, and it works only in
> the unbounded, processing-time mode. Hence, retraction is so far supported only for
> unbounded, processing-time aggregations. We can add more retraction support later.
> Supported now: unbounded groupBy; unbounded, processing-time over window.
> Unsupported now: group window; event-time or bounded over window.
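
To illustrate the idea behind points 1 and 2 above, here is a minimal, self-contained sketch of retraction for an unbounded keyed sum. This is not Flink's actual API; the names `Msg`, `Add`, `Delete`, and `sumWithRetraction` are invented for illustration. The point is the protocol: when a key's aggregate changes, the operator first emits a Delete for the previously emitted row and then an Add for the new one, so a downstream aggregate can stay consistent.

```scala
object RetractionSketch {
  // A message is either an Add or a Delete of a (key, value) row,
  // mirroring the "add delete/add property to Row" idea from the issue.
  sealed trait Msg { def key: String; def value: Long }
  case class Add(key: String, value: Long) extends Msg
  case class Delete(key: String, value: Long) extends Msg

  // Unbounded keyed sum: for every input row, retract the previously
  // emitted sum for that key (if any), then add the new sum.
  def sumWithRetraction(input: Seq[(String, Long)]): Seq[Msg] = {
    val state = scala.collection.mutable.Map.empty[String, Long]
    input.flatMap { case (k, v) =>
      val old = state.get(k)
      val next = old.getOrElse(0L) + v
      state(k) = next
      old.map(o => Delete(k, o)).toSeq :+ Add(k, next)
    }
  }

  def main(args: Array[String]): Unit = {
    // Same words as the test data above:
    sumWithRetraction(Seq(("Hello", 1L), ("word", 1L), ("Hello", 1L)))
      .foreach(println)
    // Add(Hello,1), Add(word,1), Delete(Hello,1), Add(Hello,2)
  }
}
```

A downstream consumer (point 3) would decrement its own accumulator on `Delete` and increment it on `Add`, which is why the expected results in the tests above contain intermediate rows such as "1,2" followed by "1,1".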



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
