beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From k...@apache.org
Subject [1/2] incubator-beam git commit: Add WindowIntoEvaluatorFactory
Date Fri, 08 Apr 2016 18:31:29 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/master 6348a1fe2 -> 5f24cef20


Add WindowIntoEvaluatorFactory

This is a TransformEvaluator-based implementation of the Window#into
primitive, as opposed to the DoFn ProcessContext internals-based
implementation.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64a8fb7d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64a8fb7d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64a8fb7d

Branch: refs/heads/master
Commit: 64a8fb7d2f8b021385959a990c00395b3b0af048
Parents: 6348a1f
Author: Thomas Groh <tgroh@google.com>
Authored: Fri Apr 8 09:29:31 2016 -0700
Committer: Thomas Groh <tgroh@google.com>
Committed: Fri Apr 8 11:15:12 2016 -0700

----------------------------------------------------------------------
 .../PassthroughTransformEvaluator.java          |  49 ++++
 .../inprocess/TransformEvaluatorRegistry.java   |   2 +
 .../inprocess/WindowEvaluatorFactory.java       | 130 +++++++++++
 .../inprocess/WindowEvaluatorFactoryTest.java   | 221 +++++++++++++++++++
 4 files changed, 402 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64a8fb7d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PassthroughTransformEvaluator.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PassthroughTransformEvaluator.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PassthroughTransformEvaluator.java
