beam-commits mailing list archives

From k...@apache.org
Subject [1/3] beam git commit: Add windowing support to FileBasedSink
Date Wed, 05 Apr 2017 17:42:57 GMT
Repository: beam
Updated Branches:
  refs/heads/master 8e5cfdea9 -> bc907c58b


http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
index 948a65b..16f3eb6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/Write.java
@@ -20,7 +20,6 @@ package org.apache.beam.sdk.io;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 import java.util.List;
@@ -30,7 +29,9 @@ import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.SerializableCoder;
+import org.apache.beam.sdk.coders.VoidCoder;
 import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -42,7 +43,9 @@ import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.WithKeys;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.DefaultTrigger;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -81,9 +84,19 @@ import org.slf4j.LoggerFactory;
 public class Write<T> extends PTransform<PCollection<T>, PDone> {
   private static final Logger LOG = LoggerFactory.getLogger(Write.class);
 
+  private static final int UNKNOWN_SHARDNUM = -1;
+  private static final int UNKNOWN_NUMSHARDS = -1;
+
   private final Sink<T> sink;
+  // This allows the number of shards to be dynamically computed based on the input
+  // PCollection.
   @Nullable
   private final PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards;
+  // We don't use a side input for static sharding, as we want this value to be updatable
+  // when a pipeline is updated.
+  @Nullable
+  private final ValueProvider<Integer> numShardsProvider;
+  private boolean windowedWrites;
 
   /**
    * Creates a {@link Write} transform that writes to the given {@link Sink}, letting the runner
@@ -91,21 +104,24 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    */
   public static <T> Write<T> to(Sink<T> sink) {
     checkNotNull(sink, "sink");
-    return new Write<>(sink, null /* runner-determined sharding */);
+    return new Write<>(sink, null /* runner-determined sharding */, null, false);
   }
 
   private Write(
       Sink<T> sink,
-      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards) {
+      @Nullable PTransform<PCollection<T>, PCollectionView<Integer>> computeNumShards,
+      @Nullable ValueProvider<Integer> numShardsProvider,
+      boolean windowedWrites) {
     this.sink = sink;
     this.computeNumShards = computeNumShards;
+    this.numShardsProvider = numShardsProvider;
+    this.windowedWrites = windowedWrites;
   }
 
   @Override
   public PDone expand(PCollection<T> input) {
-    checkArgument(
-        IsBounded.BOUNDED == input.isBounded(),
-        "%s can only be applied to a Bounded PCollection",
+    checkArgument(IsBounded.BOUNDED == input.isBounded() || windowedWrites,
+        "%s can only be applied to an unbounded PCollection if doing windowed writes",
         Write.class.getSimpleName());
     PipelineOptions options = input.getPipeline().getOptions();
     sink.validate(options);
@@ -120,6 +136,11 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
         .include("sink", sink);
     if (getSharding() != null) {
       builder.include("sharding", getSharding());
+    } else if (getNumShards() != null) {
+      String numShards = getNumShards().isAccessible()
+          ? getNumShards().get().toString() : getNumShards().toString();
+      builder.add(DisplayData.item("numShards", numShards)
+          .withLabel("Fixed Number of Shards"));
     }
   }
 
@@ -141,6 +162,10 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     return computeNumShards;
   }
 
+  public ValueProvider<Integer> getNumShards() {
+    return numShardsProvider;
+  }
+
   /**
    * Returns a new {@link Write} that will write to the current {@link Sink} using the
    * specified number of shards.
@@ -165,8 +190,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    * <p>This option should be used sparingly as it can hurt performance. See {@link Write} for
    * more information.
    */
-  public Write<T> withNumShards(ValueProvider<Integer> numShards) {
-    return new Write<>(sink, new ConstantShards<T>(numShards));
+  public Write<T> withNumShards(ValueProvider<Integer> numShardsProvider) {
+    return new Write<>(sink, null, numShardsProvider, windowedWrites);
   }
 
   /**
@@ -179,7 +204,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
   public Write<T> withSharding(PTransform<PCollection<T>, PCollectionView<Integer>> sharding) {
     checkNotNull(
         sharding, "Cannot provide null sharding. Use withRunnerDeterminedSharding() instead");
-    return new Write<>(sink, sharding);
+    return new Write<>(sink, sharding, null, windowedWrites);
   }
 
   /**
@@ -187,7 +212,25 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    * runner-determined sharding.
    */
   public Write<T> withRunnerDeterminedSharding() {
-    return new Write<>(sink, null);
+    return new Write<>(sink, null, null, windowedWrites);
+  }
+
+  /**
+   * Returns a new {@link Write} that preserves the windowing of its input.
+   *
+   * <p>If this option is not specified, windowing and triggering are replaced by
+   * {@link GlobalWindows} and {@link DefaultTrigger}.
+   *
+   * <p>If there is no data for a window, no output shards will be generated for that window.
+   * If a window triggers multiple times, output shards for that window may be generated
+   * multiple times, once per firing; it is up to the sink implementation to keep these
+   * output shards unique.
+   *
+   * <p>This option can only be used if {@link #withNumShards(int)} is also set to a
+   * positive value.
+   */
+  public Write<T> withWindowedWrites() {
+    return new Write<>(sink, computeNumShards, numShardsProvider, true);
   }
 
   /**
@@ -205,13 +248,19 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       // Lazily initialize the Writer
       if (writer == null) {
         WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
         LOG.info("Opening writer for write operation {}", writeOperation);
         writer = writeOperation.createWriter(c.getPipelineOptions());
-        writer.open(UUID.randomUUID().toString());
+
+        if (windowedWrites) {
+          writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), UNKNOWN_SHARDNUM,
+              UNKNOWN_NUMSHARDS);
+        } else {
+          writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+        }
         LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
       }
       try {
@@ -257,42 +306,57 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    */
   private class WriteShardedBundles<WriteT> extends DoFn<KV<Integer, Iterable<T>>, WriteT> {
     private final PCollectionView<WriteOperation<T, WriteT>> writeOperationView;
+    private final PCollectionView<Integer> numShardsView;
 
-    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView) {
+    WriteShardedBundles(PCollectionView<WriteOperation<T, WriteT>> writeOperationView,
+                        PCollectionView<Integer> numShardsView) {
       this.writeOperationView = writeOperationView;
+      this.numShardsView = numShardsView;
     }
 
     @ProcessElement
-    public void processElement(ProcessContext c) throws Exception {
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
+      int numShards = numShardsView != null ? c.sideInput(numShardsView) : getNumShards().get();
      // In a sharded write, a single input element represents one shard. We can open and close
       // the writer in each call to processElement.
       WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
       LOG.info("Opening writer for write operation {}", writeOperation);
       Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-      writer.open(UUID.randomUUID().toString());
+      if (windowedWrites) {
+        writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
+            numShards);
+      } else {
+        writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM, UNKNOWN_NUMSHARDS);
+      }
       LOG.debug("Done opening writer {} for operation {}", writer, writeOperationView);
 
       try {
-        for (T t : c.element().getValue()) {
-          writer.write(t);
-        }
-      } catch (Exception e) {
         try {
-          writer.close();
-        } catch (Exception closeException) {
-          if (closeException instanceof InterruptedException) {
-            // Do not silently ignore interrupted state.
-            Thread.currentThread().interrupt();
+          for (T t : c.element().getValue()) {
+            writer.write(t);
           }
-          // Do not mask the exception that caused the write to fail.
-          e.addSuppressed(closeException);
+        } catch (Exception e) {
+          try {
+            writer.close();
+          } catch (Exception closeException) {
+            if (closeException instanceof InterruptedException) {
+              // Do not silently ignore interrupted state.
+              Thread.currentThread().interrupt();
+            }
+            // Do not mask the exception that caused the write to fail.
+            e.addSuppressed(closeException);
+          }
+          throw e;
         }
+
+        // Close the writer; if this throws let the error propagate.
+        WriteT result = writer.close();
+        c.output(result);
+      } catch (Exception e) {
+        // If anything goes wrong, make sure to delete the temporary file.
+        writer.cleanup();
         throw e;
       }
-
-      // Close the writer; if this throws let the error propagate.
-      WriteT result = writer.close();
-      c.output(result);
     }
 
     @Override
@@ -302,23 +366,32 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
   }
 
   private static class ApplyShardingKey<T> extends DoFn<T, KV<Integer, T>> {
-    private final PCollectionView<Integer> numShards;
+    private final PCollectionView<Integer> numShardsView;
+    private final ValueProvider<Integer> numShardsProvider;
     private int shardNumber;
 
-    ApplyShardingKey(PCollectionView<Integer> numShards) {
-      this.numShards = numShards;
-      shardNumber = -1;
+    ApplyShardingKey(PCollectionView<Integer> numShardsView,
+                     ValueProvider<Integer> numShardsProvider) {
+      this.numShardsView = numShardsView;
+      this.numShardsProvider = numShardsProvider;
+      shardNumber = UNKNOWN_SHARDNUM;
     }
 
     @ProcessElement
     public void processElement(ProcessContext context) {
-      Integer shardCount = context.sideInput(numShards);
+      int shardCount = 0;
+      if (numShardsView != null) {
+        shardCount = context.sideInput(numShardsView);
+      } else {
+        checkNotNull(numShardsProvider);
+        shardCount = numShardsProvider.get();
+      }
       checkArgument(
           shardCount > 0,
           "Must have a positive number of shards specified for non-runner-determined sharding."
               + " Got %s",
           shardCount);
-      if (shardNumber == -1) {
+      if (shardNumber == UNKNOWN_SHARDNUM) {
         // We want to desynchronize the first record sharding key for each instance of
         // ApplyShardingKey, so records in a small PCollection will be statistically balanced.
         shardNumber = ThreadLocalRandom.current().nextInt(shardCount);
@@ -340,8 +413,8 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
    * <p>This singleton collection containing the WriteOperation is then used as a side input to a
    * ParDo over the PCollection of elements to write. In this bundle-writing phase,
    * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
-   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn.StartBundle} and
-   * {@link DoFn.FinishBundle}, respectively, and {@link Writer#write} method is called for
+   * {@link Writer#open} and {@link Writer#close} are called in {@link DoFn#startBundle} and
+   * {@link DoFn#finishBundle}, respectively, and {@link Writer#write} method is called for
    * every element in the bundle. The output of this ParDo is a PCollection of
   * <i>writer result</i> objects (see {@link Sink} for a description of writer results), one for
    * each bundle.
@@ -364,6 +437,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
   private <WriteT> PDone createWrite(
       PCollection<T> input, WriteOperation<T, WriteT> writeOperation) {
     Pipeline p = input.getPipeline();
+    writeOperation.setWindowedWrites(windowedWrites);
 
     // A coder to use for the WriteOperation.
     @SuppressWarnings("unchecked")
@@ -373,7 +447,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     // A singleton collection of the WriteOperation, to be used as input to a ParDo to initialize
     // the sink.
     PCollection<WriteOperation<T, WriteT>> operationCollection =
-        p.apply(Create.of(writeOperation).withCoder(operationCoder));
+        p.apply("CreateOperationCollection", Create.of(writeOperation).withCoder(operationCoder));
 
     // Initialize the resource in a do-once ParDo on the WriteOperation.
     operationCollection = operationCollection
@@ -384,6 +458,7 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
             WriteOperation<T, WriteT> writeOperation = c.element();
             LOG.info("Initializing write operation {}", writeOperation);
             writeOperation.initialize(c.getPipelineOptions());
+            writeOperation.setWindowedWrites(windowedWrites);
             LOG.debug("Done initializing write operation {}", writeOperation);
             // The WriteOperation is also the output of this ParDo, so it can have mutable
             // state.
@@ -396,133 +471,133 @@ public class Write<T> extends PTransform<PCollection<T>, PDone> {
     final PCollectionView<WriteOperation<T, WriteT>> writeOperationView =
         operationCollection.apply(View.<WriteOperation<T, WriteT>>asSingleton());
 
-    // Re-window the data into the global window and remove any existing triggers.
-    PCollection<T> inputInGlobalWindow =
-        input.apply(
-            Window.<T>into(new GlobalWindows())
-                .triggering(DefaultTrigger.of())
-                .discardingFiredPanes());
+    if (!windowedWrites) {
+      // Re-window the data into the global window and remove any existing triggers.
+      input =
+          input.apply(
+              Window.<T>into(new GlobalWindows())
+                  .triggering(DefaultTrigger.of())
+                  .discardingFiredPanes());
+    }
 
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the WriteOperation
     // as a side input) and collect the results of the writes in a PCollection.
     // There is a dependency between this ParDo and the first (the WriteOperation PCollection
     // as a side input), so this will happen after the initial ParDo.
     PCollection<WriteT> results;
-    final PCollectionView<Integer> numShards;
-    if (computeNumShards == null) {
-      numShards = null;
-      results =
-          inputInGlobalWindow.apply(
-              "WriteBundles",
+    final PCollectionView<Integer> numShardsView;
+    if (computeNumShards == null && numShardsProvider == null) {
+      if (windowedWrites) {
+        throw new IllegalStateException("When doing windowed writes, numShards must be set"
+            + " explicitly to a positive value");
+      }
+      numShardsView = null;
+      results = input
+          .apply("WriteBundles",
               ParDo.of(new WriteBundles<>(writeOperationView))
                   .withSideInputs(writeOperationView));
     } else {
-      numShards = inputInGlobalWindow.apply(computeNumShards);
-      results =
-          inputInGlobalWindow
-              .apply(
-                  "ApplyShardLabel",
-                  ParDo.of(new ApplyShardingKey<T>(numShards)).withSideInputs(numShards))
-              .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
-              .apply(
-                  "WriteShardedBundles",
-                  ParDo.of(new WriteShardedBundles<>(writeOperationView))
-                      .withSideInputs(writeOperationView));
+      if (computeNumShards != null) {
+        numShardsView = input.apply(computeNumShards);
+        results = input
+            .apply("ApplyShardLabel", ParDo.of(
+                new ApplyShardingKey<T>(numShardsView, null)).withSideInputs(numShardsView))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView, numShardsView))
+                    .withSideInputs(numShardsView, writeOperationView));
+      } else {
+        numShardsView = null;
+        results = input
+            .apply("ApplyShardLabel", ParDo.of(new ApplyShardingKey<T>(null, numShardsProvider)))
+            .apply("GroupIntoShards", GroupByKey.<Integer, T>create())
+            .apply("WriteShardedBundles",
+                ParDo.of(new WriteShardedBundles<>(writeOperationView, null))
+                    .withSideInputs(writeOperationView));
+      }
     }
     results.setCoder(writeOperation.getWriterResultCoder());
 
-    final PCollectionView<Iterable<WriteT>> resultsView =
-        results.apply(View.<WriteT>asIterable());
-
-    // Finalize the write in another do-once ParDo on the singleton collection containing the
-    // Writer. The results from the per-bundle writes are given as an Iterable side input.
-    // The WriteOperation's state is the same as after its initialization in the first do-once
-    // ParDo. There is a dependency between this ParDo and the parallel write (the writer results
-    // collection as a side input), so it will happen after the parallel write.
-    ImmutableList.Builder<PCollectionView<?>> sideInputs =
-        ImmutableList.<PCollectionView<?>>builder().add(resultsView);
-    if (numShards != null) {
-      sideInputs.add(numShards);
-    }
-    operationCollection
-        .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) throws Exception {
-            WriteOperation<T, WriteT> writeOperation = c.element();
-            LOG.info("Finalizing write operation {}.", writeOperation);
-            List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
-            LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
-
-            // We must always output at least 1 shard, and honor user-specified numShards if set.
-            int minShardsNeeded;
-            if (numShards == null) {
-              minShardsNeeded = 1;
-            } else {
-              minShardsNeeded = c.sideInput(numShards);
-              checkArgument(
-                  minShardsNeeded > 0,
-                  "Must have a positive number of shards for non-runner-determined sharding."
-                      + " Got %s",
-                  minShardsNeeded);
+    if (windowedWrites) {
+      // When processing streaming windowed writes, results will arrive multiple times. This
+      // means we can't share the below implementation that turns the results into a side input,
+      // as new data arriving into a side input does not trigger the listening DoFn. Instead
+      // we aggregate the result set using a singleton GroupByKey, so the DoFn will be triggered
+      // whenever new data arrives.
+      PCollection<KV<Void, WriteT>> keyedResults =
+          results.apply("AttachSingletonKey", WithKeys.<Void, WriteT>of((Void) null));
+      keyedResults.setCoder(KvCoder.<Void, WriteT>of(VoidCoder.of(), writeOperation
+          .getWriterResultCoder()));
+
+      // Is the continuation trigger sufficient?
+      keyedResults
+          .apply("FinalizeGroupByKey", GroupByKey.<Void, WriteT>create())
+          .apply("Finalize", ParDo.of(new DoFn<KV<Void, Iterable<WriteT>>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              WriteOperation<T, WriteT> writeOperation = c.sideInput(writeOperationView);
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.element().getValue());
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
             }
-            int extraShardsNeeded = minShardsNeeded - results.size();
-            if (extraShardsNeeded > 0) {
-              LOG.info(
-                  "Creating {} empty output shards in addition to {} written for a total of {}.",
-                  extraShardsNeeded, results.size(), minShardsNeeded);
-              for (int i = 0; i < extraShardsNeeded; ++i) {
-                Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
-                writer.open(UUID.randomUUID().toString());
-                WriteT emptyWrite = writer.close();
-                results.add(emptyWrite);
+          }).withSideInputs(writeOperationView));
+    } else {
+      final PCollectionView<Iterable<WriteT>> resultsView =
+          results.apply(View.<WriteT>asIterable());
+      ImmutableList.Builder<PCollectionView<?>> sideInputs =
+          ImmutableList.<PCollectionView<?>>builder().add(resultsView);
+      if (numShardsView != null) {
+        sideInputs.add(numShardsView);
+      }
+
+      // Finalize the write in another do-once ParDo on the singleton collection containing the
+      // Writer. The results from the per-bundle writes are given as an Iterable side input.
+      // The WriteOperation's state is the same as after its initialization in the first do-once
+      // ParDo. There is a dependency between this ParDo and the parallel write (the writer
+      // results collection as a side input), so it will happen after the parallel write.
+      // For the non-windowed case, we guarantee that if no data is written but the user has
+      // set numShards, then all shards will be written out as empty files. For this reason we
+      // use a side input here.
+      operationCollection
+          .apply("Finalize", ParDo.of(new DoFn<WriteOperation<T, WriteT>, Integer>() {
+            @ProcessElement
+            public void processElement(ProcessContext c) throws Exception {
+              WriteOperation<T, WriteT> writeOperation = c.element();
+              LOG.info("Finalizing write operation {}.", writeOperation);
+              List<WriteT> results = Lists.newArrayList(c.sideInput(resultsView));
+              LOG.debug("Side input initialized to finalize write operation {}.", writeOperation);
+
+              // We must always output at least 1 shard, and honor user-specified numShards if
+              // set.
+              int minShardsNeeded;
+              if (numShardsView != null) {
+                minShardsNeeded = c.sideInput(numShardsView);
+              } else if (numShardsProvider != null) {
+                minShardsNeeded = numShardsProvider.get();
+              } else {
+                minShardsNeeded = 1;
               }
-              LOG.debug("Done creating extra shards.");
+              int extraShardsNeeded = minShardsNeeded - results.size();
+              if (extraShardsNeeded > 0) {
+                LOG.info(
+                    "Creating {} empty output shards in addition to {} written for a total of "
+                        + " {}.", extraShardsNeeded, results.size(), minShardsNeeded);
+                for (int i = 0; i < extraShardsNeeded; ++i) {
+                  Writer<T, WriteT> writer = writeOperation.createWriter(c.getPipelineOptions());
+                  writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
+                      UNKNOWN_NUMSHARDS);
+                  WriteT emptyWrite = writer.close();
+                  results.add(emptyWrite);
+                }
+                LOG.debug("Done creating extra shards.");
+              }
+              writeOperation.finalize(results, c.getPipelineOptions());
+              LOG.debug("Done finalizing write operation {}", writeOperation);
             }
-
-            writeOperation.finalize(results, c.getPipelineOptions());
-            LOG.debug("Done finalizing write operation {}", writeOperation);
-          }
-        }).withSideInputs(sideInputs.build()));
-    return PDone.in(input.getPipeline());
-  }
-
-  @VisibleForTesting
-  static class ConstantShards<T>
-      extends PTransform<PCollection<T>, PCollectionView<Integer>> {
-    private final ValueProvider<Integer> numShards;
-
-    private ConstantShards(ValueProvider<Integer> numShards) {
-      this.numShards = numShards;
-    }
-
-    @Override
-    public PCollectionView<Integer> expand(PCollection<T> input) {
-      return input
-          .getPipeline()
-          .apply(Create.of(0))
-          .apply(
-              "FixedNumShards",
-              ParDo.of(
-                  new DoFn<Integer, Integer>() {
-                    @ProcessElement
-                    public void outputNumShards(ProcessContext ctxt) {
-                      checkArgument(
-                          numShards.isAccessible(),
-                          "NumShards must be accessible at runtime to use constant sharding");
-                      ctxt.output(numShards.get());
-                    }
-                  }))
-          .apply(View.<Integer>asSingleton());
-    }
-
-    @Override
-    public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(
-          DisplayData.item("Fixed Number of Shards", numShards).withLabel("ConstantShards"));
-    }
-
-    public ValueProvider<Integer> getNumShards() {
-      return numShards;
+          }).withSideInputs(sideInputs.build()));
     }
+    return PDone.in(input.getPipeline());
   }
 }

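For orientation, here is a minimal sketch of how the new windowed-write path is used
(standard Beam SDK imports assumed; ReadFromMyUnboundedSource and MySink are hypothetical
stand-ins, and the explicit withNumShards(...) call is required, per the
IllegalStateException added above):

  PCollection<String> events = p
      .apply(new ReadFromMyUnboundedSource())  // hypothetical unbounded source
      .apply(Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))));
  events.apply(Write.to(new MySink())          // hypothetical Sink<String>
      .withWindowedWrites()                    // preserve the input windowing
      .withNumShards(2));                      // fixed, positive shard count required

Note that for windowed writes the finalize step is driven by a singleton GroupByKey over
the per-bundle results rather than a side input, since a side input would not re-trigger
the finalizing DoFn as new panes arrive.
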
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
index 6937e93..2159c8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/XmlSink.java
@@ -176,7 +176,7 @@ public class XmlSink {
      * <p>The specified class must be able to be used to create a JAXB context.
      */
     public <T> Bound<T> ofRecordClass(Class<T> classToBind) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
+      return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
     }
 
     /**
@@ -194,7 +194,7 @@ public class XmlSink {
      * supplied name.
      */
     public Bound<T> withRootElement(String rootElementName) {
-      return new Bound<>(classToBind, rootElementName, baseOutputFilename.get());
+      return new Bound<>(classToBind, rootElementName, getBaseOutputFilenameProvider().get());
     }
 
     /**
@@ -205,7 +205,7 @@ public class XmlSink {
     public void validate(PipelineOptions options) {
       checkNotNull(classToBind, "Missing a class to bind to a JAXB context.");
       checkNotNull(rootElementName, "Missing a root element name.");
-      checkNotNull(baseOutputFilename, "Missing a filename to write to.");
+      checkNotNull(getBaseOutputFilenameProvider().get(), "Missing a filename to write to.");
       try {
         JAXBContext.newInstance(classToBind);
       } catch (JAXBException e) {

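The XmlSink changes above route all filename access through a ValueProvider. For
reference, a sketch of that pattern (StaticValueProvider, isAccessible(), and get() are
existing SDK APIs; the value itself is illustrative):

  ValueProvider<String> filename = StaticValueProvider.of("/tmp/output");
  // get() is only safe once the value is accessible; StaticValueProvider always is,
  // while runtime-provided values only become accessible at pipeline execution time.
  if (filename.isAccessible()) {
    String resolved = filename.get();
  }
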
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
index 0739381..e1ad47b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/TestPipelineOptions.java
@@ -17,7 +17,10 @@
  */
 package org.apache.beam.sdk.testing;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
+
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.DefaultValueFactory;
@@ -35,10 +38,12 @@ public interface TestPipelineOptions extends PipelineOptions {
   void setTempRoot(String value);
 
   @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+  @JsonIgnore
   SerializableMatcher<PipelineResult> getOnCreateMatcher();
   void setOnCreateMatcher(SerializableMatcher<PipelineResult> value);
 
   @Default.InstanceFactory(AlwaysPassMatcherFactory.class)
+  @JsonIgnore
   SerializableMatcher<PipelineResult> getOnSuccessMatcher();
   void setOnSuccessMatcher(SerializableMatcher<PipelineResult> value);
 

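A note on the @JsonIgnore additions: PipelineOptions are round-tripped through JSON on
pipeline submission, and the matchers are not JSON-serializable, so their getters are
excluded. The same pattern, sketched on a hypothetical option:

  public interface MyTestOptions extends TestPipelineOptions {
    @JsonIgnore  // keep the non-JSON-serializable matcher out of options serialization
    SerializableMatcher<PipelineResult> getOnShutdownMatcher();  // hypothetical
    void setOnShutdownMatcher(SerializableMatcher<PipelineResult> value);
  }
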
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
index dd81a34..6f6ba37 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/FileIOChannelFactory.java
@@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
 import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
 import java.io.BufferedOutputStream;
 import java.io.File;
 import java.io.FileInputStream;
@@ -45,6 +46,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.regex.Matcher;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -175,17 +177,20 @@ public class FileIOChannelFactory implements IOChannelFactory {
   }
 
   @Override
-  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
+    List<String> srcList = Lists.newArrayList(srcFilenames);
+    List<String> destList = Lists.newArrayList(destFilenames);
     checkArgument(
-        srcFilenames.size() == destFilenames.size(),
+        srcList.size() == destList.size(),
         "Number of source files %s must equal number of destination files %s",
-        srcFilenames.size(),
-        destFilenames.size());
-    int numFiles = srcFilenames.size();
+        srcList.size(),
+        destList.size());
+    int numFiles = srcList.size();
     for (int i = 0; i < numFiles; i++) {
-      String src = srcFilenames.get(i);
-      String dst = destFilenames.get(i);
-      LOG.debug("Copying {} to {}", src, dst);
+      String src = srcList.get(i);
+      String dst = destList.get(i);
+      LOG.info("Copying {} to {}", src, dst);
       try {
         // Copy the source file, replacing the existing destination.
         // Paths.get(x) will not work on Windows OSes cause of the ":" after the drive letter.
@@ -194,7 +199,7 @@ public class FileIOChannelFactory implements IOChannelFactory {
             new File(dst).toPath(),
             StandardCopyOption.REPLACE_EXISTING);
       } catch (NoSuchFileException e) {
-        LOG.debug("{} does not exist.", src);
+        LOG.info("{} does not exist.", src);
         // Suppress exception if file does not exist.
       }
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
index 9f99cd6..745dcb9 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsIOChannelFactory.java
@@ -99,7 +99,8 @@ public class GcsIOChannelFactory implements IOChannelFactory {
   }
 
   @Override
-  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
     options.getGcsUtil().copy(srcFilenames, destFilenames);
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
index 14781c4..1c853bb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/GcsUtil.java
@@ -68,6 +68,7 @@ import java.util.concurrent.TimeUnit;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 import javax.annotation.Nullable;
+
 import org.apache.beam.sdk.options.DefaultValueFactory;
 import org.apache.beam.sdk.options.GcsOptions;
 import org.apache.beam.sdk.options.PipelineOptions;
@@ -635,23 +636,27 @@ public class GcsUtil {
     return batches;
   }
 
-  public void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException {
+  public void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames)
+      throws IOException {
     executeBatches(makeCopyBatches(srcFilenames, destFilenames));
   }
 
-  List<BatchRequest> makeCopyBatches(List<String> srcFilenames, List<String> destFilenames)
+  List<BatchRequest> makeCopyBatches(Iterable<String> srcFilenames, Iterable<String> destFilenames)
       throws IOException {
+    List<String> srcList = Lists.newArrayList(srcFilenames);
+    List<String> destList = Lists.newArrayList(destFilenames);
     checkArgument(
-        srcFilenames.size() == destFilenames.size(),
+        srcList.size() == destList.size(),
         "Number of source files %s must equal number of destination files %s",
-        srcFilenames.size(),
-        destFilenames.size());
+        srcList.size(),
+        destList.size());
 
     List<BatchRequest> batches = new LinkedList<>();
     BatchRequest batch = createBatchRequest();
-    for (int i = 0; i < srcFilenames.size(); i++) {
-      final GcsPath sourcePath = GcsPath.fromUri(srcFilenames.get(i));
-      final GcsPath destPath = GcsPath.fromUri(destFilenames.get(i));
+    for (int i = 0; i < srcList.size(); i++) {
+      final GcsPath sourcePath = GcsPath.fromUri(srcList.get(i));
+      final GcsPath destPath = GcsPath.fromUri(destList.get(i));
       enqueueCopy(sourcePath, destPath, batch);
       if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
         batches.add(batch);

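makeCopyBatches follows a flush-at-capacity pattern: copy requests accumulate in a
BatchRequest, and a new batch is started whenever MAX_REQUESTS_PER_BATCH is reached. The
shape of that pattern in a generic sketch (names are illustrative):

  List<List<String>> batches = new ArrayList<>();
  List<String> batch = new ArrayList<>();
  for (String request : requests) {
    batch.add(request);
    if (batch.size() >= MAX_REQUESTS_PER_BATCH) {
      batches.add(batch);
      batch = new ArrayList<>();  // start a fresh batch once the cap is hit
    }
  }
  if (!batch.isEmpty()) {
    batches.add(batch);  // keep the final partial batch
  }
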
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
index 9504f45..3a3af17 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IOChannelFactory.java
@@ -23,7 +23,6 @@ import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.WritableByteChannel;
 import java.nio.file.Path;
 import java.util.Collection;
-import java.util.List;
 
 /**
  * Defines a factory for working with read and write channels.
@@ -116,7 +115,7 @@ public interface IOChannelFactory {
    * @param srcFilenames the source filenames.
    * @param destFilenames the destination filenames.
    */
-  void copy(List<String> srcFilenames, List<String> destFilenames) throws IOException;
+  void copy(Iterable<String> srcFilenames, Iterable<String> destFilenames) throws IOException;
 
   /**
    * Removes a collection of files or directories.

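With the interface widened from List to Iterable, copy() now accepts any iterable pair
of sources and destinations. A minimal usage sketch (IOChannelUtils.getFactory resolves
a factory by path scheme; the paths are illustrative):

  IOChannelFactory factory = IOChannelUtils.getFactory("/tmp/src-0");
  // Sources and destinations are paired by position and must be the same size.
  factory.copy(
      ImmutableList.of("/tmp/src-0", "/tmp/src-1"),
      ImmutableList.of("/tmp/dst-0", "/tmp/dst-1"));
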
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 19f5ffa..f3dbb05 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -31,13 +31,18 @@ import com.google.common.base.MoreObjects;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterators;
+import com.google.common.collect.Lists;
+
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
+import java.util.Random;
 import java.util.Set;
+
 import org.apache.avro.Schema;
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileReader;
@@ -48,18 +53,29 @@ import org.apache.avro.reflect.Nullable;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
 import org.apache.beam.sdk.io.AvroIO.Write.Bound;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.testing.TestStream;
+import org.apache.beam.sdk.testing.UsesTestStream;
 import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.DisplayDataEvaluator;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.SerializableUtils;
 import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
 import org.junit.BeforeClass;
 import org.junit.Ignore;
 import org.junit.Rule;
@@ -94,7 +110,7 @@ public class AvroIOTest {
   }
 
   @Test
   public void testWriteWithoutValidationFlag() throws Exception {
     AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write.to("gs://bucket/foo/baz");
     assertTrue(write.needsValidation());
     assertFalse(write.withoutValidation().needsValidation());
@@ -275,6 +291,132 @@ public class AvroIOTest {
     p.run();
   }
 
+  private TimestampedValue<GenericClass> newValue(GenericClass element, Duration duration) {
+    return TimestampedValue.of(element, new Instant(0).plus(duration));
+  }
+
+  private static class WindowedFilenamePolicy extends FilenamePolicy {
+    String outputFilePrefix;
+
+    WindowedFilenamePolicy(String outputFilePrefix) {
+      this.outputFilePrefix = outputFilePrefix;
+    }
+
+    @Override
+    public ValueProvider<String> getBaseOutputFilenameProvider() {
+      return StaticValueProvider.of(outputFilePrefix);
+    }
+
+    @Override
+    public String windowedFilename(WindowedContext input) {
+      String filename = outputFilePrefix + "-" + input.getWindow().toString() + "-"
+          + input.getShardNumber() + "-of-" + (input.getNumShards() - 1) + "-pane-"
+          + input.getPaneInfo().getIndex();
+      if (input.getPaneInfo().isLast()) {
+        filename += "-final";
+      }
+      return filename;
+    }
+
+    @Override
+    public String unwindowedFilename(Context input) {
+      String filename = outputFilePrefix + input.getShardNumber() + "-of-"
+          + (input.getNumShards() - 1);
+      return filename;
+    }
+
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
+          .withLabel("File Name Prefix"));
+    }
+  }
+
+  @Rule
+  public TestPipeline windowedAvroWritePipeline = TestPipeline.create();
+
+  @Test
+  @Category({ValidatesRunner.class, UsesTestStream.class })
+  public void testWindowedAvroIOWrite() throws Throwable {
+    File baseOutputFile = new File(tmpFolder.getRoot(), "prefix");
+    final String outputFilePrefix = baseOutputFile.getAbsolutePath();
+
+    Instant base = new Instant(0);
+    ArrayList<GenericClass> allElements = new ArrayList<>();
+    ArrayList<TimestampedValue<GenericClass>> firstWindowElements = new ArrayList<>();
+    ArrayList<Instant> firstWindowTimestamps = Lists.newArrayList(
+        base.plus(Duration.standardSeconds(0)), base.plus(Duration.standardSeconds(10)),
+        base.plus(Duration.standardSeconds(20)), base.plus(Duration.standardSeconds(30)));
+
+    Random random = new Random();
+    for (int i = 0; i < 100; ++i) {
+      GenericClass item = new GenericClass(i, String.valueOf(i));
+      allElements.add(item);
+      firstWindowElements.add(TimestampedValue.of(item,
+          firstWindowTimestamps.get(random.nextInt(firstWindowTimestamps.size()))));
+    }
+
+    ArrayList<TimestampedValue<GenericClass>> secondWindowElements = new ArrayList<>();
+    ArrayList<Instant> secondWindowTimestamps = Lists.newArrayList(
+        base.plus(Duration.standardSeconds(60)), base.plus(Duration.standardSeconds(70)),
+        base.plus(Duration.standardSeconds(80)), base.plus(Duration.standardSeconds(90)));
+    for (int i = 100; i < 200; ++i) {
+      GenericClass item = new GenericClass(i, String.valueOf(i));
+      allElements.add(new GenericClass(i, String.valueOf(i)));
+      secondWindowElements.add(TimestampedValue.of(item,
+          secondWindowTimestamps.get(random.nextInt(secondWindowTimestamps.size()))));
+    }
+
+    TimestampedValue<GenericClass>[] firstWindowArray =
+        firstWindowElements.toArray(new TimestampedValue[100]);
+    TimestampedValue<GenericClass>[] secondWindowArray =
+        secondWindowElements.toArray(new TimestampedValue[100]);
+
+    TestStream<GenericClass> values = TestStream.create(AvroCoder.of(GenericClass.class))
+        .advanceWatermarkTo(new Instant(0))
+        .addElements(firstWindowArray[0],
+            Arrays.copyOfRange(firstWindowArray, 1, firstWindowArray.length))
+        .advanceWatermarkTo(new Instant(0).plus(Duration.standardMinutes(1)))
+        .addElements(secondWindowArray[0],
+        Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
+        .advanceWatermarkToInfinity();
+
+    windowedAvroWritePipeline
+        .apply(values)
+        .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
+        .apply(AvroIO.Write.to(new WindowedFilenamePolicy(outputFilePrefix))
+            .withWindowedWrites()
+            .withNumShards(2)
+            .withSchema(GenericClass.class));
+    windowedAvroWritePipeline.run();
+
+    // Validate that the data written matches the expected elements in the expected order
+    List<File> expectedFiles = new ArrayList<>();
+    for (int shard = 0; shard < 2; shard++) {
+      for (int window = 0; window < 2; window++) {
+        Instant windowStart = new Instant(0).plus(Duration.standardMinutes(window));
+        IntervalWindow intervalWindow = new IntervalWindow(
+            windowStart, Duration.standardMinutes(1));
+        expectedFiles.add(
+            new File(outputFilePrefix + "-" + intervalWindow.toString() + "-" + shard
+                + "-of-1" + "-pane-0-final"));
+      }
+    }
+
+    List<GenericClass> actualElements = new ArrayList<>();
+    for (File outputFile : expectedFiles) {
+      assertTrue("Expected output file " + outputFile.getAbsolutePath(), outputFile.exists());
+      try (DataFileReader<GenericClass> reader =
+               new DataFileReader<>(outputFile, AvroCoder.of(
+                   GenericClass.class).createDatumReader())) {
+        Iterators.addAll(actualElements, reader);
+      }
+      outputFile.delete();
+    }
+    assertThat(actualElements, containsInAnyOrder(allElements.toArray()));
+  }
+
   @Test
   public void testWriteWithDefaultCodec() throws Exception {
     AvroIO.Write.Bound<GenericRecord> write = AvroIO.Write
@@ -347,8 +489,10 @@ public class AvroIOTest {
 
     Bound<String> write = AvroIO.Write.to(outputFilePrefix).withSchema(String.class);
     if (numShards > 1) {
+      System.out.println("NumShards " + numShards);
       write = write.withNumShards(numShards);
     } else {
+      System.out.println("no sharding");
       write = write.withoutSharding();
     }
     p.apply(Create.of(ImmutableList.copyOf(expectedElements))).apply(write);

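For reference, the WindowedFilenamePolicy in the test above produces names of the form
prefix-window-shard-of-(numShards-1)-pane-index, with "-final" appended on the last
pane. A sketch of the name expected for shard 0 of 2 in the first fixed window (assuming
IntervalWindow.toString() renders in its usual [start..end) form):

  IntervalWindow w = new IntervalWindow(new Instant(0), Duration.standardMinutes(1));
  String name = outputFilePrefix + "-" + w.toString() + "-0-of-1-pane-0-final";
  // e.g. prefix-[1970-01-01T00:00:00.000Z..1970-01-01T00:01:00.000Z)-0-of-1-pane-0-final
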
http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index d2c1968..5b81ba8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -17,13 +17,13 @@
  */
 package org.apache.beam.sdk.io;
 
-import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import com.google.common.collect.Lists;
+
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,13 +41,17 @@ import java.nio.file.Paths;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.zip.GZIPInputStream;
 
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
 import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
+import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
@@ -99,7 +103,7 @@ public class FileBasedSinkTest {
     expected.addAll(values);
     expected.add(SimpleSink.SimpleWriter.FOOTER);
 
-    writer.open(testUid);
+    writer.openUnwindowed(testUid, -1, -1);
     for (String value : values) {
       writer.write(value);
     }
@@ -215,20 +219,18 @@ public class FileBasedSinkTest {
 
     int numFiles = temporaryFiles.size();
 
-    List<File> outputFiles = new ArrayList<>();
     List<FileResult> fileResults = new ArrayList<>();
-    List<String> outputFilenames = writeOp.generateDestinationFilenames(numFiles);
-
-    // Create temporary output bundles and output File objects
+    // Create temporary output bundles and output File objects.
     for (int i = 0; i < numFiles; i++) {
-      fileResults.add(new FileResult(temporaryFiles.get(i).toString()));
-      outputFiles.add(new File(outputFilenames.get(i)));
+      fileResults.add(new FileResult(temporaryFiles.get(i).toString(), null));
     }
 
     writeOp.finalize(fileResults, options);
 
     for (int i = 0; i < numFiles; i++) {
-      assertTrue(outputFiles.get(i).exists());
+      String outputFilename = writeOp.getSink().getFileNamePolicy().unwindowedFilename(
+          new Context(i, numFiles));
+      assertTrue(new File(outputFilename).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
 
@@ -258,7 +260,7 @@ public class FileBasedSinkTest {
       outputFiles.add(outputFile);
     }
 
-    writeOp.removeTemporaryFiles(Collections.<String>emptyList(), options);
+    writeOp.removeTemporaryFiles(Collections.<String>emptySet(), true, options);
 
     for (int i = 0; i < numFiles; i++) {
       assertFalse(temporaryFiles.get(i).exists());
@@ -274,12 +276,12 @@ public class FileBasedSinkTest {
     PipelineOptions options = PipelineOptionsFactory.create();
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
 
-    List<String> inputFilenames = Arrays.asList("input-3", "input-2", "input-1");
-    List<String> inputContents = Arrays.asList("3", "2", "1");
+    List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
+    List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
-        "output-00002-of-00003.test", "output-00001-of-00003.test", "output-00000-of-00003.test");
+        "output-00000-of-00003.test", "output-00001-of-00003.test", "output-00002-of-00003.test");
 
-    List<String> inputFilePaths = new ArrayList<>();
+    Map<String, String> inputFilePaths = new HashMap<>();
     List<String> expectedOutputPaths = new ArrayList<>();
 
     for (int i = 0; i < inputFilenames.size(); i++) {
@@ -291,14 +293,13 @@ public class FileBasedSinkTest {
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
       List<String> lines = Arrays.asList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.add(inputTmpFile.toString());
+      inputFilePaths.put(inputTmpFile.toString(),
+          writeOp.getSink().getFileNamePolicy().unwindowedFilename(
+              new Context(i, inputFilenames.size())));
     }
 
     // Copy input files to output files.
-    List<String> actual = writeOp.copyToOutputFiles(inputFilePaths, options);
-
-    // Assert that the expected paths are returned.
-    assertThat(expectedOutputPaths, containsInAnyOrder(actual.toArray()));
+    writeOp.copyToOutputFiles(inputFilePaths, options);
 
     // Assert that the contents were copied.
     for (int i = 0; i < expectedOutputPaths.size(); i++) {
@@ -306,6 +307,14 @@ public class FileBasedSinkTest {
     }
   }
 
+  public List<String> generateDestinationFilenames(FilenamePolicy policy, int numFiles) {
+    List<String> filenames = new ArrayList<>();
+    for (int i = 0; i < numFiles; i++) {
+      filenames.add(policy.unwindowedFilename(new Context(i, numFiles)));
+    }
+    return filenames;
+  }
+
   /**
    * Output filenames use the supplied naming template.
    */
@@ -314,36 +323,35 @@ public class FileBasedSinkTest {
     List<String> expected;
     List<String> actual;
     SimpleSink sink = new SimpleSink(getBaseOutputFilename(), "test", ".SS.of.NN");
-    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+    FilenamePolicy policy = sink.getFileNamePolicy();
 
     expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
         appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
 
     // Also validate that we handle the case where the user specified "." that we do
     // not prefix an additional "." making "..test"
     sink = new SimpleSink(getBaseOutputFilename(), ".test", ".SS.of.NN");
-    writeOp = new SimpleSink.SimpleWriteOperation(sink);
     expected = Arrays.asList(appendToTempFolder("output.00.of.03.test"),
         appendToTempFolder("output.01.of.03.test"), appendToTempFolder("output.02.of.03.test"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output.00.of.01.test"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -355,20 +363,21 @@ public class FileBasedSinkTest {
     List<String> expected;
     List<String> actual;
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    FilenamePolicy policy = writeOp.getSink().getFileNamePolicy();
 
     expected = Arrays.asList(
         appendToTempFolder("output-00000-of-00003.test"),
         appendToTempFolder("output-00001-of-00003.test"),
         appendToTempFolder("output-00002-of-00003.test"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output-00000-of-00001.test"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -380,16 +389,17 @@ public class FileBasedSinkTest {
     SimpleSink sink = new SimpleSink("output", "test", "-NN");
     SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
 
-    // A single shard doesn't need to include the shard number.
-    assertEquals(Arrays.asList("output-01.test"),
-                 writeOp.generateDestinationFilenames(1));
-
     // A template that yields only one distinct name must fail for multiple shards.
     try {
-      writeOp.generateDestinationFilenames(3);
+      Iterable<FileResult> results = Lists.newArrayList(
+          new FileResult("temp1", "file1"),
+          new FileResult("temp2", "file1"),
+          new FileResult("temp3", "file1"));
+
+      writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
     } catch (IllegalStateException exn) {
-      assertEquals("Shard name template '-NN' only generated 1 distinct file names for 3 files.",
+      assertEquals("Only generated 1 distinct file names for 3 files.",
                    exn.getMessage());
     }
   }
@@ -402,19 +412,19 @@ public class FileBasedSinkTest {
     List<String> expected;
     List<String> actual;
     SimpleSink sink = new SimpleSink(appendToTempFolder(baseOutputFilename), "");
-    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
+    FilenamePolicy policy = sink.getFileNamePolicy();
 
     expected = Arrays.asList(appendToTempFolder("output-00000-of-00003"),
         appendToTempFolder("output-00001-of-00003"), appendToTempFolder("output-00002-of-00003"));
-    actual = writeOp.generateDestinationFilenames(3);
+    actual = generateDestinationFilenames(policy, 3);
     assertEquals(expected, actual);
 
     expected = Arrays.asList(appendToTempFolder("output-00000-of-00001"));
-    actual = writeOp.generateDestinationFilenames(1);
+    actual = generateDestinationFilenames(policy, 1);
     assertEquals(expected, actual);
 
     expected = new ArrayList<>();
-    actual = writeOp.generateDestinationFilenames(0);
+    actual = generateDestinationFilenames(policy, 0);
     assertEquals(expected, actual);
   }
 
@@ -513,7 +523,7 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.open(testUid);
+    writer.openUnwindowed(testUid, -1, -1);
     writer.write("a");
     writer.write("b");
     final FileResult result = writer.close();
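
For readers tracking the refactor: filename generation now lives on the FilenamePolicy
rather than on the WriteOperation, which is why the tests above thread a policy through
the new generateDestinationFilenames helper and build names via
policy.unwindowedFilename(new Context(shard, numShards)). As a rough, illustrative
sketch of an unwindowed naming scheme matching the "output-SSSSS-of-NNNNN.test"
filenames asserted above (this class is not part of the commit; the real policy
receives a Context object rather than bare ints):

    // Illustrative only: mirrors the default shard-name template seen in the
    // expected filenames, e.g. output-00000-of-00003.test.
    class ExampleUnwindowedNaming {
      private final String prefix;  // e.g. "output"
      private final String suffix;  // e.g. ".test"

      ExampleUnwindowedNaming(String prefix, String suffix) {
        this.prefix = prefix;
        this.suffix = suffix;
      }

      String filenameFor(int shard, int numShards) {
        // %05d produces the five-digit shard numbers asserted above.
        return String.format("%s-%05d-of-%05d%s", prefix, shard, numShards, suffix);
      }
    }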

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index 3ecbed4..16d7f2a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -49,10 +49,10 @@ import org.apache.beam.sdk.coders.SerializableCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.Sink.WriteOperation;
 import org.apache.beam.sdk.io.Sink.Writer;
-import org.apache.beam.sdk.io.Write.ConstantShards;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactoryTest.TestPipelineOptions;
+import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
@@ -63,11 +63,12 @@ import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SimpleFunction;
-import org.apache.beam.sdk.transforms.ToString;
 import org.apache.beam.sdk.transforms.Top;
 import org.apache.beam.sdk.transforms.View;
 import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
@@ -311,8 +312,10 @@ public class WriteTest {
     assertThat(write.getSink(), is(sink));
     PTransform<PCollection<String>, PCollectionView<Integer>> originalSharding =
         write.getSharding();
-    assertThat(write.getSharding(), instanceOf(ConstantShards.class));
-    assertThat(((ConstantShards<String>) write.getSharding()).getNumShards().get(), equalTo(3));
+
+    assertThat(write.getSharding(), is(nullValue()));
+    assertThat(write.getNumShards(), instanceOf(StaticValueProvider.class));
+    assertThat(write.getNumShards().get(), equalTo(3));
     assertThat(write.getSharding(), equalTo(originalSharding));
 
     Write<String> write2 = write.withSharding(SHARDING_TRANSFORM);
@@ -352,7 +355,7 @@ public class WriteTest {
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("sink", sink.getClass()));
     assertThat(displayData, includesDisplayDataFor("sink", sink));
-    assertThat(displayData, hasDisplayItem("Fixed Number of Shards", 1));
+    assertThat(displayData, hasDisplayItem("numShards", "1"));
   }
 
   @Test
@@ -383,17 +386,6 @@ public class WriteTest {
     assertThat(displayData, hasDisplayItem("spam", "ham"));
   }
 
-  @Test
-  public void testWriteUnbounded() {
-    PCollection<String> unbounded = p.apply(CountingInput.unbounded())
-        .apply(ToString.elements());
-
-    TestSink sink = new TestSink();
-    thrown.expect(IllegalArgumentException.class);
-    thrown.expectMessage("Write can only be applied to a Bounded PCollection");
-    unbounded.apply(Write.to(sink));
-  }
-
   /**
    * Performs a Write transform and verifies the Write transform calls the appropriate methods on
    * a test sink in the correct order, as well as verifies that the elements of a PCollection are
@@ -535,6 +527,10 @@ public class WriteTest {
     }
 
     @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+    }
+
+    @Override
     public void finalize(Iterable<TestWriterResult> bundleResults, PipelineOptions options)
         throws Exception {
       assertEquals("test_value", options.as(WriteOptions.class).getTestFlag());
@@ -633,7 +629,21 @@ public class WriteTest {
     }
 
     @Override
-    public void open(String uId) throws Exception {
+    public final void openWindowed(String uId,
+                                   BoundedWindow window,
+                                   PaneInfo paneInfo,
+                                   int shard,
+                                   int nShards) throws Exception {
+      numShards.incrementAndGet();
+      this.uId = uId;
+      assertEquals(State.INITIAL, state);
+      state = State.OPENED;
+    }
+
+    @Override
+    public final void openUnwindowed(String uId,
+                                     int shard,
+                                     int nShards) throws Exception {
       numShards.incrementAndGet();
       this.uId = uId;
       assertEquals(State.INITIAL, state);
@@ -653,8 +663,13 @@ public class WriteTest {
       state = State.CLOSED;
       return new TestWriterResult(uId, elementsWritten);
     }
+
+    @Override
+    public void cleanup() throws Exception {
+    }
   }
 
   /**
    * Options for test, exposed for PipelineOptionsFactory.
    */
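
A note on the sharding change visible in the assertions above: a fixed shard count is
now represented as a ValueProvider (a StaticValueProvider when the count is given
directly) instead of a ConstantShards sharding transform. A minimal, self-contained
sketch of that representation (the class name here is hypothetical; StaticValueProvider
is the real SDK type):

    import org.apache.beam.sdk.options.ValueProvider;
    import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;

    public class NumShardsSketch {
      public static void main(String[] args) {
        // A statically known shard count, as asserted via write.getNumShards() above.
        ValueProvider<Integer> numShards = StaticValueProvider.of(3);
        // Static providers are accessible at graph construction time.
        System.out.println(numShards.get());  // prints 3
      }
    }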

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
index 96b8c57..63b5d11 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSinkTest.java
@@ -93,7 +93,7 @@ public class XmlSinkTest {
             .withRootElement(testRootElement);
     assertEquals(testClass, sink.classToBind);
     assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.baseOutputFilename.get());
+    assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
   }
 
   /**
@@ -105,7 +105,7 @@ public class XmlSinkTest {
         XmlSink.writeOf(Bird.class, testRootElement, testFilePrefix);
     assertEquals(testClass, sink.classToBind);
     assertEquals(testRootElement, sink.rootElementName);
-    assertEquals(testFilePrefix, sink.baseOutputFilename.get());
+    assertEquals(testFilePrefix, sink.getBaseOutputFilenameProvider().get());
   }
 
   /**
@@ -142,9 +142,9 @@ public class XmlSinkTest {
         XmlSink.writeOf(testClass, testRootElement, testFilePrefix);
     XmlWriteOperation<Bird> writeOp = sink.createWriteOperation(options);
     assertEquals(testClass, writeOp.getSink().classToBind);
-    assertEquals(testFilePrefix, writeOp.getSink().baseOutputFilename.get());
+    assertEquals(testFilePrefix, writeOp.getSink().getBaseOutputFilenameProvider().get());
     assertEquals(testRootElement, writeOp.getSink().rootElementName);
-    assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().extension);
+    // assertEquals(XmlSink.XML_EXTENSION, writeOp.getSink().getFilenamePolicy().extension);
     Path outputPath = new File(testFilePrefix).toPath();
     Path tempPath = new File(writeOp.tempDirectory.get()).toPath();
     assertEquals(outputPath.getParent(), tempPath.getParent());
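
The last two assertions encode a useful invariant: the write operation's temp directory
is created as a sibling of the final output location. A small sketch of that check in
isolation (the paths and temp-directory name below are hypothetical):

    import java.io.File;
    import java.nio.file.Path;

    public class SiblingTempDirSketch {
      public static void main(String[] args) {
        Path output = new File("/data/out/output.xml").toPath();
        Path temp = new File("/data/out/temp-beam-output").toPath();  // made-up name
        // Same parent directory => the temp dir is a sibling of the output.
        System.out.println(output.getParent().equals(temp.getParent()));  // true
      }
    }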

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
index 084d303..2ddead7 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestPipelineTest.java
@@ -162,23 +162,6 @@ public class TestPipelineTest implements Serializable {
     }
 
     @Test
-    public void testMatcherSerializationDeserialization() {
-      TestPipelineOptions opts = PipelineOptionsFactory.as(TestPipelineOptions.class);
-      SerializableMatcher<PipelineResult> m1 = new TestMatcher();
-      SerializableMatcher<PipelineResult> m2 = new TestMatcher();
-
-      opts.setOnCreateMatcher(m1);
-      opts.setOnSuccessMatcher(m2);
-
-      String[] arr = TestPipeline.convertToArgs(opts);
-      TestPipelineOptions newOpts =
-          PipelineOptionsFactory.fromArgs(arr).as(TestPipelineOptions.class);
-
-      assertEquals(m1, newOpts.getOnCreateMatcher());
-      assertEquals(m2, newOpts.getOnSuccessMatcher());
-    }
-
-    @Test
     public void testRunWithDummyEnvironmentVariableFails() {
       System.getProperties()
           .setProperty(TestPipeline.PROPERTY_USE_DEFAULT_DUMMY_RUNNER, Boolean.toString(true));

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
index 9b085ca..10ff788 100644
--- a/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
+++ b/sdks/java/io/hdfs/src/main/java/org/apache/beam/sdk/io/hdfs/HDFSFileSink.java
@@ -40,6 +40,8 @@ import org.apache.beam.sdk.io.Sink;
 import org.apache.beam.sdk.io.hadoop.SerializableConfiguration;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.values.KV;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -284,6 +286,10 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     }
 
     @Override
+    public void setWindowedWrites(boolean windowedWrites) {
+    }
+
+    @Override
     public void finalize(final Iterable<String> writerResults, PipelineOptions options)
         throws Exception {
       UGIHelper.getBestUGI(sink.username()).doAs(new PrivilegedExceptionAction<Void>() {
@@ -298,7 +304,6 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     private void doFinalize(Iterable<String> writerResults) throws Exception {
       Job job = sink.newJob();
       FileSystem fs = FileSystem.get(new URI(path), job.getConfiguration());
-
       // If there are 0 output shards, just create output folder.
       if (!writerResults.iterator().hasNext()) {
         fs.mkdirs(new Path(path));
@@ -389,7 +394,17 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     }
 
     @Override
-    public void open(final String uId) throws Exception {
+    public void openWindowed(final String uId,
+                             BoundedWindow window,
+                             PaneInfo paneInfo,
+                             int shard,
+                             int numShards) throws Exception {
+      throw new UnsupportedOperationException("Windowing support not implemented yet for "
+          + "HDFS. Window " + window);
+    }
+
+    @Override
+    public void openUnwindowed(final String uId, int shard, int numShards) throws Exception {
       UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
           new PrivilegedExceptionAction<Void>() {
             @Override
@@ -427,6 +442,11 @@ public abstract class HDFSFileSink<T, K, V> extends Sink<T> {
     }
 
     @Override
+    public void cleanup() throws Exception {
+    }
+
+    @Override
     public String close() throws Exception {
       return UGIHelper.getBestUGI(writeOperation.sink.username()).doAs(
           new PrivilegedExceptionAction<String>() {
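
The open() entry point is now split in two: openWindowed carries the window, pane, and
shard information needed to name per-window output, while openUnwindowed keeps the old
behavior. Since this sink only implements the unwindowed path, a caller-side dispatch
looks roughly like the following sketch (the helper itself is illustrative, not code
from this commit; the -1 arguments mirror the calls seen in the tests in this patch):

    import javax.annotation.Nullable;
    import org.apache.beam.sdk.io.Sink;
    import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
    import org.apache.beam.sdk.transforms.windowing.PaneInfo;

    public class OpenDispatchSketch {
      static <T, R> void open(Sink.Writer<T, R> writer, String uid,
                              @Nullable BoundedWindow window, @Nullable PaneInfo pane,
                              boolean windowedWrites) throws Exception {
        if (windowedWrites) {
          // For HDFSFileSink this currently throws UnsupportedOperationException.
          writer.openWindowed(uid, window, pane, -1, -1);
        } else {
          writer.openUnwindowed(uid, -1, -1);
        }
      }
    }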

http://git-wip-us.apache.org/repos/asf/beam/blob/6addc95f/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
index 8b9a6d1..cedd812 100644
--- a/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
+++ b/sdks/java/io/hdfs/src/test/java/org/apache/beam/sdk/io/hdfs/HDFSFileSinkTest.java
@@ -66,7 +66,7 @@ public class HDFSFileSinkTest {
     Sink.WriteOperation<T, String> writeOperation =
         (Sink.WriteOperation<T, String>) sink.createWriteOperation(options);
     Sink.Writer<T, String> writer = writeOperation.createWriter(options);
-    writer.open(UUID.randomUUID().toString());
+    writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);
     for (T t: toWrite) {
       writer.write(t);
     }
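
Putting the new Writer surface together, the unwindowed lifecycle this test drives is
openUnwindowed, write, close, with the new cleanup hook available afterwards. A sketch
under those assumptions (the helper method is illustrative, not from the commit):

    import java.util.UUID;
    import org.apache.beam.sdk.io.Sink;
    import org.apache.beam.sdk.options.PipelineOptions;

    public class WriterLifecycleSketch {
      static <T, R> R writeAll(Sink.WriteOperation<T, R> writeOperation,
                               PipelineOptions options,
                               Iterable<T> elements) throws Exception {
        Sink.Writer<T, R> writer = writeOperation.createWriter(options);
        writer.openUnwindowed(UUID.randomUUID().toString(), -1, -1);  // shard/count unknown
        for (T t : elements) {
          writer.write(t);
        }
        R result = writer.close();  // per-bundle result, later passed to finalize()
        writer.cleanup();           // hook added in this commit
        return result;
      }
    }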

