beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [2/2] beam git commit: Convert Executor Services to use Daemon Threads
Date Mon, 08 May 2017 19:52:35 GMT
Convert Executor Services to use Daemon Threads

This will cause the DirectRunner to automatically shut down when the
worker threads are shut down.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/13e51c9c
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/13e51c9c
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/13e51c9c

Branch: refs/heads/master
Commit: 13e51c9cd114eec73c47b71f46214dd92ee81048
Parents: e1791c3
Author: Dan Halperin <dhalperi@google.com>
Authored: Fri May 5 19:49:29 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon May 8 12:52:09 2017 -0700

----------------------------------------------------------------------
 .../runners/direct/BoundedReadEvaluatorFactory.java    | 13 ++++++++++++-
 .../org/apache/beam/runners/direct/DirectMetrics.java  | 10 +++++++++-
 .../apache/beam/runners/direct/EvaluationContext.java  |  3 +--
 .../direct/ExecutorServiceParallelExecutor.java        | 12 +++++++++++-
 .../SplittableProcessElementsEvaluatorFactory.java     | 11 ++++++++++-
 5 files changed, 43 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
index 99a0fca..76db861 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/BoundedReadEvaluatorFactory.java
@@ -21,7 +21,9 @@ import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
 import com.google.common.util.concurrent.SettableFuture;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
@@ -57,7 +59,16 @@ final class BoundedReadEvaluatorFactory implements TransformEvaluatorFactory {
    */
   private static final long REQUIRED_DYNAMIC_SPLIT_ORIGINAL_SIZE = 0;
   private final EvaluationContext evaluationContext;
-  @VisibleForTesting final ExecutorService executor = Executors.newCachedThreadPool();
+
+  // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
+  @VisibleForTesting
+  final ExecutorService executor =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder()
+              .setThreadFactory(MoreExecutors.platformThreadFactory())
+              .setDaemon(true)
+              .setNameFormat("direct-dynamic-split-requester")
+              .build());
 
   private final long minimumDynamicSplitSize;
 

http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
index b6ca492..b7cd6e7 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectMetrics.java
@@ -21,6 +21,8 @@ import static java.util.Arrays.asList;
 
 import com.google.auto.value.AutoValue;
 import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Map;
 import java.util.Map.Entry;
@@ -51,7 +53,13 @@ import org.apache.beam.sdk.metrics.MetricsMap;
 class DirectMetrics extends MetricResults {
 
   // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the DirectRunner.
-  private static final ExecutorService COUNTER_COMMITTER = Executors.newCachedThreadPool();
+  private static final ExecutorService COUNTER_COMMITTER =
+      Executors.newCachedThreadPool(
+          new ThreadFactoryBuilder()
+              .setThreadFactory(MoreExecutors.platformThreadFactory())
+              .setDaemon(true)
+              .setNameFormat("direct-metrics-counter-committer")
+              .build());
 
   private interface MetricAggregation<UpdateT, ResultT> {
     UpdateT zero();

http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
index 362ff91..c627119 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/EvaluationContext.java
@@ -123,8 +123,7 @@ class EvaluationContext {
     this.applicationStateInternals = new ConcurrentHashMap<>();
     this.metrics = new DirectMetrics();
 
-    this.callbackExecutor =
-        WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
+    this.callbackExecutor = WatermarkCallbackExecutor.create(MoreExecutors.directExecutor());
   }
 
   public void initialize(

http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 02fb11a..b7f4732 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -28,6 +28,8 @@ import com.google.common.cache.LoadingCache;
 import com.google.common.cache.RemovalListener;
 import com.google.common.cache.RemovalNotification;
 import com.google.common.collect.Iterables;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -132,7 +134,15 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
     this.targetParallelism = targetParallelism;
-    this.executorService = Executors.newFixedThreadPool(targetParallelism);
+    // Don't use Daemon threads for workers. The Pipeline should continue to execute even if there
+    // are no other active threads (for example, because waitUntilFinish was not called)
+    this.executorService =
+        Executors.newFixedThreadPool(
+            targetParallelism,
+            new ThreadFactoryBuilder()
+                .setThreadFactory(MoreExecutors.platformThreadFactory())
+                .setNameFormat("direct-runner-worker")
+                .build());
     this.graph = graph;
     this.rootProviderRegistry = rootProviderRegistry;
     this.registry = registry;

http://git-wip-us.apache.org/repos/asf/beam/blob/13e51c9c/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 2797233..f490b0b 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.direct;
 
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Executors;
@@ -171,7 +173,14 @@ class SplittableProcessElementsEvaluatorFactory<
             outputWindowedValue,
             evaluationContext.createSideInputReader(transform.getSideInputs()),
             // TODO: For better performance, use a higher-level executor?
-            Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()),
+            // TODO: (BEAM-723) Create a shared ExecutorService for maintenance tasks in the
+            // DirectRunner.
+            Executors.newSingleThreadScheduledExecutor(
+                new ThreadFactoryBuilder()
+                    .setThreadFactory(MoreExecutors.platformThreadFactory())
+                    .setDaemon(true)
+                    .setNameFormat("direct-splittable-process-element-checkpoint-executor")
+                    .build()),
             10000,
             Duration.standardSeconds(10)));
 


Mime
View raw message