new file mode 100644
index 0000000..50d0ed9
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/PassthroughTransformEvaluator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+
+/**
+ * A {@link TransformEvaluator} that forwards every element it is given, unmodified, into a single
+ * output bundle, and reports that bundle (with no watermark hold) when the bundle is finished.
+ */
+class PassthroughTransformEvaluator<InputT> implements TransformEvaluator<InputT> {
+  /** The application this evaluator reports its result for. */
+  private final AppliedPTransform<?, ?, ?> application;
+  /** Destination for every processed element. */
+  private final UncommittedBundle<InputT> outputBundle;
+
+  private PassthroughTransformEvaluator(
+      AppliedPTransform<?, ?, ?> application, UncommittedBundle<InputT> outputBundle) {
+    this.application = application;
+    this.outputBundle = outputBundle;
+  }
+
+  /**
+   * Returns an evaluator for {@code transform} that copies each processed element into
+   * {@code output}.
+   */
+  public static <InputT> PassthroughTransformEvaluator<InputT> create(
+      AppliedPTransform<?, ?, ?> transform, UncommittedBundle<InputT> output) {
+    return new PassthroughTransformEvaluator<InputT>(transform, output);
+  }
+
+  /** Appends {@code element} to the output bundle exactly as received. */
+  @Override
+  public void processElement(WindowedValue<InputT> element) throws Exception {
+    outputBundle.add(element);
+  }
+
+  /** Produces a result for the application that carries the output bundle and no hold. */
+  @Override
+  public InProcessTransformResult finishBundle() throws Exception {
+    return StepTransformResult.withoutHold(application)
+        .addOutput(outputBundle)
+        .build();
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64a8fb7d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
index f098c2a..113bad8 100644
--- a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/TransformEvaluatorRegistry.java
@@ -23,6 +23,7 @@ import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
 import com.google.cloud.dataflow.sdk.transforms.Flatten.FlattenPCollectionList;
 import com.google.cloud.dataflow.sdk.transforms.PTransform;
 import com.google.cloud.dataflow.sdk.transforms.ParDo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
 import com.google.common.collect.ImmutableMap;
 
 import java.util.Map;
@@ -47,6 +48,7 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory {
                 new GroupByKeyEvaluatorFactory())
             .put(FlattenPCollectionList.class, new FlattenEvaluatorFactory())
             .put(ViewEvaluatorFactory.WriteView.class, new ViewEvaluatorFactory())
+            .put(Window.Bound.class, new WindowEvaluatorFactory())
             .build();
     return new TransformEvaluatorRegistry(primitives);
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64a8fb7d/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
new file mode 100644
index 0000000..0bdfac9
--- /dev/null
+++ b/sdks/java/core/src/main/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactory.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.PTransform;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window.Bound;
+import com.google.cloud.dataflow.sdk.transforms.windowing.WindowFn;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+
+import org.joda.time.Instant;
+
+import java.util.Collection;
+
+import javax.annotation.Nullable;
+
+/**
+ * The {@link InProcessPipelineRunner} {@link TransformEvaluatorFactory} for the
+ * {@link Bound Window.Bound} primitive {@link PTransform}.
+ */
+class WindowEvaluatorFactory implements TransformEvaluatorFactory {
+
+  /**
+   * Returns an evaluator for an application of {@link Window.Bound}.
+   *
+   * <p>The registry looks factories up by transform class and hands over an untyped application,
+   * so the application is cast to the precise generic type expected by
+   * {@link #createTransformEvaluator}.
+   */
+  @Override
+  @SuppressWarnings({"unchecked", "rawtypes"})
+  public <InputT> TransformEvaluator<InputT> forApplication(
+      AppliedPTransform<?, ?, ?> application,
+      @Nullable CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext)
+      throws Exception {
+    return createTransformEvaluator(
+        (AppliedPTransform) application, inputBundle, evaluationContext);
+  }
+
+  /**
+   * Builds the evaluator for a typed {@link Window.Bound} application.
+   *
+   * <p>If the transform has no {@link WindowFn} (for example a transform that only sets
+   * triggering), elements are forwarded unchanged by a {@link PassthroughTransformEvaluator};
+   * otherwise each element is reassigned to the windows chosen by the {@link WindowFn}.
+   */
+  private <InputT> TransformEvaluator<InputT> createTransformEvaluator(
+      AppliedPTransform<PCollection<InputT>, PCollection<InputT>, Window.Bound<InputT>>
+          transform,
+      CommittedBundle<?> inputBundle,
+      InProcessEvaluationContext evaluationContext) {
+    WindowFn<? super InputT, ?> fn = transform.getTransform().getWindowFn();
+    UncommittedBundle<InputT> outputBundle =
+        evaluationContext.createBundle(inputBundle, transform.getOutput());
+    if (fn == null) {
+      return PassthroughTransformEvaluator.create(transform, outputBundle);
+    }
+    return new WindowIntoEvaluator<>(transform, fn, outputBundle);
+  }
+
+  /**
+   * Evaluator that replaces the windows of each input element with the windows assigned by a
+   * {@link WindowFn}, keeping the element's value and timestamp.
+   */
+  private static class WindowIntoEvaluator<InputT> implements TransformEvaluator<InputT> {
+    /** The application being evaluated; named in the result produced by finishBundle. */
+    private final AppliedPTransform<?, ?, ?> transform;
+    private final WindowFn<InputT, ?> windowFn;
+    private final UncommittedBundle<InputT> outputBundle;
+
+    @SuppressWarnings("unchecked")
+    public WindowIntoEvaluator(
+        AppliedPTransform<?, ?, ?> transform,
+        WindowFn<? super InputT, ?> windowFn,
+        UncommittedBundle<InputT> outputBundle) {
+      this.transform = transform;
+      // Safe contravariant cast: a WindowFn accepting any supertype of InputT accepts InputT.
+      this.windowFn = (WindowFn<InputT, ?>) windowFn;
+      this.outputBundle = outputBundle;
+    }
+
+    @Override
+    public void processElement(WindowedValue<InputT> element) throws Exception {
+      Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
+      // Value and timestamp are kept; the input windows and pane are replaced.
+      outputBundle.add(
+          WindowedValue.<InputT>of(
+              element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+    }
+
+    /**
+     * Asks {@code windowFn} for the windows of {@code element}. The type parameter captures the
+     * WindowFn's window type so a matching {@link WindowFn.AssignContext} can be constructed.
+     */
+    private <W extends BoundedWindow> Collection<? extends BoundedWindow> assignWindows(
+        WindowFn<InputT, W> windowFn, WindowedValue<InputT> element) throws Exception {
+      WindowFn<InputT, W>.AssignContext assignContext =
+          new InProcessAssignContext<>(windowFn, element);
+      return windowFn.assignWindows(assignContext);
+    }
+
+    @Override
+    public InProcessTransformResult finishBundle() throws Exception {
+      // Name the application being evaluated (as PassthroughTransformEvaluator does); passing
+      // null here would leave the result with no producing transform.
+      return StepTransformResult.withoutHold(transform).addOutput(outputBundle).build();
+    }
+  }
+
+  /**
+   * {@link WindowFn.AssignContext} that exposes the value, timestamp, and current windows of a
+   * single {@link WindowedValue}.
+   */
+  private static class InProcessAssignContext<InputT, W extends BoundedWindow>
+      extends WindowFn<InputT, W>.AssignContext {
+    private final WindowedValue<InputT> value;
+
+    public InProcessAssignContext(WindowFn<InputT, W> fn, WindowedValue<InputT> value) {
+      // AssignContext is an inner class of WindowFn, so it is bound to the enclosing fn.
+      fn.super();
+      this.value = value;
+    }
+
+    @Override
+    public InputT element() {
+      return value.getValue();
+    }
+
+    @Override
+    public Instant timestamp() {
+      return value.getTimestamp();
+    }
+
+    @Override
+    public Collection<? extends BoundedWindow> windows() {
+      return value.getWindows();
+    }
+
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64a8fb7d/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
new file mode 100644
index 0000000..2b57f33
--- /dev/null
+++ b/sdks/java/core/src/test/java/com/google/cloud/dataflow/sdk/runners/inprocess/WindowEvaluatorFactoryTest.java
@@ -0,0 +1,221 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.dataflow.sdk.runners.inprocess;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.when;
+
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle;
+import com.google.cloud.dataflow.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle;
+import com.google.cloud.dataflow.sdk.testing.TestPipeline;
+import com.google.cloud.dataflow.sdk.transforms.AppliedPTransform;
+import com.google.cloud.dataflow.sdk.transforms.Create;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterPane;
+import com.google.cloud.dataflow.sdk.transforms.windowing.AfterWatermark;
+import com.google.cloud.dataflow.sdk.transforms.windowing.BoundedWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.FixedWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.IntervalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.transforms.windowing.SlidingWindows;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window;
+import com.google.cloud.dataflow.sdk.transforms.windowing.Window.Bound;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import com.google.cloud.dataflow.sdk.values.PCollection;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
+
+import org.hamcrest.Matchers;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+/**
+ * Tests for {@link WindowEvaluatorFactory}.
+ */
+@RunWith(JUnit4.class)
+public class WindowEvaluatorFactoryTest {
+  private static final Instant EPOCH = new Instant(0);
+
+  private PCollection<Long> input;
+  private WindowEvaluatorFactory factory;
+
+  @Mock private InProcessEvaluationContext evaluationContext;
+
+  private BundleFactory bundleFactory;
+
+  // Elements fed to every evaluator. first and second start in the global window; third starts
+  // in an IntervalWindow ending at EPOCH, so the tests can check that input windows are replaced.
+  private final WindowedValue<Long> first =
+      WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
+  private final WindowedValue<Long> second =
+      WindowedValue.timestampedValueInGlobalWindow(1L, EPOCH.plus(Duration.standardDays(3)));
+  private final WindowedValue<Long> third =
+      WindowedValue.of(
+          2L,
+          new Instant(-10L),
+          new IntervalWindow(new Instant(-100), EPOCH),
+          PaneInfo.NO_FIRING);
+
+  @Before
+  public void setup() {
+    MockitoAnnotations.initMocks(this);
+    TestPipeline p = TestPipeline.create();
+    input = p.apply(Create.of(1L, 2L, 3L));
+
+    bundleFactory = InProcessBundleFactory.create();
+    factory = new WindowEvaluatorFactory();
+  }
+
+  @Test
+  public void nullWindowFnSucceeds() throws Exception {
+    Bound<Long> transform =
+        Window.<Long>triggering(
+                AfterWatermark.pastEndOfWindow().withEarlyFirings(AfterPane.elementCountAtLeast(1)))
+            .accumulatingFiredPanes();
+    PCollection<Long> triggering = input.apply(transform);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+
+    UncommittedBundle<Long> outputBundle = createOutputBundle(triggering, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(triggering, inputBundle, transform);
+
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
+    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
+    // With no WindowFn the elements, including their original windows, pass through unchanged.
+    assertThat(committed.getElements(), containsInAnyOrder(third, first, second));
+  }
+
+  @Test
+  public void singleWindowFnSucceeds() throws Exception {
+    Duration windowDuration = Duration.standardDays(7);
+    Bound<Long> transform = Window.<Long>into(FixedWindows.of(windowDuration));
+    PCollection<Long> windowed = input.apply(transform);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
+
+    BoundedWindow firstSecondWindow = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
+    BoundedWindow thirdWindow = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
+
+    InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform);
+
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
+    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
+
+    WindowedValue<Long> expectedNewFirst =
+        WindowedValue.of(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedNewSecond =
+        WindowedValue.of(
+            1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedNewThird =
+        WindowedValue.of(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING);
+    assertThat(
+        committed.getElements(),
+        containsInAnyOrder(expectedNewFirst, expectedNewSecond, expectedNewThird));
+  }
+
+  @Test
+  public void multipleWindowsWindowFnSucceeds() throws Exception {
+    Duration windowDuration = Duration.standardDays(6);
+    Duration slidingBy = Duration.standardDays(3);
+    Bound<Long> transform =
+        Window.<Long>into(SlidingWindows.of(windowDuration).every(slidingBy));
+    PCollection<Long> windowed = input.apply(transform);
+
+    CommittedBundle<Long> inputBundle = createInputBundle();
+    UncommittedBundle<Long> outputBundle = createOutputBundle(windowed, inputBundle);
+
+    InProcessTransformResult result = runEvaluator(windowed, inputBundle, transform);
+
+    assertThat(
+        Iterables.getOnlyElement(result.getOutputBundles()),
+        Matchers.<UncommittedBundle<?>>equalTo(outputBundle));
+    CommittedBundle<Long> committed = outputBundle.commit(Instant.now());
+
+    BoundedWindow w1 = new IntervalWindow(EPOCH, EPOCH.plus(windowDuration));
+    BoundedWindow w2 =
+        new IntervalWindow(EPOCH.plus(slidingBy), EPOCH.plus(slidingBy).plus(windowDuration));
+    BoundedWindow wMinus1 = new IntervalWindow(EPOCH.minus(windowDuration), EPOCH);
+    BoundedWindow wMinusSlide =
+        new IntervalWindow(EPOCH.minus(windowDuration).plus(slidingBy), EPOCH.plus(slidingBy));
+
+    WindowedValue<Long> expectedFirst =
+        WindowedValue.of(
+            first.getValue(),
+            first.getTimestamp(),
+            ImmutableSet.of(w1, wMinusSlide),
+            PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedSecond =
+        WindowedValue.of(
+            second.getValue(), second.getTimestamp(), ImmutableSet.of(w1, w2), PaneInfo.NO_FIRING);
+    WindowedValue<Long> expectedThird =
+        WindowedValue.of(
+            third.getValue(),
+            third.getTimestamp(),
+            ImmutableSet.of(wMinus1, wMinusSlide),
+            PaneInfo.NO_FIRING);
+
+    assertThat(
+        committed.getElements(), containsInAnyOrder(expectedFirst, expectedSecond, expectedThird));
+  }
+
+  /** Returns a committed root bundle for {@link #input} holding first, second, and third. */
+  private CommittedBundle<Long> createInputBundle() {
+    return bundleFactory
+        .createRootBundle(input)
+        .add(first)
+        .add(second)
+        .add(third)
+        .commit(Instant.now());
+  }
+
+  /**
+   * Creates an output bundle for {@code output} and stubs the mocked evaluation context to return
+   * it, so the bundle the evaluator writes into is the one the test inspects.
+   */
+  private UncommittedBundle<Long> createOutputBundle(
+      PCollection<Long> output, CommittedBundle<Long> inputBundle) {
+    UncommittedBundle<Long> outputBundle = bundleFactory.createBundle(inputBundle, output);
+    when(evaluationContext.createBundle(inputBundle, output)).thenReturn(outputBundle);
+    return outputBundle;
+  }
+
+  /**
+   * Applies {@code windowTransform} from {@link #input} to {@code windowed}, processes first,
+   * second, and third in that order, and returns the finished result.
+   */
+  private InProcessTransformResult runEvaluator(
+      PCollection<Long> windowed,
+      CommittedBundle<Long> inputBundle,
+      Window.Bound<Long> windowTransform /* Required while Window.Bound is a composite */)
+      throws Exception {
+    TransformEvaluator<Long> evaluator =
+        factory.forApplication(
+            AppliedPTransform.of("Window", input, windowed, windowTransform),
+            inputBundle,
+            evaluationContext);
+
+    evaluator.processElement(first);
+    evaluator.processElement(second);
+    evaluator.processElement(third);
+    return evaluator.finishBundle();
+  }
+}


Mime
View raw message