beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [6/7] beam git commit: Removes TextIO.Write.Bound
Date Tue, 02 May 2017 19:23:24 GMT
Removes TextIO.Write.Bound


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/4f5098dd
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/4f5098dd
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/4f5098dd

Branch: refs/heads/master
Commit: 4f5098ddf641f97417955ea33f429385d6fce384
Parents: 987b4e6
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Fri Apr 28 17:28:06 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Tue May 2 12:20:14 2017 -0700

----------------------------------------------------------------------
 .../apache/beam/examples/MinimalWordCount.java  |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   2 +-
 .../examples/common/WriteOneFilePerWindow.java  |   2 +-
 .../apache/beam/examples/complete/TfIdf.java    |   2 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../beam/examples/cookbook/DistinctExample.java |   2 +-
 .../beam/examples/cookbook/JoinExamples.java    |   2 +-
 .../beam/examples/MinimalWordCountJava8.java    |   2 +-
 .../examples/MinimalWordCountJava8Test.java     |   2 +-
 .../runners/apex/examples/WordCountTest.java    |   2 +-
 .../direct/WriteWithShardingFactoryTest.java    |   2 +-
 .../beam/runners/flink/ReadSourceITCase.java    |   2 +-
 .../flink/ReadSourceStreamingITCase.java        |   2 +-
 .../flink/streaming/GroupByNullKeyTest.java     |   2 +-
 .../streaming/TopWikipediaSessionsITCase.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   4 +-
 .../runners/dataflow/DataflowRunnerTest.java    |   8 +-
 .../beam/runners/spark/examples/WordCount.java  |   2 +-
 .../runners/spark/SparkRunnerDebuggerTest.java  |   2 +-
 .../beam/runners/spark/io/NumShardsTest.java    |   3 +-
 .../main/java/org/apache/beam/sdk/Pipeline.java |   2 +-
 .../apache/beam/sdk/io/ShardNameTemplate.java   |   2 +-
 .../java/org/apache/beam/sdk/io/TextIO.java     | 553 +++++++------------
 .../org/apache/beam/sdk/io/package-info.java    |   2 +-
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  20 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   2 +-
 .../org/apache/beam/sdk/util/NameUtilsTest.java |   2 +-
 .../org/apache/beam/sdk/values/PDoneTest.java   |   2 +-
 28 files changed, 249 insertions(+), 385 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
index 4a0c1bb..5ac8080 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/MinimalWordCount.java
@@ -111,7 +111,7 @@ public class MinimalWordCount {
      // formatted strings) to a series of text files.
      //
      // By default, it will write to a set of files with names like wordcount-00001-of-00005
-     .apply(TextIO.Write.to("wordcounts"));
+     .apply(TextIO.write().to("wordcounts"));
 
     // Run the pipeline.
     p.run().waitUntilFinish();

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index e331a86..bfa7eb3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -179,7 +179,7 @@ public class WordCount {
     p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+     .apply("WriteCounts", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
index 6609828..461b46d 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/common/WriteOneFilePerWindow.java
@@ -48,7 +48,7 @@ public class WriteOneFilePerWindow extends PTransform<PCollection<String>, PDone
   @Override
   public PDone expand(PCollection<String> input) {
     return input.apply(
-        TextIO.Write
+        TextIO.write()
             .to(new PerWindowFiles(filenamePrefix))
             .withWindowedWrites()
             .withNumShards(3));

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 1ef69c0..6fd9755 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -400,7 +400,7 @@ public class TfIdf {
                   c.element().getValue().getValue()));
             }
           }))
