beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [02/50] [abbrv] incubator-beam git commit: Remove many uses of .named methods
Date Wed, 06 Jul 2016 17:20:08 GMT
Remove many uses of .named methods

Specifically, remove uses of:
  - Window.named
  - AvroIO.named
  - PubSubIO.named
  - TextIO.named
  - BigQueryIO.named
  - Read.named


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/0fd0bd51
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/0fd0bd51
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/0fd0bd51

Branch: refs/heads/runners-spark2
Commit: 0fd0bd510ea82763383ece1a23a8ff2fbdc23b84
Parents: a25322e
Author: Ben Chambers <bchambers@google.com>
Authored: Thu Jun 23 22:27:05 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Wed Jul 6 10:18:49 2016 -0700

----------------------------------------------------------------------
 .../beam/examples/DebuggingWordCount.java       |   2 +-
 .../org/apache/beam/examples/WordCount.java     |   4 +-
 .../apache/beam/examples/complete/TfIdf.java    |   3 +-
 .../examples/complete/TopWikipediaSessions.java |   2 +-
 .../examples/cookbook/DatastoreWordCount.java   |   2 +-
 .../beam/examples/cookbook/DeDupExample.java    |   5 +-
 .../beam/examples/cookbook/TriggerExample.java  |   4 +-
 .../beam/examples/complete/game/GameStats.java  |  28 ++-
 .../examples/complete/game/HourlyTeamScore.java |   5 +-
 .../examples/complete/game/LeaderBoard.java     |   8 +-
 .../beam/runners/flink/examples/TFIDF.java      |   3 +-
 .../beam/runners/flink/examples/WordCount.java  |   4 +-
 .../flink/examples/streaming/AutoComplete.java  |   9 +-
 .../flink/examples/streaming/JoinExamples.java  |  13 +-
 .../KafkaWindowedWordCountExample.java          |   2 +-
 .../examples/streaming/WindowedWordCount.java   |   3 +-
 .../beam/runners/dataflow/DataflowRunner.java   |   2 +-
 .../DataflowPipelineTranslatorTest.java         |   6 +-
 .../runners/dataflow/DataflowRunnerTest.java    |  18 +-
 .../beam/runners/spark/SimpleWordCountTest.java |   3 +-
 .../java/org/apache/beam/sdk/io/BigQueryIO.java |   1 -
 .../beam/sdk/io/AvroIOGeneratedClassTest.java   | 192 +++++--------------
 .../java/org/apache/beam/sdk/io/AvroIOTest.java |   5 +-
 .../org/apache/beam/sdk/io/BigQueryIOTest.java  |  82 +++-----
 .../apache/beam/sdk/io/FileBasedSourceTest.java |   5 +-
 .../org/apache/beam/sdk/io/PubsubIOTest.java    |   4 -
 .../java/org/apache/beam/sdk/io/TextIOTest.java |  37 +---
 .../org/apache/beam/sdk/io/XmlSourceTest.java   |  19 +-
 .../beam/sdk/runners/TransformTreeTest.java     |   4 +-
 .../sdk/transforms/windowing/WindowTest.java    |   6 +-
 .../sdk/transforms/windowing/WindowingTest.java |   2 +-
 .../src/main/java/DebuggingWordCount.java       |   2 +-
 .../src/main/java/WordCount.java                |   4 +-
 33 files changed, 158 insertions(+), 331 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
