beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/3] beam git commit: [BEAM-831] Fix chaining, add test. closes #2216
Date Mon, 08 May 2017 22:37:09 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.0.0 20bfa9411 -> 7dfc45563


[BEAM-831] Fix chaining, add test.
closes #2216


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/3f5282d5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/3f5282d5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/3f5282d5

Branch: refs/heads/release-2.0.0
Commit: 3f5282d515fa53516fda6d0376cc912560fd6d85
Parents: 6eab5c9
Author: Thomas Weise <thw@apache.org>
Authored: Fri May 5 06:45:34 2017 -0700
Committer: Thomas Weise <thw@apache.org>
Committed: Mon May 8 08:37:29 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/runners/apex/ApexRunner.java    | 11 +++--
 .../beam/runners/apex/TestApexRunner.java       |  8 ++++
 .../apex/translation/TranslationContext.java    | 26 ++++-------
 .../beam/runners/apex/ApexRunnerTest.java       | 47 ++++++++++++++++----
 .../FlattenPCollectionTranslatorTest.java       | 13 ++----
 5 files changed, 67 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index a50e10e..366308e 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -70,11 +70,11 @@ import org.apache.hadoop.conf.Configuration;
  * pipeline to an Apex DAG and executes it on an Apex cluster.
  *
  */
-@SuppressWarnings({"rawtypes", "unchecked"})
 public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   private final ApexPipelineOptions options;
   public static final String CLASSPATH_SCHEME = "classpath";
+  protected boolean translateOnly = false;
 
   /**
    * TODO: this isn't thread safe and may cause issues when tests run in parallel
@@ -93,6 +93,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     return new ApexRunner(apexPipelineOptions);
   }
 
+  @SuppressWarnings({"rawtypes"})
   private List<PTransformOverride> getOverrides() {
     return ImmutableList.<PTransformOverride>builder()
         .add(
@@ -156,7 +157,7 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
     }
 
     if (options.isEmbeddedExecution()) {
-      Launcher<AppHandle> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
+      EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
       Attribute.AttributeMap launchAttributes = new Attribute.AttributeMap.DefaultAttributeMap();
       launchAttributes.put(EmbeddedAppLauncher.RUN_ASYNC, true);
       if (options.isEmbeddedExecutionDebugMode()) {
@@ -166,11 +167,15 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
       Configuration conf = new Configuration(false);
       ApexYarnLauncher.addProperties(conf, configProperties);
       try {
+        if (translateOnly) {
+          launcher.prepareDAG(apexApp, conf);
+          return new ApexRunnerResult(launcher.getDAG(), null);
+        }
         ApexRunner.ASSERTION_ERROR.set(null);
         AppHandle apexAppResult = launcher.launchApp(apexApp, conf, launchAttributes);
         return new ApexRunnerResult(apexDAG.get(), apexAppResult);
       } catch (Exception e) {
-        Throwables.propagateIfPossible(e);
+        Throwables.throwIfUnchecked(e);
         throw new RuntimeException(e);
       }
     } else {

http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index e068db0..b68d3da 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -17,6 +17,7 @@
  */
 package org.apache.beam.runners.apex;
 
+import com.datatorrent.api.DAG;
 import java.io.IOException;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineRunner;
@@ -44,6 +45,13 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
     return new TestApexRunner(apexOptions);
   }
 
