beam-commits mailing list archives

From k...@apache.org
Subject [04/51] [abbrv] incubator-beam git commit: Rename DoFn to OldDoFn
Date Sat, 06 Aug 2016 02:52:22 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
index 8a83e44..b27163a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnWithContext.java
@@ -24,7 +24,7 @@ import static com.google.common.base.Preconditions.checkState;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.DoFn.DelegatingAggregator;
+import org.apache.beam.sdk.transforms.OldDoFn.DelegatingAggregator;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.transforms.display.HasDisplayData;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
@@ -127,7 +127,7 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable,
      *
      * <p>If invoked from {@link ProcessElement}), the timestamp
      * must not be older than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew}.  The output element will
+     * {@link OldDoFn#getAllowedTimestampSkew}.  The output element will
      * be in the same windows as the input element.
      *
      * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
@@ -176,7 +176,7 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable,
      *
      * <p>If invoked from {@link ProcessElement}), the timestamp
      * must not be older than the input element's timestamp minus
-     * {@link DoFn#getAllowedTimestampSkew}.  The output element will
+     * {@link OldDoFn#getAllowedTimestampSkew}.  The output element will
      * be in the same windows as the input element.
      *
      * <p>If invoked from {@link StartBundle} or {@link FinishBundle},
@@ -194,7 +194,7 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable,
   }
 
   /**
-   * Information accessible when running {@link DoFn#processElement}.
+   * Information accessible when running {@link OldDoFn#processElement}.
    */
   public abstract class ProcessContext extends Context {
 
@@ -358,13 +358,13 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable,
   /**
    * Returns an {@link Aggregator} with aggregation logic specified by the
    * {@link CombineFn} argument. The name provided must be unique across
-   * {@link Aggregator}s created within the DoFn. Aggregators can only be created
+   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
    * during pipeline construction.
    *
    * @param name the name of the aggregator
    * @param combiner the {@link CombineFn} to use in the aggregator
    * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
+   *         this OldDoFn
    * @throws NullPointerException if the name or combiner is null
    * @throws IllegalArgumentException if the given name collides with another
    *         aggregator in this scope
@@ -391,13 +391,13 @@ public abstract class DoFnWithContext<InputT, OutputT> implements Serializable,
   /**
    * Returns an {@link Aggregator} with the aggregation logic specified by the
    * {@link SerializableFunction} argument. The name provided must be unique
-   * across {@link Aggregator}s created within the DoFn. Aggregators can only be
+   * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
    * created during pipeline construction.
    *
    * @param name the name of the aggregator
    * @param combiner the {@link SerializableFunction} to use in the aggregator
    * @return an aggregator for the provided name and combiner in the scope of
-   *         this DoFn
+   *         this OldDoFn
    * @throws NullPointerException if the name or combiner is null
    * @throws IllegalArgumentException if the given name collides with another
    *         aggregator in this scope
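
The same createAggregator contract appears on both DoFnWithContext and the
renamed OldDoFn (defined later in this commit). As a sketch only, the
hypothetical EmptyLineCounter below creates an Aggregator in a field
initializer, i.e. at pipeline construction time as the Javadoc above requires,
using the SerializableFunction overload so no additional combiner classes are
assumed:

// Counts empty lines while passing every element through unchanged.
class EmptyLineCounter extends OldDoFn<String, String> {
  private final Aggregator<Long, Long> emptyLines =
      createAggregator("emptyLines",
          new SerializableFunction<Iterable<Long>, Long>() {
            @Override
            public Long apply(Iterable<Long> values) {
              long sum = 0;
              for (Long value : values) {
                sum += value;
              }
              return sum;
            }
          });

  @Override
  public void processElement(ProcessContext c) {
    if (c.element().isEmpty()) {
      emptyLines.addValue(1L);
    }
    c.output(c.element());
  }
}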

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
index a31799e..4466874 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Filter.java
@@ -202,7 +202,7 @@ public class Filter<T> extends PTransform<PCollection<T>, PCollection<T>> {
 
   @Override
   public PCollection<T> apply(PCollection<T> input) {
-    PCollection<T> output = input.apply(ParDo.of(new DoFn<T, T>() {
+    PCollection<T> output = input.apply(ParDo.of(new OldDoFn<T, T>() {
       @Override
       public void processElement(ProcessContext c) {
         if (predicate.apply(c.element()) == true) {
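
For context, Filter is a thin wrapper over ParDo, so user code written against
the renamed class follows the same shape. A minimal sketch in the Javadoc style
of these files (the "DropEmptyLines" name and the pipeline fragment are
illustrative, not part of this commit):

PCollection<String> lines = ...;
PCollection<String> nonEmpty =
    lines.apply("DropEmptyLines", ParDo.of(new OldDoFn<String, String>() {
      @Override
      public void processElement(ProcessContext c) {
        // Emit only the elements that satisfy the predicate, as Filter does.
        if (!c.element().isEmpty()) {
          c.output(c.element());
        }
      }
    }));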

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 4f270a7..b48da38 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -133,7 +133,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
+    return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
       private static final long serialVersionUID = 0L;
       @Override
       public void processElement(ProcessContext c) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 0b83fb6..53e898e 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -174,7 +174,7 @@ public class Flatten {
       Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
 
       return in.apply("FlattenIterables", ParDo.of(
-          new DoFn<Iterable<T>, T>() {
+          new OldDoFn<Iterable<T>, T>() {
             @Override
             public void processElement(ProcessContext c) {
               for (T i : c.element()) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
index 8ad57d2..ed7f411 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/GroupByKey.java
@@ -68,7 +68,7 @@ import org.apache.beam.sdk.values.PCollection.IsBounded;
  * PCollection<KV<String, Iterable<Doc>>> urlToDocs =
  *     urlDocPairs.apply(GroupByKey.<String, Doc>create());
  * PCollection<R> results =
- *     urlToDocs.apply(ParDo.of(new DoFn<KV<String, Iterable<Doc>>, R>() {
+ *     urlToDocs.apply(ParDo.of(new OldDoFn<KV<String, Iterable<Doc>>, R>() {
  *       public void processElement(ProcessContext c) {
  *         String url = c.element().getKey();
  *         Iterable<Doc> docsWithThatUrl = c.element().getValue();
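
A concrete variant of the Javadoc example above, with the placeholder Doc and
R types replaced by String so the shape of the post-GroupByKey OldDoFn is
visible; the variable names and the per-key summary are illustrative only:

PCollection<KV<String, String>> urlDocPairs = ...;
PCollection<KV<String, Iterable<String>>> urlToDocs =
    urlDocPairs.apply(GroupByKey.<String, String>create());
PCollection<String> results =
    urlToDocs.apply(ParDo.of(new OldDoFn<KV<String, Iterable<String>>, String>() {
      @Override
      public void processElement(ProcessContext c) {
        String url = c.element().getKey();
        long docCount = 0;
        for (String doc : c.element().getValue()) {
          docCount++;  // count the docs grouped under this URL
        }
        c.output(url + ": " + docCount);  // one summary element per key
      }
    }));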

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
index ef1e3c6..b5fe60f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/IntraBundleParallelization.java
@@ -40,7 +40,7 @@ import java.util.concurrent.Semaphore;
 import java.util.concurrent.atomic.AtomicReference;
 
 /**
- * Provides multi-threading of {@link DoFn}s, using threaded execution to
+ * Provides multi-threading of {@link OldDoFn}s, using threaded execution to
  * process multiple elements concurrently within a bundle.
  *
  * <p>Note, that each Dataflow worker will already process multiple bundles
@@ -57,7 +57,7 @@ import java.util.concurrent.atomic.AtomicReference;
  * share of the maximum write rate) will take at least 6 seconds to complete (there is additional
  * overhead in the extra parallelization).
  *
- * <p>To parallelize a {@link DoFn} to 10 threads:
+ * <p>To parallelize an {@link OldDoFn} to 10 threads:
  * <pre>{@code
  * PCollection<T> data = ...;
  * data.apply(
@@ -65,18 +65,18 @@ import java.util.concurrent.atomic.AtomicReference;
  *                             .withMaxParallelism(10)));
  * }</pre>
  *
- * <p>An uncaught exception from the wrapped {@link DoFn} will result in the exception
+ * <p>An uncaught exception from the wrapped {@link OldDoFn} will result in the exception
  * being rethrown in later calls to {@link MultiThreadedIntraBundleProcessingDoFn#processElement}
  * or a call to {@link MultiThreadedIntraBundleProcessingDoFn#finishBundle}.
  */
 public class IntraBundleParallelization {
   /**
    * Creates a {@link IntraBundleParallelization} {@link PTransform} for the given
-   * {@link DoFn} that processes elements using multiple threads.
+   * {@link OldDoFn} that processes elements using multiple threads.
    *
    * <p>Note that the specified {@code doFn} needs to be thread safe.
    */
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
+  public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> doFn) {
     return new Unbound().of(doFn);
   }
 
@@ -92,7 +92,7 @@ public class IntraBundleParallelization {
    * An incomplete {@code IntraBundleParallelization} transform, with unbound input/output types.
    *
    * <p>Before being applied, {@link IntraBundleParallelization.Unbound#of} must be
-   * invoked to specify the {@link DoFn} to invoke, which will also
+   * invoked to specify the {@link OldDoFn} to invoke, which will also
    * bind the input/output types of this {@code PTransform}.
    */
   public static class Unbound {
@@ -118,18 +118,18 @@ public class IntraBundleParallelization {
 
     /**
      * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
-     * with the specified {@link DoFn}.
+     * with the specified {@link OldDoFn}.
      *
      * <p>Note that the specified {@code doFn} needs to be thread safe.
      */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> doFn) {
+    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> doFn) {
       return new Bound<>(doFn, maxParallelism);
     }
   }
 
   /**
    * A {@code PTransform} that, when applied to a {@code PCollection<InputT>},
-   * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements,
+   * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements,
    * with all its outputs collected into an output
    * {@code PCollection<OutputT>}.
    *
@@ -140,10 +140,10 @@ public class IntraBundleParallelization {
    */
   public static class Bound<InputT, OutputT>
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
-    private final DoFn<InputT, OutputT> doFn;
+    private final OldDoFn<InputT, OutputT> doFn;
     private final int maxParallelism;
 
-    Bound(DoFn<InputT, OutputT> doFn, int maxParallelism) {
+    Bound(OldDoFn<InputT, OutputT> doFn, int maxParallelism) {
       checkArgument(maxParallelism > 0,
           "Expected parallelism factor greater than zero, received %s.", maxParallelism);
       this.doFn = doFn;
@@ -160,12 +160,12 @@ public class IntraBundleParallelization {
 
     /**
      * Returns a new {@link IntraBundleParallelization} {@link PTransform} like this one
-     * with the specified {@link DoFn}.
+     * with the specified {@link OldDoFn}.
      *
      * <p>Note that the specified {@code doFn} needs to be thread safe.
      */
     public <NewInputT, NewOutputT> Bound<NewInputT, NewOutputT>
-        of(DoFn<NewInputT, NewOutputT> doFn) {
+        of(OldDoFn<NewInputT, NewOutputT> doFn) {
       return new Bound<>(doFn, maxParallelism);
     }
 
@@ -188,17 +188,19 @@ public class IntraBundleParallelization {
   }
 
   /**
-   * A multi-threaded {@code DoFn} wrapper.
+   * A multi-threaded {@code OldDoFn} wrapper.
    *
-   * @see IntraBundleParallelization#of(DoFn)
+   * @see IntraBundleParallelization#of(OldDoFn)
    *
    * @param <InputT> the type of the (main) input elements
    * @param <OutputT> the type of the (main) output elements
    */
   public static class MultiThreadedIntraBundleProcessingDoFn<InputT, OutputT>
-      extends DoFn<InputT, OutputT> {
+      extends OldDoFn<InputT, OutputT> {
 
-    public MultiThreadedIntraBundleProcessingDoFn(DoFn<InputT, OutputT> doFn, int maxParallelism) {
+    public MultiThreadedIntraBundleProcessingDoFn(
+        OldDoFn<InputT, OutputT> doFn,
+        int maxParallelism) {
       checkArgument(maxParallelism > 0,
           "Expected parallelism factor greater than zero, received %s.", maxParallelism);
       this.doFn = doFn;
@@ -267,7 +269,7 @@ public class IntraBundleParallelization {
     /////////////////////////////////////////////////////////////////////////////
 
     /**
-     * Wraps a DoFn context, forcing single-thread output so that threads don't
+     * Wraps an OldDoFn context, forcing single-thread output so that threads don't
      * propagate through to downstream functions.
      */
     private class WrappedContext extends ProcessContext {
@@ -347,7 +349,7 @@ public class IntraBundleParallelization {
       }
     }
 
-    private final DoFn<InputT, OutputT> doFn;
+    private final OldDoFn<InputT, OutputT> doFn;
     private int maxParallelism;
 
     private transient ExecutorService executor;
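
Pulling the renamed signatures together, usage of the wrapper is unchanged
from the class Javadoc; RateLimitedLookupFn below is a hypothetical
thread-safe OldDoFn that blocks on an external service:

PCollection<String> requests = ...;
PCollection<String> responses = requests.apply(
    IntraBundleParallelization
        .of(new RateLimitedLookupFn())  // hypothetical; must be thread safe
        .withMaxParallelism(10));       // at most 10 elements in flight per bundle

As the Javadoc notes, each worker already processes multiple bundles in
parallel, so this mainly helps latency-bound DoFns rather than CPU-bound ones.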

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index 636e306..c8cbce8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,7 +58,7 @@ public class Keys<K> extends PTransform<PCollection<? extends KV<K, ?>>,
   @Override
   public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
     return
-        in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
+        in.apply("Keys", ParDo.of(new OldDoFn<KV<K, ?>, K>() {
           @Override
           public void processElement(ProcessContext c) {
             c.output(c.element().getKey());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index 9597c92..430d37b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,7 +62,7 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K, V>>,
   @Override
   public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
     return
-        in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
+        in.apply("KvSwap", ParDo.of(new OldDoFn<KV<K, V>, KV<V, K>>() {
           @Override
           public void processElement(ProcessContext c) {
             KV<K, V> e = c.element();

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index f535111..c83c39f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -104,7 +104,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
 
   @Override
   public PCollection<OutputT> apply(PCollection<InputT> input) {
-    return input.apply("Map", ParDo.of(new DoFn<InputT, OutputT>() {
+    return input.apply("Map", ParDo.of(new OldDoFn<InputT, OutputT>() {
       @Override
       public void processElement(ProcessContext c) {
         c.output(fn.apply(c.element()));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
new file mode 100644
index 0000000..48c6033
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/OldDoFn.java
@@ -0,0 +1,565 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static com.google.common.base.Preconditions.checkNotNull;
+import static com.google.common.base.Preconditions.checkState;
+
+import org.apache.beam.sdk.annotations.Experimental;
+import org.apache.beam.sdk.annotations.Experimental.Kind;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.transforms.Combine.CombineFn;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.display.HasDisplayData;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowingInternals;
+import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.beam.sdk.values.TypeDescriptor;
+
+import com.google.common.base.MoreObjects;
+
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+/**
+ * The argument to {@link ParDo} providing the code to use to process
+ * elements of the input
+ * {@link org.apache.beam.sdk.values.PCollection}.
+ *
+ * <p>See {@link ParDo} for more explanation, examples of use, and
+ * discussion of constraints on {@code OldDoFn}s, including their
+ * serializability, lack of access to global shared mutable state,
+ * requirements for failure tolerance, and benefits of optimization.
+ *
+ * <p>{@code OldDoFn}s can be tested in the context of a particular
+ * {@code Pipeline} by running that {@code Pipeline} on sample input
+ * and then checking its output.  Unit testing of an {@code OldDoFn},
+ * separately from any {@code ParDo} transform or {@code Pipeline},
+ * can be done via the {@link DoFnTester} harness.
+ *
+ * <p>{@link DoFnWithContext} (currently experimental) offers an alternative
+ * mechanism for accessing {@link ProcessContext#window()} without the need
+ * to implement {@link RequiresWindowAccess}.
+ *
+ * <p>See also {@link #processElement} for details on implementing the transformation
+ * from {@code InputT} to {@code OutputT}.
+ *
+ * @param <InputT> the type of the (main) input elements
+ * @param <OutputT> the type of the (main) output elements
+ */
+public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDisplayData {
+
+  /**
+   * Information accessible to all methods in this {@code OldDoFn}.
+   * Used primarily to output elements.
+   */
+  public abstract class Context {
+
+    /**
+     * Returns the {@code PipelineOptions} specified with the
+     * {@link org.apache.beam.sdk.runners.PipelineRunner}
+     * invoking this {@code OldDoFn}.  The {@code PipelineOptions} will
+     * be the default when running via {@link DoFnTester}.
+     */
+    public abstract PipelineOptions getPipelineOptions();
+
+    /**
+     * Adds the given element to the main output {@code PCollection}.
+     *
+     * <p>Once passed to {@code output} the element should be considered
+     * immutable and not be modified in any way. It may be cached or retained
+     * by the Dataflow runtime or later steps in the pipeline, or used in
+     * other unspecified ways.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the output
+     * element will have the same timestamp and be in the same windows
+     * as the input element passed to {@link OldDoFn#processElement processElement}.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     */
+    public abstract void output(OutputT output);
+
+    /**
+     * Adds the given element to the main output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code outputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     */
+    public abstract void outputWithTimestamp(OutputT output, Instant timestamp);
+
+    /**
+     * Adds the given element to the side output {@code PCollection} with the
+     * given tag.
+     *
+     * <p>Once passed to {@code sideOutput} the element should not be modified
+     * in any way.
+     *
+     * <p>The caller of {@code ParDo} uses {@link ParDo#withOutputTags withOutputTags} to
+     * specify the tags of side outputs that it consumes. Non-consumed side
+     * outputs, e.g., outputs for monitoring purposes only, don't necessarily
+     * need to be specified.
+     *
+     * <p>The output element will have the same timestamp and be in the same
+     * windows as the input element passed to {@link OldDoFn#processElement processElement}.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element. The output element
+     * will have a timestamp of negative infinity.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutput(TupleTag<T> tag, T output);
+
+    /**
+     * Adds the given element to the specified side output {@code PCollection},
+     * with the given timestamp.
+     *
+     * <p>Once passed to {@code sideOutputWithTimestamp} the element should not be
+     * modified in any way.
+     *
+     * <p>If invoked from {@link OldDoFn#processElement processElement}, the timestamp
+     * must not be older than the input element's timestamp minus
+     * {@link OldDoFn#getAllowedTimestampSkew getAllowedTimestampSkew}.  The output element will
+     * be in the same windows as the input element.
+     *
+     * <p>If invoked from {@link #startBundle startBundle} or {@link #finishBundle finishBundle},
+     * this will attempt to use the
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn}
+     * of the input {@code PCollection} to determine what windows the element
+     * should be in, throwing an exception if the {@code WindowFn} attempts
+     * to access any information about the input element except for the
+     * timestamp.
+     *
+     * @see ParDo#withOutputTags
+     */
+    public abstract <T> void sideOutputWithTimestamp(
+        TupleTag<T> tag, T output, Instant timestamp);
+
+    /**
+     * Creates an {@link Aggregator} in the {@link OldDoFn} context with the
+     * specified name and aggregation logic specified by {@link CombineFn}.
+     *
+     * <p>For internal use only.
+     *
+     * @param name the name of the aggregator
+     * @param combiner the {@link CombineFn} to use in the aggregator
+     * @return an aggregator for the provided name and {@link CombineFn} in this
+     *         context
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected abstract <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+        createAggregatorInternal(String name, CombineFn<AggInputT, ?, AggOutputT> combiner);
+
+    /**
+     * Sets up {@link Aggregator}s created by the {@link OldDoFn} so they are
+     * usable within this context.
+     *
+     * <p>This method should be called by runners before {@link OldDoFn#startBundle}
+     * is executed.
+     */
+    @Experimental(Kind.AGGREGATOR)
+    protected final void setupDelegateAggregators() {
+      for (DelegatingAggregator<?, ?> aggregator : aggregators.values()) {
+        setupDelegateAggregator(aggregator);
+      }
+
+      aggregatorsAreFinal = true;
+    }
+
+    private final <AggInputT, AggOutputT> void setupDelegateAggregator(
+        DelegatingAggregator<AggInputT, AggOutputT> aggregator) {
+
+      Aggregator<AggInputT, AggOutputT> delegate = createAggregatorInternal(
+          aggregator.getName(), aggregator.getCombineFn());
+
+      aggregator.setDelegate(delegate);
+    }
+  }
+
+  /**
+   * Information accessible when running {@link OldDoFn#processElement}.
+   */
+  public abstract class ProcessContext extends Context {
+
+    /**
+     * Returns the input element to be processed.
+     *
+     * <p>The element should be considered immutable. The Dataflow runtime will not mutate the
+     * element, so it is safe to cache, etc. The element should not be mutated by any of the
+     * {@link OldDoFn} methods, because it may be cached elsewhere, retained by the Dataflow
+     * runtime, or used in other unspecified ways.
+     */
+    public abstract InputT element();
+
+    /**
+     * Returns the value of the side input for the window corresponding to the
+     * window of the main input element.
+     *
+     * <p>See
+     * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow}
+     * for how this corresponding window is determined.
+     *
+     * @throws IllegalArgumentException if this is not a side input
+     * @see ParDo#withSideInputs
+     */
+    public abstract <T> T sideInput(PCollectionView<T> view);
+
+    /**
+     * Returns the timestamp of the input element.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract Instant timestamp();
+
+    /**
+     * Returns the window into which the input element has been assigned.
+     *
+     * <p>See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     *
+     * @throws UnsupportedOperationException if this {@link OldDoFn} does
+     * not implement {@link RequiresWindowAccess}.
+     */
+    public abstract BoundedWindow window();
+
+    /**
+     * Returns information about the pane within this window into which the
+     * input element has been assigned.
+     *
+     * <p>Generally all data is in a single, uninteresting pane unless custom
+     * triggering and/or late data has been explicitly requested.
+     * See {@link org.apache.beam.sdk.transforms.windowing.Window}
+     * for more information.
+     */
+    public abstract PaneInfo pane();
+
+    /**
+     * Returns the process context to use for implementing windowing.
+     */
+    @Experimental
+    public abstract WindowingInternals<InputT, OutputT> windowingInternals();
+  }
+
+  /**
+   * Returns the allowed timestamp skew duration, which is the maximum
+   * duration that timestamps can be shifted backward in
+   * {@link OldDoFn.Context#outputWithTimestamp}.
+   *
+   * <p>The default value is {@code Duration.ZERO}, in which case
+   * timestamps can only be shifted forward into the future.  For infinite
+   * skew, return {@code Duration.millis(Long.MAX_VALUE)}.
+   *
+   * <p> Note that producing an element whose timestamp is less than the
+   * current timestamp may result in late data, i.e. returning a non-zero
+   * value here does not impact watermark calculations used for firing
+   * windows.
+   *
+   * @deprecated does not interact well with the watermark.
+   */
+  @Deprecated
+  public Duration getAllowedTimestampSkew() {
+    return Duration.ZERO;
+  }
+
+  /**
+   * Interface for signaling that an {@link OldDoFn} needs to access the window the
+   * element is being processed in, via {@link OldDoFn.ProcessContext#window}.
+   */
+  @Experimental
+  public interface RequiresWindowAccess {}
+
+  public OldDoFn() {
+    this(new HashMap<String, DelegatingAggregator<?, ?>>());
+  }
+
+  OldDoFn(Map<String, DelegatingAggregator<?, ?>> aggregators) {
+    this.aggregators = aggregators;
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  private final Map<String, DelegatingAggregator<?, ?>> aggregators;
+
+  /**
+   * Protects aggregators from being created after initialization.
+   */
+  private boolean aggregatorsAreFinal;
+
+  /**
+   * Prepares this {@code OldDoFn} instance for processing a batch of elements.
+   *
+   * <p>By default, does nothing.
+   */
+  public void startBundle(Context c) throws Exception {
+  }
+
+  /**
+   * Processes one input element.
+   *
+   * <p>The current element of the input {@code PCollection} is returned by
+   * {@link ProcessContext#element() c.element()}. It should be considered immutable. The Dataflow
+   * runtime will not mutate the element, so it is safe to cache, etc. The element should not be
+   * mutated by any of the {@link OldDoFn} methods, because it may be cached elsewhere, retained by
+   * the Dataflow runtime, or used in other unspecified ways.
+   *
+   * <p>A value is added to the main output {@code PCollection} by {@link ProcessContext#output}.
+   * Once passed to {@code output} the element should be considered immutable and not be modified in
+   * any way. It may be cached elsewhere, retained by the Dataflow runtime, or used in other
+   * unspecified ways.
+   *
+   * @see ProcessContext
+   */
+  public abstract void processElement(ProcessContext c) throws Exception;
+
+  /**
+   * Finishes processing this batch of elements.
+   *
+   * <p>By default, does nothing.
+   */
+  public void finishBundle(Context c) throws Exception {
+  }
+
+  /**
+   * {@inheritDoc}
+   *
+   * <p>By default, does not register any display data. Implementors may override this method
+   * to provide their own display data.
+   */
+  @Override
+  public void populateDisplayData(DisplayData.Builder builder) {
+  }
+
+  /////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically
+   * about the input type of this {@code OldDoFn} instance's most-derived
+   * class.
+   *
+   * <p>See {@link #getOutputTypeDescriptor} for more discussion.
+   */
+  protected TypeDescriptor<InputT> getInputTypeDescriptor() {
+    return new TypeDescriptor<InputT>(getClass()) {};
+  }
+
+  /**
+   * Returns a {@link TypeDescriptor} capturing what is known statically
+   * about the output type of this {@code OldDoFn} instance's
+   * most-derived class.
+   *
+   * <p>In the normal case of a concrete {@code OldDoFn} subclass with
+   * no generic type parameters of its own (including anonymous inner
+   * classes), this will be a complete non-generic type, which is good
+   * for choosing a default output {@code Coder<OutputT>} for the output
+   * {@code PCollection<OutputT>}.
+   */
+  protected TypeDescriptor<OutputT> getOutputTypeDescriptor() {
+    return new TypeDescriptor<OutputT>(getClass()) {};
+  }
+
+  /**
+   * Returns an {@link Aggregator} with aggregation logic specified by the
+   * {@link CombineFn} argument. The name provided must be unique across
+   * {@link Aggregator}s created within the OldDoFn. Aggregators can only be created
+   * during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link CombineFn} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this OldDoFn
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline processing.
+   */
+  protected final <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT>
+      createAggregator(String name, CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+    checkNotNull(name, "name cannot be null");
+    checkNotNull(combiner, "combiner cannot be null");
+    checkArgument(!aggregators.containsKey(name),
+        "Cannot create aggregator with name %s."
+        + " An Aggregator with that name already exists within this scope.",
+        name);
+
+    checkState(!aggregatorsAreFinal, "Cannot create an aggregator during OldDoFn processing."
+        + " Aggregators should be registered during pipeline construction.");
+
+    DelegatingAggregator<AggInputT, AggOutputT> aggregator =
+        new DelegatingAggregator<>(name, combiner);
+    aggregators.put(name, aggregator);
+    return aggregator;
+  }
+
+  /**
+   * Returns an {@link Aggregator} with the aggregation logic specified by the
+   * {@link SerializableFunction} argument. The name provided must be unique
+   * across {@link Aggregator}s created within the OldDoFn. Aggregators can only be
+   * created during pipeline construction.
+   *
+   * @param name the name of the aggregator
+   * @param combiner the {@link SerializableFunction} to use in the aggregator
+   * @return an aggregator for the provided name and combiner in the scope of
+   *         this OldDoFn
+   * @throws NullPointerException if the name or combiner is null
+   * @throws IllegalArgumentException if the given name collides with another
+   *         aggregator in this scope
+   * @throws IllegalStateException if called during pipeline processing.
+   */
+  protected final <AggInputT> Aggregator<AggInputT, AggInputT> createAggregator(String name,
+      SerializableFunction<Iterable<AggInputT>, AggInputT> combiner) {
+    checkNotNull(combiner, "combiner cannot be null.");
+    return createAggregator(name, Combine.IterableCombineFn.of(combiner));
+  }
+
+  /**
+   * Returns the {@link Aggregator Aggregators} created by this {@code OldDoFn}.
+   */
+  Collection<Aggregator<?, ?>> getAggregators() {
+    return Collections.<Aggregator<?, ?>>unmodifiableCollection(aggregators.values());
+  }
+
+  /**
+   * An {@link Aggregator} that delegates calls to addValue to another
+   * aggregator.
+   *
+   * @param <AggInputT> the type of input element
+   * @param <AggOutputT> the type of output element
+   */
+  static class DelegatingAggregator<AggInputT, AggOutputT> implements
+      Aggregator<AggInputT, AggOutputT>, Serializable {
+    private final UUID id;
+
+    private final String name;
+
+    private final CombineFn<AggInputT, ?, AggOutputT> combineFn;
+
+    private Aggregator<AggInputT, ?> delegate;
+
+    public DelegatingAggregator(String name,
+        CombineFn<? super AggInputT, ?, AggOutputT> combiner) {
+      this.id = UUID.randomUUID();
+      this.name = checkNotNull(name, "name cannot be null");
+      // Safe contravariant cast
+      @SuppressWarnings("unchecked")
+      CombineFn<AggInputT, ?, AggOutputT> specificCombiner =
+          (CombineFn<AggInputT, ?, AggOutputT>) checkNotNull(combiner, "combineFn cannot be null");
+      this.combineFn = specificCombiner;
+    }
+
+    @Override
+    public void addValue(AggInputT value) {
+      if (delegate == null) {
+        throw new IllegalStateException(
+            "addValue cannot be called on Aggregator outside of the execution of an OldDoFn.");
+      } else {
+        delegate.addValue(value);
+      }
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public CombineFn<AggInputT, ?, AggOutputT> getCombineFn() {
+      return combineFn;
+    }
+
+    /**
+     * Sets the current delegate of the Aggregator.
+     *
+     * @param delegate the delegate to set in this aggregator
+     */
+    public void setDelegate(Aggregator<AggInputT, ?> delegate) {
+      this.delegate = delegate;
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(getClass())
+          .add("name", name)
+          .add("combineFn", combineFn)
+          .toString();
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(id, name, combineFn.getClass());
+    }
+
+    /**
+     * Indicates whether some other object is "equal to" this one.
+     *
+     * <p>{@code DelegatingAggregator} instances are equal if they have the same name, their
+     * CombineFns are the same class, and they have identical IDs.
+     */
+    @Override
+    public boolean equals(Object o) {
+      if (o == this) {
+        return true;
+      }
+      if (o == null) {
+        return false;
+      }
+      if (o instanceof DelegatingAggregator) {
+        DelegatingAggregator<?, ?> that = (DelegatingAggregator<?, ?>) o;
+        return Objects.equals(this.id, that.id)
+            && Objects.equals(this.name, that.name)
+            && Objects.equals(this.combineFn.getClass(), that.combineFn.getClass());
+      }
+      return false;
+    }
+  }
+}
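
To make the lifecycle documented above concrete, here is a minimal sketch of a
subclass touching startBundle, processElement, finishBundle, and sideOutput;
the class name, tag field, and word-splitting logic are illustrative and not
part of this commit:

// Splits lines into words on the main output and routes empty lines to a
// side output identified by the supplied TupleTag.
class ExtractWordsFn extends OldDoFn<String, String> {
  private final TupleTag<String> emptyLinesTag;
  private long linesInBundle;

  ExtractWordsFn(TupleTag<String> emptyLinesTag) {
    this.emptyLinesTag = emptyLinesTag;
  }

  @Override
  public void startBundle(Context c) {
    linesInBundle = 0;  // per-bundle state, reset before each bundle
  }

  @Override
  public void processElement(ProcessContext c) {
    linesInBundle++;
    String line = c.element();
    if (line.isEmpty()) {
      c.sideOutput(emptyLinesTag, line);
      return;
    }
    for (String word : line.split("[^a-zA-Z']+")) {
      c.output(word);
    }
  }

  @Override
  public void finishBundle(Context c) {
    // Per-bundle flushing or cleanup would go here; linesInBundle is only
    // meaningful within a single bundle.
  }
}

Wired up with ParDo.withOutputTags, the empty lines land in the side output
PCollection associated with emptyLinesTag.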

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
index fe6e8ad..12ab54d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/PTransform.java
@@ -147,7 +147,7 @@ import java.io.Serializable;
  * implementing {@code Serializable}.
  *
  * <p>{@code PTransform} is marked {@code Serializable} solely
- * because it is common for an anonymous {@code DoFn},
+ * because it is common for an anonymous {@code OldDoFn}
  * instance to be created within an
  * {@code apply()} method of a composite {@code PTransform}.
  *

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
index 16dfcac..36d8101 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/ParDo.java
@@ -52,13 +52,13 @@ import java.util.List;
  * <p>The {@link ParDo} processing style is similar to what happens inside
  * the "Mapper" or "Reducer" class of a MapReduce-style algorithm.
  *
- * <h2>{@link DoFn DoFns}</h2>
+ * <h2>{@link OldDoFn DoFns}</h2>
  *
  * <p>The function to use to process each element is specified by a
- * {@link DoFn DoFn&lt;InputT, OutputT&gt;}, primarily via its
- * {@link DoFn#processElement processElement} method. The {@link DoFn} may also
- * override the default implementations of {@link DoFn#startBundle startBundle}
- * and {@link DoFn#finishBundle finishBundle}.
+ * {@link OldDoFn OldDoFn&lt;InputT, OutputT&gt;}, primarily via its
+ * {@link OldDoFn#processElement processElement} method. The {@link OldDoFn} may also
+ * override the default implementations of {@link OldDoFn#startBundle startBundle}
+ * and {@link OldDoFn#finishBundle finishBundle}.
  *
  * <p>Conceptually, when a {@link ParDo} transform is executed, the
  * elements of the input {@link PCollection} are first divided up
@@ -67,26 +67,27 @@ import java.util.List;
  * For each bundle of input elements processing proceeds as follows:
  *
  * <ol>
- *   <li>If required, a fresh instance of the argument {@link DoFn} is created
+ *   <li>If required, a fresh instance of the argument {@link OldDoFn} is created
  *     on a worker. This may be through deserialization or other means. A
- *     {@link PipelineRunner} may reuse {@link DoFn} instances for multiple bundles.
- *     A {@link DoFn} that has terminated abnormally (by throwing an {@link Exception}
+ *     {@link PipelineRunner} may reuse {@link OldDoFn} instances for multiple bundles.
+ *     An {@link OldDoFn} that has terminated abnormally (by throwing an {@link Exception})
  *     will never be reused.</li>
- *   <li>The {@link DoFn DoFn's} {@link DoFn#startBundle} method is called to
+ *   <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#startBundle} method is called to
  *     initialize it. If this method is not overridden, the call may be optimized
  *     away.</li>
- *   <li>The {@link DoFn DoFn's} {@link DoFn#processElement} method
+ *   <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#processElement} method
  *     is called on each of the input elements in the bundle.</li>
- *   <li>The {@link DoFn DoFn's} {@link DoFn#finishBundle} method is called
- *     to complete its work. After {@link DoFn#finishBundle} is called, the
- *     framework will not again invoke {@link DoFn#processElement} or {@link DoFn#finishBundle}
- *     until a new call to {@link DoFn#startBundle} has occurred.
+ *   <li>The {@link OldDoFn OldDoFn's} {@link OldDoFn#finishBundle} method is called
+ *     to complete its work. After {@link OldDoFn#finishBundle} is called, the
+ *     framework will not again invoke {@link OldDoFn#processElement} or
+ *     {@link OldDoFn#finishBundle}
+ *     until a new call to {@link OldDoFn#startBundle} has occurred.
  *     If this method is not overridden, this call may be optimized away.</li>
  * </ol>
  *
- * Each of the calls to any of the {@link DoFn DoFn's} processing
+ * Each of the calls to any of the {@link OldDoFn OldDoFn's} processing
  * methods can produce zero or more output elements. All of the
- * of output elements from all of the {@link DoFn} instances
+ * output elements from all of the {@link OldDoFn} instances
  * are included in the output {@link PCollection}.
  *
  * <p>For example:
@@ -94,7 +95,7 @@ import java.util.List;
  * <pre> {@code
  * PCollection<String> lines = ...;
  * PCollection<String> words =
- *     lines.apply(ParDo.of(new DoFn<String, String>() {
+ *     lines.apply(ParDo.of(new OldDoFn<String, String>() {
  *         public void processElement(ProcessContext c) {
  *           String line = c.element();
  *           for (String word : line.split("[^a-zA-Z']+")) {
@@ -102,7 +103,7 @@ import java.util.List;
  *           }
  *         }}));
  * PCollection<Integer> wordLengths =
- *     words.apply(ParDo.of(new DoFn<String, Integer>() {
+ *     words.apply(ParDo.of(new OldDoFn<String, Integer>() {
  *         public void processElement(ProcessContext c) {
  *           String word = c.element();
  *           Integer length = word.length();
@@ -127,9 +128,9 @@ import java.util.List;
  *
  * <pre> {@code
  * PCollection<String> words =
- *     lines.apply("ExtractWords", ParDo.of(new DoFn<String, String>() { ... }));
+ *     lines.apply("ExtractWords", ParDo.of(new OldDoFn<String, String>() { ... }));
  * PCollection<Integer> wordLengths =
- *     words.apply("ComputeWordLengths", ParDo.of(new DoFn<String, Integer>() { ... }));
+ *     words.apply("ComputeWordLengths", ParDo.of(new OldDoFn<String, Integer>() { ... }));
  * } </pre>
  *
  * <h2>Side Inputs</h2>
@@ -141,7 +142,7 @@ import java.util.List;
  * {@link PCollection PCollections} computed by earlier pipeline operations,
  * passed in to the {@link ParDo} transform using
  * {@link #withSideInputs}, and their contents accessible to each of
- * the {@link DoFn} operations via {@link DoFn.ProcessContext#sideInput sideInput}.
+ * the {@link OldDoFn} operations via {@link OldDoFn.ProcessContext#sideInput sideInput}.
  * For example:
  *
  * <pre> {@code
@@ -151,7 +152,7 @@ import java.util.List;
  *     maxWordLengthCutOff.apply(View.<Integer>asSingleton());
  * PCollection<String> wordsBelowCutOff =
  *     words.apply(ParDo.withSideInputs(maxWordLengthCutOffView)
- *                      .of(new DoFn<String, String>() {
+ *                      .of(new OldDoFn<String, String>() {
  *         public void processElement(ProcessContext c) {
  *           String word = c.element();
  *           int lengthCutOff = c.sideInput(maxWordLengthCutOffView);
@@ -170,11 +171,11 @@ import java.util.List;
  * and bundled in a {@link PCollectionTuple}. The {@link TupleTag TupleTags}
  * to be used for the output {@link PCollectionTuple} are specified by
  * invoking {@link #withOutputTags}. Unconsumed side outputs do not
- * necessarily need to be explicitly specified, even if the {@link DoFn}
- * generates them. Within the {@link DoFn}, an element is added to the
+ * necessarily need to be explicitly specified, even if the {@link OldDoFn}
+ * generates them. Within the {@link OldDoFn}, an element is added to the
  * main output {@link PCollection} as normal, using
- * {@link DoFn.Context#output}, while an element is added to a side output
- * {@link PCollection} using {@link DoFn.Context#sideOutput}. For example:
+ * {@link OldDoFn.Context#output}, while an element is added to a side output
+ * {@link PCollection} using {@link OldDoFn.Context#sideOutput}. For example:
  *
  * <pre> {@code
  * PCollection<String> words = ...;
@@ -197,7 +198,7 @@ import java.util.List;
  *         .withOutputTags(wordsBelowCutOffTag,
  *                         TupleTagList.of(wordLengthsAboveCutOffTag)
  *                                     .and(markedWordsTag))
- *         .of(new DoFn<String, String>() {
+ *         .of(new OldDoFn<String, String>() {
  *             // Create a tag for the unconsumed side output.
  *             final TupleTag<String> specialWordsTag =
  *                 new TupleTag<String>(){};
@@ -232,7 +233,7 @@ import java.util.List;
  *
  * <p>Several properties can be specified for a {@link ParDo}
  * {@link PTransform}, including name, side inputs, side output tags,
- * and {@link DoFn} to invoke. Only the {@link DoFn} is required; the
+ * and {@link OldDoFn} to invoke. Only the {@link OldDoFn} is required; the
  * name is encouraged but not required, and side inputs and side
  * output tags are only specified when they're needed. These
  * properties can be specified in any order, as long as they're
@@ -246,23 +247,23 @@ import java.util.List;
  * {@link ParDo.Bound} nested classes, each of which offer
  * property setter instance methods to enable setting additional
  * properties. {@link ParDo.Bound} is used for {@link ParDo}
- * transforms whose {@link DoFn} is specified and whose input and
+ * transforms whose {@link OldDoFn} is specified and whose input and
  * output static types have been bound. {@link ParDo.Unbound ParDo.Unbound} is used
  * for {@link ParDo} transforms that have not yet had their
- * {@link DoFn} specified. Only {@link ParDo.Bound} instances can be
+ * {@link OldDoFn} specified. Only {@link ParDo.Bound} instances can be
  * applied.
  *
  * <p>Another benefit of this approach is that it reduces the number
  * of type parameters that need to be specified manually. In
  * particular, the input and output types of the {@link ParDo}
  * {@link PTransform} are inferred automatically from the type
- * parameters of the {@link DoFn} argument passed to {@link ParDo#of}.
+ * parameters of the {@link OldDoFn} argument passed to {@link ParDo#of}.
  *
  * <h2>Output Coders</h2>
  *
  * <p>By default, the {@link Coder Coder&lt;OutputT&gt;} for the
  * elements of the main output {@link PCollection PCollection&lt;OutputT&gt;} is
- * inferred from the concrete type of the {@link DoFn DoFn&lt;InputT, OutputT&gt;}.
+ * inferred from the concrete type of the {@link OldDoFn OldDoFn&lt;InputT, OutputT&gt;}.
  *
  * <p>By default, the {@link Coder Coder&lt;SideOutputT&gt;} for the elements of
  * a side output {@link PCollection PCollection&lt;SideOutputT&gt;} is inferred
@@ -282,74 +283,74 @@ import java.util.List;
  * This style of {@code TupleTag} instantiation is used in the example of
  * multiple side outputs, above.
  *
- * <h2>Serializability of {@link DoFn DoFns}</h2>
+ * <h2>Serializability of {@link OldDoFn DoFns}</h2>
  *
- * <p>A {@link DoFn} passed to a {@link ParDo} transform must be
- * {@link Serializable}. This allows the {@link DoFn} instance
+ * <p>An {@link OldDoFn} passed to a {@link ParDo} transform must be
+ * {@link Serializable}. This allows the {@link OldDoFn} instance
  * created in this "main program" to be sent (in serialized form) to
  * remote worker machines and reconstituted for bundles of elements
- * of the input {@link PCollection} being processed. A {@link DoFn}
+ * of the input {@link PCollection} being processed. An {@link OldDoFn}
  * can have instance variable state, and non-transient instance
  * variable state will be serialized in the main program and then
  * deserialized on remote worker machines for some number of bundles
  * of elements to process.
  *
- * <p>{@link DoFn DoFns} expressed as anonymous inner classes can be
+ * <p>{@link OldDoFn DoFns} expressed as anonymous inner classes can be
  * convenient, but due to a quirk in Java's rules for serializability,
  * non-static inner or nested classes (including anonymous inner
  * classes) automatically capture their enclosing class's instance in
  * their serialized state. This can lead to including much more than
- * intended in the serialized state of a {@link DoFn}, or even things
+ * intended in the serialized state of an {@link OldDoFn}, or even things
  * that aren't {@link Serializable}.
  *
  * <p>There are two ways to avoid unintended serialized state in a
- * {@link DoFn}:
+ * {@link OldDoFn}:
  *
  * <ul>
  *
- * <li>Define the {@link DoFn} as a named, static class.
+ * <li>Define the {@link OldDoFn} as a named, static class.
  *
- * <li>Define the {@link DoFn} as an anonymous inner class inside of
+ * <li>Define the {@link OldDoFn} as an anonymous inner class inside of
  * a static method.
  *
  * </ul>
  *
  * <p>Both of these approaches ensure that there is no implicit enclosing
- * instance serialized along with the {@link DoFn} instance.
+ * instance serialized along with the {@link OldDoFn} instance.
  *
  * <p>Prior to Java 8, any local variables of the enclosing
  * method referenced from within an anonymous inner class need to be
- * marked as {@code final}. If defining the {@link DoFn} as a named
+ * marked as {@code final}. If defining the {@link OldDoFn} as a named
  * static class, such variables would be passed as explicit
  * constructor arguments and stored in explicit instance variables.
  *
  * <p>There are three main ways to initialize the state of a
- * {@link DoFn} instance processing a bundle:
+ * {@link OldDoFn} instance processing a bundle:
  *
  * <ul>
  *
  * <li>Define instance variable state (including implicit instance
  * variables holding final variables captured by an anonymous inner
- * class), initialized by the {@link DoFn}'s constructor (which is
+ * class), initialized by the {@link OldDoFn}'s constructor (which is
  * implicit for an anonymous inner class). This state will be
- * automatically serialized and then deserialized in the {@code DoFn}
+ * automatically serialized and then deserialized in the {@code OldDoFn}
  * instances created for bundles. This method is good for state
- * known when the original {@code DoFn} is created in the main
+ * known when the original {@code OldDoFn} is created in the main
  * program, if it's not overly large. This is not suitable for any
- * state which must only be used for a single bundle, as {@link DoFn DoFn's}
+ * state which must only be used for a single bundle, as {@link OldDoFn OldDoFns}
  * may be used to process multiple bundles.
  *
  * <li>Compute the state as a singleton {@link PCollection} and pass it
- * in as a side input to the {@link DoFn}. This is good if the state
+ * in as a side input to the {@link OldDoFn}. This is good if the state
  * needs to be computed by the pipeline, or if the state is very large
  * and so is best read from file(s) rather than sent as part of the
- * {@code DoFn}'s serialized state.
+ * {@code OldDoFn}'s serialized state.
  *
- * <li>Initialize the state in each {@link DoFn} instance, in
- * {@link DoFn#startBundle}. This is good if the initialization
+ * <li>Initialize the state in each {@link OldDoFn} instance, in
+ * {@link OldDoFn#startBundle}. This is good if the initialization
  * doesn't depend on any information known only by the main program or
  * computed by earlier pipeline operations, but is the same for all
- * instances of this {@link DoFn} for all program executions, say
+ * instances of this {@link OldDoFn} for all program executions, say
  * setting up empty caches or initializing constant data.
  *
  * </ul>
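
A minimal sketch combining the first and third approaches above, under the same assumption about the OldDoFn surface (startBundle, processElement); the cache and normalization logic are illustrative:

import org.apache.beam.sdk.transforms.OldDoFn;

import java.util.HashMap;
import java.util.Map;

public class NormalizeFn extends OldDoFn<String, String> {
  // Constructor state: small, known in the main program, serialized with the fn.
  private final String localeTag;

  // Per-bundle state: rebuilt in startBundle, never serialized.
  private transient Map<String, String> cache;

  public NormalizeFn(String localeTag) {
    this.localeTag = localeTag;
  }

  @Override
  public void startBundle(Context c) {
    cache = new HashMap<>();
  }

  @Override
  public void processElement(ProcessContext c) {
    String element = c.element();
    String normalized = cache.get(element);
    if (normalized == null) {
      normalized = element.trim().toLowerCase();  // localeTag could refine this
      cache.put(element, normalized);
    }
    c.output(normalized);
  }
}

The localeTag travels with the serialized fn; the cache is rebuilt for every bundle, so reusing the same instance across bundles is safe.
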
@@ -362,13 +363,13 @@ import java.util.List;
  * no support in the Google Cloud Dataflow system for communicating
  * and synchronizing updates to shared state across worker machines,
  * so programs should not access any mutable static variable state in
- * their {@link DoFn}, without understanding that the Java processes
+ * their {@link OldDoFn}, without understanding that the Java processes
  * for the main program and workers will each have its own independent
  * copy of such state, and there won't be any automatic copying of
  * that state across Java processes. All information should be
- * communicated to {@link DoFn} instances via main and side inputs and
+ * communicated to {@link OldDoFn} instances via main and side inputs and
  * serialized state, and all output should be communicated from a
- * {@link DoFn} instance via main and side outputs, in the absence of
+ * {@link OldDoFn} instance via main and side outputs, in the absence of
  * external communication mechanisms written by user code.
  *
  * <h2>Fault Tolerance</h2>
@@ -378,23 +379,23 @@ import java.util.List;
  * While individual failures are rare, the larger the job, the greater
  * the chance that something, somewhere, will fail. The Google Cloud
  * Dataflow service strives to mask such failures automatically,
- * principally by retrying failed {@link DoFn} bundle. This means
- * that a {@code DoFn} instance might process a bundle partially, then
+ * principally by retrying failed {@link OldDoFn} bundles. This means
+ * that an {@code OldDoFn} instance might process a bundle partially, then
  * crash for some reason, then be rerun (often on a different worker
  * machine) on that same bundle and on the same elements as before.
- * Sometimes two or more {@link DoFn} instances will be running on the
+ * Sometimes two or more {@link OldDoFn} instances will be running on the
  * same bundle simultaneously, with the system taking the results of
  * the first instance to complete successfully. Consequently, the
- * code in a {@link DoFn} needs to be written such that these
+ * code in an {@link OldDoFn} needs to be written such that these
  * duplicate (sequential or concurrent) executions do not cause
- * problems. If the outputs of a {@link DoFn} are a pure function of
+ * problems. If the outputs of an {@link OldDoFn} are a pure function of
  * its inputs, then this requirement is satisfied. However, if a
- * {@link DoFn DoFn's} execution has external side-effects, such as performing
- * updates to external HTTP services, then the {@link DoFn DoFn's} code
+ * {@link OldDoFn OldDoFn's} execution has external side-effects, such as performing
+ * updates to external HTTP services, then the {@link OldDoFn OldDoFn's} code
  * needs to take care to ensure that those updates are idempotent and
  * that concurrent updates are acceptable. This property can be
  * difficult to achieve, so it is advisable to strive to keep
- * {@link DoFn DoFns} as pure functions as much as possible.
+ * {@link OldDoFn OldDoFns} as pure functions as much as possible.
  *
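
A hedged sketch of the idempotence point above; KeyValueStore is a hypothetical stand-in for an external service, not a Beam API and not part of this commit:

import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.values.KV;

public class IdempotentWriteFn extends OldDoFn<KV<String, String>, Void> {

  /** Hypothetical external store; serializable so it can travel with the fn. */
  public interface KeyValueStore extends java.io.Serializable {
    void put(String key, String value);
  }

  private final KeyValueStore store;

  public IdempotentWriteFn(KeyValueStore store) {
    this.store = store;
  }

  @Override
  public void processElement(ProcessContext c) {
    // A PUT keyed by the element: a retried or concurrently duplicated bundle
    // rewrites the same record instead of appending a second copy.
    store.put(c.element().getKey(), c.element().getValue());
  }
}
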
  * <h2>Optimization</h2>
  *
@@ -439,15 +440,15 @@ public class ParDo {
    *
    * <p>Side inputs are {@link PCollectionView PCollectionViews}, whose contents are
    * computed during pipeline execution and then made accessible to
-   * {@link DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}. Each
-   * invocation of the {@link DoFn} receives the same values for these
+   * {@link OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}. Each
+   * invocation of the {@link OldDoFn} receives the same values for these
    * side inputs.
    *
    * <p>See the discussion of Side Inputs above for more explanation.
    *
    * <p>The resulting {@link PTransform} is incomplete, and its
    * input/output types are not yet bound. Use
-   * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
+   * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to
    * invoke, which will also bind the input/output types of this
    * {@link PTransform}.
    */
@@ -460,13 +461,13 @@ public class ParDo {
     *
    * <p>Side inputs are {@link PCollectionView}s, whose contents are
    * computed during pipeline execution and then made accessible to
-   * {@code DoFn} code via {@link DoFn.ProcessContext#sideInput sideInput}.
+   * {@code OldDoFn} code via {@link OldDoFn.ProcessContext#sideInput sideInput}.
    *
    * <p>See the discussion of Side Inputs above for more explanation.
    *
    * <p>The resulting {@link PTransform} is incomplete, and its
    * input/output types are not yet bound. Use
-   * {@link ParDo.Unbound#of} to specify the {@link DoFn} to
+   * {@link ParDo.Unbound#of} to specify the {@link OldDoFn} to
    * invoke, which will also bind the input/output types of this
    * {@link PTransform}.
    */
@@ -482,11 +483,11 @@ public class ParDo {
    *
    * <p>{@link TupleTag TupleTags} are used to name (with its static element
    * type {@code T}) each main and side output {@code PCollection<T>}.
-   * This {@link PTransform PTransform's} {@link DoFn} emits elements to the main
+   * This {@link PTransform PTransform's} {@link OldDoFn} emits elements to the main
    * output {@link PCollection} as normal, using
-   * {@link DoFn.Context#output}. The {@link DoFn} emits elements to
+   * {@link OldDoFn.Context#output}. The {@link OldDoFn} emits elements to
    * a side output {@code PCollection} using
-   * {@link DoFn.Context#sideOutput}, passing that side output's tag
+   * {@link OldDoFn.Context#sideOutput}, passing that side output's tag
    * as an argument. The result of invoking this {@link PTransform}
   * will be a {@link PCollectionTuple}, and any of the main and
    * side output {@code PCollection}s can be retrieved from it via
@@ -497,7 +498,7 @@ public class ParDo {
    *
    * <p>The resulting {@link PTransform} is incomplete, and its input
    * type is not yet bound. Use {@link ParDo.UnboundMulti#of}
-   * to specify the {@link DoFn} to invoke, which will also bind the
+   * to specify the {@link OldDoFn} to invoke, which will also bind the
    * input type of this {@link PTransform}.
    */
   public static <OutputT> UnboundMulti<OutputT> withOutputTags(
@@ -508,24 +509,24 @@ public class ParDo {
 
   /**
    * Creates a {@link ParDo} {@link PTransform} that will invoke the
-   * given {@link DoFn} function.
+   * given {@link OldDoFn} function.
    *
    * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
    * input being a {@code PCollection<InputT>} and the output a
    * {@code PCollection<OutputT>}, inferred from the types of the argument
-   * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further
+   * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
    * properties can be set on it first.
    */
-  public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+  public static <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
     return of(fn, fn.getClass());
   }
 
   private static <InputT, OutputT> Bound<InputT, OutputT> of(
-          DoFn<InputT, OutputT> fn, Class<?> fnClass) {
+          OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
     return new Unbound().of(fn, fnClass);
   }
 
-  private static <InputT, OutputT> DoFn<InputT, OutputT>
+  private static <InputT, OutputT> OldDoFn<InputT, OutputT>
       adapt(DoFnWithContext<InputT, OutputT> fn) {
     return DoFnReflector.of(fn.getClass()).toDoFn(fn);
   }
@@ -537,11 +538,11 @@ public class ParDo {
    * <p>The resulting {@link PTransform PTransform's} types have been bound, with the
    * input being a {@code PCollection<InputT>} and the output a
    * {@code PCollection<OutputT>}, inferred from the types of the argument
-   * {@code DoFn<InputT, OutputT>}. It is ready to be applied, or further
+   * {@code OldDoFn<InputT, OutputT>}. It is ready to be applied, or further
    * properties can be set on it first.
    *
    * <p>{@link DoFnWithContext} is an experimental alternative to
-   * {@link DoFn} which simplifies accessing the window of the element.
+   * {@link OldDoFn} which simplifies accessing the window of the element.
    */
   @Experimental
   public static <InputT, OutputT> Bound<InputT, OutputT> of(DoFnWithContext<InputT, OutputT> fn) {
@@ -552,7 +553,7 @@ public class ParDo {
    * An incomplete {@link ParDo} transform, with unbound input/output types.
    *
    * <p>Before being applied, {@link ParDo.Unbound#of} must be
-   * invoked to specify the {@link DoFn} to invoke, which will also
+   * invoked to specify the {@link OldDoFn} to invoke, which will also
    * bind the input/output types of this {@link PTransform}.
    */
   public static class Unbound {
@@ -614,18 +615,18 @@ public class ParDo {
 
     /**
      * Returns a new {@link ParDo} {@link PTransform} that's like this
-     * transform but that will invoke the given {@link DoFn}
+     * transform but that will invoke the given {@link OldDoFn}
      * function, and that has its input and output types bound. Does
      * not modify this transform. The resulting {@link PTransform} is
      * sufficiently specified to be applied, but more properties can
      * still be specified.
      */
-    public <InputT, OutputT> Bound<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+    public <InputT, OutputT> Bound<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
       return of(fn, fn.getClass());
     }
 
     private <InputT, OutputT> Bound<InputT, OutputT> of(
-        DoFn<InputT, OutputT> fn, Class<?> fnClass) {
+        OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
       return new Bound<>(name, sideInputs, fn, fnClass);
     }
 
@@ -645,7 +646,7 @@ public class ParDo {
 
   /**
    * A {@link PTransform} that, when applied to a {@code PCollection<InputT>},
-   * invokes a user-specified {@code DoFn<InputT, OutputT>} on all its elements,
+   * invokes a user-specified {@code OldDoFn<InputT, OutputT>} on all its elements,
    * with all its outputs collected into an output
    * {@code PCollection<OutputT>}.
    *
@@ -659,12 +660,12 @@ public class ParDo {
       extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>> {
     // Inherits name.
     private final List<PCollectionView<?>> sideInputs;
-    private final DoFn<InputT, OutputT> fn;
+    private final OldDoFn<InputT, OutputT> fn;
     private final Class<?> fnClass;
 
     Bound(String name,
           List<PCollectionView<?>> sideInputs,
-          DoFn<InputT, OutputT> fn,
+          OldDoFn<InputT, OutputT> fn,
           Class<?> fnClass) {
       super(name);
       this.sideInputs = sideInputs;
@@ -746,9 +747,9 @@ public class ParDo {
     /**
      * {@inheritDoc}
      *
-     * <p>{@link ParDo} registers its internal {@link DoFn} as a subcomponent for display data.
-     * {@link DoFn} implementations can register display data by overriding
-     * {@link DoFn#populateDisplayData}.
+     * <p>{@link ParDo} registers its internal {@link OldDoFn} as a subcomponent for display data.
+     * {@link OldDoFn} implementations can register display data by overriding
+     * {@link OldDoFn#populateDisplayData}.
      */
     @Override
     public void populateDisplayData(Builder builder) {
@@ -756,7 +757,7 @@ public class ParDo {
       ParDo.populateDisplayData(builder, fn, fnClass);
     }
 
-    public DoFn<InputT, OutputT> getFn() {
+    public OldDoFn<InputT, OutputT> getFn() {
       return fn;
     }
 
@@ -770,7 +771,7 @@ public class ParDo {
    * input type.
    *
    * <p>Before being applied, {@link ParDo.UnboundMulti#of} must be
-   * invoked to specify the {@link DoFn} to invoke, which will also
+   * invoked to specify the {@link OldDoFn} to invoke, which will also
    * bind the input type of this {@link PTransform}.
    *
    * @param <OutputT> the type of the main output {@code PCollection} elements
@@ -827,16 +828,16 @@ public class ParDo {
     /**
      * Returns a new multi-output {@link ParDo} {@link PTransform}
      * that's like this transform but that will invoke the given
-     * {@link DoFn} function, and that has its input type bound.
+     * {@link OldDoFn} function, and that has its input type bound.
      * Does not modify this transform. The resulting
      * {@link PTransform} is sufficiently specified to be applied, but
      * more properties can still be specified.
      */
-    public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn) {
+    public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn) {
       return of(fn, fn.getClass());
     }
 
-    public <InputT> BoundMulti<InputT, OutputT> of(DoFn<InputT, OutputT> fn, Class<?> fnClass) {
+    public <InputT> BoundMulti<InputT, OutputT> of(OldDoFn<InputT, OutputT> fn, Class<?> fnClass) {
       return new BoundMulti<>(
               name, sideInputs, mainOutputTag, sideOutputTags, fn, fnClass);
     }
@@ -857,7 +858,7 @@ public class ParDo {
   /**
    * A {@link PTransform} that, when applied to a
    * {@code PCollection<InputT>}, invokes a user-specified
-   * {@code DoFn<InputT, OutputT>} on all its elements, which can emit elements
+   * {@code OldDoFn<InputT, OutputT>} on all its elements, which can emit elements
    * to any of the {@link PTransform}'s main and side output
    * {@code PCollection}s, which are bundled into a result
    * {@code PCollectionTuple}.
@@ -871,14 +872,14 @@ public class ParDo {
     private final List<PCollectionView<?>> sideInputs;
     private final TupleTag<OutputT> mainOutputTag;
     private final TupleTagList sideOutputTags;
-    private final DoFn<InputT, OutputT> fn;
+    private final OldDoFn<InputT, OutputT> fn;
     private final Class<?> fnClass;
 
     BoundMulti(String name,
                List<PCollectionView<?>> sideInputs,
                TupleTag<OutputT> mainOutputTag,
                TupleTagList sideOutputTags,
-               DoFn<InputT, OutputT> fn,
+               OldDoFn<InputT, OutputT> fn,
                Class<?> fnClass) {
       super(name);
       this.sideInputs = sideInputs;
@@ -929,7 +930,7 @@ public class ParDo {
           input.isBounded());
 
       // The fn will likely be an instance of an anonymous subclass
-      // such as DoFn<Integer, String> { }, thus will have a high-fidelity
+      // such as OldDoFn<Integer, String> { }, thus will have a high-fidelity
       // TypeDescriptor for the output type.
       outputs.get(mainOutputTag).setTypeDescriptorInternal(fn.getOutputTypeDescriptor());
 
@@ -970,7 +971,7 @@ public class ParDo {
       ParDo.populateDisplayData(builder, fn, fnClass);
     }
 
-    public DoFn<InputT, OutputT> getFn() {
+    public OldDoFn<InputT, OutputT> getFn() {
       return fn;
     }
 
@@ -988,7 +989,7 @@ public class ParDo {
   }
 
   private static void populateDisplayData(
-      DisplayData.Builder builder, DoFn<?, ?> fn, Class<?> fnClass) {
+      DisplayData.Builder builder, OldDoFn<?, ?> fn, Class<?> fnClass) {
     builder
         .include(fn)
         .add(DisplayData.item("fn", fnClass)

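Pulling the ParDo pieces documented above together (side inputs via withSideInputs/sideInput, multiple outputs via withOutputTags/sideOutput), a hedged usage sketch against the renamed OldDoFn; the collection, view, and tag names are illustrative:

import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;

import java.util.List;

public class ParDoSketch {
  public static PCollectionTuple splitWords(
      PCollection<String> words, PCollection<String> stopWords) {

    // Side input: the stop-word list, computed by the pipeline itself.
    final PCollectionView<List<String>> stopWordsView =
        stopWords.apply(View.<String>asList());

    // Tags naming the main and side outputs.
    final TupleTag<String> longTag = new TupleTag<String>() {};
    final TupleTag<String> shortTag = new TupleTag<String>() {};

    return words.apply(
        ParDo.withSideInputs(stopWordsView)
            .withOutputTags(longTag, TupleTagList.of(shortTag))
            .of(new OldDoFn<String, String>() {
              @Override
              public void processElement(ProcessContext c) {
                String word = c.element();
                if (c.sideInput(stopWordsView).contains(word)) {
                  return;                        // drop stop words
                }
                if (word.length() > 3) {
                  c.output(word);                // main output
                } else {
                  c.sideOutput(shortTag, word);  // side output, by tag
                }
              }
            }));
  }
}

Applying the transform yields a PCollectionTuple from which the main and side outputs are retrieved by their tags.
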
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
index 6281b30..2ddcc29 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Partition.java
@@ -134,7 +134,7 @@ public class Partition<T> extends PTransform<PCollection<T>, PCollectionList<T>>
     this.partitionDoFn = partitionDoFn;
   }
 
-  private static class PartitionDoFn<X> extends DoFn<X, Void> {
+  private static class PartitionDoFn<X> extends OldDoFn<X, Void> {
     private final int numPartitions;
     private final PartitionFn<? super X> partitionFn;
     private final TupleTagList outputTags;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index b82744d..d82c457 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,7 +85,7 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
   @Override
   public PCollection<T> apply(PCollection<T> in) {
     return in
-        .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
+        .apply("CreateIndex", ParDo.of(new OldDoFn<T, KV<T, Void>>() {
           @Override
           public void processElement(ProcessContext c) {
             c.output(KV.of(c.element(), (Void) null));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
index 4fcd17e..724b252 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Sample.java
@@ -164,9 +164,9 @@ public class Sample {
   }
 
   /**
-   * A {@link DoFn} that returns up to limit elements from the side input PCollection.
+   * An {@link OldDoFn} that returns up to {@code limit} elements from the side input PCollection.
    */
-  private static class SampleAnyDoFn<T> extends DoFn<Void, T> {
+  private static class SampleAnyDoFn<T> extends OldDoFn<Void, T> {
     long limit;
     final PCollectionView<Iterable<T>> iterableView;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
index a879925..6623c6a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SimpleFunction.java
@@ -29,7 +29,7 @@ public abstract class SimpleFunction<InputT, OutputT>
 
   /**
    * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the input type of this {@code DoFn} instance's most-derived
+   * about the input type of this {@code OldDoFn} instance's most-derived
    * class.
    *
    * <p>See {@link #getOutputTypeDescriptor} for more discussion.
@@ -40,10 +40,10 @@ public abstract class SimpleFunction<InputT, OutputT>
 
   /**
    * Returns a {@link TypeDescriptor} capturing what is known statically
-   * about the output type of this {@code DoFn} instance's
+   * about the output type of this {@code OldDoFn} instance's
    * most-derived class.
    *
-   * <p>In the normal case of a concrete {@code DoFn} subclass with
+   * <p>In the normal case of a concrete {@code OldDoFn} subclass with
    * no generic type parameters of its own (including anonymous inner
    * classes), this will be a complete non-generic type, which is good
    * for choosing a default output {@code Coder<OutputT>} for the output

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 5212261..856e32a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,7 +58,7 @@ public class Values<V> extends PTransform<PCollection<? extends KV<?, V>>,
   @Override
   public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
     return
-        in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
+        in.apply("Values", ParDo.of(new OldDoFn<KV<?, V>, V>() {
           @Override
           public void processElement(ProcessContext c) {
             c.output(c.element().getValue());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 7a97c13..8a61637 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -38,7 +38,7 @@ import java.util.Map;
  *
 * <p>When a {@link ParDo} transform is processing a main input
  * element in a window {@code w} and a {@link PCollectionView} is read via
- * {@link DoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
+ * {@link OldDoFn.ProcessContext#sideInput}, the value of the view for {@code w} is
  * returned.
  *
  * <p>The SDK supports viewing a {@link PCollection}, per window, as a single value,
@@ -118,7 +118,7 @@ import java.util.Map;
  *
  * PCollection PageVisits = urlVisits
  *     .apply(ParDo.withSideInputs(urlToPage)
- *         .of(new DoFn<UrlVisit, PageVisit>() {
+ *         .of(new OldDoFn<UrlVisit, PageVisit>() {
  *             {@literal @}Override
  *             void processElement(ProcessContext context) {
  *               UrlVisit urlVisit = context.element();
@@ -154,11 +154,11 @@ public class View {
    *
    * <p>If the input {@link PCollection} is empty,
    * throws {@link java.util.NoSuchElementException} in the consuming
-   * {@link DoFn}.
+   * {@link OldDoFn}.
    *
    * <p>If the input {@link PCollection} contains more than one
    * element, throws {@link IllegalArgumentException} in the
-   * consuming {@link DoFn}.
+   * consuming {@link OldDoFn}.
    */
   public static <T> AsSingleton<T> asSingleton() {
     return new AsSingleton<>();

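A hedged sketch of the View.asSingleton contract described above; the names are illustrative, and the maxLength input must contain exactly one element per window or the consuming OldDoFn fails as documented:

import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;

public class SingletonViewSketch {
  public static PCollection<String> keepShortWords(
      PCollection<String> words, PCollection<Integer> maxLength) {

    // One value per window; empty or multi-element inputs surface as the
    // exceptions listed in the javadoc above.
    final PCollectionView<Integer> maxLengthView =
        maxLength.apply(View.<Integer>asSingleton());

    return words.apply(
        ParDo.withSideInputs(maxLengthView)
            .of(new OldDoFn<String, String>() {
              @Override
              public void processElement(ProcessContext c) {
                if (c.element().length() <= c.sideInput(maxLengthView)) {
                  c.output(c.element());
                }
              }
            }));
  }
}
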
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 25116d8..37d45aa 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,7 +113,7 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   @Override
   public PCollection<KV<K, V>> apply(PCollection<V> in) {
     PCollection<KV<K, V>> result =
-        in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
+        in.apply("AddKeys", ParDo.of(new OldDoFn<V, KV<K, V>>() {
           @Override
           public void processElement(ProcessContext c) {
             c.output(KV.of(fn.apply(c.element()),

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
index ef4b269..41b549b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithTimestamps.java
@@ -92,7 +92,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
    * Returns the allowed timestamp skew duration, which is the maximum
    * duration that timestamps can be shifted backwards from the timestamp of the input element.
    *
-   * @see DoFn#getAllowedTimestampSkew()
+   * @see OldDoFn#getAllowedTimestampSkew()
    */
   public Duration getAllowedTimestampSkew() {
     return allowedTimestampSkew;
@@ -105,7 +105,7 @@ public class WithTimestamps<T> extends PTransform<PCollection<T>, PCollection<T>
         .setTypeDescriptorInternal(input.getTypeDescriptor());
   }
 
-  private static class AddTimestampsDoFn<T> extends DoFn<T, T> {
+  private static class AddTimestampsDoFn<T> extends OldDoFn<T, T> {
     private final SerializableFunction<T, Instant> fn;
     private final Duration allowedTimestampSkew;
 

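A hedged usage sketch for the skew setting documented above; the event encoding (event-time millis carried in the KV value) is an assumption for illustration:

import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.WithTimestamps;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

import org.joda.time.Duration;
import org.joda.time.Instant;

public class WithTimestampsSketch {
  public static PCollection<KV<String, Long>> stamp(PCollection<KV<String, Long>> events) {
    return events.apply(
        WithTimestamps.of(new SerializableFunction<KV<String, Long>, Instant>() {
          @Override
          public Instant apply(KV<String, Long> kv) {
            return new Instant(kv.getValue());  // event-time millis in the value
          }
        })
        // Permit timestamps up to one minute older than the input element's.
        .withAllowedTimestampSkew(Duration.standardMinutes(1)));
  }
}
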
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
index ee7323b..5dcaec8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/display/DisplayData.java
@@ -30,7 +30,6 @@ import com.google.common.collect.Sets;
 import com.fasterxml.jackson.annotation.JsonGetter;
 import com.fasterxml.jackson.annotation.JsonInclude;
 import com.fasterxml.jackson.annotation.JsonValue;
-
 import org.apache.avro.reflect.Nullable;
 import org.joda.time.Duration;
 import org.joda.time.Instant;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
index 5e4cb52..aa26cbb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGbkResult.java
@@ -37,7 +37,6 @@ import com.google.common.collect.PeekingIterator;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
 import com.fasterxml.jackson.annotation.JsonProperty;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
index ba4a4a7..1bd9f4a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/join/CoGroupByKey.java
@@ -19,9 +19,9 @@ package org.apache.beam.sdk.transforms.join;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.KvCoder;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.join.CoGbkResult.CoGbkResultCoder;
@@ -57,7 +57,7 @@ import java.util.List;
  *
  * PCollection<T> finalResultCollection =
  *   coGbkResultCollection.apply(ParDo.of(
- *     new DoFn<KV<K, CoGbkResult>, T>() {
+ *     new OldDoFn<KV<K, CoGbkResult>, T>() {
  *       @Override
  *       public void processElement(ProcessContext c) {
  *         KV<K, CoGbkResult> e = c.element();
@@ -167,12 +167,12 @@ public class CoGroupByKey<K> extends
   }
 
   /**
-   * A DoFn to construct a UnionTable (i.e., a
+   * An OldDoFn to construct a UnionTable (i.e., a
     * {@code PCollection<KV<K, RawUnionValue>>}) from a
    * {@code PCollection<KV<K, V>>}.
    */
   private static class ConstructUnionTableFn<K, V> extends
-      DoFn<KV<K, V>, KV<K, RawUnionValue>> {
+      OldDoFn<KV<K, V>, KV<K, RawUnionValue>> {
 
     private final int index;
 
@@ -188,12 +188,12 @@ public class CoGroupByKey<K> extends
   }
 
   /**
-   * A DoFn to construct a CoGbkResult from an input grouped union
+   * An OldDoFn to construct a CoGbkResult from an input grouped union
    * table.
     */
   private static class ConstructCoGbkResultFn<K>
-    extends DoFn<KV<K, Iterable<RawUnionValue>>,
-                 KV<K, CoGbkResult>> {
+    extends OldDoFn<KV<K, Iterable<RawUnionValue>>,
+                     KV<K, CoGbkResult>> {
 
     private final CoGbkResultSchema schema;
 

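Expanding the CoGbkResult-consuming fragment quoted in the javadoc above into a hedged, self-contained sketch; the tag and collection names are illustrative:

import org.apache.beam.sdk.transforms.OldDoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.join.CoGbkResult;
import org.apache.beam.sdk.transforms.join.CoGroupByKey;
import org.apache.beam.sdk.transforms.join.KeyedPCollectionTuple;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TupleTag;

public class CoGbkSketch {
  public static PCollection<String> joinEmailsAndPhones(
      PCollection<KV<String, String>> emails,
      PCollection<KV<String, String>> phones) {

    final TupleTag<String> emailTag = new TupleTag<String>() {};
    final TupleTag<String> phoneTag = new TupleTag<String>() {};

    PCollection<KV<String, CoGbkResult>> grouped =
        KeyedPCollectionTuple.of(emailTag, emails)
            .and(phoneTag, phones)
            .apply(CoGroupByKey.<String>create());

    return grouped.apply(ParDo.of(
        new OldDoFn<KV<String, CoGbkResult>, String>() {
          @Override
          public void processElement(ProcessContext c) {
            KV<String, CoGbkResult> e = c.element();
            // Per-input iterables are keyed by the tags used to build the tuple.
            for (String email : e.getValue().getAll(emailTag)) {
              for (String phone : e.getValue().getAll(phoneTag)) {
                c.output(e.getKey() + "," + email + "," + phone);
              }
            }
          }
        }));
  }
}
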
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
index bd57339..dc1e74b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java
@@ -23,6 +23,7 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.util.ExecutableTrigger;
 
 import com.google.common.base.Joiner;
+
 import org.joda.time.Instant;
 
 import java.util.Arrays;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
index 563455b..324ab08 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java
@@ -25,6 +25,7 @@ import org.joda.time.Instant;
 
 import java.util.List;
 import java.util.Objects;
+
 import javax.annotation.Nullable;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
index 6f9c717..45898e0 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/IntervalWindow.java
@@ -24,7 +24,6 @@ import org.apache.beam.sdk.coders.DurationCoder;
 import org.apache.beam.sdk.coders.InstantCoder;
 
 import com.fasterxml.jackson.annotation.JsonCreator;
-
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.joda.time.ReadableDuration;

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a64baf48/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
index 40f3496..7267d00 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java
@@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.windowing;
 
 import org.apache.beam.sdk.transforms.GroupByKey;
 import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger;
+
 import org.joda.time.Instant;
 
 import java.util.List;