index 85823c2..8d85d44 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/DebuggingWordCount.java
@@ -173,7 +173,7 @@ public class DebuggingWordCount {
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> filteredWords =
-        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
          .apply(new WordCount.CountWords())
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
index cf6c45a..af16c44 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/WordCount.java
@@ -205,10 +205,10 @@ public class WordCount {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
      .apply(new CountWords())
      .apply(MapElements.via(new FormatAsTextFn()))
-     .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 23653d6..8305314 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -187,8 +187,7 @@ public class TfIdf {
         }
 
         PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply(TextIO.Read.from(uriString)
-                .named("TextIO.Read(" + uriString + ")"))
+            .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
             .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
 
         urisToLines = urisToLines.and(oneUriToLines);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 5d95e3f..80b3ade 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -217,7 +217,7 @@ public class TopWikipediaSessions {
         .from(options.getInput())
         .withCoder(TableRowJsonCoder.of()))
      .apply(new ComputeTopSessions(samplingThreshold))
-     .apply(TextIO.Write.named("Write").withoutSharding().to(options.getOutput()));
+     .apply("Write", TextIO.Write.withoutSharding().to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 7578d79..b070f94 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -193,7 +193,7 @@ public class DatastoreWordCount {
    */
   public static void writeDataToDatastore(Options options) {
       Pipeline p = Pipeline.create(options);
-      p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+      p.apply("ReadLines", TextIO.Read.from(options.getInput()))
        .apply(ParDo.of(new CreateEntityFn(options.getNamespace(), options.getKind())))
        .apply(DatastoreIO.writeTo(options.getProject()));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
index db65543..d573bcd 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DeDupExample.java
@@ -89,10 +89,9 @@ public class DeDupExample {
     Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInput()))
      .apply(RemoveDuplicates.<String>create())
-     .apply(TextIO.Write.named("DedupedShakespeare")
-         .to(options.getOutput()));
+     .apply("DedupedShakespeare", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index ab1fb66..ff4909b 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -467,7 +467,7 @@ public class TriggerExample {
     TableReference tableRef = getTableReference(options.getProject(),
         options.getBigQueryDataset(), options.getBigQueryTable());
 
-    PCollectionList<TableRow> resultList = pipeline.apply(PubsubIO.Read.named("ReadPubsubInput")
+    PCollectionList<TableRow> resultList = pipeline.apply("ReadPubsubInput", PubsubIO.Read
         .timestampLabel(PUBSUB_TIMESTAMP_LABEL_KEY)
         .topic(options.getPubsubTopic()))
         .apply(ParDo.of(new ExtractFlowInfo()))
@@ -493,7 +493,7 @@ public class TriggerExample {
     copiedOptions.setJobName(options.getJobName() + "-injector");
     Pipeline injectorPipeline = Pipeline.create(copiedOptions);
     injectorPipeline
-    .apply(TextIO.Read.named("ReadMyFile").from(options.getInput()))
+    .apply("ReadMyFile", TextIO.Read.from(options.getInput()))
     .apply("InsertRandomDelays", ParDo.of(new InsertDelays()))
     .apply(IntraBundleParallelization.of(PubsubFileInjector
         .withTimestampLabelKey(PUBSUB_TIMESTAMP_LABEL_KEY)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index ad8b49e..b1cb312 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -260,10 +260,8 @@ public class GameStats extends LeaderBoard {
     // Calculate the total score per user over fixed windows, and
     // cumulative updates for late data.
     final PCollectionView<Map<String, Integer>> spammersView = userEvents
-      .apply(Window.named("FixedWindowsUser")
-          .<KV<String, Integer>>into(FixedWindows.of(
-              Duration.standardMinutes(options.getFixedWindowDuration())))
-          )
+      .apply("FixedWindowsUser", Window.<KV<String, Integer>>into(
+          FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
 
       // Filter out everyone but those with (SCORE_WEIGHT * avg) clickrate.
       // These might be robots/spammers.
@@ -278,10 +276,8 @@ public class GameStats extends LeaderBoard {
     // suspected robots-- to filter out scores from those users from the sum.
     // Write the results to BigQuery.
     rawEvents
-      .apply(Window.named("WindowIntoFixedWindows")
-          .<GameActionInfo>into(FixedWindows.of(
-              Duration.standardMinutes(options.getFixedWindowDuration())))
-          )
+      .apply("WindowIntoFixedWindows", Window.<GameActionInfo>into(
+          FixedWindows.of(Duration.standardMinutes(options.getFixedWindowDuration()))))
       // Filter out the detected spammer users, using the side input derived above.
       .apply("FilterOutSpammers", ParDo
               .withSideInputs(spammersView)
@@ -299,8 +295,8 @@ public class GameStats extends LeaderBoard {
       // [END DocInclude_FilterAndCalc]
       // Write the result to BigQuery
       .apply("WriteTeamSums",
-             new WriteWindowedToBigQuery<KV<String, Integer>>(
-                options.getTablePrefix() + "_team", configureWindowedWrite()));
+          new WriteWindowedToBigQuery<KV<String, Integer>>(
+              options.getTablePrefix() + "_team", configureWindowedWrite()));
 
 
     // [START DocInclude_SessionCalc]
@@ -309,10 +305,9 @@ public class GameStats extends LeaderBoard {
     // This information could help the game designers track the changing user engagement
     // as their set of games changes.
     userEvents
-      .apply(Window.named("WindowIntoSessions")
-            .<KV<String, Integer>>into(
-                  Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
-        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
+      .apply("WindowIntoSessions", Window.<KV<String, Integer>>into(
+          Sessions.withGapDuration(Duration.standardMinutes(options.getSessionGap())))
+          .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()))
       // For this use, we care only about the existence of the session, not any particular
       // information aggregated over it, so the following is an efficient way to do that.
       .apply(Combine.perKey(x -> 0))
@@ -321,9 +316,8 @@ public class GameStats extends LeaderBoard {
       // [END DocInclude_SessionCalc]
       // [START DocInclude_Rewindow]
       // Re-window to process groups of session sums according to when the sessions complete.
-      .apply(Window.named("WindowToExtractSessionMean")
-            .<Integer>into(
-                FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
+      .apply("WindowToExtractSessionMean", Window.<Integer>into(
+          FixedWindows.of(Duration.standardMinutes(options.getUserActivityWindowDuration()))))
       // Find the mean session duration in each window.
       .apply(Mean.<Integer>globally().withoutDefaults())
       // Write this info to a BigQuery table.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
index 7a808ac..e489607 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/HourlyTeamScore.java
@@ -178,9 +178,8 @@ public class HourlyTeamScore extends UserScore {
       // Add an element timestamp based on the event log, and apply fixed windowing.
       .apply("AddEventTimestamps",
              WithTimestamps.of((GameActionInfo i) -> new Instant(i.getTimestamp())))
-      .apply(Window.named("FixedWindowsTeam")
-          .<GameActionInfo>into(FixedWindows.of(
-                Duration.standardMinutes(options.getWindowDuration()))))
+      .apply("FixedWindowsTeam", Window.<GameActionInfo>into(
+          FixedWindows.of(Duration.standardMinutes(options.getWindowDuration()))))
       // [END DocInclude_HTSAddTsAndWindow]
 
       // Extract and sum teamname/score pairs from the event data.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
index 2c608aa..a14d533 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java
@@ -190,9 +190,8 @@ public class LeaderBoard extends HourlyTeamScore {
     // [START DocInclude_WindowAndTrigger]
     // Extract team/score pairs from the event stream, using hour-long windows by default.
     gameEvents
-        .apply(Window.named("LeaderboardTeamFixedWindows")
-          .<GameActionInfo>into(FixedWindows.of(
-              Duration.standardMinutes(options.getTeamWindowDuration())))
+        .apply("LeaderboardTeamFixedWindows", Window.<GameActionInfo>into(
+            FixedWindows.of(Duration.standardMinutes(options.getTeamWindowDuration())))
           // We will get early (speculative) results as well as cumulative
           // processing of late data.
           .triggering(
@@ -215,8 +214,7 @@ public class LeaderBoard extends HourlyTeamScore {
     // Extract user/score pairs from the event stream using processing time, via global windowing.
     // Get periodic updates on all users' running scores.
     gameEvents
-        .apply(Window.named("LeaderboardUserGlobalWindow")
-          .<GameActionInfo>into(new GlobalWindows())
+        .apply("LeaderboardUserGlobalWindow", Window.<GameActionInfo>into(new GlobalWindows())
           // Get periodic results every ten minutes.
               .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()
                   .plusDelayOf(TEN_MINUTES)))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
index 084ac7c..56737a4 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/TFIDF.java
@@ -191,8 +191,7 @@ public class TFIDF {
         }
 
         PCollection<KV<URI, String>> oneUriToLines = pipeline
-            .apply(TextIO.Read.from(uriString)
-                .named("TextIO.Read(" + uriString + ")"))
+            .apply("TextIO.Read(" + uriString + ")", TextIO.Read.from(uriString))
             .apply("WithKeys(" + uriString + ")", WithKeys.<URI, String>of(uri));
 
         urisToLines = urisToLines.and(oneUriToLines);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
index 2817622..2d95c97 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/WordCount.java
@@ -109,10 +109,10 @@ public class WordCount {
 
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInput()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInput()))
         .apply(new CountWords())
         .apply(MapElements.via(new FormatAsTextFn()))
-        .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+        .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index ed11781..c0ff85d 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -44,7 +44,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
 
@@ -380,14 +379,14 @@ public class AutoComplete {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    PTransform<? super PBegin, PCollection<String>> readSource =
-            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("WordStream");
-    WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
+
+    WindowFn<Object, ?> windowFn =
+        FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
 
     // Create the pipeline.
     Pipeline p = Pipeline.create(options);
     PCollection<KV<String, List<CompletionCandidate>>> toWrite = p
-      .apply(readSource)
+      .apply("WordStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
       .apply(ParDo.of(new ExtractWordsFn()))
       .apply(Window.<String>into(windowFn)
               .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
index 0828ddc..f456b27 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/JoinExamples.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.TextIO;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -34,7 +33,6 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.values.KV;
-import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TupleTag;
 
@@ -135,22 +133,19 @@ public class JoinExamples {
     options.setExecutionRetryDelay(3000L);
     options.setRunner(FlinkRunner.class);
 
-    PTransform<? super PBegin, PCollection<String>> readSourceA =
-        Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("FirstStream");
-    PTransform<? super PBegin, PCollection<String>> readSourceB =
-        Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)).named("SecondStream");
-
     WindowFn<Object, ?> windowFn = FixedWindows.of(Duration.standardSeconds(options.getWindowSize()));
 
     Pipeline p = Pipeline.create(options);
 
     // the following two 'applys' create multiple inputs to our pipeline, one for each
     // of our two input sources.
-    PCollection<String> streamA = p.apply(readSourceA)
+    PCollection<String> streamA = p
+        .apply("FirstStream", Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
         .apply(Window.<String>into(windowFn)
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
             .discardingFiredPanes());
-    PCollection<String> streamB = p.apply(readSourceB)
+    PCollection<String> streamB = p
+        .apply("SecondStream", Read.from(new UnboundedSocketSource<>("localhost", 9998, '\n', 3)))
         .apply(Window.<String>into(windowFn)
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)
             .discardingFiredPanes());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
index b14c5ae..4e81420 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/KafkaWindowedWordCountExample.java
@@ -132,7 +132,7 @@ public class KafkaWindowedWordCountExample {
         new SimpleStringSchema(), p);
 
     PCollection<String> words = pipeline
-        .apply(Read.named("StreamingWordCount").from(UnboundedFlinkSource.of(kafkaConsumer)))
+        .apply("StreamingWordCount", Read.from(UnboundedFlinkSource.of(kafkaConsumer)))
         .apply(ParDo.of(new ExtractWordsFn()))
         .apply(Window.<String>into(FixedWindows.of(Duration.standardSeconds(options.getWindowSize())))
             .triggering(AfterWatermark.pastEndOfWindow()).withAllowedLateness(Duration.ZERO)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
index f72b705..1b532a7 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/WindowedWordCount.java
@@ -119,7 +119,8 @@ public class WindowedWordCount {
     Pipeline pipeline = Pipeline.create(options);
 
     PCollection<String> words = pipeline
-        .apply(Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)).named("StreamingWordCount"))
+        .apply("StreamingWordCount",
+            Read.from(new UnboundedSocketSource<>("localhost", 9999, '\n', 3)))
         .apply(ParDo.of(new ExtractWordsFn()))
         .apply(Window.<String>into(SlidingWindows.of(Duration.standardSeconds(options.getWindowSize()))
             .every(Duration.standardSeconds(options.getSlide())))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
index d47d285..7ff247a 100644
--- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
+++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java
@@ -2686,7 +2686,7 @@ public class DataflowRunner extends PipelineRunner<DataflowPipelineJob> {
       try {
         Coder<T> coder = transform.getDefaultOutputCoder(input);
         return Pipeline.applyTransform(
-            input, PubsubIO.Read.named("StartingSignal").subscription("_starting_signal/"))
+            "StartingSignal", input, PubsubIO.Read.subscription("_starting_signal/"))
             .apply(ParDo.of(new OutputNullKv()))
             .apply("GlobalSingleton", Window.<KV<Void, Void>>into(new GlobalWindows())
                 .triggering(AfterPane.elementCountAtLeast(1))

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
index c3a6a11..e04a1fc 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowPipelineTranslatorTest.java
@@ -127,8 +127,8 @@ public class DataflowPipelineTranslatorTest implements Serializable {
     options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+    p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+     .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
 
     return p;
   }
@@ -458,7 +458,7 @@ public class DataflowPipelineTranslatorTest implements Serializable {
 
     // Create a pipeline that the predefined step will be embedded into
     Pipeline pipeline = Pipeline.create(options);
-    pipeline.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/in"))
+    pipeline.apply("ReadMyFile", TextIO.Read.from("gs://bucket/in"))
         .apply(ParDo.of(new NoOpFn()))
         .apply(new EmbeddedTransform(predefinedStep.clone()))
         .apply(ParDo.of(new NoOpFn()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
index e094d0d..999dc3a 100644
--- a/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
+++ b/runners/google-cloud-dataflow-java/src/test/java/org/apache/beam/runners/dataflow/DataflowRunnerTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.runners.dataflow;
 
 import static org.apache.beam.sdk.util.WindowedValue.valueInGlobalWindow;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
@@ -151,8 +152,8 @@ public class DataflowRunnerTest {
     options.setRunner(DataflowRunner.class);
     Pipeline p = Pipeline.create(options);
 
-    p.apply(TextIO.Read.named("ReadMyFile").from("gs://bucket/object"))
-        .apply(TextIO.Write.named("WriteMyFile").to("gs://bucket/object"));
+    p.apply("ReadMyFile", TextIO.Read.from("gs://bucket/object"))
+        .apply("WriteMyFile", TextIO.Write.to("gs://bucket/object"));
 
     return p;
   }
@@ -464,7 +465,7 @@ public class DataflowRunnerTest {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadMyNonGcsFile").from(tmpFolder.newFile().getPath()));
+    p.apply("ReadMyNonGcsFile", TextIO.Read.from(tmpFolder.newFile().getPath()));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
@@ -477,11 +478,11 @@ public class DataflowRunnerTest {
   @Test
   public void testNonGcsFilePathInWriteFailure() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage(containsString("expected a valid 'gs://' path but was given"));
-    pc.apply(TextIO.Write.named("WriteMyNonGcsFile").to("/tmp/file"));
+    pc.apply("WriteMyNonGcsFile", TextIO.Write.to("/tmp/file"));
   }
 
   @Test
@@ -489,8 +490,7 @@ public class DataflowRunnerTest {
     ArgumentCaptor<Job> jobCaptor = ArgumentCaptor.forClass(Job.class);
 
     Pipeline p = buildDataflowPipeline(buildPipelineOptions(jobCaptor));
-    p.apply(TextIO.Read.named("ReadInvalidGcsFile")
-        .from("gs://bucket/tmp//file"));
+    p.apply("ReadInvalidGcsFile", TextIO.Read.from("gs://bucket/tmp//file"));
 
     thrown.expectCause(Matchers.allOf(
         instanceOf(IllegalArgumentException.class),
@@ -502,11 +502,11 @@ public class DataflowRunnerTest {
   @Test
   public void testMultiSlashGcsFileWritePath() throws IOException {
     Pipeline p = buildDataflowPipeline(buildPipelineOptions());
-    PCollection<String> pc = p.apply(TextIO.Read.named("ReadMyGcsFile").from("gs://bucket/object"));
+    PCollection<String> pc = p.apply("ReadMyGcsFile", TextIO.Read.from("gs://bucket/object"));
 
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("consecutive slashes");
-    pc.apply(TextIO.Write.named("WriteInvalidGcsFile").to("gs://bucket/tmp//file"));
+    pc.apply("WriteInvalidGcsFile", TextIO.Write.to("gs://bucket/tmp//file"));
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
index da3fa7a..6a3edd7 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SimpleWordCountTest.java
@@ -89,8 +89,7 @@ public class SimpleWordCountTest {
     PCollection<String> output = inputWords.apply(new CountWords());
 
     File outputFile = testFolder.newFile();
-    output.apply(
-        TextIO.Write.named("WriteCounts").to(outputFile.getAbsolutePath()).withoutSharding());
+    output.apply("WriteCounts", TextIO.Write.to(outputFile.getAbsolutePath()).withoutSharding());
 
     EvaluationResult res = SparkRunner.create().run(p);
     res.close();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
index 6a36c8d..7cac705 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/io/BigQueryIO.java
@@ -518,7 +518,6 @@ public class BigQueryIO {
               + " query without a result flattening preference");
         }
 
-        // Only verify existence/correctness if validation is enabled.
         if (validate) {
           // Check for source table/query presence for early failure notification.
           // Note that a presence check can fail if the table or dataset are created by earlier

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
index da886de..6e26d33 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOGeneratedClassTest.java
@@ -136,6 +136,18 @@ public class AvroIOGeneratedClassTest {
     return users;
   }
 
+  <T> void runTestRead(
+      String applyName, AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
+      throws Exception {
+    generateAvroFile(generateAvroObjects());
+
+    TestPipeline p = TestPipeline.create();
+    PCollection<T> output = p.apply(applyName, read);
+    PAssert.that(output).containsInAnyOrder(expectedOutput);
+    p.run();
+    assertEquals(expectedName, output.getName());
+  }
+
   <T> void runTestRead(AvroIO.Read.Bound<T> read, String expectedName, T[] expectedOutput)
       throws Exception {
     generateAvroFile(generateAvroObjects());
@@ -158,28 +170,16 @@ public class AvroIOGeneratedClassTest {
         AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
         "AvroIO.Read/Read.out",
         generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+    runTestRead("MyRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
         "MyRead/Read.out",
         generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.named("MyRead").withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
+    runTestRead("MyRead",
+        AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()),
         "MyRead/Read.out",
         generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class).named("HerRead"),
-        "HerRead/Read.out",
-        generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(AvroGeneratedUser.class),
-        "HerRead/Read.out",
-        generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.withSchema(AvroGeneratedUser.class).named("HerRead").from(avroFile.getPath()),
-        "HerRead/Read.out",
-        generateAvroObjects());
-    runTestRead(
-        AvroIO.Read.withSchema(AvroGeneratedUser.class).from(avroFile.getPath()).named("HerRead"),
+    runTestRead("HerRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
         "HerRead/Read.out",
         generateAvroObjects());
   }
@@ -195,28 +195,20 @@ public class AvroIOGeneratedClassTest {
         AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
         "AvroIO.Read/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schema),
+    runTestRead("MyRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
         "MyRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").withSchema(schema).from(avroFile.getPath()),
+    runTestRead("MyRead",
+        AvroIO.Read.withSchema(schema).from(avroFile.getPath()),
         "MyRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).withSchema(schema).named("HerRead"),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schema),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schema).named("HerRead").from(avroFile.getPath()),
+    runTestRead("HerRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
         "HerRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schema).from(avroFile.getPath()).named("HerRead"),
+    runTestRead("HerRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schema),
         "HerRead/Read.out",
         generateAvroGenericRecords());
   }
@@ -232,28 +224,12 @@ public class AvroIOGeneratedClassTest {
         AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
         "AvroIO.Read/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").from(avroFile.getPath()).withSchema(schemaString),
-        "MyRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.named("MyRead").withSchema(schemaString).from(avroFile.getPath()),
+    runTestRead("MyRead",
+        AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString),
         "MyRead/Read.out",
         generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).withSchema(schemaString).named("HerRead"),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.from(avroFile.getPath()).named("HerRead").withSchema(schemaString),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schemaString).named("HerRead").from(avroFile.getPath()),
-        "HerRead/Read.out",
-        generateAvroGenericRecords());
-    runTestRead(
-        AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()).named("HerRead"),
+    runTestRead("HerRead",
+        AvroIO.Read.withSchema(schemaString).from(avroFile.getPath()),
         "HerRead/Read.out",
         generateAvroGenericRecords());
   }
@@ -276,106 +252,34 @@ public class AvroIOGeneratedClassTest {
   @Test
   @Category(NeedsRunner.class)
   public void testWriteFromGeneratedClass() throws Exception {
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(AvroGeneratedUser.class),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
-                             .to(avroFile.getPath()),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .to(avroFile.getPath())
-                             .withSchema(AvroGeneratedUser.class),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .withSchema(AvroGeneratedUser.class)
-                             .to(avroFile.getPath()),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(AvroGeneratedUser.class)
-                             .named("HerWrite"),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .named("HerWrite")
-                             .withSchema(AvroGeneratedUser.class),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
-                             .named("HerWrite")
-                             .to(avroFile.getPath()),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(AvroGeneratedUser.class)
-                             .to(avroFile.getPath())
-                             .named("HerWrite"),
-                 "HerWrite");
+    runTestWrite(
+        AvroIO.Write.to(avroFile.getPath()).withSchema(AvroGeneratedUser.class),
+        "AvroIO.Write");
+    runTestWrite(
+        AvroIO.Write.withSchema(AvroGeneratedUser.class).to(avroFile.getPath()),
+        "AvroIO.Write");
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteFromSchema() throws Exception {
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schema),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.withSchema(schema)
-                             .to(avroFile.getPath()),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .to(avroFile.getPath())
-                             .withSchema(schema),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .withSchema(schema)
-                             .to(avroFile.getPath()),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schema)
-                             .named("HerWrite"),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .named("HerWrite")
-                             .withSchema(schema),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schema)
-                             .named("HerWrite")
-                             .to(avroFile.getPath()),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schema)
-                             .to(avroFile.getPath())
-                             .named("HerWrite"),
-                 "HerWrite");
+    runTestWrite(
+        AvroIO.Write.to(avroFile.getPath()).withSchema(schema),
+        "AvroIO.Write");
+    runTestWrite(
+        AvroIO.Write.withSchema(schema).to(avroFile.getPath()),
+        "AvroIO.Write");
   }
 
   @Test
   @Category(NeedsRunner.class)
   public void testWriteFromSchemaString() throws Exception {
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schemaString),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.withSchema(schemaString)
-                             .to(avroFile.getPath()),
-                 "AvroIO.Write");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .to(avroFile.getPath())
-                             .withSchema(schemaString),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.named("MyWrite")
-                             .withSchema(schemaString)
-                             .to(avroFile.getPath()),
-                 "MyWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .withSchema(schemaString)
-                             .named("HerWrite"),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.to(avroFile.getPath())
-                             .named("HerWrite")
-                             .withSchema(schemaString),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schemaString)
-                             .named("HerWrite")
-                             .to(avroFile.getPath()),
-                 "HerWrite");
-    runTestWrite(AvroIO.Write.withSchema(schemaString)
-                             .to(avroFile.getPath())
-                             .named("HerWrite"),
-                 "HerWrite");
+    runTestWrite(
+        AvroIO.Write.to(avroFile.getPath()).withSchema(schemaString),
+        "AvroIO.Write");
+    runTestWrite(
+        AvroIO.Write.withSchema(schemaString).to(avroFile.getPath()),
+        "AvroIO.Write");
   }
 
   // TODO: for Write only, test withSuffix, withNumShards,

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 13c1bcf..8625b10 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -18,6 +18,7 @@
 package org.apache.beam.sdk.io;
 
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -81,10 +82,6 @@ public class AvroIOTest {
   public void testAvroIOGetName() {
     assertEquals("AvroIO.Read", AvroIO.Read.from("gs://bucket/foo*/baz").getName());
     assertEquals("AvroIO.Write", AvroIO.Write.to("gs://bucket/foo/baz").getName());
-    assertEquals("ReadMyFile",
-        AvroIO.Read.named("ReadMyFile").from("gs://bucket/foo*/baz").getName());
-    assertEquals("WriteMyFile",
-        AvroIO.Write.named("WriteMyFile").to("gs://bucket/foo/baz").getName());
   }
 
   @DefaultCoder(AvroCoder.class)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
index a1daf72..f0d3fce 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/BigQueryIOTest.java
@@ -340,8 +340,7 @@ public class BigQueryIOTest implements Serializable {
     checkReadTableObjectWithValidate(bound, project, dataset, table, true);
   }
 
-  private void checkReadQueryObject(
-      BigQueryIO.Read.Bound bound, String query) {
+  private void checkReadQueryObject(BigQueryIO.Read.Bound bound, String query) {
     checkReadQueryObjectWithValidate(bound, query, true);
   }
 
@@ -393,15 +392,13 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildTableBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from("foo.com:project:somedataset.sometable");
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from("foo.com:project:somedataset.sometable");
     checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
   }
 
   @Test
   public void testBuildQueryBasedSource() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyQuery")
-        .fromQuery("foo_query");
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.fromQuery("foo_query");
     checkReadQueryObject(bound, "foo_query");
   }
 
@@ -409,8 +406,8 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildTableBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from("foo.com:project:somedataset.sometable").withoutValidation();
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.from("foo.com:project:somedataset.sometable").withoutValidation();
     checkReadTableObjectWithValidate(bound, "foo.com:project", "somedataset", "sometable", false);
   }
 
@@ -418,15 +415,15 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildQueryBasedSourceWithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .fromQuery("some_query").withoutValidation();
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.fromQuery("some_query").withoutValidation();
     checkReadQueryObjectWithValidate(bound, "some_query", false);
   }
 
   @Test
   public void testBuildTableBasedSourceWithDefaultProject() {
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from("somedataset.sometable");
+    BigQueryIO.Read.Bound bound =
+        BigQueryIO.Read.from("somedataset.sometable");
     checkReadTableObject(bound, null, "somedataset", "sometable");
   }
 
@@ -436,8 +433,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Read.Bound bound = BigQueryIO.Read.named("ReadMyTable")
-        .from(table);
+    BigQueryIO.Read.Bound bound = BigQueryIO.Read.from(table);
     checkReadTableObject(bound, "foo.com:project", "somedataset", "sometable");
   }
 
@@ -457,18 +453,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
-    p.apply(BigQueryIO.Read.named("ReadMyTable").from(tableRef));
-  }
-
-  @Test
-  @Category(RunnableOnService.class)
-  public void testBuildSourceWithoutTableOrQuery() {
-    Pipeline p = TestPipeline.create();
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(
-        "Invalid BigQuery read operation, either table reference or query has to be set");
-    p.apply(BigQueryIO.Read.named("ReadMyTable"));
-    p.run();
+    p.apply(BigQueryIO.Read.from(tableRef));
   }
 
   @Test
@@ -490,8 +475,8 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies both a query and a table, only one of these"
         + " should be provided");
-    p.apply(
-        BigQueryIO.Read.named("ReadMyTable")
+    p.apply("ReadMyTable",
+        BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
             .fromQuery("query"));
     p.run();
@@ -505,8 +490,8 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage(
         "Invalid BigQuery read operation. Specifies a"
               + " table with a result flattening preference, which is not configurable");
-    p.apply(
-        BigQueryIO.Read.named("ReadMyTable")
+    p.apply("ReadMyTable",
+        BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
             .withoutResultFlattening());
     p.run();
@@ -521,7 +506,7 @@ public class BigQueryIOTest implements Serializable {
         "Invalid BigQuery read operation. Specifies a"
               + " table with a result flattening preference, which is not configurable");
     p.apply(
-        BigQueryIO.Read.named("ReadMyTable")
+        BigQueryIO.Read
             .from("foo.com:project:somedataset.sometable")
             .withoutValidation()
             .withoutResultFlattening());
@@ -644,8 +629,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSink() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("foo.com:project:somedataset.sometable");
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("foo.com:project:somedataset.sometable");
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -655,8 +639,8 @@ public class BigQueryIOTest implements Serializable {
   public void testBuildSinkwithoutValidation() {
     // This test just checks that using withoutValidation will not trigger object
     // construction errors.
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("foo.com:project:somedataset.sometable").withoutValidation();
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withoutValidation();
     checkWriteObjectWithValidate(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY, false);
@@ -664,8 +648,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkDefaultProject() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("somedataset.sometable");
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to("somedataset.sometable");
     checkWriteObject(
         bound, null, "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -677,8 +660,7 @@ public class BigQueryIOTest implements Serializable {
         .setProjectId("foo.com:project")
         .setDatasetId("somedataset")
         .setTableId("sometable");
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to(table);
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write.to(table);
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         null, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -691,14 +673,14 @@ public class BigQueryIOTest implements Serializable {
     thrown.expect(IllegalStateException.class);
     thrown.expectMessage("must set the table reference");
     p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-        .apply(BigQueryIO.Write.named("WriteMyTable"));
+        .apply(BigQueryIO.Write.withoutValidation());
   }
 
   @Test
   public void testBuildSinkWithSchema() {
     TableSchema schema = new TableSchema();
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
-        .to("foo.com:project:somedataset.sometable").withSchema(schema);
+    BigQueryIO.Write.Bound bound =
+        BigQueryIO.Write.to("foo.com:project:somedataset.sometable").withSchema(schema);
     checkWriteObject(
         bound, "foo.com:project", "somedataset", "sometable",
         schema, CreateDisposition.CREATE_IF_NEEDED, WriteDisposition.WRITE_EMPTY);
@@ -706,7 +688,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithCreateDispositionNever() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_NEVER);
     checkWriteObject(
@@ -716,7 +698,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithCreateDispositionIfNeeded() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED);
     checkWriteObject(
@@ -726,7 +708,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithWriteDispositionTruncate() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_TRUNCATE);
     checkWriteObject(
@@ -736,7 +718,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithWriteDispositionAppend() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_APPEND);
     checkWriteObject(
@@ -746,7 +728,7 @@ public class BigQueryIOTest implements Serializable {
 
   @Test
   public void testBuildSinkWithWriteDispositionEmpty() {
-    BigQueryIO.Write.Bound bound = BigQueryIO.Write.named("WriteMyTable")
+    BigQueryIO.Write.Bound bound = BigQueryIO.Write
         .to("foo.com:project:somedataset.sometable")
         .withWriteDisposition(WriteDisposition.WRITE_EMPTY);
     checkWriteObject(
@@ -794,7 +776,7 @@ public class BigQueryIOTest implements Serializable {
         Matchers.either(Matchers.containsString("Unable to confirm BigQuery dataset presence"))
             .or(Matchers.containsString("BigQuery dataset not found for table")));
     p.apply(Create.<TableRow>of().withCoder(TableRowJsonCoder.of()))
-     .apply(BigQueryIO.Write.named("WriteMyTable")
+     .apply(BigQueryIO.Write
          .to(tableRef)
          .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
          .withSchema(new TableSchema()));
@@ -878,8 +860,6 @@ public class BigQueryIOTest implements Serializable {
   public void testBigQueryIOGetName() {
     assertEquals("BigQueryIO.Read", BigQueryIO.Read.from("somedataset.sometable").getName());
     assertEquals("BigQueryIO.Write", BigQueryIO.Write.to("somedataset.sometable").getName());
-    assertEquals("ReadMyTable", BigQueryIO.Read.named("ReadMyTable").getName());
-    assertEquals("WriteMyTable", BigQueryIO.Write.named("WriteMyTable").getName());
   }
 
   @Test
@@ -915,7 +895,7 @@ public class BigQueryIOTest implements Serializable {
     thrown.expectMessage("must set the table reference of a BigQueryIO.Write transform");
     TestPipeline.create()
         .apply(Create.<TableRow>of())
-        .apply(BigQueryIO.Write.named("name"));
+        .apply("name", BigQueryIO.Write.withoutValidation());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
index b0c577d..c9f4079 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSourceTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.testing.SourceTestUtils.readFromSource;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -717,7 +718,7 @@ public class FileBasedSourceTest {
     File file = createFileWithData(fileName, data);
 
     TestFileBasedSource source = new TestFileBasedSource(file.getPath(), 64, null);
-    PCollection<String> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<String> output = p.apply("ReadFileData", Read.from(source));
 
     PAssert.that(output).containsInAnyOrder(data);
     p.run();
@@ -743,7 +744,7 @@ public class FileBasedSourceTest {
     TestFileBasedSource source =
         new TestFileBasedSource(new File(file1.getParent(), "file*").getPath(), 64, null);
 
-    PCollection<String> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<String> output = p.apply("ReadFileData", Read.from(source));
 
     List<String> expectedResults = new ArrayList<String>();
     expectedResults.addAll(data1);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
index eaf452d..efa1cd2 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/PubsubIOTest.java
@@ -45,10 +45,6 @@ public class PubsubIOTest {
         PubsubIO.Read.topic("projects/myproject/topics/mytopic").getName());
     assertEquals("PubsubIO.Write",
         PubsubIO.Write.topic("projects/myproject/topics/mytopic").getName());
-    assertEquals("ReadMyTopic",
-        PubsubIO.Read.named("ReadMyTopic").topic("projects/myproject/topics/mytopic").getName());
-    assertEquals("WriteMyTopic",
-        PubsubIO.Write.named("WriteMyTopic").topic("projects/myproject/topics/mytopic").getName());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
index c3a5084..df598c8 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/TextIOTest.java
@@ -22,6 +22,7 @@ import static org.apache.beam.sdk.TestUtils.LINES_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_INTS_ARRAY;
 import static org.apache.beam.sdk.TestUtils.NO_LINES_ARRAY;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -44,14 +45,12 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.SourceTestUtils;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.GcsUtil;
 import org.apache.beam.sdk.util.IOChannelUtils;
 import org.apache.beam.sdk.util.gcsfs.GcsPath;
 import org.apache.beam.sdk.values.PCollection;
-import org.apache.beam.sdk.values.PDone;
 
 import com.google.common.collect.ImmutableList;
 
@@ -167,16 +166,9 @@ public class TextIOTest {
     }
 
     {
-      PCollection<String> output2 =
-          p.apply(TextIO.Read.named("MyRead").from(file));
+      PCollection<String> output2 = p.apply("MyRead", TextIO.Read.from(file));
       assertEquals("MyRead/Read.out", output2.getName());
     }
-
-    {
-      PCollection<String> output3 =
-          p.apply(TextIO.Read.from(file).named("HerRead"));
-      assertEquals("HerRead/Read.out", output3.getName());
-    }
   }
 
   @Test
@@ -299,27 +291,6 @@ public class TextIOTest {
   }
 
   @Test
-  public void testWriteNamed() {
-    {
-      PTransform<PCollection<String>, PDone> transform1 =
-        TextIO.Write.to("/tmp/file.txt");
-      assertEquals("TextIO.Write", transform1.getName());
-    }
-
-    {
-      PTransform<PCollection<String>, PDone> transform2 =
-          TextIO.Write.named("MyWrite").to("/tmp/file.txt");
-      assertEquals("MyWrite", transform2.getName());
-    }
-
-    {
-      PTransform<PCollection<String>, PDone> transform3 =
-          TextIO.Write.to("/tmp/file.txt").named("HerWrite");
-      assertEquals("HerWrite", transform3.getName());
-    }
-  }
-
-  @Test
   @Category(NeedsRunner.class)
   public void testShardedWrite() throws Exception {
     runTestWrite(LINES_ARRAY, StringUtf8Coder.of(), 5);
@@ -620,12 +591,8 @@ public class TextIOTest {
   public void testTextIOGetName() {
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").getName());
     assertEquals("TextIO.Write", TextIO.Write.to("somefile").getName());
-    assertEquals("ReadMyFile", TextIO.Read.named("ReadMyFile").from("somefile").getName());
-    assertEquals("WriteMyFile", TextIO.Write.named("WriteMyFile").to("somefile").getName());
 
     assertEquals("TextIO.Read", TextIO.Read.from("somefile").toString());
-    assertEquals(
-        "ReadMyFile [TextIO.Read]", TextIO.Read.named("ReadMyFile").from("somefile").toString());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
index eb65468..37e3881 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/XmlSourceTest.java
@@ -21,6 +21,7 @@ import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionE
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionFails;
 import static org.apache.beam.sdk.testing.SourceTestUtils.assertSplitAtFractionSucceedsAndConsistent;
 import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem;
+
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.containsString;
@@ -582,7 +583,7 @@ public class XmlSourceTest {
             .withRecordClass(Train.class)
             .withMinBundleSize(1024);
 
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     List<Train> expectedResults =
         ImmutableList.of(new Train("Thomas", 1, "blue", null), new Train("Henry", 3, "green", null),
@@ -672,7 +673,7 @@ public class XmlSourceTest {
             .withRecordElement("train")
             .withRecordClass(Train.class)
             .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     PAssert.that(output).containsInAnyOrder(trains);
     p.run();
@@ -814,13 +815,13 @@ public class XmlSourceTest {
 
     Pipeline p = TestPipeline.create();
 
-    XmlSource<Train> source = XmlSource.<Train>from(file.getParent() + "/"
-        + "temp*.xml")
-                                  .withRootElement("trains")
-                                  .withRecordElement("train")
-                                  .withRecordClass(Train.class)
-                                  .withMinBundleSize(1024);
-    PCollection<Train> output = p.apply(Read.from(source).named("ReadFileData"));
+    XmlSource<Train> source =
+        XmlSource.<Train>from(file.getParent() + "/" + "temp*.xml")
+            .withRootElement("trains")
+            .withRecordElement("train")
+            .withRecordClass(Train.class)
+            .withMinBundleSize(1024);
+    PCollection<Train> output = p.apply("ReadFileData", Read.from(source));
 
     List<Train> expectedResults = new ArrayList<>();
     expectedResults.addAll(trains1);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
index 0c992c4..08c3996 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/runners/TransformTreeTest.java
@@ -120,9 +120,9 @@ public class TransformTreeTest {
 
     Pipeline p = TestPipeline.create();
 
-    p.apply(TextIO.Read.named("ReadMyFile").from(inputFile.getPath()))
+    p.apply("ReadMyFile", TextIO.Read.from(inputFile.getPath()))
         .apply(Sample.<String>any(10))
-        .apply(TextIO.Write.named("WriteMyFile").to(outputFile.getPath()));
+        .apply("WriteMyFile", TextIO.Write.to(outputFile.getPath()));
 
     final EnumSet<TransformsSeen> visited =
         EnumSet.noneOf(TransformsSeen.class);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index c858f32..76bc038 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -119,11 +119,11 @@ public class WindowTest implements Serializable {
     FixedWindows fixed25 = FixedWindows.of(Duration.standardMinutes(25));
     WindowingStrategy<?, ?> strategy = TestPipeline.create()
         .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
-        .apply(Window.named("WindowInto10").<String>into(fixed10)
+        .apply("WindowInto10", Window.<String>into(fixed10)
             .withAllowedLateness(Duration.standardDays(1))
             .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
             .accumulatingFiredPanes())
-        .apply(Window.named("WindowInto25").<String>into(fixed25))
+        .apply("WindowInto25", Window.<String>into(fixed25))
         .getWindowingStrategy();
 
     assertEquals(Duration.standardDays(1), strategy.getAllowedLateness());
@@ -272,7 +272,7 @@ public class WindowTest implements Serializable {
 
   @Test
   public void testDisplayDataExcludesUnspecifiedProperties() {
-    Window.Bound<?> onlyHasAccumulationMode = Window.named("foobar").discardingFiredPanes();
+    Window.Bound<?> onlyHasAccumulationMode = Window.discardingFiredPanes();
     assertThat(DisplayData.from(onlyHasAccumulationMode), not(hasDisplayItem(hasKey(isOneOf(
         "windowFn",
         "trigger",

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
index 21f58df..c1e092a 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java
@@ -221,7 +221,7 @@ public class WindowingTest implements Serializable {
 
     Pipeline p = TestPipeline.create();
     PCollection<String> output = p.begin()
-        .apply(TextIO.Read.named("ReadLines").from(filename))
+        .apply("ReadLines", TextIO.Read.from(filename))
         .apply(ParDo.of(new ExtractWordsWithTimestampsFn()))
         .apply(new WindowedCount(FixedWindows.of(Duration.millis(10))));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
index 3306cb4..c0e5b17 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/DebuggingWordCount.java
@@ -174,7 +174,7 @@ public class DebuggingWordCount {
     Pipeline p = Pipeline.create(options);
 
     PCollection<KV<String, Long>> filteredWords =
-        p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+        p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
          .apply(new WordCount.CountWords())
          .apply(ParDo.of(new FilterTextFn(options.getFilterPattern())));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fd0bd51/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
----------------------------------------------------------------------
diff --git a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
index 07ed6d0..803e800 100644
--- a/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
+++ b/sdks/java/maven-archetypes/examples/src/main/resources/archetype-resources/src/main/java/WordCount.java
@@ -195,10 +195,10 @@ public class WordCount {
 
     // Concepts #2 and #3: Our pipeline applies the composite CountWords transform, and passes the
     // static FormatAsTextFn() to the ParDo transform.
-    p.apply(TextIO.Read.named("ReadLines").from(options.getInputFile()))
+    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()))
      .apply(new CountWords())
      .apply(ParDo.of(new FormatAsTextFn()))
-     .apply(TextIO.Write.named("WriteCounts").to(options.getOutput()));
+     .apply("WriteCounts", TextIO.Write.to(options.getOutput()));
 
     p.run();
   }


Mime
View raw message