beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [2/3] incubator-beam git commit: Move CopyOnAccessStateInternals to runners/direct
Date Thu, 08 Dec 2016 04:21:37 GMT
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/09e2f309/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
deleted file mode 100644
index ad70bca..0000000
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java
+++ /dev/null
@@ -1,552 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.sdk.util.state;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
-import static org.hamcrest.Matchers.is;
-import static org.hamcrest.Matchers.nullValue;
-import static org.hamcrest.Matchers.theInstance;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.spy;
-import static org.mockito.Mockito.verify;
-
-import org.apache.beam.sdk.coders.CannotProvideCoderException;
-import org.apache.beam.sdk.coders.CoderRegistry;
-import org.apache.beam.sdk.coders.StringUtf8Coder;
-import org.apache.beam.sdk.coders.VarIntCoder;
-import org.apache.beam.sdk.testing.TestPipeline;
-import org.apache.beam.sdk.transforms.Combine.CombineFn;
-import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn;
-import org.apache.beam.sdk.transforms.Sum;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFn;
-import org.apache.beam.sdk.transforms.windowing.OutputTimeFns;
-import org.joda.time.Instant;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.JUnit4;
-
-/**
- * Tests for {@link CopyOnAccessInMemoryStateInternals}.
- */
-@RunWith(JUnit4.class)
-public class CopyOnAccessInMemoryStateInternalsTest {
-  @Rule public ExpectedException thrown = ExpectedException.none();
-  private String key = "foo";
-  @Test
-  public void testGetWithEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = internals.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-    assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
-
-    BagState<String> reReadStringBag = internals.state(namespace, bagTag);
-    assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
-  }
-
-  @Test
-  public void testGetWithAbsentInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = internals.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-    assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
-
-    BagState<String> reReadVoidBag = internals.state(namespace, bagTag);
-    assertThat(reReadVoidBag.read(), containsInAnyOrder("baz", "bar"));
-
-    BagState<String> underlyingState = underlying.state(namespace, bagTag);
-    assertThat(underlyingState.read(), emptyIterable());
-  }
-
-  /**
-   * Tests that retrieving state with an underlying StateInternals with an existing value
returns
-   * a value that initially has equal value to the provided state but can be modified without
-   * modifying the existing state.
-   */
-  @Test
-  public void testGetWithPresentInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, ValueState<String>> valueTag = StateTags.value("foo", StringUtf8Coder.of());
-    ValueState<String> underlyingValue = underlying.state(namespace, valueTag);
-    assertThat(underlyingValue.read(), nullValue(String.class));
-
-    underlyingValue.write("bar");
-    assertThat(underlyingValue.read(), equalTo("bar"));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    ValueState<String> copyOnAccessState = internals.state(namespace, valueTag);
-    assertThat(copyOnAccessState.read(), equalTo("bar"));
-
-    copyOnAccessState.write("baz");
-    assertThat(copyOnAccessState.read(), equalTo("baz"));
-    assertThat(underlyingValue.read(), equalTo("bar"));
-
-    ValueState<String> reReadUnderlyingValue = underlying.state(namespace, valueTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
-  }
-
-  @Test
-  public void testBagStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<Integer>> valueTag = StateTags.bag("foo", VarIntCoder.of());
-    BagState<Integer> underlyingValue = underlying.state(namespace, valueTag);
-    assertThat(underlyingValue.read(), emptyIterable());
-
-    underlyingValue.add(1);
-    assertThat(underlyingValue.read(), containsInAnyOrder(1));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    BagState<Integer> copyOnAccessState = internals.state(namespace, valueTag);
-    assertThat(copyOnAccessState.read(), containsInAnyOrder(1));
-
-    copyOnAccessState.add(4);
-    assertThat(copyOnAccessState.read(), containsInAnyOrder(4, 1));
-    assertThat(underlyingValue.read(), containsInAnyOrder(1));
-
-    BagState<Integer> reReadUnderlyingValue = underlying.state(namespace, valueTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
-  }
-
-  @Test
-  public void testAccumulatorCombiningStateWithUnderlying() throws CannotProvideCoderException
{
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CombineFn<Long, long[], Long> sumLongFn = new Sum.SumLongFn();
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    CoderRegistry reg = TestPipeline.create().getCoderRegistry();
-    StateTag<Object, AccumulatorCombiningState<Long, long[], Long>> stateTag
=
-        StateTags.combiningValue("summer",
-            sumLongFn.getAccumulatorCoder(reg, reg.getDefaultCoder(Long.class)), sumLongFn);
-    CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
-    assertThat(underlyingValue.read(), equalTo(0L));
-
-    underlyingValue.add(1L);
-    assertThat(underlyingValue.read(), equalTo(1L));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
-    assertThat(copyOnAccessState.read(), equalTo(1L));
-
-    copyOnAccessState.add(4L);
-    assertThat(copyOnAccessState.read(), equalTo(5L));
-    assertThat(underlyingValue.read(), equalTo(1L));
-
-    CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace,
stateTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
-  }
-
-  @Test
-  public void testKeyedAccumulatorCombiningStateWithUnderlying() throws Exception {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    KeyedCombineFn<String, Long, long[], Long> sumLongFn = new Sum.SumLongFn().asKeyedFn();
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    CoderRegistry reg = TestPipeline.create().getCoderRegistry();
-    StateTag<String, AccumulatorCombiningState<Long, long[], Long>> stateTag
=
-        StateTags.keyedCombiningValue(
-            "summer",
-            sumLongFn.getAccumulatorCoder(
-                reg, StringUtf8Coder.of(), reg.getDefaultCoder(Long.class)),
-            sumLongFn);
-    CombiningState<Long, Long> underlyingValue = underlying.state(namespace, stateTag);
-    assertThat(underlyingValue.read(), equalTo(0L));
-
-    underlyingValue.add(1L);
-    assertThat(underlyingValue.read(), equalTo(1L));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    CombiningState<Long, Long> copyOnAccessState = internals.state(namespace, stateTag);
-    assertThat(copyOnAccessState.read(), equalTo(1L));
-
-    copyOnAccessState.add(4L);
-    assertThat(copyOnAccessState.read(), equalTo(5L));
-    assertThat(underlyingValue.read(), equalTo(1L));
-
-    CombiningState<Long, Long> reReadUnderlyingValue = underlying.state(namespace,
stateTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
-  }
-
-  @Test
-  public void testWatermarkHoldStateWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
-    OutputTimeFn<BoundedWindow> outputTimeFn =
-        OutputTimeFns.outputAtEarliestInputTimestamp();
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =
-        StateTags.watermarkStateInternal("wmstate", outputTimeFn);
-    WatermarkHoldState<?> underlyingValue = underlying.state(namespace, stateTag);
-    assertThat(underlyingValue.read(), nullValue());
-
-    underlyingValue.add(new Instant(250L));
-    assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-    WatermarkHoldState<BoundedWindow> copyOnAccessState = internals.state(namespace,
stateTag);
-    assertThat(copyOnAccessState.read(), equalTo(new Instant(250L)));
-
-    copyOnAccessState.add(new Instant(100L));
-    assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
-    assertThat(underlyingValue.read(), equalTo(new Instant(250L)));
-
-    copyOnAccessState.add(new Instant(500L));
-    assertThat(copyOnAccessState.read(), equalTo(new Instant(100L)));
-
-    WatermarkHoldState<BoundedWindow> reReadUnderlyingValue =
-        underlying.state(namespace, stateTag);
-    assertThat(underlyingValue.read(), equalTo(reReadUnderlyingValue.read()));
-  }
-
-  @Test
-  public void testCommitWithoutUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = internals.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-    assertThat(stringBag.read(), containsInAnyOrder("baz", "bar"));
-
-    internals.commit();
-
-    BagState<String> reReadStringBag = internals.state(namespace, bagTag);
-    assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
-    assertThat(internals.isEmpty(), is(false));
-  }
-
-  @Test
-  public void testCommitWithUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = underlying.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-
-    internals.commit();
-    BagState<String> reReadStringBag = internals.state(namespace, bagTag);
-    assertThat(reReadStringBag.read(), containsInAnyOrder("baz", "bar"));
-
-    reReadStringBag.add("spam");
-
-    BagState<String> underlyingState = underlying.state(namespace, bagTag);
-    assertThat(underlyingState.read(), containsInAnyOrder("spam", "bar", "baz"));
-    assertThat(underlyingState, is(theInstance(stringBag)));
-    assertThat(internals.isEmpty(), is(false));
-  }
-
-  @Test
-  public void testCommitWithClearedInUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> secondUnderlying =
-        spy(CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying));
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, secondUnderlying);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = underlying.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-    stringBag.clear();
-    // We should not read through the cleared bag
-    secondUnderlying.commit();
-
-    // Should not be visible
-    stringBag.add("foo");
-
-    internals.commit();
-    BagState<String> internalsStringBag = internals.state(namespace, bagTag);
-    assertThat(internalsStringBag.read(), emptyIterable());
-    verify(secondUnderlying, never()).state(namespace, bagTag);
-    assertThat(internals.isEmpty(), is(false));
-  }
-
-  @Test
-  public void testCommitWithOverwrittenUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = underlying.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-
-    BagState<String> internalsState = internals.state(namespace, bagTag);
-    internalsState.add("eggs");
-    internalsState.add("ham");
-    internalsState.add("0x00ff00");
-    internalsState.add("&");
-
-    internals.commit();
-
-    BagState<String> reReadInternalState = internals.state(namespace, bagTag);
-    assertThat(
-        reReadInternalState.read(),
-        containsInAnyOrder("bar", "baz", "0x00ff00", "eggs", "&", "ham"));
-    BagState<String> reReadUnderlyingState = underlying.state(namespace, bagTag);
-    assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz"));
-  }
-
-  @Test
-  public void testCommitWithAddedUnderlying() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
-    internals.commit();
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = underlying.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-
-    BagState<String> internalState = internals.state(namespace, bagTag);
-    assertThat(internalState.read(), emptyIterable());
-
-    BagState<String> reReadUnderlyingState = underlying.state(namespace, bagTag);
-    assertThat(reReadUnderlyingState.read(), containsInAnyOrder("bar", "baz"));
-  }
-
-  @Test
-  public void testCommitWithEmptyTableIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
-    internals.commit();
-
-    assertThat(internals.isEmpty(), is(true));
-  }
-
-  @Test
-  public void testCommitWithOnlyClearedValuesIsEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = internals.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("foo");
-    stringBag.clear();
-
-    internals.commit();
-
-    assertThat(internals.isEmpty(), is(true));
-  }
-
-  @Test
-  public void testCommitWithEmptyNewAndFullUnderlyingIsNotEmpty() {
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, underlying);
-
-    StateNamespace namespace = new StateNamespaceForTest("foo");
-    StateTag<Object, BagState<String>> bagTag = StateTags.bag("foo", StringUtf8Coder.of());
-    BagState<String> stringBag = underlying.state(namespace, bagTag);
-    assertThat(stringBag.read(), emptyIterable());
-
-    stringBag.add("bar");
-    stringBag.add("baz");
-
-    internals.commit();
-    assertThat(internals.isEmpty(), is(false));
-  }
-
-  @Test
-  public void testGetEarliestWatermarkHoldAfterCommit() {
-    BoundedWindow first = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(2048L);
-      }
-    };
-    BoundedWindow second = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(689743L);
-      }
-    };
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> firstHold =
-        internals.state(StateNamespaces.window(null, first), firstHoldAddress);
-    firstHold.add(new Instant(22L));
-
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> secondHold =
-        internals.state(StateNamespaces.window(null, second), secondHoldAddress);
-    secondHold.add(new Instant(2L));
-
-    internals.commit();
-    assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(2L)));
-  }
-
-  @Test
-  public void testGetEarliestWatermarkHoldWithEarliestInUnderlyingTable() {
-    BoundedWindow first = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(2048L);
-      }
-    };
-    BoundedWindow second = new BoundedWindow() {
-      @Override
-      public Instant maxTimestamp() {
-        return new Instant(689743L);
-      }
-    };
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> firstHold =
-        underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
-    firstHold.add(new Instant(22L));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
-
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> secondHold =
-        internals.state(StateNamespaces.window(null, second), secondHoldAddress);
-    secondHold.add(new Instant(244L));
-
-    internals.commit();
-    assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(22L)));
-  }
-
-  @Test
-  public void testGetEarliestWatermarkHoldWithEarliestInNewTable() {
-    BoundedWindow first =
-        new BoundedWindow() {
-          @Override
-          public Instant maxTimestamp() {
-            return new Instant(2048L);
-          }
-        };
-    BoundedWindow second =
-        new BoundedWindow() {
-          @Override
-          public Instant maxTimestamp() {
-            return new Instant(689743L);
-          }
-        };
-    CopyOnAccessInMemoryStateInternals<String> underlying =
-        CopyOnAccessInMemoryStateInternals.withUnderlying("foo", null);
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> firstHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> firstHold =
-        underlying.state(StateNamespaces.window(null, first), firstHoldAddress);
-    firstHold.add(new Instant(224L));
-
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying("foo", underlying.commit());
-
-    StateTag<Object, WatermarkHoldState<BoundedWindow>> secondHoldAddress =
-        StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp());
-    WatermarkHoldState<BoundedWindow> secondHold =
-        internals.state(StateNamespaces.window(null, second), secondHoldAddress);
-    secondHold.add(new Instant(24L));
-
-    internals.commit();
-    assertThat(internals.getEarliestWatermarkHold(), equalTo(new Instant(24L)));
-  }
-
-  @Test
-  public void testGetEarliestHoldBeforeCommit() {
-    CopyOnAccessInMemoryStateInternals<String> internals =
-        CopyOnAccessInMemoryStateInternals.withUnderlying(key, null);
-
-    internals
-        .state(
-            StateNamespaces.global(),
-            StateTags.watermarkStateInternal("foo", OutputTimeFns.outputAtEarliestInputTimestamp()))
-        .add(new Instant(1234L));
-
-    thrown.expect(IllegalStateException.class);
-    thrown.expectMessage(CopyOnAccessInMemoryStateInternals.class.getSimpleName());
-    thrown.expectMessage("Can't get the earliest watermark hold");
-    thrown.expectMessage("before it is committed");
-
-    internals.getEarliestWatermarkHold();
-  }
-}


Mime
View raw message