beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Port examples to new DoFn
Date Thu, 04 Aug 2016 18:54:36 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 734bfb9af -> 74c5e5e1b


Port examples to new DoFn

Port example tests to new DoFn

Port TfIdf example to new DoFn

Port TopWikipediaSessions example to new DoFn

Port GameStats Java 8 example to new DoFn

Port the UserScore example to new DoFn

Port StreamingWordExtract example to new DoFn

Port TrafficMaxLaneFlow to new DoFn

Port TrafficRoutes example to new DoFn

Port DatastoreWordCount example to new DoFn

Port BigQueryTornadoes example to new DoFn

Port MaxPerKeyExamples to new DoFn

Port CombinePerKeyExamples to new DoFn

Port TriggerExample to new DoFn

Port JoinExamples to new DoFn

Port FilterExamples to new DoFn

Fix mention of DoFn in WordCountTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/71e027dc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/71e027dc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/71e027dc

Branch: refs/heads/master
Commit: 71e027dc1ff7d5de0eea82278427546c07e26e8f
Parents: 734bfb9
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Aug 3 18:54:22 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Aug 4 11:54:29 2016 -0700

----------------------------------------------------------------------
 .../examples/complete/StreamingWordExtract.java | 18 ++++++------
 .../apache/beam/examples/complete/TfIdf.java    | 28 +++++++++---------
 .../examples/complete/TopWikipediaSessions.java | 31 +++++++++-----------
 .../examples/complete/TrafficMaxLaneFlow.java   | 16 +++++-----
 .../beam/examples/complete/TrafficRoutes.java   | 20 ++++++-------
 .../examples/cookbook/BigQueryTornadoes.java    | 10 +++----
 .../cookbook/CombinePerKeyExamples.java         | 10 +++----
 .../examples/cookbook/DatastoreWordCount.java   | 14 ++++-----
 .../beam/examples/cookbook/FilterExamples.java  | 20 ++++++-------
 .../beam/examples/cookbook/JoinExamples.java    | 18 ++++++------
 .../examples/cookbook/MaxPerKeyExamples.java    | 10 +++----
 .../beam/examples/cookbook/TriggerExample.java  | 25 ++++++++--------
 .../org/apache/beam/examples/WordCountTest.java |  3 +-
 .../examples/complete/AutoCompleteTest.java     |  6 ++--
 .../examples/cookbook/TriggerExampleTest.java   |  6 ++--
 .../beam/examples/complete/game/GameStats.java  | 22 +++++++-------
 .../beam/examples/complete/game/UserScore.java  |  6 ++--
 .../examples/complete/game/UserScoreTest.java   |  2 +-
 18 files changed, 130 insertions(+), 135 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
index b0c9ffd..3f30f21 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/StreamingWordExtract.java
@@ -28,7 +28,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
 import com.google.api.services.bigquery.model.TableFieldSchema;
