beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [3/5] beam git commit: Renames FileBasedSink inner classes
Date Wed, 10 May 2017 19:20:27 GMT
Renames FileBasedSink inner classes

FileBasedWriteOperation -> WriteOperation, FileBasedWriter -> Writer


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/5bbc0423
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/5bbc0423
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/5bbc0423

Branch: refs/heads/master
Commit: 5bbc04232de6c6b9dce2a82b1449ffa03c83da47
Parents: df04303
Author: Reuven Lax <relax@google.com>
Authored: Tue May 9 13:02:12 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed May 10 12:18:41 2017 -0700

----------------------------------------------------------------------
 .../construction/PTransformMatchersTest.java    |  2 +-
 .../direct/WriteWithShardingFactoryTest.java    |  2 +-
 .../java/org/apache/beam/sdk/io/AvroSink.java   | 14 ++---
 .../org/apache/beam/sdk/io/FileBasedSink.java   | 62 ++++++++++----------
 .../java/org/apache/beam/sdk/io/TFRecordIO.java | 16 ++---
 .../java/org/apache/beam/sdk/io/TextSink.java   | 14 ++---
 .../java/org/apache/beam/sdk/io/WriteFiles.java | 44 +++++++-------
 .../apache/beam/sdk/io/FileBasedSinkTest.java   | 20 +++----
 .../java/org/apache/beam/sdk/io/SimpleSink.java |  4 +-
 .../org/apache/beam/sdk/io/xml/XmlSink.java     |  8 +--
 10 files changed, 93 insertions(+), 93 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
