flink-dev mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: Semantic Properties and Functions with Iterables
Date Sun, 08 Mar 2015 22:21:46 GMT
I added your comment and an answer to FLINK-1656:

"Right, that's a good point.

+1 limiting to key fields. That's much easier to reason about for users.

However, I am not sure how it is implemented right now.
I guess secondary sort info is already removed by the property filtering,
but I need to verify that."

2015-03-08 21:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:

> Any other thoughts on this?
>
> On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen <sewen@apache.org> wrote:
>
> > I think the order of emitting elements is not part of the forward field
> > properties, but would rather be a separate one that we do not have right
> > now.
> >
> > At the moment, we would assume that all group operations destroy
> > secondary orders.
> >
> > In that sense, forward fields in group operations only make sense for
> > fields whose values are the same for all records in the group (key
> > fields).
> >
> > On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske <fhueske@gmail.com> wrote:
> >
> >> Hi Timo,
> >>
> >> there are several restrictions for forwarded fields of operators with
> >> iterator input.
> >> 1) forwarded fields must be emitted in the order in which they are
> >> received through the iterator
> >> 2) all forwarded fields of a record must stick together, i.e., if your
> >> function builds a record from field 0 of the 1st, 3rd, 5th, ... and
> >> field 1 of the 2nd, 4th, ... record coming through the iterator, these
> >> are not valid forwarded fields.
> >> 3) it is OK to completely filter out records coming through the
> >> iterator.
> >>
> >> The reason for these rules is that the optimizer uses forwarded fields
> >> to reason about physical data properties such as order and grouping. If
> >> you mix up the order of records or emit records which are composed from
> >> different input records, you might destroy a (secondary) order or
> >> grouping.
> >>
> >> Considering these rules, your second example is correct as well.
> >> In the case of the TriadBuilder, the information is correct (in the
> >> context of the program) as well, because field 0 is used as key. It is
> >> true, however, that there is a strange dependency between the function
> >> and the context in which it is used within the program. It would be
> >> better to remove the class annotation and add this information through
> >> the .withForwardedFields("0") method in the program, to make that clear.
> >>
> >> It is very good that you raise this point.
> >> This is currently not reflected in the documentation and should be made
> >> clear very soon. I will open a JIRA for that.
> >>
> >> Thanks, Fabian
> >>
> >>
> >>
> >> 2015-03-06 10:19 GMT+01:00 Timo Walther <twalthr@apache.org>:
> >>
> >> > Hey all,
> >> >
> >> > I'm currently working a lot on the UDF static code analyzer. But I
> >> > have a general question about Semantic Properties which might also be
> >> > interesting for other users.
> >> >
> >> > How is the ForwardedFields annotation interpreted for UDF functions
> >> > with Iterables?
> >> >
> >> > An example can be found in:
> >> > org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
> >> >
> >> > Does this mean that each call of "collect" must happen in the same
> >> > order as the call of "next"? But this is not the case in the example
> >> > above. Or does the annotation only refer to the first iterator element?
> >> >
> >> > Other examples:
> >> >
> >> >     @ForwardedFields("*") // CORRECT?
> >> >     public static class GroupReduce1 implements
> >> >             GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
> >> >         @Override
> >> >         public void reduce(Iterable<Tuple2<Long, Long>> values,
> >> >                 Collector<Tuple2<Long, Long>> out) throws Exception {
> >> >             out.collect(values.iterator().next());
> >> >         }
> >> >     }
> >> >
> >> >     @ForwardedFields("*") // NOT CORRECT?
> >> >     public static class GroupReduce3 implements
> >> >             GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
> >> >         @Override
> >> >         public void reduce(Iterable<Tuple2<Long, Long>> values,
> >> >                 Collector<Tuple2<Long, Long>> out) throws Exception {
> >> >             Iterator<Tuple2<Long, Long>> it = values.iterator();
> >> >             while (it.hasNext()) {
> >> >                 Tuple2<Long, Long> t = it.next();
> >> >                 if (t.f0 == 42) {
> >> >                     out.collect(t);
> >> >                 }
> >> >             }
> >> >         }
> >> >     }
> >> >
> >> > Thanks in advance.
> >> >
> >> > Regards,
> >> > Timo
> >> >
> >>
> >
> >
>
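
The three restrictions on forwarded fields listed in the thread above can be
illustrated with a small plain-Java model. This is only a sketch under stated
assumptions: it does not use Flink, it is not how the optimizer checks
anything, and every name in it (ForwardedFieldsSketch, isOrderedSubsequence,
takeFirst, filterKey42, reversed, mixFields) is hypothetical. It treats a
function annotated with "*" as valid when its output is an in-order
subsequence of unmodified input records.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Plain-Java model of the three rules. All names are hypothetical; this is
// neither Flink code nor the optimizer's actual logic.
final class ForwardedFieldsSketch {

    // True if every output record equals an input record (rule 2) and the
    // output keeps the input order (rule 1). Dropping records is allowed
    // (rule 3). Records are compared by value.
    static boolean isOrderedSubsequence(List<List<Long>> input,
                                        List<List<Long>> output) {
        int i = 0;
        for (List<Long> out : output) {
            while (i < input.size() && !input.get(i).equals(out)) {
                i++;
            }
            if (i == input.size()) {
                return false;
            }
            i++;
        }
        return true;
    }

    // Stand-in for the records handed to the function through the iterator.
    static List<List<Long>> sampleInput() {
        return Arrays.asList(
                Arrays.asList(42L, 1L),
                Arrays.asList(7L, 2L),
                Arrays.asList(42L, 3L));
    }

    // Like GroupReduce1: emits only the first record.
    static List<List<Long>> takeFirst(List<List<Long>> input) {
        return new ArrayList<>(input.subList(0, 1));
    }

    // Like GroupReduce3: emits the records whose field 0 is 42.
    static List<List<Long>> filterKey42(List<List<Long>> input) {
        List<List<Long>> out = new ArrayList<>();
        for (List<Long> r : input) {
            if (r.get(0) == 42L) {
                out.add(r);
            }
        }
        return out;
    }

    // Violates rule 1: emits the records in reverse order.
    static List<List<Long>> reversed(List<List<Long>> input) {
        List<List<Long>> out = new ArrayList<>(input);
        Collections.reverse(out);
        return out;
    }

    // Violates rule 2: field 0 of the 1st record, field 1 of the 2nd.
    static List<List<Long>> mixFields(List<List<Long>> input) {
        List<Long> mixed = Arrays.asList(input.get(0).get(0), input.get(1).get(1));
        return Collections.singletonList(mixed);
    }

    public static void main(String[] args) {
        List<List<Long>> in = sampleInput();
        System.out.println(isOrderedSubsequence(in, takeFirst(in)));   // true
        System.out.println(isOrderedSubsequence(in, filterKey42(in))); // true
        System.out.println(isOrderedSubsequence(in, reversed(in)));    // false
        System.out.println(isOrderedSubsequence(in, mixFields(in)));   // false
    }
}
```

Under this model both of Timo's examples pass, which matches Fabian's answer.
The model ignores Stephan's later point that group operations are assumed to
destroy secondary orders, so it says nothing about which fields are safe to
declare in practice.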