-          .apply(TextIO.Write
+          .apply(TextIO.write()
               .to(output)
               .withSuffix(".csv"));
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index bb8c8bc..478e2dc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -210,7 +210,7 @@ public class TopWikipediaSessions {
     p.apply(TextIO.read().from(options.getInput()))
         .apply(MapElements.via(new ParseTableRowJson()))
         .apply(new ComputeTopSessions(samplingThreshold))
-        .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
+        .apply("Write", TextIO.write().withoutSharding().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
index 20c8fa0..bb16528 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DistinctExample.java
@@ -89,7 +89,7 @@ public class DistinctExample {
 
     p.apply("ReadLines", TextIO.read().from(options.getInput()))
      .apply(Distinct.<String>create())
-     .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
+     .apply("DedupedShakespeare", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 05a3ad3..d1fffb4 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -169,7 +169,7 @@ public class JoinExamples {
     PCollection<TableRow> eventsTable = p.apply(BigQueryIO.read().from(GDELT_EVENTS_TABLE));
     PCollection<TableRow> countryCodes = p.apply(BigQueryIO.read().from(COUNTRY_CODES));
     PCollection<String> formattedResults = joinEvents(eventsTable, countryCodes);
-    formattedResults.apply(TextIO.Write.to(options.getOutput()));
+    formattedResults.apply(TextIO.write().to(options.getOutput()));
     p.run().waitUntilFinish();
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
index 6dabadc..85c291d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/MinimalWordCountJava8.java
@@ -65,7 +65,7 @@ public class MinimalWordCountJava8 {
          .into(TypeDescriptors.strings())
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
      // CHANGE 3/3: The Google Cloud Storage path is required for outputting the results to.
-     .apply(TextIO.Write.to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
+     .apply(TextIO.write().to("gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX"));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
index 689005a..e071b4e 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/MinimalWordCountJava8Test.java
@@ -71,7 +71,7 @@ public class MinimalWordCountJava8Test implements Serializable {
      .apply(MapElements
          .into(TypeDescriptors.strings())
          .via((KV<String, Long> wordCount) -> wordCount.getKey() + ": " + wordCount.getValue()))
-     .apply(TextIO.Write.to("gs://your-output-bucket/and-output-prefix"));
+     .apply(TextIO.write().to("gs://your-output-bucket/and-output-prefix"));
   }
 
   private GcsUtil buildMockGcsUtil() throws IOException {

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
index b0fab0b..83af61b 100644
--- a/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
+++ b/runners/apex/src/test/java/org/apache/beam/runners/apex/examples/WordCountTest.java
@@ -110,7 +110,7 @@ public class WordCountTest {
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Count.<String>perElement())
       .apply(ParDo.of(new FormatAsStringFn()))
-      .apply("WriteCounts", TextIO.Write.to(options.getOutput()))
+      .apply("WriteCounts", TextIO.write().to(options.getOutput()))
       ;
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
index 960640c..53d2ba3 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WriteWithShardingFactoryTest.java
@@ -86,7 +86,7 @@ public class WriteWithShardingFactoryTest {
     String targetLocation = IOChannelUtils.resolve(outputPath, fileName);
     // TextIO is implemented in terms of the WriteFiles PTransform. When sharding is not specified,
     // resharding should be automatically applied
-    p.apply(Create.of(strs)).apply(TextIO.Write.to(targetLocation));
+    p.apply(Create.of(strs)).apply(TextIO.write().to(targetLocation));
 
     p.run();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
index 65d198e..5985da8 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceITCase.java
@@ -76,7 +76,7 @@ public class ReadSourceITCase extends JavaProgramTestBase {
           }
         }));
 
-    result.apply(TextIO.Write.to(new URI(resultPath).getPath() + "/part"));
+    result.apply(TextIO.write().to(new URI(resultPath).getPath() + "/part"));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
index 4f597c3..0707c21 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/ReadSourceStreamingITCase.java
@@ -65,7 +65,7 @@ public class ReadSourceStreamingITCase extends StreamingProgramTestBase {
             c.output(c.element().toString());
           }
         }))
-      .apply(TextIO.Write.to(resultPath));
+      .apply(TextIO.write().to(resultPath));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
index 82d9f4f..2bd8e72 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/GroupByNullKeyTest.java
@@ -123,7 +123,7 @@ public class GroupByNullKeyTest extends StreamingProgramTestBase implements Seri
               c.output(str.toString());
             }
           }));
-    output.apply(TextIO.Write.to(resultPath));
+    output.apply(TextIO.write().to(resultPath));
     p.run();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
index 9e6bba8..28335e3 100644
--- a/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
+++ b/runners/flink/src/test/java/org/apache/beam/runners/flink/streaming/TopWikipediaSessionsITCase.java
@@ -126,7 +126,7 @@ public class TopWikipediaSessionsITCase extends StreamingProgramTestBase impleme
       }
     }));
 
-    format.apply(TextIO.Write.to(resultPath));
+    format.apply(TextIO.write().to(resultPath));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index d47da45..31c47b4 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -138,7 +138,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     FileSystems.setDefaultConfigInWorkers(options);
 
     p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
-     .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+     .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(p);
 
@@ -525,7 +525,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     String stepName = "DoFn1";
     pipeline.apply("ReadMyFile", TextIO.read().from("gs://bucket/in"))
         .apply(stepName, ParDo.of(new NoOpFn()))