----------------------------------------------------------------------
diff --git a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
index 9ae236b..cfea62f 100644
--- a/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
+++ b/runners/core-construction-java/src/test/java/org/apache/beam/runners/core/construction/PTransformMatchersTest.java
@@ -510,7 +510,7 @@ public class PTransformMatchersTest implements Serializable {
         WriteFiles.to(
             new FileBasedSink<Integer>(StaticValueProvider.of(outputDirectory), policy)
{
               @Override
-              public FileBasedWriteOperation<Integer> createWriteOperation() {
+              public WriteOperation<Integer> createWriteOperation() {
                 return null;
               }
             });

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 5a2a328..5c4fea1 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -134,7 +134,7 @@ public class WriteWithShardingFactoryTest {
     WriteFiles<Object> original = WriteFiles.to(
         new FileBasedSink<Object>(StaticValueProvider.of(outputDirectory), policy)
{
           @Override
-          public FileBasedWriteOperation<Object> createWriteOperation() {
+          public WriteOperation<Object> createWriteOperation() {
             throw new IllegalArgumentException("Should not be used");
           }
         });

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
index 7d42574..6c36266 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/AvroSink.java
@@ -51,12 +51,12 @@ class AvroSink<T> extends FileBasedSink<T> {
   }
 
   @Override
-  public FileBasedWriteOperation<T> createWriteOperation() {
+  public WriteOperation<T> createWriteOperation() {
     return new AvroWriteOperation<>(this, coder, codec, metadata);
   }
 
-  /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for Avro files. */
-  private static class AvroWriteOperation<T> extends FileBasedWriteOperation<T>
{
+  /** A {@link WriteOperation WriteOperation} for Avro files. */
+  private static class AvroWriteOperation<T> extends WriteOperation<T> {
     private final AvroCoder<T> coder;
     private final SerializableAvroCodecFactory codec;
     private final ImmutableMap<String, Object> metadata;
@@ -72,19 +72,19 @@ class AvroSink<T> extends FileBasedSink<T> {
     }
 
     @Override
-    public FileBasedWriter<T> createWriter() throws Exception {
+    public Writer<T> createWriter() throws Exception {
       return new AvroWriter<>(this, coder, codec, metadata);
     }
   }
 
-  /** A {@link FileBasedWriter FileBasedWriter} for Avro files. */
-  private static class AvroWriter<T> extends FileBasedWriter<T> {
+  /** A {@link Writer Writer} for Avro files. */
+  private static class AvroWriter<T> extends Writer<T> {
     private final AvroCoder<T> coder;
     private DataFileWriter<T> dataFileWriter;
     private SerializableAvroCodecFactory codec;
     private final ImmutableMap<String, Object> metadata;
 
-    public AvroWriter(FileBasedWriteOperation<T> writeOperation,
+    public AvroWriter(WriteOperation<T> writeOperation,
                       AvroCoder<T> coder,
                       SerializableAvroCodecFactory codec,
                       ImmutableMap<String, Object> metadata) {

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
index 32aa9c3..c2e230d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/FileBasedSink.java
@@ -78,7 +78,7 @@ import org.slf4j.LoggerFactory;
  * type, etc.).
  *
  * <p>At pipeline construction time, the methods of FileBasedSink are called to validate
the sink
- * and to create a {@link FileBasedWriteOperation} that manages the process of writing to
the sink.
+ * and to create a {@link WriteOperation} that manages the process of writing to the sink.
  *
  * <p>The process of writing to file-based sink is as follows:
  * <ol>
@@ -89,8 +89,8 @@ import org.slf4j.LoggerFactory;
  *
  * <p>In order to ensure fault-tolerance, a bundle may be executed multiple times (e.g.,
in the
  * event of failure/retry or for redundancy). However, exactly one of these executions will
have its
- * result passed to the finalize method. Each call to {@link FileBasedWriter#openWindowed}
- * or {@link FileBasedWriter#openUnwindowed} is passed a unique <i>bundle id</i>
when it is called
+ * result passed to the finalize method. Each call to {@link Writer#openWindowed}
+ * or {@link Writer#openUnwindowed} is passed a unique <i>bundle id</i> when
it is called
  * by the WriteFiles transform, so even redundant or retried bundles will have a unique way
of
  * identifying
  * their output.
@@ -359,10 +359,10 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
   public void validate(PipelineOptions options) {}
 
   /**
-   * Return a subclass of {@link FileBasedSink.FileBasedWriteOperation} that will manage
the write
+   * Return a subclass of {@link WriteOperation} that will manage the write
    * to the sink.
    */
-  public abstract FileBasedWriteOperation<T> createWriteOperation();
+  public abstract WriteOperation<T> createWriteOperation();
 
   public void populateDisplayData(DisplayData.Builder builder) {
     getFilenamePolicy().populateDisplayData(builder);
@@ -371,24 +371,24 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
   /**
    * Abstract operation that manages the process of writing to {@link FileBasedSink}.
    *
-   * <p>The primary responsibilities of the FileBasedWriteOperation is the management
of output
-   * files. During a write, {@link FileBasedSink.FileBasedWriter}s write bundles to temporary
file
+   * <p>The primary responsibilities of the WriteOperation is the management of output
+   * files. During a write, {@link Writer}s write bundles to temporary file
    * locations. After the bundles have been written,
    * <ol>
-   * <li>{@link FileBasedSink.FileBasedWriteOperation#finalize} is given a list of
the temporary
+   * <li>{@link WriteOperation#finalize} is given a list of the temporary
    * files containing the output bundles.
    * <li>During finalize, these temporary files are copied to final output locations
and named
    * according to a file naming template.
    * <li>Finally, any temporary files that were created during the write are removed.
    * </ol>
    *
-   * <p>Subclass implementations of FileBasedWriteOperation must implement
-   * {@link FileBasedSink.FileBasedWriteOperation#createWriter} to return a concrete
+   * <p>Subclass implementations of WriteOperation must implement
+   * {@link WriteOperation#createWriter} to return a concrete
    * FileBasedSinkWriter.
    *
    * <h2>Temporary and Output File Naming:</h2> During the write, bundles are
written to temporary
    * files using the tempDirectory that can be provided via the constructor of
-   * FileBasedWriteOperation. These temporary files will be named
+   * WriteOperation. These temporary files will be named
    * {@code {tempDirectory}/{bundleId}}, where bundleId is the unique id of the bundle.
    * For example, if tempDirectory is "gs://my-bucket/my_temp_output", the output for a
    * bundle with bundle id 15723 will be "gs://my-bucket/my_temp_output/15723".
@@ -408,7 +408,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
    *
    * @param <T> the type of values written to the sink.
    */
-  public abstract static class FileBasedWriteOperation<T> implements Serializable {
+  public abstract static class WriteOperation<T> implements Serializable {
     /**
      * The Sink that this WriteOperation will write to.
      */
@@ -427,7 +427,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     }
 
     /**
-     * Constructs a FileBasedWriteOperation using the default strategy for generating a temporary
+     * Constructs a WriteOperation using the default strategy for generating a temporary
      * directory from the base output filename.
      *
      * <p>Default is a uniquely named sibling of baseOutputFilename, e.g. if baseOutputFilename
is
@@ -435,7 +435,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink) {
+    public WriteOperation(FileBasedSink<T> sink) {
       this(sink, NestedValueProvider.of(
           sink.getBaseOutputDirectoryProvider(), new TemporaryDirectoryBuilder()));
     }
@@ -461,16 +461,16 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     }
 
     /**
-     * Create a new FileBasedWriteOperation.
+     * Create a new WriteOperation.
      *
      * @param sink the FileBasedSink that will be used to configure this write operation.
      * @param tempDirectory the base directory to be used for temporary output files.
      */
-    public FileBasedWriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory)
{
+    public WriteOperation(FileBasedSink<T> sink, ResourceId tempDirectory) {
       this(sink, StaticValueProvider.of(tempDirectory));
     }
 
-    private FileBasedWriteOperation(
+    private WriteOperation(
         FileBasedSink<T> sink, ValueProvider<ResourceId> tempDirectory) {
       this.sink = sink;
       this.tempDirectory = tempDirectory;
@@ -478,10 +478,10 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     }
 
     /**
-     * Clients must implement to return a subclass of {@link FileBasedSink.FileBasedWriter}.
This
+     * Clients must implement to return a subclass of {@link Writer}. This
      * method must not mutate the state of the object.
      */
-    public abstract FileBasedWriter<T> createWriter() throws Exception;
+    public abstract Writer<T> createWriter() throws Exception;
 
     /**
      * Indicates that the operation will be performing windowed writes.
@@ -558,7 +558,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     /**
      * Copy temporary files to final output filenames using the file naming template.
      *
-     * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
+     * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
      *
      * <p>Files will be named according to the file naming template. The order of the
output files
      * will be the same as the sorted order of the input filenames.  In other words, if the
input
@@ -591,7 +591,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     /**
      * Removes temporary output files. Uses the temporary directory to find files to remove.
      *
-     * <p>Can be called from subclasses that override {@link FileBasedWriteOperation#finalize}.
+     * <p>Can be called from subclasses that override {@link WriteOperation#finalize}.
      * <b>Note:</b>If finalize is overridden and does <b>not</b>
rename or otherwise finalize
      * temporary files, this method will remove them.
      */
@@ -667,15 +667,15 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
    * after the values in a bundle, respectively, as well as provide a MIME type for the output
    * channel.
    *
-   * <p>Multiple {@link FileBasedWriter} instances may be created on the same worker,
and therefore
+   * <p>Multiple {@link Writer} instances may be created on the same worker, and therefore
    * any access to static members or methods should be thread safe.
    *
    * @param <T> the type of values to write.
    */
-  public abstract static class FileBasedWriter<T> {
-    private static final Logger LOG = LoggerFactory.getLogger(FileBasedWriter.class);
+  public abstract static class Writer<T> {
+    private static final Logger LOG = LoggerFactory.getLogger(Writer.class);
 
-    private final FileBasedWriteOperation<T> writeOperation;
+    private final WriteOperation<T> writeOperation;
 
     /** Unique id for this output bundle. */
     private String id;
@@ -704,9 +704,9 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     private final String mimeType;
 
     /**
-     * Construct a new {@link FileBasedWriter} that will produce files of the given MIME
type.
+     * Construct a new {@link Writer} that will produce files of the given MIME type.
      */
-    public FileBasedWriter(FileBasedWriteOperation<T> writeOperation, String mimeType)
{
+    public Writer(WriteOperation<T> writeOperation, String mimeType) {
       checkNotNull(writeOperation);
       this.writeOperation = writeOperation;
       this.mimeType = mimeType;
@@ -739,7 +739,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
 
     /**
      *  Performs bundle initialization. For example, creates a temporary file for writing
or
-     * initializes any state that will be used across calls to {@link FileBasedWriter#write}.
+     * initializes any state that will be used across calls to {@link Writer#write}.
      *
      * <p>The unique id that is given to open should be used to ensure that the writer's
output
      * does not interfere with the output of other Writers, as a bundle may be executed many
@@ -818,7 +818,7 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
         closeChannelAndThrow(tempChannel, outputFile, e);
       }
 
-      // The caller shouldn't have to close() this FileBasedWriter if it fails to open(),
so close
+      // The caller shouldn't have to close() this Writer if it fails to open(), so close
       // the channel if prepareWrite() or writeHeader() fails.
       String step = "";
       try {
@@ -895,9 +895,9 @@ public abstract class FileBasedSink<T> implements Serializable,
HasDisplayData {
     }
 
     /**
-     * Return the FileBasedWriteOperation that this Writer belongs to.
+     * Return the WriteOperation that this Writer belongs to.
      */
-    public FileBasedWriteOperation<T> getWriteOperation() {
+    public WriteOperation<T> getWriteOperation() {
       return writeOperation;
     }
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
index 6350fb5..f73d6f3 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TFRecordIO.java
@@ -567,7 +567,7 @@ public class TFRecordIO {
     }
 
     @Override
-    public FileBasedWriteOperation<byte[]> createWriteOperation() {
+    public WriteOperation<byte[]> createWriteOperation() {
       return new TFRecordWriteOperation(this);
     }
 
@@ -587,29 +587,29 @@ public class TFRecordIO {
     }
 
     /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation
-     * FileBasedWriteOperation} for TFRecord files.
+     * A {@link WriteOperation
+     * WriteOperation} for TFRecord files.
      */
-    private static class TFRecordWriteOperation extends FileBasedWriteOperation<byte[]>
{
+    private static class TFRecordWriteOperation extends WriteOperation<byte[]> {
       private TFRecordWriteOperation(TFRecordSink sink) {
         super(sink);
       }
 
       @Override
-      public FileBasedWriter<byte[]> createWriter() throws Exception {
+      public Writer<byte[]> createWriter() throws Exception {
         return new TFRecordWriter(this);
       }
     }
 
     /**
-     * A {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter FileBasedWriter}
+     * A {@link Writer Writer}
      * for TFRecord files.
      */
-    private static class TFRecordWriter extends FileBasedWriter<byte[]> {
+    private static class TFRecordWriter extends Writer<byte[]> {
       private WritableByteChannel outChannel;
       private TFRecordCodec codec;
 
-      private TFRecordWriter(FileBasedWriteOperation<byte[]> writeOperation) {
+      private TFRecordWriter(WriteOperation<byte[]> writeOperation) {
         super(writeOperation, MimeTypes.BINARY);
       }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
index 0ba537e..511d697 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextSink.java
@@ -49,12 +49,12 @@ class TextSink extends FileBasedSink<String> {
     this.footer = footer;
   }
   @Override
-  public FileBasedWriteOperation<String> createWriteOperation() {
+  public WriteOperation<String> createWriteOperation() {
     return new TextWriteOperation(this, header, footer);
   }
 
-  /** A {@link FileBasedWriteOperation FileBasedWriteOperation} for text files. */
-  private static class TextWriteOperation extends FileBasedWriteOperation<String> {
+  /** A {@link WriteOperation WriteOperation} for text files. */
+  private static class TextWriteOperation extends WriteOperation<String> {
     @Nullable private final String header;
     @Nullable private final String footer;
 
@@ -65,20 +65,20 @@ class TextSink extends FileBasedSink<String> {
     }
 
     @Override
-    public FileBasedWriter<String> createWriter() throws Exception {
+    public Writer<String> createWriter() throws Exception {
       return new TextWriter(this, header, footer);
     }
   }
 
-  /** A {@link FileBasedWriter FileBasedWriter} for text files. */
-  private static class TextWriter extends FileBasedWriter<String> {
+  /** A {@link Writer Writer} for text files. */
+  private static class TextWriter extends Writer<String> {
     private static final String NEWLINE = "\n";
     @Nullable private final String header;
     @Nullable private final String footer;
     private OutputStreamWriter out;
 
     public TextWriter(
-        FileBasedWriteOperation<String> writeOperation,
+        WriteOperation<String> writeOperation,
         @Nullable String header,
         @Nullable String footer) {
       super(writeOperation, MimeTypes.TEXT);

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
index 5b21902..af24a8f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/WriteFiles.java
@@ -30,10 +30,10 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.coders.VoidCoder;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FileResultCoder;
+import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
+import org.apache.beam.sdk.io.FileBasedSink.Writer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
@@ -63,7 +63,7 @@ import org.slf4j.LoggerFactory;
  * finalization of the write. The output of a write is {@link PDone}.
  *
  * <p>By default, every bundle in the input {@link PCollection} will be processed by
a
- * {@link org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation}, so the number of
output
+ * {@link WriteOperation}, so the number of output
  * will vary based on runner behavior, though at least 1 output will always be produced.
The
  * exact parallelism of the write stage can be controlled using {@link WriteFiles#withNumShards},
  * typically used to control how many files are produced or to globally limit the number
of
@@ -86,7 +86,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
   private static final int UNKNOWN_NUMSHARDS = -1;
 
   private FileBasedSink<T> sink;
-  private FileBasedWriteOperation<T> writeOperation;
+  private WriteOperation<T> writeOperation;
   // This allows the number of shards to be dynamically computed based on the input
   // PCollection.
   @Nullable
@@ -238,13 +238,13 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
   }
 
   /**
-   * Writes all the elements in a bundle using a {@link FileBasedWriter} produced by the
-   * {@link FileBasedSink.FileBasedWriteOperation} associated with the {@link FileBasedSink}.
+   * Writes all the elements in a bundle using a {@link Writer} produced by the
+   * {@link WriteOperation} associated with the {@link FileBasedSink}.
    */
   private class WriteBundles extends DoFn<T, FileResult> {
     // Writer that will write the records in this bundle. Lazily
     // initialized in processElement.
-    private FileBasedWriter<T> writer = null;
+    private Writer<T> writer = null;
     private BoundedWindow window = null;
 
     WriteBundles() {
@@ -320,7 +320,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
       // In a sharded write, single input element represents one shard. We can open and close
       // the writer in each call to processElement.
       LOG.info("Opening writer for write operation {}", writeOperation);
-      FileBasedWriter<T> writer = writeOperation.createWriter();
+      Writer<T> writer = writeOperation.createWriter();
       if (windowedWrites) {
         writer.openWindowed(UUID.randomUUID().toString(), window, c.pane(), c.element().getKey(),
             numShards);
@@ -404,29 +404,29 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
   /**
    * A write is performed as sequence of three {@link ParDo}'s.
    *
-   * <p>This singleton collection containing the FileBasedWriteOperation is then used
as a side
+   * <p>This singleton collection containing the WriteOperation is then used as a side
    * input to a ParDo over the PCollection of elements to write. In this bundle-writing phase,
-   * {@link FileBasedWriteOperation#createWriter} is called to obtain a {@link FileBasedWriter}.
-   * {@link FileBasedWriter#open} and {@link FileBasedWriter#close} are called in
+   * {@link WriteOperation#createWriter} is called to obtain a {@link Writer}.
+   * {@link Writer#open} and {@link Writer#close} are called in
    * {@link DoFn.StartBundle} and {@link DoFn.FinishBundle}, respectively, and
-   * {@link FileBasedWriter#write} method is called for every element in the bundle. The
output
+   * {@link Writer#write} method is called for every element in the bundle. The output
    * of this ParDo is a PCollection of <i>writer result</i> objects (see {@link
FileBasedSink}
    * for a description of writer results)-one for each bundle.
    *
    * <p>The final do-once ParDo uses a singleton collection asinput and the collection
of writer
-   * results as a side-input. In this ParDo, {@link FileBasedWriteOperation#finalize} is
called
+   * results as a side-input. In this ParDo, {@link WriteOperation#finalize} is called
    * to finalize the write.
    *
-   * <p>If the write of any element in the PCollection fails, {@link FileBasedWriter#close}
will be
+   * <p>If the write of any element in the PCollection fails, {@link Writer#close}
will be
    * called before the exception that caused the write to fail is propagated and the write
result
    * will be discarded.
    *
-   * <p>Since the {@link FileBasedWriteOperation} is serialized after the initialization
ParDo and
+   * <p>Since the {@link WriteOperation} is serialized after the initialization ParDo
and
    * deserialized in the bundle-writing and finalization phases, any state change to the
-   * FileBasedWriteOperation object that occurs during initialization is visible in the latter
-   * phases. However, the FileBasedWriteOperation is not serialized after the bundle-writing
+   * WriteOperation object that occurs during initialization is visible in the latter
+   * phases. However, the WriteOperation is not serialized after the bundle-writing
    * phase. This is why implementations should guarantee that
-   * {@link FileBasedWriteOperation#createWriter} does not mutate FileBasedWriteOperation).
+   * {@link WriteOperation#createWriter} does not mutate WriteOperation).
    */
   private PDone createWrite(PCollection<T> input) {
     Pipeline p = input.getPipeline();
@@ -442,9 +442,9 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
 
 
     // Perform the per-bundle writes as a ParDo on the input PCollection (with the
-    // FileBasedWriteOperation as a side input) and collect the results of the writes in
a
+    // WriteOperation as a side input) and collect the results of the writes in a
     // PCollection. There is a dependency between this ParDo and the first (the
-    // FileBasedWriteOperation PCollection as a side input), so this will happen after the
+    // WriteOperation PCollection as a side input), so this will happen after the
     // initial ParDo.
     PCollection<FileResult> results;
     final PCollectionView<Integer> numShardsView;
@@ -511,7 +511,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
 
       // Finalize the write in another do-once ParDo on the singleton collection containing
the
       // Writer. The results from the per-bundle writes are given as an Iterable side input.
-      // The FileBasedWriteOperation's state is the same as after its initialization in the
first
+      // The WriteOperation's state is the same as after its initialization in the first
       // do-once ParDo. There is a dependency between this ParDo and the parallel write (the
writer
       // results collection as a side input), so it will happen after the parallel write.
       // For the non-windowed case, we guarantee that  if no data is written but the user
has
@@ -542,7 +542,7 @@ public class WriteFiles<T> extends PTransform<PCollection<T>,
PDone> {
                     "Creating {} empty output shards in addition to {} written for a total
of {}.",
                     extraShardsNeeded, results.size(), minShardsNeeded);
                 for (int i = 0; i < extraShardsNeeded; ++i) {
-                  FileBasedWriter<T> writer = writeOperation.createWriter();
+                  Writer<T> writer = writeOperation.createWriter();
                   writer.openUnwindowed(UUID.randomUUID().toString(), UNKNOWN_SHARDNUM,
                       UNKNOWN_NUMSHARDS);
                   FileResult emptyWrite = writer.close();

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index d9bcef4..c3f2a58 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -46,12 +46,12 @@ import java.util.List;
 import java.util.Map;
 import java.util.zip.GZIPInputStream;
 import org.apache.beam.sdk.io.FileBasedSink.CompressionType;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriteOperation;
-import org.apache.beam.sdk.io.FileBasedSink.FileBasedWriter;
 import org.apache.beam.sdk.io.FileBasedSink.FileResult;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy.Context;
 import org.apache.beam.sdk.io.FileBasedSink.WritableByteChannelFactory;
+import org.apache.beam.sdk.io.FileBasedSink.WriteOperation;
+import org.apache.beam.sdk.io.FileBasedSink.Writer;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.commons.compress.compressors.bzip2.BZip2CompressorInputStream;
@@ -88,7 +88,7 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * FileBasedWriter opens the correct file, writes the header, footer, and elements in the
+   * Writer opens the correct file, writes the header, footer, and elements in the
    * correct order, and returns the correct filename.
    */
   @Test
@@ -187,13 +187,13 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * Generate n temporary files using the temporary file pattern of FileBasedWriter.
+   * Generate n temporary files using the temporary file pattern of Writer.
    */
   private List<File> generateTemporaryFilesForFinalize(int numFiles) throws Exception
{
     List<File> temporaryFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
       ResourceId temporaryFile =
-          FileBasedWriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i);
+          WriteOperation.buildTemporaryFilename(getBaseTempDirectory(), "" + i);
       File tmpFile = new File(tmpFolder.getRoot(), temporaryFile.toString());
       tmpFile.getParentFile().mkdirs();
       assertTrue(tmpFile.createNewFile());
@@ -242,14 +242,14 @@ public class FileBasedSinkTest {
     SimpleSink sink =
         new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
 
-    FileBasedWriteOperation<String> writeOp =
+    WriteOperation<String> writeOp =
         new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
       ResourceId tempResource =
-          FileBasedWriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
+          WriteOperation.buildTemporaryFilename(tempDirectory, prefix + i);
       File tmpFile = new File(tempResource.toString());
       tmpFile.getParentFile().mkdirs();
       assertTrue("not able to create new temp file", tmpFile.createNewFile());
@@ -487,17 +487,17 @@ public class FileBasedSinkTest {
   }
 
   /**
-   * {@link FileBasedWriter} writes to the {@link WritableByteChannel} provided by
+   * {@link Writer} writes to the {@link WritableByteChannel} provided by
    * {@link DrunkWritableByteChannelFactory}.
    */
   @Test
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
     ResourceId root = getBaseOutputDirectory();
-    FileBasedWriteOperation<String> writeOp =
+    WriteOperation<String> writeOp =
         new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final FileBasedWriter<String> writer = writeOp.createWriter();
+    final Writer<String> writer = writeOp.createWriter();
     final ResourceId expectedFile =
         writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 9265520..c97313d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -45,7 +45,7 @@ class SimpleSink extends FileBasedSink<String> {
     return new SimpleWriteOperation(this);
   }
 
-  static final class SimpleWriteOperation extends FileBasedWriteOperation<String> {
+  static final class SimpleWriteOperation extends WriteOperation<String> {
     public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
       super(sink, tempOutputDirectory);
     }
@@ -60,7 +60,7 @@ class SimpleSink extends FileBasedSink<String> {
     }
   }
 
-  static final class SimpleWriter extends FileBasedWriter<String> {
+  static final class SimpleWriter extends Writer<String> {
     static final String HEADER = "header";
     static final String FOOTER = "footer";
 

http://git-wip-us.apache.org/repos/asf/beam/blob/5bbc0423/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
index 963ab1b..60075a7 100644
--- a/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
+++ b/sdks/java/io/xml/src/main/java/org/apache/beam/sdk/io/xml/XmlSink.java
@@ -76,9 +76,9 @@ class XmlSink<T> extends FileBasedSink<T> {
   }
 
   /**
-   * {@link FileBasedSink.FileBasedWriteOperation} for XML {@link FileBasedSink}s.
+   * {@link WriteOperation} for XML {@link FileBasedSink}s.
    */
-  protected static final class XmlWriteOperation<T> extends FileBasedWriteOperation<T>
{
+  protected static final class XmlWriteOperation<T> extends WriteOperation<T>
{
     public XmlWriteOperation(XmlSink<T> sink) {
       super(sink);
     }
@@ -113,9 +113,9 @@ class XmlSink<T> extends FileBasedSink<T> {
   }
 
   /**
-   * A {@link FileBasedWriter} that can write objects as XML elements.
+   * A {@link Writer} that can write objects as XML elements.
    */
-  protected static final class XmlWriter<T> extends FileBasedWriter<T> {
+  protected static final class XmlWriter<T> extends Writer<T> {
     final Marshaller marshaller;
     private OutputStream os = null;
 


Mime
View raw message