flink-user mailing list archives

From: Aljoscha Krettek <aljos...@apache.org>
Subject: Re: AsyncCollector Does not release the thread (1.2.1)
Date: Wed, 07 Jun 2017 10:37:48 GMT

Hi Steve,

I’m assuming you are using Flink 1.2.x? If so, I’m afraid you have rediscovered this
issue: https://issues.apache.org/jira/browse/FLINK-6435.
It was fixed in Flink 1.3.0. Is it possible for you to update to that version, or do you think
it’s important that we backport the fix to the Flink 1.2.x line?

Best,
Aljoscha

> On 6. Jun 2017, at 19:34, Aljoscha Krettek <aljoscha@apache.org> wrote:
> 
> Ok, thanks for letting us know. I’ll investigate.
>> On 6. Jun 2017, at 19:28, Steve Robert <srobert@qualys.com> wrote:
>> 
>> Hi Aljoscha,
>> 
>> Thank you for your reply.
>> Yes, the queue is being filled up and no more elements are being processed (relative to the limit defined in the "orderedWait" call).
>> As additional information: if I run the test on a local cluster, I can see that the job never ends because the AsyncFunction stays blocked, as if the "collect" method had never been called.
>> Best,
>> Steve
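The symptom described above can be modelled in miniature with JDK types only. This is a rough sketch under stated assumptions, not Flink code: `BoundedInFlight` and its methods are hypothetical names, and the semaphore merely stands in for the capacity limit passed to "orderedWait". It shows that once every slot is held by a request that never completes, no further element can be admitted.

```java
import java.util.concurrent.Semaphore;

// Hypothetical model of a bounded in-flight queue (NOT Flink's implementation).
// The capacity plays the role of the limit given to orderedWait.
final class BoundedInFlight {
    private final Semaphore slots;

    BoundedInFlight(int capacity) {
        this.slots = new Semaphore(capacity);
    }

    // Returns false when every slot is held by a request that has not completed yet.
    boolean tryAdmit() {
        return slots.tryAcquire();
    }

    // Must be called when a request finishes, whether with a result or with an error.
    void complete() {
        slots.release();
    }
}

final class BoundedInFlightDemo {
    public static void main(String[] args) {
        BoundedInFlight queue = new BoundedInFlight(2);
        queue.tryAdmit();
        queue.tryAdmit();
        // Both slots are taken and nothing has completed, so admission fails.
        System.out.println("third admitted: " + queue.tryAdmit());
        // Completing one request (on either path) frees a slot again.
        queue.complete();
        System.out.println("after completion: " + queue.tryAdmit());
    }
}
```

In this model, an error path that forgets to call `complete()` leaves its slot occupied for good, which matches a job that never finishes.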
>> 
>> On Tue, Jun 6, 2017 at 4:56 PM, Aljoscha Krettek <aljoscha@apache.org> wrote:
>> Hi,
>> 
>> As far as I know, calling collect(Throwable) should also finish the promise that would otherwise be fulfilled by successfully collecting a result. If not, then you might have found a bug. What makes you think that the thread is not being released? Is your queue being filled up and no more elements are being processed?
>> 
>> Regarding your other question: yes, you can collect an empty Collection to signal that there was no result.
>> 
>> Best,
>> Aljoscha
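The expected contract described above can be sketched with plain JDK types. This is a hypothetical stand-in, not Flink's AsyncCollector: `PendingEntry`, `isDone` and `isFailed` are names invented for illustration. The point is only that both overloads of `collect` complete the same underlying future, and that an empty collection is a valid "no result" completion.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for one pending entry of an async operator.
// This is NOT Flink's AsyncCollector; it only illustrates the expected contract.
final class PendingEntry<T> {
    private final CompletableFuture<Collection<T>> done = new CompletableFuture<>();

    // Success path: completes the entry with the results (possibly none).
    void collect(Collection<T> result) {
        done.complete(result);
    }

    // Error path: must complete the entry as well, otherwise it stays pending.
    void collect(Throwable error) {
        done.completeExceptionally(error);
    }

    boolean isDone() {
        return done.isDone();
    }

    boolean isFailed() {
        return done.isCompletedExceptionally();
    }
}

final class CollectContractSketch {
    public static void main(String[] args) {
        PendingEntry<String> noResult = new PendingEntry<>();
        noResult.collect(Collections.<String>emptyList()); // "no result" signal

        PendingEntry<String> failed = new PendingEntry<>();
        failed.collect(new RuntimeException("timeout")); // an error also finishes the entry

        System.out.println("noResult done: " + noResult.isDone());
        System.out.println("failed done: " + failed.isDone() + ", failed: " + failed.isFailed());
    }
}
```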
>> 
>>> On 8. May 2017, at 21:47, Steve Robert <srobert@qualys.com> wrote:
>>> 
>>> Hi guys, 
>>> 
>>> The AsyncCollector.collect(Throwable) method does not seem to release the thread.
>>> This can be a problem when calling an external API:
>>> in the case of a timeout error, there is no data to collect.
>>> 
>>> For example:
>>> 
>>>   CompletableFuture.supplyAsync(() -> asyncCallTask(input))
>>>             .thenAccept((Collection<Tuple3<String, streamDTO, Integer>> result) -> {
>>> 
>>>                 this.tupleEmited.getAndIncrement();
>>> 
>>>                 asyncCollector.collect(result);
>>>             })
>>>             .exceptionally((ex) -> {
>>>                 asyncCollector.collect(ex);
>>>                 return null;
>>>             });
>>> }
>>> It is possible to create an empty Collection and collect it to force the thread to be released, but this workflow seems strange to me.
>>> Thanks for your help.
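The workaround mentioned above (collecting an empty collection when the call fails) might look roughly like this when written against JDK types only. Everything here is an assumption made for illustration: `failingCall` is a hypothetical external call that always times out, and the `Consumer` sink stands in for the success-path `collect`; no Flink class is used.

```java
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

final class EmptyOnErrorSketch {
    // Hypothetical external call that always fails, standing in for a timed-out API request.
    static Collection<String> failingCall(String input) {
        throw new IllegalStateException("timeout for " + input);
    }

    // Runs the call asynchronously. On failure an empty collection is handed to the
    // sink, so the sink is invoked exactly once whether or not the call succeeded.
    static CompletableFuture<Void> invoke(String input, Consumer<Collection<String>> sink) {
        return CompletableFuture
                .supplyAsync(() -> failingCall(input))
                .exceptionally(ex -> Collections.<String>emptyList())
                .thenAccept(sink);
    }

    public static void main(String[] args) {
        List<Collection<String>> collected = new CopyOnWriteArrayList<>();
        invoke("request-1", collected::add).join();
        System.out.println("collected batches: " + collected.size());
    }
}
```

Mapping the failure to an empty result before `thenAccept` keeps a single completion path, at the cost of hiding the original exception from the downstream operator.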
>>>  
>>> 
>>> -- 
>>> Steve Robert
>>> Software Engineer
>>> srobert@qualys.com
>>> Qualys, Inc. – Continuous Security
>>> Blog <https://qualys.com/blog> | Community <https://community.qualys.com/> | Twitter <https://twitter.com/qualys>
>> 

