Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 14CE5200B36 for ; Wed, 22 Jun 2016 04:22:33 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 13597160A4F; Wed, 22 Jun 2016 02:22:33 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E37CA160A60 for ; Wed, 22 Jun 2016 04:22:30 +0200 (CEST) Received: (qmail 70996 invoked by uid 500); 22 Jun 2016 02:22:30 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 70987 invoked by uid 99); 22 Jun 2016 02:22:30 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jun 2016 02:22:30 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 8DCD3180E5F for ; Wed, 22 Jun 2016 02:22:29 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.646 X-Spam-Level: X-Spam-Status: No, score=-4.646 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.426] autolearn=disabled Received: from mx2-lw-us.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id fiD1c1gBfaQB for ; Wed, 22 Jun 2016 02:22:18 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx2-lw-us.apache.org (ASF Mail Server at mx2-lw-us.apache.org) with SMTP id 7EA9D5FE40 for ; Wed, 22 Jun 2016 02:22:16 +0000 (UTC) Received: (qmail 69935 invoked by uid 99); 22 Jun 2016 02:22:15 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 22 Jun 2016 02:22:15 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 517C0E78B2; Wed, 22 Jun 2016 02:22:15 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.incubator.apache.org Date: Wed, 22 Jun 2016 02:22:17 -0000 Message-Id: In-Reply-To: <6fe7585f7ede477fb4712b424ce21b00@git.apache.org> References: <6fe7585f7ede477fb4712b424ce21b00@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [03/12] incubator-beam git commit: Move some easy stuff into runners/core-java archived-at: Wed, 22 Jun 2016 02:22:33 -0000 http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java deleted file mode 100644 index 14ec082..0000000 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WatermarkHold.java +++ /dev/null @@ -1,536 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; -import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; -import org.apache.beam.sdk.util.state.MergingStateAccessor; -import org.apache.beam.sdk.util.state.ReadableState; -import org.apache.beam.sdk.util.state.StateMerging; -import org.apache.beam.sdk.util.state.StateTag; -import org.apache.beam.sdk.util.state.StateTags; -import org.apache.beam.sdk.util.state.WatermarkHoldState; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Preconditions; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.io.Serializable; - -import javax.annotation.Nullable; - -/** - * Implements the logic to hold the output watermark for a computation back - * until it has seen all the elements it needs based on the input watermark for the - * computation. - * - *

The backend ensures the output watermark can never progress beyond the - * input watermark for a computation. GroupAlsoByWindows computations may add a 'hold' - * to the output watermark in order to prevent it progressing beyond a time within a window. - * The hold will be 'cleared' when the associated pane is emitted. - * - *

This class is only intended for use by {@link ReduceFnRunner}. The two evolve together and - * will likely break any other uses. - * - * @param The kind of {@link BoundedWindow} the hold is for. - */ -class WatermarkHold implements Serializable { - /** - * Return tag for state containing the output watermark hold - * used for elements. - */ - public static - StateTag> watermarkHoldTagForOutputTimeFn( - OutputTimeFn outputTimeFn) { - return StateTags.>makeSystemTagInternal( - StateTags.watermarkStateInternal("hold", outputTimeFn)); - } - - /** - * Tag for state containing end-of-window and garbage collection output watermark holds. - * (We can't piggy-back on the data hold state since the outputTimeFn may be - * {@link OutputTimeFns#outputAtLatestInputTimestamp()}, in which case every pane will - * would take the end-of-window time as its element time.) - */ - @VisibleForTesting - public static final StateTag> EXTRA_HOLD_TAG = - StateTags.makeSystemTagInternal(StateTags.watermarkStateInternal( - "extra", OutputTimeFns.outputAtEarliestInputTimestamp())); - - private final TimerInternals timerInternals; - private final WindowingStrategy windowingStrategy; - private final StateTag> elementHoldTag; - - public WatermarkHold(TimerInternals timerInternals, WindowingStrategy windowingStrategy) { - this.timerInternals = timerInternals; - this.windowingStrategy = windowingStrategy; - this.elementHoldTag = watermarkHoldTagForOutputTimeFn(windowingStrategy.getOutputTimeFn()); - } - - /** - * Add a hold to prevent the output watermark progressing beyond the (possibly adjusted) timestamp - * of the element in {@code context}. We allow the actual hold time to be shifted later by - * {@link OutputTimeFn#assignOutputTime}, but no further than the end of the window. The hold will - * remain until cleared by {@link #extractAndRelease}. Return the timestamp at which the hold - * was placed, or {@literal null} if no hold was placed. - * - *

In the following we'll write {@code E} to represent an element's timestamp after passing - * through the window strategy's output time function, {@code IWM} for the local input watermark, - * {@code OWM} for the local output watermark, and {@code GCWM} for the garbage collection - * watermark (which is at {@code IWM - getAllowedLateness}). Time progresses from left to right, - * and we write {@code [ ... ]} to denote a bounded window with implied lower bound. - * - *

Note that the GCWM will be the same as the IWM if {@code getAllowedLateness} - * is {@code ZERO}. - * - *

Here are the cases we need to handle. They are conceptually considered in the - * sequence written since if getAllowedLateness is ZERO the GCWM is the same as the IWM. - *

    - *
  1. (Normal) - *
    -   *          |
    -   *      [   | E        ]
    -   *          |
    -   *         IWM
    -   * 
    - * This is, hopefully, the common and happy case. The element is locally on-time and can - * definitely make it to an {@code ON_TIME} pane which we can still set an end-of-window timer - * for. We place an element hold at E, which may contribute to the {@code ON_TIME} pane's - * timestamp (depending on the output time function). Thus the OWM will not proceed past E - * until the next pane fires. - * - *
  2. (Discard - no target window) - *
    -   *                       |                            |
    -   *      [     E        ] |                            |
    -   *                       |                            |
    -   *                     GCWM  <-getAllowedLateness->  IWM
    -   * 
    - * The element is very locally late. The window has been garbage collected, thus there - * is no target pane E could be assigned to. We discard E. - * - *
  3. (Unobservably late) - *
    -   *          |    |
    -   *      [   | E  |     ]
    -   *          |    |
    -   *         OWM  IWM
    -   * 
    - * The element is locally late, however we can still treat this case as for 'Normal' above - * since the IWM has not yet passed the end of the window and the element is ahead of the - * OWM. In effect, we get to 'launder' the locally late element and consider it as locally - * on-time because no downstream computation can observe the difference. - * - *
  4. (Maybe late 1) - *
    -   *          |            |
    -   *      [   | E        ] |
    -   *          |            |
    -   *         OWM          IWM
    -   * 
    - * The end-of-window timer may have already fired for this window, and thus an {@code ON_TIME} - * pane may have already been emitted. However, if timer firings have been delayed then it - * is possible the {@code ON_TIME} pane has not yet been emitted. We can't place an element - * hold since we can't be sure if it will be cleared promptly. Thus this element *may* find - * its way into an {@code ON_TIME} pane, but if so it will *not* contribute to that pane's - * timestamp. We may however set a garbage collection hold if required. - * - *
  5. (Maybe late 2) - *
    -   *               |   |
    -   *      [     E  |   | ]
    -   *               |   |
    -   *              OWM IWM
    -   * 
    - * The end-of-window timer has not yet fired, so this element may still appear in an - * {@code ON_TIME} pane. However the element is too late to contribute to the output - * watermark hold, and thus won't contribute to the pane's timestamp. We can still place an - * end-of-window hold. - * - *
  6. (Maybe late 3) - *
    -   *               |       |
    -   *      [     E  |     ] |
    -   *               |       |
    -   *              OWM     IWM
    -   * 
    - * As for the (Maybe late 2) case, however we don't even know if the end-of-window timer - * has already fired, or it is about to fire. We can place only the garbage collection hold, - * if required. - * - *
  7. (Definitely late) - *
    -   *                       |   |
    -   *      [     E        ] |   |
    -   *                       |   |
    -   *                      OWM IWM
    -   * 
    - * The element is definitely too late to make an {@code ON_TIME} pane. We are too late to - * place an end-of-window hold. We can still place a garbage collection hold if required. - * - *
- */ - @Nullable - public Instant addHolds(ReduceFn.ProcessValueContext context) { - Instant hold = addElementHold(context); - if (hold == null) { - hold = addEndOfWindowOrGarbageCollectionHolds(context, false/*paneIsEmpty*/); - } - return hold; - } - - /** - * Return {@code timestamp}, possibly shifted forward in time according to the window - * strategy's output time function. - */ - private Instant shift(Instant timestamp, W window) { - Instant shifted = windowingStrategy.getOutputTimeFn().assignOutputTime(timestamp, window); - Preconditions.checkState(!shifted.isBefore(timestamp), - "OutputTimeFn moved element from %s to earlier time %s for window %s", - timestamp, shifted, window); - Preconditions.checkState(timestamp.isAfter(window.maxTimestamp()) - || !shifted.isAfter(window.maxTimestamp()), - "OutputTimeFn moved element from %s to %s which is beyond end of " - + "window %s", - timestamp, shifted, window); - - return shifted; - } - - /** - * Attempt to add an 'element hold'. Return the {@link Instant} at which the hold was - * added (ie the element timestamp plus any forward shift requested by the - * {@link WindowingStrategy#getOutputTimeFn}), or {@literal null} if no hold was added. - * The hold is only added if both: - *
    - *
  1. The backend will be able to respect it. In other words the output watermark cannot - * be ahead of the proposed hold time. - *
  2. A timer will be set (by {@link ReduceFnRunner}) to clear the hold by the end of the - * window. In other words the input watermark cannot be ahead of the end of the window. - *
- * The hold ensures the pane which incorporates the element is will not be considered late by - * any downstream computation when it is eventually emitted. - */ - @Nullable - private Instant addElementHold(ReduceFn.ProcessValueContext context) { - // Give the window function a chance to move the hold timestamp forward to encourage progress. - // (A later hold implies less impediment to the output watermark making progress, which in - // turn encourages end-of-window triggers to fire earlier in following computations.) - Instant elementHold = shift(context.timestamp(), context.window()); - - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - - String which; - boolean tooLate; - // TODO: These case labels could be tightened. - // See the case analysis in addHolds above for the motivation. - if (outputWM != null && elementHold.isBefore(outputWM)) { - which = "too late to effect output watermark"; - tooLate = true; - } else if (context.window().maxTimestamp().isBefore(inputWM)) { - which = "too late for end-of-window timer"; - tooLate = true; - } else { - which = "on time"; - tooLate = false; - Preconditions.checkState(!elementHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Element hold %s is beyond end-of-time", elementHold); - context.state().access(elementHoldTag).add(elementHold); - } - WindowTracing.trace( - "WatermarkHold.addHolds: element hold at {} is {} for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - elementHold, which, context.key(), context.window(), inputWM, - outputWM); - - return tooLate ? null : elementHold; - } - - /** - * Add an end-of-window hold or, if too late for that, a garbage collection hold (if required). - * Return the {@link Instant} at which hold was added, or {@literal null} if no hold was added. - */ - @Nullable - private Instant addEndOfWindowOrGarbageCollectionHolds( - ReduceFn.Context context, boolean paneIsEmpty) { - Instant hold = addEndOfWindowHold(context, paneIsEmpty); - if (hold == null) { - hold = addGarbageCollectionHold(context, paneIsEmpty); - } - return hold; - } - - /** - * Attempt to add an 'end-of-window hold'. Return the {@link Instant} at which the hold was added - * (ie the end of window time), or {@literal null} if no end of window hold is possible and we - * should fallback to a garbage collection hold. - * - *

We only add the hold if we can be sure a timer will be set (by {@link ReduceFnRunner}) - * to clear it. In other words, the input watermark cannot be ahead of the end of window time. - * - *

An end-of-window hold is added in two situations: - *

    - *
  1. An incoming element came in behind the output watermark (so we are too late for placing - * the usual element hold), but it may still be possible to include the element in an - * {@link Timing#ON_TIME} pane. We place the end of window hold to ensure that pane will - * not be considered late by any downstream computation. - *
  2. We guarantee an {@link Timing#ON_TIME} pane will be emitted for all windows which saw at - * least one element, even if that {@link Timing#ON_TIME} pane is empty. Thus when elements in - * a pane are processed due to a fired trigger we must set both an end of window timer and an end - * of window hold. Again, the hold ensures the {@link Timing#ON_TIME} pane will not be considered - * late by any downstream computation. - *
- */ - @Nullable - private Instant addEndOfWindowHold(ReduceFn.Context context, boolean paneIsEmpty) { - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eowHold = context.window().maxTimestamp(); - - if (eowHold.isBefore(inputWM)) { - WindowTracing.trace( - "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is too late for " - + "end-of-window timer for key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - eowHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - Preconditions.checkState(outputWM == null || !eowHold.isBefore(outputWM), - "End-of-window hold %s cannot be before output watermark %s", - eowHold, outputWM); - Preconditions.checkState(!eowHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "End-of-window hold %s is beyond end-of-time", eowHold); - // If paneIsEmpty then this hold is just for empty ON_TIME panes, so we want to keep - // the hold away from the combining function in elementHoldTag. - // However if !paneIsEmpty then it could make sense to use the elementHoldTag here. - // Alas, onMerge is forced to add an end of window or garbage collection hold without - // knowing whether an element hold is already in place (stopping to check is too expensive). - // This it would end up adding an element hold at the end of the window which could - // upset the elementHoldTag combining function. - context.state().access(EXTRA_HOLD_TAG).add(eowHold); - WindowTracing.trace( - "WatermarkHold.addEndOfWindowHold: end-of-window hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - eowHold, context.key(), context.window(), inputWM, outputWM); - return eowHold; - } - - /** - * Attempt to add a 'garbage collection hold' if it is required. Return the {@link Instant} at - * which the hold was added (ie the end of window time plus allowed lateness), - * or {@literal null} if no hold was added. - * - *

We only add the hold if it is distinct from what would be added by - * {@link #addEndOfWindowHold}. In other words, {@link WindowingStrategy#getAllowedLateness} - * must be non-zero. - * - *

A garbage collection hold is added in two situations: - *

    - *
  1. An incoming element came in behind the output watermark, and was too late for placing - * the usual element hold or an end of window hold. Place the garbage collection hold so that - * we can guarantee when the pane is finally triggered its output will not be dropped due to - * excessive lateness by any downstream computation. - *
  2. The {@link WindowingStrategy#getClosingBehavior()} is - * {@link ClosingBehavior#FIRE_ALWAYS}, and thus we guarantee a final pane will be emitted - * for all windows which saw at least one element. Again, the garbage collection hold guarantees - * that any empty final pane can be given a timestamp which will not be considered beyond - * allowed lateness by any downstream computation. - *
- * - *

We use {@code paneIsEmpty} to distinguish cases 1 and 2. - */ - @Nullable - private Instant addGarbageCollectionHold( - ReduceFn.Context context, boolean paneIsEmpty) { - Instant outputWM = timerInternals.currentOutputWatermarkTime(); - Instant inputWM = timerInternals.currentInputWatermarkTime(); - Instant eow = context.window().maxTimestamp(); - Instant gcHold = eow.plus(windowingStrategy.getAllowedLateness()); - - if (!windowingStrategy.getAllowedLateness().isLongerThan(Duration.ZERO)) { - WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since no allowed lateness for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - if (paneIsEmpty && context.windowingStrategy().getClosingBehavior() - == ClosingBehavior.FIRE_IF_NON_EMPTY) { - WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is unnecessary " - + "since empty pane and FIRE_IF_NON_EMPTY for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); - return null; - } - - Preconditions.checkState(!gcHold.isBefore(inputWM), - "Garbage collection hold %s cannot be before input watermark %s", - gcHold, inputWM); - Preconditions.checkState(!gcHold.isAfter(BoundedWindow.TIMESTAMP_MAX_VALUE), - "Garbage collection hold %s is beyond end-of-time", gcHold); - // Same EXTRA_HOLD_TAG vs elementHoldTag discussion as in addEndOfWindowHold above. - context.state().access(EXTRA_HOLD_TAG).add(gcHold); - - WindowTracing.trace( - "WatermarkHold.addGarbageCollectionHold: garbage collection hold at {} is on time for " - + "key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - gcHold, context.key(), context.window(), inputWM, outputWM); - return gcHold; - } - - /** - * Prefetch watermark holds in preparation for merging. - */ - public void prefetchOnMerge(MergingStateAccessor state) { - StateMerging.prefetchWatermarks(state, elementHoldTag); - } - - /** - * Updates the watermark hold when windows merge if it is possible the merged value does - * not equal all of the existing holds. For example, if the new window implies a later - * watermark hold, then earlier holds may be released. - */ - public void onMerge(ReduceFn.OnMergeContext context) { - WindowTracing.debug("WatermarkHold.onMerge: for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - StateMerging.mergeWatermarks(context.state(), elementHoldTag, context.window()); - // If we had a cheap way to determine if we have an element hold then we could - // avoid adding an unnecessary end-of-window or garbage collection hold. - // Simply reading the above merged watermark would impose an additional read for the - // common case that the active window has just one underlying state address window and - // the hold depends on the min of the element timestamps. - // At least one merged window must be non-empty for the merge to have been triggered. - StateMerging.clear(context.state(), EXTRA_HOLD_TAG); - addEndOfWindowOrGarbageCollectionHolds(context, false /*paneIsEmpty*/); - } - - /** - * Result of {@link #extractAndRelease}. - */ - public static class OldAndNewHolds { - public final Instant oldHold; - @Nullable - public final Instant newHold; - - public OldAndNewHolds(Instant oldHold, @Nullable Instant newHold) { - this.oldHold = oldHold; - this.newHold = newHold; - } - } - - /** - * Return (a future for) the earliest hold for {@code context}. Clear all the holds after - * reading, but add/restore an end-of-window or garbage collection hold if required. - * - *

The returned timestamp is the output timestamp according to the {@link OutputTimeFn} - * from the windowing strategy of this {@link WatermarkHold}, combined across all the non-late - * elements in the current pane. If there is no such value the timestamp is the end - * of the window. - */ - public ReadableState extractAndRelease( - final ReduceFn.Context context, final boolean isFinished) { - WindowTracing.debug( - "WatermarkHold.extractAndRelease: for key:{}; window:{}; inputWatermark:{}; " - + "outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - final WatermarkHoldState elementHoldState = context.state().access(elementHoldTag); - final WatermarkHoldState extraHoldState = context.state().access(EXTRA_HOLD_TAG); - return new ReadableState() { - @Override - public ReadableState readLater() { - elementHoldState.readLater(); - extraHoldState.readLater(); - return this; - } - - @Override - public OldAndNewHolds read() { - // Read both the element and extra holds. - Instant elementHold = elementHoldState.read(); - Instant extraHold = extraHoldState.read(); - Instant oldHold; - // Find the minimum, accounting for null. - if (elementHold == null) { - oldHold = extraHold; - } else if (extraHold == null) { - oldHold = elementHold; - } else if (elementHold.isBefore(extraHold)) { - oldHold = elementHold; - } else { - oldHold = extraHold; - } - if (oldHold == null || oldHold.isAfter(context.window().maxTimestamp())) { - // If no hold (eg because all elements came in behind the output watermark), or - // the hold was for garbage collection, take the end of window as the result. - WindowTracing.debug( - "WatermarkHold.extractAndRelease.read: clipping from {} to end of window " - + "for key:{}; window:{}", - oldHold, context.key(), context.window()); - oldHold = context.window().maxTimestamp(); - } - WindowTracing.debug("WatermarkHold.extractAndRelease.read: clearing for key:{}; window:{}", - context.key(), context.window()); - - // Clear the underlying state to allow the output watermark to progress. - elementHoldState.clear(); - extraHoldState.clear(); - - @Nullable Instant newHold = null; - if (!isFinished) { - // Only need to leave behind an end-of-window or garbage collection hold - // if future elements will be processed. - newHold = addEndOfWindowOrGarbageCollectionHolds(context, true /*paneIsEmpty*/); - } - - return new OldAndNewHolds(oldHold, newHold); - } - }; - } - - /** - * Clear any remaining holds. - */ - public void clearHolds(ReduceFn.Context context) { - WindowTracing.debug( - "WatermarkHold.clearHolds: For key:{}; window:{}; inputWatermark:{}; outputWatermark:{}", - context.key(), context.window(), timerInternals.currentInputWatermarkTime(), - timerInternals.currentOutputWatermarkTime()); - context.state().access(elementHoldTag).clear(); - context.state().access(EXTRA_HOLD_TAG).clear(); - } - - /** - * Return the current data hold, or null if none. Does not clear. For debugging only. - */ - @Nullable - public Instant getDataCurrent(ReduceFn.Context context) { - return context.state().access(elementHoldTag).read(); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java deleted file mode 100644 index 3e1528f..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/BatchTimerInternalsTest.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.util.TimerInternals.TimerData; -import org.apache.beam.sdk.util.state.StateNamespace; -import org.apache.beam.sdk.util.state.StateNamespaceForTest; - -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -/** - * Tests for {@link BatchTimerInternals}. - */ -@RunWith(JUnit4.class) -public class BatchTimerInternalsTest { - - private static final StateNamespace NS1 = new StateNamespaceForTest("NS1"); - - @Mock - private ReduceFnRunner mockRunner; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testFiringTimers() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(processingTime2); - - underTest.advanceProcessingTime(mockRunner, new Instant(20)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verifyNoMoreInteractions(mockRunner); - - // Advancing just a little shouldn't refire - underTest.advanceProcessingTime(mockRunner, new Instant(21)); - Mockito.verifyNoMoreInteractions(mockRunner); - - // Adding the timer and advancing a little should refire - underTest.setTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime1); - underTest.advanceProcessingTime(mockRunner, new Instant(21)); - Mockito.verifyNoMoreInteractions(mockRunner); - - // And advancing the rest of the way should still have the other timer - underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(mockRunner); - } - - @Test - public void testTimerOrdering() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); - TimerData watermarkTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime1 = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - TimerData watermarkTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.EVENT_TIME); - TimerData processingTime2 = TimerData.of(NS1, new Instant(29), TimeDomain.PROCESSING_TIME); - - underTest.setTimer(processingTime1); - underTest.setTimer(watermarkTime1); - underTest.setTimer(processingTime2); - underTest.setTimer(watermarkTime2); - - underTest.advanceInputWatermark(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(watermarkTime1); - Mockito.verify(mockRunner).onTimer(watermarkTime2); - Mockito.verifyNoMoreInteractions(mockRunner); - - underTest.advanceProcessingTime(mockRunner, new Instant(30)); - Mockito.verify(mockRunner).onTimer(processingTime1); - Mockito.verify(mockRunner).onTimer(processingTime2); - Mockito.verifyNoMoreInteractions(mockRunner); - } - - @Test - public void testDeduplicate() throws Exception { - BatchTimerInternals underTest = new BatchTimerInternals(new Instant(0)); - TimerData watermarkTime = TimerData.of(NS1, new Instant(19), TimeDomain.EVENT_TIME); - TimerData processingTime = TimerData.of(NS1, new Instant(19), TimeDomain.PROCESSING_TIME); - underTest.setTimer(watermarkTime); - underTest.setTimer(watermarkTime); - underTest.setTimer(processingTime); - underTest.setTimer(processingTime); - underTest.advanceProcessingTime(mockRunner, new Instant(20)); - underTest.advanceInputWatermark(mockRunner, new Instant(20)); - - Mockito.verify(mockRunner).onTimer(processingTime); - Mockito.verify(mockRunner).onTimer(watermarkTime); - Mockito.verifyNoMoreInteractions(mockRunner); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java deleted file mode 100644 index f653f49..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ /dev/null @@ -1,619 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertThat; - -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.DoFnTester; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import org.joda.time.Duration; -import org.joda.time.Instant; - -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; - -/** - * Properties of {@link GroupAlsoByWindowsDoFn}. - * - *

Some properties may not hold of some implementations, due to restrictions on the context - * in which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not - * support merging windows. - */ -public class GroupAlsoByWindowsProperties { - - /** - * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide - * the appropriate windowing strategy under test. - */ - public interface GroupAlsoByWindowsDoFnFactory { - GroupAlsoByWindowsDoFn - forStrategy(WindowingStrategy strategy); - } - - /** - * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW - * implementation produces no output. - * - *

The input type is deliberately left as a wildcard, since it is not relevant. - */ - public static void emptyInputEmptyOutput( - GroupAlsoByWindowsDoFnFactory gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - - DoFnTester>>, KV> result = runGABW( - gabwFactory, - windowingStrategy, - (K) null, // key should never be used - Collections.>emptyList()); - - assertThat(result.peekOutputElements(), hasSize(0)); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows. - */ - public static void groupsElementsIntoFixedWindows( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "key", - WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(2), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(13), - Arrays.asList(window(10, 20)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them into sliding windows. - * - *

In the input here, each element occurs in multiple windows. - */ - public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = WindowingStrategy.of( - SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "key", - WindowedValue.of( - "v1", - new Instant(5), - Arrays.asList(window(-10, 10), window(0, 20)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(15), - Arrays.asList(window(0, 20), window(10, 30)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(3)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); - assertThat(item0.getValue().getValue(), contains("v1")); - assertThat(item0.getTimestamp(), equalTo(new Instant(5))); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); - assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item1.getTimestamp(), equalTo(new Instant(10))); - - TimestampedValue>> item2 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); - assertThat(item2.getValue().getValue(), contains("v2")); - assertThat(item2.getTimestamp(), equalTo(new Instant(20))); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups and combines them according to sliding windows. - * - *

In the input here, each element occurs in multiple windows. - */ - public static void combinesElementsInSlidingWindows( - GroupAlsoByWindowsDoFnFactory gabwFactory, - CombineFn combineFn) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); - - DoFnTester>>, KV> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - 1L, - new Instant(5), - Arrays.asList(window(-10, 10), window(0, 20)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 2L, - new Instant(15), - Arrays.asList(window(0, 20), window(10, 30)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, - new Instant(18), - Arrays.asList(window(0, 20), window(10, 30)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(3)); - - TimestampedValue> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(-10, 10))); - assertThat(item0.getValue().getKey(), equalTo("k")); - assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L)))); - assertThat(item0.getTimestamp(), equalTo(new Instant(5L))); - - TimestampedValue> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 20))); - assertThat(item1.getValue().getKey(), equalTo("k")); - assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L)))); - assertThat(item1.getTimestamp(), equalTo(new Instant(5L))); - - TimestampedValue> item2 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 30))); - assertThat(item2.getValue().getKey(), equalTo("k")); - assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L)))); - assertThat(item2.getTimestamp(), equalTo(new Instant(15L))); - } - - /** - * Tests that the given GABW implementation correctly groups elements that fall into overlapping - * windows that are not merged. - */ - public static void groupsIntoOverlappingNonmergingWindows( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "key", - WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 5)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(4), - Arrays.asList(window(1, 5)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(4), - Arrays.asList(window(0, 5)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 5))); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3")); - assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp())); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(1, 5))); - assertThat(item1.getValue().getValue(), contains("v2")); - assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp())); - } - - /** - * Tests that the given GABW implementation correctly groups elements into merged sessions. - */ - public static void groupsElementsInMergedSessions( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "key", - WindowedValue.of( - "v1", - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(15), - Arrays.asList(window(15, 25)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - } - - /** - * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per - * session window correctly according to the provided {@link CombineFn}. - */ - public static void combinesElementsPerSession( - GroupAlsoByWindowsDoFnFactory gabwFactory, - CombineFn combineFn) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - - DoFnTester>>, KV> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - 1L, - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 2L, - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, - new Instant(15), - Arrays.asList(window(15, 25)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); - assertThat(item0.getValue().getKey(), equalTo("k")); - assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); - assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - - TimestampedValue> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); - assertThat(item1.getValue().getKey(), equalTo("k")); - assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); - assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp - * according to the policy {@link OutputTimeFns#outputAtEndOfWindow()}. - */ - public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "key", - WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(2), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(13), - Arrays.asList(window(10, 20)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp - * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. - */ - public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - "v1", - new Instant(1), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(2), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(13), - Arrays.asList(window(10, 20)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 10))); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(new Instant(2))); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(10, 20))); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(new Instant(13))); - } - - /** - * Tests that the given GABW implementation correctly groups elements into merged sessions - * with output timestamps at the end of the merged window. - */ - public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - "v1", - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(15), - Arrays.asList(window(15, 25)), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(0, 15))); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(window(15, 25))); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - } - - /** - * Tests that the given GABW implementation correctly groups elements into merged sessions - * with output timestamps at the end of the merged window. - */ - public static void groupsElementsInMergedSessionsWithLatestTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtLatestInputTimestamp()); - - BoundedWindow unmergedWindow = window(15, 25); - DoFnTester>>, KV>> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - "v1", - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", - new Instant(15), - Arrays.asList(unmergedWindow), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - BoundedWindow mergedWindow = window(0, 15); - TimestampedValue>> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(mergedWindow)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(new Instant(5))); - - TimestampedValue>> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(unmergedWindow)); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(new Instant(15))); - } - - /** - * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per - * session window correctly according to the provided {@link CombineFn}. - */ - public static void combinesElementsPerSessionWithEndOfWindowTimestamp( - GroupAlsoByWindowsDoFnFactory gabwFactory, - CombineFn combineFn) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()); - - BoundedWindow secondWindow = window(15, 25); - DoFnTester> result = - runGABW(gabwFactory, windowingStrategy, "k", - WindowedValue.of( - 1L, - new Instant(0), - Arrays.asList(window(0, 10)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 2L, - new Instant(5), - Arrays.asList(window(5, 15)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, - new Instant(15), - Arrays.asList(secondWindow), - PaneInfo.NO_FIRING)); - - assertThat(result.peekOutputElements(), hasSize(2)); - - BoundedWindow firstResultWindow = window(0, 15); - TimestampedValue> item0 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(firstResultWindow)); - assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); - assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp())); - - TimestampedValue> item1 = - Iterables.getOnlyElement(result.peekOutputElementsInWindow(secondWindow)); - assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); - assertThat(item1.getTimestamp(), - equalTo(secondWindow.maxTimestamp())); - } - - @SafeVarargs - private static - DoFnTester>>, KV> runGABW( - GroupAlsoByWindowsDoFnFactory gabwFactory, - WindowingStrategy windowingStrategy, - K key, - WindowedValue... values) throws Exception { - return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values)); - } - - private static - DoFnTester>>, KV> runGABW( - GroupAlsoByWindowsDoFnFactory gabwFactory, - WindowingStrategy windowingStrategy, - K key, - Collection> values) throws Exception { - - TupleTag> outputTag = new TupleTag<>(); - DoFnRunnerBase.ListOutputManager outputManager = new DoFnRunnerBase.ListOutputManager(); - - DoFnTester>>, KV> tester = - DoFnTester.of(gabwFactory.forStrategy(windowingStrategy)); - tester.startBundle(); - tester.processElement(KV.>>of(key, values)); - tester.finishBundle(); - - // Sanity check for corruption - for (KV elem : tester.peekOutputElements()) { - assertThat(elem.getKey(), equalTo(key)); - } - - return tester; - } - - private static BoundedWindow window(long start, long end) { - return new IntervalWindow(new Instant(start), new Instant(end)); - } - -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java deleted file mode 100644 index 4ac6164..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java +++ /dev/null @@ -1,106 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import org.apache.beam.sdk.coders.Coder; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.util.GroupAlsoByWindowsProperties.GroupAlsoByWindowsDoFnFactory; - -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Unit tests for {@link GroupAlsoByWindowsViaOutputBufferDoFn}. - */ -@RunWith(JUnit4.class) -public class GroupAlsoByWindowsViaOutputBufferDoFnTest { - - private class BufferingGABWViaOutputBufferDoFnFactory - implements GroupAlsoByWindowsDoFnFactory> { - - private final Coder inputCoder; - - public BufferingGABWViaOutputBufferDoFnFactory(Coder inputCoder) { - this.inputCoder = inputCoder; - } - - @Override - public GroupAlsoByWindowsDoFn, W> - forStrategy(WindowingStrategy windowingStrategy) { - return new GroupAlsoByWindowsViaOutputBufferDoFn, W>( - windowingStrategy, - SystemReduceFn.buffering(inputCoder)); - } - } - - @Test - public void testEmptyInputEmptyOutput() throws Exception { - GroupAlsoByWindowsProperties.emptyInputEmptyOutput( - new BufferingGABWViaOutputBufferDoFnFactory<>(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoFixedWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindows( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoSlidingWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsIntoOverlappingNonmergingWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsIntoOverlappingNonmergingWindows( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsIntoSessions() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsInMergedSessions( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoFixedWindowsWithEndOfWindowTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoFixedWindowsWithLatestTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithLatestTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } - - @Test - public void testGroupsElementsIntoSessionsWithLatestTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithLatestTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory(StringUtf8Coder.of())); - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java deleted file mode 100644 index d929d39..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/LateDataDroppingDoFnRunnerTest.java +++ /dev/null @@ -1,117 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.transforms.Aggregator; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.LateDataDroppingDoFnRunner.LateDataFilter; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; - -import org.joda.time.Duration; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.MockitoAnnotations; - -import java.util.Arrays; - -/** - * Unit tests for {@link LateDataDroppingDoFnRunner}. - */ -@RunWith(JUnit4.class) -public class LateDataDroppingDoFnRunnerTest { - private static final FixedWindows WINDOW_FN = FixedWindows.of(Duration.millis(10)); - - @Mock private TimerInternals mockTimerInternals; - - @Before - public void setUp() { - MockitoAnnotations.initMocks(this); - } - - @Test - public void testLateDataFilter() throws Exception { - when(mockTimerInternals.currentInputWatermarkTime()).thenReturn(new Instant(15L)); - - InMemoryLongSumAggregator droppedDueToLateness = - new InMemoryLongSumAggregator("droppedDueToLateness"); - LateDataFilter lateDataFilter = new LateDataFilter( - WindowingStrategy.of(WINDOW_FN), mockTimerInternals, droppedDueToLateness); - - Iterable> actual = lateDataFilter.filter( - "a", - ImmutableList.of( - createDatum(13, 13L), - createDatum(5, 5L), // late element, earlier than 4L. - createDatum(16, 16L), - createDatum(18, 18L))); - - Iterable> expected = ImmutableList.of( - createDatum(13, 13L), - createDatum(16, 16L), - createDatum(18, 18L)); - assertThat(expected, containsInAnyOrder(Iterables.toArray(actual, WindowedValue.class))); - assertEquals(1, droppedDueToLateness.sum); - } - - private WindowedValue createDatum(T element, long timestampMillis) { - Instant timestamp = new Instant(timestampMillis); - return WindowedValue.of( - element, - timestamp, - Arrays.asList(WINDOW_FN.assignWindow(timestamp)), - PaneInfo.NO_FIRING); - } - - private static class InMemoryLongSumAggregator implements Aggregator { - private final String name; - private long sum = 0; - - public InMemoryLongSumAggregator(String name) { - this.name = name; - } - - @Override - public void addValue(Long value) { - sum += value; - } - - @Override - public String getName() { - return name; - } - - @Override - public CombineFn getCombineFn() { - return new Sum.SumLongFn(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/0fef8e63/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java deleted file mode 100644 index 8885118..0000000 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/PushbackSideInputDoFnRunnerTest.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.util; - -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.is; -import static org.junit.Assert.assertThat; -import static org.mockito.Mockito.when; - -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.Sum; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Window; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -import com.google.common.collect.ImmutableList; - -import org.hamcrest.Matchers; -import org.joda.time.Instant; -import org.junit.Before; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; -import org.mockito.Mock; -import org.mockito.Mockito; -import org.mockito.MockitoAnnotations; - -import java.util.ArrayList; -import java.util.List; - -/** - * Tests for {@link PushbackSideInputDoFnRunner}. - */ -@RunWith(JUnit4.class) -public class PushbackSideInputDoFnRunnerTest { - @Mock private ReadyCheckingSideInputReader reader; - private TestDoFnRunner underlying; - private PCollectionView singletonView; - - @Before - public void setup() { - MockitoAnnotations.initMocks(this); - TestPipeline p = TestPipeline.create(); - PCollection created = p.apply(Create.of(1, 2, 3)); - singletonView = - created - .apply(Window.into(new IdentitySideInputWindowFn())) - .apply(Sum.integersGlobally().asSingletonView()); - - underlying = new TestDoFnRunner<>(); - } - - private PushbackSideInputDoFnRunner createRunner( - ImmutableList> views) { - PushbackSideInputDoFnRunner runner = - PushbackSideInputDoFnRunner.create(underlying, views, reader); - runner.startBundle(); - return runner; - } - - @Test - public void startFinishBundleDelegates() { - PushbackSideInputDoFnRunner runner = - createRunner(ImmutableList.>of(singletonView)); - - assertThat(underlying.started, is(true)); - assertThat(underlying.finished, is(false)); - runner.finishBundle(); - assertThat(underlying.finished, is(true)); - } - - @Test - public void processElementSideInputNotReady() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) - .thenReturn(false); - - PushbackSideInputDoFnRunner runner = - createRunner(ImmutableList.>of(singletonView)); - - WindowedValue oneWindow = - WindowedValue.of( - 2, - new Instant(-2), - new IntervalWindow(new Instant(-500L), new Instant(0L)), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable> oneWindowPushback = - runner.processElementInReadyWindows(oneWindow); - assertThat(oneWindowPushback, containsInAnyOrder(oneWindow)); - assertThat(underlying.inputElems, Matchers.>emptyIterable()); - } - - @Test - public void processElementSideInputNotReadyMultipleWindows() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) - .thenReturn(false); - - PushbackSideInputDoFnRunner runner = - createRunner(ImmutableList.>of(singletonView)); - - WindowedValue multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); - assertThat(underlying.inputElems, Matchers.>emptyIterable()); - } - - @Test - public void processElementSideInputNotReadySomeWindows() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.eq(GlobalWindow.INSTANCE))) - .thenReturn(false); - when( - reader.isReady( - Mockito.eq(singletonView), - org.mockito.AdditionalMatchers.not(Mockito.eq(GlobalWindow.INSTANCE)))) - .thenReturn(true); - - PushbackSideInputDoFnRunner runner = - createRunner(ImmutableList.>of(singletonView)); - - IntervalWindow littleWindow = new IntervalWindow(new Instant(-500L), new Instant(0L)); - IntervalWindow bigWindow = - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)); - WindowedValue multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of(littleWindow, bigWindow, GlobalWindow.INSTANCE), - PaneInfo.NO_FIRING); - Iterable> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat( - multiWindowPushback, - containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); - assertThat(underlying.inputElems, - containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING), - WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING))); - } - - @Test - public void processElementSideInputReadyAllWindows() { - when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class))) - .thenReturn(true); - - ImmutableList> views = ImmutableList.>of(singletonView); - PushbackSideInputDoFnRunner runner = createRunner(views); - - WindowedValue multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, emptyIterable()); - assertThat(underlying.inputElems, - containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); - } - - @Test - public void processElementNoSideInputs() { - PushbackSideInputDoFnRunner runner = - createRunner(ImmutableList.>of()); - - WindowedValue multiWindow = - WindowedValue.of( - 2, - new Instant(-2), - ImmutableList.of( - new IntervalWindow(new Instant(-500L), new Instant(0L)), - new IntervalWindow(BoundedWindow.TIMESTAMP_MIN_VALUE, new Instant(250L)), - GlobalWindow.INSTANCE), - PaneInfo.ON_TIME_AND_ONLY_FIRING); - Iterable> multiWindowPushback = - runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, emptyIterable()); - assertThat(underlying.inputElems, containsInAnyOrder(multiWindow)); - } - - private static class TestDoFnRunner implements DoFnRunner { - List> inputElems; - private boolean started = false; - private boolean finished = false; - - @Override - public void startBundle() { - started = true; - inputElems = new ArrayList<>(); - } - - @Override - public void processElement(WindowedValue elem) { - inputElems.add(elem); - } - - @Override - public void finishBundle() { - finished = true; - } - } -}