beam-commits mailing list archives

From da...@apache.org
Subject [05/11] incubator-beam git commit: Return from awaitCompletion if Already Done
Date Tue, 07 Jun 2016 16:59:50 GMT
Return from awaitCompletion if Already Done

This ensures that a call to ExecutorServiceParallelExecutor#awaitCompletion returns
within the poll timeout if there are no visible updates and the executor has been shut
down. Once the executor is in this state, no additional visible updates will be
published, so without this check the call would hang.

This sequence generally will not happen, as calls via InProcessPipelineResult
return if the state is already terminal, but this ensures that parallel calls to
awaitCompletion do not hang one calling thread.
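
The hang described here comes down to a blocking queue that holds a single terminal update, which only one caller can consume. A minimal sketch of that situation, assuming a plain LinkedBlockingQueue; the class name and the String payload are hypothetical and not taken from the Beam code:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the Beam code base.
final class SingleTerminalUpdateDemo {
  /** Returns what a second caller sees after the first has consumed the only update. */
  static String secondCallerSees() {
    BlockingQueue<String> visibleUpdates = new LinkedBlockingQueue<>();
    try {
      visibleUpdates.put("done");            // the single terminal update
      visibleUpdates.take();                 // the first caller consumes it
      // A second take() would block indefinitely because nothing else is published.
      // A bounded poll gives up and returns null instead.
      return visibleUpdates.poll(100L, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();    // preserve the interrupt status
      throw new IllegalStateException(e);
    }
  }

  public static void main(String[] args) {
    System.out.println(secondCallerSees()); // prints "null"
  }
}
```

The null result is the signal a caller can combine with the executor's shutdown state to decide that no further updates will arrive.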


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/71fa9e62
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/71fa9e62
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/71fa9e62

Branch: refs/heads/release-0.1.0-incubating
Commit: 71fa9e62c6139a07c8a897861ffef4d6e1607b42
Parents: cf21fdb
Author: Thomas Groh <tgroh@users.noreply.github.com>
Authored: Mon May 30 11:04:15 2016 -0700
Committer: Davor Bonaci <davor@google.com>
Committed: Mon Jun 6 18:08:22 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/ExecutorServiceParallelExecutor.java  | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71fa9e62/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index a627125..3129145 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -200,11 +200,16 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
   public void awaitCompletion() throws Throwable {
     VisibleExecutorUpdate update;
     do {
-      update = visibleUpdates.take();
-      if (update.throwable.isPresent()) {
+      // Get an update; don't block forever if another thread has handled it
+      update = visibleUpdates.poll(2L, TimeUnit.SECONDS);
+      if (update == null && executorService.isShutdown()) {
+        // there are no updates to process and no updates will ever be published because the
+        // executor is shutdown
+        return;
+      } else if (update != null && update.throwable.isPresent()) {
         throw update.throwable.get();
       }
-    } while (!update.isDone());
+    } while (update == null || !update.isDone());
     executorService.shutdown();
   }
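
The patched loop can be exercised outside of Beam with a small stand-in. The following is a sketch under stated assumptions, not the project's implementation: AwaitCompletionSketch, Update, publish, and the configurable poll interval are hypothetical names introduced for illustration, and a nullable Throwable field replaces the Optional used in the diff.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for ExecutorServiceParallelExecutor; only the loop mirrors the diff.
final class AwaitCompletionSketch {
  /** Hypothetical stand-in for VisibleExecutorUpdate. */
  static final class Update {
    final boolean done;
    final Throwable throwable; // null when the update carries no failure

    Update(boolean done, Throwable throwable) {
      this.done = done;
      this.throwable = throwable;
    }
  }

  private final BlockingQueue<Update> visibleUpdates = new LinkedBlockingQueue<>();
  private final ExecutorService executorService = Executors.newSingleThreadExecutor();
  private final long pollMillis; // the diff uses a fixed 2 seconds; shortened here for the demo

  AwaitCompletionSketch(long pollMillis) {
    this.pollMillis = pollMillis;
  }

  void publish(Update update) {
    visibleUpdates.add(update);
  }

  void awaitCompletion() throws Throwable {
    Update update;
    do {
      // Bounded wait: another caller may already have consumed the terminal update.
      update = visibleUpdates.poll(pollMillis, TimeUnit.MILLISECONDS);
      if (update == null && executorService.isShutdown()) {
        return; // nothing queued and nothing will ever be published
      } else if (update != null && update.throwable != null) {
        throw update.throwable;
      }
    } while (update == null || !update.done);
    executorService.shutdown();
  }

  /** Calls awaitCompletion twice; returns true only if both calls return normally. */
  static boolean secondCallReturns() {
    AwaitCompletionSketch executor = new AwaitCompletionSketch(50L);
    executor.publish(new Update(true, null));
    try {
      executor.awaitCompletion(); // consumes the terminal update and shuts down
      executor.awaitCompletion(); // queue is empty and the executor is shut down
      return true;
    } catch (Throwable t) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(secondCallReturns()); // prints "true"
  }
}
```

With the pre-patch take() call, the second awaitCompletion in this sketch would block indefinitely, because the only terminal update has already been consumed.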
 

