Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 74348200CDD for ; Mon, 24 Jul 2017 06:34:51 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 72DE21642D9; Mon, 24 Jul 2017 04:34:51 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id A5BBA16429E for ; Mon, 24 Jul 2017 06:34:49 +0200 (CEST) Received: (qmail 90163 invoked by uid 500); 24 Jul 2017 04:34:47 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 88868 invoked by uid 99); 24 Jul 2017 04:34:47 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 24 Jul 2017 04:34:47 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 799A4F32D5; Mon, 24 Jul 2017 04:34:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kenn@apache.org To: commits@beam.apache.org Date: Mon, 24 Jul 2017 04:35:22 -0000 Message-Id: <8c06f00718414161ae6b808d2c24fc99@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [39/50] [abbrv] beam git commit: Removes OldDoFn and its kin from runners-core archived-at: Mon, 24 Jul 2017 04:34:51 -0000 http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java deleted file mode 100644 index 761ffb8..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/GroupAlsoByWindowsProperties.java +++ /dev/null @@ -1,744 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.hamcrest.Matchers.contains; -import static org.hamcrest.Matchers.containsInAnyOrder; -import static org.hamcrest.Matchers.equalTo; -import static org.hamcrest.Matchers.hasSize; -import static org.junit.Assert.assertThat; - -import com.google.common.base.Predicate; -import com.google.common.cache.CacheBuilder; -import com.google.common.cache.CacheLoader; -import com.google.common.cache.LoadingCache; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import javax.annotation.Nullable; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.options.PipelineOptionsFactory; -import org.apache.beam.sdk.transforms.Combine.CombineFn; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.FixedWindows; -import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.Sessions; -import org.apache.beam.sdk.transforms.windowing.SlidingWindows; -import org.apache.beam.sdk.transforms.windowing.TimestampCombiner; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.TimestampedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.apache.beam.sdk.values.WindowingStrategy; -import org.joda.time.Duration; -import org.joda.time.Instant; - -/** - * Properties of {@link GroupAlsoByWindowsDoFn}. - * - *

Some properties may not hold of some implementations, due to restrictions on the context in - * which the implementation is applicable. For example, some {@code GroupAlsoByWindows} may not - * support merging windows. - */ -public class GroupAlsoByWindowsProperties { - - /** - * A factory of {@link GroupAlsoByWindowsDoFn} so that the various properties can provide the - * appropriate windowing strategy under test. - */ - public interface GroupAlsoByWindowsDoFnFactory { - GroupAlsoByWindowsDoFn forStrategy( - WindowingStrategy strategy, StateInternalsFactory stateInternalsFactory); - } - - /** - * Tests that for empty input and the given {@link WindowingStrategy}, the provided GABW - * implementation produces no output. - * - *

The input type is deliberately left as a wildcard, since it is not relevant. - */ - public static void emptyInputEmptyOutput( - GroupAlsoByWindowsDoFnFactory gabwFactory) throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - - // This key should never actually be used, though it is eagerly passed to the - // StateInternalsFactory so must be non-null - @SuppressWarnings("unchecked") - K fakeKey = (K) "this key should never be used"; - - List>> result = - runGABW( - gabwFactory, - windowingStrategy, - fakeKey, - Collections.>emptyList()); - - assertThat(result, hasSize(0)); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows. - */ - public static void groupsElementsIntoFixedWindows( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "key", - WindowedValue.of( - "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(0, 10)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(10, 20)); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them into sliding windows. - * - *

In the input here, each element occurs in multiple windows. - */ - public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "key", - WindowedValue.of( - "v1", - new Instant(5), - Arrays.asList(window(-10, 10), window(0, 20)), - PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", - new Instant(15), - Arrays.asList(window(0, 20), window(10, 30)), - PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(3)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(-10, 10)); - assertThat(item0.getValue().getValue(), contains("v1")); - assertThat(item0.getTimestamp(), equalTo(new Instant(5))); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(0, 20)); - assertThat(item1.getValue().getValue(), containsInAnyOrder("v1", "v2")); - // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window - assertThat(item1.getTimestamp(), equalTo(new Instant(10))); - - TimestampedValue>> item2 = - getOnlyElementInWindow(result, window(10, 30)); - assertThat(item2.getValue().getValue(), contains("v2")); - // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window - assertThat(item2.getTimestamp(), equalTo(new Instant(20))); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups and combines them according to sliding windows. - * - *

In the input here, each element occurs in multiple windows. - */ - public static void combinesElementsInSlidingWindows( - GroupAlsoByWindowsDoFnFactory gabwFactory, - CombineFn combineFn) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.EARLIEST); - - List>> result = - runGABW( - gabwFactory, - windowingStrategy, - "k", - WindowedValue.of( - 1L, - new Instant(5), - Arrays.asList(window(-10, 10), window(0, 20)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 2L, - new Instant(15), - Arrays.asList(window(0, 20), window(10, 30)), - PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, - new Instant(18), - Arrays.asList(window(0, 20), window(10, 30)), - PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(3)); - - TimestampedValue> item0 = getOnlyElementInWindow(result, window(-10, 10)); - assertThat(item0.getValue().getKey(), equalTo("k")); - assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L)))); - assertThat(item0.getTimestamp(), equalTo(new Instant(5L))); - - TimestampedValue> item1 = getOnlyElementInWindow(result, window(0, 20)); - assertThat(item1.getValue().getKey(), equalTo("k")); - assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L, 4L)))); - // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window - assertThat(item1.getTimestamp(), equalTo(new Instant(10L))); - - TimestampedValue> item2 = getOnlyElementInWindow(result, window(10, 30)); - assertThat(item2.getValue().getKey(), equalTo("k")); - assertThat(item2.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(2L, 4L)))); - // Timestamp adjusted by WindowFn to exceed the end of the prior sliding window - assertThat(item2.getTimestamp(), equalTo(new Instant(20L))); - } - - /** - * Tests that the given GABW implementation correctly groups elements that fall into overlapping - * windows that are not merged. - */ - public static void groupsIntoOverlappingNonmergingWindows( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "key", - WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING), - WindowedValue.of("v2", new Instant(4), Arrays.asList(window(1, 5)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(4), Arrays.asList(window(0, 5)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(0, 5)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3")); - assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp())); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(1, 5)); - assertThat(item1.getValue().getValue(), contains("v2")); - assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp())); - } - - /** Tests that the given GABW implementation correctly groups elements into merged sessions. */ - public static void groupsElementsInMergedSessions( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "key", - WindowedValue.of( - "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(0, 15)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(15, 25)); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - } - - /** - * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per - * session window correctly according to the provided {@link CombineFn}. - */ - public static void combinesElementsPerSession( - GroupAlsoByWindowsDoFnFactory gabwFactory, - CombineFn combineFn) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))); - - List>> result = - runGABW( - gabwFactory, - windowingStrategy, - "k", - WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), - WindowedValue.of( - 4L, new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue> item0 = getOnlyElementInWindow(result, window(0, 15)); - assertThat(item0.getValue().getKey(), equalTo("k")); - assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); - assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - - TimestampedValue> item1 = getOnlyElementInWindow(result, window(15, 25)); - assertThat(item1.getValue().getKey(), equalTo("k")); - assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); - assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp according - * to the policy {@link TimestampCombiner#END_OF_WINDOW}. - */ - public static void groupsElementsIntoFixedWindowsWithEndOfWindowTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "key", - WindowedValue.of( - "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(0, 10)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(10, 20)); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp according - * to the policy {@link TimestampCombiner#LATEST}. - */ - public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.LATEST); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "k", - WindowedValue.of( - "v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(0, 10)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(new Instant(2))); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(10, 20)); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(new Instant(13))); - } - - /** - * Tests that the given GABW implementation correctly groups elements into merged sessions with - * output timestamps at the end of the merged window. - */ - public static void groupsElementsInMergedSessionsWithEndOfWindowTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); - - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "k", - WindowedValue.of( - "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(15), Arrays.asList(window(15, 25)), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - TimestampedValue>> item0 = - getOnlyElementInWindow(result, window(0, 15)); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, window(15, 25)); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); - } - - /** - * Tests that the given GABW implementation correctly groups elements into merged sessions with - * output timestamps at the end of the merged window. - */ - public static void groupsElementsInMergedSessionsWithLatestTimestamp( - GroupAlsoByWindowsDoFnFactory> gabwFactory) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.LATEST); - - BoundedWindow unmergedWindow = window(15, 25); - List>>> result = - runGABW( - gabwFactory, - windowingStrategy, - "k", - WindowedValue.of( - "v1", new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v2", new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), - WindowedValue.of( - "v3", new Instant(15), Arrays.asList(unmergedWindow), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - BoundedWindow mergedWindow = window(0, 15); - TimestampedValue>> item0 = - getOnlyElementInWindow(result, mergedWindow); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(new Instant(5))); - - TimestampedValue>> item1 = - getOnlyElementInWindow(result, unmergedWindow); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(new Instant(15))); - } - - /** - * Tests that the given {@link GroupAlsoByWindowsDoFn} implementation combines elements per - * session window correctly according to the provided {@link CombineFn}. - */ - public static void combinesElementsPerSessionWithEndOfWindowTimestamp( - GroupAlsoByWindowsDoFnFactory gabwFactory, - CombineFn combineFn) - throws Exception { - - WindowingStrategy windowingStrategy = - WindowingStrategy.of(Sessions.withGapDuration(Duration.millis(10))) - .withTimestampCombiner(TimestampCombiner.END_OF_WINDOW); - - BoundedWindow secondWindow = window(15, 25); - List>> result = - runGABW( - gabwFactory, - windowingStrategy, - "k", - WindowedValue.of(1L, new Instant(0), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of(2L, new Instant(5), Arrays.asList(window(5, 15)), PaneInfo.NO_FIRING), - WindowedValue.of(4L, new Instant(15), Arrays.asList(secondWindow), PaneInfo.NO_FIRING)); - - assertThat(result, hasSize(2)); - - BoundedWindow firstResultWindow = window(0, 15); - TimestampedValue> item0 = getOnlyElementInWindow(result, firstResultWindow); - assertThat(item0.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(1L, 2L)))); - assertThat(item0.getTimestamp(), equalTo(firstResultWindow.maxTimestamp())); - - TimestampedValue> item1 = getOnlyElementInWindow(result, secondWindow); - assertThat(item1.getValue().getValue(), equalTo(combineFn.apply(ImmutableList.of(4L)))); - assertThat(item1.getTimestamp(), equalTo(secondWindow.maxTimestamp())); - } - - @SafeVarargs - private static - List>> runGABW( - GroupAlsoByWindowsDoFnFactory gabwFactory, - WindowingStrategy windowingStrategy, - K key, - WindowedValue... values) - throws Exception { - return runGABW(gabwFactory, windowingStrategy, key, Arrays.asList(values)); - } - - private static - List>> runGABW( - GroupAlsoByWindowsDoFnFactory gabwFactory, - WindowingStrategy windowingStrategy, - K key, - Collection> values) - throws Exception { - - final StateInternalsFactory stateInternalsCache = new CachingStateInternalsFactory(); - - List>> output = - processElement( - gabwFactory.forStrategy(windowingStrategy, stateInternalsCache), - KV.>>of(key, values)); - - // Sanity check for corruption - for (WindowedValue> value : output) { - assertThat(value.getValue().getKey(), equalTo(key)); - } - - return output; - } - - private static BoundedWindow window(long start, long end) { - return new IntervalWindow(new Instant(start), new Instant(end)); - } - - private static final class CachingStateInternalsFactory implements StateInternalsFactory { - private final LoadingCache stateInternalsCache; - - private CachingStateInternalsFactory() { - this.stateInternalsCache = CacheBuilder.newBuilder().build(new StateInternalsLoader()); - } - - @Override - @SuppressWarnings("unchecked") - public StateInternals stateInternalsForKey(K key) { - try { - return stateInternalsCache.get(key); - } catch (Exception exc) { - throw new RuntimeException(exc); - } - } - } - - private static class StateInternalsLoader extends CacheLoader { - @Override - public StateInternals load(K key) throws Exception { - return InMemoryStateInternals.forKey(key); - } - } - - private static - List>> processElement( - GroupAlsoByWindowsDoFn fn, - KV>> element) - throws Exception { - TestProcessContext c = new TestProcessContext<>(fn, element); - fn.processElement(c); - return c.getOutput(); - } - - private static TimestampedValue> getOnlyElementInWindow( - List>> output, final BoundedWindow window) { - WindowedValue> res = - Iterables.getOnlyElement( - Iterables.filter( - output, - new Predicate>>() { - @Override - public boolean apply(@Nullable WindowedValue> input) { - return input.getWindows().contains(window); - } - })); - return TimestampedValue.of(res.getValue(), res.getTimestamp()); - } - - /** - * A {@link GroupAlsoByWindowsDoFn.ProcessContext} providing just enough context for a {@link - * GroupAlsoByWindowsDoFn} - namely, information about the element and output via {@link - * WindowingInternals}, but no side inputs/outputs and no normal output. - */ - private static class TestProcessContext - extends GroupAlsoByWindowsDoFn.ProcessContext { - private final PipelineOptions options = PipelineOptionsFactory.create(); - private final KV>> element; - private final List>> output = new ArrayList<>(); - - private TestProcessContext( - GroupAlsoByWindowsDoFn fn, - KV>> element) { - fn.super(); - this.element = element; - } - - @Override - public KV>> element() { - return element; - } - - @Override - public Instant timestamp() { - return BoundedWindow.TIMESTAMP_MIN_VALUE; - } - - @Override - public BoundedWindow window() { - return GlobalWindow.INSTANCE; - } - - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } - - @Override - public T sideInput(PCollectionView view) { - throw new UnsupportedOperationException(); - } - - @Override - public WindowingInternals>>, KV> - windowingInternals() { - return new WindowingInternals>>, KV>() { - @Override - public void outputWindowedValue( - KV output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - TestProcessContext.this.output.add(WindowedValue.of(output, timestamp, windows, pane)); - } - - @Override - public void outputWindowedValue( - TupleTag tag, - AdditionalOutputT output, - Instant timestamp, - Collection windows, - PaneInfo pane) { - throw new UnsupportedOperationException(); - } - - @Override - public StateInternals stateInternals() { - throw new UnsupportedOperationException(); - } - - @Override - public TimerInternals timerInternals() { - throw new UnsupportedOperationException(); - } - - @Override - public Collection windows() { - return ImmutableList.of(GlobalWindow.INSTANCE); - } - - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } - - @Override - public T sideInput(PCollectionView view, BoundedWindow sideInputWindow) { - throw new UnsupportedOperationException(); - } - }; - } - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public void output(KV output) { - throw new UnsupportedOperationException(); - } - - @Override - public void outputWithTimestamp(KV output, Instant timestamp) { - throw new UnsupportedOperationException(); - } - - @Override - public void output(TupleTag tag, T output) { - throw new UnsupportedOperationException(); - } - - @Override - public void outputWithTimestamp(TupleTag tag, T output, Instant timestamp) { - throw new UnsupportedOperationException(); - } - - public List>> getOutput() { - return output; - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java deleted file mode 100644 index 581c3e0..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/NoOpOldDoFn.java +++ /dev/null @@ -1,65 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.values.TupleTag; -import org.joda.time.Instant; - -/** - * A {@link OldDoFn} that does nothing with provided elements. Used for testing - * methods provided by the {@link OldDoFn} abstract class. - * - * @param unused. - * @param unused. - */ -class NoOpOldDoFn extends OldDoFn { - @Override - public void processElement(OldDoFn.ProcessContext c) throws Exception { - } - - /** - * Returns a new NoOp Context. - */ - public OldDoFn.Context context() { - return new NoOpDoFnContext(); - } - - /** - * A {@link OldDoFn.Context} that does nothing and returns exclusively null. - */ - private class NoOpDoFnContext extends OldDoFn.Context { - @Override - public PipelineOptions getPipelineOptions() { - return null; - } - @Override - public void output(OutputT output) { - } - @Override - public void outputWithTimestamp(OutputT output, Instant timestamp) { - } - @Override - public void output(TupleTag tag, T output) { - } - @Override - public void outputWithTimestamp(TupleTag tag, T output, - Instant timestamp) { - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java deleted file mode 100644 index f608a81..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/OldDoFnTest.java +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.hamcrest.Matchers.empty; -import static org.junit.Assert.assertThat; - -import java.io.Serializable; -import org.apache.beam.sdk.transforms.display.DisplayData; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for OldDoFn. - */ -@RunWith(JUnit4.class) -public class OldDoFnTest implements Serializable { - - @Rule - public transient ExpectedException thrown = ExpectedException.none(); - - @Test - public void testPopulateDisplayDataDefaultBehavior() { - OldDoFn usesDefault = - new OldDoFn() { - @Override - public void processElement(ProcessContext c) throws Exception {} - }; - - DisplayData data = DisplayData.from(usesDefault); - assertThat(data.items(), empty()); - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java deleted file mode 100644 index a73ef5e..0000000 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimpleOldDoFnRunnerTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.core; - -import static org.hamcrest.Matchers.is; -import static org.mockito.Mockito.mock; - -import java.util.Arrays; -import java.util.List; -import org.apache.beam.runners.core.BaseExecutionContext.StepContext; -import org.apache.beam.sdk.util.SystemDoFnInternal; -import org.apache.beam.sdk.util.UserCodeException; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.values.TupleTag; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.ExpectedException; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** - * Tests for {@link SimpleOldDoFnRunner} functionality. - */ -@RunWith(JUnit4.class) -public class SimpleOldDoFnRunnerTest { - @Rule - public ExpectedException thrown = ExpectedException.none(); - - @Test - public void testExceptionsWrappedAsUserCodeException() { - ThrowingDoFn fn = new ThrowingDoFn(); - DoFnRunner runner = createRunner(fn); - - thrown.expect(UserCodeException.class); - thrown.expectCause(is(fn.exceptionToThrow)); - - runner.processElement(WindowedValue.valueInGlobalWindow("anyValue")); - } - - @Test - public void testSystemDoFnInternalExceptionsNotWrapped() { - ThrowingSystemDoFn fn = new ThrowingSystemDoFn(); - DoFnRunner runner = createRunner(fn); - - thrown.expect(is(fn.exceptionToThrow)); - - runner.processElement(WindowedValue.valueInGlobalWindow("anyValue")); - } - - private DoFnRunner createRunner(OldDoFn fn) { - // Pass in only necessary parameters for the test - List> additionalOutputTags = Arrays.asList(); - StepContext context = mock(StepContext.class); - return new SimpleOldDoFnRunner<>( - null, fn, null, null, null, additionalOutputTags, context, null); - } - - static class ThrowingDoFn extends OldDoFn { - final Exception exceptionToThrow = - new UnsupportedOperationException("Expected exception"); - - @Override - public void processElement(ProcessContext c) throws Exception { - throw exceptionToThrow; - } - } - - @SystemDoFnInternal - static class ThrowingSystemDoFn extends ThrowingDoFn { - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 1d079d9..84be15d 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -24,7 +24,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; import java.util.ArrayList; import java.util.Collection; -import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; @@ -143,9 +143,9 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); reduceFn = SystemReduceFn.buffering(valueCoder); droppedDueToClosedWindow = Metrics.counter(GroupAlsoByWindowEvaluator.class, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER); droppedDueToLateness = Metrics.counter(GroupAlsoByWindowEvaluator.class, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java index 1d843f9..815b6ba 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java @@ -23,7 +23,8 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; +import org.apache.beam.runners.core.GroupAlsoByWindowsAggregators; +import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.LateDataUtils; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -76,7 +77,7 @@ import scala.reflect.ClassTag; import scala.runtime.AbstractFunction1; /** - * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn} + * An implementation of {@link GroupAlsoByWindow} * logic for grouping by windows and controlling trigger firings and pane accumulation. * *

