beam-commits mailing list archives

From: dhalp...@apache.org
Subject: [8/9] beam git commit: BEAM-1416 Write transform should comply with PTransform style guide
Date: Wed, 01 Mar 2017 04:10:55 GMT
BEAM-1416 Write transform should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b87621e9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b87621e9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b87621e9

Branch: refs/heads/master
Commit: b87621e9533e772f92a3cf4325299350eb615b62
Parents: fffd2c5
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Feb 7 16:44:40 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../core/construction/PTransformMatchers.java   |   4 +-
 .../construction/PTransformMatchersTest.java    |  10 +-
 .../direct/WriteWithShardingFactory.java        |   9 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../FlinkStreamingTransformTranslators.java     |   6 +-
 .../beam/runners/flink/WriteSinkITCase.java     |   2 +-
 .../beam/runners/dataflow/DataflowRunner.java   |  13 +-
 .../java/org/apache/beam/sdk/io/AvroIO.java     |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     |   2 +-
 .../main/java/org/apache/beam/sdk/io/Write.java | 727 +++++++++----------
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  16 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 12 files changed, 394 insertions(+), 403 deletions(-)
----------------------------------------------------------------------
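
In short, this commit flattens the nested Write.Bound<T> class into Write<T> itself, so the value
returned by Write.to(sink) is now directly a PTransform<PCollection<T>, PDone>, as the PTransform
style guide asks. A minimal sketch of the user-facing difference, assuming MySink is some existing
Sink<String> implementation and p is an already-created Pipeline (neither is part of this commit):

    // Before this commit, the transform type was the nested class Write.Bound:
    //   Write.Bound<String> write = Write.to(new MySink()).withNumShards(3);

    // After this commit, Write<T> is the PTransform itself:
    Write<String> write = Write.to(new MySink()).withNumShards(3);
    p.apply(Create.of("a", "b", "c")).apply(write);

The fluent methods (withNumShards, withSharding, withRunnerDeterminedSharding) keep their behavior;
only the type they return changes from Write.Bound<T> to Write<T>.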


http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
index 05b632b..7f8d467 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/PTransformMatchers.java
@@ -182,8 +182,8 @@ public class PTransformMatchers {
     return new PTransformMatcher() {
       @Override
       public boolean matches(AppliedPTransform<?, ?, ?> application) {
-        if (application.getTransform() instanceof Write.Bound) {
-          return ((Write.Bound) application.getTransform()).getSharding() == null;
+        if (application.getTransform() instanceof Write) {
+          return ((Write) application.getTransform()).getSharding() == null;
         }
         return false;
       }
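
The matcher's contract is unchanged: it fires only for a Write whose sharding has not been fixed.
A hedged sketch of that state, using the accessors shown elsewhere in this diff (sink stands in for
any existing Sink<Integer> and is not part of this commit):

    Write<Integer> write = Write.to(sink);          // getSharding() == null  -> matcher returns true
    Write<Integer> fixed = write.withNumShards(3);  // getSharding() != null  -> matcher returns false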

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index cace033..be3ed6b 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -387,7 +387,7 @@ public class PTransformMatchersTest implements Serializable {
 
   @Test
   public void writeWithRunnerDeterminedSharding() {
-    Write.Bound<Integer> write =
+    Write<Integer> write =
         Write.to(
             new FileBasedSink<Integer>("foo", "bar") {
               @Override
@@ -400,13 +400,13 @@ public class PTransformMatchersTest implements Serializable {
         PTransformMatchers.writeWithRunnerDeterminedSharding().matches(appliedWrite(write)),
         is(true));
 
-    Write.Bound<Integer> withStaticSharding = write.withNumShards(3);
+    Write<Integer> withStaticSharding = write.withNumShards(3);
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
             .matches(appliedWrite(withStaticSharding)),
         is(false));
 
-    Write.Bound<Integer> withCustomSharding =
+    Write<Integer> withCustomSharding =
         write.withSharding(Sum.integersGlobally().asSingletonView());
     assertThat(
         PTransformMatchers.writeWithRunnerDeterminedSharding()
@@ -414,8 +414,8 @@ public class PTransformMatchersTest implements Serializable {
         is(false));
   }
 
-  private AppliedPTransform<?, ?, ?> appliedWrite(Write.Bound<Integer> write) {
-    return AppliedPTransform.<PCollection<Integer>, PDone, Write.Bound<Integer>>of(
+  private AppliedPTransform<?, ?, ?> appliedWrite(Write<Integer> write) {
+    return AppliedPTransform.<PCollection<Integer>, PDone, Write<Integer>>of(
         "Write",
         Collections.<TaggedPValue>emptyList(),
         Collections.<TaggedPValue>emptyList(),

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
index f206fb0..63122fe 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WriteWithShardingFactory.java
@@ -29,7 +29,6 @@ import java.util.Map;
 import java.util.concurrent.ThreadLocalRandom;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.runners.PTransformOverrideFactory;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -48,13 +47,15 @@ import org.apache.beam.sdk.values.TaggedPValue;
  * of shards is the log base 10 of the number of input records, with up to 2 additional shards.
  */
 class WriteWithShardingFactory<InputT>
-    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Bound<InputT>> {
+    implements PTransformOverrideFactory<PCollection<InputT>, PDone, Write<InputT>> {
   static final int MAX_RANDOM_EXTRA_SHARDS = 3;
   @VisibleForTesting static final int MIN_SHARDS_FOR_LOG = 3;
 
   @Override
-  public PTransform<PCollection<InputT>, PDone> getReplacementTransform(Bound<InputT> transform) {
-    return transform.withSharding(new LogElementShardsWithDrift<InputT>());
+  public PTransform<PCollection<InputT>, PDone> getReplacementTransform(
+      Write<InputT> transform) {
+
+      return transform.withSharding(new LogElementShardsWithDrift<InputT>());
   }
 
   @Override
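
Because each withSharding call builds a new Write rather than mutating the one passed in, the
replacement this factory returns is a different transform object from the original, which is what
WriteWithShardingFactoryTest below asserts. A hedged sketch of the call the factory makes (shown
outside the factory only for illustration; LogElementShardsWithDrift is this file's internal
sharding transform):

    // withSharding(...) returns a new Write<InputT>; the incoming transform is left untouched.
    Write<InputT> replacement =
        transform.withSharding(new LogElementShardsWithDrift<InputT>());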

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 51f3a87..16b6312 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -117,7 +117,7 @@ public class WriteWithShardingFactoryTest {
 
   @Test
   public void withNoShardingSpecifiedReturnsNewTransform() {
-    Write.Bound<Object> original = Write.to(new TestSink());
+    Write<Object> original = Write.to(new TestSink());
     assertThat(factory.getReplacementTransform(original), not(equalTo((Object) original)));
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
index eaab3d1..a3cceb2 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/FlinkStreamingTransformTranslators.java
@@ -118,7 +118,7 @@ class FlinkStreamingTransformTranslators {
   static {
     TRANSLATORS.put(Read.Bounded.class, new BoundedReadSourceTranslator());
     TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
-    TRANSLATORS.put(Write.Bound.class, new WriteSinkStreamingTranslator());
+    TRANSLATORS.put(Write.class, new WriteSinkStreamingTranslator());
     TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator());
 
     TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
@@ -194,10 +194,10 @@ class FlinkStreamingTransformTranslators {
   }
 
   private static class WriteSinkStreamingTranslator<T>
-      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write.Bound<T>> {
+      extends FlinkStreamingPipelineTranslator.StreamTransformTranslator<Write<T>> {
 
     @Override
-    public void translateNode(Write.Bound<T> transform, FlinkStreamingTranslationContext context) {
+    public void translateNode(Write<T> transform, FlinkStreamingTranslationContext context) {
       String name = transform.getName();
       PValue input = context.getInput(transform);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
index 6986663..572c291 100644
--- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
+++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/WriteSinkITCase.java
@@ -38,7 +38,7 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.test.util.JavaProgramTestBase;
 
 /**
- * Tests the translation of custom Write.Bound sinks.
+ * Tests the translation of custom Write sinks.
  */
 public class WriteSinkITCase extends JavaProgramTestBase {
 

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index 0fe3a89..dbf1958 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -86,7 +86,6 @@ import org.apache.beam.sdk.io.PubsubUnboundedSource;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.Write;
-import org.apache.beam.sdk.io.Write.Bound;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsValidator;
 import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
@@ -367,7 +366,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
               "The DataflowRunner in batch mode does not support Read.Unbounded"));
       ptoverrides
           // Write uses views internally
-          .put(PTransformMatchers.classEqualTo(Write.Bound.class), new BatchWriteFactory(this))
+          .put(PTransformMatchers.classEqualTo(Write.class), new BatchWriteFactory(this))
           .put(
               PTransformMatchers.classEqualTo(View.AsMap.class),
               new ReflectiveOneToOneOverrideFactory(BatchViewOverrides.BatchViewAsMap.class, this))
@@ -772,14 +771,14 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
   }
 
   private class BatchWriteFactory<T>
-      implements PTransformOverrideFactory<PCollection<T>, PDone, Write.Bound<T>> {
+      implements PTransformOverrideFactory<PCollection<T>, PDone, Write<T>> {
     private final DataflowRunner runner;
     private BatchWriteFactory(DataflowRunner dataflowRunner) {
       this.runner = dataflowRunner;
     }
 
     @Override
-    public PTransform<PCollection<T>, PDone> getReplacementTransform(Bound<T> transform) {
+    public PTransform<PCollection<T>, PDone> getReplacementTransform(Write<T> transform) {
       return new BatchWrite<>(runner, transform);
     }
 
@@ -797,17 +796,17 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
 
   /**
    * Specialized implementation which overrides
-   * {@link org.apache.beam.sdk.io.Write.Bound Write.Bound} to provide Google
+   * {@link org.apache.beam.sdk.io.Write Write} to provide Google
    * Cloud Dataflow specific path validation of {@link FileBasedSink}s.
    */
   private static class BatchWrite<T> extends PTransform<PCollection<T>, PDone> {
     private final DataflowRunner runner;
-    private final Write.Bound<T> transform;
+    private final Write<T> transform;
     /**
      * Builds an instance of this class from the overridden transform.
      */
     @SuppressWarnings("unused") // used via reflection in DataflowRunner#apply()
-    public BatchWrite(DataflowRunner runner, Write.Bound<T> transform) {
+    public BatchWrite(DataflowRunner runner, Write<T> transform) {
       this.runner = runner;
       this.transform = transform;
     }
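
For other runner authors, the same two adjustments apply as in this file: match on Write.class
instead of Write.Bound.class, and accept Write<T> in getReplacementTransform. A hedged sketch
(myFactory stands in for a runner-specific factory and is not part of this commit):

    // Registration now keys on the top-level class:
    ptoverrides.put(PTransformMatchers.classEqualTo(Write.class), myFactory);

    // And the factory's generic signature becomes:
    //   PTransformOverrideFactory<PCollection<T>, PDone, Write<T>>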

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
index 388d9f0..67a4381 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroIO.java
@@ -804,7 +804,7 @@ public class AvroIO {
           throw new IllegalStateException("need to set the schema of an AvroIO.Write transform");
         }
 
-        org.apache.beam.sdk.io.Write.Bound<T> write =
+        org.apache.beam.sdk.io.Write<T> write =
             org.apache.beam.sdk.io.Write.to(
                 new AvroSink<>(
                     filenamePrefix,

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 6e23a28..fe8d0fd 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -662,7 +662,7 @@ public class TextIO {
           throw new IllegalStateException(
               "need to set the filename prefix of a TextIO.Write transform");
         }
-        org.apache.beam.sdk.io.Write.Bound<String> write =
+        org.apache.beam.sdk.io.Write<String> write =
             org.apache.beam.sdk.io.Write.to(
                 new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
                     writableByteChannelFactory));

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index acbbb97..948a65b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -62,7 +62,7 @@ import org.slf4j.LoggerFactory;
  * <p>By default, every bundle in the input {@link PCollection} will be processed by a
  * {@link WriteOperation}, so the number of outputs will vary based on runner behavior, though at
  * least 1 output will always be produced. The exact parallelism of the write stage can be
- * controlled using {@link Write.Bound#withNumShards}, typically used to control how many files are
+ * controlled using {@link Write#withNumShards}, typically used to control how many files are
  * produced or to globally limit the number of workers connecting to an external service. However,
  * this option can often hurt performance: it adds an additional {@link GroupByKey} to the pipeline.
  *
@@ -78,421 +78,412 @@ import org.slf4j.LoggerFactory;
  * <pre>{@code p.apply(Write.to(new MySink(...)).withNumShards(3));}</pre>
  */
 @Experimental(Experimental.Kind.SOURCE_SINK)
-public class Write {
+public class Write<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(Write.class);
 
+  private final Sink<T> sink;
+  @Nullable
+  private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+
   /**
    * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
    * control how many different shards are produced.
    */
-  public static <T> Bound<T> to(Sink<T> sink) {
+  public static <T> Write<T> to(Sink<T> sink) {
     checkNotNull(sink, "sink");
-    return new Bound<>(sink, null /* runner-determined sharding */);
+    return new Write<>(sink, null /* runner-determined sharding */);
   }
 
-  /**
-   * A {@link PTransform} that writes to a {@link Sink}. See the class-level Javadoc for more
-   * information.
-   *
-   * @see Write
-   * @see Sink
-   */
-  public static class Bound<T> extends PTransform<PCollection<T>, PDone> {
-    private final Sink<T> sink;
-    @Nullable
-    private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
-
-    private Bound(
-        Sink<T> sink,
-        @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
-      this.sink = sink;
-      this.computeNumShards = computeNumShards;
-    }
+  private Write(
+      Sink<T> sink,
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
+    this.sink = sink;
+    this.computeNumShards = computeNumShards;
+  }
 
-    @Override
-    public PDone expand(PCollection<T> input) {
-      checkArgument(
-          IsBounded.BOUNDED == input.isBounded(),
-          "%s can only be applied to a Bounded PCollection",
-          Write.class.getSimpleName());
-      PipelineOptions options = input.getPipeline().getOptions();
-      sink.validate(options);
-      return createWrite(input, sink.createWriteOperation(options));
-    }
+  @Override
+  public PDone expand(PCollection<T> input) {
+    checkArgument(
+        IsBounded.BOUNDED == input.isBounded(),
+        "%s can only be applied to a Bounded PCollection",
+        Write.class.getSimpleName());
+    PipelineOptions options = input.getPipeline().getOptions();
+    sink.validate(options);
+    return createWrite(input, sink.createWriteOperation(options));
+  }
 
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      super.populateDisplayData(builder);
-      builder
-          .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
-          .include("sink", sink);
-      if (getSharding() != null) {
-        builder.include("sharding", getSharding());
-      }
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+    super.populateDisplayData(builder);
+    builder
+        .add(DisplayData.item("sink", sink.getClass()).withLabel("Write Sink"))
+        .include("sink", sink);
+    if (getSharding() != null) {
+      builder.include("sharding", getSharding());
     }
+  }
 
-    /**
-     * Returns the {@link Sink} associated with this PTransform.
-     */
-    public Sink<T> getSink() {
-      return sink;
-    }
+  /**
+   * Returns the {@link Sink} associated with this PTransform.
+   */
+  public Sink<T> getSink() {
+    return sink;
+  }
 
-    /**
-     * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
-     * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
-     * {@link #withSharding(PTransform)}), or runner-determined (by {@link
-     * #withRunnerDeterminedSharding()}.
-     */
-    @Nullable
-    public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
-      return computeNumShards;
-    }
+  /**
+   * Gets the {@link PTransform} that will be used to determine sharding. This can be either a
+   * static number of shards (as following a call to {@link #withNumShards(int)}), dynamic (by
+   * {@link #withSharding(PTransform)}), or runner-determined (by {@link
+   * #withRunnerDeterminedSharding()}.
+   */
+  @Nullable
+  public PTransform<PCollection<T>, PCollectionView<Integer>> getSharding() {
+    return computeNumShards;
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
-     * specified number of shards.
-     *
-     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-     * more information.
-     *
-     * <p>A value less than or equal to 0 will be equivalent to the default behavior of
-     * runner-determined sharding.
-     */
-    public Bound<T> withNumShards(int numShards) {
-      if (numShards > 0) {
-        return withNumShards(StaticValueProvider.of(numShards));
-      }
-      return withRunnerDeterminedSharding();
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * specified number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   *
+   * <p>A value less than or equal to 0 will be equivalent to the default behavior of
+   * runner-determined sharding.
+   */
+  public Write<T> withNumShards(int numShards) {
+    if (numShards > 0) {
+      return withNumShards(StaticValueProvider.of(numShards));
     }
+    return withRunnerDeterminedSharding();
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
-     * {@link ValueProvider} specified number of shards.
-     *
-     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-     * more information.
-     */
-    public Bound<T> withNumShards(ValueProvider<Integer> numShards) {
-      return new Bound<>(sink, new ConstantShards<T>(numShards));
-    }
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * {@link ValueProvider} specified number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withNumShards(ValueProvider<Integer> numShards) {
+    return new Write<>(sink, new ConstantShards<T>(numShards));
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} using the
-     * specified {@link PTransform} to compute the number of shards.
-     *
-     * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
-     * more information.
-     */
-    public Bound<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
-      checkNotNull(
-          sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-      return new Bound<>(sink, sharding);
-    }
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} using the
+   * specified {@link PTransform} to compute the number of shards.
+   *
+   * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
+   * more information.
+   */
+  public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
+    checkNotNull(
+        sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
+    return new Write<>(sink, sharding);
+  }
+
+  /**
+   * Returns a new {@link Write} that will write to the current {@link Sink} with
+   * runner-determined sharding.
+   */
+  public Write<T> withRunnerDeterminedSharding() {
+    return new Write<>(sink, null);
+  }
 
-    /**
-     * Returns a new {@link Write.Bound} that will write to the current {@link Sink} with
-     * runner-determined sharding.
-     */
-    public Bound<T> withRunnerDeterminedSharding() {
-      return new Bound<>(sink, null);
+  /**
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link Sink}.
+   */
+  private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
+    // Writer that will write the records in this bundle. Lazily
+    // initialized in processElement.
+    private Writer<T, WriteT> writer = null;
+    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+
+    WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+      this.writeOperationView = writeOperationView;
     }
 
-    /**
-     * Writes all the elements in a bundle using a {@link Writer} produced by the
-     * {@link WriteOperation} associated with the {@link Sink}.
-     */
-    private class WriteBundles<WriteT> extends DoFn<T, WriteT> {
-      // Writer that will write the records in this bundle. Lazily
-      // initialized in processElement.
-      private Writer<T, WriteT> writer = null;
-      private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-      WriteBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-        this.writeOperationView = writeOperationView;
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      // Lazily initialize the Writer
+      if (writer == null) {
+        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+        LOG.info("Opening writer for write operation {}", writeOperation);
+        writer = writeOperation.createWriter(c.getPipelineOptions());
+        writer.open(UUID.randomUUID().toString());
+        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
       }
-
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        // Lazily initialize the Writer
-        if (writer == null) {
-          WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-          LOG.info("Opening writer for write operation {}", writeOperation);
-          writer = writeOperation.createWriter(c.getPipelineOptions());
-          writer.open(UUID.randomUUID().toString());
-          LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
-        }
+      try {
+        writer.write(c.element());
+      } catch (Exception e) {
+        // Discard write result and close the write.
         try {
-          writer.write(c.element());
-        } catch (Exception e) {
-          // Discard write result and close the write.
-          try {
-            writer.close();
-            // The writer does not need to be reset, as this DoFn cannot be reused.
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
+          writer.close();
+          // The writer does not need to be reset, as this DoFn cannot be reused.
+        } catch (Exception closeException) {
+          if (closeException instanceof InterruptedException) {
+            // Do not silently ignore interrupted state.
+            Thread.currentThread().interrupt();
           }
-          throw e;
+          // Do not mask the exception that caused the write to fail.
+          e.addSuppressed(closeException);
         }
+        throw e;
       }
+    }
 
-      @FinishBundle
-      public void finishBundle(Context c) throws Exception {
-        if (writer != null) {
-          WriteT result = writer.close();
-          c.output(result);
-          // Reset state in case of reuse.
-          writer = null;
-        }
+    @FinishBundle
+    public void finishBundle(Context c) throws Exception {
+      if (writer != null) {
+        WriteT result = writer.close();
+        c.output(result);
+        // Reset state in case of reuse.
+        writer = null;
       }
+    }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.delegate(Write.Bound.this);
-      }
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(Write.this);
     }
+  }
 
-    /**
-     * Like {@link WriteBundles}, but where the elements for each shard have been collected into
-     * a single iterable.
-     *
-     * @see WriteBundles
-     */
-    private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
-      private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
-
-      WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
-        this.writeOperationView = writeOperationView;
-      }
+  /**
+   * Like {@link WriteBundles}, but where the elements for each shard have been collected into
+   * a single iterable.
+   *
+   * @see WriteBundles
+   */
+  private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
+    private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
 
-      @ProcessElement
-      public void processElement(ProcessContext c) throws Exception {
-        // In a sharded write, single input element represents one shard. We can open and close
-        // the writer in each call to processElement.
-        WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
-        LOG.info("Opening writer for write operation {}", writeOperation);
-        Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-        writer.open(UUID.randomUUID().toString());
-        LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+      this.writeOperationView = writeOperationView;
+    }
 
+    @ProcessElement
+    public void processElement(ProcessContext c) throws Exception {
+      // In a sharded write, single input element represents one shard. We can open and close
+      // the writer in each call to processElement.
+      WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+      LOG.info("Opening writer for write operation {}", writeOperation);
+      Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+      writer.open(UUID.randomUUID().toString());
+      LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
+
+      try {
+        for (T t : c.element().getValue()) {
+          writer.write(t);
+        }
+      } catch (Exception e) {
         try {
-          for (T t : c.element().getValue()) {
-            writer.write(t);
+          writer.close();
+        } catch (Exception closeException) {
+          if (closeException instanceof InterruptedException) {
+            // Do not silently ignore interrupted state.
+            Thread.currentThread().interrupt();
           }
-        } catch (Exception e) {
-          try {
-            writer.close();
-          } catch (Exception closeException) {
-            if (closeException instanceof InterruptedException) {
-              // Do not silently ignore interrupted state.
-              Thread.currentThread().interrupt();
-            }
-            // Do not mask the exception that caused the write to fail.
-            e.addSuppressed(closeException);
-          }
-          throw e;
+          // Do not mask the exception that caused the write to fail.
+          e.addSuppressed(closeException);
         }
-
-        // Close the writer; if this throws let the error propagate.
-        WriteT result = writer.close();
-        c.output(result);
+        throw e;
       }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        builder.delegate(Write.Bound.this);
-      }
+      // Close the writer; if this throws let the error propagate.
+      WriteT result = writer.close();
+      c.output(result);
     }
 
-    private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-      private final PCollectionView<Integer> numShards;
-      private int shardNumber;
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.delegate(Write.this);
+    }
+  }
 
-      ApplyShardingKey(PCollectionView<Integer> numShards) {
-        this.numShards = numShards;
-        shardNumber = -1;
-      }
+  private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
+    private final PCollectionView<Integer> numShards;
+    private int shardNumber;
 
-      @ProcessElement
-      public void processElement(ProcessContext context) {
-        Integer shardCount = context.sideInput(numShards);
-        checkArgument(
-            shardCount > 0,
-            "Must have a positive number of shards specified for non-runner-determined sharding."
-                + " Got %s",
-            shardCount);
-        if (shardNumber == -1) {
-          // We want to desynchronize the first record sharding key for each instance of
-          // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
-          shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
-        } else {
-          shardNumber = (shardNumber + 1) % shardCount;
-        }
-        context.output(KV.of(shardNumber, context.element()));
-      }
+    ApplyShardingKey(PCollectionView<Integer> numShards) {
+      this.numShards = numShards;
+      shardNumber = -1;
     }
 
-    /**
-     * A write is performed as sequence of three {@link ParDo}'s.
-     *
-     * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
-     * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
-     * called. The output of this ParDo is a singleton PCollection
-     * containing the WriteOperation.
-     *
-     * <p>This singleton collection containing the WriteOperation is then used as a side input to a
-     * ParDo over the PCollection of elements to write. In this bundle-writing phase,
-     * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-     * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
-     * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
-     * every element in the bundle. The output of this ParDo is a PCollection of
-     * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
-     * each bundle.
-     *
-     * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
-     * the collection of writer results as a side-input. In this ParDo,
-     * {@link WriteOperation#finalize} is called to finalize the write.
-     *
-     * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
-     * before the exception that caused the write to fail is propagated and the write result will be
-     * discarded.
-     *
-     * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
-     * deserialized in the bundle-writing and finalization phases, any state change to the
-     * WriteOperation object that occurs during initialization is visible in the latter phases.
-     * However, the WriteOperation is not serialized after the bundle-writing phase.  This is why
-     * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
-     * WriteOperation).
-     */
-    private <WriteT> PDone createWrite(
-        PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
-      Pipeline p = input.getPipeline();
-
-      // A coder to use for the WriteOperation.
-      @SuppressWarnings("unchecked")
-      Coder<WriteOperation<T, WriteT>> operationCoder =
-          (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
-
-      // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
-      // the sink.
-      PCollection<WriteOperation<T, WriteT>> operationCollection =
-          p.apply(Create.of(writeOperation).withCoder(operationCoder));
-
-      // Initialize the resource in a do-once ParDo on the WriteOperation.
-      operationCollection = operationCollection
-          .apply("Initialize", ParDo.of(
-              new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Initializing write operation {}", writeOperation);
-              writeOperation.initialize(c.getPipelineOptions());
-              LOG.debug("Done initializing write operation {}", writeOperation);
-              // The WriteOperation is also the output of this ParDo, so it can have mutable
-              // state.
-              c.output(writeOperation);
-            }
-          }))
-          .setCoder(operationCoder);
-
-      // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
-      final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
-          operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
-
-      // Re-window the data into the global window and remove any existing triggers.
-      PCollection<T> inputInGlobalWindow =
-          input.apply(
-              Window.<T>into(new GlobalWindows())
-                  .triggering(DefaultTrigger.of())
-                  .discardingFiredPanes());
-
-      // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
-      // as a side input) and collect the results of the writes in a PCollection.
-      // There is a dependency between this ParDo and the first (the WriteOperation PCollection
-      // as a side input), so this will happen after the initial ParDo.
-      PCollection<WriteT> results;
-      final PCollectionView<Integer> numShards;
-      if (computeNumShards == null) {
-        numShards = null;
-        results =
-            inputInGlobalWindow.apply(
-                "WriteBundles",
-                ParDo.of(new WriteBundles<>(writeOperationView))
-                    .withSideInputs(writeOperationView));
+    @ProcessElement
+    public void processElement(ProcessContext context) {
+      Integer shardCount = context.sideInput(numShards);
+      checkArgument(
+          shardCount > 0,
+          "Must have a positive number of shards specified for non-runner-determined sharding."
+              + " Got %s",
+          shardCount);
+      if (shardNumber == -1) {
+        // We want to desynchronize the first record sharding key for each instance of
+        // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
+        shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
       } else {
-        numShards = inputInGlobalWindow.apply(computeNumShards);
-        results =
-            inputInGlobalWindow
-                .apply(
-                    "ApplyShardLabel",
-                    ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
-                .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-                .apply(
-                    "WriteShardedBundles",
-                    ParDo.of(new WriteShardedBundles<>(writeOperationView))
-                        .withSideInputs(writeOperationView));
-      }
-      results.setCoder(writeOperation.getWriterResultCoder());
-
-      final PCollectionView<Iterable<WriteT>> resultsView =
-          results.apply(View.<WriteT>asIterable());
-
-      // Finalize the write in another do-once ParDo on the singleton collection containing the
-      // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The WriteOperation's state is the same as after its initialization in the first do-once
-      // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
-      // collection as a side input), so it will happen after the parallel write.
-      ImmutableList.Builder<PCollectionView<?>> sideInputs =
-          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-      if (numShards != null) {
-        sideInputs.add(numShards);
+        shardNumber = (shardNumber + 1) % shardCount;
       }
-      operationCollection
-          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) throws Exception {
-              WriteOperation<T, WriteT> writeOperation = c.element();
-              LOG.info("Finalizing write operation {}.", writeOperation);
-              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-              // We must always output at least 1 shard, and honor user-specified numShards if set.
-              int minShardsNeeded;
-              if (numShards == null) {
-                minShardsNeeded = 1;
-              } else {
-                minShardsNeeded = c.sideInput(numShards);
-                checkArgument(
-                    minShardsNeeded > 0,
-                    "Must have a positive number of shards for non-runner-determined sharding."
-                        + " Got %s",
-                    minShardsNeeded);
-              }
-              int extraShardsNeeded = minShardsNeeded - results.size();
-              if (extraShardsNeeded > 0) {
-                LOG.info(
-                    "Creating {} empty output shards in addition to {} written for a total of {}.",
-                    extraShardsNeeded, results.size(), minShardsNeeded);
-                for (int i = 0; i < extraShardsNeeded; ++i) {
-                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                  writer.open(UUID.randomUUID().toString());
-                  WriteT emptyWrite = writer.close();
-                  results.add(emptyWrite);
-                }
-                LOG.debug("Done creating extra shards.");
-              }
+      context.output(KV.of(shardNumber, context.element()));
+    }
+  }
 
-              writeOperation.finalize(results, c.getPipelineOptions());
-              LOG.debug("Done finalizing write operation {}", writeOperation);
-            }
-          }).withSideInputs(sideInputs.build()));
-      return PDone.in(input.getPipeline());
+  /**
+   * A write is performed as sequence of three {@link ParDo}'s.
+   *
+   * <p>In the first, a do-once ParDo is applied to a singleton PCollection containing the Sink's
+   * {@link WriteOperation}. In this initialization ParDo, {@link WriteOperation#initialize} is
+   * called. The output of this ParDo is a singleton PCollection
+   * containing the WriteOperation.
+   *
+   * <p>This singleton collection containing the WriteOperation is then used as a side input to a
+   * ParDo over the PCollection of elements to write. In this bundle-writing phase,
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn.StartBundle} and
+   * {@link DoFn.FinishBundle}, respectively, and {@link Writer#write} method is called for
+   * every element in the bundle. The output of this ParDo is a PCollection of
+   * <i>writer result</i> objects (see {@link Sink} for a description of writer results)-one for
+   * each bundle.
+   *
+   * <p>The final do-once ParDo uses the singleton collection of the WriteOperation as input and
+   * the collection of writer results as a side-input. In this ParDo,
+   * {@link WriteOperation#finalize} is called to finalize the write.
+   *
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close} will be called
+   * before the exception that caused the write to fail is propagated and the write result will be
+   * discarded.
+   *
+   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo and
+   * deserialized in the bundle-writing and finalization phases, any state change to the
+   * WriteOperation object that occurs during initialization is visible in the latter phases.
+   * However, the WriteOperation is not serialized after the bundle-writing phase.  This is why
+   * implementations should guarantee that {@link WriteOperation#createWriter} does not mutate
+   * WriteOperation).
+   */
+  private <WriteT> PDone createWrite(
+      PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
+    Pipeline p = input.getPipeline();
+
+    // A coder to use for the WriteOperation.
+    @SuppressWarnings("unchecked")
+    Coder<WriteOperation<T, WriteT>> operationCoder =
+        (Coder<WriteOperation<T, WriteT>>) SerializableCoder.of(writeOperation.getClass());
+
+    // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
+    // the sink.
+    PCollection<WriteOperation<T, WriteT>> operationCollection =
+        p.apply(Create.of(writeOperation).withCoder(operationCoder));
+
+    // Initialize the resource in a do-once ParDo on the WriteOperation.
+    operationCollection = operationCollection
+        .apply("Initialize", ParDo.of(
+            new DoFn<WriteOperation<T, WriteT>, WriteOperation<T, WriteT>>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            WriteOperation<T, WriteT> writeOperation = c.element();
+            LOG.info("Initializing write operation {}", writeOperation);
+            writeOperation.initialize(c.getPipelineOptions());
+            LOG.debug("Done initializing write operation {}", writeOperation);
+            // The WriteOperation is also the output of this ParDo, so it can have mutable
+            // state.
+            c.output(writeOperation);
+          }
+        }))
+        .setCoder(operationCoder);
+
+    // Create a view of the WriteOperation to be used as a sideInput to the parallel write phase.
+    final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
+        operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
+
+    // Re-window the data into the global window and remove any existing triggers.
+    PCollection<T> inputInGlobalWindow =
+        input.apply(
+            Window.<T>into(new GlobalWindows())
+                .triggering(DefaultTrigger.of())
+                .discardingFiredPanes());
+
+    // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
+    // as a side input) and collect the results of the writes in a PCollection.
+    // There is a dependency between this ParDo and the first (the WriteOperation PCollection
+    // as a side input), so this will happen after the initial ParDo.
+    PCollection<WriteT> results;
+    final PCollectionView<Integer> numShards;
+    if (computeNumShards == null) {
+      numShards = null;
+      results =
+          inputInGlobalWindow.apply(
+              "WriteBundles",
+              ParDo.of(new WriteBundles<>(writeOperationView))
+                  .withSideInputs(writeOperationView));
+    } else {
+      numShards = inputInGlobalWindow.apply(computeNumShards);
+      results =
+          inputInGlobalWindow
+              .apply(
+                  "ApplyShardLabel",
+                  ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
+              .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+              .apply(
+                  "WriteShardedBundles",
+                  ParDo.of(new WriteShardedBundles<>(writeOperationView))
+                      .withSideInputs(writeOperationView));
     }
+    results.setCoder(writeOperation.getWriterResultCoder());
+
+    final PCollectionView<Iterable<WriteT>> resultsView =
+        results.apply(View.<WriteT>asIterable());
+
+    // Finalize the write in another do-once ParDo on the singleton collection containing the
+    // Writer. The results from the per-bundle writes are given as an Iterable side input.
+    // The WriteOperation's state is the same as after its initialization in the first do-once
+    // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
+    // collection as a side input), so it will happen after the parallel write.
+    ImmutableList.Builder<PCollectionView<?>> sideInputs =
+        ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+    if (numShards != null) {
+      sideInputs.add(numShards);
+    }
+    operationCollection
+        .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+          @ProcessElement
+          public void processElement(ProcessContext c) throws Exception {
+            WriteOperation<T, WriteT> writeOperation = c.element();
+            LOG.info("Finalizing write operation {}.", writeOperation);
+            List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+            LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+            // We must always output at least 1 shard, and honor user-specified numShards if set.
+            int minShardsNeeded;
+            if (numShards == null) {
+              minShardsNeeded = 1;
+            } else {
+              minShardsNeeded = c.sideInput(numShards);
+              checkArgument(
+                  minShardsNeeded > 0,
+                  "Must have a positive number of shards for non-runner-determined sharding."
+                      + " Got %s",
+                  minShardsNeeded);
+            }
+            int extraShardsNeeded = minShardsNeeded - results.size();
+            if (extraShardsNeeded > 0) {
+              LOG.info(
+                  "Creating {} empty output shards in addition to {} written for a total of {}.",
+                  extraShardsNeeded, results.size(), minShardsNeeded);
+              for (int i = 0; i < extraShardsNeeded; ++i) {
+                Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                writer.open(UUID.randomUUID().toString());
+                WriteT emptyWrite = writer.close();
+                results.add(emptyWrite);
+              }
+              LOG.debug("Done creating extra shards.");
+            }
+
+            writeOperation.finalize(results, c.getPipelineOptions());
+            LOG.debug("Done finalizing write operation {}", writeOperation);
+          }
+        }).withSideInputs(sideInputs.build()));
+    return PDone.in(input.getPipeline());
   }
 
   @VisibleForTesting
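
Taken together, the refactored Write<T> keeps the public surface that Write.Bound<T> had, one level
up: to(), getSink(), getSharding(), and the three sharding configuration methods. A hedged sketch of
the fluent calls defined above (sink is any existing Sink<String>; myShardingFn is a hypothetical
PTransform<PCollection<String>, PCollectionView<Integer>>):

    Write<String> w = Write.to(sink);                      // runner-determined sharding (getSharding() == null)
    Write<String> fixed = w.withNumShards(3);              // static shard count via ConstantShards
    Write<String> dynamic = w.withSharding(myShardingFn);  // shard count computed from the data
    Write<String> reset = fixed.withRunnerDeterminedSharding();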

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index fd349e2..80f6f66 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -210,7 +210,7 @@ public class WriteTest {
     }
 
     TestSink sink = new TestSink();
-    Write.Bound<String> write = Write.to(sink).withSharding(new LargestInt());
+    Write<String> write = Write.to(sink).withSharding(new LargestInt());
     p.apply(Create.timestamped(inputs, timestamps).withCoder(StringUtf8Coder.of()))
         .apply(IDENTITY_MAP)
         .apply(write);
@@ -307,7 +307,7 @@ public class WriteTest {
   @Test
   public void testBuildWrite() {
     Sink<String> sink = new TestSink() {};
-    Write.Bound<String> write = Write.to(sink).withNumShards(3);
+    Write<String> write = Write.to(sink).withNumShards(3);
     assertThat(write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
@@ -315,12 +315,12 @@ public class WriteTest {
     assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3));
     assertThat(write.getSharding(), equalTo(originalSharding));
 
-    Write.Bound<String> write2 = write.withSharding(SHARDING_TRANSFORM);
+    Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
     assertThat(write2.getSink(), is(sink));
     assertThat(write2.getSharding(), equalTo(SHARDING_TRANSFORM));
     // original unchanged
 
-    Write.Bound<String> writeUnsharded = write2.withRunnerDeterminedSharding();
+    Write<String> writeUnsharded = write2.withRunnerDeterminedSharding();
     assertThat(writeUnsharded.getSharding(), nullValue());
     assertThat(write.getSharding(), equalTo(originalSharding));
   }
@@ -333,7 +333,7 @@ public class WriteTest {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Write.Bound<String> write = Write.to(sink);
+    Write<String> write = Write.to(sink);
     DisplayData displayData = DisplayData.from(write);
 
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
@@ -348,7 +348,7 @@ public class WriteTest {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Write.Bound<String> write = Write.to(sink).withNumShards(1);
+    Write<String> write = Write.to(sink).withNumShards(1);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
@@ -363,7 +363,7 @@ public class WriteTest {
         builder.add(DisplayData.item("foo", "bar"));
       }
     };
-    Write.Bound<String> write =
+    Write<String> write =
         Write.to(sink)
             .withSharding(
                 new PTransform<PCollection<String>, PCollectionView<Integer>>() {
@@ -433,7 +433,7 @@ public class WriteTest {
     }
 
     TestSink sink = new TestSink();
-    Write.Bound<String> write = Write.to(sink);
+    Write<String> write = Write.to(sink);
     if (numConfiguredShards.isPresent()) {
       write = write.withNumShards(numConfiguredShards.get());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/b87621e9/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 1f9beb3..53bc114 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -139,7 +139,7 @@ public class TransformTreeTest {
           assertTrue(visited.add(TransformsSeen.COMBINE_GLOBALLY));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
-        } else if (transform instanceof Write.Bound) {
+        } else if (transform instanceof Write) {
           assertTrue(visited.add(TransformsSeen.WRITE));
           assertNotNull(node.getEnclosingNode());
           assertTrue(node.isCompositeNode());
@@ -161,7 +161,7 @@ public class TransformTreeTest {
         PTransform<?, ?> transform = node.getTransform();
         // Pick is a composite, should not be visited here.
         assertThat(transform, not(instanceOf(Combine.Globally.class)));
-        assertThat(transform, not(instanceOf(Write.Bound.class)));
+        assertThat(transform, not(instanceOf(Write.class)));
         if (transform instanceof Read.Bounded
             && node.getEnclosingNode().getTransform() instanceof TextIO.Read.Bound) {
           assertTrue(visited.add(TransformsSeen.READ));

