beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [15/28] beam git commit: Revert "[BEAM-2610] This closes #3553"
Date Thu, 20 Jul 2017 17:09:42 GMT
http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
deleted file mode 100644
index d057d81..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/SerializableFunctions.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.transforms;
-
-/** Useful {@link SerializableFunction} overrides. */
-public class SerializableFunctions {
-  private static class Identity<T> implements SerializableFunction<T, T> {
-    @Override
-    public T apply(T input) {
-      return input;
-    }
-  }
-
-  private static class Constant<InT, OutT> implements SerializableFunction<InT, OutT> {
-    OutT value;
-
-    Constant(OutT value) {
-      this.value = value;
-    }
-
-    @Override
-    public OutT apply(InT input) {
-      return value;
-    }
-  }
-
-  public static <T> SerializableFunction<T, T> identity() {
-    return new Identity<>();
-  }
-
-  public static <InT, OutT> SerializableFunction<InT, OutT> constant(OutT value) {
-    return new Constant<>(value);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
index 331b143..073c750 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/View.java
@@ -257,10 +257,8 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<List<T>> view =
-          PCollectionViews.listView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<T, List<T>>of(view));
-      return view;
+      return input.apply(CreatePCollectionView.<T, List<T>>of(PCollectionViews.listView(
+          input, input.getWindowingStrategy(), input.getCoder())));
     }
   }
 
@@ -284,10 +282,8 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Iterable<T>> view =
-          PCollectionViews.iterableView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<T, Iterable<T>>of(view));
-      return view;
+      return input.apply(CreatePCollectionView.<T, Iterable<T>>of(PCollectionViews.iterableView(
+          input, input.getWindowingStrategy(), input.getCoder())));
     }
   }
 
@@ -427,10 +423,11 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Map<K, Iterable<V>>> view =
-          PCollectionViews.multimapView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(view));
-      return view;
+      return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, Iterable<V>>>of(
+          PCollectionViews.multimapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder())));
     }
   }
 
@@ -462,10 +459,11 @@ public class View {
         throw new IllegalStateException("Unable to create a side-input view from input", e);
       }
 
-      PCollectionView<Map<K, V>> view =
-          PCollectionViews.mapView(input, input.getWindowingStrategy(), input.getCoder());
-      input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(view));
-      return view;
+      return input.apply(CreatePCollectionView.<KV<K, V>, Map<K, V>>of(
+          PCollectionViews.mapView(
+              input,
+              input.getWindowingStrategy(),
+              input.getCoder())));
     }
   }
 
@@ -482,7 +480,7 @@ public class View {
    */
   @Internal
   public static class CreatePCollectionView<ElemT, ViewT>
-      extends PTransform<PCollection<ElemT>, PCollection<ElemT>> {
+      extends PTransform<PCollection<ElemT>, PCollectionView<ViewT>> {
     private PCollectionView<ViewT> view;
 
     private CreatePCollectionView(PCollectionView<ViewT> view) {
@@ -508,10 +506,8 @@ public class View {
     }
 
     @Override
-    public PCollection<ElemT> expand(PCollection<ElemT> input) {
-      return PCollection.<ElemT>createPrimitiveOutputInternal(
-              input.getPipeline(), input.getWindowingStrategy(), input.isBounded())
-          .setCoder(input.getCoder());
+    public PCollectionView<ViewT> expand(PCollection<ElemT> input) {
+      return view;
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
index cf96c9b..5d5887a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyDoFnInvokerFactory.java
@@ -89,7 +89,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
   public static final String PROCESS_CONTEXT_PARAMETER_METHOD = "processContext";
   public static final String ON_TIMER_CONTEXT_PARAMETER_METHOD = "onTimerContext";
   public static final String WINDOW_PARAMETER_METHOD = "window";
-  public static final String PIPELINE_OPTIONS_PARAMETER_METHOD = "pipelineOptions";
   public static final String RESTRICTION_TRACKER_PARAMETER_METHOD = "restrictionTracker";
   public static final String STATE_PARAMETER_METHOD = "state";
   public static final String TIMER_PARAMETER_METHOD = "timer";
@@ -627,11 +626,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
                     getExtraContextFactoryMethodDescription(TIMER_PARAMETER_METHOD, String.class)),
                 TypeCasting.to(new TypeDescription.ForLoadedType(Timer.class)));
           }
-
-          @Override
-          public StackManipulation dispatch(DoFnSignature.Parameter.PipelineOptionsParameter p) {
-            return simpleExtraContextParameter(PIPELINE_OPTIONS_PARAMETER_METHOD);
-          }
         });
   }
 
@@ -640,17 +634,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
    * {@link ProcessElement} method.
    */
   private static final class ProcessElementDelegation extends DoFnMethodDelegation {
-    private static final MethodDescription PROCESS_CONTINUATION_STOP_METHOD;
-
-    static {
-      try {
-        PROCESS_CONTINUATION_STOP_METHOD =
-            new MethodDescription.ForLoadedMethod(DoFn.ProcessContinuation.class.getMethod("stop"));
-      } catch (NoSuchMethodException e) {
-        throw new RuntimeException("Failed to locate ProcessContinuation.stop()");
-      }
-    }
-
     private final DoFnSignature.ProcessElementMethod signature;
 
     /** Implementation of {@link MethodDelegation} for the {@link ProcessElement} method. */
@@ -684,16 +667,6 @@ public class ByteBuddyDoFnInvokerFactory implements DoFnInvokerFactory {
       }
       return new StackManipulation.Compound(pushParameters);
     }
