beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tg...@apache.org
Subject [1/2] incubator-beam git commit: Actually Split Root Transforms
Date Sat, 12 Nov 2016 02:03:23 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master fe17ef7f8 -> e2c21599d


Actually Split Root Transforms

Permit the ExecutorServiceParallelExecutor to control its own
ExecutorService by passing only a TargetParallelism parameter. Split
roots into the greater of 3 or the target parallelism.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b1cec29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b1cec29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b1cec29

Branch: refs/heads/master
Commit: 6b1cec2930a199bba2c65d379116c746532c4148
Parents: fe17ef7
Author: Thomas Groh <tgroh@google.com>
Authored: Thu Nov 10 13:47:40 2016 -0800
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri Nov 11 16:54:11 2016 -0800

----------------------------------------------------------------------
 .../beam/runners/direct/DirectRunner.java       | 27 +--------
 .../direct/ExecutorServiceParallelExecutor.java | 15 +++--
 .../beam/runners/direct/DirectRunnerTest.java   | 62 ++++++++++++++++++++
 3 files changed, 73 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
index f4aeb3e..c9a7864 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/DirectRunner.java
@@ -29,8 +29,6 @@ import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import javax.annotation.Nullable;
 import org.apache.beam.runners.core.GBKIntoKeyedWorkItems;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupByKeyOnly;
@@ -258,7 +256,6 @@ public class DirectRunner
   ////////////////////////////////////////////////////////////////////////////////////////////////
   private final DirectOptions options;
   private final Set<Enforcement> enabledEnforcements;
