beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/2] incubator-beam git commit: Fix multithreaded visibility issue with TransformExecutor
Date Mon, 11 Apr 2016 20:02:45 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master ab7bca7d0 -> 43ef9fe5b


Fix multithreaded visibility issue with TransformExecutor

Use an AtomicReference to ensure the thread the Executor is being run on
will be visible to other threads.

Add a checkState to ensure that the executor will not run more than
once.

Stop setting the active thread to null, as the Transform Executor should
be discarded from any refernces and not hold the overall execution.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/29861ed3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/29861ed3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/29861ed3

Branch: refs/heads/master
Commit: 29861ed3b3b734eab3a0b4c781eaaff46bfc8c23
Parents: ab7bca7
Author: Thomas Groh <tgroh@google.com>
Authored: Mon Apr 11 08:54:58 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Mon Apr 11 13:02:09 2016 -0700

----------------------------------------------------------------------
 .../sdk/runners/inprocess/TransformExecutor.java   | 17 +++++++++++++----
 1 file changed, 13 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/29861ed3/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
index 62a9e24..872aa80 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformExecutor.java
@@ -17,6 +17,8 @@
  */
 package com.google.cloud.dataflow.sdk.runners.inprocess;
 
+import static com.google.common.base.Preconditions.checkState;
+
 import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
 import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.util.WindowedValue;
@@ -25,6 +27,7 @@ import com.google.common.base.Throwables;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.Callable;
+import java.util.concurrent.atomic.AtomicReference;
 
 import javax.annotation.Nullable;
 
@@ -68,7 +71,7 @@ class TransformExecutor<T> implements Callable<InProcessTransformResult>
{
   private final CompletionCallback onComplete;
   private final TransformExecutorService transformEvaluationState;
 
-  private Thread thread;
+  private final AtomicReference<Thread> thread;
 
   private TransformExecutor(
       TransformEvaluatorFactory factory,
@@ -88,11 +91,18 @@ class TransformExecutor<T> implements Callable<InProcessTransformResult>
{
     this.onComplete = completionCallback;
 
     this.transformEvaluationState = transformEvaluationState;
+    this.thread = new AtomicReference<>();
   }
 
   @Override
   public InProcessTransformResult call() {
-    this.thread = Thread.currentThread();
+    checkState(
+        thread.compareAndSet(null, Thread.currentThread()),
+        "Tried to execute %s for %s on thread %s, but is already executing on thread %s",
+        TransformExecutor.class.getSimpleName(),
+        transform.getFullName(),
+        Thread.currentThread(),
+        thread.get());
     try {
       Collection<ModelEnforcement<T>> enforcements = new ArrayList<>();
       for (ModelEnforcementFactory enforcementFactory : modelEnforcements) {
@@ -110,7 +120,6 @@ class TransformExecutor<T> implements Callable<InProcessTransformResult>
{
       onComplete.handleThrowable(inputBundle, t);
       throw Throwables.propagate(t);
     } finally {
-      this.thread = null;
       transformEvaluationState.complete(this);
     }
   }
@@ -161,6 +170,6 @@ class TransformExecutor<T> implements Callable<InProcessTransformResult>
{
    */
   @Nullable
   public Thread getThread() {
-    return this.thread;
+    return thread.get();
   }
 }


Mime
View raw message