beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From t..@apache.org
Subject [1/2] incubator-beam git commit: BEAM-858 Enable ApexRunner integration test in examples.
Date Thu, 03 Nov 2016 04:26:46 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/apex-runner 968eb32b8 -> 51af7e592


BEAM-858 Enable ApexRunner integration test in examples.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/77f4ba2e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/77f4ba2e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/77f4ba2e

Branch: refs/heads/apex-runner
Commit: 77f4ba2eebf0c02dc95a8b90b4277a12638c4300
Parents: 968eb32
Author: Thomas Weise <thw@apache.org>
Authored: Fri Oct 28 23:47:42 2016 -0700
Committer: Thomas Weise <thw@apache.org>
Committed: Tue Nov 1 03:35:15 2016 +0100

----------------------------------------------------------------------
 examples/java/pom.xml                           | 31 ++++++++++++++++++++
 runners/apex/pom.xml                            |  2 +-
 .../beam/runners/apex/ApexPipelineOptions.java  |  6 ----
 .../runners/apex/ApexPipelineTranslator.java    |  2 +-
 .../apache/beam/runners/apex/ApexRunner.java    | 12 --------
 .../beam/runners/apex/ApexRunnerResult.java     | 27 +++++++++++++++--
 .../beam/runners/apex/TestApexRunner.java       | 20 +++++++++++--
 .../io/ApexReadUnboundedInputOperator.java      | 13 ++++++--
 .../apex/examples/StreamingWordCountTest.java   |  4 +--
 .../translators/ParDoBoundTranslatorTest.java   |  6 ++--
 10 files changed, 91 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/examples/java/pom.xml
----------------------------------------------------------------------
diff --git a/examples/java/pom.xml b/examples/java/pom.xml
index fc82ed4..6c1a16f 100644
--- a/examples/java/pom.xml
+++ b/examples/java/pom.xml
@@ -85,6 +85,14 @@
 
         <dependency>
           <groupId>org.apache.beam</groupId>
+          <artifactId>beam-runners-apex</artifactId>
+          <version>${project.version}</version>
+          <scope>runtime</scope>
+          <optional>true</optional>
+        </dependency>
+
+        <dependency>
+          <groupId>org.apache.beam</groupId>
           <artifactId>beam-runners-spark</artifactId>
           <version>${project.version}</version>
           <scope>runtime</scope>
@@ -224,6 +232,29 @@
                   </systemPropertyVariables>
                 </configuration>
               </execution>
+              <execution>
+                <id>apex-runner-integration-tests</id>
+                <goals>
+                  <goal>integration-test</goal>
+                  <goal>verify</goal>
+                </goals>
+                <configuration>
+                  <includes>
+                    <include>WordCountIT.java</include>
+                  </includes>
+                  <parallel>all</parallel>
+                  <threadCount>4</threadCount>
+                  <systemPropertyVariables>
+                    <beamTestPipelineOptions>
+                      [
+                      "--project=apache-beam-testing",
+                      "--tempRoot=gs://temp-storage-for-end-to-end-tests",
+                      "--runner=org.apache.beam.runners.apex.TestApexRunner"
+                      ]
+                    </beamTestPipelineOptions>
+                  </systemPropertyVariables>
+                </configuration>
+              </execution>
             </executions>
           </plugin>
         </plugins>

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/pom.xml
----------------------------------------------------------------------
diff --git a/runners/apex/pom.xml b/runners/apex/pom.xml
index 6ccc0da..d4bcc3d 100644
--- a/runners/apex/pom.xml
+++ b/runners/apex/pom.xml
@@ -72,7 +72,7 @@
       <groupId>org.apache.apex</groupId>
       <artifactId>apex-engine</artifactId>
       <version>${apex.core.version}</version>
-      <scope>test</scope>
+      <scope>runtime</scope>
     </dependency>
 
     <!--- Beam -->

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
index 141a8c1..54fdf76 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineOptions.java
@@ -32,12 +32,6 @@ public interface ApexPipelineOptions extends PipelineOptions, java.io.Serializab
 
   String getApplicationName();
 