@@ -55,9 +55,9 @@ import java.util.ArrayList;
  */
 public class StreamingWordExtract {
 
-  /** A OldDoFn that tokenizes lines of text into individual words. */
-  static class ExtractWords extends OldDoFn<String, String> {
-    @Override
+  /** A {@link DoFn} that tokenizes lines of text into individual words. */
+  static class ExtractWords extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] words = c.element().split("[^a-zA-Z']+");
       for (String word : words) {
@@ -68,9 +68,9 @@ public class StreamingWordExtract {
     }
   }
 
-  /** A OldDoFn that uppercases a word. */
-  static class Uppercase extends OldDoFn<String, String> {
-    @Override
+  /** A {@link DoFn} that uppercases a word. */
+  static class Uppercase extends DoFn<String, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(c.element().toUpperCase());
     }
@@ -79,11 +79,11 @@ public class StreamingWordExtract {
   /**
    * Converts strings into BigQuery rows.
    */
-  static class StringToRowConverter extends OldDoFn<String, TableRow> {
+  static class StringToRowConverter extends DoFn<String, TableRow> {
     /**
      * In this example, put the whole string into single BigQuery field.
      */
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(new TableRow().set("string_field", c.element()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
index 470a689..76b6b6a 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TfIdf.java
@@ -30,9 +30,9 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.Keys;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.RemoveDuplicates;
@@ -225,8 +225,8 @@ public class TfIdf {
       // of the words in the document associated with that that URI.
       PCollection<KV<URI, String>> uriToWords = uriToContent
           .apply("SplitWords", ParDo.of(
-              new OldDoFn<KV<URI, String>, KV<URI, String>>() {
-                @Override
+              new DoFn<KV<URI, String>, KV<URI, String>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   String line = c.element().getValue();
@@ -268,8 +268,8 @@ public class TfIdf {
       // by the URI key.
       PCollection<KV<URI, KV<String, Long>>> uriToWordAndCount = uriAndWordToCount
           .apply("ShiftKeys", ParDo.of(
-              new OldDoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
-                @Override
+              new DoFn<KV<KV<URI, String>, Long>, KV<URI, KV<String, Long>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey().getKey();
                   String word = c.element().getKey().getValue();
@@ -307,8 +307,8 @@ public class TfIdf {
       // divided by the total number of words in the document.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTf = uriToWordAndCountAndTotal
           .apply("ComputeTermFrequencies", ParDo.of(
-              new OldDoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
+              new DoFn<KV<URI, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   URI uri = c.element().getKey();
                   Long wordTotal = c.element().getValue().getOnly(wordTotalsTag);
@@ -328,12 +328,12 @@ public class TfIdf {
       // documents in which the word appears divided by the total
       // number of documents in the corpus. Note how the total number of
       // documents is passed as a side input; the same value is
-      // presented to each invocation of the OldDoFn.
+      // presented to each invocation of the DoFn.
       PCollection<KV<String, Double>> wordToDf = wordToDocCount
           .apply("ComputeDocFrequencies", ParDo
               .withSideInputs(totalDocuments)
-              .of(new OldDoFn<KV<String, Long>, KV<String, Double>>() {
-                @Override
+              .of(new DoFn<KV<String, Long>, KV<String, Double>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Long documentCount = c.element().getValue();
@@ -361,8 +361,8 @@ public class TfIdf {
       // divided by the log of the document frequency.
       PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = wordToUriAndTfAndDf
           .apply("ComputeTfIdf", ParDo.of(
-              new OldDoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
-                @Override
+              new DoFn<KV<String, CoGbkResult>, KV<String, KV<URI, Double>>>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   String word = c.element().getKey();
                   Double df = c.element().getValue().getOnly(dfTag);
@@ -400,8 +400,8 @@ public class TfIdf {
     @Override
     public PDone apply(PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf) {
       return wordToUriAndTfIdf
-          .apply("Format", ParDo.of(new OldDoFn<KV<String, KV<URI, Double>>, String>() {
-            @Override
+          .apply("Format", ParDo.of(new DoFn<KV<String, KV<URI, Double>>, String>() {
+            @ProcessElement
             public void processElement(ProcessContext c) {
               c.output(String.format("%s,\t%s,\t%f",
                   c.element().getKey(),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
index 0ed89d2..aff41cc 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TopWikipediaSessions.java
@@ -26,12 +26,12 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableComparator;
 import org.apache.beam.sdk.transforms.Top;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.CalendarWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.Sessions;
@@ -85,8 +85,8 @@ public class TopWikipediaSessions {
   /**
    * Extracts user and timestamp from a TableRow representing a Wikipedia edit.
    */
-  static class ExtractUserAndTimestamp extends OldDoFn<TableRow, String> {
-    @Override
+  static class ExtractUserAndTimestamp extends DoFn<TableRow, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = c.element();
       int timestamp = (Integer) row.get("timestamp");
@@ -132,24 +132,21 @@ public class TopWikipediaSessions {
     }
   }
 
-  static class SessionsToStringsDoFn extends OldDoFn<KV<String, Long>, KV<String, Long>>
-      implements RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
+  static class SessionsToStringsDoFn extends DoFn<KV<String, Long>, KV<String, Long>> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       c.output(KV.of(
-          c.element().getKey() + " : " + c.window(), c.element().getValue()));
+          c.element().getKey() + " : " + window, c.element().getValue()));
     }
   }
 
-  static class FormatOutputDoFn extends OldDoFn<List<KV<String, Long>>, String>
-      implements RequiresWindowAccess {
-    @Override
-    public void processElement(ProcessContext c) {
+  static class FormatOutputDoFn extends DoFn<List<KV<String, Long>>, String> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
       for (KV<String, Long> item : c.element()) {
         String session = item.getKey();
         long count = item.getValue();
-        c.output(session + " : " + count + " : " + ((IntervalWindow) c.window()).start());
+        c.output(session + " : " + count + " : " + ((IntervalWindow) window).start());
       }
     }
   }
@@ -168,8 +165,8 @@ public class TopWikipediaSessions {
           .apply(ParDo.of(new ExtractUserAndTimestamp()))
 
           .apply("SampleUsers", ParDo.of(
-              new OldDoFn<String, String>() {
-                @Override
+              new DoFn<String, String>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   if (Math.abs(c.element().hashCode()) <= Integer.MAX_VALUE * samplingThreshold) {
                     c.output(c.element());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
index 9122015..394b432 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficMaxLaneFlow.java
@@ -30,7 +30,7 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -145,12 +145,12 @@ public class TrafficMaxLaneFlow {
   /**
    * Extract the timestamp field from the input string, and use it as the element timestamp.
    */
-  static class ExtractTimestamps extends OldDoFn<String, String> {
+  static class ExtractTimestamps extends DoFn<String, String> {
     private static final DateTimeFormatter dateTimeFormat =
         DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
 
-    @Override
-    public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {
+    @ProcessElement
+    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
       String[] items = c.element().split(",");
       if (items.length > 0) {
         try {
@@ -170,9 +170,9 @@ public class TrafficMaxLaneFlow {
    * information. The number of lanes for which data is present depends upon which freeway the data
    * point comes from.
    */
-  static class ExtractFlowInfoFn extends OldDoFn<String, KV<String, LaneInfo>> {
+  static class ExtractFlowInfoFn extends DoFn<String, KV<String, LaneInfo>> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] items = c.element().split(",");
       if (items.length < 48) {
@@ -226,8 +226,8 @@ public class TrafficMaxLaneFlow {
    * Format the results of the Max Lane flow calculation to a TableRow, to save to BigQuery.
    * Add the timestamp from the window context.
    */
-  static class FormatMaxesFn extends OldDoFn<KV<String, LaneInfo>, TableRow> {
-    @Override
+  static class FormatMaxesFn extends DoFn<KV<String, LaneInfo>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
 
       LaneInfo laneInfo = c.element().getValue();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
index 30091b6..ef716e9 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/TrafficRoutes.java
@@ -29,8 +29,8 @@ import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
 import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
@@ -149,12 +149,12 @@ public class TrafficRoutes {
   /**
    * Extract the timestamp field from the input string, and use it as the element timestamp.
    */
-  static class ExtractTimestamps extends OldDoFn<String, String> {
+  static class ExtractTimestamps extends DoFn<String, String> {
     private static final DateTimeFormatter dateTimeFormat =
         DateTimeFormat.forPattern("MM/dd/yyyy HH:mm:ss");
 
-    @Override
-    public void processElement(OldDoFn<String, String>.ProcessContext c) throws Exception {
+    @ProcessElement
+    public void processElement(DoFn<String, String>.ProcessContext c) throws Exception {
       String[] items = c.element().split(",");
       String timestamp = tryParseTimestamp(items);
       if (timestamp != null) {
@@ -171,9 +171,9 @@ public class TrafficRoutes {
    * Filter out readings for the stations along predefined 'routes', and output
    * (station, speed info) keyed on route.
    */
-  static class ExtractStationSpeedFn extends OldDoFn<String, KV<String, StationSpeed>> {
+  static class ExtractStationSpeedFn extends DoFn<String, KV<String, StationSpeed>> {
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] items = c.element().split(",");
       String stationType = tryParseStationType(items);
@@ -200,8 +200,8 @@ public class TrafficRoutes {
    * Note: these calculations are for example purposes only, and are unrealistic and oversimplified.
    */
   static class GatherStats
-      extends OldDoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
-    @Override
+      extends DoFn<KV<String, Iterable<StationSpeed>>, KV<String, RouteInfo>> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws IOException {
       String route = c.element().getKey();
       double speedSum = 0.0;
@@ -243,8 +243,8 @@ public class TrafficRoutes {
   /**
    * Format the results of the slowdown calculations to a TableRow, to save to BigQuery.
    */
-  static class FormatStatsFn extends OldDoFn<KV<String, RouteInfo>, TableRow> {
-    @Override
+  static class FormatStatsFn extends DoFn<KV<String, RouteInfo>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       RouteInfo routeInfo = c.element().getValue();
       TableRow row = new TableRow()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
index 6002b11..09d9c29 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryTornadoes.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Count;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -81,8 +81,8 @@ public class BigQueryTornadoes {
    * Examines each row in the input table. If a tornado was recorded
    * in that sample, the month in which it occurred is output.
    */
-  static class ExtractTornadoesFn extends OldDoFn<TableRow, Integer> {
-    @Override
+  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
+    @ProcessElement
     public void processElement(ProcessContext c){
       TableRow row = c.element();
       if ((Boolean) row.get("tornado")) {
@@ -95,8 +95,8 @@ public class BigQueryTornadoes {
    * Prepares the data for writing to BigQuery by building a TableRow object containing an
    * integer representation of month and the number of tornadoes that occurred in each month.
    */
-  static class FormatCountsFn extends OldDoFn<KV<Integer, Long>, TableRow> {
-    @Override
+  static class FormatCountsFn extends DoFn<KV<Integer, Long>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = new TableRow()
           .set("month", c.element().getKey())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
index d0bce5d..67918a3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/CombinePerKeyExamples.java
@@ -26,7 +26,7 @@ import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -90,11 +90,11 @@ public class CombinePerKeyExamples {
    * Examines each row in the input table. If the word is greater than or equal to MIN_WORD_LENGTH,
    * outputs word, play_name.
    */
-  static class ExtractLargeWordsFn extends OldDoFn<TableRow, KV<String, String>> {
+  static class ExtractLargeWordsFn extends DoFn<TableRow, KV<String, String>> {
     private final Aggregator<Long, Long> smallerWords =
         createAggregator("smallerWords", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c){
       TableRow row = c.element();
       String playName = (String) row.get("corpus");
@@ -114,8 +114,8 @@ public class CombinePerKeyExamples {
    * Prepares the data for writing to BigQuery by building a TableRow object
    * containing a word with a string listing the plays in which it appeared.
    */
-  static class FormatShakespeareOutputFn extends OldDoFn<KV<String, String>, TableRow> {
-    @Override
+  static class FormatShakespeareOutputFn extends DoFn<KV<String, String>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = new TableRow()
           .set("word", c.element().getKey())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
index 1850e89..21220b8 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/DatastoreWordCount.java
@@ -32,8 +32,8 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 
 import com.google.datastore.v1beta3.Entity;
@@ -79,11 +79,11 @@ import javax.annotation.Nullable;
 public class DatastoreWordCount {
 
   /**
-   * A OldDoFn that gets the content of an entity (one line in a
+   * A {@link DoFn} that gets the content of an entity (one line in a
    * Shakespeare play) and converts it to a string.
    */
-  static class GetContentFn extends OldDoFn<Entity, String> {
-    @Override
+  static class GetContentFn extends DoFn<Entity, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       Map<String, Value> props = c.element().getProperties();
       Value value = props.get("content");
@@ -108,9 +108,9 @@ public class DatastoreWordCount {
   }
 
   /**
-   * A OldDoFn that creates entity for every line in Shakespeare.
+   * A {@link DoFn} that creates an entity for every line in Shakespeare.
    */
-  static class CreateEntityFn extends OldDoFn<String, Entity> {
+  static class CreateEntityFn extends DoFn<String, Entity> {
     private final String namespace;
     private final String kind;
     private final Key ancestorKey;
@@ -140,7 +140,7 @@ public class DatastoreWordCount {
       return entityBuilder.build();
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       c.output(makeEntity(c.element()));
     }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
index 06fba77..9a0f7a2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/FilterExamples.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Mean;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.View;
@@ -98,8 +98,8 @@ public class FilterExamples {
    * Examines each row in the input table. Outputs only the subset of the cells this example
    * is interested in-- the mean_temp and year, month, and day-- as a bigquery table row.
    */
-  static class ProjectionFn extends OldDoFn<TableRow, TableRow> {
-    @Override
+  static class ProjectionFn extends DoFn<TableRow, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c){
       TableRow row = c.element();
       // Grab year, month, day, mean_temp from the row
@@ -119,16 +119,16 @@ public class FilterExamples {
    * Implements 'filter' functionality.
    *
    * <p>Examines each row in the input table. Outputs only rows from the month
-   * monthFilter, which is passed in as a parameter during construction of this OldDoFn.
+   * monthFilter, which is passed in as a parameter during construction of this DoFn.
    */
-  static class FilterSingleMonthDataFn extends OldDoFn<TableRow, TableRow> {
+  static class FilterSingleMonthDataFn extends DoFn<TableRow, TableRow> {
     Integer monthFilter;
 
     public FilterSingleMonthDataFn(Integer monthFilter) {
       this.monthFilter = monthFilter;
     }
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c){
       TableRow row = c.element();
       Integer month;
@@ -143,8 +143,8 @@ public class FilterExamples {
    * Examines each row (weather reading) in the input table. Output the temperature
    * reading for that row ('mean_temp').
    */
-  static class ExtractTempFn extends OldDoFn<TableRow, Double> {
-    @Override
+  static class ExtractTempFn extends DoFn<TableRow, Double> {
+    @ProcessElement
     public void processElement(ProcessContext c){
       TableRow row = c.element();
       Double meanTemp = Double.parseDouble(row.get("mean_temp").toString());
@@ -191,8 +191,8 @@ public class FilterExamples {
       PCollection<TableRow> filteredRows = monthFilteredRows
           .apply("ParseAndFilter", ParDo
               .withSideInputs(globalMeanTemp)
-              .of(new OldDoFn<TableRow, TableRow>() {
-                @Override
+              .of(new DoFn<TableRow, TableRow>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   Double meanTemp = Double.parseDouble(c.element().get("mean_temp").toString());
                   Double gTemp = c.sideInput(globalMeanTemp);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
index 5260c0d..5ff2ce2 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/JoinExamples.java
@@ -24,7 +24,7 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
-import org.apache.beam.sdk.transforms.OldDoFn;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult;
 import org.apache.beam.sdk.transforms.join.CoGroupByKey;
@@ -99,8 +99,8 @@ public class JoinExamples {
     // country code 'key' -> string of <event info>, <country name>
     PCollection<KV<String, String>> finalResultCollection =
       kvpCollection.apply("Process", ParDo.of(
-        new OldDoFn<KV<String, CoGbkResult>, KV<String, String>>() {
-          @Override
+        new DoFn<KV<String, CoGbkResult>, KV<String, String>>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             KV<String, CoGbkResult> e = c.element();
             String countryCode = e.getKey();
@@ -116,8 +116,8 @@ public class JoinExamples {
 
     // write to GCS
     PCollection<String> formattedResults = finalResultCollection
-        .apply("Format", ParDo.of(new OldDoFn<KV<String, String>, String>() {
-          @Override
+        .apply("Format", ParDo.of(new DoFn<KV<String, String>, String>() {
+          @ProcessElement
           public void processElement(ProcessContext c) {
             String outputstring = "Country code: " + c.element().getKey()
                 + ", " + c.element().getValue();
@@ -131,8 +131,8 @@ public class JoinExamples {
    * Examines each row (event) in the input table. Output a KV with the key the country
    * code of the event, and the value a string encoding event information.
    */
-  static class ExtractEventDataFn extends OldDoFn<TableRow, KV<String, String>> {
-    @Override
+  static class ExtractEventDataFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = c.element();
       String countryCode = (String) row.get("ActionGeo_CountryCode");
@@ -149,8 +149,8 @@ public class JoinExamples {
    * Examines each row (country info) in the input table. Output a KV with the key the country
    * code, and the value the country name.
    */
-  static class ExtractCountryInfoFn extends OldDoFn<TableRow, KV<String, String>> {
-    @Override
+  static class ExtractCountryInfoFn extends DoFn<TableRow, KV<String, String>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = c.element();
       String countryCode = (String) row.get("FIPSCC");

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
index 1bcb491..4f266d3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/MaxPerKeyExamples.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Max;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.values.KV;
@@ -82,8 +82,8 @@ public class MaxPerKeyExamples {
    * Examines each row (weather reading) in the input table. Output the month of the reading,
    * and the mean_temp.
    */
-  static class ExtractTempFn extends OldDoFn<TableRow, KV<Integer, Double>> {
-    @Override
+  static class ExtractTempFn extends DoFn<TableRow, KV<Integer, Double>> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = c.element();
       Integer month = Integer.parseInt((String) row.get("month"));
@@ -96,8 +96,8 @@ public class MaxPerKeyExamples {
    * Format the results to a TableRow, to save to BigQuery.
    *
    */
-  static class FormatMaxesFn extends OldDoFn<KV<Integer, Double>, TableRow> {
-    @Override
+  static class FormatMaxesFn extends DoFn<KV<Integer, Double>, TableRow> {
+    @ProcessElement
     public void processElement(ProcessContext c) {
       TableRow row = new TableRow()
           .set("month", c.element().getKey())

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
index 0be9921..04ac2c3 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/cookbook/TriggerExample.java
@@ -28,14 +28,14 @@ import org.apache.beam.sdk.options.Default;
 import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.StreamingOptions;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.AfterEach;
 import org.apache.beam.sdk.transforms.windowing.AfterProcessingTime;
 import org.apache.beam.sdk.transforms.windowing.AfterWatermark;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Repeatedly;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -342,9 +342,9 @@ public class TriggerExample {
           .apply(GroupByKey.<String, Integer>create());
 
       PCollection<KV<String, String>> results = flowPerFreeway.apply(ParDo.of(
-          new OldDoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
+          new DoFn<KV<String, Iterable<Integer>>, KV<String, String>>() {
 
-            @Override
+            @ProcessElement
             public void processElement(ProcessContext c) throws Exception {
               Iterable<Integer> flows = c.element().getValue();
               Integer sum = 0;
@@ -365,22 +365,21 @@ public class TriggerExample {
    * Format the results of the Total flow calculation to a TableRow, to save to BigQuery.
    * Adds the triggerType, pane information, processing time and the window timestamp.
    * */
-  static class FormatTotalFlow extends OldDoFn<KV<String, String>, TableRow>
-  implements  RequiresWindowAccess {
+  static class FormatTotalFlow extends DoFn<KV<String, String>, TableRow> {
     private String triggerType;
 
     public FormatTotalFlow(String triggerType) {
       this.triggerType = triggerType;
     }
-    @Override
-    public void processElement(ProcessContext c) throws Exception {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) throws Exception {
       String[] values = c.element().getValue().split(",");
       TableRow row = new TableRow()
           .set("trigger_type", triggerType)
           .set("freeway", c.element().getKey())
           .set("total_flow", Integer.parseInt(values[0]))
           .set("number_of_records", Long.parseLong(values[1]))
-          .set("window", c.window().toString())
+          .set("window", window.toString())
           .set("isFirst", c.pane().isFirst())
           .set("isLast", c.pane().isLast())
           .set("timing", c.pane().getTiming().toString())
@@ -394,8 +393,8 @@ public class TriggerExample {
    * Extract the freeway and total flow in a reading.
    * Freeway is used as key since we are calculating the total flow for each freeway.
    */
-  static class ExtractFlowInfo extends OldDoFn<String, KV<String, Integer>> {
-    @Override
+  static class ExtractFlowInfo extends DoFn<String, KV<String, Integer>> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       String[] laneInfo = c.element().split(",");
       if (laneInfo[0].equals("timestamp")) {
@@ -471,13 +470,13 @@ public class TriggerExample {
    * Add current time to each record.
    * Also insert a delay at random to demo the triggers.
    */
-  public static class InsertDelays extends OldDoFn<String, String> {
+  public static class InsertDelays extends DoFn<String, String> {
     private static final double THRESHOLD = 0.001;
     // MIN_DELAY and MAX_DELAY in minutes.
     private static final int MIN_DELAY = 1;
     private static final int MAX_DELAY = 100;
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       Instant timestamp = Instant.now();
       if (Math.random() < THRESHOLD){

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
index 26bf8fb..9d36a3e 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/WordCountTest.java
@@ -26,6 +26,7 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.values.PCollection;
@@ -46,7 +47,7 @@ import java.util.List;
 @RunWith(JUnit4.class)
 public class WordCountTest {
 
-  /** Example test that tests a specific OldDoFn. */
+  /** Example test that tests a specific {@link DoFn}. */
   @Test
   public void testExtractWordsFn() throws Exception {
     DoFnTester<String, String> extractWordsFn =

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
index 6f68ce8..6f28dec 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/complete/AutoCompleteTest.java
@@ -23,8 +23,8 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Filter;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
@@ -171,8 +171,8 @@ public class AutoCompleteTest implements Serializable {
       extends PTransform<PCollection<TimestampedValue<T>>, PCollection<T>> {
     @Override
     public PCollection<T> apply(PCollection<TimestampedValue<T>> input) {
-      return input.apply(ParDo.of(new OldDoFn<TimestampedValue<T>, T>() {
-        @Override
+      return input.apply(ParDo.of(new DoFn<TimestampedValue<T>, T>() {
+        @ProcessElement
         public void processElement(ProcessContext c) {
           c.outputWithTimestamp(c.element().getValue(), c.element().getTimestamp());
         }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
----------------------------------------------------------------------
diff --git a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
index e72a9e8..fee3c14 100644
--- a/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
+++ b/examples/java/src/test/java/org/apache/beam/examples/cookbook/TriggerExampleTest.java
@@ -24,8 +24,8 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.DoFnTester;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
@@ -141,8 +141,8 @@ public class TriggerExampleTest {
     return Joiner.on(",").join(entries);
   }
 
-  static class FormatResults extends OldDoFn<TableRow, String> {
-    @Override
+  static class FormatResults extends DoFn<TableRow, String> {
+    @ProcessElement
     public void processElement(ProcessContext c) throws Exception {
       TableRow element = c.element();
       TableRow row = new TableRow()

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
index b1407f6..01ffb1d 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/GameStats.java
@@ -27,15 +27,15 @@ import org.apache.beam.sdk.options.Description;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.Mean;
-import org.apache.beam.sdk.transforms.OldDoFn;
-import org.apache.beam.sdk.transforms.OldDoFn.RequiresWindowAccess;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.Values;
 import org.apache.beam.sdk.transforms.View;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
@@ -126,10 +126,10 @@ public class GameStats extends LeaderBoard {
           .apply("ProcessAndFilter", ParDo
               // use the derived mean total score as a side input
               .withSideInputs(globalMeanScore)
-              .of(new OldDoFn<KV<String, Integer>, KV<String, Integer>>() {
+              .of(new DoFn<KV<String, Integer>, KV<String, Integer>>() {
                 private final Aggregator<Long, Long> numSpammerUsers =
                   createAggregator("SpammerUsers", new Sum.SumLongFn());
-                @Override
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   Integer score = c.element().getValue();
                   Double gmc = c.sideInput(globalMeanScore);
@@ -149,12 +149,10 @@ public class GameStats extends LeaderBoard {
   /**
    * Calculate and output an element's session duration.
    */
-  private static class UserSessionInfoFn extends OldDoFn<KV<String, Integer>, Integer>
-      implements RequiresWindowAccess {
-
-    @Override
-    public void processElement(ProcessContext c) {
-      IntervalWindow w = (IntervalWindow) c.window();
+  private static class UserSessionInfoFn extends DoFn<KV<String, Integer>, Integer> {
+    @ProcessElement
+    public void processElement(ProcessContext c, BoundedWindow window) {
+      IntervalWindow w = (IntervalWindow) window;
       int duration = new Duration(
           w.start(), w.end()).toPeriod().toStandardMinutes().getMinutes();
       c.output(duration);
@@ -281,8 +279,8 @@ public class GameStats extends LeaderBoard {
       // Filter out the detected spammer users, using the side input derived above.
       .apply("FilterOutSpammers", ParDo
               .withSideInputs(spammersView)
-              .of(new OldDoFn<GameActionInfo, GameActionInfo>() {
-                @Override
+              .of(new DoFn<GameActionInfo, GameActionInfo>() {
+                @ProcessElement
                 public void processElement(ProcessContext c) {
                   // If the user is not in the spammers Map, output the data element.
                   if (c.sideInput(spammersView).get(c.element().getUser().trim()) == null) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
index 00dc8a4..c97eb41 100644
--- a/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
+++ b/examples/java8/src/main/java/org/apache/beam/examples/complete/game/UserScore.java
@@ -28,8 +28,8 @@ import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.options.Validation;
 import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.MapElements;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.Sum;
@@ -123,14 +123,14 @@ public class UserScore {
    * user2_AsparagusPig,AsparagusPig,10,1445230923951,2015-11-02 09:09:28.224
    * The human-readable time string is not used here.
    */
-  static class ParseEventFn extends OldDoFn<String, GameActionInfo> {
+  static class ParseEventFn extends DoFn<String, GameActionInfo> {
 
     // Log and count parse errors.
     private static final Logger LOG = LoggerFactory.getLogger(ParseEventFn.class);
     private final Aggregator<Long, Long> numParseErrors =
         createAggregator("ParseErrors", new Sum.SumLongFn());
 
-    @Override
+    @ProcessElement
     public void processElement(ProcessContext c) {
       String[] components = c.element().split(",");
       try {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/71e027dc/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
----------------------------------------------------------------------
diff --git a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
index 01efad8..75d371a 100644
--- a/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
+++ b/examples/java8/src/test/java/org/apache/beam/examples/complete/game/UserScoreTest.java
@@ -83,7 +83,7 @@ public class UserScoreTest implements Serializable {
       KV.of("AndroidGreenKookaburra", 23),
       KV.of("BisqueBilby", 14));
 
-  /** Test the ParseEventFn OldDoFn. */
+  /** Test the {@link ParseEventFn} {@link DoFn}. */
   @Test
   public void testParseEventFn() throws Exception {
     DoFnTester<String, GameActionInfo> parseEventFn =


Mime
View raw message