beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [17/67] [partial] incubator-beam git commit: Directory reorganization
Date Thu, 24 Mar 2016 02:47:41 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java
deleted file mode 100644
index fac2c28..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/AfterWatermark.java
+++ /dev/null
@@ -1,397 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.transforms.windowing.Trigger.OnceTrigger;
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.ImmutableList;
-
-import org.joda.time.Instant;
-
-import java.util.List;
-import java.util.Objects;
-
-/**
- * <p>{@code AfterWatermark} triggers fire based on progress of the system watermark. This time is a
- * lower-bound, sometimes heuristically established, on event times that have been fully processed
- * by the pipeline.
- *
- * <p>For sources that provide non-heuristic watermarks (e.g.
- * {@link com.google.cloud.dataflow.sdk.io.PubsubIO} when using arrival times as event times), the
- * watermark is a strict guarantee that no data with an event time earlier than
- * that watermark will ever be observed in the pipeline. In this case, it's safe to assume that any
- * pane triggered by an {@code AfterWatermark} trigger with a reference point at or beyond the end
- * of the window will be the last pane ever for that window.
- *
- * <p>For sources that provide heuristic watermarks (e.g.
- * {@link com.google.cloud.dataflow.sdk.io.PubsubIO} when using user-supplied event times), the
- * watermark itself becomes an <i>estimate</i> that no data with an event time earlier than that
- * watermark (i.e. "late data") will ever be observed in the pipeline. These heuristics can
- * often be quite accurate, but the chance of seeing late data for any given window is non-zero.
- * Thus, if absolute correctness over time is important to your use case, you may want to consider
- * using a trigger that accounts for late data. The default trigger,
- * {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}, which fires
- * once when the watermark passes the end of the window and then immediately thereafter when any
- * late data arrives, is one such example.
- *
- * <p>The watermark is the clock that defines {@link TimeDomain#EVENT_TIME}.
- *
- * <p>Additional firings before or after the watermark can be requested by calling
- * {@code AfterWatermark.pastEndOfWindow().withEarlyFirings(OnceTrigger)} or
- * {@code AfterWatermark.pastEndOfWindow().withLateFirings(OnceTrigger)}.
- *
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class AfterWatermark<W extends BoundedWindow> {
-
-  // Static factory class.
-  private AfterWatermark() {}
-
-  /**
-   * Creates a trigger that fires when the watermark passes the end of the window.
-   */
-  public static <W extends BoundedWindow> FromEndOfWindow<W> pastEndOfWindow() {
-    return new FromEndOfWindow<W>();
-  }
-
-  /**
-   * Interface for building an AfterWatermarkTrigger with early firings already filled in.
-   */
-  public interface AfterWatermarkEarly<W extends BoundedWindow> extends TriggerBuilder<W> {
-    /**
-     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever
-     * the given {@code Trigger} fires after the watermark has passed the end of the window.
-     */
-    TriggerBuilder<W> withLateFirings(OnceTrigger<W> lateTrigger);
-  }
-
-  /**
-   * Interface for building an AfterWatermarkTrigger with late firings already filled in.
-   */
-  public interface AfterWatermarkLate<W extends BoundedWindow> extends TriggerBuilder<W> {
-    /**
-     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever
-     * the given {@code Trigger} fires before the watermark has passed the end of the window.
-     */
-    TriggerBuilder<W> withEarlyFirings(OnceTrigger<W> earlyTrigger);
-  }
-
-  /**
-   * A trigger which never fires. Used for the "early" trigger when only a late trigger was
-   * specified.
-   */
-  private static class NeverTrigger<W extends BoundedWindow> extends OnceTrigger<W> {
-
-    protected NeverTrigger() {
-      super(null);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception { }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception { }
-
-    @Override
-    protected Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-      return this;
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(W window) {
-      return BoundedWindow.TIMESTAMP_MAX_VALUE;
-    }
-
-    @Override
-    public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-      return false;
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger<W>.TriggerContext context) throws Exception {
-      throw new UnsupportedOperationException(
-          String.format("%s should never fire", getClass().getSimpleName()));
-    }
-  }
-
-  private static class AfterWatermarkEarlyAndLate<W extends BoundedWindow>
-      extends Trigger<W>
-      implements TriggerBuilder<W>, AfterWatermarkEarly<W>, AfterWatermarkLate<W> {
-
-    private static final int EARLY_INDEX = 0;
-    private static final int LATE_INDEX = 1;
-
-    private final OnceTrigger<W> earlyTrigger;
-    private final OnceTrigger<W> lateTrigger;
-
-    /**
-     * Builds the composite trigger from its early and (optional) late subtriggers.
-     *
-     * @param earlyTrigger the subtrigger stored at {@code EARLY_INDEX}; must not be null
-     * @param lateTrigger the subtrigger stored at {@code LATE_INDEX}, or {@code null} when no
-     *     late firings were requested
-     * @throws NullPointerException if {@code earlyTrigger} is null
-     */
-    @SuppressWarnings("unchecked")
-    private AfterWatermarkEarlyAndLate(OnceTrigger<W> earlyTrigger, OnceTrigger<W> lateTrigger) {
-      // The null check has to happen inside the super(...) argument: ImmutableList.of rejects
-      // null elements itself, so a check placed after the super call could never report its
-      // more descriptive message.
-      super(lateTrigger == null
-          ? ImmutableList.<Trigger<W>>of(
-              checkNotNull(earlyTrigger, "earlyTrigger should not be null"))
-          : ImmutableList.<Trigger<W>>of(
-              checkNotNull(earlyTrigger, "earlyTrigger should not be null"), lateTrigger));
-      this.earlyTrigger = earlyTrigger;
-      this.lateTrigger = lateTrigger;
-    }
-
-    @Override
-    public TriggerBuilder<W> withEarlyFirings(OnceTrigger<W> earlyTrigger) {
-      return new AfterWatermarkEarlyAndLate<W>(earlyTrigger, lateTrigger);
-    }
-
-    @Override
-    public TriggerBuilder<W> withLateFirings(OnceTrigger<W> lateTrigger) {
-      return new AfterWatermarkEarlyAndLate<W>(earlyTrigger, lateTrigger);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception {
-      if (!c.trigger().isMerging()) {
-        // If merges can never happen, we just run the unfinished subtrigger
-        c.trigger().firstUnfinishedSubTrigger().invokeOnElement(c);
-      } else {
-        // If merges can happen, we run for all subtriggers because they might be
-        // de-activated or re-activated
-        for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-          subTrigger.invokeOnElement(c);
-        }
-      }
-    }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception {
-      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
-      // merged-away windows.
-
-      ExecutableTrigger<W> earlySubtrigger = c.trigger().subTrigger(EARLY_INDEX);
-      // We check the early trigger to determine if we are still processing it or
-      // if the end of window has transitioned us to the late trigger
-      OnMergeContext earlyContext = c.forTrigger(earlySubtrigger);
-
-      // If the early trigger is still active in any merging window then it is still active in
-      // the new merged window, because even if the merged window is "done" some pending elements
-      // haven't had a chance to fire.
-      if (!earlyContext.trigger().finishedInAllMergingWindows() || !endOfWindowReached(c)) {
-        earlyContext.trigger().setFinished(false);
-        if (lateTrigger != null) {
-          ExecutableTrigger<W> lateSubtrigger = c.trigger().subTrigger(LATE_INDEX);
-          OnMergeContext lateContext = c.forTrigger(lateSubtrigger);
-          lateContext.trigger().setFinished(false);
-          lateSubtrigger.invokeClear(lateContext);
-        }
-      } else {
-        // Otherwise the early trigger and end-of-window bit is done for good.
-        earlyContext.trigger().setFinished(true);
-        if (lateTrigger != null) {
-          c.trigger().subTrigger(LATE_INDEX).invokeOnMerge(c);
-        }
-      }
-    }
-
-    @Override
-    public Trigger<W> getContinuationTrigger() {
-      return new AfterWatermarkEarlyAndLate<W>(
-          earlyTrigger.getContinuationTrigger(),
-          lateTrigger == null ? null : lateTrigger.getContinuationTrigger());
-    }
-
-    @Override
-    protected Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-      throw new UnsupportedOperationException(
-          "Should not call getContinuationTrigger(List<Trigger<W>>)");
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(W window) {
-      // Even without an early or late trigger, we'll still produce a firing at the watermark.
-      return window.maxTimestamp();
-    }
-
-    private boolean endOfWindowReached(Trigger<W>.TriggerContext context) {
-      return context.currentEventTime() != null
-          && context.currentEventTime().isAfter(context.window().maxTimestamp());
-    }
-
-    @Override
-    public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-      if (!context.trigger().isFinished(EARLY_INDEX)) {
-        // We have not yet transitioned to late firings.
-        // We should fire if either the trigger is ready or we reach the end of the window.
-        return context.trigger().subTrigger(EARLY_INDEX).invokeShouldFire(context)
-            || endOfWindowReached(context);
-      } else if (lateTrigger == null) {
-        return false;
-      } else {
-        // We are running the late trigger
-        return context.trigger().subTrigger(LATE_INDEX).invokeShouldFire(context);
-      }
-    }
-
-    @Override
-    public void onFire(Trigger<W>.TriggerContext context) throws Exception {
-      if (!context.forTrigger(context.trigger().subTrigger(EARLY_INDEX)).trigger().isFinished()) {
-        onNonLateFiring(context);
-      } else if (lateTrigger != null) {
-        onLateFiring(context);
-      } else {
-        // all done
-        context.trigger().setFinished(true);
-      }
-    }
-
-    private void onNonLateFiring(Trigger<W>.TriggerContext context) throws Exception {
-      // We have not yet transitioned to late firings.
-      ExecutableTrigger<W> earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX);
-      Trigger<W>.TriggerContext earlyContext = context.forTrigger(earlySubtrigger);
-
-      if (!endOfWindowReached(context)) {
-        // This is an early firing, since we have not arrived at the end of the window
-        // Implicitly repeats
-        earlySubtrigger.invokeOnFire(context);
-        earlySubtrigger.invokeClear(context);
-        earlyContext.trigger().setFinished(false);
-      } else {
-        // We have arrived at the end of the window; terminate the early trigger
-        // and clear out the late trigger's state
-        if (earlySubtrigger.invokeShouldFire(context)) {
-          earlySubtrigger.invokeOnFire(context);
-        }
-        earlyContext.trigger().setFinished(true);
-        earlySubtrigger.invokeClear(context);
-
-        if (lateTrigger == null) {
-          // Done if there is no late trigger.
-          context.trigger().setFinished(true);
-        } else {
-          // If there is a late trigger, we transition to it, and need to clear its state
-          // because it was run in parallel.
-          context.trigger().subTrigger(LATE_INDEX).invokeClear(context);
-        }
-      }
-
-    }
-
-    private void onLateFiring(Trigger<W>.TriggerContext context) throws Exception {
-      // We are firing the late trigger, with implicit repeat
-      ExecutableTrigger<W> lateSubtrigger = context.trigger().subTrigger(LATE_INDEX);
-      lateSubtrigger.invokeOnFire(context);
-      // It is a OnceTrigger, so it must have finished; unfinished it and clear it
-      lateSubtrigger.invokeClear(context);
-      context.forTrigger(lateSubtrigger).trigger().setFinished(false);
-    }
-  }
-
-  /**
-   * A watermark trigger targeted relative to the end of the window.
-   */
-  public static class FromEndOfWindow<W extends BoundedWindow> extends OnceTrigger<W> {
-
-    private FromEndOfWindow() {
-      super(null);
-    }
-
-    /**
-     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever
-     * the given {@code Trigger} fires before the watermark has passed the end of the window.
-     */
-    public AfterWatermarkEarly<W> withEarlyFirings(OnceTrigger<W> earlyFirings) {
-      Preconditions.checkNotNull(earlyFirings,
-          "Must specify the trigger to use for early firings");
-      return new AfterWatermarkEarlyAndLate<W>(earlyFirings, null);
-    }
-
-    /**
-     * Creates a new {@code Trigger} like this one, except that it fires repeatedly whenever
-     * the given {@code Trigger} fires after the watermark has passed the end of the window.
-     */
-    public AfterWatermarkLate<W> withLateFirings(OnceTrigger<W> lateFirings) {
-      Preconditions.checkNotNull(lateFirings,
-          "Must specify the trigger to use for late firings");
-      return new AfterWatermarkEarlyAndLate<W>(new NeverTrigger<W>(), lateFirings);
-    }
-
-    @Override
-    public void onElement(OnElementContext c) throws Exception {
-      // We're interested in knowing when the input watermark passes the end of the window.
-      // (It is possible this has already happened, in which case the timer will be fired
-      // almost immediately).
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-
-    @Override
-    public void onMerge(OnMergeContext c) throws Exception {
-      // NOTE that the ReduceFnRunner will delete all end-of-window timers for the
-      // merged-away windows.
-
-      if (!c.trigger().finishedInAllMergingWindows()) {
-        // If the trigger is still active in any merging window then it is still active in the new
-        // merged window, because even if the merged window is "done" some pending elements haven't
-        // had a chance to fire
-        c.trigger().setFinished(false);
-      } else if (!endOfWindowReached(c)) {
-        // If the end of the new window has not been reached, then the trigger is active again.
-        c.trigger().setFinished(false);
-      } else {
-        // Otherwise it is done for good
-        c.trigger().setFinished(true);
-      }
-    }
-
-    @Override
-    public Instant getWatermarkThatGuaranteesFiring(W window) {
-      return window.maxTimestamp();
-    }
-
-    @Override
-    public FromEndOfWindow<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-      return this;
-    }
-
-    @Override
-    public String toString() {
-      return "AfterWatermark.pastEndOfWindow()";
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-      return obj instanceof FromEndOfWindow;
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-
-    @Override
-    public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-      return endOfWindowReached(context);
-    }
-
-    private boolean endOfWindowReached(Trigger<W>.TriggerContext context) {
-      return context.currentEventTime() != null
-          && context.currentEventTime().isAfter(context.window().maxTimestamp());
-    }
-
-    @Override
-    protected void onOnlyFiring(Trigger<W>.TriggerContext context) throws Exception { }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/BoundedWindow.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/BoundedWindow.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/BoundedWindow.java
deleted file mode 100644
index 0afd8e3..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/BoundedWindow.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import org.joda.time.Instant;
-
-import java.util.concurrent.TimeUnit;
-
-/**
- * A {@code BoundedWindow} represents a finite grouping of elements, with an
- * upper bound (larger timestamps represent more recent data) on the timestamps
- * of elements that can be placed in the window. This finiteness means that for
- * every window, at some point in time, all data for that window will have
- * arrived and can be processed together.
- *
- * <p>Windows must also implement {@link Object#equals} and
- * {@link Object#hashCode} such that windows that are logically equal will
- * be treated as equal by {@code equals()} and {@code hashCode()}.
- */
-public abstract class BoundedWindow {
-  // The min and max timestamps that won't overflow when they are converted to
-  // usec.
-  public static final Instant TIMESTAMP_MIN_VALUE =
-      new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MIN_VALUE));
-  public static final Instant TIMESTAMP_MAX_VALUE =
-      new Instant(TimeUnit.MICROSECONDS.toMillis(Long.MAX_VALUE));
-
-  /**
-   * Returns the inclusive upper bound of timestamps for values in this window.
-   */
-  public abstract Instant maxTimestamp();
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/CalendarWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/CalendarWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/CalendarWindows.java
deleted file mode 100644
index de5140f..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/CalendarWindows.java
+++ /dev/null
@@ -1,348 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.DateTime;
-import org.joda.time.DateTimeZone;
-import org.joda.time.Days;
-import org.joda.time.Instant;
-import org.joda.time.Months;
-import org.joda.time.Years;
-
-/**
- * A collection of {@link WindowFn}s that windows values into calendar-based
- * windows such as spans of days, months, or years.
- *
- * <p>For example, to group data into quarters that change on the 15th, use
- * {@code CalendarWindows.months(3).withStartingMonth(2014, 1).beginningOnDay(15)}.
- */
-public class CalendarWindows {
-
-  /**
-   * Returns a {@link WindowFn} that windows elements into periods measured by days.
-   *
-   * <p>For example, {@code CalendarWindows.days(1)} will window elements into
-   * separate windows for each day.
-   */
-  public static DaysWindows days(int number) {
-    return new DaysWindows(number, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
-  }
-
-  /**
-   * Returns a {@link WindowFn} that windows elements into periods measured by weeks.
-   *
-   * <p>For example, {@code CalendarWindows.weeks(1, DateTimeConstants.TUESDAY)} will
-   * window elements into week-long windows starting on Tuesdays.
-   */
-  public static DaysWindows weeks(int number, int startDayOfWeek) {
-    return new DaysWindows(
-        7 * number,
-        new DateTime(0, DateTimeZone.UTC).withDayOfWeek(startDayOfWeek),
-        DateTimeZone.UTC);
-  }
-
-  /**
-   * Returns a {@link WindowFn} that windows elements into periods measured by months.
-   *
-   * <p>For example,
-   * {@code CalendarWindows.months(8).withStartingMonth(2014, 1).beginningOnDay(10)}
-   * will window elements into 8-month windows that start on the 10th day of the month,
-   * and the first window begins in January 2014.
-   */
-  public static MonthsWindows months(int number) {
-    return new MonthsWindows(number, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
-  }
-
-  /**
-   * Returns a {@link WindowFn} that windows elements into periods measured by years.
-   *
-   * <p>For example,
-   * {@code CalendarWindows.years(1).withTimeZone(DateTimeZone.forID("America/Los_Angeles"))}
-   * will window elements into year-long windows that start at midnight on Jan 1, in the
-   * America/Los_Angeles time zone.
-   */
-  public static YearsWindows years(int number) {
-    return new YearsWindows(number, 1, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC);
-  }
-
-  /**
-   * A {@link WindowFn} that windows elements into periods measured by days.
-   *
-   * <p>By default, periods of multiple days are measured starting at the
-   * epoch.  This can be overridden with {@link #withStartingDay}.
-   *
-   * <p>The time zone used to determine calendar boundaries is UTC, unless this
-   * is overridden with the {@link #withTimeZone} method.
-   */
-  public static class DaysWindows extends PartitioningWindowFn<Object, IntervalWindow> {
-    public DaysWindows withStartingDay(int year, int month, int day) {
-      return new DaysWindows(
-          number, new DateTime(year, month, day, 0, 0, timeZone), timeZone);
-    }
-
-    public DaysWindows withTimeZone(DateTimeZone timeZone) {
-      return new DaysWindows(
-          number, startDate.withZoneRetainFields(timeZone), timeZone);
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-
-    private int number;
-    private DateTime startDate;
-    private DateTimeZone timeZone;
-
-    private DaysWindows(int number, DateTime startDate, DateTimeZone timeZone) {
-      this.number = number;
-      this.startDate = startDate;
-      this.timeZone = timeZone;
-    }
-
-    /**
-     * Assigns {@code timestamp} to an interval of {@code number} days whose start is a whole
-     * multiple of {@code number} days away from {@code startDate}, evaluated in {@code timeZone}.
-     */
-    @Override
-    public IntervalWindow assignWindow(Instant timestamp) {
-      DateTime datetime = new DateTime(timestamp, timeZone);
-
-      // Truncate the elapsed whole days to a multiple of the period length.
-      // NOTE(review): integer division truncates toward zero, so a timestamp earlier than
-      // startDate appears to be given a window that starts after it -- confirm whether
-      // timestamps before startDate need to be supported.
-      int dayOffset = Days.daysBetween(startDate, datetime).getDays() / number * number;
-
-      DateTime begin = startDate.plusDays(dayOffset);
-      DateTime end = begin.plusDays(number);
-
-      return new IntervalWindow(begin.toInstant(), end.toInstant());
-    }
-
-    @Override
-    public Coder<IntervalWindow> windowCoder() {
-      return IntervalWindow.getCoder();
-    }
-
-    /**
-     * Reports whether {@code other} is a {@code DaysWindows} with the same period length,
-     * start date, and time zone as this one.
-     *
-     * <p>The start date and time zone are compared by value rather than by reference, so two
-     * separately constructed but equal configurations are compatible.
-     */
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      if (!(other instanceof DaysWindows)) {
-        return false;
-      }
-      DaysWindows that = (DaysWindows) other;
-      // Objects.equals is null-safe; it is fully qualified because this file does not import it.
-      return number == that.number
-          && java.util.Objects.equals(startDate, that.startDate)
-          && java.util.Objects.equals(timeZone, that.timeZone);
-    }
-
-    public int getNumber() {
-      return number;
-    }
-
-    public DateTime getStartDate() {
-      return startDate;
-    }
-
-    public DateTimeZone getTimeZone() {
-      return timeZone;
-    }
-
-  }
-
-  /**
-   * A {@link WindowFn} that windows elements into periods measured by months.
-   *
-   * <p>By default, periods of multiple months are measured starting at the
-   * epoch.  This can be overridden with {@link #withStartingMonth}.
-   *
-   * <p>Months start on the first day of each calendar month, unless overridden by
-   * {@link #beginningOnDay}.
-   *
-   * <p>The time zone used to determine calendar boundaries is UTC, unless this
-   * is overridden with the {@link #withTimeZone} method.
-   */
-  public static class MonthsWindows extends PartitioningWindowFn<Object, IntervalWindow> {
-    public MonthsWindows beginningOnDay(int dayOfMonth) {
-      return new MonthsWindows(
-          number, dayOfMonth, startDate, timeZone);
-    }
-
-    public MonthsWindows withStartingMonth(int year, int month) {
-      return new MonthsWindows(
-          number, dayOfMonth, new DateTime(year, month, 1, 0, 0, timeZone), timeZone);
-    }
-
-    public MonthsWindows withTimeZone(DateTimeZone timeZone) {
-      return new MonthsWindows(
-          number, dayOfMonth, startDate.withZoneRetainFields(timeZone), timeZone);
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-
-    private int number;
-    private int dayOfMonth;
-    private DateTime startDate;
-    private DateTimeZone timeZone;
-
-    private MonthsWindows(int number, int dayOfMonth, DateTime startDate, DateTimeZone timeZone) {
-      this.number = number;
-      this.dayOfMonth = dayOfMonth;
-      this.startDate = startDate;
-      this.timeZone = timeZone;
-    }
-
-    /**
-     * Assigns {@code timestamp} to an interval of {@code number} months. Intervals are counted
-     * in whole multiples of {@code number} months from {@code startDate} moved to
-     * {@code dayOfMonth}, evaluated in {@code timeZone}.
-     */
-    @Override
-    public IntervalWindow assignWindow(Instant timestamp) {
-      DateTime datetime = new DateTime(timestamp, timeZone);
-
-      // Truncate the elapsed whole months to a multiple of the period length.
-      // NOTE(review): integer division truncates toward zero, so a timestamp earlier than the
-      // adjusted start date appears to be given a window that starts after it -- confirm
-      // whether such timestamps need to be supported.
-      int monthOffset =
-          Months.monthsBetween(startDate.withDayOfMonth(dayOfMonth), datetime).getMonths()
-          / number * number;
-
-      DateTime begin = startDate.withDayOfMonth(dayOfMonth).plusMonths(monthOffset);
-      DateTime end = begin.plusMonths(number);
-
-      return new IntervalWindow(begin.toInstant(), end.toInstant());
-    }
-
-    @Override
-    public Coder<IntervalWindow> windowCoder() {
-      return IntervalWindow.getCoder();
-    }
-
-    /**
-     * Reports whether {@code other} is a {@code MonthsWindows} with the same period length,
-     * day of month, start date, and time zone as this one.
-     *
-     * <p>The start date and time zone are compared by value rather than by reference, so two
-     * separately constructed but equal configurations are compatible.
-     */
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      if (!(other instanceof MonthsWindows)) {
-        return false;
-      }
-      MonthsWindows that = (MonthsWindows) other;
-      // Objects.equals is null-safe; it is fully qualified because this file does not import it.
-      return number == that.number
-          && dayOfMonth == that.dayOfMonth
-          && java.util.Objects.equals(startDate, that.startDate)
-          && java.util.Objects.equals(timeZone, that.timeZone);
-    }
-
-    public int getNumber() {
-      return number;
-    }
-
-    public int getDayOfMonth() {
-      return dayOfMonth;
-    }
-
-    public DateTime getStartDate() {
-      return startDate;
-    }
-
-    public DateTimeZone getTimeZone() {
-      return timeZone;
-    }
-
-  }
-
-  /**
-   * A {@link WindowFn} that windows elements into periods measured by years.
-   *
-   * <p>By default, periods of multiple years are measured starting at the
-   * epoch.  This can be overridden with {@link #withStartingYear}.
-   *
-   * <p>Years start on the first day of each calendar year, unless overridden by
-   * {@link #beginningOnDay}.
-   *
-   * <p>The time zone used to determine calendar boundaries is UTC, unless this
-   * is overridden with the {@link #withTimeZone} method.
-   */
-  public static class YearsWindows extends PartitioningWindowFn<Object, IntervalWindow> {
-    public YearsWindows beginningOnDay(int monthOfYear, int dayOfMonth) {
-      return new YearsWindows(
-          number, monthOfYear, dayOfMonth, startDate, timeZone);
-    }
-
-    public YearsWindows withStartingYear(int year) {
-      return new YearsWindows(
-          number, monthOfYear, dayOfMonth, new DateTime(year, 1, 1, 0, 0, timeZone), timeZone);
-    }
-
-    public YearsWindows withTimeZone(DateTimeZone timeZone) {
-      return new YearsWindows(
-          number, monthOfYear, dayOfMonth, startDate.withZoneRetainFields(timeZone), timeZone);
-    }
-
-    ////////////////////////////////////////////////////////////////////////////
-
-    private int number;
-    private int monthOfYear;
-    private int dayOfMonth;
-    private DateTime startDate;
-    private DateTimeZone timeZone;
-
-    private YearsWindows(
-        int number, int monthOfYear, int dayOfMonth, DateTime startDate, DateTimeZone timeZone) {
-      this.number = number;
-      this.monthOfYear = monthOfYear;
-      this.dayOfMonth = dayOfMonth;
-      this.startDate = startDate;
-      this.timeZone = timeZone;
-    }
-
-    /**
-     * Assigns {@code timestamp} to an interval of {@code number} years. Intervals are counted
-     * in whole multiples of {@code number} years from {@code startDate} moved to
-     * {@code monthOfYear} and {@code dayOfMonth}, evaluated in {@code timeZone}.
-     */
-    @Override
-    public IntervalWindow assignWindow(Instant timestamp) {
-      DateTime datetime = new DateTime(timestamp, timeZone);
-
-      // Reference point from which whole years are counted.
-      DateTime offsetStart = startDate.withMonthOfYear(monthOfYear).withDayOfMonth(dayOfMonth);
-
-      // Truncate the elapsed whole years to a multiple of the period length.
-      // NOTE(review): integer division truncates toward zero, so a timestamp earlier than
-      // offsetStart appears to be given a window that starts after it -- confirm whether such
-      // timestamps need to be supported.
-      int yearOffset =
-          Years.yearsBetween(offsetStart, datetime).getYears() / number * number;
-
-      DateTime begin = offsetStart.plusYears(yearOffset);
-      DateTime end = begin.plusYears(number);
-
-      return new IntervalWindow(begin.toInstant(), end.toInstant());
-    }
-
-    @Override
-    public Coder<IntervalWindow> windowCoder() {
-      return IntervalWindow.getCoder();
-    }
-
-    /**
-     * Reports whether {@code other} is a {@code YearsWindows} with the same period length,
-     * month of year, day of month, start date, and time zone as this one.
-     *
-     * <p>The start date and time zone are compared by value rather than by reference, so two
-     * separately constructed but equal configurations are compatible.
-     */
-    @Override
-    public boolean isCompatible(WindowFn<?, ?> other) {
-      if (!(other instanceof YearsWindows)) {
-        return false;
-      }
-      YearsWindows that = (YearsWindows) other;
-      // Objects.equals is null-safe; it is fully qualified because this file does not import it.
-      return number == that.number
-          && monthOfYear == that.monthOfYear
-          && dayOfMonth == that.dayOfMonth
-          && java.util.Objects.equals(startDate, that.startDate)
-          && java.util.Objects.equals(timeZone, that.timeZone);
-    }
-
-    public DateTimeZone getTimeZone() {
-      return timeZone;
-    }
-
-    public DateTime getStartDate() {
-      return startDate;
-    }
-
-    public int getDayOfMonth() {
-      return dayOfMonth;
-    }
-
-    public int getMonthOfYear() {
-      return monthOfYear;
-    }
-
-    public int getNumber() {
-      return number;
-    }
-
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java
deleted file mode 100644
index 9ac4abd..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/DefaultTrigger.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.cloud.dataflow.sdk.util.TimeDomain;
-
-import org.joda.time.Instant;
-
-import java.util.List;
-
-/**
- * A trigger that is equivalent to {@code Repeatedly.forever(AfterWatermark.pastEndOfWindow())}.
- * See {@link Repeatedly#forever} and {@link AfterWatermark#pastEndOfWindow} for more details.
- *
- * @param <W> The type of windows being triggered/encoded.
- */
-@Experimental(Experimental.Kind.TRIGGER)
-public class DefaultTrigger<W extends BoundedWindow> extends Trigger<W>{
-
-  private DefaultTrigger() {
-    super(null);
-  }
-
-  /**
-   * Returns the default trigger.
-   */
-  public static <W extends BoundedWindow> DefaultTrigger<W> of() {
-    return new DefaultTrigger<W>();
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    // If the end of the window has already been reached, then we are already ready to fire
-    // and do not need to set a wake-up timer.
-    if (!endOfWindowReached(c)) {
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    // If the end of the window has already been reached, then we are already ready to fire
-    // and do not need to set a wake-up timer.
-    if (!endOfWindowReached(c)) {
-      c.setTimer(c.window().maxTimestamp(), TimeDomain.EVENT_TIME);
-    }
-  }
-
-  @Override
-  public void clear(TriggerContext c) throws Exception { }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    return window.maxTimestamp();
-  }
-
-  @Override
-  public boolean isCompatible(Trigger<?> other) {
-    // Semantically, all default triggers are identical
-    return other instanceof DefaultTrigger;
-  }
-
-  @Override
-  public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    return this;
-  }
-
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    return endOfWindowReached(context);
-  }
-
-  private boolean endOfWindowReached(Trigger<W>.TriggerContext context) {
-    return context.currentEventTime() != null
-        && context.currentEventTime().isAfter(context.window().maxTimestamp());
-  }
-
-  @Override
-  public void onFire(Trigger<W>.TriggerContext context) throws Exception { }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/FixedWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/FixedWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/FixedWindows.java
deleted file mode 100644
index 12a0f1b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/FixedWindows.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.util.Objects;
-
-/**
- * A {@link WindowFn} that windows values into fixed-size timestamp-based windows.
- *
- * <p>For example, in order to partition the data into 10 minute windows:
- * <pre> {@code
- * PCollection<Integer> items = ...;
- * PCollection<Integer> windowedItems = items.apply(
- *   Window.<Integer>into(FixedWindows.of(Duration.standardMinutes(10))));
- * } </pre>
- */
-public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> {
-
-  /**
-   * Size of this window.
-   */
-  private final Duration size;
-
-  /**
-   * Offset of this window.  Windows start at time
-   * N * size + offset, where 0 is the epoch.
-   */
-  private final Duration offset;
-
-  /**
-   * Partitions the timestamp space into half-open intervals of the form
-   * [N * size, (N + 1) * size), where 0 is the epoch.
-   */
-  public static FixedWindows of(Duration size) {
-    return new FixedWindows(size, Duration.ZERO);
-  }
-
-  /**
-   * Partitions the timestamp space into half-open intervals of the form
-   * [N * size + offset, (N + 1) * size + offset),
-   * where 0 is the epoch.
-   *
-   * @throws IllegalArgumentException if offset is not in [0, size)
-   */
-  public FixedWindows withOffset(Duration offset) {
-    return new FixedWindows(size, offset);
-  }
-
-  private FixedWindows(Duration size, Duration offset) {
-    if (offset.isShorterThan(Duration.ZERO) || !offset.isShorterThan(size)) {
-      throw new IllegalArgumentException(
-          "FixedWindows WindowingStrategies must have 0 <= offset < size");
-    }
-    this.size = size;
-    this.offset = offset;
-  }
-
-  @Override
-  public IntervalWindow assignWindow(Instant timestamp) {
-    long start = timestamp.getMillis()
-        - timestamp.plus(size).minus(offset).getMillis() % size.getMillis();
-    return new IntervalWindow(new Instant(start), size);
-  }
-
-  @Override
-  public Coder<IntervalWindow> windowCoder() {
-    return IntervalWindow.getCoder();
-  }
-
-  @Override
-  public boolean isCompatible(WindowFn<?, ?> other) {
-    return this.equals(other);
-  }
-
-  public Duration getSize() {
-    return size;
-  }
-
-  public Duration getOffset() {
-    return offset;
-  }
-
-  @Override
-  public boolean equals(Object object) {
-    if (!(object instanceof FixedWindows)) {
-      return false;
-    }
-    FixedWindows other = (FixedWindows) object;
-    return getOffset().equals(other.getOffset())
-        && getSize().equals(other.getSize());
-  }
-
-  @Override
-  public int hashCode() {
-    return Objects.hash(size, offset);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindow.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindow.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindow.java
deleted file mode 100644
index d7fc396..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindow.java
+++ /dev/null
@@ -1,68 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * The default window into which all data is placed (via {@link GlobalWindows}).
- */
-public class GlobalWindow extends BoundedWindow {
-  /**
-   * Singleton instance of {@link GlobalWindow}.
-   */
-  public static final GlobalWindow INSTANCE = new GlobalWindow();
-
-  // Triggers use maxTimestamp to set timers' timestamp. Timers fires when
-  // the watermark passes their timestamps. So, the maxTimestamp needs to be
-  // smaller than the TIMESTAMP_MAX_VALUE.
-  // One standard day is subtracted from TIMESTAMP_MAX_VALUE to make sure
-  // the maxTimestamp is smaller than TIMESTAMP_MAX_VALUE even after rounding up
-  // to seconds or minutes.
-  private static final Instant END_OF_GLOBAL_WINDOW =
-      TIMESTAMP_MAX_VALUE.minus(Duration.standardDays(1));
-
-  @Override
-  public Instant maxTimestamp() {
-    return END_OF_GLOBAL_WINDOW;
-  }
-
-  private GlobalWindow() {}
-
-  /**
-   * {@link Coder} for encoding and decoding {@code GlobalWindow}s.
-   */
-  public static class Coder extends AtomicCoder<GlobalWindow> {
-    public static final Coder INSTANCE = new Coder();
-
-    @Override
-    public void encode(GlobalWindow window, OutputStream outStream, Context context) {}
-
-    @Override
-    public GlobalWindow decode(InputStream inStream, Context context) {
-      return GlobalWindow.INSTANCE;
-    }
-
-    private Coder() {}
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindows.java
deleted file mode 100644
index d3d949c..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/GlobalWindows.java
+++ /dev/null
@@ -1,63 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-import java.util.Collections;
-
-/**
- * Default {@link WindowFn} that assigns all data to the same window.
- */
-public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> {
-
-  private static final Collection<GlobalWindow> GLOBAL_WINDOWS =
-      Collections.singletonList(GlobalWindow.INSTANCE);
-
-  @Override
-  public Collection<GlobalWindow> assignWindows(AssignContext c) {
-    return GLOBAL_WINDOWS;
-  }
-
-  @Override
-  public boolean isCompatible(WindowFn<?, ?> o) {
-    return o instanceof GlobalWindows;
-  }
-
-  @Override
-  public Coder<GlobalWindow> windowCoder() {
-    return GlobalWindow.Coder.INSTANCE;
-  }
-
-  @Override
-  public GlobalWindow getSideInputWindow(BoundedWindow window) {
-    return GlobalWindow.INSTANCE;
-  }
-
-  @Override
-  public boolean assignsToSingleWindow() {
-    return true;
-  }
-
-  @Override
-  public Instant getOutputTime(Instant inputTimestamp, GlobalWindow window) {
-    return inputTimestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/IntervalWindow.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/IntervalWindow.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/IntervalWindow.java
deleted file mode 100644
index 58287c7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/IntervalWindow.java
+++ /dev/null
@@ -1,201 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.AtomicCoder;
-import com.google.cloud.dataflow.sdk.coders.Coder;
-import com.google.cloud.dataflow.sdk.coders.CoderException;
-import com.google.cloud.dataflow.sdk.coders.DurationCoder;
-import com.google.cloud.dataflow.sdk.coders.InstantCoder;
-
-import com.fasterxml.jackson.annotation.JsonCreator;
-
-import org.joda.time.Duration;
-import org.joda.time.Instant;
-import org.joda.time.ReadableDuration;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-
-/**
- * An implementation of {@link BoundedWindow} that represents an interval from
- * {@link #start} (inclusive) to {@link #end} (exclusive).
- */
-public class IntervalWindow extends BoundedWindow
-    implements Comparable<IntervalWindow> {
-  /**
-   * Start of the interval, inclusive.
-   */
-  private final Instant start;
-
-  /**
-   * End of the interval, exclusive.
-   */
-  private final Instant end;
-
-  /**
-   * Creates a new IntervalWindow that represents the half-open time
-   * interval [start, end).
-   */
-  public IntervalWindow(Instant start, Instant end) {
-    this.start = start;
-    this.end = end;
-  }
-
-  public IntervalWindow(Instant start, ReadableDuration size) {
-    this.start = start;
-    this.end = start.plus(size);
-  }
-
-  /**
-   * Returns the start of this window, inclusive.
-   */
-  public Instant start() {
-    return start;
-  }
-
-  /**
-   * Returns the end of this window, exclusive.
-   */
-  public Instant end() {
-    return end;
-  }
-
-  /**
-   * Returns the largest timestamp that can be included in this window.
-   */
-  @Override
-  public Instant maxTimestamp() {
-    // end not inclusive
-    return end.minus(1);
-  }
-
-  /**
-   * Returns whether this window contains the given window.
-   */
-  public boolean contains(IntervalWindow other) {
-    return !this.start.isAfter(other.start) && !this.end.isBefore(other.end);
-  }
-
-  /**
-   * Returns whether this window is disjoint from the given window.
-   */
-  public boolean isDisjoint(IntervalWindow other) {
-    return !this.end.isAfter(other.start) || !other.end.isAfter(this.start);
-  }
-
-  /**
-   * Returns whether this window intersects the given window.
-   */
-  public boolean intersects(IntervalWindow other) {
-    return !isDisjoint(other);
-  }
-
-  /**
-   * Returns the minimal window that includes both this window and
-   * the given window.
-   */
-  public IntervalWindow span(IntervalWindow other) {
-    return new IntervalWindow(
-        new Instant(Math.min(start.getMillis(), other.start.getMillis())),
-        new Instant(Math.max(end.getMillis(), other.end.getMillis())));
-  }
-
-  @Override
-  public boolean equals(Object o) {
-    return (o instanceof IntervalWindow)
-        && ((IntervalWindow) o).end.isEqual(end)
-        && ((IntervalWindow) o).start.isEqual(start);
-  }
-
-  @Override
-  public int hashCode() {
-    // The end values are themselves likely to be arithmetic sequence, which
-    // is a poor distribution to use for a hashtable, so we
-    // add a highly non-linear transformation.
-    return (int)
-        (start.getMillis() + modInverse((int) (end.getMillis() << 1) + 1));
-  }
-
-  /**
-   * Compute the inverse of (odd) x mod 2^32.
-   */
-  private int modInverse(int x) {
-    // Cube gives inverse mod 2^4, as x^4 == 1 (mod 2^4) for all odd x.
-    int inverse = x * x * x;
-    // Newton iteration doubles correct bits at each step.
-    inverse *= 2 - x * inverse;
-    inverse *= 2 - x * inverse;
-    inverse *= 2 - x * inverse;
-    return inverse;
-  }
-
-  @Override
-  public String toString() {
-    return "[" + start + ".." + end + ")";
-  }
-
-  @Override
-  public int compareTo(IntervalWindow o) {
-    if (start.isEqual(o.start)) {
-      return end.compareTo(o.end);
-    }
-    return start.compareTo(o.start);
-  }
-
-  /**
-   * Returns a {@link Coder} suitable for {@link IntervalWindow}.
-   */
-  public static Coder<IntervalWindow> getCoder() {
-    return IntervalWindowCoder.of();
-  }
-
-  /**
-   * Encodes an {@link IntervalWindow} as a pair of its upper bound and duration.
-   */
-  private static class IntervalWindowCoder extends AtomicCoder<IntervalWindow> {
-
-    private static final IntervalWindowCoder INSTANCE =
-        new IntervalWindowCoder();
-
-    private static final Coder<Instant> instantCoder = InstantCoder.of();
-    private static final Coder<ReadableDuration> durationCoder = DurationCoder.of();
-
-    @JsonCreator
-    public static IntervalWindowCoder of() {
-      return INSTANCE;
-    }
-
-    @Override
-    public void encode(IntervalWindow window,
-                       OutputStream outStream,
-                       Context context)
-        throws IOException, CoderException {
-      instantCoder.encode(window.end, outStream, context.nested());
-      durationCoder.encode(new Duration(window.start, window.end), outStream, context.nested());
-    }
-
-    @Override
-    public IntervalWindow decode(InputStream inStream, Context context)
-        throws IOException, CoderException {
-      Instant end = instantCoder.decode(inStream, context.nested());
-      ReadableDuration duration = durationCoder.decode(inStream, context.nested());
-      return new IntervalWindow(end.minus(duration), end);
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/InvalidWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/InvalidWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/InvalidWindows.java
deleted file mode 100644
index 596f4e7..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/InvalidWindows.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.coders.Coder;
-
-import org.joda.time.Instant;
-
-import java.util.Collection;
-
-/**
- * A {@link WindowFn} that represents an invalid pipeline state.
- *
- * @param <W> window type
- */
-public class InvalidWindows<W extends BoundedWindow> extends WindowFn<Object, W> {
-  private String cause;
-  private WindowFn<?, W> originalWindowFn;
-
-  public InvalidWindows(String cause, WindowFn<?, W> originalWindowFn) {
-    this.originalWindowFn = originalWindowFn;
-    this.cause = cause;
-  }
-
-  /**
-   * Returns the reason that this {@code WindowFn} is invalid.
-   */
-  public String getCause() {
-    return cause;
-  }
-
-  /**
-   * Returns the original windowFn that this InvalidWindows replaced.
-   */
-  public WindowFn<?, W> getOriginalWindowFn() {
-    return originalWindowFn;
-  }
-
-  @Override
-  public Collection<W> assignWindows(AssignContext c) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public void mergeWindows(MergeContext c) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public Coder<W> windowCoder() {
-    return originalWindowFn.windowCoder();
-  }
-
-  /**
-   * {@code InvalidWindows} objects with the same {@code originalWindowFn} are compatible.
-   */
-  @Override
-  public boolean isCompatible(WindowFn<?, ?> other) {
-    return getClass() == other.getClass()
-        && getOriginalWindowFn().isCompatible(
-            ((InvalidWindows<?>) other).getOriginalWindowFn());
-  }
-
-  @Override
-  public W getSideInputWindow(BoundedWindow window) {
-    throw new UnsupportedOperationException("InvalidWindows is not allowed in side inputs");
-  }
-
-  @Override
-  public Instant getOutputTime(Instant inputTimestamp, W window) {
-    return inputTimestamp;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
deleted file mode 100644
index 4e06234..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/MergeOverlappingIntervalWindows.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * A utility function for merging overlapping {@link IntervalWindow}s.
- */
-public class MergeOverlappingIntervalWindows {
-
-  /**
-   * Merge overlapping {@link IntervalWindow}s.
-   */
-  public static void mergeWindows(WindowFn<?, IntervalWindow>.MergeContext c) throws Exception {
-    // Merge any overlapping windows into a single window.
-    // Sort the list of existing windows so we only have to
-    // traverse the list once rather than considering all
-    // O(n^2) window pairs.
-    List<IntervalWindow> sortedWindows = new ArrayList<>();
-    for (IntervalWindow window : c.windows()) {
-      sortedWindows.add(window);
-    }
-    Collections.sort(sortedWindows);
-    List<MergeCandidate> merges = new ArrayList<>();
-    MergeCandidate current = new MergeCandidate();
-    for (IntervalWindow window : sortedWindows) {
-      if (current.intersects(window)) {
-        current.add(window);
-      } else {
-        merges.add(current);
-        current = new MergeCandidate(window);
-      }
-    }
-    merges.add(current);
-    for (MergeCandidate merge : merges) {
-      merge.apply(c);
-    }
-  }
-
-  private static class MergeCandidate {
-    private IntervalWindow union;
-    private final List<IntervalWindow> parts;
-    public MergeCandidate() {
-      parts = new ArrayList<>();
-    }
-    public MergeCandidate(IntervalWindow window) {
-      union = window;
-      parts = new ArrayList<>(Arrays.asList(window));
-    }
-    public boolean intersects(IntervalWindow window) {
-      return union == null || union.intersects(window);
-    }
-    public void add(IntervalWindow window) {
-      union = union == null ? window : union.span(window);
-      parts.add(window);
-    }
-    public void apply(WindowFn<?, IntervalWindow>.MergeContext c) throws Exception {
-      if (parts.size() > 1) {
-        c.merge(parts, union);
-      }
-    }
-
-    @Override
-    public String toString() {
-      return "MergeCandidate[union=" + union + ", parts=" + parts + "]";
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/NonMergingWindowFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/NonMergingWindowFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/NonMergingWindowFn.java
deleted file mode 100644
index 8aa66fc..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/NonMergingWindowFn.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-/**
- * Abstract base class for {@link WindowFn}s that do not merge windows.
- *
- * @param <T> type of elements being windowed
- * @param <W> {@link BoundedWindow} subclass used to represent the windows used by this
- *            {@code WindowFn}
- */
-public abstract class NonMergingWindowFn<T, W extends BoundedWindow>
-    extends WindowFn<T, W> {
-  @Override
-  public final void mergeWindows(MergeContext c) { }
-
-  @Override
-  public final boolean isNonMerging() {
-    return true;
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java
deleted file mode 100644
index 652092a..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OrFinallyTrigger.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.util.ExecutableTrigger;
-import com.google.common.annotations.VisibleForTesting;
-
-import org.joda.time.Instant;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Executes the {@code actual} trigger until it finishes or until the {@code until} trigger fires.
- */
-class OrFinallyTrigger<W extends BoundedWindow> extends Trigger<W> {
-
-  private static final int ACTUAL = 0;
-  private static final int UNTIL = 1;
-
-  @VisibleForTesting OrFinallyTrigger(Trigger<W> actual, Trigger.OnceTrigger<W> until) {
-    super(Arrays.asList(actual, until));
-  }
-
-  @Override
-  public void onElement(OnElementContext c) throws Exception {
-    c.trigger().subTrigger(ACTUAL).invokeOnElement(c);
-    c.trigger().subTrigger(UNTIL).invokeOnElement(c);
-  }
-
-  @Override
-  public void onMerge(OnMergeContext c) throws Exception {
-    for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-      subTrigger.invokeOnMerge(c);
-    }
-    updateFinishedState(c);
-  }
-
-  @Override
-  public Instant getWatermarkThatGuaranteesFiring(W window) {
-    // This trigger fires once either the trigger or the until trigger fires.
-    Instant actualDeadline = subTriggers.get(ACTUAL).getWatermarkThatGuaranteesFiring(window);
-    Instant untilDeadline = subTriggers.get(UNTIL).getWatermarkThatGuaranteesFiring(window);
-    return actualDeadline.isBefore(untilDeadline) ? actualDeadline : untilDeadline;
-  }
-
-  @Override
-  public Trigger<W> getContinuationTrigger(List<Trigger<W>> continuationTriggers) {
-    // Use OrFinallyTrigger instead of AfterFirst because the continuation of ACTUAL
-    // may not be a OnceTrigger.
-    return Repeatedly.forever(
-        new OrFinallyTrigger<W>(
-            continuationTriggers.get(ACTUAL),
-            (Trigger.OnceTrigger<W>) continuationTriggers.get(UNTIL)));
-  }
-
-  @Override
-  public boolean shouldFire(Trigger<W>.TriggerContext context) throws Exception {
-    return context.trigger().subTrigger(ACTUAL).invokeShouldFire(context)
-        || context.trigger().subTrigger(UNTIL).invokeShouldFire(context);
-  }
-
-  @Override
-  public void onFire(Trigger<W>.TriggerContext context) throws Exception {
-    ExecutableTrigger<W> actualSubtrigger = context.trigger().subTrigger(ACTUAL);
-    ExecutableTrigger<W> untilSubtrigger = context.trigger().subTrigger(UNTIL);
-
-    if (untilSubtrigger.invokeShouldFire(context)) {
-      untilSubtrigger.invokeOnFire(context);
-      actualSubtrigger.invokeClear(context);
-    } else {
-      // If until didn't fire, then the actual must have (or it is forbidden to call
-      // onFire) so we are done only if actual is done.
-      actualSubtrigger.invokeOnFire(context);
-      // Do not clear the until trigger, because it tracks data cross firings.
-    }
-    updateFinishedState(context);
-  }
-
-  private void updateFinishedState(TriggerContext c) throws Exception {
-    boolean anyStillFinished = false;
-    for (ExecutableTrigger<W> subTrigger : c.trigger().subTriggers()) {
-      anyStillFinished |= c.forTrigger(subTrigger).trigger().isFinished();
-    }
-    c.trigger().setFinished(anyStillFinished);
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFn.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFn.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFn.java
deleted file mode 100644
index c5d943d..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFn.java
+++ /dev/null
@@ -1,319 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import java.io.Serializable;
-import java.util.Objects;
-
-/**
- * <b><i>(Experimental)</i></b> A function from timestamps of input values to the timestamp for a
- * computed value.
- *
- * <p>The function is represented via three components:
- * <ol>
- *   <li>{@link #assignOutputTime} calculates an output timestamp for any input
- *       value in a particular window.</li>
- *   <li>The output timestamps for all non-late input values within a window are combined
- *       according to {@link #combine combine()}, a commutative and associative operation on
- *       the output timestamps.</li>
- *   <li>The output timestamp when windows merge is provided by {@link #merge merge()}.</li>
- * </ol>
- *
- * <p>This abstract class cannot be subclassed directly, by design: it may grow
- * in consumer-compatible ways that require mutually-exclusive default implementations. To
- * create a concrete subclass, extend {@link OutputTimeFn.Defaults} or
- * {@link OutputTimeFn.DependsOnlyOnWindow}. Note that as long as this class remains
- * experimental, we may also choose to change it in arbitrary backwards-incompatible ways.
- *
- * @param <W> the type of window. Contravariant: methods accepting any subtype of
- * {@code OutputTimeFn<W>} should use the parameter type {@code OutputTimeFn<? super W>}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializable {
-
-  /**
-   * Private constructor to prevent subclassing other than provided base classes.
-   */
-  private OutputTimeFn() { }
-
-  /**
-   * Returns the output timestamp to use for data depending on the given
-   * {@code inputTimestamp} in the specified {@code window}.
-   *
-   *
-   * <p>The result of this method must be between {@code inputTimestamp} and
-   * {@code window.maxTimestamp()} (inclusive on both sides).
-   *
-   * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B},
-   * then {@code assignOutputTime(A, window) <= assignOutputTime(B, window)}.
-   *
-   * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically
-   * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is
-   * suggested that the result in later overlapping windows is past the end of earlier windows
-   * so that the later windows don't prevent the watermark from
-   * progressing past the end of the earlier window.
-   *
-   * <p>See the overview of {@link OutputTimeFn} for the consistency properties required
-   * between {@link #assignOutputTime}, {@link #combine}, and {@link #merge}.
-   */
-  public abstract Instant assignOutputTime(Instant inputTimestamp, W window);
-
-  /**
-   * Combines the given output times, which must be from the same window, into an output time
-   * for a computed value.
-   *
-   * <ul>
-   *   <li>{@code combine} must be commutative: {@code combine(a, b).equals(combine(b, a))}.</li>
-   *   <li>{@code combine} must be associative:
-   *       {@code combine(a, combine(b, c)).equals(combine(combine(a, b), c))}.</li>
-   * </ul>
-   */
-  public abstract Instant combine(Instant outputTime, Instant otherOutputTime);
-
-  /**
-   * Merges the given output times, presumed to be combined output times for windows that
-   * are merging, into an output time for the {@code resultWindow}.
-   *
-   * <p>When windows {@code w1} and {@code w2} merge to become a new window {@code w1plus2},
-   * then {@link #merge} must be implemented such that the output time is the same as
-   * if all timestamps were assigned in {@code w1plus2}. Formally:
-   *
-   * <p>{@code fn.merge(w1plus2, fn.assignOutputTime(t1, w1), fn.assignOutputTime(t2, w2))}
-   *
-   * <p>must be equal to
-   *
-   * <p>{@code fn.combine(fn.assignOutputTime(t1, w1plus2), fn.assignOutputTime(t2, w1plus2))}
-   *
-   * <p>If the assigned time depends only on the window, the correct implementation of
-   * {@link #merge merge()} necessarily returns the result of
-   * {@link #assignOutputTime assignOutputTime(t1, w1plus2)}
-   * (which equals {@link #assignOutputTime assignOutputTime(t2, w1plus2)}).
-   * Defaults for this case are provided by {@link DependsOnlyOnWindow}.
-   *
-   * <p>For many other {@link OutputTimeFn} implementations, such as taking the earliest or latest
-   * timestamp, this will be the same as {@link #combine combine()}. Defaults for this
-   * case are provided by {@link Defaults}.
-   */
-  public abstract Instant merge(W intoWindow, Iterable<? extends Instant> mergingTimestamps);
-
-  /**
-   * Returns {@code true} if the result of combination of many output timestamps actually depends
-   * only on the earliest.
-   *
-   * <p>This may allow optimizations when it is very efficient to retrieve the earliest timestamp
-   * to be combined.
-   */
-  public abstract boolean dependsOnlyOnEarliestInputTimestamp();
-
-  /**
-   * Returns {@code true} if the result does not depend on what outputs were combined but only
-   * the window they are in. The canonical example is if all timestamps are sure to
-   * be the end of the window.
-   *
-   * <p>This may allow optimizations, since it is typically very efficient to retrieve the window
-   * and combining output timestamps is not necessary.
-   *
-   * <p>If the assigned output time for an implementation depends only on the window, consider
-   * extending {@link DependsOnlyOnWindow}, which returns {@code true} here and also provides
-   * a framework for easily implementing a correct {@link #merge}, {@link #combine} and
-   * {@link #assignOutputTime}.
-   */
-  public abstract boolean dependsOnlyOnWindow();
-
-  /**
-   * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} where the
-   * output time depends on the input element timestamps and possibly the window.
-   *
-   * <p>To complete an implementation, override {@link #assignOutputTime}, at a minimum.
-   *
-   * <p>By default, {@link #combine} and {@link #merge} return the earliest timestamp of their
-   * inputs.
-   */
-  public abstract static class Defaults<W extends BoundedWindow> extends OutputTimeFn<W> {
-
-    protected Defaults() {
-      super();
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the earlier of the two timestamps.
-     */
-    @Override
-    public Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
-      return Ordering.natural().min(outputTimestamp, otherOutputTimestamp);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of {@link #combine combine(outputTimestamp, otherOutputTimestamp)},
-     * by default.
-     */
-    @Override
-    public Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return OutputTimeFns.combineOutputTimes(this, mergingTimestamps);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code false}. An {@link OutputTimeFn} that depends only on the window should extend
-     * {@link OutputTimeFn.DependsOnlyOnWindow}.
-     */
-    @Override
-    public final boolean dependsOnlyOnWindow() {
-      return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code false} by default.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return false;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
-     *         default.
-     */
-    @Override
-    public boolean equals(Object other) {
-      if (other == null) {
-        return false;
-      }
-
-      return this.getClass().equals(other.getClass());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-  }
-
-  /**
-   * <b><i>(Experimental)</i></b> Default method implementations for {@link OutputTimeFn} when the
-   * output time depends only on the window.
-   *
-   * <p>To complete an implementation, override {@link #assignOutputTime(BoundedWindow)}.
-   */
-  public abstract static class DependsOnlyOnWindow<W extends BoundedWindow>
-      extends OutputTimeFn<W> {
-
-    protected DependsOnlyOnWindow() {
-      super();
-    }
-
-    /**
-     * Returns the output timestamp to use for data in the specified {@code window}.
-     *
-     * <p>Note that the result of this method must be between the maximum possible input timestamp
-     * in {@code window} and {@code window.maxTimestamp()} (inclusive on both sides).
-     *
-     * <p>For example, using {@code Sessions.withGapDuration(gapDuration)}, we know that all input
-     * timestamps must lie at least {@code gapDuration} from the end of the session, so
-     * {@code window.maxTimestamp() - gapDuration} is an acceptable assigned timestamp.
-     *
-     * @see #assignOutputTime(Instant, BoundedWindow)
-     */
-    protected abstract Instant assignOutputTime(W window);
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of {@link #assignOutputTime(BoundedWindow) assignOutputTime(window)}.
-     */
-    @Override
-    public final Instant assignOutputTime(Instant timestamp, W window) {
-      return assignOutputTime(window);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the same timestamp as both argument timestamps, which are necessarily equal.
-     */
-    @Override
-    public final Instant combine(Instant outputTimestamp, Instant otherOutputTimestamp) {
-      return outputTimestamp;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return the result of
-     * {@link #assignOutputTime(BoundedWindow) assignOutputTime(resultWindow)}.
-     */
-    @Override
-    public final Instant merge(W resultWindow, Iterable<? extends Instant> mergingTimestamps) {
-      return assignOutputTime(resultWindow);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}.
-     */
-    @Override
-    public final boolean dependsOnlyOnWindow() {
-      return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}. Since the output time depends only on the window, it can
-     * certainly be ascertained given a single input timestamp.
-     */
-    @Override
-    public final boolean dependsOnlyOnEarliestInputTimestamp() {
-      return true;
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true} if the two {@link OutputTimeFn} instances have the same class, by
-     *         default.
-     */
-    @Override
-    public boolean equals(Object other) {
-      if (other == null) {
-        return false;
-      }
-
-      return this.getClass().equals(other.getClass());
-    }
-
-    @Override
-    public int hashCode() {
-      return Objects.hash(getClass());
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7bef2b7e/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFns.java
----------------------------------------------------------------------
diff --git a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFns.java b/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFns.java
deleted file mode 100644
index dcc0f5b..0000000
--- a/sdk/src/main/java/com/google/cloud/dataflow/sdk/transforms/windowing/OutputTimeFns.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Copyright (C) 2015 Google Inc.
- *
- * Licensed under the Apache License, Version 2.0 (the "License"); you may not
- * use this file except in compliance with the License. You may obtain a copy of
- * the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
- * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
- * License for the specific language governing permissions and limitations under
- * the License.
- */
-
-package com.google.cloud.dataflow.sdk.transforms.windowing;
-
-import static com.google.common.base.Preconditions.checkArgument;
-
-import com.google.cloud.dataflow.sdk.annotations.Experimental;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Ordering;
-
-import org.joda.time.Instant;
-
-import javax.annotation.Nullable;
-
-/**
- * <b><i>(Experimental)</i></b> Static utility methods and provided implementations for
- * {@link OutputTimeFn}.
- */
-@Experimental(Experimental.Kind.OUTPUT_TIME)
-public class OutputTimeFns {
-  /**
-   * The policy of outputting at the earliest of the input timestamps for non-late input data
-   * that led to a computed value.
-   *
-   * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
-   * elements being aggregated via some function {@code f} into
-   * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>). When emitted, the output
-   * timestamp of the result will be the earliest of the event time timestamps.
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  public static OutputTimeFn<BoundedWindow> outputAtEarliestInputTimestamp() {
-    return new OutputAtEarliestInputTimestamp();
-  }
-
-  /**
-   * The policy of holding the watermark to the latest of the input timestamps
-   * for non-late input data that led to a computed value.
-   *
-   * <p>For example, suppose <i>v</i><sub>1</sub> through <i>v</i><sub>n</sub> are all on-time
-   * elements being aggregated via some function {@code f} into
-   * {@code f}(<i>v</i><sub>1</sub>, ..., <i>v</i><sub>n</sub>). When emitted, the output
-   * timestamp of the result will be the latest of the event time timestamps.
-   *
-   * <p>If data arrives late, it has no effect on the output timestamp.
-   */
-  public static OutputTimeFn<BoundedWindow> outputAtLatestInputTimestamp() {
-    return new OutputAtLatestInputTimestamp();
-  }
-
-  /**
-   * The policy of outputting with timestamps at the end of the window.
-   *
-   * <p>Note that this output timestamp depends only on the window. See
-   * {@link OutputTimeFn#dependsOnlyOnWindow()}.
-   *
-   * <p>When windows merge, instead of using {@link OutputTimeFn#combine} to obtain an output
-   * timestamp for the results in the new window, it is mandatory to obtain a new output
-   * timestamp from {@link OutputTimeFn#assignOutputTime} with the new window and an arbitrary
-   * timestamp (because it is guaranteed that the timestamp is irrelevant).
-   *
-   * <p>For non-merging window functions, this {@link OutputTimeFn} works transparently.
-   */
-  public static OutputTimeFn<BoundedWindow> outputAtEndOfWindow() {
-    return new OutputAtEndOfWindow();
-  }
-
-  /**
-   * Applies the given {@link OutputTimeFn} to the given output times, obtaining
-   * the output time for a value computed. See {@link OutputTimeFn#combine} for
-   * a full specification.
-   *
-   * @throws IllegalArgumentException if {@code outputTimes} is empty.
-   */
-  public static Instant combineOutputTimes(
-      OutputTimeFn<?> outputTimeFn, Iterable<? extends Instant> outputTimes) {
-    checkArgument(
-        !Iterables.isEmpty(outputTimes),
-        "Collection of output times must not be empty in %s.combineOutputTimes",
-        OutputTimeFns.class.getName());
-
-    @Nullable
-    Instant combinedOutputTime = null;
-    for (Instant outputTime : outputTimes) {
-      combinedOutputTime =
-          combinedOutputTime == null
-              ? outputTime : outputTimeFn.combine(combinedOutputTime, outputTime);
-    }
-    return combinedOutputTime;
-  }
-
-  /**
-   * See {@link #outputAtEarliestInputTimestamp}.
-   */
-  private static class OutputAtEarliestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
-    @Override
-    public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
-      return inputTimestamp;
-    }
-
-    @Override
-    public Instant combine(Instant outputTime, Instant otherOutputTime) {
-      return Ordering.natural().min(outputTime, otherOutputTime);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code true}. The result of any combine will be the earliest input timestamp.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return true;
-    }
-  }
-
-  /**
-   * See {@link #outputAtLatestInputTimestamp}.
-   */
-  private static class OutputAtLatestInputTimestamp extends OutputTimeFn.Defaults<BoundedWindow> {
-    @Override
-    public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) {
-      return inputTimestamp;
-    }
-
-    @Override
-    public Instant combine(Instant outputTime, Instant otherOutputTime) {
-      return Ordering.natural().max(outputTime, otherOutputTime);
-    }
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code false}.
-     */
-    @Override
-    public boolean dependsOnlyOnEarliestInputTimestamp() {
-      return false;
-    }
-  }
-
-  private static class OutputAtEndOfWindow extends OutputTimeFn.DependsOnlyOnWindow<BoundedWindow> {
-
-    /**
-     * {@inheritDoc}
-     *
-     * @return {@code window.maxTimestamp()}.
-     */
-    @Override
-    protected Instant assignOutputTime(BoundedWindow window) {
-      return window.maxTimestamp();
-    }
-  }
-}


Mime
View raw message