+  public static DAG translate(Pipeline pipeline, ApexPipelineOptions options) {
+    ApexRunner delegate = new ApexRunner(options);
+    delegate.translateOnly = true;
+    DAG dag = delegate.run(pipeline).getApexDAG();
+    return dag;
+  }
+
   @Override
   public ApexRunnerResult run(Pipeline pipeline) {
     ApexRunnerResult result = delegate.run(pipeline);

http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
index 1224e25..a5e3028 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/TranslationContext.java
@@ -39,6 +39,7 @@ import org.apache.beam.sdk.runners.AppliedPTransform;
 import org.apache.beam.sdk.runners.TransformHierarchy;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.WindowedValue.FullWindowedValueCoder;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
@@ -217,36 +218,27 @@ class TranslationContext {
     List<InputPortInfo> sinks = streamEntry.getValue().getRight();
     OutputPortInfo source = streamEntry.getValue().getLeft();
     PTransform sourceTransform = source.transform.getTransform();
-    if (sourceTransform instanceof ParDo.Bound
-        || sourceTransform instanceof ParDo.BoundMulti) {
-      // Source is ParDo.. Check sink(s)
+    if (sourceTransform instanceof ParDo.MultiOutput
+        || sourceTransform instanceof Window.Assign) {
+      // source qualifies for chaining, check sink(s)
       for (InputPortInfo sink : sinks) {
         PTransform transform = sink.transform.getTransform();
-        if (transform instanceof ParDo.Bound) {
-          ParDo.Bound t = (ParDo.Bound) transform;
-          if (t.getSideInputs().size() > 0) {
-            loc = DAG.Locality.CONTAINER_LOCAL;
-            break;
-          } else {
-            loc = DAG.Locality.THREAD_LOCAL;
-          }
-        } else if (transform instanceof ParDo.BoundMulti) {
-          ParDo.BoundMulti t = (ParDo.BoundMulti) transform;
+        if (transform instanceof ParDo.MultiOutput) {
+          ParDo.MultiOutput t = (ParDo.MultiOutput) transform;
           if (t.getSideInputs().size() > 0) {
             loc = DAG.Locality.CONTAINER_LOCAL;
             break;
           } else {
             loc = DAG.Locality.THREAD_LOCAL;
           }
+        } else if (transform instanceof Window.Assign) {
+          loc = DAG.Locality.THREAD_LOCAL;
         } else {
-          // Sink is not ParDo.. set null locality.
+          // cannot chain, if there is any other sink
           loc = null;
           break;
         }
       }
-    } else {
-      // Source is not ParDo... set null locality
-      loc = null;
     }
 
     streamMeta.setLocality(loc);

http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
index e9e9a5b..c5521d1 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/ApexRunnerTest.java
@@ -18,15 +18,23 @@
 package org.apache.beam.runners.apex;
 
 import com.datatorrent.api.DAG;
+import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DAG.OperatorMeta;
 import com.datatorrent.stram.engine.OperatorContext;
+import com.google.common.collect.Sets;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.util.Properties;
+import java.util.Set;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.VoidCoder;
+import org.apache.beam.sdk.io.GenerateSequence;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.values.PCollection;
+import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -41,16 +49,13 @@ public class ApexRunnerTest {
     String operName = "testProperties";
     ApexPipelineOptions options = PipelineOptionsFactory.create()
         .as(ApexPipelineOptions.class);
-    options.setRunner(ApexRunner.class);
 
     // default configuration from class path
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create();
     Create.Values<Void> empty = Create.empty(VoidCoder.of());
     p.apply(operName, empty);
-    ApexRunnerResult result = (ApexRunnerResult) p.run();
-    result.cancel();
 
-    DAG dag = result.getApexDAG();
+    DAG dag = TestApexRunner.translate(p, options);
     OperatorMeta t1Meta = dag.getOperatorMeta(operName);
     Assert.assertNotNull(t1Meta);
     Assert.assertEquals(new Integer(32), t1Meta.getValue(OperatorContext.MEMORY_MB));
@@ -63,14 +68,40 @@ public class ApexRunnerTest {
       props.store(fos, "");
     }
     options.setConfigFile(tmp.getAbsolutePath());
-    result = (ApexRunnerResult) p.run();
-    result.cancel();
+    dag = TestApexRunner.translate(p, options);
     tmp.delete();
-    dag = result.getApexDAG();
+
     t1Meta = dag.getOperatorMeta(operName);
     Assert.assertNotNull(t1Meta);
     Assert.assertEquals(new Integer(64), t1Meta.getValue(OperatorContext.MEMORY_MB));
 
   }
 
+  @Test
+  public void testParDoChaining() throws Exception {
+    Pipeline p = Pipeline.create();
+    long numElements = 1000;
+    PCollection<Long> input = p.apply(GenerateSequence.from(0).to(numElements));
+    PAssert.thatSingleton(input.apply("Count", Count.<Long>globally())).isEqualTo(numElements);
+
+    ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
+    DAG dag = TestApexRunner.translate(p, options);
+
+    String[] expectedThreadLocal = { "/CreateActual/FilterActuals/Window.Assign" };
+    Set<String> actualThreadLocal = Sets.newHashSet();
+    for (DAG.StreamMeta sm : dag.getAllStreamsMeta()) {
+      DAG.OutputPortMeta opm = sm.getSource();
+      if (sm.getLocality() == Locality.THREAD_LOCAL) {
+         String name = opm.getOperatorMeta().getName();
+         String prefix = "PAssert$";
+         if (name.startsWith(prefix)) {
+           // remove indeterministic prefix
+           name = name.substring(prefix.length() + 1);
+         }
+         actualThreadLocal.add(name);
+      }
+    }
+    Assert.assertThat(actualThreadLocal, Matchers.hasItems(expectedThreadLocal));
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/3f5282d5/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
index 64ca0ee..929778a 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translation/FlattenPCollectionTranslatorTest.java
@@ -26,12 +26,10 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
-import org.apache.apex.api.EmbeddedAppLauncher;
-import org.apache.apex.api.Launcher;
-import org.apache.apex.api.Launcher.LaunchMode;
 import org.apache.beam.runners.apex.ApexPipelineOptions;
 import org.apache.beam.runners.apex.ApexRunner;
 import org.apache.beam.runners.apex.ApexRunnerResult;
+import org.apache.beam.runners.apex.TestApexRunner;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -100,16 +98,11 @@ public class FlattenPCollectionTranslatorTest {
   @Test
   public void testFlattenSingleCollection() {
     ApexPipelineOptions options = PipelineOptionsFactory.as(ApexPipelineOptions.class);
-    options.setRunner(ApexRunner.class);
-    ApexPipelineTranslator translator = new ApexPipelineTranslator(options);
-    EmbeddedAppLauncher<?> launcher = Launcher.getLauncher(LaunchMode.EMBEDDED);
-    DAG dag = launcher.getDAG();
-
-    Pipeline p = Pipeline.create(options);
+    Pipeline p = Pipeline.create();
     PCollection<String> single = p.apply(Create.of(Collections.singletonList("1")));
     PCollectionList.of(single).apply(Flatten.<String>pCollections())
       .apply(ParDo.of(new EmbeddedCollector()));
-    translator.translate(p, dag);
+    DAG dag = TestApexRunner.translate(p, options);
     Assert.assertNotNull(
         dag.getOperatorMeta("ParDo(EmbeddedCollector)/ParMultiDo(EmbeddedCollector)"));
   }


Mime
View raw message