beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Thomas Groh (JIRA)" <>
Subject [jira] [Commented] (BEAM-1835) NPE in DirectRunner PubsubReader.ackBatch
Date Thu, 30 Mar 2017 02:17:41 GMT


Thomas Groh commented on BEAM-1835:

This is actually a bug in PubsubUnboundedSource or PubsubCheckpoint. The {{finalizeCheckpoint}}
documentation states that
"This finalize method may be called from any thread, concurrently with calls to {{UnboundedReader}}
it was created from."
"It is not safe to assume the {{UnboundedReader}} from which this checkpoint was created still
exists at the time this method is called."

The Runner is permitted to close the reader and finalize all outstanding checkpoints in whatever
order, potentially interleaving the two operations, so the checkpoint mark must not assume
that the client is still available, or that the client, if it is available and open when the
call to finalizeCheckpoint begins, is still available and open at any future point.

> NPE in DirectRunner PubsubReader.ackBatch
> -----------------------------------------
>                 Key: BEAM-1835
>                 URL:
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-direct, sdk-java-core
>            Reporter: Rafal Wojdyla
>            Assignee: Rafal Wojdyla
> DirectRunner streaming mode throws null pointer exception:
> {noformat}
>   at$PubsubCheckpoint.finalizeCheckpoint(
>   at$UnboundedReadEvaluator.finishRead(
>   at$UnboundedReadEvaluator.processElement(
>   at
>   at
>   at java.util.concurrent.Executors$
>   at
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(
>   at java.util.concurrent.ThreadPoolExecutor$
>   at
> {noformat}
> This does not happen always, but for large enough number of events it's pretty reproducible.
The problems seems to be the concurrent reuse of a reader among multiple threads, and a race
condition, when one of the threads "decided" to close the reader, based on:
> {code}
> private static final double DEFAULT_READER_REUSE_CHANCE = 0.95;
> {code}
> the close, nulls out pubsub client:
> {code}
>     @Override
>     public void close() throws IOException {
>       if (pubsubClient != null) {
>         pubsubClient.close();
>         pubsubClient = null;
>       }
>     }
> {code}
> which if still in use by other thread will result in NPE above.

This message was sent by Atlassian JIRA

View raw message