-        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/out"));
+        .apply("WriteMyFile", TextIO.write().to("gs://bucket/out"));
     DataflowRunner runner = DataflowRunner.fromOptions(options);
     runner.replaceTransforms(pipeline);
     Job job =

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index 7261fe9..d011994 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -172,7 +172,7 @@ public class DataflowRunnerTest {
     Pipeline p = Pipeline.create(options);
 
     p.apply("ReadMyFile", TextIO.read().from("gs://bucket/object"))
-        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
+        .apply("WriteMyFile", TextIO.write().to("gs://bucket/object"));
 
     // Enable the FileSystems API to know about gs:// URIs in this test.
     FileSystems.setDefaultConfigInWorkers(options);
@@ -335,7 +335,7 @@ public class DataflowRunnerTest {
     Pipeline p = buildDataflowPipeline(dataflowOptions);
     p
         .apply(TextIO.read().from(options.getInput()))
-        .apply(TextIO.Write.to(options.getOutput()));
+        .apply(TextIO.write().to(options.getOutput()));
   }
 
   /**
@@ -587,7 +587,7 @@ public class DataflowRunnerTest {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
 
     p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"))
-        .apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
+        .apply("WriteMyNonGcsFile", TextIO.write().to("/tmp/file"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("Expected a valid 'gs://' path but was given"));
@@ -613,7 +613,7 @@ public class DataflowRunnerTest {
   public void testMultiSlashGcsFileWritePath() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
     PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.read().from("gs://bucket/object"));
-    pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
+    pc.apply("WriteInvalidGcsFile", TextIO.write().to("gs://bucket/tmp//file"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("consecutive slashes");

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
index 2bcf140..0779bd5 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/examples/WordCount.java
@@ -128,7 +128,7 @@ public class WordCount {
     p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
+     .apply("WriteCounts", TextIO.write().to(options.getOutput()));
 
     p.run().waitUntilFinish();
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index ce52b90..e43bc4e 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -88,7 +88,7 @@ public class SparkRunnerDebuggerTest {
 
     wordCounts
         .apply(MapElements.via(new WordCount.FormatAsTextFn()))
-        .apply(TextIO.Write.to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
+        .apply(TextIO.write().to("!!PLACEHOLDER-OUTPUT-DIR!!").withNumShards(3).withSuffix(".txt"));
 
     final String expectedPipeline = "sparkContext.parallelize(Arrays.asList(...))\n"
         + "_.mapPartitions(new org.apache.beam.runners.spark.examples.WordCount$ExtractWordsFn())\n"

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
index c936ed3..5021744 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/io/NumShardsTest.java
@@ -73,7 +73,8 @@ public class NumShardsTest {
     PCollection<String> inputWords = p.apply(Create.of(WORDS).withCoder(StringUtf8Coder.of()));
     PCollection<String> output = inputWords.apply(new WordCount.CountWords())
         .apply(MapElements.via(new WordCount.FormatAsTextFn()));
-    output.apply(TextIO.Write.to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
+    output.apply(
+        TextIO.write().to(outputDir.getAbsolutePath()).withNumShards(3).withSuffix(".txt"));
     p.run().waitUntilFinish();
 
     int count = 0;

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
index 7cb9386..d4c46cc 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/Pipeline.java
@@ -99,7 +99,7 @@ import org.slf4j.LoggerFactory;
  *     .apply(new Count<String>());
  * PCollection<String> formattedWordCounts =
  *     wordCounts.apply(ParDo.of(new FormatCounts()));
- * formattedWordCounts.apply(TextIO.Write.to("gs://bucket/dir/counts.txt"));
+ * formattedWordCounts.apply(TextIO.write().to("gs://bucket/dir/counts.txt"));
  *
  * // PTransforms aren't executed when they're applied, rather they're
  * // just added to the Pipeline.  Once the whole Pipeline of PTransforms

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
index 7f48a5c..cc85242 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/ShardNameTemplate.java
@@ -45,7 +45,7 @@ package org.apache.beam.sdk.io;
  *
  * <pre>{@code
  *   pipeline.apply(
- *       TextIO.Write.to("gs://bucket/path")
+ *       TextIO.write().to("gs://bucket/path")
  *                   .withShardNameTemplate("-SS-of-NN")
  *                   .withSuffix(".txt"))
  * }</pre>

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
index f8670a6..2d82572 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/TextIO.java
@@ -86,9 +86,9 @@ import org.apache.beam.sdk.values.PDone;
  *
  * <p>By default, all input is put into the global window before writing. If per-window writes are
  * desired - for example, when using a streaming runner -
- * {@link TextIO.Write.Bound#withWindowedWrites()} will cause windowing and triggering to be
+ * {@link TextIO.Write#withWindowedWrites()} will cause windowing and triggering to be
  * preserved. When producing windowed writes, the number of output shards must be set explicitly
- * using {@link TextIO.Write.Bound#withNumShards(int)}; some runners may set this for you to a
+ * using {@link TextIO.Write#withNumShards(int)}; some runners may set this for you to a
  * runner-chosen value, so you may need not set it yourself. A {@link FilenamePolicy} must be
  * set, and unique windows and triggers must produce unique filenames.
  *
@@ -99,11 +99,11 @@ import org.apache.beam.sdk.values.PDone;
  * <pre>{@code
  * // A simple Write to a local file (only runs locally):
  * PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt"));
  *
  * // Same as above, only with Gzip compression:
  * PCollection<String> lines = ...;
- * lines.apply(TextIO.Write.to("/path/to/file.txt"));
+ * lines.apply(TextIO.write().to("/path/to/file.txt")
  *      .withSuffix(".txt")
  *      .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP));
  * }</pre>
@@ -117,6 +117,15 @@ public class TextIO {
     return new AutoValue_TextIO_Read.Builder().setCompressionType(CompressionType.AUTO).build();
   }
 
+  /**
+   * A {@link PTransform} that writes a {@link PCollection} to a text file (or
+   * multiple text files matching a sharding pattern), with each
+   * element of the input collection encoded into its own line.
+   */
+  public static Write write() {
+    return new Write();
+  }
+
   /** Implementation of {@link #read}. */
   @AutoValue
   public abstract static class Read extends PTransform<PBegin, PCollection<String>> {
@@ -227,49 +236,105 @@ public class TextIO {
 
   /////////////////////////////////////////////////////////////////////////////
 
-  /**
-   * A {@link PTransform} that writes a {@link PCollection} to text file (or
-   * multiple text files matching a sharding pattern), with each
-   * element of the input collection encoded into its own line.
-   */
-  public static class Write {
+  /** Implementation of {@link #write}. */
+  public static class Write extends PTransform<PCollection<String>, PDone> {
+    private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
+
+    /** The prefix of each file written, combined with suffix and shardTemplate. */
+    private final ValueProvider<String> filenamePrefix;
+    /** The suffix of each file written, combined with prefix and shardTemplate. */
+    private final String filenameSuffix;
+
+    /** An optional header to add to each file. */
+    @Nullable private final String header;
+
+    /** An optional footer to add to each file. */
+    @Nullable private final String footer;
+
+    /** Requested number of shards. 0 for automatic. */
+    private final int numShards;
+
+    /** The shard template of each file written, combined with prefix and suffix. */
+    private final String shardTemplate;
+
+    /** A policy for naming output files. */
+    private final FilenamePolicy filenamePolicy;
+
+    /** Whether to write windowed output files. */
+    private boolean windowedWrites;
+
+    /**
+     * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
+     * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     */
+    private final WritableByteChannelFactory writableByteChannelFactory;
+
+    private Write() {
+      this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
+          FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
+    }
+
+    private Write(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
+        @Nullable String header, @Nullable String footer, int numShards,
+        String shardTemplate,
+        WritableByteChannelFactory writableByteChannelFactory,
+        FilenamePolicy filenamePolicy,
+        boolean windowedWrites) {
+      super(name);
+      this.header = header;
+      this.footer = footer;
+      this.filenamePrefix = filenamePrefix;
+      this.filenameSuffix = filenameSuffix;
+      this.numShards = numShards;
+      this.shardTemplate = shardTemplate;
+      this.writableByteChannelFactory =
+          firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
+      this.filenamePolicy = filenamePolicy;
+      this.windowedWrites = windowedWrites;
+    }
 
     /**
-     * Returns a transform for writing to text files that writes to the file(s)
-     * with the given prefix. This can be a local filename
+     * Writes to text files with the given prefix. This can be a local filename
      * (if running locally), or a Google Cloud Storage filename of
      * the form {@code "gs://<bucket>/<filepath>"}
      * (if running locally or using remote execution).
      *
      * <p>The files written will begin with this prefix, followed by
-     * a shard identifier (see {@link Bound#withNumShards(int)}, and end
-     * in a common extension, if given by {@link Bound#withSuffix(String)}.
+     * a shard identifier (see {@link #withNumShards(int)}), and end
+     * in a common extension, if given by {@link #withSuffix(String)}.
      */
-    public static Bound to(String prefix) {
-      return new Bound().to(prefix);
+    public Write to(String filenamePrefix) {
+      validateOutputComponent(filenamePrefix);
+      return new Write(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
+          header, footer, numShards, shardTemplate,
+          writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
-    public static Bound to(FilenamePolicy filenamePolicy) {
-      return new Bound().to(filenamePolicy);
-
+    /** Like {@link #to(String)}, but with a {@link ValueProvider}. */
+    public Write to(ValueProvider<String> filenamePrefix) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
-    /**
-     * Like {@link #to(String)}, but with a {@link ValueProvider}.
-     */
-    public static Bound to(ValueProvider<String> prefix) {
-      return new Bound().to(prefix);
+
+    /** Like {@link #to(String)}, but with a {@link FilenamePolicy}. */
+    public Write to(FilenamePolicy filenamePolicy) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that appends the specified suffix
-     * to the created files.
+     * Writes to the file(s) with the given filename suffix.
+     *
+     * @see ShardNameTemplate
      */
-    public static Bound withSuffix(String nameExtension) {
-      return new Bound().withSuffix(nameExtension);
+    public Write withSuffix(String nameExtension) {
+      validateOutputComponent(nameExtension);
+      return new Write(name, filenamePrefix, nameExtension, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that uses the provided shard count.
+     * Uses the provided shard count.
      *
      * <p>Constraining the number of shards is likely to reduce
      * the performance of a pipeline. Setting this value is not recommended
@@ -277,371 +342,169 @@ public class TextIO {
      *
      * @param numShards the number of shards to use, or 0 to let the system
      *                  decide.
+     * @see ShardNameTemplate
      */
-    public static Bound withNumShards(int numShards) {
-      return new Bound().withNumShards(numShards);
+    public Write withNumShards(int numShards) {
+      checkArgument(numShards >= 0);
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that uses the given shard name
-     * template.
+     * Uses the given shard name template.
      *
-     * <p>See {@link ShardNameTemplate} for a description of shard templates.
+     * @see ShardNameTemplate
      */
-    public static Bound withShardNameTemplate(String shardTemplate) {
-      return new Bound().withShardNameTemplate(shardTemplate);
+    public Write withShardNameTemplate(String shardTemplate) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that forces a single file as
-     * output.
+     * Forces a single file as output.
+     *
+     * <p>Constraining the number of shards is likely to reduce
+     * the performance of a pipeline. Using this setting is not recommended
+     * unless you truly require a single output file.
+     *
+     * <p>This is a shortcut for
+     * {@code .withNumShards(1).withShardNameTemplate("")}
      */
-    public static Bound withoutSharding() {
-      return new Bound().withoutSharding();
+    public Write withoutSharding() {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
+          writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that adds a header string to the files
-     * it writes. Note that a newline character will be added after the header.
+     * Adds a header string to each file. A newline after the header is added automatically.
      *
      * <p>A {@code null} value will clear any previously configured header.
-     *
-     * @param header the string to be added as file header
      */
-    public static Bound withHeader(@Nullable String header) {
-      return new Bound().withHeader(header);
+    public Write withHeader(@Nullable String header) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
-     * Returns a transform for writing to text files that adds a footer string to the files
-     * it writes. Note that a newline character will be added after the header.
+     * Adds a footer string to each file. A newline after the footer is added automatically.
      *
      * <p>A {@code null} value will clear any previously configured footer.
-     *
-     * @param footer the string to be added as file footer
      */
-    public static Bound withFooter(@Nullable String footer) {
-      return new Bound().withFooter(footer);
+    public Write withFooter(@Nullable String footer) {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
     /**
      * Returns a transform for writing to text files like this one but that has the given
-     * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output. The
-     * default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
+     * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
+     * The default value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
      *
      * <p>A {@code null} value will reset the value to the default value mentioned above.
-     *
-     * @param writableByteChannelFactory the factory to be used during output
      */
-    public static Bound withWritableByteChannelFactory(
+    public Write withWritableByteChannelFactory(
         WritableByteChannelFactory writableByteChannelFactory) {
-      return new Bound().withWritableByteChannelFactory(writableByteChannelFactory);
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
     }
 
-    // TODO: appendingNewlines, etc.
-
-    /**
-     * A PTransform that writes a bounded PCollection to a text file (or
-     * multiple text files matching a sharding pattern), with each
-     * PCollection element being encoded into its own line.
-     */
-    public static class Bound extends PTransform<PCollection<String>, PDone> {
-      private static final String DEFAULT_SHARD_TEMPLATE = ShardNameTemplate.INDEX_OF_MAX;
-
-      /** The prefix of each file written, combined with suffix and shardTemplate. */
-      private final ValueProvider<String> filenamePrefix;
-      /** The suffix of each file written, combined with prefix and shardTemplate. */
-      private final String filenameSuffix;
-
-      /** An optional header to add to each file. */
-      @Nullable private final String header;
-
-      /** An optional footer to add to each file. */
-      @Nullable private final String footer;
-
-      /** Requested number of shards. 0 for automatic. */
-      private final int numShards;
-
-      /** The shard template of each file written, combined with prefix and suffix. */
-      private final String shardTemplate;
-
-      /** A policy for naming output files. */
-      private final FilenamePolicy filenamePolicy;
-
-      /** Whether to write windowed output files. */
-      private boolean windowedWrites;
-
-      /**
-       * The {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink}. Default is
-       * {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
-       */
-      private final WritableByteChannelFactory writableByteChannelFactory;
-
-      private Bound() {
-        this(null, null, "", null, null, 0, DEFAULT_SHARD_TEMPLATE,
-            FileBasedSink.CompressionType.UNCOMPRESSED, null, false);
-      }
-
-      private Bound(String name, ValueProvider<String> filenamePrefix, String filenameSuffix,
-          @Nullable String header, @Nullable String footer, int numShards,
-          String shardTemplate,
-          WritableByteChannelFactory writableByteChannelFactory,
-          FilenamePolicy filenamePolicy,
-          boolean windowedWrites) {
-        super(name);
-        this.header = header;
-        this.footer = footer;
-        this.filenamePrefix = filenamePrefix;
-        this.filenameSuffix = filenameSuffix;
-        this.numShards = numShards;
-        this.shardTemplate = shardTemplate;
-        this.writableByteChannelFactory =
-            firstNonNull(writableByteChannelFactory, FileBasedSink.CompressionType.UNCOMPRESSED);
-        this.filenamePolicy = filenamePolicy;
-        this.windowedWrites = windowedWrites;
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that writes to the file(s) with the given filename prefix.
-       *
-       * <p>See {@link TextIO.Write#to(String) Write.to(String)} for more information.
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound to(String filenamePrefix) {
-        validateOutputComponent(filenamePrefix);
-        return new Bound(name, StaticValueProvider.of(filenamePrefix), filenameSuffix,
-            header, footer, numShards, shardTemplate,
-            writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Like {@link #to(String)}, but with a {@link ValueProvider}.
-       */
-      public Bound to(ValueProvider<String> filenamePrefix) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-       /**
-        * Like {@link #to(String)}, but with a {@link FilenamePolicy}.
-        */
-      public Bound to(FilenamePolicy filenamePolicy) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that that's like this one but
-       * that writes to the file(s) with the given filename suffix.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound withSuffix(String nameExtension) {
-        validateOutputComponent(nameExtension);
-        return new Bound(name, filenamePrefix, nameExtension, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the provided shard count.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Setting this value is not recommended
-       * unless you require a specific number of output files.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param numShards the number of shards to use, or 0 to let the system
-       *                  decide.
-       * @see ShardNameTemplate
-       */
-      public Bound withNumShards(int numShards) {
-        checkArgument(numShards >= 0);
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that uses the given shard name template.
-       *
-       * <p>Does not modify this object.
-       *
-       * @see ShardNameTemplate
-       */
-      public Bound withShardNameTemplate(String shardTemplate) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
-
-      /**
-       * Returns a transform for writing to text files that's like this one but
-       * that forces a single file as output.
-       *
-       * <p>Constraining the number of shards is likely to reduce
-       * the performance of a pipeline. Using this setting is not recommended
-       * unless you truly require a single output file.
-       *
-       * <p>This is a shortcut for
-       * {@code .withNumShards(1).withShardNameTemplate("")}
-       *
-       * <p>Does not modify this object.
-       */
-      public Bound withoutSharding() {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, 1, "",
-            writableByteChannelFactory, filenamePolicy, windowedWrites);
-      }
+    public Write withWindowedWrites() {
+      return new Write(name, filenamePrefix, filenameSuffix, header, footer, numShards,
+          shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+    }
 
-      /**
-       * Returns a transform for writing to text files that adds a header string to the files
-       * it writes. Note that a newline character will be added after the header.
-       *
-       * <p>A {@code null} value will clear any previously configured header.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param header the string to be added as file header
-       */
-      public Bound withHeader(@Nullable String header) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+    @Override
+    public PDone expand(PCollection<String> input) {
+      if (filenamePolicy == null && filenamePrefix == null) {
+        throw new IllegalStateException(
+            "need to set the filename prefix of an TextIO.Write transform");
       }
-
-      /**
-       * Returns a transform for writing to text files that adds a footer string to the files
-       * it writes. Note that a newline character will be added after the header.
-       *
-       * <p>A {@code null} value will clear any previously configured footer.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param footer the string to be added as file footer
-       */
-      public Bound withFooter(@Nullable String footer) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      if (filenamePolicy != null && filenamePrefix != null) {
+        throw new IllegalStateException(
+            "cannot set both a filename policy and a filename prefix");
       }
-
-      /**
-       * Returns a transform for writing to text files like this one but that has the given
-       * {@link WritableByteChannelFactory} to be used by the {@link FileBasedSink} during output.
-       * The default is value is {@link FileBasedSink.CompressionType#UNCOMPRESSED}.
-       *
-       * <p>A {@code null} value will reset the value to the default value mentioned above.
-       *
-       * <p>Does not modify this object.
-       *
-       * @param writableByteChannelFactory the factory to be used during output
-       */
-      public Bound withWritableByteChannelFactory(
-          WritableByteChannelFactory writableByteChannelFactory) {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, windowedWrites);
+      WriteFiles<String> write = null;
+      if (filenamePolicy != null) {
+       write = WriteFiles.to(
+           new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
+      } else {
+        write = WriteFiles.to(
+            new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
+                writableByteChannelFactory));
       }
-
-      public Bound withWindowedWrites() {
-        return new Bound(name, filenamePrefix, filenameSuffix, header, footer, numShards,
-            shardTemplate, writableByteChannelFactory, filenamePolicy, true);
+      if (getNumShards() > 0) {
+        write = write.withNumShards(getNumShards());
       }
-
-      @Override
-      public PDone expand(PCollection<String> input) {
-        if (filenamePolicy == null && filenamePrefix == null) {
-          throw new IllegalStateException(
-              "need to set the filename prefix of an TextIO.Write transform");
-        }
-        if (filenamePolicy != null && filenamePrefix != null) {
-          throw new IllegalStateException(
-              "cannot set both a filename policy and a filename prefix");
-        }
-        WriteFiles<String> write = null;
-        if (filenamePolicy != null) {
-         write = WriteFiles.to(
-             new TextSink(filenamePolicy, header, footer, writableByteChannelFactory));
-        } else {
-          write = WriteFiles.to(
-              new TextSink(filenamePrefix, filenameSuffix, header, footer, shardTemplate,
-                  writableByteChannelFactory));
-        }
-        if (getNumShards() > 0) {
-          write = write.withNumShards(getNumShards());
-        }
-        if (windowedWrites) {
-          write = write.withWindowedWrites();
-        }
-        return input.apply("WriteFiles", write);
+      if (windowedWrites) {
+        write = write.withWindowedWrites();
       }
+      return input.apply("WriteFiles", write);
+    }
 
-      @Override
-      public void populateDisplayData(DisplayData.Builder builder) {
-        super.populateDisplayData(builder);
+    @Override
+    public void populateDisplayData(DisplayData.Builder builder) {
+      super.populateDisplayData(builder);
 
-        String prefixString = "";
-        if (filenamePrefix != null) {
-          prefixString = filenamePrefix.isAccessible()
-              ? filenamePrefix.get() : filenamePrefix.toString();
-        }
-        builder
-            .addIfNotNull(DisplayData.item("filePrefix", prefixString)
-              .withLabel("Output File Prefix"))
-            .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
-              .withLabel("Output File Suffix"), "")
-            .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
-              .withLabel("Output Shard Name Template"),
-                DEFAULT_SHARD_TEMPLATE)
-            .addIfNotDefault(DisplayData.item("numShards", numShards)
-              .withLabel("Maximum Output Shards"), 0)
-            .addIfNotNull(DisplayData.item("fileHeader", header)
-              .withLabel("File Header"))
-            .addIfNotNull(DisplayData.item("fileFooter", footer)
-                .withLabel("File Footer"))
-            .add(DisplayData
-                .item("writableByteChannelFactory", writableByteChannelFactory.toString())
-                .withLabel("Compression/Transformation Type"));
+      String prefixString = "";
+      if (filenamePrefix != null) {
+        prefixString = filenamePrefix.isAccessible()
+            ? filenamePrefix.get() : filenamePrefix.toString();
       }
+      builder
+          .addIfNotNull(DisplayData.item("filePrefix", prefixString)
+            .withLabel("Output File Prefix"))
+          .addIfNotDefault(DisplayData.item("fileSuffix", filenameSuffix)
+            .withLabel("Output File Suffix"), "")
+          .addIfNotDefault(DisplayData.item("shardNameTemplate", shardTemplate)
+            .withLabel("Output Shard Name Template"),
+              DEFAULT_SHARD_TEMPLATE)
+          .addIfNotDefault(DisplayData.item("numShards", numShards)
+            .withLabel("Maximum Output Shards"), 0)
+          .addIfNotNull(DisplayData.item("fileHeader", header)
+            .withLabel("File Header"))
+          .addIfNotNull(DisplayData.item("fileFooter", footer)
+              .withLabel("File Footer"))
+          .add(DisplayData
+              .item("writableByteChannelFactory", writableByteChannelFactory.toString())
+              .withLabel("Compression/Transformation Type"));
+    }
 
-      /**
-       * Returns the current shard name template string.
-       */
-      public String getShardNameTemplate() {
-        return shardTemplate;
-      }
+    /**
+     * Returns the current shard name template string.
+     */
+    public String getShardNameTemplate() {
+      return shardTemplate;
+    }
 
-      @Override
-      protected Coder<Void> getDefaultOutputCoder() {
-        return VoidCoder.of();
-      }
+    @Override
+    protected Coder<Void> getDefaultOutputCoder() {
+      return VoidCoder.of();
+    }
 
-      public String getFilenamePrefix() {
-        return filenamePrefix.get();
-      }
+    public String getFilenamePrefix() {
+      return filenamePrefix.get();
+    }
 
-      public String getShardTemplate() {
-        return shardTemplate;
-      }
+    public String getShardTemplate() {
+      return shardTemplate;
+    }
 
-      public int getNumShards() {
-        return numShards;
-      }
+    public int getNumShards() {
+      return numShards;
+    }
 
-      public String getFilenameSuffix() {
-        return filenameSuffix;
-      }
+    public String getFilenameSuffix() {
+      return filenameSuffix;
+    }
 
-      @Nullable
-      public String getHeader() {
-        return header;
-      }
+    @Nullable
+    public String getHeader() {
+      return header;
+    }
 
-      @Nullable
-      public String getFooter() {
-        return footer;
-      }
+    @Nullable
+    public String getFooter() {
+      return footer;
     }
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
index c65d7dd..3fc8e32 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/package-info.java
@@ -29,7 +29,7 @@
  * and {@code Write} transforms that persist PCollections to external storage:
  * <pre> {@code
  * PCollection<Integer> numbers = ...;
- * numbers.apply(TextIO.Write.to("gs://my_bucket/path/to/numbers"));
+ * numbers.apply(TextIO.write().to("gs://my_bucket/path/to/numbers"));
  * } </pre>
  */
 package org.apache.beam.sdk.io;

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index 8a7965c..095b69f 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -299,8 +299,8 @@ public class TextIOTest {
     PCollection<String> input =
         p.apply(Create.of(Arrays.asList(elems)).withCoder(StringUtf8Coder.of()));
 
-    TextIO.Write.Bound write =
-        TextIO.Write.to(baseFilename)
+    TextIO.Write write =
+        TextIO.write().to(baseFilename)
             .withHeader(header)
             .withFooter(footer);
 
@@ -463,7 +463,7 @@ public class TextIOTest {
 
     final WritableByteChannelFactory writableByteChannelFactory =
         new DrunkWritableByteChannelFactory();
-    TextIO.Write.Bound write = TextIO.Write.to(baseDir.resolve(outputName).toString())
+    TextIO.Write write = TextIO.write().to(baseDir.resolve(outputName).toString())
         .withoutSharding().withWritableByteChannelFactory(writableByteChannelFactory);
     DisplayData displayData = DisplayData.from(write);
     assertThat(displayData, hasDisplayItem("writableByteChannelFactory", "DRUNK"));
@@ -483,7 +483,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayData() {
-    TextIO.Write.Bound write = TextIO.Write
+    TextIO.Write write = TextIO.write()
         .to("foo")
         .withSuffix("bar")
         .withShardNameTemplate("-SS-of-NN-")
@@ -504,7 +504,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayDataValidateThenHeader() {
-    TextIO.Write.Bound write = TextIO.Write
+    TextIO.Write write = TextIO.write()
         .to("foo")
         .withHeader("myHeader");
 
@@ -515,7 +515,7 @@ public class TextIOTest {
 
   @Test
   public void testWriteDisplayDataValidateThenFooter() {
-    TextIO.Write.Bound write = TextIO.Write
+    TextIO.Write write = TextIO.write()
         .to("foo")
         .withFooter("myFooter");
 
@@ -534,7 +534,7 @@ public class TextIOTest {
 
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    TextIO.Write.Bound write = TextIO.Write.to(outputPath);
+    TextIO.Write write = TextIO.write().to(outputPath);
 
     Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(write);
     assertThat("TextIO.Write should include the file prefix in its primitive display data",
@@ -553,7 +553,7 @@ public class TextIOTest {
 
     expectedException.expect(IllegalArgumentException.class);
     expectedException.expectMessage("Output name components are not allowed to contain");
-    input.apply(TextIO.Write.to(filename));
+    input.apply(TextIO.write().to(filename));
   }
 
   /** Options for testing. */
@@ -573,7 +573,7 @@ public class TextIOTest {
 
     p
         .apply(TextIO.read().from(options.getInput()))
-        .apply(TextIO.Write.to(options.getOutput()));
+        .apply(TextIO.write().to(options.getOutput()));
   }
 
   @Test
@@ -826,7 +826,7 @@ public class TextIOTest {
   @Test
   public void testTextIOGetName() {
     assertEquals("TextIO.Read", TextIO.read().from("somefile").getName());
-    assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
+    assertEquals("TextIO.Write", TextIO.write().to("somefile").getName());
     assertEquals("TextIO.Read", TextIO.read().from("somefile").toString());
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 29d9774..6c3aba2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -124,7 +124,7 @@ public class TransformTreeTest {
     p.apply("ReadMyFile", TextIO.read().from(inputFile.getPath()))
         .apply(sample)
         .apply(Flatten.<String>iterables())
-        .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
+        .apply("WriteMyFile", TextIO.write().to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited =
         EnumSet.noneOf(TransformsSeen.class);

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
index c685a63..411f913 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/NameUtilsTest.java
@@ -137,7 +137,7 @@ public class NameUtilsTest {
     assertEquals(
         "NameUtilsTest.SomeTransform",
         NameUtils.approximatePTransformName(AutoValue_NameUtilsTest_SomeTransform.class));
-    assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.Bound.class));
+    assertEquals("TextIO.Write", NameUtils.approximatePTransformName(TextIO.Write.class));
   }
 
   @AutoValue

http://git-wip-us.apache.org/repos/asf/beam/blob/4f5098dd/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
index 7c9d1d9..b07a5b8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/PDoneTest.java
@@ -71,7 +71,7 @@ public class PDoneTest {
       return
           begin
           .apply(Create.of(LINES))
-          .apply(TextIO.Write.to(filename));
+          .apply(TextIO.write().to(filename));
     }
   }
 


Mime
View raw message