flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Semantic Properties and Functions with Iterables
Date Sun, 08 Mar 2015 20:53:12 GMT
Any other thoughts on this?

On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen <sewen@apache.org> wrote:

> I think the order of emitting elements is not part of the forwarded field
> properties, but would rather be a separate property that we do not have right
> now.
>
> At the moment, we would assume that all group operations destroy secondary
> orders.
>
> In that sense, forwarded fields in group operations only make sense for
> fields whose values are the same for all records in the group (key fields).
>
> On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>
>> Hi Timo,
>>
>> there are several restrictions for forwarded fields of operators with
>> iterator input.
>> 1) forwarded fields must be emitted in the order in which they are
>> received through the iterator
>> 2) all forwarded fields of a record must stick together, i.e., if your
>> function builds records from field 0 of the 1st, 3rd, 5th, ... and field 1
>> of the 2nd, 4th, ... record coming through the iterator, these are not
>> valid forwarded fields.
>> 3) it is OK to completely filter out records coming through the iterator.
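The three rules above can be expressed as a small check. This is a minimal, Flink-independent sketch, assuming records are modelled as `long[]` arrays and that a forwarded field keeps its position in the input and output records; the class `ForwardedFieldsCheck` and its methods are hypothetical and not part of Flink. As an additional assumption not stated in the rules, one input record may be the source of several consecutive output records.

```java
import java.util.List;

public class ForwardedFieldsCheck {

    // Returns true if each output record agrees, on every forwarded field, with a single
    // input record (rule 2), and those source records are visited in non-decreasing
    // iterator order (rule 1). Skipping input records entirely is allowed (rule 3).
    static boolean isValid(List<long[]> inputs, List<long[]> outputs, int... forwarded) {
        int cursor = 0; // earliest input position the next output may be traced to
        for (long[] out : outputs) {
            int match = -1;
            for (int i = cursor; i < inputs.size(); i++) {
                if (agrees(inputs.get(i), out, forwarded)) {
                    match = i;
                    break;
                }
            }
            if (match < 0) {
                return false; // emitted out of order, or fields mixed from several inputs
            }
            cursor = match; // later outputs may not go back to earlier inputs
        }
        return true;
    }

    // True if both records hold the same value in every listed field.
    static boolean agrees(long[] in, long[] out, int[] fields) {
        for (int f : fields) {
            if (in[f] != out[f]) {
                return false;
            }
        }
        return true;
    }

    static long[] rec(long... values) {
        return values;
    }

    public static void main(String[] args) {
        // One group (key in field 0), in the order the iterator returns it.
        List<long[]> in = List.of(rec(7, 1, 100), rec(7, 2, 200), rec(7, 3, 300), rec(7, 4, 400));

        // Rule 3: filtering records out keeps "*" valid.
        System.out.println(isValid(in, List.of(in.get(0), in.get(2)), 0, 1, 2)); // true
        // Rule 1: emitting the same records in reverse order is not valid.
        System.out.println(isValid(in, List.of(in.get(2), in.get(0)), 0, 1, 2)); // false
        // Rule 2: field 1 taken from the 1st/3rd record, field 2 from the 2nd/4th.
        List<long[]> mixed = List.of(rec(7, 1, 200), rec(7, 3, 400));
        System.out.println(isValid(in, mixed, 1, 2)); // false
        System.out.println(isValid(in, mixed, 0));    // true: the key field is shared
    }
}
```

The last case mirrors Stephan's remark: once records are recombined, only the key field, which is identical in every record of the group, can still be declared as forwarded.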
>>
>> The reason for these rules is that the optimizer uses forwarded fields to
>> reason about physical data properties such as order and grouping. If you
>> mix up the order of records or emit records which are composed from
>> different input records, you might destroy a (secondary) order or
>> grouping.
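To see why the optimizer cares, consider a group that arrives secondary-sorted on field 1. The plain-Java sketch below (no Flink classes; `SecondaryOrderDemo` and its methods are hypothetical) shows that an order-preserving filter keeps that sort, while a function that emits the same records in a different order breaks it. An optimizer that trusted `@ForwardedFields("*")` on the second function would assume a sort order that no longer exists.

```java
import java.util.ArrayList;
import java.util.List;

public class SecondaryOrderDemo {

    // True if the records are in ascending order on the given field.
    static boolean isSortedOn(List<long[]> records, int field) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[field] > records.get(i)[field]) {
                return false;
            }
        }
        return true;
    }

    // Order-preserving filter (allowed by the rules): keeps records whose field 2 is even.
    static List<long[]> keepEvenValues(Iterable<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (long[] r : group) {
            if (r[2] % 2 == 0) {
                out.add(r);
            }
        }
        return out;
    }

    // Buffers the group and emits it backwards, which violates rule 1.
    static List<long[]> emitReversed(Iterable<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (long[] r : group) {
            out.add(0, r);
        }
        return out;
    }

    public static void main(String[] args) {
        // A group with key 7 in field 0, secondary-sorted on field 1.
        List<long[]> group = List.of(
            new long[]{7, 1, 10}, new long[]{7, 2, 15}, new long[]{7, 3, 20});
        System.out.println(isSortedOn(keepEvenValues(group), 1)); // true
        System.out.println(isSortedOn(emitReversed(group), 1));   // false
    }
}
```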
>>
>> Considering these rules, your second example is correct as well.
>> In the case of the TriadBuilder, the information is correct (in the context
>> of the program) as well, because field 0 is used as the key. It is however
>> true that there is a strange dependency between the function and the
>> context in which it is used within the program. It would be better to
>> remove the class annotation and add this information through the
>> .withForwardedFields("0") method in the program, to make that clear.
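The point about TriadBuilder can be illustrated without Flink. The sketch below uses a hypothetical `PairBuilderSketch.buildPairs` that only loosely mimics a triad-building group function (it is not the actual TriadBuilder code): it combines values from several input records into each output record, so only the key field can be declared as forwarded. Field 0 stays correct only because the program groups on field 0, which is exactly the dependency on the calling context that `.withForwardedFields("0")` would make explicit.

```java
import java.util.ArrayList;
import java.util.List;

public class PairBuilderSketch {

    // Hypothetical group function: for a group of (key, vertex) records, emits
    // (key, v_i, v_j) for every pair i < j. Each output mixes values from two
    // input records, so only field 0, the grouping key, is identical in every
    // input and output record of the group.
    static List<long[]> buildPairs(Iterable<long[]> group) {
        List<long[]> seen = new ArrayList<>();
        List<long[]> out = new ArrayList<>();
        for (long[] edge : group) {
            for (long[] earlier : seen) {
                out.add(new long[]{edge[0], earlier[1], edge[1]});
            }
            seen.add(edge);
        }
        return out;
    }

    public static void main(String[] args) {
        List<long[]> group = List.of(new long[]{5, 1}, new long[]{5, 2}, new long[]{5, 3});
        for (long[] r : buildPairs(group)) {
            System.out.println(r[0] + " " + r[1] + " " + r[2]);
        }
        // prints "5 1 2", "5 1 3", "5 2 3", one per line
    }
}
```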
>>
>> It is very good that you raise this point.
>> This is currently not reflected in the documentation and should be made
>> clear very soon. I will open a JIRA for that.
>>
>> Thanks, Fabian
>>
>>
>>
>> 2015-03-06 10:19 GMT+01:00 Timo Walther <twalthr@apache.org>:
>>
>> > Hey all,
>> >
>> > I'm currently working a lot on the UDF static code analyzer. But I have a
>> > general question about Semantic Properties which might also be
>> > interesting for other users.
>> >
>> > How is the ForwardedFields annotation interpreted for UDF functions with
>> > Iterables?
>> >
>> > An example can be found in:
>> > org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
>> >
>> > Does this mean that the calls of "collect" must happen in the same order
>> > as the calls of "next"? But this is not the case in the example above. Or
>> > does the annotation only refer to the first iterator element?
>> >
>> > Other examples:
>> >
>> > @ForwardedFields("*") // CORRECT?
>> > public static class GroupReduce1
>> >         implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>> >     @Override
>> >     public void reduce(Iterable<Tuple2<Long, Long>> values,
>> >             Collector<Tuple2<Long, Long>> out) throws Exception {
>> >         out.collect(values.iterator().next());
>> >     }
>> > }
>> >
>> > @ForwardedFields("*") // NOT CORRECT?
>> > public static class GroupReduce3
>> >         implements GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>> >     @Override
>> >     public void reduce(Iterable<Tuple2<Long, Long>> values,
>> >             Collector<Tuple2<Long, Long>> out) throws Exception {
>> >         Iterator<Tuple2<Long, Long>> it = values.iterator();
>> >         while (it.hasNext()) {
>> >             Tuple2<Long, Long> t = it.next();
>> >             if (t.f0 == 42) {
>> >                 out.collect(t);
>> >             }
>> >         }
>> >     }
>> > }
>> >
>> > Thanks in advance.
>> >
>> > Regards,
>> > Timo
>> >
>>
>
>