-  private Supplier<ExecutorService> executorServiceSupplier;
   private Supplier<Clock> clockSupplier = new NanosOffsetClockSupplier();
 
   public static DirectRunner fromOptions(PipelineOptions options) {
@@ -268,7 +265,6 @@ public class DirectRunner
   private DirectRunner(DirectOptions options) {
     this.options = options;
     this.enabledEnforcements = Enforcement.enabled(options);
-    this.executorServiceSupplier = new FixedThreadPoolSupplier(options);
   }
 
   /**
@@ -326,14 +322,11 @@ public class DirectRunner
             consumerTrackingVisitor.getStepNames(),
             consumerTrackingVisitor.getViews());
 
-    // independent executor service for each run
-    ExecutorService executorService = executorServiceSupplier.get();
-
     RootInputProvider rootInputProvider = RootProviderRegistry.defaultRegistry(context);
     TransformEvaluatorRegistry registry = TransformEvaluatorRegistry.defaultRegistry(context);
     PipelineExecutor executor =
         ExecutorServiceParallelExecutor.create(
-            executorService,
+            options.getTargetParallelism(),
             consumerTrackingVisitor.getValueToConsumers(),
             keyedPValueVisitor.getKeyedPValues(),
             rootInputProvider,
@@ -470,24 +463,6 @@ public class DirectRunner
   }
 
   /**
-   * A {@link Supplier} that creates a {@link ExecutorService} based on
-   * {@link Executors#newFixedThreadPool(int)}.
-   */
-  private static class FixedThreadPoolSupplier implements Supplier<ExecutorService> {
-    private final DirectOptions options;
-
-    private FixedThreadPoolSupplier(DirectOptions options) {
-      this.options = options;
-    }
-
-    @Override
-    public ExecutorService get() {
-      return Executors.newFixedThreadPool(options.getTargetParallelism());
-    }
-  }
-
-
-  /**
    * A {@link Supplier} that creates a {@link NanosOffsetClock}.
    */
   private static class NanosOffsetClockSupplier implements Supplier<Clock> {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 30fc417..0bb3d01 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -37,6 +37,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
@@ -65,6 +66,7 @@ import org.slf4j.LoggerFactory;
 final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private static final Logger LOG = LoggerFactory.getLogger(ExecutorServiceParallelExecutor.class);
 
+  private final int targetParallelism;
   private final ExecutorService executorService;
 
   private final Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers;
@@ -101,7 +103,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   private final AtomicLong outstandingWork = new AtomicLong();
 
   public static ExecutorServiceParallelExecutor create(
-      ExecutorService executorService,
+      int targetParallelism,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Set<PValue> keyedPValues,
       RootInputProvider rootInputProvider,
@@ -111,7 +113,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
               transformEnforcements,
       EvaluationContext context) {
     return new ExecutorServiceParallelExecutor(
-        executorService,
+        targetParallelism,
         valueToConsumers,
         keyedPValues,
         rootInputProvider,
@@ -121,7 +123,7 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
   }
 
   private ExecutorServiceParallelExecutor(
-      ExecutorService executorService,
+      int targetParallelism,
       Map<PValue, Collection<AppliedPTransform<?, ?, ?>>> valueToConsumers,
       Set<PValue> keyedPValues,
       RootInputProvider rootInputProvider,
@@ -129,7 +131,8 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
       @SuppressWarnings("rawtypes")
       Map<Class<? extends PTransform>, Collection<ModelEnforcementFactory>> transformEnforcements,
       EvaluationContext context) {
-    this.executorService = executorService;
+    this.targetParallelism = targetParallelism;
+    this.executorService = Executors.newFixedThreadPool(targetParallelism);
     this.valueToConsumers = valueToConsumers;
     this.keyedPValues = keyedPValues;
     this.rootInputProvider = rootInputProvider;
@@ -164,10 +167,12 @@ final class ExecutorServiceParallelExecutor implements PipelineExecutor {
 
   @Override
   public void start(Collection<AppliedPTransform<?, ?, ?>> roots) {
+    int numTargetSplits = Math.max(3, targetParallelism);
     for (AppliedPTransform<?, ?, ?> root : roots) {
       ConcurrentLinkedQueue<CommittedBundle<?>> pending = new ConcurrentLinkedQueue<>();
       try {
-        Collection<CommittedBundle<?>> initialInputs = rootInputProvider.getInitialInputs(root, 1);
+        Collection<CommittedBundle<?>> initialInputs =
+            rootInputProvider.getInitialInputs(root, numTargetSplits);
         pending.addAll(initialInputs);
       } catch (Exception e) {
         throw UserCodeException.wrap(e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b1cec29/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
index 3836f58..3c860b1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/DirectRunnerTest.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.direct;
 
+import static com.google.common.base.Preconditions.checkState;
 import static org.apache.beam.sdk.metrics.MetricMatchers.metricResult;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
@@ -38,11 +39,15 @@ import org.apache.beam.runners.direct.DirectRunner.DirectPipelineResult;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.coders.AtomicCoder;
+import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderException;
 import org.apache.beam.sdk.coders.ListCoder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.io.BoundedSource;
 import org.apache.beam.sdk.io.CountingInput;
+import org.apache.beam.sdk.io.CountingSource;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.Distribution;
 import org.apache.beam.sdk.metrics.DistributionResult;
@@ -219,6 +224,15 @@ public class DirectRunnerTest implements Serializable {
   }
 
   @Test
+  public void splitsInputs() {
+    Pipeline p = getPipeline();
+    PCollection<Long> longs = p.apply(Read.from(MustSplitSource.of(CountingSource.upTo(3))));
+
+    PAssert.that(longs).containsInAnyOrder(0L, 1L, 2L);
+    p.run();
+  }
+
+  @Test
   public void transformDisplayDataExceptionShouldFail() {
     DoFn<Integer, Integer> brokenDoFn = new DoFn<Integer, Integer>() {
       @ProcessElement
@@ -478,4 +492,52 @@ public class DirectRunnerTest implements Serializable {
             DistributionResult.create(26L, 3L, 5L, 13L),
             DistributionResult.create(26L, 3L, 5L, 13L))));
   }
+
+  private static class MustSplitSource<T> extends BoundedSource<T>{
+    public static <T> BoundedSource<T> of(BoundedSource<T> underlying) {
+      return new MustSplitSource<>(underlying);
+    }
+
+    private final BoundedSource<T> underlying;
+
+    public MustSplitSource(BoundedSource<T> underlying) {
+      this.underlying = underlying;
+    }
+
+    @Override
+    public List<? extends BoundedSource<T>> splitIntoBundles(
+        long desiredBundleSizeBytes, PipelineOptions options) throws Exception {
+      // Must have more than
+      checkState(
+          desiredBundleSizeBytes < getEstimatedSizeBytes(options),
+          "Must split into more than one source");
+      return underlying.splitIntoBundles(desiredBundleSizeBytes, options);
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws Exception {
+      return underlying.getEstimatedSizeBytes(options);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws Exception {
+      return underlying.producesSortedKeys(options);
+    }
+
+    @Override
+    public BoundedReader<T> createReader(PipelineOptions options) throws IOException {
+      throw new IllegalStateException(
+          "The MustSplitSource cannot create a reader without being split first");
+    }
+
+    @Override
+    public void validate() {
+      underlying.validate();
+    }
+
+    @Override
+    public Coder<T> getDefaultOutputCoder() {
+      return underlying.getDefaultOutputCoder();
+    }
+  }
 }


Mime
View raw message