Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 40B32200BA6 for ; Tue, 13 Sep 2016 02:40:40 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 3F52C160AB8; Tue, 13 Sep 2016 00:40:40 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B731F160AD8 for ; Tue, 13 Sep 2016 02:40:38 +0200 (CEST) Received: (qmail 711 invoked by uid 500); 13 Sep 2016 00:40:38 -0000 Mailing-List: contact commits-help@beam.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.incubator.apache.org Delivered-To: mailing list commits@beam.incubator.apache.org Received: (qmail 624 invoked by uid 99); 13 Sep 2016 00:40:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 00:40:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 56840C004D for ; Tue, 13 Sep 2016 00:40:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.344 X-Spam-Level: X-Spam-Status: No, score=-4.344 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-1.124] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id i6pQRRiwwkab for ; Tue, 13 Sep 2016 00:40:34 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by 
mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 79ADC5FAC8 for ; Tue, 13 Sep 2016 00:40:33 +0000 (UTC) Received: (qmail 98601 invoked by uid 99); 13 Sep 2016 00:40:32 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Sep 2016 00:40:32 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B27CAE0230; Tue, 13 Sep 2016 00:40:32 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dhalperi@apache.org To: commits@beam.incubator.apache.org Date: Tue, 13 Sep 2016 00:40:47 -0000 Message-Id: In-Reply-To: <5d2051da80084a09aedbe0b48aa93047@git.apache.org> References: <5d2051da80084a09aedbe0b48aa93047@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [16/50] [abbrv] incubator-beam git commit: Add Latest CombineFn and PTransforms archived-at: Tue, 13 Sep 2016 00:40:40 -0000 Add Latest CombineFn and PTransforms Add DoFnTester support for specifying input timestamps Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6ee7b620 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6ee7b620 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6ee7b620 Branch: refs/heads/gearpump-runner Commit: 6ee7b620bf8e2ee07c0f30e9ff20363e79765405 Parents: 28ad44d Author: Scott Wegner Authored: Thu Aug 18 13:56:34 2016 -0700 Committer: Dan Halperin Committed: Mon Sep 12 17:40:11 2016 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/coders/NullableCoder.java | 7 + .../apache/beam/sdk/transforms/DoFnTester.java | 33 ++- .../org/apache/beam/sdk/transforms/Latest.java | 203 ++++++++++++++++ .../beam/sdk/values/TimestampedValue.java | 14 ++ .../beam/sdk/transforms/DoFnTesterTest.java | 34 ++- 
.../beam/sdk/transforms/LatestFnTests.java | 233 +++++++++++++++++++ .../apache/beam/sdk/transforms/LatestTest.java | 146 ++++++++++++ .../beam/sdk/values/TimestampedValueTest.java | 83 +++++++ 8 files changed, 747 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java index 44aadbd..9c6c7c0 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/coders/NullableCoder.java @@ -65,6 +65,13 @@ public class NullableCoder extends StandardCoder { this.valueCoder = valueCoder; } + /** + * Returns the inner {@link Coder} wrapped by this {@link NullableCoder} instance. 
+ */ + public Coder getValueCoder() { + return valueCoder; + } + @Override public void encode(@Nullable T value, OutputStream outStream, Context context) throws IOException, CoderException { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index b867a55..0e018ba 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -17,6 +17,9 @@ */ package org.apache.beam.sdk.transforms; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + import com.google.common.base.Function; import com.google.common.base.MoreObjects; import com.google.common.collect.ImmutableList; @@ -221,9 +224,26 @@ public class DoFnTester { * been finished */ public void processElement(InputT element) throws Exception { - if (state == State.FINISHED) { - throw new IllegalStateException("finishBundle() has already been called"); - } + processTimestampedElement(TimestampedValue.atMinimumTimestamp(element)); + } + + /** + * Calls {@link OldDoFn#processElement} on the {@code OldDoFn} under test, in a + * context where {@link OldDoFn.ProcessContext#element} returns the + * given element and timestamp. + * + *

Will call {@link #startBundle} automatically, if it hasn't + * already been called. + * + *

If the input timestamp is {@literal null}, the minimum timestamp will be used. + * + * @throws IllegalStateException if the {@code OldDoFn} under test has already + * been finished + */ + public void processTimestampedElement(TimestampedValue element) throws Exception { + checkNotNull(element, "Timestamped element cannot be null"); + checkState(state != State.FINISHED, "finishBundle() has already been called"); + if (state == State.UNSTARTED) { startBundle(); } @@ -522,10 +542,13 @@ public class DoFnTester { private TestProcessContext createProcessContext( OldDoFn fn, - InputT elem) { + TimestampedValue elem) { + WindowedValue windowedValue = WindowedValue.timestampedValueInGlobalWindow( + elem.getValue(), elem.getTimestamp()); + return new TestProcessContext<>(fn, createContext(fn), - WindowedValue.valueInGlobalWindow(elem), + windowedValue, mainOutputTag, sideInputs); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java new file mode 100644 index 0000000..7f13649 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java @@ -0,0 +1,203 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkNotNull; +import static com.google.common.base.Preconditions.checkState; + +import java.util.Iterator; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; + +/** + * {@link PTransform} and {@link Combine.CombineFn} for computing the latest element + * in a {@link PCollection}. + * + *

Example 1: compute the latest value for each session: + *


+ * PCollection input = ...;
+ * PCollection sessioned = input
+ *    .apply(Window.into(Sessions.withGapDuration(Duration.standardMinutes(5))));
+ * PCollection latestValues = sessioned.apply(Latest.globally());
+ * 
+ * + *

Example 2: track a latest computed value in an aggregator: + *


+ * class MyDoFn extends DoFn {
+ *  private Aggregator, Double> latestValue =
+ *    createAggregator("latestValue", new Latest.LatestFn());
+ *
+ *  {@literal @}ProcessElement
+ *  public void processElement(ProcessContext c) {
+ *    double val = // ..
+ *    latestValue.addValue(TimestampedValue.of(val, c.timestamp()));
+ *    // ..
+ *  }
+ * }
+ * 
+ * + *

For elements with the same timestamp, the element chosen for output is arbitrary. + */ +public class Latest { + // Do not instantiate + private Latest() {} + + /** + * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This is + * particularly useful as an {@link Aggregator}. + * + * @param Type of input element. + * @see Latest + */ + public static class LatestFn + extends Combine.CombineFn, TimestampedValue, T> { + /** Construct a new {@link LatestFn} instance. */ + public LatestFn() {} + + @Override + public TimestampedValue createAccumulator() { + return TimestampedValue.atMinimumTimestamp(null); + } + + @Override + public TimestampedValue addInput(TimestampedValue accumulator, + TimestampedValue input) { + checkNotNull(accumulator, "accumulator must be non-null"); + checkNotNull(input, "input must be non-null"); + + if (input.getTimestamp().isBefore(accumulator.getTimestamp())) { + return accumulator; + } else { + return input; + } + } + + @Override + public Coder> getAccumulatorCoder(CoderRegistry registry, + Coder> inputCoder) throws CannotProvideCoderException { + return NullableCoder.of(inputCoder); + } + + @Override + public Coder getDefaultOutputCoder(CoderRegistry registry, + Coder> inputCoder) throws CannotProvideCoderException { + checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder, + "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder); + + TimestampedValue.TimestampedValueCoder inputTVCoder = + (TimestampedValue.TimestampedValueCoder) inputCoder; + return NullableCoder.of(inputTVCoder.getValueCoder()); + } + + @Override + public TimestampedValue mergeAccumulators(Iterable> accumulators) { + checkNotNull(accumulators, "accumulators must be non-null"); + + Iterator> iter = accumulators.iterator(); + if (!iter.hasNext()) { + return createAccumulator(); + } + + TimestampedValue merged = iter.next(); + while (iter.hasNext()) { + merged = addInput(merged, iter.next()); + } + + return 
merged; + } + + @Override + public T extractOutput(TimestampedValue accumulator) { + return accumulator.getValue(); + } + } + + /** + * Returns a {@link PTransform} that takes as input a {@link PCollection} and returns a + * {@link PCollection} whose contents is the latest element according to its event time, or + * {@literal null} if there are no elements. + * + * @param The type of the elements being combined. + */ + public static PTransform, PCollection> globally() { + return new Globally<>(); + } + + /** + * Returns a {@link PTransform} that takes as input a {@code PCollection>} and returns a + * {@code PCollection>} whose contents is the latest element per-key according to its + * event time. + * + * @param The key type of the elements being combined. + * @param The value type of the elements being combined. + */ + public static PTransform>, PCollection>> perKey() { + return new PerKey<>(); + } + + private static class Globally extends PTransform, PCollection> { + @Override + public PCollection apply(PCollection input) { + Coder inputCoder = input.getCoder(); + + return input + .apply("Reify Timestamps", ParDo.of( + new DoFn>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + })).setCoder(TimestampedValue.TimestampedValueCoder.of(inputCoder)) + .apply("Latest Value", Combine.globally(new LatestFn())) + .setCoder(NullableCoder.of(inputCoder)); + } + } + + private static class PerKey + extends PTransform>, PCollection>> { + @Override + public PCollection> apply(PCollection> input) { + checkNotNull(input); + checkArgument(input.getCoder() instanceof KvCoder, + "Input specifiedCoder must be an instance of KvCoder, but was %s", input.getCoder()); + + @SuppressWarnings("unchecked") + KvCoder inputCoder = (KvCoder) input.getCoder(); + return input + .apply("Reify Timestamps", ParDo.of( + new DoFn, KV>>() { + @ProcessElement + public void processElement(ProcessContext c) { + 
c.output(KV.of(c.element().getKey(), TimestampedValue.of(c.element().getValue(), + c.timestamp()))); + } + })).setCoder(KvCoder.of( + inputCoder.getKeyCoder(), + TimestampedValue.TimestampedValueCoder.of(inputCoder.getValueCoder()))) + .apply("Latest Value", Combine., V>perKey(new LatestFn())) + .setCoder(inputCoder); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java index f2ad616..dd80fb2 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/values/TimestampedValue.java @@ -31,6 +31,7 @@ import java.util.Objects; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.InstantCoder; import org.apache.beam.sdk.coders.StandardCoder; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.util.PropertyNames; import org.joda.time.Instant; @@ -43,6 +44,13 @@ import org.joda.time.Instant; * @param the type of the value */ public class TimestampedValue { + /** + * Returns a new {@link TimestampedValue} with the + * {@link BoundedWindow#TIMESTAMP_MIN_VALUE minimum timestamp}. + */ + public static TimestampedValue atMinimumTimestamp(V value) { + return of(value, BoundedWindow.TIMESTAMP_MIN_VALUE); + } /** * Returns a new {@code TimestampedValue} with the given value and timestamp. 
@@ -136,6 +144,10 @@ public class TimestampedValue { return Arrays.>asList(valueCoder); } + public Coder getValueCoder() { + return valueCoder; + } + public static List getInstanceComponents(TimestampedValue exampleValue) { return Arrays.asList(exampleValue.getValue()); } @@ -147,6 +159,8 @@ public class TimestampedValue { private final Instant timestamp; protected TimestampedValue(V value, Instant timestamp) { + checkNotNull(timestamp, "timestamp must be non-null"); + this.value = value; this.timestamp = timestamp; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index 2649be5..3ed30fd 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItems; @@ -35,7 +36,9 @@ import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.Matchers; import org.joda.time.Instant; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.ExpectedException; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -44,6 +47,7 @@ import org.junit.runners.JUnit4; */ @RunWith(JUnit4.class) public class DoFnTesterTest { + @Rule public ExpectedException thrown = ExpectedException.none(); @Test public void processElement() throws Exception { @@ -126,6 +130,16 @@ public class DoFnTesterTest { } @Test 
+ public void processElementAfterFinish() throws Exception { + DoFnTester tester = DoFnTester.of(new CounterDoFn()); + tester.finishBundle(); + + thrown.expect(IllegalStateException.class); + thrown.expectMessage("finishBundle() has already been called"); + tester.processElement(1L); + } + + @Test public void processBatch() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester tester = DoFnTester.of(counterDoFn); @@ -145,7 +159,25 @@ public class DoFnTesterTest { } @Test - public void processElementWithTimestamp() throws Exception { + public void processTimestampedElement() throws Exception { + DoFn> reifyTimestamps = new ReifyTimestamps(); + + DoFnTester> tester = DoFnTester.of(reifyTimestamps); + + TimestampedValue input = TimestampedValue.of(1L, new Instant(100)); + tester.processTimestampedElement(input); + assertThat(tester.takeOutputElements(), contains(input)); + } + + static class ReifyTimestamps extends DoFn> { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(TimestampedValue.of(c.element(), c.timestamp())); + } + } + + @Test + public void processElementWithOutputTimestamp() throws Exception { CounterDoFn counterDoFn = new CounterDoFn(); DoFnTester tester = DoFnTester.of(counterDoFn); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java new file mode 100644 index 0000000..84b5b68 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.nullValue; +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.Lists; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Objects; +import org.apache.beam.sdk.coders.CannotProvideCoderException; +import org.apache.beam.sdk.coders.CoderRegistry; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.values.TimestampedValue; +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link Latest.LatestFn}. 
+ * */ +@RunWith(JUnit4.class) +public class LatestFnTests { + private static final Instant INSTANT = new Instant(100); + private static final long VALUE = 100 * INSTANT.getMillis(); + + private static final TimestampedValue TV = TimestampedValue.of(VALUE, INSTANT); + private static final TimestampedValue TV_MINUS_TEN = + TimestampedValue.of(VALUE - 10, INSTANT.minus(10)); + private static final TimestampedValue TV_PLUS_TEN = + TimestampedValue.of(VALUE + 10, INSTANT.plus(10)); + + @Rule + public final ExpectedException thrown = ExpectedException.none(); + + private final Latest.LatestFn fn = new Latest.LatestFn<>(); + private final Instant baseTimestamp = Instant.now(); + + @Test + public void testDefaultValue() { + assertThat(fn.defaultValue(), nullValue()); + } + + @Test + public void testCreateAccumulator() { + assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator()); + } + + @Test + public void testAddInputInitialAdd() { + TimestampedValue input = TV; + assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputMinTimestamp() { + TimestampedValue input = TimestampedValue.atMinimumTimestamp(1234L); + assertEquals(input, fn.addInput(fn.createAccumulator(), input)); + } + + @Test + public void testAddInputEarlierValue() { + assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN)); + } + + @Test + public void testAddInputLaterValue() { + assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN)); + } + + @Test + public void testAddInputSameTimestamp() { + TimestampedValue accum = TimestampedValue.of(100L, INSTANT); + TimestampedValue input = TimestampedValue.of(200L, INSTANT); + + assertThat("Latest for values with the same timestamp is chosen arbitrarily", + fn.addInput(accum, input), isOneOf(accum, input)); + } + + @Test + public void testAddInputNullAccumulator() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("accumulators"); + fn.addInput(null, TV); + } + + @Test + public void 
testAddInputNullInput() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("input"); + fn.addInput(TV, null); + } + + @Test + public void testAddInputNullValue() { + TimestampedValue input = TimestampedValue.of(null, INSTANT.plus(10)); + assertEquals("Null values are allowed", input, fn.addInput(TV, input)); + } + + @Test + public void testMergeAccumulatorsMultipleValues() { + Iterable> accums = Lists.newArrayList( + TV, + TV_PLUS_TEN, + TV_MINUS_TEN + ); + + assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums)); + } + + @Test + public void testMergeAccumulatorsSingleValue() { + assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV))); + } + + @Test + public void testMergeAccumulatorsEmptyIterable() { + ArrayList> emptyAccums = Lists.newArrayList(); + assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums)); + } + + @Test + public void testMergeAccumulatorsDefaultAccumulator() { + TimestampedValue defaultAccum = fn.createAccumulator(); + assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum))); + } + + @Test + public void testMergeAccumulatorsAllDefaultAccumulators() { + TimestampedValue defaultAccum = fn.createAccumulator(); + assertEquals(defaultAccum, fn.mergeAccumulators( + Lists.newArrayList(defaultAccum, defaultAccum))); + } + + @Test + public void testMergeAccumulatorsNullIterable() { + thrown.expect(NullPointerException.class); + thrown.expectMessage("accumulators"); + fn.mergeAccumulators(null); + } + + @Test + public void testExtractOutput() { + assertEquals(TV.getValue(), fn.extractOutput(TV)); + } + + @Test + public void testExtractOutputDefaultAggregator() { + TimestampedValue accum = fn.createAccumulator(); + assertThat(fn.extractOutput(accum), nullValue()); + } + + @Test + public void testExtractOutputNullValue() { + TimestampedValue accum = TimestampedValue.of(null, baseTimestamp); + assertEquals(null, fn.extractOutput(accum)); + } + + @Test + public void 
testAggregator() throws Exception { + LatestAggregatorsFn doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue()); + DoFnTester harness = DoFnTester.of(doFn); + for (TimestampedValue element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) { + harness.processTimestampedElement(element); + } + + assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg)); + assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg)); + assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue()); + } + + @Test + public void testDefaultCoderHandlesNull() throws CannotProvideCoderException { + Latest.LatestFn fn = new Latest.LatestFn<>(); + + CoderRegistry registry = new CoderRegistry(); + TimestampedValue.TimestampedValueCoder inputCoder = + TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of()); + + assertThat("Default output coder should handle null values", + fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class)); + assertThat("Default accumulator coder should handle null values", + fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class)); + } + + static class LatestAggregatorsFn extends DoFn { + private final T specialValue; + LatestAggregatorsFn(T specialValue) { + this.specialValue = specialValue; + } + + Aggregator, T> allValuesAgg = + createAggregator("allValues", new Latest.LatestFn()); + + Aggregator, T> specialValueAgg = + createAggregator("oneValue", new Latest.LatestFn()); + + Aggregator, T> noValuesAgg = + createAggregator("noValues", new Latest.LatestFn()); + + @ProcessElement + public void processElement(ProcessContext c) { + TimestampedValue val = TimestampedValue.of(c.element(), c.timestamp()); + allValuesAgg.addValue(val); + if (Objects.equals(c.element(), specialValue)) { + specialValueAgg.addValue(val); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java new file mode 100644 index 0000000..ce9ae37 --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestTest.java @@ -0,0 +1,146 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.transforms; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.instanceOf; +import static org.junit.Assert.assertEquals; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.beam.sdk.coders.AvroCoder; +import org.apache.beam.sdk.coders.BigEndianLongCoder; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.NullableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.testing.NeedsRunner; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.TimestampedValue; + +import org.joda.time.Instant; +import org.junit.Rule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.rules.ExpectedException; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Unit tests for {@link Latest} {@link PTransform} and {@link Combine.CombineFn}. 
+ */
+@RunWith(JUnit4.class)
+public class LatestTest implements Serializable {
+  // NOTE(review): this archived copy appears to have had its generic type arguments stripped
+  // (e.g. `PCollection> output` and `Create.>of()` below are not valid Java as shown); compare
+  // against the upstream source before compiling or reusing this code.
+
+  // NOTE(review): none of the tests in this class use this rule; it may be removable.
+  @Rule public transient ExpectedException thrown = ExpectedException.none();
+
+  /**
+   * Checks that {@link Latest#globally()} selects by event timestamp: "bar" (timestamp 300) is
+   * chosen over "baz" (timestamp 200) even though "baz" is listed after it.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGloballyEventTimestamp() {
+    TestPipeline p = TestPipeline.create();
+    PCollection output =
+        p.apply(Create.timestamped(
+            TimestampedValue.of("foo", new Instant(100)),
+            TimestampedValue.of("bar", new Instant(300)),
+            TimestampedValue.of("baz", new Instant(200))
+        ))
+        .apply(Latest.globally());
+
+    PAssert.that(output).containsInAnyOrder("bar");
+    p.run();
+  }
+
+  /**
+   * Checks that the output coder of {@link Latest#globally()} is a {@link NullableCoder} wrapping
+   * the input coder. Only pipeline construction is exercised; the pipeline is never run.
+   */
+  @Test
+  public void testGloballyOutputCoder() {
+    TestPipeline p = TestPipeline.create();
+    BigEndianLongCoder inputCoder = BigEndianLongCoder.of();
+
+    PCollection output =
+        p.apply(Create.of(1L, 2L).withCoder(inputCoder))
+            .apply(Latest.globally());
+
+    Coder outputCoder = output.getCoder();
+    assertThat(outputCoder, instanceOf(NullableCoder.class));
+    assertEquals(inputCoder, ((NullableCoder) outputCoder).getValueCoder());
+  }
+
+  /** Checks that {@link Latest#globally()} on an empty input emits a single {@code null}. */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testGloballyEmptyCollection() {
+    TestPipeline p = TestPipeline.create();
+    PCollection emptyInput = p.apply(Create.of()
+        // Explicitly set coder such that the runner enforces encodability.
+        .withCoder(VarLongCoder.of()));
+    PCollection output = emptyInput.apply(Latest.globally());
+
+    PAssert.that(output).containsInAnyOrder((Long) null);
+    p.run();
+  }
+
+  /**
+   * Checks that {@link Latest#perKey()} picks, for each key, the value with the greatest event
+   * timestamp: "baz" (200) over "foo" (100) for key "A", and "bar" for key "B".
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testPerKeyEventTimestamp() {
+    TestPipeline p = TestPipeline.create();
+    PCollection> output =
+        p.apply(Create.timestamped(
+            TimestampedValue.of(KV.of("A", "foo"), new Instant(100)),
+            TimestampedValue.of(KV.of("B", "bar"), new Instant(300)),
+            TimestampedValue.of(KV.of("A", "baz"), new Instant(200))
+        ))
+        .apply(Latest.perKey());
+
+    PAssert.that(output).containsInAnyOrder(KV.of("B", "bar"), KV.of("A", "baz"));
+    p.run();
+  }
+
+  /**
+   * Checks that {@link Latest#perKey()} reuses the input {@link KvCoder} unchanged as its output
+   * coder. Only pipeline construction is exercised; the pipeline is never run.
+   */
+  @Test
+  public void testPerKeyOutputCoder() {
+    TestPipeline p = TestPipeline.create();
+    KvCoder inputCoder = KvCoder.of(
+        AvroCoder.of(String.class), AvroCoder.of(Long.class));
+
+    PCollection> output =
+        p.apply(Create.of(KV.of("foo", 1L)).withCoder(inputCoder))
+            .apply(Latest.perKey());
+
+    assertEquals("Should use input coder for outputs", inputCoder, output.getCoder());
+  }
+
+  /**
+   * Checks that {@link Latest#perKey()} on an empty input produces no output at all, in contrast
+   * to the global case above, which emits a single {@code null}.
+   */
+  @Test
+  @Category(NeedsRunner.class)
+  public void testPerKeyEmptyCollection() {
+    TestPipeline p = TestPipeline.create();
+    PCollection> output =
+        p.apply(Create.>of().withCoder(KvCoder.of(
+            StringUtf8Coder.of(), StringUtf8Coder.of())))
+            .apply(Latest.perKey());
+
+    PAssert.that(output).empty();
+    p.run();
+  }
+
+  /**
+   * Returns a {@link TimestampedValue} at {@code timestamp} whose value is the next number drawn
+   * from {@link #uniqueLong}, so each call yields a different value.
+   *
+   * <p>NOTE(review): no test in this class calls this helper; consider removing it (together with
+   * {@code uniqueLong}) or moving it next to the tests that need it.
+   */
+  private static TimestampedValue timestamped(Instant timestamp) {
+    return TimestampedValue.of(uniqueLong.incrementAndGet(), timestamp);
+  }
+
+  // Shared static counter backing timestamped(); incrementAndGet() makes each value unique.
+  private static final AtomicLong uniqueLong = new AtomicLong();
+}
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6ee7b620/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
new file mode 100644
index 0000000..a982f31
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/values/TimestampedValueTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.values;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.testing.EqualsTester;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link TimestampedValue}: the {@code of} and {@code atMinimumTimestamp}
+ * factories, their accessors, {@code null} handling, and equality.
+ */
+@RunWith(JUnit4.class)
+public class TimestampedValueTest {
+  // Lets testNullTimestamp declare the expected exception type and message.
+  @Rule
+  public ExpectedException thrown = ExpectedException.none();
+
+  /** {@code of(value, timestamp)} exposes both arguments through its getters. */
+  @Test
+  public void testValues() {
+    Instant now = Instant.now();
+    TimestampedValue tsv = TimestampedValue.of("foobar", now);
+
+    assertEquals(now, tsv.getTimestamp());
+    assertEquals("foobar", tsv.getValue());
+  }
+
+  /**
+   * {@code atMinimumTimestamp} stamps the value with
+   * {@link BoundedWindow#TIMESTAMP_MIN_VALUE}.
+   */
+  @Test
+  public void testAtMinimumTimestamp() {
+    TimestampedValue tsv = TimestampedValue.atMinimumTimestamp("foobar");
+    assertEquals(BoundedWindow.TIMESTAMP_MIN_VALUE, tsv.getTimestamp());
+  }
+
+  /**
+   * A {@code null} timestamp is rejected with a {@link NullPointerException} whose message
+   * mentions "timestamp".
+   */
+  @Test
+  public void testNullTimestamp() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("timestamp");
+    TimestampedValue.of("foobar", null);
+  }
+
+  /**
+   * Unlike the timestamp, a {@code null} value is accepted and is returned by {@code getValue()}.
+   *
+   * <p>NOTE(review): {@code assertNull} would state this more directly, at the cost of one more
+   * static import.
+   */
+  @Test
+  public void testNullValue() {
+    TimestampedValue tsv = TimestampedValue.atMinimumTimestamp(null);
+    assertEquals(null, tsv.getValue());
+  }
+
+  /**
+   * Equality considers both value and timestamp: instances that differ in either are unequal,
+   * and {@code of(v, TIMESTAMP_MIN_VALUE)} equals {@code atMinimumTimestamp(v)}.
+   */
+  @Test
+  public void testEquality() {
+    new EqualsTester()
+        .addEqualityGroup(
+            TimestampedValue.of("foo", new Instant(1000)),
+            TimestampedValue.of("foo", new Instant(1000)))
+        .addEqualityGroup(TimestampedValue.of("foo", new Instant(2000)))
+        .addEqualityGroup(TimestampedValue.of("bar", new Instant(1000)))
+        .addEqualityGroup(
+            TimestampedValue.of("foo", BoundedWindow.TIMESTAMP_MIN_VALUE),
+            TimestampedValue.atMinimumTimestamp("foo"))
+        .testEquals();
+  }
+}