beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From reuven...@apache.org
Subject [beam] branch master updated: Merge pull request #9612: [BEAM-6855] Side inputs are not supported when using the state API
Date Fri, 04 Oct 2019 10:43:40 GMT
This is an automated email from the ASF dual-hosted git repository.

reuvenlax pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 6cc1c1f  Merge pull request #9612: [BEAM-6855] Side inputs are not supported when
using the state API
6cc1c1f is described below

commit 6cc1c1f905ac9b0b74a2eefcc8be7fb24ccb98bb
Author: Salman Raza <50444219+salmanVD@users.noreply.github.com>
AuthorDate: Fri Oct 4 15:43:03 2019 +0500

    Merge pull request #9612: [BEAM-6855] Side inputs are not supported when using the state
API
---
 .../SimplePushbackSideInputDoFnRunnerTest.java     | 220 +++++++++++++++++++++
 .../org/apache/beam/sdk/transforms/ParDoTest.java  |  58 ++++++
 2 files changed, 278 insertions(+)

diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
index c6fd952..28b387e 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SimplePushbackSideInputDoFnRunnerTest.java
@@ -17,36 +17,56 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.when;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.metrics.MetricsContainerImpl;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.metrics.MetricName;
+import org.apache.beam.sdk.metrics.MetricsEnvironment;
+import org.apache.beam.sdk.state.StateSpec;
+import org.apache.beam.sdk.state.StateSpecs;
 import org.apache.beam.sdk.state.TimeDomain;
+import org.apache.beam.sdk.state.ValueState;
 import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.ValidatesRunner;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.DoFnSchemaInformation;
 import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
 import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.util.IdentitySideInputWindowFn;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
+import org.apache.beam.sdk.values.WindowingStrategy;
+import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.MoreObjects;
 import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
+import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.junit.Before;
 import org.junit.Rule;
 import org.junit.Test;
+import org.junit.experimental.categories.Category;
 import org.junit.runner.RunWith;
 import org.junit.runners.JUnit4;
 import org.mockito.Mock;
