flink-user mailing list archives

From Gábor Gévay <gga...@gmail.com>
Subject Re: termination of stream#iterate on finite streams
Date Tue, 05 Sep 2017 12:07:44 GMT
Hello,

There is a Flink Improvement Proposal to redesign the iterations:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=66853132
This will address the termination issue.

Best,
Gábor





On Mon, Sep 4, 2017 at 11:00 AM, Xingcan Cui <xingcanc@gmail.com> wrote:
> Hi Peter,
>
> That's a good idea, but it may not be applicable to an iteration operator.
> The operator cannot determine when to generate the "end-of-stream message"
> for the feedback stream: the provided step function (e.g.,
> filter(_ > 0).map(_ - 1)) is stateless and has no side effects.
>
> Best,
> Xingcan
>
>
>
> On Mon, Sep 4, 2017 at 4:40 AM, Peter Ertl <peter.ertl@gmx.net> wrote:
>>
>> Hi Xingcan!
>>
>> If a _finite_ stream emitted, at its end, a special trailing
>> "End-Of-Stream Message" that flows down the operator chain, wouldn't
>> this let us end the iteration deterministically, without needing a
>> timeout?
>>
>> Having to pick an arbitrary timeout that must be longer than the slowest
>> iteration step seems really awkward.
>>
>> What do you think?
>>
>> Best regards
>> Peter
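To make Peter's marker idea concrete, here is a minimal sketch under a strong, explicitly hypothetical assumption: the whole loop runs sequentially over a single FIFO queue. The iteration head puts an `EndOfRound` marker behind the initial input and re-enqueues it every time it comes back; because of FIFO order, everything fed back during a round sits ahead of the marker, so a round that sees no records at all means the feedback stream is empty. The names `MarkerLoop`, `Record`, `EndOfRound` and `run` are made up for illustration and are not Flink code. In a parallel job, feedback arrives over several channels from several instances, so this simple ordering argument does not obviously carry over, which is the difficulty Xingcan points out.

```scala
import scala.collection.mutable

// Hypothetical, sequential model of marker-based termination.
// NOT how Flink's streaming iterations are implemented.
object MarkerLoop {
  private sealed trait Msg
  private final case class Record(value: Long) extends Msg
  private case object EndOfRound extends Msg

  // `step` maps one record to the records it feeds back.
  // Returns the number of rounds that processed at least one record.
  def run(input: Seq[Long], step: Long => Seq[Long]): Int = {
    val queue = mutable.Queue[Msg]()
    input.foreach(v => queue.enqueue(Record(v)))
    queue.enqueue(EndOfRound) // marker behind the finite initial input

    var rounds = 0
    var sawRecord = false
    var done = false
    while (!done) {
      queue.dequeue() match {
        case Record(v) =>
          sawRecord = true
          step(v).foreach(out => queue.enqueue(Record(out)))
        case EndOfRound =>
          if (sawRecord) {
            // All feedback produced in this round is already ahead of the
            // marker, so re-enqueueing it closes the next round.
            rounds += 1
            sawRecord = false
            queue.enqueue(EndOfRound)
          } else {
            done = true // a round without records: feedback stream is empty
          }
      }
    }
    rounds
  }
}
```

With the feedback half of the thread's step function, `v => if (v > 0) Seq(v - 1) else Seq.empty`, and input `1L to 4L`, the loop stops by itself after five non-empty rounds, with no timeout involved.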
>>
>>
>> Am 02.09.2017 um 17:16 schrieb Xingcan Cui <xingcanc@gmail.com>:
>>
>> Hi Peter,
>>
>> I just omitted the filter part. Sorry for that.
>>
>> Actually, as the javadoc explains, by default a DataStream with iteration
>> will never terminate. That's because in a stream environment with
>> iteration, the operator never knows whether the feedback stream has
>> reached its end (even though the data source has terminated, more data
>> may still arrive that it cannot know about). That's why it needs a
>> timeout value to make the judgement, just like many network calls. In
>> other words, you know the feedback stream will be empty from now on, but
>> the operator doesn't. Thus we give it a maximum waiting time for the
>> next record.
>>
>> Internally, this mechanism is implemented via a blocking queue (the
>> related code can be found here).
>>
>> Hope I've covered everything this time :)
>>
>> Best,
>> Xingcan
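The timeout mechanism Xingcan describes can be illustrated with a plain `java.util.concurrent.LinkedBlockingQueue`: `poll(timeout, unit)` waits up to the given time for a record and returns `null` if none arrives, which the consumer then treats as "feedback stream finished". This is a hypothetical sketch of the idea only; `TimeoutDrain`, `drain` and `maxWaitMillis` are illustrative names, not Flink's internal code.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical sketch of timeout-based termination of a feedback loop.
object TimeoutDrain {
  // Keeps polling until no record arrives within maxWaitMillis, then treats
  // the feedback stream as finished and returns everything received so far.
  def drain[T <: AnyRef](feedback: LinkedBlockingQueue[T], maxWaitMillis: Long): List[T] = {
    val received = List.newBuilder[T]
    var next: T = feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS)
    while (next != null) { // null means: the wait timed out
      received += next
      next = feedback.poll(maxWaitMillis, TimeUnit.MILLISECONDS)
    }
    received.result()
  }

  def main(args: Array[String]): Unit = {
    val queue = new LinkedBlockingQueue[String]()
    // Simulated feedback producer: three records, 50 ms apart, then silence.
    val producer = new Thread(() => {
      for (r <- Seq("a", "b", "c")) { Thread.sleep(50); queue.put(r) }
    })
    producer.start()
    // Gaps shorter than the 500 ms limit are tolerated; the loop ends only
    // after 500 ms pass with no new record.
    println(drain(queue, 500L))
    producer.join()
  }
}
```

The trade-off Peter objects to is visible here: the limit must exceed the longest gap between fed-back records, otherwise `drain` gives up early, and every run pays at least one full timeout before it can finish.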
>>
>> On Sat, Sep 2, 2017 at 10:08 PM, Peter Ertl <peter.ertl@gmx.net> wrote:
>>>
>>>
>>> Am 02.09.2017 um 04:45 schrieb Xingcan Cui <xingcanc@gmail.com>:
>>>
>>> In your code, 1 will be subtracted from every long value and the result
>>> sent back to the iterate operator, endlessly.
>>>
>>>
>>>
>>> Is this true? Shouldn't
>>>
>>>   val iterationResult2 = env.generateSequence(1, 4).iterate(it => {
>>>     // dump meaningless 'y' chars just to do anything
>>>     (it.filter(_ > 0).map(_ - 1), it.filter(_ > 0).map(_ => 'y'))
>>>   })
>>>   iterationResult2.print()
>>>
>>>
>>> produce the following _feedback_ streams?
>>>
>>> initial input to #iterate(): [1 2 3 4]
>>>
>>> iteration #1 : [1 2 3]
>>> iteration #2 : [1 2]
>>> iteration #3 : [1]
>>> iteration #4 : []  => empty feedback stream => should this cause
>>> termination? (It actually only happens when a timeout value is set.)
>>>
>>> Best regards
>>> Peter
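Peter's round-by-round reasoning can be checked with a plain, sequential Scala simulation of the same step function (feedback `filter(_ > 0).map(_ - 1)`, output `filter(_ > 0).map(_ => 'y')`). It treats the iteration as discrete rounds, which is an assumption for illustration: a Flink streaming job has no such round boundaries, which is exactly why it cannot tell when the feedback is empty. Note that the values actually fed back include the zeros produced by `_ - 1`; the lists in the message above show the values that survive the filter in the following round. `IterationRounds` and `simulate` are hypothetical names.

```scala
// Hypothetical, round-based simulation of the step function from the thread.
// This is not Flink; it only models the iteration as discrete rounds.
object IterationRounds {
  // Runs rounds until a round feeds nothing back. Returns the feedback
  // stream of every round and all emitted output chars.
  def simulate(input: Seq[Long]): (List[Seq[Long]], Seq[Char]) = {
    var current: Seq[Long] = input
    var rounds = List.empty[Seq[Long]]
    val output = Seq.newBuilder[Char]
    while (current.nonEmpty) {
      val positive = current.filter(_ > 0)
      val feedback = positive.map(_ - 1)  // first element of the step tuple
      output ++= positive.map(_ => 'y')   // second element of the step tuple
      rounds = rounds :+ feedback
      current = feedback
    }
    (rounds, output.result())
  }

  def main(args: Array[String]): Unit = {
    val (rounds, out) = simulate(1L to 4L)
    rounds.zipWithIndex.foreach { case (fb, i) =>
      println(s"round #${i + 1} feedback: ${fb.mkString("[", " ", "]")}")
    }
    println(s"emitted ${out.size} 'y' chars")
  }
}
```

For input 1 to 4 the feedback goes [0 1 2 3], [0 1 2], [0 1], [0], [], so the loop ends on its own after five rounds having emitted 4 + 3 + 2 + 1 = 10 'y' chars; the filter, not the iterate operator, is what guarantees the values eventually stop being fed back.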
>>
>