This implementation is a composite of Spark transformations revolving around state management @@ -210,10 +211,10 @@ public class SparkGroupAlsoByWindowViaWindowSet { final MetricsContainerImpl cellProvider = new MetricsContainerImpl("cellProvider"); final CounterCell droppedDueToClosedWindow = cellProvider.getCounter( MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER)); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER)); final CounterCell droppedDueToLateness = cellProvider.getCounter( MetricName.named(SparkGroupAlsoByWindowViaWindowSet.class, - GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER)); + GroupAlsoByWindowsAggregators.DROPPED_DUE_TO_LATENESS_COUNTER)); AbstractIterator< Tuple2>>*/ List>>> http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java index 18a3dd8..3d76a61 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkAssignWindowFn.java @@ -22,6 +22,7 @@ import com.google.common.collect.Iterables; import java.util.Collection; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.Window.Assign; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowedValue; import org.apache.spark.api.java.function.Function; @@ -29,7 +30,8 @@ import org.joda.time.Instant; /** - * An implementation of {@link org.apache.beam.runners.core.AssignWindowsDoFn} for the Spark runner. + * An implementation of {@link Assign} for the Spark + * runner. */ public class SparkAssignWindowFn implements Function, WindowedValue> { http://git-wip-us.apache.org/repos/asf/beam/blob/6127f532/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java index a70885b..be02335 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/translation/SparkGroupAlsoByWindowViaOutputBufferFn.java @@ -21,7 +21,7 @@ package org.apache.beam.runners.spark.translation; import java.util.ArrayList; import java.util.Collection; import java.util.List; -import org.apache.beam.runners.core.GroupAlsoByWindowViaOutputBufferDoFn; +import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.InMemoryTimerInternals; import org.apache.beam.runners.core.OutputWindowedValue; import org.apache.beam.runners.core.ReduceFnRunner; @@ -45,7 +45,7 @@ import org.apache.spark.api.java.function.FlatMapFunction; import org.joda.time.Instant; /** - * An implementation of {@link GroupAlsoByWindowViaOutputBufferDoFn} + * An implementation of {@link GroupAlsoByWindow} * for the Spark runner. */ public class SparkGroupAlsoByWindowViaOutputBufferFn