flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Semantic Properties and Functions with Iterables
Date Fri, 06 Mar 2015 10:25:21 GMT
Hi Timo,

there are several restrictions on forwarded fields of operators with
iterator input:
1) forwarded fields must be emitted in the order in which they are received
through the iterator.
2) all forwarded fields of a record must stick together, i.e., if your
function builds records from field 0 of the 1st, 3rd, 5th, ... and field 1
of the 2nd, 4th, ... record coming through the iterator, these are not
valid forwarded fields.
3) it is OK to completely filter out records coming through the iterator.
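To make the three rules concrete, here is a minimal stand-alone Java sketch. It does not use Flink at all; the class ForwardedFieldsRules and its method obeysRules are hypothetical names for illustration. Each emitted record is described only by the iterator position of the input record that each of its forwarded fields was copied from, and the check applies rules 1) to 3) literally.

```java
import java.util.List;

/**
 * Hypothetical model of the forwarded-field rules for iterator (group) input.
 * Each emitted record is an int[] holding, for every forwarded field, the
 * iterator position of the input record the field was copied from.
 * Illustration only; this is not part of Flink's API.
 */
public class ForwardedFieldsRules {

    static boolean obeysRules(List<int[]> emitted) {
        int previousSource = -1;
        for (int[] fieldSources : emitted) {
            if (fieldSources.length == 0) {
                continue; // no forwarded fields in this record, nothing to check
            }
            int source = fieldSources[0];
            // Rule 2: all forwarded fields of one output record must come
            // from the same input record.
            for (int s : fieldSources) {
                if (s != source) {
                    return false;
                }
            }
            // Rule 1: outputs must follow the iterator order; going back to
            // an earlier input record breaks it.
            if (source < previousSource) {
                return false;
            }
            // Rule 3: skipped (filtered) input records need no check.
            previousSource = source;
        }
        return true;
    }

    public static void main(String[] args) {
        // GroupReduce1 style: emit only the first record.
        System.out.println(obeysRules(List.of(new int[] {0, 0})));                   // true
        // GroupReduce3 style: filter, keeping the 2nd and 4th record.
        System.out.println(obeysRules(List.of(new int[] {1, 1}, new int[] {3, 3}))); // true
        // Emitting the 3rd record before the 1st violates rule 1.
        System.out.println(obeysRules(List.of(new int[] {2, 2}, new int[] {0, 0}))); // false
        // Field 0 from the 1st record, field 1 from the 2nd violates rule 2.
        System.out.println(obeysRules(List.of(new int[] {0, 1}, new int[] {2, 3}))); // false
    }
}
```

The model deliberately tracks provenance per field, because rule 2 is about where each field of an output record came from, not about the field values themselves.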

The reason for these rules is that the optimizer uses forwarded fields to
reason about physical data properties such as order and grouping. If you
mix up the order of records or emit records which are composed from
different input records, you might destroy a (secondary) order or grouping.
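A minimal sketch of the order argument, assuming a group whose records arrive sorted on field 1 (a secondary order). Records are modelled as long[] {f0, f1}; SecondaryOrderDemo, filterOnF0 and reverse are hypothetical stand-ins for group-reduce functions, not Flink code. Filtering in iterator order (as GroupReduce3 does) keeps the order, while emitting the same records in reverse destroys it, so "*" would be a wrong forwarded-fields declaration for the reversing function.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical demo: a group sorted on field 1 stays sorted when records are
 * only filtered, but not when the function reorders them.
 */
public class SecondaryOrderDemo {

    // True if field 1 is non-decreasing across the list.
    static boolean sortedOnField1(List<long[]> records) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[1] > records.get(i)[1]) {
                return false;
            }
        }
        return true;
    }

    // Like GroupReduce3: forwards matching records in iterator order.
    static List<long[]> filterOnF0(List<long[]> group, long key) {
        List<long[]> out = new ArrayList<>();
        for (long[] r : group) {
            if (r[0] == key) {
                out.add(r);
            }
        }
        return out;
    }

    // Violates rule 1: emits the same records in reverse iterator order.
    static List<long[]> reverse(List<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (int i = group.size() - 1; i >= 0; i--) {
            out.add(group.get(i));
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> group = List.of(
                new long[] {42, 1}, new long[] {7, 2}, new long[] {42, 3});
        System.out.println(sortedOnField1(group));                 // true
        System.out.println(sortedOnField1(filterOnF0(group, 42))); // true
        System.out.println(sortedOnField1(reverse(group)));        // false
    }
}
```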

Considering these rules, your second example is correct as well.
In the case of the TriadBuilder, the information is correct (in the context of
the program) as well, because field 0 is used as the key. It is true, however,
that there is a strange dependency between the function and the context in
which it is used within the program. It would be better to remove the class
annotation and add this information through the .withForwardedFields("0")
method in the program to make that clear.

It is very good that you raise this point.
This is currently not reflected in the documentation and should be made
clear very soon. I will open a JIRA issue for that.

Thanks, Fabian



2015-03-06 10:19 GMT+01:00 Timo Walther <twalthr@apache.org>:

> Hey all,
>
> I'm currently working a lot on the UDF static code analyzer, but I have a
> general question about Semantic Properties which might also be interesting
> for other users.
>
> How is the ForwardedFields annotation interpreted for UDF functions with
> Iterables?
>
> An example can be found in:
> org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
>
> Does this mean that the calls of "collect" must happen in the same order
> as the calls of "next"? But this is not the case in the example above. Or
> does the annotation only refer to the first iterator element?
>
> Other examples:
>
> @ForwardedFields("*") // CORRECT?
> public static class GroupReduce1
>         implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     @Override
>     public void reduce(Iterable<Tuple2<Long, Long>> values,
>             Collector<Tuple2<Long, Long>> out) throws Exception {
>         // emits only the first record of the group, unchanged
>         out.collect(values.iterator().next());
>     }
> }
>
> @ForwardedFields("*") // NOT CORRECT?
> public static class GroupReduce3
>         implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>     @Override
>     public void reduce(Iterable<Tuple2<Long, Long>> values,
>             Collector<Tuple2<Long, Long>> out) throws Exception {
>         // emits only the records with f0 == 42, in iterator order
>         Iterator<Tuple2<Long, Long>> it = values.iterator();
>         while (it.hasNext()) {
>             Tuple2<Long, Long> t = it.next();
>             if (t.f0 == 42) {
>                 out.collect(t);
>             }
>         }
>     }
> }
>
> Thanks in advance.
>
> Regards,
> Timo
>
