beam-dev mailing list archives

From: Jean-Baptiste Onofré <...@nanthrax.net>
Subject: Re: DirectRunner in test - await completion of workers threads?
Date: Sun, 01 Apr 2018 18:20:04 GMT
Indeed. It's exactly what Romain's PR is about.

Regards
JB

On Apr 1, 2018, at 19:33, Reuven Lax <relax@google.com> wrote:
>Correct - teardown is currently run in the direct runner, but
>asynchronously. I believe Romain's pending PRs should solve this for
>your use case.
>
>On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <timrobertson100@gmail.com> wrote:
>
>> Thanks for confirming Romain - also for the very fast reply!
>>
>> I'll continue with the workaround and reference BEAM-3409 inline as
>> justification.
>> I'm trying to wrap this up before travel next week, but if I get a
>> chance I'll try and run this scenario (BEAM-3848) with your patch.
>>
>>
>>
>> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <rmannibucau@gmail.com> wrote:
>>
>>> Hi
>>>
>>> I have the same blocker and created
>>>
>>> https://github.com/apache/beam/pull/4790 and
>>> https://github.com/apache/beam/pull/4965 to solve part of it
>>>
>>>
>>>
>>> On Apr 1, 2018 at 11:35, "Tim Robertson" <timrobertson100@gmail.com> wrote:
>>>
>>> Hi devs
>>>
>>> I'm working on SolrIO tests for failure scenarios (i.e. an exception
>>> will come out of the pipeline execution). I see that the exception is
>>> surfaced to the driver while "direct-runner-worker" threads are still
>>> running. This causes issues because:
>>>
>>>   1. The Solr tests do thread leak detection, and a solrClient.close()
>>>      is what removes the object
>>>   2. @Teardown is not necessarily called, which is what would close
>>>      the solrClient (see the sketch below)
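
A minimal sketch of the kind of DoFn involved, assuming a hypothetical
WriteToSolrFn rather than the actual SolrIO.Write.WriteFn: the SolrClient is
opened in @Setup and only released in @Teardown, so if @Teardown never runs
(or runs asynchronously after pipeline.run() throws) the client stays
registered with Solr's ObjectReleaseTracker and the thread-leak check fails.

    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.client.solrj.impl.HttpSolrClient;
    import org.apache.solr.common.SolrInputDocument;

    // Hypothetical DoFn for illustration only; the class name, collection
    // name and Solr URL are assumptions, not SolrIO's actual code.
    class WriteToSolrFn extends DoFn<SolrInputDocument, Void> {
      private transient SolrClient solrClient;

      @Setup
      public void setup() {
        // The client registers itself with ObjectReleaseTracker when Solr's
        // test assertions are enabled.
        solrClient = new HttpSolrClient.Builder("http://localhost:8983/solr").build();
      }

      @ProcessElement
      public void processElement(ProcessContext c) throws Exception {
        solrClient.add("collection", c.element());
      }

      @Teardown
      public void teardown() throws Exception {
        // Unregisters the client from ObjectReleaseTracker; skipped if
        // teardown never runs, which is what trips the leak detector.
        solrClient.close();
      }
    }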
>>>
>>> I can unregister all the solrClients that have been spawned. However I
>>> have seen race conditions where there are still threads running, creating
>>> and registering clients. I need to somehow ensure that all workers related
>>> to the pipeline execution are indeed finished, so no new ones are created
>>> after the first exception is passed up.
>>>
>>> Currently I have this (pseudocode) which works, but I suspect someone
>>> can suggest a better approach:
>>>
>>> // store the state of clients registered for the object leak check
>>> Set<Object> existingClients = registeredSolrClients();
>>> try {
>>>   pipeline.run();
>>>
>>> } catch (Pipeline.PipelineExecutionException e) {
>>>
>>>   // Hack: await all bundle workers completing
>>>   while (namedThreadStillExists("direct-runner-worker")) {
>>>     Thread.sleep(100);
>>>   }
>>>
>>>   // remove all solrClients created in this execution only,
>>>   // since the teardown may not have done so
>>>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>>>     if (o instanceof SolrClient && !existingClients.contains(o)) {
>>>       ObjectReleaseTracker.release(o);
>>>     }
>>>   }
>>>
>>>   // now we can do our assertions
>>>   expectedLogs.verifyWarn(String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
>>> }
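
The namedThreadStillExists(...) and registeredSolrClients() helpers aren't
shown in the mail; a rough sketch of how they might look, purely as an
assumption about the test code, is:

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.solr.client.solrj.SolrClient;
    import org.apache.solr.common.util.ObjectReleaseTracker;

    // Returns true while any live thread's name contains the given fragment.
    static boolean namedThreadStillExists(String nameFragment) {
      for (Thread t : Thread.getAllStackTraces().keySet()) {
        if (t.isAlive() && t.getName().contains(nameFragment)) {
          return true;
        }
      }
      return false;
    }

    // Snapshot of the SolrClient instances currently tracked by Solr's
    // ObjectReleaseTracker, taken before the pipeline runs.
    static Set<Object> registeredSolrClients() {
      Set<Object> clients = new HashSet<>();
      for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
        if (o instanceof SolrClient) {
          clients.add(o);
        }
      }
      return clients;
    }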
>>>
>>>
>>> Please do point out the obvious if I am missing it - I am a newbie
>here...
>>>
>>> Thank you all very much,
>>> Tim
>>> (timrobertson100@gmail.com on the slack apache/beam channel)
>>>
>>>
>>>
>>>
>>
