flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Anton Mushin (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-4832) Count/Sum 0 elements
Date Thu, 20 Oct 2016 14:31:58 GMT

    [ https://issues.apache.org/jira/browse/FLINK-4832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15591955#comment-15591955
] 

Anton Mushin edited comment on FLINK-4832 at 10/20/16 2:31 PM:
---------------------------------------------------------------

Hello
I think {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} needs to be changed
as well, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} will
only be called if there are elements in inputData.
{code}
        TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
        TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
	for (IN element : inputData) {
		IN inCopy = inSerializer.copy(element);
		OUT out = function.map(inCopy);
		result.add(outSerializer.copy(out));
	}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is changed,
for example, to
{code}
override def initiate(partial: Row): Unit = {
    partial.setField(sumIndex, 0.asInstanceOf[T]) //cast 0 to type for sum class is extends
SumAggregate[T]
  }
{code}
then the following tests will pass
{code}
 @Test
  def testSumNullElements(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery =
      "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
        "FROM (select * from MyTable where _1 = 4)"

    val ds = env.fromElements(
      (1: Byte, 2l,1D,1f,1,1:Short ),
      (2: Byte, 2l,1D,1f,1,1:Short ))
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    val expected = "0,0,0.0,0.0,0,0"
    val results = result.toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

@Test
  def testCountNullElements(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery =
      "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
        "FROM (select * from MyTable where _1 = 4)"

    val ds = env.fromElements(
      (1: Byte, 2l,1D,1f,1,1:Short ),
      (2: Byte, 2l,1D,1f,1,1:Short ))
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    val expected = "0,0,0,0,0,0"
    val results = result.toDataSet[Row].collect()

    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}


was (Author: anmu):
Hello
I think {{org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections}} needs to be changed
as well, because {{org.apache.flink.api.table.runtime.aggregate.AggregateMapFunction#map}} will
only be called if there are elements in inputData.
{code}
        TypeSerializer<IN> inSerializer = getOperatorInfo().getInputType().createSerializer(executionConfig);
        TypeSerializer<OUT> outSerializer = getOperatorInfo().getOutputType().createSerializer(executionConfig);
	for (IN element : inputData) {
		IN inCopy = inSerializer.copy(element);
		OUT out = function.map(inCopy);
		result.add(outSerializer.copy(out));
	}
{code}
And if {{org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate}} is changed,
for example, to
{code}
override def initiate(partial: Row): Unit = {
    partial.setField(sumIndex, 0.asInstanceOf[T]) //cast 0 to type for sum class is extends
SumAggregate[T]
  }
{code}
then the following tests will pass
{code}
@Test
  def testDataSetAggregation(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery = "SELECT sum(_1) FROM MyTable"

    val ds = CollectionDataSets.get3TupleDataSet(env)
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    val expected = "231"
    val results = result.toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }

  @Test
  def testSumNullElements(): Unit = {

    val env = ExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env, config)

    val sqlQuery =
      "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
        "FROM (select * from MyTable where _1 = 4)"

    val ds = env.fromElements(
      (1: Byte, 2l,1D,1f,1,1:Short ),
      (2: Byte, 2l,1D,1f,1,1:Short ))
    tEnv.registerDataSet("MyTable", ds)

    val result = tEnv.sql(sqlQuery)

    val expected = "0,0,0.0,0.0,0,0"
    val results = result.toDataSet[Row].collect()
    TestBaseUtils.compareResultAsText(results.asJava, expected)
  }
{code}

> Count/Sum 0 elements
> --------------------
>
>                 Key: FLINK-4832
>                 URL: https://issues.apache.org/jira/browse/FLINK-4832
>             Project: Flink
>          Issue Type: Improvement
>          Components: Table API & SQL
>            Reporter: Timo Walther
>
> Currently, the Table API is unable to count or sum up 0 elements. We should improve DataSet
aggregations for this. Maybe by unioning the original DataSet with a dummy record or by using
a MapPartition function. Coming up with a good design for this is also part of this issue.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message