beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [1/2] beam git commit: BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs
Date Tue, 25 Jul 2017 06:16:49 GMT
Repository: beam
Updated Branches:
  refs/heads/release-2.1.0 1cf560b4b -> c7c777d8b


BEAM-2575 ApexRunner doesn't emit watermarks for additional outputs


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/96ff2206
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/96ff2206
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/96ff2206

Branch: refs/heads/release-2.1.0
Commit: 96ff22060c3e3e99e5bf913f5f58c095f48c0021
Parents: 1cf560b
Author: Thomas Weise <thw@apache.org>
Authored: Sun Jul 9 11:57:43 2017 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Mon Jul 24 10:58:35 2017 -0700

----------------------------------------------------------------------
 .../operators/ApexParDoOperator.java            | 21 +++++++++++++-------
 .../runners/apex/examples/WordCountTest.java    |  8 ++++++--
 2 files changed, 20 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/96ff2206/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
index 809ca2a..c3cbab2 100644
--- a/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
+++ b/runners/apex/src/main/java/org/apache/beam/runners/apex/translation/operators/ApexParDoOperator.java
@@ -359,10 +359,7 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
       }
     }
     if (sideInputs.isEmpty()) {
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", mark);
-      }
-      output.emit(mark);
+      outputWatermark(mark);
       return;
     }
 
@@ -370,10 +367,20 @@ public class ApexParDoOperator<InputT, OutputT> extends BaseOperator implements
         Math.min(pushedBackWatermark.get(), currentInputWatermark);
     if (potentialOutputWatermark > currentOutputWatermark) {
       currentOutputWatermark = potentialOutputWatermark;
-      if (traceTuples) {
-        LOG.debug("\nemitting watermark {}\n", currentOutputWatermark);
+      outputWatermark(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
+    }
+  }
+
+  private void outputWatermark(ApexStreamTuple.WatermarkTuple<?> mark) {
+    if (traceTuples) {
+      LOG.debug("\nemitting {}\n", mark);
+    }
+    output.emit(mark);
+    if (!additionalOutputPortMapping.isEmpty()) {
+      for (DefaultOutputPort<ApexStreamTuple<?>> additionalOutput :
+          additionalOutputPortMapping.values()) {
+        additionalOutput.emit(mark);
       }
-      output.emit(ApexStreamTuple.WatermarkTuple.of(currentOutputWatermark));
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/96ff2206/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index e76096e..ba75746 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -123,11 +123,15 @@ public class WordCountTest {
     options.setInputFile(new File(inputFile).getAbsolutePath());
     String outputFilePrefix = "target/wordcountresult.txt";
     options.setOutput(outputFilePrefix);
-    WordCountTest.main(TestPipeline.convertToArgs(options));
 
     File outFile1 = new File(outputFilePrefix + "-00000-of-00002");
     File outFile2 = new File(outputFilePrefix + "-00001-of-00002");
-    Assert.assertTrue(outFile1.exists() && outFile2.exists());
+    Assert.assertTrue(!outFile1.exists() || outFile1.delete());
+    Assert.assertTrue(!outFile2.exists() || outFile2.delete());
+
+    WordCountTest.main(TestPipeline.convertToArgs(options));
+
+    Assert.assertTrue("result files exist", outFile1.exists() && outFile2.exists());
     HashSet<String> results = new HashSet<>();
     results.addAll(FileUtils.readLines(outFile1));
     results.addAll(FileUtils.readLines(outFile2));


Mime
View raw message