beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [2/7] beam git commit: Converts TextIO.Write to AutoValue
Date Tue, 02 May 2017 19:23:20 GMT
Converts TextIO.Write to AutoValue


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/681b5d64
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/681b5d64
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/681b5d64

Branch: refs/heads/master
Commit: 681b5d6400e1d00169a06860506a46053a226003
Parents: 4f5098d
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Apr 28 17:39:01 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkRunnerDebuggerTest.java  |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 181 ++++++++-----------
 .../java/org/apache/beam/sdk/io/TextIOTest.java |   4 +-
 3 files changed, 77 insertions(+), 110 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index e43bc4e..ea058b2 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -100,7 +100,7 @@ public class SparkRunnerDebuggerTest {
         + ".SparkRunnerDebuggerTest$PlusOne())\n"
         + "sparkContext.union(...)\n"
         + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$FormatAsTextFn())\n"
-        + "_.<org.apache.beam.sdk.io.TextIO$Write$Bound>";
+        + "_.<org.apache.beam.sdk.io.AutoValue_TextIO_Write>";
 
     SparkRunnerDebugger.DebugSparkPipelineResult result =
         (SparkRunnerDebugger.DebugSparkPipelineResult) pipeline.run();

http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index 2d82572..90d56e7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.io;
 
-import static com.google.common.base.MoreObjects.firstNonNull;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
@@ -123,7 +122,13 @@ public class TextIO {
    * element of the input collection encoded into its own line.
    */
   public static Write write() {
-    return new Write();
+    return new AutoValue_TextIO_Write.Builder()
+        .setFilenameSuffix("")
+        .setNumShards(0)
+        .setShardTemplate(Write.DEFAULT_SHARD_TEMPLATE)
+        .setWritableByteChannelFactory(FileBasedSink.CompressionType.UNCOMPRESSED)
+        .setWindowedWrites(false)
+        .build();
   }
 
   /** Implementation of {@link #read}. */
@@ -237,60 +242,56 @@ public class TextIO {
   /////////////////////////////////////////////////////////////////////////////
 
   /** Implementation of {@link #write}. */
-  public static class Write extends PTransform<PCollection<String>, PDone> {
+  @AutoValue
+  public abstract static class Write extends PTransform<PCollection<String>, PDone>
{
     private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
 
     /** The prefix of each file written, combined with suffix and shardTemplate. */
-    private final ValueProvider<String> filenamePrefix;
+    @Nullable abstract ValueProvider<String> getFilenamePrefix();
+
     /** The suffix of each file written, combined with prefix and shardTemplate. */
-    private final String filenameSuffix;
+    abstract String getFilenameSuffix();
 
     /** An optional header to add to each file. */
-    @Nullable private final String header;
+    @Nullable abstract String getHeader();
 
     /** An optional footer to add to each file. */
-    @Nullable private final String footer;
+    @Nullable abstract String getFooter();
 
     /** Requested number of shards. 0 for automatic. */
-    private final int numShards;
+    abstract int getNumShards();
 
     /** The shard template of each file written, combined with prefix and suffix. */
-    private final String shardTemplate;
+    abstract String getShardTemplate();
 
     /** A policy for naming output files. */
-    private final FilenamePolicy filenamePolicy;
+    @Nullable abstract FilenamePolicy getFilenamePolicy();
 
     /** Whether to write windowed output files. */
-    private boolean windowedWrites;
+    abstract boolean getWindowedWrites();
 
     /**
      * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default
is
      * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
      */
-    private final WritableByteChannelFactory writableByteChannelFactory;
+    abstract WritableByteChannelFactory getWritableByteChannelFactory();
 
-    private Write() {
-      this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
-          FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
-    }
+    abstract Builder toBuilder();
 
-    private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
-        @Nullable String header, @Nullable String footer, int numShards,
-        String shardTemplate,
-        WritableByteChannelFactory writableByteChannelFactory,
-        FilenamePolicy filenamePolicy,
-        boolean windowedWrites) {
-      super(name);
-      this.header = header;
-      this.footer = footer;
-      this.filenamePrefix = filenamePrefix;
-      this.filenameSuffix = filenameSuffix;
-      this.numShards = numShards;
-      this.shardTemplate = shardTemplate;
-      this.writableByteChannelFactory =
-          firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
-      this.filenamePolicy = filenamePolicy;
-      this.windowedWrites = windowedWrites;
+    @AutoValue.Builder
+    abstract static class Builder {
+      abstract Builder setFilenamePrefix(ValueProvider<String> filenamePrefix);
+      abstract Builder setFilenameSuffix(String filenameSuffix);
+      abstract Builder setHeader(String header);
+      abstract Builder setFooter(String footer);
+      abstract Builder setNumShards(int numShards);
+      abstract Builder setShardTemplate(String shardTemplate);
+      abstract Builder setFilenamePolicy(FilenamePolicy filenamePolicy);
+      abstract Builder setWindowedWrites(boolean windowedWrites);
+      abstract Builder setWritableByteChannelFactory(
+          WritableByteChannelFactory writableByteChannelFactory);
+
+      abstract Write build();
     }
 
     /**
@@ -305,21 +306,17 @@ public class TextIO {
      */
     public Write to(String filenamePrefix) {
       validateOutputComponent(filenamePrefix);
-      return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
-          header, footer, numShards, shardTemplate,
-          writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return to(StaticValueProvider.of(filenamePrefix));
     }
 
     /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
     public Write to(ValueProvider<String> filenamePrefix) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFilenamePrefix(filenamePrefix).build();
     }
 
     /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
     public Write to(FilenamePolicy filenamePolicy) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFilenamePolicy(filenamePolicy).build();
     }
 
     /**
@@ -329,8 +326,7 @@ public class TextIO {
      */
     public Write withSuffix(String nameExtension) {
       validateOutputComponent(nameExtension);
-      return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFilenameSuffix(nameExtension).build();
     }
 
     /**
@@ -346,8 +342,7 @@ public class TextIO {
      */
     public Write withNumShards(int numShards) {
       checkArgument(numShards >= 0);
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setNumShards(numShards).build();
     }
 
     /**
@@ -356,8 +351,7 @@ public class TextIO {
      * @see ShardNameTemplate
      */
     public Write withShardNameTemplate(String shardTemplate) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setShardTemplate(shardTemplate).build();
     }
 
     /**
@@ -371,8 +365,7 @@ public class TextIO {
      * {@code .withNumShards(1).withShardNameTemplate("")}
      */
     public Write withoutSharding() {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
-          writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return withNumShards(1).withShardNameTemplate("");
     }
 
     /**
@@ -381,8 +374,7 @@ public class TextIO {
      * <p>A {@code null} value will clear any previously configured header.
      */
     public Write withHeader(@Nullable String header) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setHeader(header).build();
     }
 
     /**
@@ -391,8 +383,7 @@ public class TextIO {
      * <p>A {@code null} value will clear any previously configured footer.
      */
     public Write withFooter(@Nullable String footer) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setFooter(footer).build();
     }
 
     /**
@@ -404,38 +395,47 @@ public class TextIO {
      */
     public Write withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      return toBuilder().setWritableByteChannelFactory(writableByteChannelFactory).build();
     }
 
     public Write withWindowedWrites() {
-      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-          shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+      return toBuilder().setWindowedWrites(true).build();
     }
 
     @Override
     public PDone expand(PCollection<String> input) {
-      if (filenamePolicy == null && filenamePrefix == null) {
+      if (getFilenamePolicy() == null && getFilenamePrefix() == null) {
         throw new IllegalStateException(
             "need to set the filename prefix of an TextIO.Write transform");
       }
-      if (filenamePolicy != null && filenamePrefix != null) {
+      if (getFilenamePolicy() != null && getFilenamePrefix() != null) {
         throw new IllegalStateException(
             "cannot set both a filename policy and a filename prefix");
       }
       WriteFiles<String> write = null;
-      if (filenamePolicy != null) {
-       write = WriteFiles.to(
-           new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+      if (getFilenamePolicy() != null) {
+        write =
+            WriteFiles.to(
+                new TextSink(
+                    getFilenamePolicy(),
+                    getHeader(),
+                    getFooter(),
+                    getWritableByteChannelFactory()));
       } else {
-        write = WriteFiles.to(
-            new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
-                writableByteChannelFactory));
+        write =
+            WriteFiles.to(
+                new TextSink(
+                    getFilenamePrefix(),
+                    getFilenameSuffix(),
+                    getHeader(),
+                    getFooter(),
+                    getShardTemplate(),
+                    getWritableByteChannelFactory()));
       }
       if (getNumShards() > 0) {
         write = write.withNumShards(getNumShards());
       }
-      if (windowedWrites) {
+      if (getWindowedWrites()) {
         write = write.withWindowedWrites();
       }
       return input.apply("WriteFiles", write);
@@ -446,66 +446,33 @@ public class TextIO {
       super.populateDisplayData(builder);
 
       String prefixString = "";
-      if (filenamePrefix != null) {
-        prefixString = filenamePrefix.isAccessible()
-            ? filenamePrefix.get() : filenamePrefix.toString();
+      if (getFilenamePrefix() != null) {
+        prefixString = getFilenamePrefix().isAccessible()
+            ? getFilenamePrefix().get() : getFilenamePrefix().toString();
       }
       builder
           .addIfNotNull(DisplayData.item("filePrefix", prefixString)
             .withLabel("Output File Prefix"))
-          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+          .addIfNotDefault(DisplayData.item("fileSuffix", getFilenameSuffix())
             .withLabel("Output File Suffix"), "")
-          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", getShardTemplate())
             .withLabel("Output Shard Name Template"),
               DEFAULT_SHARD_TEMPLATE)
-          .addIfNotDefault(DisplayData.item("numShards", numShards)
+          .addIfNotDefault(DisplayData.item("numShards", getNumShards())
             .withLabel("Maximum Output Shards"), 0)
-          .addIfNotNull(DisplayData.item("fileHeader", header)
+          .addIfNotNull(DisplayData.item("fileHeader", getHeader())
             .withLabel("File Header"))
-          .addIfNotNull(DisplayData.item("fileFooter", footer)
+          .addIfNotNull(DisplayData.item("fileFooter", getFooter())
               .withLabel("File Footer"))
           .add(DisplayData
-              .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+              .item("writableByteChannelFactory", getWritableByteChannelFactory().toString())
               .withLabel("Compression/Transformation Type"));
     }
 
-    /**
-     * Returns the current shard name template string.
-     */
-    public String getShardNameTemplate() {
-      return shardTemplate;
-    }
-
     @Override
     protected Coder<Void> getDefaultOutputCoder() {
       return VoidCoder.of();
     }
-
-    public String getFilenamePrefix() {
-      return filenamePrefix.get();
-    }
-
-    public String getShardTemplate() {
-      return shardTemplate;
-    }
-
-    public int getNumShards() {
-      return numShards;
-    }
-
-    public String getFilenameSuffix() {
-      return filenameSuffix;
-    }
-
-    @Nullable
-    public String getHeader() {
-      return header;
-    }
-
-    @Nullable
-    public String getFooter() {
-      return footer;
-    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/beam/blob/681b5d64/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 095b69f..425e0d6 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -315,7 +315,7 @@ public class TextIOTest {
     p.run();
 
     assertOutputFiles(elems, header, footer, numShards, baseDir, outputName,
-        write.getShardNameTemplate());
+        write.getShardTemplate());
   }
 
   public static void assertOutputFiles(
@@ -478,7 +478,7 @@ public class TextIOTest {
       drunkElems.add(elem);
     }
     assertOutputFiles(drunkElems.toArray(new String[0]), null, null, 1, baseDir,
-        outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardNameTemplate());
+        outputName + writableByteChannelFactory.getFilenameSuffix(), write.getShardTemplate());
   }
 
   @Test


Mime
View raw message