beam-dev mailing list archives

From Reuven Lax <re...@google.com>
Subject Re: DirectRunner in test - await completion of workers threads?
Date Sun, 01 Apr 2018 17:33:02 GMT
Correct - teardown is currently run in the direct runner, but
asynchronously. I believe Romain's pending PRs should solve this for your
use case.

On Sun, Apr 1, 2018 at 3:13 AM Tim Robertson <timrobertson100@gmail.com> wrote:

> Thanks for confirming, Romain, and also for the very fast reply!
>
> I'll continue with the workaround and reference BEAM-3409 inline as
> justification.
> I'm trying to wrap this up before I travel next week, but if I get a chance,
> I'll try to run this scenario (BEAM-3848) with your patch.
>
>
>
> On Sun, Apr 1, 2018 at 12:05 PM, Romain Manni-Bucau <rmannibucau@gmail.com> wrote:
>
>> Hi
>>
>> I have the same blocker and created
>>
>> https://github.com/apache/beam/pull/4790 and
>> https://github.com/apache/beam/pull/4965 to solve part of it
>>
>>
>>
>> On 1 Apr 2018 11:35, "Tim Robertson" <timrobertson100@gmail.com>
>> wrote:
>>
>> Hi devs
>>
>> I'm working on SolrIO tests for failure scenarios (i.e. where an exception
>> comes out of the pipeline execution). I see that the exception is surfaced
>> to the driver while "direct-runner-worker" threads are still running.
>> This causes issues because:
>>
>>   1. The Solr tests do thread leak detection, and calling solrClient.close()
>> is what removes the client from the leak tracker
>>   2. @Teardown, which is what would close the solrClient, is not
>> necessarily called
>>
>> I can unregister all the solrClients that have been spawned. However, I
>> have seen race conditions where threads are still running, creating
>> and registering clients. I need to somehow ensure that all workers related
>> to the pipeline execution are indeed finished, so that no new clients are
>> created after the first exception is passed up.
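The race described above is a general property of thread pools rather than something unique to Beam: a driver can learn about one worker's failure while sibling workers are still running. The following JDK-only analogy is not the DirectRunner's implementation (the class name, the four simulated bundles and the `openClients` counter are all hypothetical). It shows that only waiting for the pool itself to drain, here via `ExecutorService.shutdown()` and `awaitTermination`, guarantees that no worker still holds a client. A test has no handle on the DirectRunner's internal pool, which is why a workaround has to observe the worker threads some other way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FirstFailureRace {
    // Hypothetical stand-in for a leak tracker: clients opened but not yet closed.
    static final AtomicInteger openClients = new AtomicInteger();

    /** Runs four simulated bundles, one of which fails; returns clients left open. */
    public static int run() throws InterruptedException {
        ExecutorService workers = Executors.newFixedThreadPool(4);
        List<Future<Object>> futures = new ArrayList<>();
        for (int i = 0; i < 4; i++) {
            final int id = i;
            futures.add(workers.submit(() -> {
                openClients.incrementAndGet();         // "create and register a client"
                try {
                    if (id == 0) {
                        throw new IllegalStateException("bundle failed");
                    }
                    Thread.sleep(200);                 // other bundles keep working
                } finally {
                    openClients.decrementAndGet();     // "teardown closes the client"
                }
                return null;
            }));
        }
        try {
            futures.get(0).get();                      // the driver sees the first failure...
        } catch (ExecutionException e) {
            // ...but the other workers may still hold open clients at this point.
        }
        workers.shutdown();                            // accept no new tasks
        workers.awaitTermination(5, TimeUnit.SECONDS); // wait for in-flight tasks to finish
        return openClients.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("clients left open: " + run());
    }
}
```

Reading `openClients` inside the `catch` block instead would give a timing-dependent answer, which is exactly the race the message describes.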
>>
>> Currently I have this (pseudo code), which works, but I suspect someone
>> can suggest a better approach:
>>
>> // Store the set of clients already registered for the object leak check.
>> // registeredSolrClients() and namedThreadStillExists() are test helpers
>> // (not shown).
>> Set<Object> existingClients = registeredSolrClients();
>> try {
>>   pipeline.run();
>>   fail("Expected the pipeline to fail");
>> } catch (Pipeline.PipelineExecutionException e) {
>>   // Hack: wait until all bundle workers have completed
>>   while (namedThreadStillExists("direct-runner-worker")) {
>>     Thread.sleep(100);
>>   }
>>
>>   // Release only the SolrClients created during this execution,
>>   // since @Teardown may not have closed them
>>   for (Object o : ObjectReleaseTracker.OBJECTS.keySet()) {
>>     if (o instanceof SolrClient && !existingClients.contains(o)) {
>>       ObjectReleaseTracker.release(o);
>>     }
>>   }
>>
>>   // Now the assertions can run
>>   expectedLogs.verifyWarn(
>>       String.format(SolrIO.Write.WriteFn.RETRY_ATTEMPT_LOG, 1));
>> }
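The pseudo code above depends on a `namedThreadStillExists` helper that the message does not show, and its `while` loop never gives up. A minimal JDK-only sketch of such a helper follows, together with a bounded variant. The class name `WorkerThreads` and the method `awaitThreadsGone` are hypothetical, and matching on the `direct-runner-worker` name prefix assumes the DirectRunner keeps naming its threads that way, which is an internal detail rather than a public contract.

```java
import java.time.Duration;

public final class WorkerThreads {
    private WorkerThreads() {}

    /** Returns true if any live thread's name starts with the given prefix. */
    public static boolean namedThreadStillExists(String prefix) {
        for (Thread t : Thread.getAllStackTraces().keySet()) {
            if (t.isAlive() && t.getName().startsWith(prefix)) {
                return true;
            }
        }
        return false;
    }

    /**
     * Polls every 100 ms until no thread with the prefix remains.
     * Returns true once they are gone, or false if the timeout elapses first.
     */
    public static boolean awaitThreadsGone(String prefix, Duration timeout)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (namedThreadStillExists(prefix)) {
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            Thread.sleep(100);
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        // Hypothetical worker whose name mimics the prefix polled for in the test.
        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(300);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "direct-runner-worker-demo");
        worker.start();
        boolean gone = awaitThreadsGone("direct-runner-worker", Duration.ofSeconds(5));
        System.out.println("workers finished: " + gone);
    }
}
```

In the test, the polling loop could then become `assertTrue(WorkerThreads.awaitThreadsGone("direct-runner-worker", Duration.ofSeconds(30)))`, so a stuck worker fails the test instead of hanging the build.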
>>
>>
>> Please do point out the obvious if I am missing it - I am a newbie here...
>>
>> Thank you all very much,
>> Tim
>> (timrobertson100@gmail.com on the slack apache/beam channel)
>>
>>
>>
>>
>
