flink-dev mailing list archives

From Stephan Ewen <se...@apache.org>
Subject Re: Semantic Properties and Functions with Iterables
Date Mon, 09 Mar 2015 11:50:16 GMT
That is right, Timo, as far as I understand it.

On Mon, Mar 9, 2015 at 12:04 PM, Timo Walther <twalthr@apache.org> wrote:

> Thanks for the clarification. If I have understood it correctly, forwarded
> fields are only relevant for key fields, right? So I will implement passing
> the key information to the analyzer so that it can take it into account.
>
> So if GroupReduce1 is grouped by f1, the result will be
> @ForwardedFields("1") in this example and not "*":
>
>     public static class GroupReduce1 implements
>             GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>         @Override
>         public void reduce(Iterable<Tuple2<Long, Long>> values,
>                 Collector<Tuple2<Long, Long>> out) throws Exception {
>             out.collect(values.iterator().next());
>         }
>     }
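A minimal sketch of the step described above, assuming the analyzer has already determined which tuple positions a function copies unchanged (for GroupReduce1, both f0 and f1, i.e. "*"). The class and method names are hypothetical and this is not Flink's analyzer code: it only intersects that set with the grouping keys and renders the result in the annotation's string form. Joining several positions with ';' is an assumption about the multi-field syntax.

```java
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeSet;

/**
 * Hypothetical sketch (not Flink's analyzer): keep only those copied fields
 * that are also grouping keys and render them like a @ForwardedFields value.
 */
final class KeyRestrictedForwardedFields {

    /** Returns the copied fields that are also keys, e.g. "1"; "" if none. */
    static String restrictToKeys(Set<Integer> copiedFields, Set<Integer> keyFields) {
        Set<Integer> result = new TreeSet<>(copiedFields); // sorted copy
        result.retainAll(keyFields);                       // drop non-key fields
        // Assumption: multiple positions are separated by ';'.
        StringJoiner joiner = new StringJoiner(";");
        for (int field : result) {
            joiner.add(Integer.toString(field));
        }
        return joiner.toString();
    }

    public static void main(String[] args) {
        // GroupReduce1 copies the whole Tuple2, i.e. fields 0 and 1 ("*").
        Set<Integer> copied = Set.of(0, 1);
        // Grouped by f1: only field 1 is kept.
        System.out.println(restrictToKeys(copied, Set.of(1)));
    }
}
```

With grouping on f1 this yields "1" rather than "*", matching the result described in the message.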
>
>
> On 08.03.2015 23:21, Fabian Hueske wrote:
>
>> I added your comment and an answer to FLINK-1656:
>>
>> "Right, that's a good point.
>>
>> +1 for limiting to key fields. That's much easier to reason about for users.
>>
>> However, I am not sure how it is implemented right now.
>> I guess secondary sort info is already removed by the property filtering,
>> but I need to verify that."
>>
>> 2015-03-08 21:53 GMT+01:00 Stephan Ewen <sewen@apache.org>:
>>
>>> Any other thoughts on this?
>>>
>>> On Fri, Mar 6, 2015 at 12:12 PM, Stephan Ewen <sewen@apache.org> wrote:
>>>
>>>> I think the order in which elements are emitted is not part of the
>>>> forwarded field properties, but would rather be a separate property that
>>>> we do not have right now.
>>>>
>>>> At the moment, we assume that all group operations destroy secondary
>>>> orders.
>>>>
>>>> In that sense, forwarded fields in group operations only make sense for
>>>> fields that have the same value for all records in the group (the key
>>>> fields).
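The assumption that group operations destroy secondary orders can be modeled roughly as follows: an input ordering (a list of field positions, most significant first) survives a group operation only up to the first field that is not a grouping key. This is a hypothetical plain-Java sketch of that reasoning with made-up names, not the optimizer's implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

/**
 * Hypothetical model: after a group operation only grouping-key fields are
 * forwarded, so an input ordering can only be claimed for the output up to
 * the first field that is not a key.
 */
final class OrderFilterSketch {

    static List<Integer> survivingOrder(List<Integer> ordering, Set<Integer> keyFields) {
        List<Integer> prefix = new ArrayList<>();
        for (int field : ordering) {
            if (!keyFields.contains(field)) {
                break; // secondary order: assumed destroyed from here on
            }
            prefix.add(field);
        }
        return prefix;
    }

    public static void main(String[] args) {
        // Grouped on f1 with a secondary sort on f0: only the key order remains.
        System.out.println(survivingOrder(List.of(1, 0), Set.of(1))); // [1]
    }
}
```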
>>>>
>>>> On Fri, Mar 6, 2015 at 11:25 AM, Fabian Hueske <fhueske@gmail.com> wrote:
>>>>
>>>>> Hi Timo,
>>>>>
>>>>> there are several restrictions for forwarded fields of operators with
>>>>> iterator input:
>>>>> 1) forwarded fields must be emitted in the order in which they are
>>>>> received through the iterator;
>>>>> 2) all forwarded fields of a record must stick together, i.e., if your
>>>>> function builds records from field 0 of the 1st, 3rd, 5th, ... and field
>>>>> 1 of the 2nd, 4th, ... record coming through the iterator, these are not
>>>>> valid forwarded fields;
>>>>> 3) it is OK to completely filter out records coming through the
>>>>> iterator.
>>>>>
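The three rules can be expressed as a small check. This is a hypothetical model with made-up names, not Flink code: each emitted record is described by the iterator positions (0 = first record returned by next()) of the input records its forwarded fields were copied from, and every emitted record is assumed to carry at least one forwarded field.

```java
import java.util.List;

/** Hypothetical checker for the forwarded-field rules of iterator-input functions. */
final class ForwardedFieldRules {

    static boolean isValid(List<int[]> emittedOrigins) {
        int lastOrigin = -1;
        for (int[] origins : emittedOrigins) {
            int origin = origins[0];
            // Rule 2: all forwarded fields of one output record must come
            // from the same input record.
            for (int o : origins) {
                if (o != origin) {
                    return false;
                }
            }
            // Rule 1: records must not go backwards in iterator order.
            // Equal positions (same input emitted twice) are accepted here,
            // which is an assumption the rules do not address.
            if (origin < lastOrigin) {
                return false;
            }
            // Rule 3: skipped input records (gaps) need no check.
            lastOrigin = origin;
        }
        return true;
    }

    public static void main(String[] args) {
        // Keeps records 0 and 2 unchanged: valid (rules 1 and 3).
        System.out.println(isValid(List.of(new int[] {0, 0}, new int[] {2, 2})));
        // Field 0 from record 0, field 1 from record 1: invalid (rule 2).
        System.out.println(isValid(List.of(new int[] {0, 1})));
        // Records emitted in reverse order: invalid (rule 1).
        System.out.println(isValid(List.of(new int[] {1, 1}, new int[] {0, 0})));
    }
}
```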
>>>>> The reason for these rules is that the optimizer uses forwarded fields to
>>>>> reason about physical data properties such as order and grouping. If you
>>>>> mix up the order of records or emit records that are composed from
>>>>> different input records, you might destroy a (secondary) order or
>>>>> grouping.
>>>>>
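To illustrate why reordering matters, consider a group whose records arrive sorted on f0 (a secondary sort within key f1). The plain-Java sketch below uses hypothetical names and models records as long[] {f0, f1}; it checks whether that order survives two ways of emitting the records.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical demo: does a secondary order on f0 survive the function? */
final class SecondaryOrderDemo {

    static boolean sortedOnF0(List<long[]> records) {
        for (int i = 1; i < records.size(); i++) {
            if (records.get(i - 1)[0] > records.get(i)[0]) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // One group with key f1 = 7, sorted on f0.
        List<long[]> group = List.of(
                new long[] {1, 7}, new long[] {2, 7}, new long[] {3, 7});

        // Emits records in iterator order, dropping f0 == 2 (rules 1 and 3).
        List<long[]> filtered = new ArrayList<>();
        for (long[] r : group) {
            if (r[0] != 2) {
                filtered.add(r);
            }
        }

        // Emits the same records in reverse order (violates rule 1).
        List<long[]> reversed = new ArrayList<>(group);
        Collections.reverse(reversed);

        System.out.println(sortedOnF0(filtered)); // true
        System.out.println(sortedOnF0(reversed)); // false
    }
}
```

In both cases f1 is still 7 for every emitted record, so a claim on the key field stays true; only the secondary order on f0 is lost by reordering.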
>>>>> Considering these rules, your second example is correct as well.
>>>>> In the case of the TriadBuilder, the information is correct (in the
>>>>> context of the program) as well, because field 0 is used as the key. It
>>>>> is, however, true that there is a strange dependency between the function
>>>>> and the context in which it is used within the program. It would be
>>>>> better to remove the class annotation and add this information through
>>>>> the .withForwardedFields("0") method in the program, to make that clear.
>>>>>
>>>>> It is very good that you raise this point.
>>>>> This is currently not reflected in the documentation and should be made
>>>>> clear very soon. I will open a JIRA for that.
>>>>>
>>>>> Thanks, Fabian
>>>>>
>>>>>
>>>>>
>>>>> 2015-03-06 10:19 GMT+01:00 Timo Walther <twalthr@apache.org>:
>>>>>
>>>>>> Hey all,
>>>>>>
>>>>>> I'm currently working a lot on the UDF static code analyzer, but I have
>>>>>> a general question about Semantic Properties that might also be
>>>>>> interesting for other users.
>>>>>>
>>>>>> How is the ForwardedFields annotation interpreted for UDFs with
>>>>>> Iterables?
>>>>>>
>>>>>> An example can be found in:
>>>>>> org.apache.flink.examples.java.graph.EnumTrianglesBasic.TriadBuilder
>>>>>>
>>>>>> Does this mean that each call of "collect" must happen in the same order
>>>>>> as the calls of "next"? This is not the case in the example above. Or
>>>>>> does the annotation only refer to the first iterator element?
>>>>>>
>>>>>> Other examples:
>>>>>>
>>>>>>     @ForwardedFields("*") // CORRECT?
>>>>>>     public static class GroupReduce1 implements
>>>>>>             GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>>>>>>         @Override
>>>>>>         public void reduce(Iterable<Tuple2<Long, Long>> values,
>>>>>>                 Collector<Tuple2<Long, Long>> out) throws Exception {
>>>>>>             out.collect(values.iterator().next());
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     @ForwardedFields("*") // NOT CORRECT?
>>>>>>     public static class GroupReduce3 implements
>>>>>>             GroupReduceFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> {
>>>>>>         @Override
>>>>>>         public void reduce(Iterable<Tuple2<Long, Long>> values,
>>>>>>                 Collector<Tuple2<Long, Long>> out) throws Exception {
>>>>>>             Iterator<Tuple2<Long, Long>> it = values.iterator();
>>>>>>             while (it.hasNext()) {
>>>>>>                 Tuple2<Long, Long> t = it.next();
>>>>>>                 if (t.f0 == 42) {
>>>>>>                     out.collect(t);
>>>>>>                 }
>>>>>>             }
>>>>>>         }
>>>>>>     }
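A plain-Java sketch, assuming records modeled as long[] {f0, f1} and using hypothetical names, that mirrors the bodies of GroupReduce1 and GroupReduce3 and checks that each emits an in-order subsequence of its input group, i.e. it only drops records and never reorders them. It does not use Flink's GroupReduceFunction or Collector; it only illustrates the ordering argument.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical stand-ins for the two reduce bodies plus an order check. */
final class GroupReduceOutputs {

    // Like GroupReduce1: emit only the first record of the group.
    static List<long[]> firstOnly(List<long[]> group) {
        List<long[]> out = new ArrayList<>();
        out.add(group.iterator().next());
        return out;
    }

    // Like GroupReduce3: emit every record whose f0 is 42, in iterator order.
    static List<long[]> filter42(List<long[]> group) {
        List<long[]> out = new ArrayList<>();
        for (Iterator<long[]> it = group.iterator(); it.hasNext(); ) {
            long[] t = it.next();
            if (t[0] == 42) {
                out.add(t);
            }
        }
        return out;
    }

    // True if every output record is an input record (same object), each
    // input is used at most once, and the relative input order is kept.
    static boolean isInOrderSubsequence(List<long[]> input, List<long[]> output) {
        int pos = 0;
        for (long[] emitted : output) {
            while (pos < input.size() && input.get(pos) != emitted) {
                pos++;
            }
            if (pos == input.size()) {
                return false;
            }
            pos++;
        }
        return true;
    }

    public static void main(String[] args) {
        List<long[]> group = List.of(
                new long[] {42, 5}, new long[] {7, 5}, new long[] {42, 5});
        System.out.println(isInOrderSubsequence(group, firstOnly(group))); // true
        System.out.println(isInOrderSubsequence(group, filter42(group)));  // true
    }
}
```

Passing this check does not by itself make "*" a useful claim for a group operation; per the replies in this thread, forwarded fields of group operations are only meaningful for key fields.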
>>>>>>
>>>>>> Thanks in advance.
>>>>>>
>>>>>> Regards,
>>>>>> Timo
>>>>>>
>>>>>>
>>>>
>
