beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From lc...@apache.org
Subject [2/3] incubator-beam git commit: Move LatestFnTests to LatestFnTest
Date Thu, 06 Oct 2016 16:57:24 GMT
Move LatestFnTests to LatestFnTest


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/52e43ac7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/52e43ac7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/52e43ac7

Branch: refs/heads/master
Commit: 52e43ac7b8257ecbcda61eb3b14406c36df08a3b
Parents: 60a8aef
Author: Kenneth Knowles <klk@google.com>
Authored: Tue Oct 4 13:23:37 2016 -0700
Committer: Luke Cwik <lcwik@google.com>
Committed: Thu Oct 6 09:49:53 2016 -0700

----------------------------------------------------------------------
 .../beam/sdk/transforms/LatestFnTest.java       | 233 +++++++++++++++++++
 .../beam/sdk/transforms/LatestFnTests.java      | 233 -------------------
 2 files changed, 233 insertions(+), 233 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
new file mode 100644
index 0000000..31acb08
--- /dev/null
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTest.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.transforms;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.isOneOf;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.Lists;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Objects;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.NullableCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.beam.sdk.values.TimestampedValue;
+import org.joda.time.Instant;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Unit tests for {@link Latest.LatestFn}.
+ * */
+@RunWith(JUnit4.class)
+public class LatestFnTest {
+  private static final Instant INSTANT = new Instant(100);
+  private static final long VALUE = 100 * INSTANT.getMillis();
+
+  private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
+  private static final TimestampedValue<Long> TV_MINUS_TEN =
+      TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
+  private static final TimestampedValue<Long> TV_PLUS_TEN =
+      TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
+
+  @Rule
+  public final ExpectedException thrown = ExpectedException.none();
+
+  private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+  private final Instant baseTimestamp = Instant.now();
+
+  @Test
+  public void testDefaultValue() {
+    assertThat(fn.defaultValue(), nullValue());
+  }
+
+  @Test
+  public void testCreateAccumulator() {
+    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
+  }
+
+  @Test
+  public void testAddInputInitialAdd() {
+    TimestampedValue<Long> input = TV;
+    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+  }
+
+  @Test
+  public void testAddInputMinTimestamp() {
+    TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
+    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
+  }
+
+  @Test
+  public void testAddInputEarlierValue() {
+    assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
+  }
+
+  @Test
+  public void testAddInputLaterValue() {
+    assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
+  }
+
+  @Test
+  public void testAddInputSameTimestamp() {
+    TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
+    TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
+
+    assertThat("Latest for values with the same timestamp is chosen arbitrarily",
+        fn.addInput(accum, input), isOneOf(accum, input));
+  }
+
+  @Test
+  public void testAddInputNullAccumulator() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("accumulator");
+    fn.addInput(null, TV);
+  }
+
+  @Test
+  public void testAddInputNullInput() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("input");
+    fn.addInput(TV, null);
+  }
+
+  @Test
+  public void testAddInputNullValue() {
+    TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
+    assertEquals("Null values are allowed", input, fn.addInput(TV, input));
+  }
+
+  @Test
+  public void testMergeAccumulatorsMultipleValues() {
+    Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
+        TV,
+        TV_PLUS_TEN,
+        TV_MINUS_TEN
+    );
+
+    assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
+  }
+
+  @Test
+  public void testMergeAccumulatorsSingleValue() {
+    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
+  }
+
+  @Test
+  public void testMergeAccumulatorsEmptyIterable() {
+    ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
+    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
+  }
+
+  @Test
+  public void testMergeAccumulatorsDefaultAccumulator() {
+    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
+  }
+
+  @Test
+  public void testMergeAccumulatorsAllDefaultAccumulators() {
+    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
+    assertEquals(defaultAccum, fn.mergeAccumulators(
+        Lists.newArrayList(defaultAccum, defaultAccum)));
+  }
+
+  @Test
+  public void testMergeAccumulatorsNullIterable() {
+    thrown.expect(NullPointerException.class);
+    thrown.expectMessage("accumulators");
+    fn.mergeAccumulators(null);
+  }
+
+  @Test
+  public void testExtractOutput() {
+    assertEquals(TV.getValue(), fn.extractOutput(TV));
+  }
+
+  @Test
+  public void testExtractOutputDefaultAggregator() {
+    TimestampedValue<Long> accum = fn.createAccumulator();
+    assertThat(fn.extractOutput(accum), nullValue());
+  }
+
+  @Test
+  public void testExtractOutputNullValue() {
+    TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
+    assertEquals(null, fn.extractOutput(accum));
+  }
+
+  @Test
+  public void testAggregator() throws Exception {
+    LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
+    DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
+    for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
+      harness.processTimestampedElement(element);
+    }
+
+    assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
+    assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
+    assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
+  }
+
+  @Test
+  public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
+    Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
+
+    CoderRegistry registry = new CoderRegistry();
+    TimestampedValue.TimestampedValueCoder<Long> inputCoder =
+        TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
+
+    assertThat("Default output coder should handle null values",
+        fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+    assertThat("Default accumulator coder should handle null values",
+        fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
+  }
+
+  static class LatestAggregatorsFn<T> extends DoFn<T, T> {
+    private final T specialValue;
+    LatestAggregatorsFn(T specialValue) {
+      this.specialValue = specialValue;
+    }
+
+    Aggregator<TimestampedValue<T>, T> allValuesAgg =
+        createAggregator("allValues", new Latest.LatestFn<T>());
+
+    Aggregator<TimestampedValue<T>, T> specialValueAgg =
+        createAggregator("oneValue", new Latest.LatestFn<T>());
+
+    Aggregator<TimestampedValue<T>, T> noValuesAgg =
+        createAggregator("noValues", new Latest.LatestFn<T>());
+
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
+      allValuesAgg.addValue(val);
+      if (Objects.equals(c.element(), specialValue)) {
+        specialValueAgg.addValue(val);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/52e43ac7/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
deleted file mode 100644
index 459a966..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/LatestFnTests.java
+++ /dev/null
@@ -1,233 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.transforms;
-
-import static org.hamcrest.MatcherAssert.assertThat;
-import static org.hamcrest.Matchers.instanceOf;
-import static org.hamcrest.Matchers.isOneOf;
-import static org.hamcrest.Matchers.nullValue;
-import static org.junit.Assert.assertEquals;
-
-import com.google.common.collect.Lists;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Objects;
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.NullableCoder;
-import org.apache.beam.sdk.coders.VarLongCoder;
-import org.apache.beam.sdk.values.TimestampedValue;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Unit tests for {@link Latest.LatestFn}.
- * */
-@RunWith(JUnit4.class)
-public class LatestFnTests {
-  private static final Instant INSTANT = new Instant(100);
-  private static final long VALUE = 100 * INSTANT.getMillis();
-
-  private static final TimestampedValue<Long> TV = TimestampedValue.of(VALUE, INSTANT);
-  private static final TimestampedValue<Long> TV_MINUS_TEN =
-      TimestampedValue.of(VALUE - 10, INSTANT.minus(10));
-  private static final TimestampedValue<Long> TV_PLUS_TEN =
-      TimestampedValue.of(VALUE + 10, INSTANT.plus(10));
-
-  @Rule
-  public final ExpectedException thrown = ExpectedException.none();
-
-  private final Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
-  private final Instant baseTimestamp = Instant.now();
-
-  @Test
-  public void testDefaultValue() {
-    assertThat(fn.defaultValue(), nullValue());
-  }
-
-  @Test
-  public void testCreateAccumulator() {
-    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.createAccumulator());
-  }
-
-  @Test
-  public void testAddInputInitialAdd() {
-    TimestampedValue<Long> input = TV;
-    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
-  }
-
-  @Test
-  public void testAddInputMinTimestamp() {
-    TimestampedValue<Long> input = TimestampedValue.atMinimumTimestamp(1234L);
-    assertEquals(input, fn.addInput(fn.createAccumulator(), input));
-  }
-
-  @Test
-  public void testAddInputEarlierValue() {
-    assertEquals(TV, fn.addInput(TV, TV_MINUS_TEN));
-  }
-
-  @Test
-  public void testAddInputLaterValue() {
-    assertEquals(TV_PLUS_TEN, fn.addInput(TV, TV_PLUS_TEN));
-  }
-
-  @Test
-  public void testAddInputSameTimestamp() {
-    TimestampedValue<Long> accum = TimestampedValue.of(100L, INSTANT);
-    TimestampedValue<Long> input = TimestampedValue.of(200L, INSTANT);
-
-    assertThat("Latest for values with the same timestamp is chosen arbitrarily",
-        fn.addInput(accum, input), isOneOf(accum, input));
-  }
-
-  @Test
-  public void testAddInputNullAccumulator() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("accumulator");
-    fn.addInput(null, TV);
-  }
-
-  @Test
-  public void testAddInputNullInput() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("input");
-    fn.addInput(TV, null);
-  }
-
-  @Test
-  public void testAddInputNullValue() {
-    TimestampedValue<Long> input = TimestampedValue.of(null, INSTANT.plus(10));
-    assertEquals("Null values are allowed", input, fn.addInput(TV, input));
-  }
-
-  @Test
-  public void testMergeAccumulatorsMultipleValues() {
-    Iterable<TimestampedValue<Long>> accums = Lists.newArrayList(
-        TV,
-        TV_PLUS_TEN,
-        TV_MINUS_TEN
-    );
-
-    assertEquals(TV_PLUS_TEN, fn.mergeAccumulators(accums));
-  }
-
-  @Test
-  public void testMergeAccumulatorsSingleValue() {
-    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV)));
-  }
-
-  @Test
-  public void testMergeAccumulatorsEmptyIterable() {
-    ArrayList<TimestampedValue<Long>> emptyAccums = Lists.newArrayList();
-    assertEquals(TimestampedValue.atMinimumTimestamp(null), fn.mergeAccumulators(emptyAccums));
-  }
-
-  @Test
-  public void testMergeAccumulatorsDefaultAccumulator() {
-    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
-    assertEquals(TV, fn.mergeAccumulators(Lists.newArrayList(TV, defaultAccum)));
-  }
-
-  @Test
-  public void testMergeAccumulatorsAllDefaultAccumulators() {
-    TimestampedValue<Long> defaultAccum = fn.createAccumulator();
-    assertEquals(defaultAccum, fn.mergeAccumulators(
-        Lists.newArrayList(defaultAccum, defaultAccum)));
-  }
-
-  @Test
-  public void testMergeAccumulatorsNullIterable() {
-    thrown.expect(NullPointerException.class);
-    thrown.expectMessage("accumulators");
-    fn.mergeAccumulators(null);
-  }
-
-  @Test
-  public void testExtractOutput() {
-    assertEquals(TV.getValue(), fn.extractOutput(TV));
-  }
-
-  @Test
-  public void testExtractOutputDefaultAggregator() {
-    TimestampedValue<Long> accum = fn.createAccumulator();
-    assertThat(fn.extractOutput(accum), nullValue());
-  }
-
-  @Test
-  public void testExtractOutputNullValue() {
-    TimestampedValue<Long> accum = TimestampedValue.of(null, baseTimestamp);
-    assertEquals(null, fn.extractOutput(accum));
-  }
-
-  @Test
-  public void testAggregator() throws Exception {
-    LatestAggregatorsFn<Long> doFn = new LatestAggregatorsFn<>(TV_MINUS_TEN.getValue());
-    DoFnTester<Long, Long> harness = DoFnTester.of(doFn);
-    for (TimestampedValue<Long> element : Arrays.asList(TV, TV_PLUS_TEN, TV_MINUS_TEN)) {
-      harness.processTimestampedElement(element);
-    }
-
-    assertEquals(TV_PLUS_TEN.getValue(), harness.getAggregatorValue(doFn.allValuesAgg));
-    assertEquals(TV_MINUS_TEN.getValue(), harness.getAggregatorValue(doFn.specialValueAgg));
-    assertThat(harness.getAggregatorValue(doFn.noValuesAgg), nullValue());
-  }
-
-  @Test
-  public void testDefaultCoderHandlesNull() throws CannotProvideCoderException {
-    Latest.LatestFn<Long> fn = new Latest.LatestFn<>();
-
-    CoderRegistry registry = new CoderRegistry();
-    TimestampedValue.TimestampedValueCoder<Long> inputCoder =
-        TimestampedValue.TimestampedValueCoder.of(VarLongCoder.of());
-
-    assertThat("Default output coder should handle null values",
-        fn.getDefaultOutputCoder(registry, inputCoder), instanceOf(NullableCoder.class));
-    assertThat("Default accumulator coder should handle null values",
-        fn.getAccumulatorCoder(registry, inputCoder), instanceOf(NullableCoder.class));
-  }
-
-  static class LatestAggregatorsFn<T> extends DoFn<T, T> {
-    private final T specialValue;
-    LatestAggregatorsFn(T specialValue) {
-      this.specialValue = specialValue;
-    }
-
-    Aggregator<TimestampedValue<T>, T> allValuesAgg =
-        createAggregator("allValues", new Latest.LatestFn<T>());
-
-    Aggregator<TimestampedValue<T>, T> specialValueAgg =
-        createAggregator("oneValue", new Latest.LatestFn<T>());
-
-    Aggregator<TimestampedValue<T>, T> noValuesAgg =
-        createAggregator("noValues", new Latest.LatestFn<T>());
-
-    @ProcessElement
-    public void processElement(ProcessContext c) {
-      TimestampedValue<T> val = TimestampedValue.of(c.element(), c.timestamp());
-      allValuesAgg.addValue(val);
-      if (Objects.equals(c.element(), specialValue)) {
-        specialValueAgg.addValue(val);
-      }
-    }
-  }
-}


Mime
View raw message