flink-dev mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: [FLINK-4832] Count/Sum 0 elements
Date Mon, 24 Oct 2016 13:20:18 GMT
Hi Anton,
executeOnCollections() is only meant for executing Flink jobs on the local
machine without bringing up a local (or actual) Flink cluster. So fixing
the problem there does not fix it for jobs that run on a cluster.

The underlying problem is this: in a Map-Reduce world the way to count
elements of type T is to map those T to (T, 1) and then to group by T and
sum up the ones. If you have no elements then you have no ones that you can
sum up, i.e. you also don't realise that you have zero elements.
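
To make that concrete, here is a minimal sketch in plain Java streams (not
Flink code; the class and method names are made up for illustration). It
maps each element to (element, 1), groups by element and sums the ones, and
also reduces the ones into a global count. On empty input neither variant
produces a record that could carry a zero:

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Hypothetical illustration class, not part of Flink.
class EmptyCountSketch {

    // Map each T to (T, 1), group by T, and sum up the ones per group.
    static <T> Map<T, Long> countPerKey(List<T> input) {
        return input.stream()
                .map(t -> Map.entry(t, 1L))
                .collect(Collectors.groupingBy(
                        Map.Entry::getKey,
                        Collectors.summingLong(Map.Entry::getValue)));
    }

    // Global count as a reduce over the ones. A reduce without an
    // initial value has nothing to emit when there are no ones.
    static <T> Optional<Long> totalCount(List<T> input) {
        return input.stream().map(t -> 1L).reduce(Long::sum);
    }

    public static void main(String[] args) {
        List<String> some = List.of("a", "b", "a");
        List<String> none = List.of();

        System.out.println(countPerKey(some)); // a is counted twice, b once
        System.out.println(totalCount(some));  // Optional[3]
        System.out.println(countPerKey(none)); // {}
        System.out.println(totalCount(none));  // Optional.empty
    }
}
```

The empty Optional is the local analogue of the missing result row: getting
a 0 out requires supplying an initial value from somewhere outside the data.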

Cheers,
Aljoscha

On Fri, 21 Oct 2016 at 13:55 Anton Mushin <Anton_Mushin@epam.com> wrote:

> Hi everybody,
> Could you explain issue https://issues.apache.org/jira/browse/FLINK-4832,
> please?
>
> Put simply, I chose a different way to resolve this issue than the one
> described in the issue description.
> In
> `org.apache.flink.api.common.operators.base.MapOperatorBase#executeOnCollections`
> I added the following code:
>         if (inputData.size() == 0) {
>                 IN inCopy = inSerializer.createInstance();
>                 OUT out = function.map(inCopy);
>                 result.add(outSerializer.copy(out));
>         }
>
> And I changed
> `org.apache.flink.api.table.runtime.aggregate.SumAggregate#initiate` to:
>
>         override def initiate(partial: Row): Unit = {
>                 // cast 0 to the type T of each [Type]SumAggregate class
>                 // that extends SumAggregate[T]
>                 partial.setField(sumIndex, 0.asInstanceOf[T])
>         }
>
> And now the following code executes correctly:
>         val sqlQuery =
>                 "SELECT sum(_1),sum(_2),sum(_3),sum(_4),sum(_5),sum(_6) " +
>                 "FROM (select * from MyTable where _1 = 4)"
>         val ds = env.fromElements(
>                 (1: Byte, 2l, 1D, 1f, 1, 1: Short),
>                 (2: Byte, 2l, 1D, 1f, 1, 1: Short))
>
>         val result = tEnv.sql(sqlQuery) //result == "0,0,0.0,0.0,0,0"
>
>         val sqlQuery2 =
>                 "SELECT count(_1),count(_2),count(_3),count(_4),count(_5),count(_6) " +
>                 "FROM (select * from MyTable where _1 = 4)"
>         val result2 = tEnv.sql(sqlQuery2) //result2 == " 0,0,0,0,0,0"
>
> Is this the correct solution for this ticket or not?
>
> Best regards,
> Anton Mushin
>