@@ -56,9 +76,27 @@ import org.mockito.MockitoAnnotations;
 /** Tests for {@link SimplePushbackSideInputDoFnRunner}. */
 @RunWith(JUnit4.class)
 public class SimplePushbackSideInputDoFnRunnerTest {
+  @Mock StepContext mockStepContext;
   @Mock private ReadyCheckingSideInputReader reader;
   private TestDoFnRunner<Integer, Integer> underlying;
   private PCollectionView<Integer> singletonView;
+  private DoFnRunner<KV<String, Integer>, Integer> statefulRunner;
+
+  private static final long WINDOW_SIZE = 10;
+  private static final long ALLOWED_LATENESS = 1;
+
+  private static final IntervalWindow WINDOW_1 =
+      new IntervalWindow(new Instant(0), new Instant(10));
+
+  private static final IntervalWindow WINDOW_2 =
+      new IntervalWindow(new Instant(10), new Instant(20));
+
+  private static final WindowingStrategy<?, ?> WINDOWING_STRATEGY =
+      WindowingStrategy.of(FixedWindows.of(Duration.millis(WINDOW_SIZE)))
+          .withAllowedLateness(Duration.millis(ALLOWED_LATENESS));
+
+  private InMemoryStateInternals<String> stateInternals;
+  private InMemoryTimerInternals timerInternals;
 
   @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
 
@@ -72,6 +110,26 @@ public class SimplePushbackSideInputDoFnRunnerTest {
             .apply(Sum.integersGlobally().asSingletonView());
 
     underlying = new TestDoFnRunner<>();
+
+    DoFn<KV<String, Integer>, Integer> fn = new MyDoFn();
+
+    MockitoAnnotations.initMocks(this);
+    when(mockStepContext.timerInternals()).thenReturn(timerInternals);
+
+    stateInternals = new InMemoryStateInternals<>("hello");
+    timerInternals = new InMemoryTimerInternals();
+
+    when(mockStepContext.stateInternals()).thenReturn((StateInternals) stateInternals);
+    when(mockStepContext.timerInternals()).thenReturn(timerInternals);
+
+    statefulRunner =
+        DoFnRunners.defaultStatefulDoFnRunner(
+            fn,
+            getDoFnRunner(fn),
+            WINDOWING_STRATEGY,
+            new StatefulDoFnRunner.TimeInternalsCleanupTimer(timerInternals, WINDOWING_STRATEGY),
+            new StatefulDoFnRunner.StateInternalsStateCleaner<>(
+                fn, stateInternals, (Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder()));
   }
 
   private SimplePushbackSideInputDoFnRunner<Integer, Integer> createRunner(
@@ -276,4 +334,166 @@ public class SimplePushbackSideInputDoFnRunnerTest {
       finished = true;
     }
   }
+
+  private SimplePushbackSideInputDoFnRunner<KV<String, Integer>, Integer> createRunner(
+      DoFnRunner<KV<String, Integer>, Integer> doFnRunner,
+      ImmutableList<PCollectionView<?>> views) {
+    SimplePushbackSideInputDoFnRunner<KV<String, Integer>, Integer> runner =
+        SimplePushbackSideInputDoFnRunner.create(doFnRunner, views, reader);
+    runner.startBundle();
+    return runner;
+  }
+
+  @Test
+  @Category({ValidatesRunner.class})
+  public void testLateDroppingForStatefulDoFnRunner() throws Exception {
+    MetricsContainerImpl container = new MetricsContainerImpl("any");
+    MetricsEnvironment.setCurrentContainer(container);
+
+    timerInternals.advanceInputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
+    timerInternals.advanceOutputWatermark(new Instant(BoundedWindow.TIMESTAMP_MAX_VALUE));
+
+    PushbackSideInputDoFnRunner runner =
+        createRunner(statefulRunner, ImmutableList.of(singletonView));
+
+    runner.startBundle();
+
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    WindowedValue<Integer> multiWindow =
+        WindowedValue.of(
+            1,
+            new Instant(0),
+            ImmutableList.of(new IntervalWindow(new Instant(0), new Instant(0L + WINDOW_SIZE))),
+            PaneInfo.ON_TIME_AND_ONLY_FIRING);
+
+    runner.processElementInReadyWindows(multiWindow);
+
+    long droppedValues =
+        container
+            .getCounter(
+                MetricName.named(
+                    StatefulDoFnRunner.class, StatefulDoFnRunner.DROPPED_DUE_TO_LATENESS_COUNTER))
+            .getCumulative();
+    assertEquals(1L, droppedValues);
+
+    runner.finishBundle();
+  }
+
+  @Test
+  @Category({ValidatesRunner.class})
+  public void testGarbageCollectForStatefulDoFnRunner() throws Exception {
+    timerInternals.advanceInputWatermark(new Instant(1L));
+
+    MyDoFn fn = new MyDoFn();
+    StateTag<ValueState<Integer>> stateTag = StateTags.tagForSpec(fn.stateId,
fn.intState);
+
+    PushbackSideInputDoFnRunner runner =
+        createRunner(statefulRunner, ImmutableList.of(singletonView));
+
+    Instant elementTime = new Instant(1);
+
+    when(reader.isReady(Mockito.eq(singletonView), Mockito.any(BoundedWindow.class)))
+        .thenReturn(true);
+
+    // first element, key is hello, WINDOW_1
+    runner.processElementInReadyWindows(
+        WindowedValue.of(KV.of("hello", 1), elementTime, WINDOW_1, PaneInfo.NO_FIRING));
+
+    assertEquals(1, (int) stateInternals.state(windowNamespace(WINDOW_1), stateTag).read());
+
+    // second element, key is hello, WINDOW_2
+    runner.processElementInReadyWindows(
+        WindowedValue.of(
+            KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING));
+
+    runner.processElementInReadyWindows(
+        WindowedValue.of(
+            KV.of("hello", 1), elementTime.plus(WINDOW_SIZE), WINDOW_2, PaneInfo.NO_FIRING));
+
+    assertEquals(2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
+
+    // advance watermark past end of WINDOW_1 + allowed lateness
+    // the cleanup timer is set to window.maxTimestamp() + allowed lateness + 1
+    // to ensure that state is still available when a user timer for window.maxTimestamp()
fires
+    advanceInputWatermark(
+        timerInternals,
+        WINDOW_1
+            .maxTimestamp()
+            .plus(ALLOWED_LATENESS)
+            .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
+            .plus(1), // so the watermark is past the GC horizon, not on it
+        runner);
+
+    assertTrue(
+        stateInternals.isEmptyForTesting(
+            stateInternals.state(windowNamespace(WINDOW_1), stateTag)));
+
+    assertEquals(2, (int) stateInternals.state(windowNamespace(WINDOW_2), stateTag).read());
+
+    // advance watermark past end of WINDOW_2 + allowed lateness
+    advanceInputWatermark(
+        timerInternals,
+        WINDOW_2
+            .maxTimestamp()
+            .plus(ALLOWED_LATENESS)
+            .plus(StatefulDoFnRunner.TimeInternalsCleanupTimer.GC_DELAY_MS)
+            .plus(1), // so the watermark is past the GC horizon, not on it
+        runner);
+
+    assertTrue(
+        stateInternals.isEmptyForTesting(
+            stateInternals.state(windowNamespace(WINDOW_2), stateTag)));
+  }
+
+  private static void advanceInputWatermark(
+      InMemoryTimerInternals timerInternals,
+      Instant newInputWatermark,
+      PushbackSideInputDoFnRunner<?, ?> toTrigger)
+      throws Exception {
+    timerInternals.advanceInputWatermark(newInputWatermark);
+    TimerInternals.TimerData timer;
+    while ((timer = timerInternals.removeNextEventTimer()) != null) {
+      StateNamespace namespace = timer.getNamespace();
+      checkArgument(namespace instanceof StateNamespaces.WindowNamespace);
+      BoundedWindow window = ((StateNamespaces.WindowNamespace) namespace).getWindow();
+      toTrigger.onTimer(timer.getTimerId(), window, timer.getTimestamp(), timer.getDomain());
+    }
+  }
+
+  private static StateNamespace windowNamespace(IntervalWindow window) {
+    return StateNamespaces.window((Coder) WINDOWING_STRATEGY.getWindowFn().windowCoder(),
window);
+  }
+
+  private static class MyDoFn extends DoFn<KV<String, Integer>, Integer> {
+
+    public final String stateId = "foo";
+
+    @StateId(stateId)
+    public final StateSpec<ValueState<Integer>> intState = StateSpecs.value(VarIntCoder.of());
+
+    @ProcessElement
+    public void processElement(ProcessContext c, @StateId(stateId) ValueState<Integer>
state) {
+      Integer currentValue = MoreObjects.firstNonNull(state.read(), 0);
+      state.write(currentValue + 1);
+    }
+  }
+
+  private DoFnRunner<KV<String, Integer>, Integer> getDoFnRunner(
+      DoFn<KV<String, Integer>, Integer> fn) {
+    return new SimpleDoFnRunner<>(
+        null,
+        fn,
+        NullSideInputReader.empty(),
+        null,
+        null,
+        Collections.emptyList(),
+        mockStepContext,
+        null,
+        Collections.emptyMap(),
+        WINDOWING_STRATEGY,
+        DoFnSchemaInformation.create(),
+        Collections.emptyMap());
+  }
 }
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
index cb96855..ee7c784 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/ParDoTest.java
@@ -2294,6 +2294,64 @@ public class ParDoTest implements Serializable {
           .containsInAnyOrder(Lists.newArrayList(12, 42, 84, 97), Lists.newArrayList(0, 1,
2));
       pipeline.run();
     }
+
+    @Test
+    @Category({ValidatesRunner.class, UsesStatefulParDo.class, UsesSideInputs.class})
+    public void testStateSideInput() {
+
+      // SideInput tag id
+      final String sideInputTag1 = "tag1";
+
+      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
+      pipeline.getCoderRegistry().registerCoderForClass(MyInteger.class, myIntegerCoder);
+      final PCollectionView<Integer> sideInput =
+          pipeline
+              .apply("CreateSideInput1", Create.of(2))
+              .apply("ViewSideInput1", View.asSingleton());
+
+      TestSimpleStatefulDoFn fn = new TestSimpleStatefulDoFn(sideInput);
+      pipeline
+          .apply(Create.of(KV.of(1, 2)))
+          .apply(ParDo.of(fn).withSideInput(sideInputTag1, sideInput));
+
+      pipeline.run();
+    }
+
+    private static class TestSimpleStatefulDoFn extends DoFn<KV<Integer, Integer>,
Integer> {
+
+      // SideInput tag id
+      final String sideInputTag1 = "tag1";
+      private final PCollectionView<Integer> view;
+
+      final String stateId = "foo";
+      Coder<MyInteger> myIntegerCoder = MyIntegerCoder.of();
+
+      @StateId(stateId)
+      private final StateSpec<BagState<MyInteger>> bufferState = StateSpecs.bag();
+
+      private TestSimpleStatefulDoFn(PCollectionView<Integer> view) {
+        this.view = view;
+      }
+
+      @ProcessElement
+      public void processElem(
+          ProcessContext c,
+          @SideInput(sideInputTag1) Integer sideInputTag,
+          @StateId(stateId) BagState<MyInteger> state) {
+        state.add(new MyInteger(sideInputTag));
+        c.output(sideInputTag);
+      }
+
+      @Override
+      public boolean equals(Object other) {
+        return other instanceof TestSimpleStatefulDoFn;
+      }
+
+      @Override
+      public int hashCode() {
+        return getClass().hashCode();
+      }
+    }
   }
 
   /** Tests for state coder inference behaviors. */


Mime
View raw message