beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] beam git commit: Specifies numShards on windowed writes examples, as it is now required
Date Thu, 16 Nov 2017 00:09:17 GMT
Repository: beam
Updated Branches:
  refs/heads/master c3a96bf3d -> 92013ec9e


Specifies numShards on windowed writes examples, as it is now required


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9c5f8be9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9c5f8be9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9c5f8be9

Branch: refs/heads/master
Commit: 9c5f8be92f0d99ba356e3a6f2b822f9d9a1659cf
Parents: c3a96bf
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Wed Nov 15 10:48:52 2017 -0800
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed Nov 15 16:02:09 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/examples/WindowedWordCount.java     |  7 ++++---
 .../beam/examples/common/WriteOneFilePerWindow.java     | 12 ++++--------
 .../org/apache/beam/examples/WindowedWordCountIT.java   |  8 --------
 3 files changed, 8 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9c5f8be9/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
index 5c039cd..21cfed8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WindowedWordCount.java
@@ -161,9 +161,10 @@ public class WindowedWordCount {
     Long getMaxTimestampMillis();
     void setMaxTimestampMillis(Long value);
 
-    @Description("Fixed number of shards to produce per window, or null for runner-chosen
sharding")
-    Integer getNumShards();
-    void setNumShards(Integer numShards);
+    @Description("Fixed number of shards to produce per window")
+    @Default.Integer(3)
+    int getNumShards();
+    void setNumShards(int numShards);
   }
 
   public static void main(String[] args) throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/9c5f8be9/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index abd14b7..a5c84f6 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -19,7 +19,6 @@ package org.apache.beam.examples.common;
 
 import static com.google.common.base.MoreObjects.firstNonNull;
 
-import javax.annotation.Nullable;
 import org.apache.beam.sdk.io.FileBasedSink;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
@@ -46,10 +45,9 @@ import org.joda.time.format.ISODateTimeFormat;
 public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone>
{
   private static final DateTimeFormatter FORMATTER = ISODateTimeFormat.hourMinute();
   private String filenamePrefix;
-  @Nullable
-  private Integer numShards;
+  private int numShards;
 
-  public WriteOneFilePerWindow(String filenamePrefix, Integer numShards) {
+  public WriteOneFilePerWindow(String filenamePrefix, int numShards) {
     this.filenamePrefix = filenamePrefix;
     this.numShards = numShards;
   }
@@ -61,10 +59,8 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>,
PDone
         TextIO.write()
             .to(new PerWindowFiles(resource))
             .withTempDirectory(resource.getCurrentDirectory())
-            .withWindowedWrites();
-    if (numShards != null) {
-      write = write.withNumShards(numShards);
-    }
+            .withWindowedWrites()
+            .withNumShards(numShards);
     return input.apply(write);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9c5f8be9/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
index bec7952..279de53 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WindowedWordCountIT.java
@@ -87,14 +87,6 @@ public class WindowedWordCountIT {
   }
 
   @Test
-  public void testWindowedWordCountInBatchDynamicSharding() throws Exception {
-    WindowedWordCountITOptions options = batchOptions();
-    // This is the default value, but make it explicit
-    options.setNumShards(null);
-    testWindowedWordCountPipeline(options);
-  }
-
-  @Test
   public void testWindowedWordCountInBatchStaticSharding() throws Exception {
     WindowedWordCountITOptions options = batchOptions();
     options.setNumShards(3);


Mime
View raw message