beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [1/3] incubator-beam git commit: Use a weakValues LoadingCache for serial TransformExecutorServices
Date Fri, 08 Apr 2016 21:26:51 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 529bcdf56 -> 5d78420bf


Use a weakValues LoadingCache for serial TransformExecutorServices

This allows the garbage collector to reclaim TransformExecutorServices
that are no longer in use.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6f526374
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6f526374
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6f526374

Branch: refs/heads/master
Commit: 6f526374fbc743a0d22e37ad0f746f0a695785dd
Parents: ad58e26
Author: Thomas Groh <tgroh@google.com>
Authored: Tue Mar 29 10:56:10 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Fri Apr 8 14:20:20 2016 -0700

----------------------------------------------------------------------
 .../ExecutorServiceParallelExecutor.java        | 36 ++++++++++++++------
 1 file changed, 25 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6f526374/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
index 4d45e8f..c770735 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/ExecutorServiceParallelExecutor.java
@@ -31,6 +31,9 @@ import com.google.cloud.dataflow.sdk.values.PCollection;
 import com.google.cloud.dataflow.sdk.values.PValue;
 import com.google.common.base.MoreObjects;
 import com.google.common.base.Optional;
+import com.google.common.cache.CacheBuilder;
+import com.google.common.cache.CacheLoader;
+import com.google.common.cache.LoadingCache;
 import com.google.common.collect.ImmutableList;
 
 import org.joda.time.Instant;
@@ -69,7 +72,7 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
 
   private final InProcessEvaluationContext evaluationContext;
 
-  private final ConcurrentMap<StepAndKey, TransformExecutorService> currentEvaluations;
+  private final LoadingCache<StepAndKey, TransformExecutorService> executorServices;
   private final ConcurrentMap<TransformExecutor<?>, Boolean> scheduledExecutors;
 
   private final Queue<ExecutorUpdate> allUpdates;
@@ -107,8 +110,12 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     this.transformEnforcements = transformEnforcements;
     this.evaluationContext = context;
 
-    currentEvaluations = new ConcurrentHashMap<>();
     scheduledExecutors = new ConcurrentHashMap<>();
+    // Weak values allow TransformExecutorServices that are no longer in use to be reclaimed.
+    // Executing TransformExecutors hold a strong reference to their TransformExecutorService,
+    // which keeps that service from being prematurely garbage collected.
+    executorServices =
+        CacheBuilder.newBuilder().weakValues().build(serialTransformExecutorServiceCacheLoader());
 
     this.allUpdates = new ConcurrentLinkedQueue<>();
     this.visibleUpdates = new ArrayBlockingQueue<>(20);
@@ -118,6 +125,16 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     defaultCompletionCallback = new DefaultCompletionCallback();
   }
 
+  private CacheLoader<StepAndKey, TransformExecutorService>
+      serialTransformExecutorServiceCacheLoader() {
+    return new CacheLoader<StepAndKey, TransformExecutorService>() {
+      @Override
+      public TransformExecutorService load(StepAndKey stepAndKey) throws Exception {
+        return TransformExecutorServices.serial(executorService, scheduledExecutors);
+      }
+    };
+  }
+
   @Override
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
     rootNodes = ImmutableList.copyOf(roots);
@@ -142,7 +159,12 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     if (bundle != null && isKeyed(bundle.getPCollection())) {
       final StepAndKey stepAndKey =
           StepAndKey.of(transform, bundle == null ? null : bundle.getKey());
-      transformExecutor = getSerialExecutorService(stepAndKey);
+      // This executor will remain reachable until it has executed all scheduled transforms.
+      // The TransformExecutors keep a strong reference to the Executor, and the ExecutorService
+      // keeps a reference to the scheduled TransformExecutor callable. Follow-up
+      // TransformExecutors (scheduled due to the completion of another TransformExecutor) are
+      // provided to the ExecutorService before the earlier TransformExecutor callable completes.
+      transformExecutor = executorServices.getUnchecked(stepAndKey);
     } else {
       transformExecutor = parallelExecutorService;
     }
@@ -174,14 +196,6 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor {
     }
   }
 
-  private TransformExecutorService getSerialExecutorService(StepAndKey stepAndKey) {
-    if (!currentEvaluations.containsKey(stepAndKey)) {
-      currentEvaluations.putIfAbsent(
-          stepAndKey, TransformExecutorServices.serial(executorService, scheduledExecutors));
-    }
-    return currentEvaluations.get(stepAndKey);
-  }
-
   @Override
   public void awaitCompletion() throws Throwable {
     VisibleExecutorUpdate update;


Mime
View raw message