-
-    @Override
-    protected StackManipulation afterDelegation(MethodDescription instrumentedMethod) {
-      if (TypeDescription.VOID.equals(targetMethod.getReturnType().asErasure())) {
-        return new StackManipulation.Compound(
-            MethodInvocation.invoke(PROCESS_CONTINUATION_STOP_METHOD), MethodReturn.REFERENCE);
-      } else {
-        return MethodReturn.of(targetMethod.getReturnType().asErasure());
-      }
-    }
   }
 
   private static class UserCodeMethodInvocation implements StackManipulation {

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
index 5e31f2e..e031337 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/ByteBuddyOnTimerInvokerFactory.java
@@ -17,7 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-
 import com.google.common.base.CharMatcher;
 import com.google.common.cache.CacheBuilder;
 import com.google.common.cache.CacheLoader;
@@ -62,14 +61,13 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
 
     @SuppressWarnings("unchecked")
     Class<? extends DoFn<?, ?>> fnClass = (Class<? extends DoFn<?, ?>>) fn.getClass();
-    try {
-        OnTimerMethodSpecifier onTimerMethodSpecifier =
-                OnTimerMethodSpecifier.forClassAndTimerId(fnClass, timerId);
-        Constructor<?> constructor = constructorCache.get(onTimerMethodSpecifier);
 
-        OnTimerInvoker<InputT, OutputT> invoker =
+    try {
+      Constructor<?> constructor = constructorCache.get(fnClass).get(timerId);
+      @SuppressWarnings("unchecked")
+      OnTimerInvoker<InputT, OutputT> invoker =
           (OnTimerInvoker<InputT, OutputT>) constructor.newInstance(fn);
-        return invoker;
+      return invoker;
     } catch (InstantiationException
         | IllegalAccessException
         | IllegalArgumentException
@@ -99,31 +97,50 @@ class ByteBuddyOnTimerInvokerFactory implements OnTimerInvokerFactory {
   private static final String FN_DELEGATE_FIELD_NAME = "delegate";
 
   /**
-   * A cache of constructors of generated {@link OnTimerInvoker} classes,
-   * keyed by {@link OnTimerMethodSpecifier}.
+   * A cache of constructors of generated {@link OnTimerInvoker} classes, keyed by {@link DoFn}
+   * class and then by {@link TimerId}.
    *
    * <p>Needed because generating an invoker class is expensive, and to avoid generating an
    * excessive number of classes consuming PermGen memory in Java's that still have PermGen.
    */
-  private final LoadingCache<OnTimerMethodSpecifier, Constructor<?>> constructorCache =
-          CacheBuilder.newBuilder().build(
-          new CacheLoader<OnTimerMethodSpecifier, Constructor<?>>() {
-              @Override
-              public Constructor<?> load(final OnTimerMethodSpecifier onTimerMethodSpecifier)
-                      throws Exception {
-                  DoFnSignature signature =
-                          DoFnSignatures.getSignature(onTimerMethodSpecifier.fnClass());
-                  Class<? extends OnTimerInvoker<?, ?>> invokerClass =
-                          generateOnTimerInvokerClass(signature, onTimerMethodSpecifier.timerId());
-                  try {
-                      return invokerClass.getConstructor(signature.fnClass());
-                  } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
-                      throw new RuntimeException(e);
-                  }
-
-              }
-          });
-    /**
+  private final LoadingCache<Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>
+      constructorCache =
+          CacheBuilder.newBuilder()
+              .build(
+                  new CacheLoader<
+                      Class<? extends DoFn<?, ?>>, LoadingCache<String, Constructor<?>>>() {
+                    @Override
+                    public LoadingCache<String, Constructor<?>> load(
+                        final Class<? extends DoFn<?, ?>> fnClass) throws Exception {
+                      return CacheBuilder.newBuilder().build(new OnTimerConstructorLoader(fnClass));
+                    }
+                  });
+
+  /**
+   * A cache loader fixed to a particular {@link DoFn} class that loads constructors for the
+   * invokers for its {@link OnTimer @OnTimer} methods.
+   */
+  private static class OnTimerConstructorLoader extends CacheLoader<String, Constructor<?>> {
+
+    private final DoFnSignature signature;
+
+    public OnTimerConstructorLoader(Class<? extends DoFn<?, ?>> clazz) {
+      this.signature = DoFnSignatures.getSignature(clazz);
+    }
+
+    @Override
+    public Constructor<?> load(String timerId) throws Exception {
+      Class<? extends OnTimerInvoker<?, ?>> invokerClass =
+          generateOnTimerInvokerClass(signature, timerId);
+      try {
+        return invokerClass.getConstructor(signature.fnClass());
+      } catch (IllegalArgumentException | NoSuchMethodException | SecurityException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  /**
    * Generates a {@link OnTimerInvoker} class for the given {@link DoFnSignature} and {@link
    * TimerId}.
    */

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
index 8b41fee..6fd4052 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnInvoker.java
@@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.reflect;
 
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -54,10 +53,8 @@ public interface DoFnInvoker<InputT, OutputT> {
    * Invoke the {@link DoFn.ProcessElement} method on the bound {@link DoFn}.
    *
    * @param extra Factory for producing extra parameter objects (such as window), if necessary.
-   * @return The {@link DoFn.ProcessContinuation} returned by the underlying method, or {@link
-   *     DoFn.ProcessContinuation#stop()} if it returns {@code void}.
    */
-  DoFn.ProcessContinuation invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
+  void invokeProcessElement(ArgumentProvider<InputT, OutputT> extra);
 
   /** Invoke the appropriate {@link DoFn.OnTimer} method on the bound {@link DoFn}. */
   void invokeOnTimer(String timerId, ArgumentProvider<InputT, OutputT> arguments);
@@ -103,12 +100,7 @@ public interface DoFnInvoker<InputT, OutputT> {
      */
     BoundedWindow window();
 
-    /** Provide {@link PipelineOptions}. */
-    PipelineOptions pipelineOptions();
-
-    /**
-     * Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}.
-     */
+    /** Provide a {@link DoFn.StartBundleContext} to use with the given {@link DoFn}. */
     DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn);
 
     /** Provide a {@link DoFn.FinishBundleContext} to use with the given {@link DoFn}. */
@@ -146,11 +138,6 @@ public interface DoFnInvoker<InputT, OutputT> {
     }
 
     @Override
-    public PipelineOptions pipelineOptions() {
-      return null;
-    }
-
-    @Override
     public DoFn<InputT, OutputT>.StartBundleContext startBundleContext(DoFn<InputT, OutputT> doFn) {
       return null;
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
index bfad69e..0b4bf90 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignature.java
@@ -27,13 +27,11 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
 import org.apache.beam.sdk.state.TimerSpec;
 import org.apache.beam.sdk.transforms.DoFn;
-import org.apache.beam.sdk.transforms.DoFn.ProcessContinuation;
 import org.apache.beam.sdk.transforms.DoFn.StateId;
 import org.apache.beam.sdk.transforms.DoFn.TimerId;
 import org.apache.beam.sdk.transforms.reflect.DoFnSignature.Parameter.RestrictionTrackerParameter;
@@ -195,8 +193,6 @@ public abstract class DoFnSignature {
         return cases.dispatch((StateParameter) this);
       } else if (this instanceof TimerParameter) {
         return cases.dispatch((TimerParameter) this);
-      } else if (this instanceof PipelineOptionsParameter) {
-        return cases.dispatch((PipelineOptionsParameter) this);
       } else {
         throw new IllegalStateException(
             String.format("Attempt to case match on unknown %s subclass %s",
@@ -216,7 +212,6 @@ public abstract class DoFnSignature {
       ResultT dispatch(RestrictionTrackerParameter p);
       ResultT dispatch(StateParameter p);
       ResultT dispatch(TimerParameter p);
-      ResultT dispatch(PipelineOptionsParameter p);
 
       /**
        * A base class for a visitor with a default method for cases it is not interested in.
@@ -264,11 +259,6 @@ public abstract class DoFnSignature {
         public ResultT dispatch(TimerParameter p) {
           return dispatchDefault(p);
         }
-
-        @Override
-        public ResultT dispatch(PipelineOptionsParameter p) {
-          return dispatchDefault(p);
-        }
       }
     }
 
@@ -297,11 +287,6 @@ public abstract class DoFnSignature {
       return new AutoValue_DoFnSignature_Parameter_WindowParameter(windowT);
     }
 
-    /** Returns a {@link PipelineOptionsParameter}. */
-    public static PipelineOptionsParameter pipelineOptions() {
-      return new AutoValue_DoFnSignature_Parameter_PipelineOptionsParameter();
-    }
-
     /**
      * Returns a {@link RestrictionTrackerParameter}.
      */
@@ -321,14 +306,6 @@ public abstract class DoFnSignature {
     }
 
     /**
-     * Descriptor for a {@link Parameter} of a subtype of {@link PipelineOptions}.
-     */
-    @AutoValue
-    public abstract static class PipelineOptionsParameter extends Parameter {
-      PipelineOptionsParameter() {}
-    }
-
-    /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.StartBundleContext}.
      *
      * <p>All such descriptors are equal.
@@ -337,7 +314,6 @@ public abstract class DoFnSignature {
     public abstract static class StartBundleContextParameter extends Parameter {
       StartBundleContextParameter() {}
     }
-
     /**
      * Descriptor for a {@link Parameter} of type {@link DoFn.FinishBundleContext}.
      *
@@ -434,21 +410,16 @@ public abstract class DoFnSignature {
     @Nullable
     public abstract TypeDescriptor<? extends BoundedWindow> windowT();
 
-    /** Whether this {@link DoFn} returns a {@link ProcessContinuation} or void. */
-    public abstract boolean hasReturnValue();
-
     static ProcessElementMethod create(
         Method targetMethod,
         List<Parameter> extraParameters,
         TypeDescriptor<?> trackerT,
-        @Nullable TypeDescriptor<? extends BoundedWindow> windowT,
-        boolean hasReturnValue) {
+        @Nullable TypeDescriptor<? extends BoundedWindow> windowT) {
       return new AutoValue_DoFnSignature_ProcessElementMethod(
           targetMethod,
           Collections.unmodifiableList(extraParameters),
           trackerT,
-          windowT,
-          hasReturnValue);
+          windowT);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
index de57c3b..bb191b1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/DoFnSignatures.java
@@ -17,8 +17,6 @@
  */
 package org.apache.beam.sdk.transforms.reflect;
 
-import static com.google.common.base.Preconditions.checkState;
-
 import com.google.auto.value.AutoValue;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Predicates;
@@ -44,7 +42,6 @@ import java.util.List;
 import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.state.State;
 import org.apache.beam.sdk.state.StateSpec;
 import org.apache.beam.sdk.state.Timer;
@@ -81,23 +78,19 @@ public class DoFnSignatures {
       ImmutableList.of(
           Parameter.ProcessContextParameter.class,
           Parameter.WindowParameter.class,
-          Parameter.PipelineOptionsParameter.class,
           Parameter.TimerParameter.class,
           Parameter.StateParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_SPLITTABLE_PROCESS_ELEMENT_PARAMETERS =
           ImmutableList.of(
-              Parameter.PipelineOptionsParameter.class,
-              Parameter.ProcessContextParameter.class,
-              Parameter.RestrictionTrackerParameter.class);
+              Parameter.ProcessContextParameter.class, Parameter.RestrictionTrackerParameter.class);
 
   private static final Collection<Class<? extends Parameter>>
       ALLOWED_ON_TIMER_PARAMETERS =
           ImmutableList.of(
               Parameter.OnTimerContextParameter.class,
               Parameter.WindowParameter.class,
-              Parameter.PipelineOptionsParameter.class,
               Parameter.TimerParameter.class,
               Parameter.StateParameter.class);
 
@@ -194,15 +187,6 @@ public class DoFnSignatures {
           extraParameters, Predicates.instanceOf(WindowParameter.class));
     }
 
-    /**
-     * Indicates whether a {@link Parameter.PipelineOptionsParameter} is
-     * known in this context.
-     */
-    public boolean hasPipelineOptionsParamter() {
-      return Iterables.any(
-          extraParameters, Predicates.instanceOf(Parameter.PipelineOptionsParameter.class));
-    }
-
     /** The window type, if any, used by this method. */
     @Nullable
     public TypeDescriptor<? extends BoundedWindow> getWindowType() {
@@ -442,8 +426,6 @@ public class DoFnSignatures {
    * <li>If the {@link DoFn} (or any of its supertypes) is annotated as {@link
    *     DoFn.BoundedPerElement} or {@link DoFn.UnboundedPerElement}, use that. Only one of
    *     these must be specified.
-   * <li>If {@link DoFn.ProcessElement} returns {@link DoFn.ProcessContinuation}, assume it is
-   *     unbounded. Otherwise (if it returns {@code void}), assume it is bounded.
    * <li>If {@link DoFn.ProcessElement} returns {@code void}, but the {@link DoFn} is annotated
    *     {@link DoFn.UnboundedPerElement}, this is an error.
    * </ol>
@@ -469,10 +451,7 @@ public class DoFnSignatures {
     }
     if (processElement.isSplittable()) {
       if (isBounded == null) {
-        isBounded =
-            processElement.hasReturnValue()
-                ? PCollection.IsBounded.UNBOUNDED
-                : PCollection.IsBounded.BOUNDED;
+        isBounded = PCollection.IsBounded.BOUNDED;
       }
     } else {
       errors.checkArgument(
@@ -481,7 +460,6 @@ public class DoFnSignatures {
               + ((isBounded == PCollection.IsBounded.BOUNDED)
                   ? DoFn.BoundedPerElement.class.getSimpleName()
                   : DoFn.UnboundedPerElement.class.getSimpleName()));
-      checkState(!processElement.hasReturnValue(), "Should have been inferred splittable");
       isBounded = PCollection.IsBounded.BOUNDED;
     }
     return isBounded;
@@ -718,10 +696,8 @@ public class DoFnSignatures {
       TypeDescriptor<?> outputT,
       FnAnalysisContext fnContext) {
     errors.checkArgument(
-        void.class.equals(m.getReturnType())
-            || DoFn.ProcessContinuation.class.equals(m.getReturnType()),
-        "Must return void or %s",
-        DoFn.ProcessContinuation.class.getSimpleName());
+        void.class.equals(m.getReturnType()),
+        "Must return void");
 
 
     MethodAnalysisContext methodContext = MethodAnalysisContext.create();
@@ -761,11 +737,7 @@ public class DoFnSignatures {
     }
 
     return DoFnSignature.ProcessElementMethod.create(
-        m,
-        methodContext.getExtraParameters(),
-        trackerT,
-        windowT,
-        DoFn.ProcessContinuation.class.equals(m.getReturnType()));
+        m, methodContext.getExtraParameters(), trackerT, windowT);
   }
 
   private static void checkParameterOneOf(
@@ -817,12 +789,6 @@ public class DoFnSignatures {
           "Multiple %s parameters",
           BoundedWindow.class.getSimpleName());
       return Parameter.boundedWindow((TypeDescriptor<? extends BoundedWindow>) paramT);
-    } else if (PipelineOptions.class.equals(rawType)) {
-      methodErrors.checkArgument(
-          !methodContext.hasPipelineOptionsParamter(),
-          "Multiple %s parameters",
-          PipelineOptions.class.getSimpleName());
-      return Parameter.pipelineOptions();
     } else if (RestrictionTracker.class.isAssignableFrom(rawType)) {
       methodErrors.checkArgument(
           !methodContext.hasRestrictionTrackerParameter(),

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
deleted file mode 100644
index edf7e3c..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/reflect/OnTimerMethodSpecifier.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms.reflect;
-
-import com.google.auto.value.AutoValue;
-import org.apache.beam.sdk.transforms.DoFn;
-
-/**
- * Used by {@link ByteBuddyOnTimerInvokerFactory} to Dynamically generate
- * {@link OnTimerInvoker} instances for invoking a particular
- * {@link DoFn.TimerId} on a particular {@link DoFn}.
- */
-
-@AutoValue
-abstract class OnTimerMethodSpecifier {
-    public abstract Class<? extends DoFn<?, ?>> fnClass();
-    public abstract String timerId();
-    public static OnTimerMethodSpecifier
-    forClassAndTimerId(Class<? extends DoFn<?, ?>> fnClass, String timerId){
-        return  new AutoValue_OnTimerMethodSpecifier(fnClass, timerId);
-    }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
new file mode 100644
index 0000000..104f5f2
--- /dev/null
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRange.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms.splittabledofn;
+
+import static com.google.common.base.Preconditions.checkArgument;
+
+import java.io.Serializable;
+
+/** A restriction represented by a range of integers [from, to). */
+public class OffsetRange
+    implements Serializable, HasDefaultTracker<OffsetRange, OffsetRangeTracker> {
+  private final long from;
+  private final long to;
+
+  public OffsetRange(long from, long to) {
+    checkArgument(from <= to, "Malformed range [%s, %s)", from, to);
+    this.from = from;
+    this.to = to;
+  }
+
+  public long getFrom() {
+    return from;
+  }
+
+  public long getTo() {
+    return to;
+  }
+
+  @Override
+  public OffsetRangeTracker newTracker() {
+    return new OffsetRangeTracker(this);
+  }
+
+  @Override
+  public String toString() {
+    return "[" + from + ", " + to + ')';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    OffsetRange that = (OffsetRange) o;
+
+    if (from != that.from) {
+      return false;
+    }
+    return to == that.to;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (int) (from ^ (from >>> 32));
+    result = 31 * result + (int) (to ^ (to >>> 32));
+    return result;
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
index 4987409..0271a0d 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/OffsetRangeTracker.java
@@ -21,8 +21,6 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
-import com.google.common.base.MoreObjects;
-import org.apache.beam.sdk.io.range.OffsetRange;
 import org.apache.beam.sdk.transforms.DoFn;
 
 /**
@@ -101,13 +99,4 @@ public class OffsetRangeTracker implements RestrictionTracker<OffsetRange> {
         lastAttemptedOffset + 1,
         range.getTo());
   }
-
-  @Override
-  public String toString() {
-    return MoreObjects.toStringHelper(this)
-        .add("range", range)
-        .add("lastClaimedOffset", lastClaimedOffset)
-        .add("lastAttemptedOffset", lastAttemptedOffset)
-        .toString();
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
index 8cb0a6b..27ef68f 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/splittabledofn/RestrictionTracker.java
@@ -31,13 +31,10 @@ public interface RestrictionTracker<RestrictionT> {
   RestrictionT currentRestriction();
 
   /**
-   * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible:
-   * after this method returns, the tracker MUST refuse all future claim calls, and {@link
-   * #checkDone} MUST succeed.
-   *
-   * <p>Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the
-   * work: the old value of {@link #currentRestriction} is equivalent to the new value and the
-   * return value of this method combined. Must be called at most once on a given object.
+   * Signals that the current {@link DoFn.ProcessElement} call should terminate as soon as possible.
+   * Modifies {@link #currentRestriction}. Returns a restriction representing the rest of the work:
+   * the old value of {@link #currentRestriction} is equivalent to the new value and the return
+   * value of this method combined. Must be called at most once on a given object.
    */
   RestrictionT checkpoint();
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
index c68c497..d48d26b 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java
@@ -79,11 +79,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
   }
 
   @Override
-  public boolean assignsToOneWindow() {
-    return true;
-  }
-
-  @Override
   public boolean equals(Object other) {
     return other instanceof GlobalWindows;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
index 341ba27..40ee68a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java
@@ -58,9 +58,4 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow>
   public Instant getOutputTime(Instant inputTimestamp, W window) {
     return inputTimestamp;
   }
-
-  @Override
-  public final boolean assignsToOneWindow() {
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
index 150b956..f657884 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java
@@ -148,11 +148,6 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> {
   }
 
   @Override
-  public boolean assignsToOneWindow() {
-    return !this.period.isShorterThan(this.size);
-  }
-
-  @Override
   public void verifyCompatibility(WindowFn<?, ?> other) throws IncompatibleWindowException {
     if (!this.isCompatible(other)) {
       throw new IncompatibleWindowException(

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index a12be6d..105ebfb 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -163,24 +163,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   /**
-   * Specifies the conditions under which an on-time pane will be created when a window is closed.
-   */
-  public enum OnTimeBehavior {
-    /**
-     * Always fire the on-time pane. Even if there is no new data since the previous firing,
-     * an element will be produced.
-     *
-     * <p>This is the default behavior.
-     */
-    FIRE_ALWAYS,
-    /**
-     * Only fire the on-time pane if there is new data since the previous firing.
-     */
-    FIRE_IF_NON_EMPTY
-
-  }
-
-  /**
    * Creates a {@code Window} {@code PTransform} that uses the given
    * {@link WindowFn} to window the data.
    *
@@ -213,7 +195,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   @Nullable abstract AccumulationMode getAccumulationMode();
   @Nullable abstract Duration getAllowedLateness();
   @Nullable abstract ClosingBehavior getClosingBehavior();
-  @Nullable abstract OnTimeBehavior getOnTimeBehavior();
   @Nullable abstract TimestampCombiner getTimestampCombiner();
 
   abstract Builder<T> toBuilder();
@@ -225,7 +206,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     abstract Builder<T> setAccumulationMode(AccumulationMode mode);
     abstract Builder<T> setAllowedLateness(Duration allowedLateness);
     abstract Builder<T> setClosingBehavior(ClosingBehavior closingBehavior);
-    abstract Builder<T> setOnTimeBehavior(OnTimeBehavior onTimeBehavior);
     abstract Builder<T> setTimestampCombiner(TimestampCombiner timestampCombiner);
 
     abstract Window<T> build();
@@ -319,15 +299,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
   }
 
   /**
-   * <b><i>(Experimental)</i></b> Override the default {@link OnTimeBehavior}, to control
-   * whether to output an empty on-time pane.
-   */
-  @Experimental(Kind.TRIGGER)
-  public Window<T> withOnTimeBehavior(OnTimeBehavior behavior) {
-    return toBuilder().setOnTimeBehavior(behavior).build();
-  }
-
-  /**
    * Get the output strategy of this {@link Window Window PTransform}. For internal use
    * only.
    */
@@ -350,9 +321,6 @@ public abstract class Window<T> extends PTransform<PCollection<T>, PCollection<T
     if (getClosingBehavior() != null) {
       result = result.withClosingBehavior(getClosingBehavior());
     }
-    if (getOnTimeBehavior() != null) {
-      result = result.withOnTimeBehavior(getOnTimeBehavior());
-    }
     if (getTimestampCombiner() != null) {
       result = result.withTimestampCombiner(getTimestampCombiner());
     }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
index ffe85f3..001d630 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java
@@ -180,17 +180,6 @@ public abstract class WindowFn<T, W extends BoundedWindow>
   }
 
   /**
-   * Returns true if this {@link WindowFn} always assigns an element to exactly one window.
-   *
-   * <p>If this varies per-element, or cannot be determined, conservatively return false.
-   *
-   * <p>By default, returns false.
-   */
-  public boolean assignsToOneWindow() {
-    return false;
-  }
-
-  /**
    * Returns a {@link TypeDescriptor} capturing what is known statically about the window type of
    * this {@link WindowFn} instance's most-derived class.
    *

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
index ef6d833..a4bfdda 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/IdentityWindowFn.java
@@ -116,9 +116,4 @@ public class IdentityWindowFn<T> extends NonMergingWindowFn<T, BoundedWindow> {
   public Instant getOutputTime(Instant inputTimestamp, BoundedWindow window) {
     return inputTimestamp;
   }
-
-  @Override
-  public boolean assignsToOneWindow() {
-    return true;
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
index 4063d11..f210fd8 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollection.java
@@ -20,8 +20,6 @@ package org.apache.beam.sdk.values;
 import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkState;
 
-import java.util.Collections;
-import java.util.Map;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
@@ -228,11 +226,6 @@ public class PCollection<T> extends PValueBase implements PValue {
     return super.getName();
   }
 
-  @Override
-  public final Map<TupleTag<?>, PValue> expand() {
-    return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
-  }
-
   /**
    * Sets the name of this {@link PCollection}.  Returns {@code this}.
    *
@@ -321,11 +314,6 @@ public class PCollection<T> extends PValueBase implements PValue {
 
   private IsBounded isBounded;
 
-  /**
-   * A local {@link TupleTag} used in the expansion of this {@link PValueBase}.
-   */
-  private final TupleTag<?> tag = new TupleTag<>();
-
   private PCollection(Pipeline p) {
     super(p);
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
index e17e146..74887c7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PCollectionViews.java
@@ -21,7 +21,6 @@ import com.google.common.base.Function;
 import com.google.common.base.MoreObjects;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Multimap;
 import java.io.IOException;
@@ -39,7 +38,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.transforms.Materialization;
 import org.apache.beam.sdk.transforms.Materializations;
-import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ViewFn;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.InvalidWindows;
@@ -141,18 +139,6 @@ public class PCollectionViews {
   }
 
   /**
-   * Expands a list of {@link PCollectionView} into the form needed for
-   * {@link PTransform#getAdditionalInputs()}.
-   */
-  public static Map<TupleTag<?>, PValue> toAdditionalInputs(Iterable<PCollectionView<?>> views) {
-    ImmutableMap.Builder<TupleTag<?>, PValue> additionalInputs = ImmutableMap.builder();
-    for (PCollectionView<?> view : views) {
-      additionalInputs.put(view.getTagInternal(), view.getPCollection());
-    }
-    return additionalInputs.build();
-  }
-
-  /**
    * Implementation of conversion of singleton {@code Iterable<WindowedValue<T>>} to {@code T}.
    *
    * <p>For internal use only.
@@ -184,15 +170,6 @@ public class PCollectionViews {
     }
 
     /**
-     * Returns if a default value was specified.
-     */
-    @Deprecated
-    @Internal
-    public boolean hasDefault() {
-      return hasDefault;
-    }
-
-    /**
      * Returns the default value that was specified.
      *
      * <p>For internal use only.
@@ -296,16 +273,6 @@ public class PCollectionViews {
             }
           }));
     }
-
-    @Override
-    public boolean equals(Object other) {
-      return other instanceof ListViewFn;
-    }
-
-    @Override
-    public int hashCode() {
-      return ListViewFn.class.hashCode();
-    }
   }
 
   /**
@@ -524,10 +491,5 @@ public class PCollectionViews {
     public String toString() {
       return MoreObjects.toStringHelper(this).add("tag", tag).toString();
     }
-
-    @Override
-    public Map<TupleTag<?>, PValue> expand() {
-      return Collections.<TupleTag<?>, PValue>singletonMap(tag, pCollection);
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
index f312eac..6f638d7 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/PValueBase.java
@@ -19,6 +19,8 @@ package org.apache.beam.sdk.values;
 
 import static com.google.common.base.Preconditions.checkState;
 
+import java.util.Collections;
+import java.util.Map;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.annotations.Internal;
 import org.apache.beam.sdk.transforms.PTransform;
@@ -85,6 +87,11 @@ public abstract class PValueBase implements PValue {
   private String name;
 
   /**
+   * A local {@link TupleTag} used in the expansion of this {@link PValueBase}.
+   */
+  private TupleTag<?> tag = new TupleTag<>();
+
+  /**
    * Whether this {@link PValueBase} has been finalized, and its core
    * properties, e.g., name, can no longer be changed.
    */
@@ -101,6 +108,11 @@ public abstract class PValueBase implements PValue {
   }
 
   @Override
+  public final Map<TupleTag<?>, PValue> expand() {
+    return Collections.<TupleTag<?>, PValue>singletonMap(tag, this);
+  }
+
+  @Override
   public void finishSpecifying(PInput input, PTransform<?, ?> transform) {
     finishedSpecifying = true;
   }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
deleted file mode 100644
index e56af13..0000000
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/ShardedKey.java
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.beam.sdk.values;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/** A key and a shard number. */
-public class ShardedKey<K> implements Serializable {
-  private static final long serialVersionUID = 1L;
-  private final K key;
-  private final int shardNumber;
-
-  public static <K> ShardedKey<K> of(K key, int shardNumber) {
-    return new ShardedKey<>(key, shardNumber);
-  }
-
-  private ShardedKey(K key, int shardNumber) {
-    this.key = key;
-    this.shardNumber = shardNumber;
-  }
-
-  public K getKey() {
-    return key;
-  }
-
-  public int getShardNumber() {
-    return shardNumber;
-  }
-
-  @Override
-  public String toString() {
-    return "key: " + key + " shard: " + shardNumber;
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    if (!(o instanceof ShardedKey)) {
-      return false;
-    }
-    ShardedKey<K> other = (ShardedKey<K>) o;
-    return Objects.equals(key, other.key) && Objects.equals(shardNumber, other.shardNumber);
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(key, shardNumber);
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
index 3b74e69..8a773e2 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/WindowingStrategy.java
@@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
 import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
-import org.apache.beam.sdk.transforms.windowing.Window.OnTimeBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.joda.time.Duration;
 
@@ -60,7 +59,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
   private final AccumulationMode mode;
   private final Duration allowedLateness;
   private final ClosingBehavior closingBehavior;
-  private final OnTimeBehavior onTimeBehavior;
   private final TimestampCombiner timestampCombiner;
   private final boolean triggerSpecified;
   private final boolean modeSpecified;
@@ -73,8 +71,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
       AccumulationMode mode, boolean modeSpecified,
       Duration allowedLateness, boolean allowedLatenessSpecified,
       TimestampCombiner timestampCombiner, boolean timestampCombinerSpecified,
-      ClosingBehavior closingBehavior,
-      OnTimeBehavior onTimeBehavior) {
+      ClosingBehavior closingBehavior) {
     this.windowFn = windowFn;
     this.trigger = trigger;
     this.triggerSpecified = triggerSpecified;
@@ -83,7 +80,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     this.allowedLateness = allowedLateness;
     this.allowedLatenessSpecified = allowedLatenessSpecified;
     this.closingBehavior = closingBehavior;
-    this.onTimeBehavior = onTimeBehavior;
     this.timestampCombiner = timestampCombiner;
     this.timestampCombinerSpecified = timestampCombinerSpecified;
   }
@@ -102,8 +98,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         AccumulationMode.DISCARDING_FIRED_PANES, false,
         DEFAULT_ALLOWED_LATENESS, false,
         TimestampCombiner.END_OF_WINDOW, false,
-        ClosingBehavior.FIRE_IF_NON_EMPTY,
-        OnTimeBehavior.FIRE_ALWAYS);
+        ClosingBehavior.FIRE_IF_NON_EMPTY);
   }
 
   public WindowFn<T, W> getWindowFn() {
@@ -138,10 +133,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
     return closingBehavior;
   }
 
-  public OnTimeBehavior getOnTimeBehavior() {
-    return onTimeBehavior;
-  }
-
   public TimestampCombiner getTimestampCombiner() {
     return timestampCombiner;
   }
@@ -161,8 +152,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 
   /**
@@ -176,8 +166,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, true,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 
   /**
@@ -194,8 +183,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 
   /**
@@ -209,8 +197,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, true,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 
   public WindowingStrategy<T, W> withClosingBehavior(ClosingBehavior closingBehavior) {
@@ -220,19 +207,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, timestampCombinerSpecified,
-        closingBehavior,
-        onTimeBehavior);
-  }
-
-  public WindowingStrategy<T, W> withOnTimeBehavior(OnTimeBehavior onTimeBehavior) {
-    return new WindowingStrategy<T, W>(
-        windowFn,
-        trigger, triggerSpecified,
-        mode, modeSpecified,
-        allowedLateness, allowedLatenessSpecified,
-        timestampCombiner, timestampCombinerSpecified,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 
   @Experimental(Experimental.Kind.OUTPUT_TIME)
@@ -244,8 +219,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, modeSpecified,
         allowedLateness, allowedLatenessSpecified,
         timestampCombiner, true,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 
   @Override
@@ -272,7 +246,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         && getMode().equals(other.getMode())
         && getAllowedLateness().equals(other.getAllowedLateness())
         && getClosingBehavior().equals(other.getClosingBehavior())
-        && getOnTimeBehavior().equals(other.getOnTimeBehavior())
         && getTrigger().equals(other.getTrigger())
         && getTimestampCombiner().equals(other.getTimestampCombiner())
         && getWindowFn().equals(other.getWindowFn());
@@ -305,7 +278,6 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab
         mode, true,
         allowedLateness, true,
         timestampCombiner, true,
-        closingBehavior,
-        onTimeBehavior);
+        closingBehavior);
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
index 260e47a..6d01d32 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/AvroIOTest.java
@@ -54,11 +54,10 @@ import org.apache.avro.reflect.ReflectData;
 import org.apache.avro.reflect.ReflectDatumReader;
 import org.apache.beam.sdk.coders.AvroCoder;
 import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.FileBasedSink.FilenamePolicy;
-import org.apache.beam.sdk.io.FileBasedSink.OutputFileHints;
 import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
-import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.testing.NeedsRunner;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -277,42 +276,37 @@ public class AvroIOTest {
   }
 
   private static class WindowedFilenamePolicy extends FilenamePolicy {
-    final ResourceId outputFilePrefix;
+    final String outputFilePrefix;
 
-    WindowedFilenamePolicy(ResourceId outputFilePrefix) {
+    WindowedFilenamePolicy(String outputFilePrefix) {
       this.outputFilePrefix = outputFilePrefix;
     }
 
     @Override
-    public ResourceId windowedFilename(WindowedContext input, OutputFileHints outputFileHints) {
-      String filenamePrefix =
-          outputFilePrefix.isDirectory() ? "" : firstNonNull(outputFilePrefix.getFilename(), "");
-
-      String filename =
-          String.format(
-              "%s-%s-%s-of-%s-pane-%s%s%s",
-              filenamePrefix,
-              input.getWindow(),
-              input.getShardNumber(),
-              input.getNumShards() - 1,
-              input.getPaneInfo().getIndex(),
-              input.getPaneInfo().isLast() ? "-final" : "",
-              outputFileHints.getSuggestedFilenameSuffix());
-      return outputFilePrefix
-          .getCurrentDirectory()
-          .resolve(filename, StandardResolveOptions.RESOLVE_FILE);
+    public ResourceId windowedFilename(
+        ResourceId outputDirectory, WindowedContext input, String extension) {
+      String filename = String.format(
+          "%s-%s-%s-of-%s-pane-%s%s%s",
+          outputFilePrefix,
+          input.getWindow(),
+          input.getShardNumber(),
+          input.getNumShards() - 1,
+          input.getPaneInfo().getIndex(),
+          input.getPaneInfo().isLast() ? "-final" : "",
+          extension);
+      return outputDirectory.resolve(filename, StandardResolveOptions.RESOLVE_FILE);
     }
 
     @Override
-    public ResourceId unwindowedFilename(Context input, OutputFileHints outputFileHints) {
+    public ResourceId unwindowedFilename(
+        ResourceId outputDirectory, Context input, String extension) {
       throw new UnsupportedOperationException("Expecting windowed outputs only");
     }
 
     @Override
     public void populateDisplayData(DisplayData.Builder builder) {
-      builder.add(
-          DisplayData.item("fileNamePrefix", outputFilePrefix.toString())
-              .withLabel("File Name Prefix"));
+      builder.add(DisplayData.item("fileNamePrefix", outputFilePrefix)
+          .withLabel("File Name Prefix"));
     }
   }
 
@@ -365,18 +359,15 @@ public class AvroIOTest {
         Arrays.copyOfRange(secondWindowArray, 1, secondWindowArray.length))
         .advanceWatermarkToInfinity();
 
-    FilenamePolicy policy =
-        new WindowedFilenamePolicy(FileBasedSink.convertToFileResourceIfPossible(baseFilename));
+    FilenamePolicy policy = new WindowedFilenamePolicy(baseFilename);
     windowedAvroWritePipeline
         .apply(values)
         .apply(Window.<GenericClass>into(FixedWindows.of(Duration.standardMinutes(1))))
-        .apply(
-            AvroIO.write(GenericClass.class)
-                .to(policy)
-                .withTempDirectory(
-                    StaticValueProvider.of(FileSystems.matchNewResource(baseDir.toString(), true)))
-                .withWindowedWrites()
-                .withNumShards(2));
+        .apply(AvroIO.write(GenericClass.class)
+            .to(baseFilename)
+            .withFilenamePolicy(policy)
+            .withWindowedWrites()
+            .withNumShards(2));
     windowedAvroWritePipeline.run();
 
     // Validate that the data written matches the expected elements in the expected order
@@ -503,14 +494,13 @@ public class AvroIOTest {
       expectedFiles.add(
           new File(
               DefaultFilenamePolicy.constructName(
-                      FileBasedSink.convertToFileResourceIfPossible(outputFilePrefix),
-                      shardNameTemplate,
-                      "" /* no suffix */,
-                      i,
-                      numShards,
-                      null,
-                      null)
-                  .toString()));
+                  outputFilePrefix,
+                  shardNameTemplate,
+                  "" /* no suffix */,
+                  i,
+                  numShards,
+                  null,
+                  null)));
     }
 
     List<String> actualElements = new ArrayList<>();
@@ -582,4 +572,15 @@ public class AvroIOTest {
     assertThat(displayData, hasDisplayItem("numShards", 100));
     assertThat(displayData, hasDisplayItem("codec", CodecFactory.snappyCodec().toString()));
   }
+
+  @Test
+  public void testWindowedWriteRequiresFilenamePolicy() {
+    PCollection<String> emptyInput = p.apply(Create.empty(StringUtf8Coder.of()));
+    AvroIO.Write write = AvroIO.write(String.class).to("/tmp/some/file").withWindowedWrites();
+
+    expectedException.expect(IllegalStateException.class);
+    expectedException.expectMessage(
+        "When using windowed writes, a filename policy must be set via withFilenamePolicy()");
+    emptyInput.apply(write);
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
index 9dc6d33..217420c 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DefaultFilenamePolicyTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.sdk.io;
 
+import static org.apache.beam.sdk.io.DefaultFilenamePolicy.constructName;
 import static org.junit.Assert.assertEquals;
 
-import org.apache.beam.sdk.io.fs.ResourceId;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
@@ -30,108 +30,69 @@ import org.junit.runners.JUnit4;
 @RunWith(JUnit4.class)
 public class DefaultFilenamePolicyTest {
 
-  private static String constructName(
-      String baseFilename,
-      String shardTemplate,
-      String suffix,
-      int shardNum,
-      int numShards,
-      String paneStr,
-      String windowStr) {
-    ResourceId constructed =
-        DefaultFilenamePolicy.constructName(
-            FileSystems.matchNewResource(baseFilename, false),
-            shardTemplate,
-            suffix,
-            shardNum,
-            numShards,
-            paneStr,
-            windowStr);
-    return constructed.toString();
-  }
-
   @Test
   public void testConstructName() {
-    assertEquals(
-        "/path/to/output-001-of-123.txt",
-        constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+    assertEquals("output-001-of-123.txt",
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
 
-    assertEquals(
-        "/path/to/out.txt/part-00042",
-        constructName("/path/to/out.txt", "/part-SSSSS", "", 42, 100, null, null));
+    assertEquals("out.txt/part-00042",
+        constructName("out.txt", "/part-SSSSS", "", 42, 100, null, null));
 
-    assertEquals("/path/to/out.txt", constructName("/path/to/ou", "t.t", "xt", 1, 1, null, null));
+    assertEquals("out.txt",
+        constructName("ou", "t.t", "xt", 1, 1, null, null));
 
-    assertEquals(
-        "/path/to/out0102shard.txt",
-        constructName("/path/to/out", "SSNNshard", ".txt", 1, 2, null, null));
+    assertEquals("out0102shard.txt",
+        constructName("out", "SSNNshard", ".txt", 1, 2, null, null));
 
-    assertEquals(
-        "/path/to/out-2/1.part-1-of-2.txt",
-        constructName("/path/to/out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
+    assertEquals("out-2/1.part-1-of-2.txt",
+        constructName("out", "-N/S.part-S-of-N", ".txt", 1, 2, null, null));
   }
 
   @Test
   public void testConstructNameWithLargeShardCount() {
-    assertEquals(
-        "/out-100-of-5000.txt", constructName("/out", "-SS-of-NN", ".txt", 100, 5000, null, null));
+    assertEquals("out-100-of-5000.txt",
+        constructName("out", "-SS-of-NN", ".txt", 100, 5000, null, null));
   }
 
   @Test
   public void testConstructWindowedName() {
-    assertEquals(
-        "/path/to/output-001-of-123.txt",
-        constructName("/path/to/output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
-
-    assertEquals(
-        "/path/to/output-001-of-123-PPP-W.txt",
-        constructName("/path/to/output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
-
-    assertEquals(
-        "/path/to/out" + ".txt/part-00042-myPaneStr-myWindowStr",
-        constructName(
-            "/path/to/out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr", "myWindowStr"));
-
-    assertEquals(
-        "/path/to/out.txt",
-        constructName("/path/to/ou", "t.t", "xt", 1, 1, "myPaneStr2", "anotherWindowStr"));
-
-    assertEquals(
-        "/path/to/out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
-        constructName(
-            "/path/to/out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr", "oneMoreWindowStr"));
-
-    assertEquals(
-        "/out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
-            + "panemyPaneStr3.txt",
-        constructName(
-            "/out",
-            "-N/S.part-S-of-N-W-P-windowW-paneP",
-            ".txt",
-            1,
-            2,
-            "myPaneStr3",
-            "slidingWindow1"));
+    assertEquals("output-001-of-123.txt",
+        constructName("output", "-SSS-of-NNN", ".txt", 1, 123, null, null));
+
+    assertEquals("output-001-of-123-PPP-W.txt",
+        constructName("output", "-SSS-of-NNN-PPP-W", ".txt", 1, 123, null, null));
+
+    assertEquals("out.txt/part-00042-myPaneStr-myWindowStr",
+        constructName("out.txt", "/part-SSSSS-P-W", "", 42, 100, "myPaneStr",
+            "myWindowStr"));
+
+    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "myPaneStr2",
+        "anotherWindowStr"));
+
+    assertEquals("out0102shard-oneMoreWindowStr-anotherPaneStr.txt",
+        constructName("out", "SSNNshard-W-P", ".txt", 1, 2, "anotherPaneStr",
+            "oneMoreWindowStr"));
+
+    assertEquals("out-2/1.part-1-of-2-slidingWindow1-myPaneStr3-windowslidingWindow1-"
+        + "panemyPaneStr3.txt",
+        constructName("out", "-N/S.part-S-of-N-W-P-windowW-paneP", ".txt", 1, 2, "myPaneStr3",
+        "slidingWindow1"));
 
     // test first/last pane
-    assertEquals(
-        "/out.txt/part-00042-myWindowStr-pane-11-true-false",
-        constructName(
-            "/out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false", "myWindowStr"));
-
-    assertEquals(
-        "/path/to/out.txt",
-        constructName("/path/to/ou", "t.t", "xt", 1, 1, "pane", "anotherWindowStr"));
-
-    assertEquals(
-        "/out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
-        constructName(
-            "/out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false", "oneMoreWindowStr"));
-
-    assertEquals(
-        "/path/to/out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
-        constructName(
-            "/path/to/out", "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
+    assertEquals("out.txt/part-00042-myWindowStr-pane-11-true-false",
+        constructName("out.txt", "/part-SSSSS-W-P", "", 42, 100, "pane-11-true-false",
+            "myWindowStr"));
+
+    assertEquals("out.txt", constructName("ou", "t.t", "xt", 1, 1, "pane",
+        "anotherWindowStr"));
+
+    assertEquals("out0102shard-oneMoreWindowStr-pane--1-false-false-pane--1-false-false.txt",
+        constructName("out", "SSNNshard-W-P-P", ".txt", 1, 2, "pane--1-false-false",
+            "oneMoreWindowStr"));
+
+    assertEquals("out-2/1.part-1-of-2-sWindow1-winsWindow1-ppaneL.txt",
+        constructName("out",
+        "-N/S.part-S-of-N-W-winW-pP", ".txt", 1, 2, "paneL", "sWindow1"));
   }
 
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
index a7644b6..6615a2e 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/DrunkWritableByteChannelFactory.java
@@ -39,7 +39,7 @@ public class DrunkWritableByteChannelFactory implements WritableByteChannelFacto
   }
 
   @Override
-  public String getSuggestedFilenameSuffix() {
+  public String getFilenameSuffix() {
     return ".drunk";
   }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
index 755bb59..caad759 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/FileBasedSinkTest.java
@@ -103,7 +103,7 @@ public class FileBasedSinkTest {
 
     SimpleSink.SimpleWriter writer =
         buildWriteOperationWithTempDir(getBaseTempDirectory()).createWriter();
-    writer.openUnwindowed(testUid, -1, null);
+    writer.openUnwindowed(testUid, -1);
     for (String value : values) {
       writer.write(value);
     }
@@ -198,27 +198,23 @@ public class FileBasedSinkTest {
       throws Exception {
     int numFiles = temporaryFiles.size();
 
-    List<FileResult<Void>> fileResults = new ArrayList<>();
+    List<FileResult> fileResults = new ArrayList<>();
     // Create temporary output bundles and output File objects.
     for (int i = 0; i < numFiles; i++) {
       fileResults.add(
-          new FileResult<Void>(
+          new FileResult(
               LocalResources.fromFile(temporaryFiles.get(i), false),
               WriteFiles.UNKNOWN_SHARDNUM,
               null,
-              null,
               null));
     }
 
     writeOp.finalize(fileResults);
 
+    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
     for (int i = 0; i < numFiles; i++) {
-      ResourceId outputFilename =
-          writeOp
-              .getSink()
-              .getDynamicDestinations()
-              .getFilenamePolicy(null)
-              .unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED);
+      ResourceId outputFilename = writeOp.getSink().getFilenamePolicy()
+          .unwindowedFilename(outputDirectory, new Context(i, numFiles), "");
       assertTrue(new File(outputFilename.toString()).exists());
       assertFalse(temporaryFiles.get(i).exists());
     }
@@ -235,12 +231,11 @@ public class FileBasedSinkTest {
   private void testRemoveTemporaryFiles(int numFiles, ResourceId tempDirectory)
       throws Exception {
     String prefix = "file";
-    SimpleSink<Void> sink =
-        SimpleSink.makeSimpleSink(
-            getBaseOutputDirectory(), prefix, "", "", CompressionType.UNCOMPRESSED);
+    SimpleSink sink =
+        new SimpleSink(getBaseOutputDirectory(), prefix, "", "");
 
-    WriteOperation<String, Void> writeOp =
-        new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
+    WriteOperation<String> writeOp =
+        new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
 
     List<File> temporaryFiles = new ArrayList<>();
     List<File> outputFiles = new ArrayList<>();
@@ -277,6 +272,8 @@ public class FileBasedSinkTest {
   @Test
   public void testCopyToOutputFiles() throws Exception {
     SimpleSink.SimpleWriteOperation writeOp = buildWriteOperation();
+    ResourceId outputDirectory = writeOp.getSink().getBaseOutputDirectoryProvider().get();
+
     List<String> inputFilenames = Arrays.asList("input-1", "input-2", "input-3");
     List<String> inputContents = Arrays.asList("1", "2", "3");
     List<String> expectedOutputFilenames = Arrays.asList(
@@ -295,14 +292,9 @@ public class FileBasedSinkTest {
       File inputTmpFile = tmpFolder.newFile(inputFilenames.get(i));
       List<String> lines = Collections.singletonList(inputContents.get(i));
       writeFile(lines, inputTmpFile);
-      inputFilePaths.put(
-          LocalResources.fromFile(inputTmpFile, false),
-          writeOp
-              .getSink()
-              .getDynamicDestinations()
-              .getFilenamePolicy(null)
-              .unwindowedFilename(
-                  new Context(i, inputFilenames.size()), CompressionType.UNCOMPRESSED));
+      inputFilePaths.put(LocalResources.fromFile(inputTmpFile, false),
+          writeOp.getSink().getFilenamePolicy()
+              .unwindowedFilename(outputDirectory, new Context(i, inputFilenames.size()), ""));
     }
 
     // Copy input files to output files.
@@ -319,8 +311,7 @@ public class FileBasedSinkTest {
       ResourceId outputDirectory, FilenamePolicy policy, int numFiles) {
     List<ResourceId> filenames = new ArrayList<>();
     for (int i = 0; i < numFiles; i++) {
-      filenames.add(
-          policy.unwindowedFilename(new Context(i, numFiles), CompressionType.UNCOMPRESSED));
+      filenames.add(policy.unwindowedFilename(outputDirectory, new Context(i, numFiles), ""));
     }
     return filenames;
   }
@@ -335,10 +326,8 @@ public class FileBasedSinkTest {
     List<ResourceId> actual;
     ResourceId root = getBaseOutputDirectory();
 
-    SimpleSink<Void> sink =
-        SimpleSink.makeSimpleSink(
-            root, "file", ".SSSSS.of.NNNNN", ".test", CompressionType.UNCOMPRESSED);
-    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
+    SimpleSink sink = new SimpleSink(root, "file", ".SSSSS.of.NNNNN", ".test");
+    FilenamePolicy policy = sink.getFilenamePolicy();
 
     expected = Arrays.asList(
         root.resolve("file.00000.of.00003.test", StandardResolveOptions.RESOLVE_FILE),
@@ -363,9 +352,8 @@ public class FileBasedSinkTest {
   @Test
   public void testCollidingOutputFilenames() throws IOException {
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink<Void> sink =
-        SimpleSink.makeSimpleSink(root, "file", "-NN", "test", CompressionType.UNCOMPRESSED);
-    SimpleSink.SimpleWriteOperation<Void> writeOp = new SimpleSink.SimpleWriteOperation<>(sink);
+    SimpleSink sink = new SimpleSink(root, "file", "-NN", "test");
+    SimpleSink.SimpleWriteOperation writeOp = new SimpleSink.SimpleWriteOperation(sink);
 
     ResourceId temp1 = root.resolve("temp1", StandardResolveOptions.RESOLVE_FILE);
     ResourceId temp2 = root.resolve("temp2", StandardResolveOptions.RESOLVE_FILE);
@@ -373,11 +361,11 @@ public class FileBasedSinkTest {
     ResourceId output = root.resolve("file-03.test", StandardResolveOptions.RESOLVE_FILE);
     // More than one shard does.
     try {
-      Iterable<FileResult<Void>> results =
+      Iterable<FileResult> results =
           Lists.newArrayList(
-              new FileResult<Void>(temp1, 1, null, null, null),
-              new FileResult<Void>(temp2, 1, null, null, null),
-              new FileResult<Void>(temp3, 1, null, null, null));
+              new FileResult(temp1, 1, null, null),
+              new FileResult(temp2, 1, null, null),
+              new FileResult(temp3, 1, null, null));
       writeOp.buildOutputFilenames(results);
       fail("Should have failed.");
     } catch (IllegalStateException exn) {
@@ -391,10 +379,8 @@ public class FileBasedSinkTest {
     List<ResourceId> expected;
     List<ResourceId> actual;
     ResourceId root = getBaseOutputDirectory();
-    SimpleSink<Void> sink =
-        SimpleSink.makeSimpleSink(
-            root, "file", "-SSSSS-of-NNNNN", "", CompressionType.UNCOMPRESSED);
-    FilenamePolicy policy = sink.getDynamicDestinations().getFilenamePolicy(null);
+    SimpleSink sink = new SimpleSink(root, "file", "-SSSSS-of-NNNNN", "");
+    FilenamePolicy policy = sink.getFilenamePolicy();
 
     expected = Arrays.asList(
         root.resolve("file-00000-of-00003", StandardResolveOptions.RESOLVE_FILE),
@@ -500,11 +486,10 @@ public class FileBasedSinkTest {
   public void testFileBasedWriterWithWritableByteChannelFactory() throws Exception {
     final String testUid = "testId";
     ResourceId root = getBaseOutputDirectory();
-    WriteOperation<String, Void> writeOp =
-        SimpleSink.makeSimpleSink(
-                root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
+    WriteOperation<String> writeOp =
+        new SimpleSink(root, "file", "-SS-of-NN", "txt", new DrunkWritableByteChannelFactory())
             .createWriteOperation();
-    final Writer<String, Void> writer = writeOp.createWriter();
+    final Writer<String> writer = writeOp.createWriter();
     final ResourceId expectedFile =
         writeOp.tempDirectory.get().resolve(testUid, StandardResolveOptions.RESOLVE_FILE);
 
@@ -518,7 +503,7 @@ public class FileBasedSinkTest {
     expected.add("footer");
     expected.add("footer");
 
-    writer.openUnwindowed(testUid, -1, null);
+    writer.openUnwindowed(testUid, -1);
     writer.write("a");
     writer.write("b");
     final FileResult result = writer.close();
@@ -528,20 +513,20 @@ public class FileBasedSinkTest {
   }
 
   /** Build a SimpleSink with default options. */
-  private SimpleSink<Void> buildSink() {
-    return SimpleSink.makeSimpleSink(
-        getBaseOutputDirectory(), "file", "-SS-of-NN", ".test", CompressionType.UNCOMPRESSED);
+  private SimpleSink buildSink() {
+    return new SimpleSink(getBaseOutputDirectory(), "file", "-SS-of-NN", ".test");
   }
 
-  /** Build a SimpleWriteOperation with default options and the given temporary directory. */
-  private SimpleSink.SimpleWriteOperation<Void> buildWriteOperationWithTempDir(
-      ResourceId tempDirectory) {
-    SimpleSink<Void> sink = buildSink();
-    return new SimpleSink.SimpleWriteOperation<>(sink, tempDirectory);
+  /**
+   * Build a SimpleWriteOperation with default options and the given temporary directory.
+   */
+  private SimpleSink.SimpleWriteOperation buildWriteOperationWithTempDir(ResourceId tempDirectory) {
+    SimpleSink sink = buildSink();
+    return new SimpleSink.SimpleWriteOperation(sink, tempDirectory);
   }
 
   /** Build a write operation with the default options for it and its parent sink. */
-  private SimpleSink.SimpleWriteOperation<Void> buildWriteOperation() {
+  private SimpleSink.SimpleWriteOperation buildWriteOperation() {
     return buildSink().createWriteOperation();
   }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/c1b2b96a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
index 9196178..c97313d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/SimpleSink.java
@@ -19,55 +19,33 @@ package org.apache.beam.sdk.io;
 
 import java.nio.ByteBuffer;
 import java.nio.channels.WritableByteChannel;
-import org.apache.beam.sdk.io.DefaultFilenamePolicy.Params;
-import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
 import org.apache.beam.sdk.io.fs.ResourceId;
 import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
 import org.apache.beam.sdk.util.MimeTypes;
 
 /**
- * A simple {@link FileBasedSink} that writes {@link String} values as lines with header and footer.
+ * A simple {@link FileBasedSink} that writes {@link String} values as lines with
+ * header and footer.
  */
-class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
-  public SimpleSink(
-      ResourceId tempDirectory,
-      DynamicDestinations<String, DestinationT> dynamicDestinations,
-      WritableByteChannelFactory writableByteChannelFactory) {
-    super(StaticValueProvider.of(tempDirectory), dynamicDestinations, writableByteChannelFactory);
+class SimpleSink extends FileBasedSink<String> {
+  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix) {
+    this(baseOutputDirectory, prefix, template, suffix, CompressionType.UNCOMPRESSED);
   }
 
-  public static SimpleSink<Void> makeSimpleSink(
-      ResourceId tempDirectory, FilenamePolicy filenamePolicy) {
-    return new SimpleSink<>(
-        tempDirectory,
-        DynamicFileDestinations.<String>constant(filenamePolicy),
-        CompressionType.UNCOMPRESSED);
-  }
-
-  public static SimpleSink<Void> makeSimpleSink(
-      ResourceId baseDirectory,
-      String prefix,
-      String shardTemplate,
-      String suffix,
-      WritableByteChannelFactory writableByteChannelFactory) {
-    DynamicDestinations<String, Void> dynamicDestinations =
-        DynamicFileDestinations.constant(
-            DefaultFilenamePolicy.fromParams(
-                new Params()
-                    .withBaseFilename(
-                        baseDirectory.resolve(prefix, StandardResolveOptions.RESOLVE_FILE))
-                    .withShardTemplate(shardTemplate)
-                    .withSuffix(suffix)));
-    return new SimpleSink<>(baseDirectory, dynamicDestinations, writableByteChannelFactory);
+  public SimpleSink(ResourceId baseOutputDirectory, String prefix, String template, String suffix,
+                    WritableByteChannelFactory writableByteChannelFactory) {
+    super(
+        StaticValueProvider.of(baseOutputDirectory),
+        new DefaultFilenamePolicy(StaticValueProvider.of(prefix), template, suffix),
+        writableByteChannelFactory);
   }
 
   @Override
-  public SimpleWriteOperation<DestinationT> createWriteOperation() {
-    return new SimpleWriteOperation<>(this);
+  public SimpleWriteOperation createWriteOperation() {
+    return new SimpleWriteOperation(this);
   }
 
-  static final class SimpleWriteOperation<DestinationT>
-      extends WriteOperation<String, DestinationT> {
+  static final class SimpleWriteOperation extends WriteOperation<String> {
     public SimpleWriteOperation(SimpleSink sink, ResourceId tempOutputDirectory) {
       super(sink, tempOutputDirectory);
     }
@@ -77,12 +55,12 @@ class SimpleSink<DestinationT> extends FileBasedSink<String, DestinationT> {
     }
 
     @Override
-    public SimpleWriter<DestinationT> createWriter() throws Exception {
-      return new SimpleWriter<>(this);
+    public SimpleWriter createWriter() throws Exception {
+      return new SimpleWriter(this);
     }
   }
 
-  static final class SimpleWriter<DestinationT> extends Writer<String, DestinationT> {
+  static final class SimpleWriter extends Writer<String> {
     static final String HEADER = "header";
     static final String FOOTER = "footer";
 


Mime
View raw message