-  @Description("set parallelism for Apex runner")
-  void setParallelism(int parallelism);
-
-  @Default.Integer(1)
-  int getParallelism();
-
   @Description("execute the pipeline with embedded cluster")
   void setEmbeddedExecution(boolean embedded);
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
index a6857ee..8a87ce0 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexPipelineTranslator.java
@@ -150,7 +150,7 @@ public class ApexPipelineTranslator implements Pipeline.PipelineVisitor {
       BoundedToUnboundedSourceAdapter unboundedSource = new BoundedToUnboundedSourceAdapter<>(
           transform.getSource());
       ApexReadUnboundedInputOperator<T, ?> operator = new ApexReadUnboundedInputOperator<>(
-          unboundedSource, context.getPipelineOptions());
+          unboundedSource, true, context.getPipelineOptions());
       context.addOperator(operator, operator.output);
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
index 8da4ec3..416e99c 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunner.java
@@ -137,18 +137,6 @@ public class ApexRunner extends PipelineRunner<ApexRunnerResult> {
       }
       assertionError = null;
       lc.runAsync();
-      if (options.getRunMillis() > 0) {
-        try {
-          long timeout = System.currentTimeMillis() + options.getRunMillis();
-          while (System.currentTimeMillis() < timeout) {
-            if (assertionError != null) {
-              throw assertionError;
-            }
-          }
-        } finally {
-          lc.shutdown();
-        }
-      }
       return new ApexRunnerResult(lma.getDAG(), lc);
     } catch (Exception e) {
       Throwables.propagateIfPossible(e);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
index 4e3a8d2..03428a6 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/ApexRunnerResult.java
@@ -21,6 +21,7 @@ import com.datatorrent.api.DAG;
 import com.datatorrent.api.LocalMode;
 
 import java.io.IOException;
+import java.lang.reflect.Field;
 
 import org.apache.beam.sdk.AggregatorRetrievalException;
 import org.apache.beam.sdk.AggregatorValues;
@@ -63,12 +64,12 @@ public class ApexRunnerResult implements PipelineResult {
 
   @Override
   public State waitUntilFinish(Duration duration) {
-    throw new UnsupportedOperationException();
+    return ApexRunnerResult.waitUntilFinished(ctrl, duration);
   }
 
   @Override
   public State waitUntilFinish() {
-    throw new UnsupportedOperationException();
+    return ApexRunnerResult.waitUntilFinished(ctrl, null);
   }
 
   @Override
@@ -84,4 +85,26 @@ public class ApexRunnerResult implements PipelineResult {
     return apexDAG;
   }
 
+  public static State waitUntilFinished(LocalMode.Controller ctrl, Duration duration) {
+    // we need to rely on internal field for now
+    // Apex should make it available through API in upcoming release.
+    long timeout = (duration == null || duration.getMillis() < 1) ? Long.MAX_VALUE
+        : System.currentTimeMillis() + duration.getMillis();
+    Field appDoneField;
+    try {
+      appDoneField = ctrl.getClass().getDeclaredField("appDone");
+      appDoneField.setAccessible(true);
+      while (!appDoneField.getBoolean(ctrl) && System.currentTimeMillis() < timeout) {
+        if (ApexRunner.assertionError != null) {
+          throw ApexRunner.assertionError;
+        }
+        Thread.sleep(500);
+      }
+      return appDoneField.getBoolean(ctrl) ? State.DONE : null;
+    } catch (NoSuchFieldException | SecurityException | IllegalArgumentException
+        | IllegalAccessException | InterruptedException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
index 2e048f0..e447e37 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/TestApexRunner.java
@@ -17,6 +17,8 @@
  */
 package org.apache.beam.runners.apex;
 
+import java.io.IOException;
+
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
@@ -24,18 +26,19 @@ import org.apache.beam.sdk.runners.PipelineRunner;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.values.PInput;
 import org.apache.beam.sdk.values.POutput;
+import org.joda.time.Duration;
 
 /**
  * Apex {@link PipelineRunner} for testing.
  */
 public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
 
-  private ApexRunner delegate;
+  private static final int RUN_WAIT_MILLIS = 20000;
+  private final ApexRunner delegate;
 
   private TestApexRunner(ApexPipelineOptions options) {
     options.setEmbeddedExecution(true);
     //options.setEmbeddedExecutionDebugMode(false);
-    options.setRunMillis(20000);
     this.delegate = ApexRunner.fromOptions(options);
   }
 
@@ -53,7 +56,18 @@ public class TestApexRunner extends PipelineRunner<ApexRunnerResult> {
 
   @Override
   public ApexRunnerResult run(Pipeline pipeline) {
-    return delegate.run(pipeline);
+    ApexRunnerResult result = delegate.run(pipeline);
+    try {
+      // this is necessary for tests that just call run() and not waitUntilFinish
+      result.waitUntilFinish(Duration.millis(RUN_WAIT_MILLIS));
+      return result;
+    } finally {
+      try {
+        result.cancel();
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
index 3188dfa..0e2b0c2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translators/io/ApexReadUnboundedInputOperator.java
@@ -55,6 +55,7 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
   private final SerializablePipelineOptions pipelineOptions;
   @Bind(JavaSerializer.class)
   private final UnboundedSource<OutputT, CheckpointMarkT> source;
+  private final boolean isBoundedSource;
   private transient UnboundedSource.UnboundedReader<OutputT> reader;
   private transient boolean available = false;
   @OutputPortFieldAnnotation(optional = true)
@@ -65,16 +66,24 @@ public class ApexReadUnboundedInputOperator<OutputT, CheckpointMarkT
       ApexPipelineOptions options) {
     this.pipelineOptions = new SerializablePipelineOptions(options);
     this.source = source;
+    this.isBoundedSource = false;
+  }
+
+  public ApexReadUnboundedInputOperator(UnboundedSource<OutputT, CheckpointMarkT> source,
+      boolean isBoundedSource, ApexPipelineOptions options) {
+    this.pipelineOptions = new SerializablePipelineOptions(options);
+    this.source = source;
+    this.isBoundedSource = isBoundedSource;
   }
 
   @SuppressWarnings("unused") // for Kryo
   private ApexReadUnboundedInputOperator() {
-    this.pipelineOptions = null; this.source = null;
+    this.pipelineOptions = null; this.source = null; this.isBoundedSource = false;
   }
 
   @Override
   public void beginWindow(long windowId) {
-    if (!available && source instanceof ValuesSource) {
+    if (!available && (isBoundedSource || source instanceof ValuesSource)) {
       // if it's a Create and the input was consumed, emit final watermark
       emitWatermarkIfNecessary(GlobalWindow.TIMESTAMP_MAX_VALUE.getMillis());
       // terminate the stream (allows tests to finish faster)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
index 6ab2e8e..363e669 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/StreamingWordCountTest.java
@@ -91,7 +91,6 @@ public class StreamingWordCountTest {
     ApexPipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
         .as(ApexPipelineOptions.class);
     options.setApplicationName("StreamingWordCount");
-    options.setParallelism(1);
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> wordCounts =
@@ -110,7 +109,7 @@ public class StreamingWordCountTest {
           && FormatAsStringFn.RESULTS.containsKey("bar")) {
         break;
       }
-      Thread.sleep(1000);
+      result.waitUntilFinish(Duration.millis(1000));
     }
     result.cancel();
     Assert.assertTrue(
@@ -118,4 +117,5 @@ public class StreamingWordCountTest {
     FormatAsStringFn.RESULTS.clear();
 
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/77f4ba2e/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
index 6f50398..2379a9e 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/translators/ParDoBoundTranslatorTest.java
@@ -192,7 +192,7 @@ public class ParDoBoundTranslatorTest {
     Pipeline pipeline = Pipeline.create(options);
     PCollection<Integer> pcollection = pipeline.apply(Create.of(1, 2, 3, 4));
     PAssert.that(pcollection).containsInAnyOrder(2, 1, 4, 3);
-    // TODO: good candidate to terminate fast based on processed assertion vs. for auto-shutdown
+    // TODO: terminate faster based on processed assertion vs. auto-shutdown
     pipeline.run();
   }
 
@@ -263,8 +263,8 @@ public class ParDoBoundTranslatorTest {
     Pipeline pipeline = Pipeline.create(options);
 
     List<Integer> inputs = Arrays.asList(3, -42, 666);
-    final TupleTag<String> mainOutputTag = new TupleTag<String>("main");
-    final TupleTag<Void> sideOutputTag = new TupleTag<Void>("sideOutput");
+    final TupleTag<String> mainOutputTag = new TupleTag<>("main");
+    final TupleTag<Void> sideOutputTag = new TupleTag<>("sideOutput");
 
     PCollectionView<Integer> sideInput1 = pipeline
         .apply("CreateSideInput1", Create.of(11))


Mime
View raw message