beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Work logged] (BEAM-5149) Add support to the Java SDK harness to merge windows
Date Mon, 10 Sep 2018 18:24:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-5149?focusedWorklogId=142850&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-142850 ]

ASF GitHub Bot logged work on BEAM-5149:
----------------------------------------

                Author: ASF GitHub Bot
            Created on: 10/Sep/18 18:23
            Start Date: 10/Sep/18 18:23
    Worklog Time Spent: 10m 
      Work Description: lukecwik closed pull request #6222: [BEAM-5149] Add support for the Java SDK harness to merge windows.
URL: https://github.com/apache/beam/pull/6222
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/model/pipeline/src/main/proto/beam_runner_api.proto b/model/pipeline/src/main/proto/beam_runner_api.proto
index c6ee2656b06..0df8cc8e888 100644
--- a/model/pipeline/src/main/proto/beam_runner_api.proto
+++ b/model/pipeline/src/main/proto/beam_runner_api.proto
@@ -202,9 +202,35 @@ message StandardPTransforms {
     // Payload: TestStreamPayload
     TEST_STREAM = 5 [(beam_urn) = "urn:beam:transform:teststream:v1"];
 
-    // Represents mapping of main input window into side input window.
-    // Payload: serialized WindowMappingFn.
+    // Represents mapping of main input window onto side input window.
+    //
+    // Side input window mapping function:
+    // Input: KV<nonce, MainInputWindow>
+    // Output: KV<nonce, SideInputWindow>
+    //
+    // For each main input window, the side input window is returned. The
+    // nonce is used by a runner to associate each input with its output.
+    // The nonce is represented as an opaque set of bytes.
+    //
+    // Payload: WindowMappingFn from SideInputSpec.
     MAP_WINDOWS = 6 [(beam_urn) = "beam:transform:map_windows:v1"];
+
+    // Used to merge windows during a GroupByKey.
+    //
+    // Window merging function:
+    // Input: KV<nonce, iterable<OriginalWindow>>
+    // Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>
+    //
+    // For each set of original windows, a list of all unmerged windows is
+    // output alongside a map of merged window to set of consumed windows.
+    // All original windows must be contained in either the unmerged original
+    // window set or one of the consumed original window sets. Each original
+    // window can only be part of one output set. The nonce is used by a runner
+    // to associate each input with its output. The nonce is represented as an
+    // opaque set of bytes.
+    //
+    // Payload: WindowFn from WindowingStrategy.
+    MERGE_WINDOWS = 7 [(beam_urn) = "beam:transform:merge_windows:v1"];
   }
   enum DeprecatedPrimitives {
     // Represents the operation to read a Bounded or Unbounded source.
diff --git a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
index e6c36a2dd42..8bece334982 100644
--- a/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
+++ b/runners/core-construction-java/src/main/java/org/apache/beam/runners/core/construction/WindowingStrategyTranslation.java
@@ -199,6 +199,13 @@ public static TimestampCombiner timestampCombinerFromProto(RunnerApi.OutputTime.
   // This URN says that the WindowFn is just a UDF blob the Java SDK understands
   // TODO: standardize such things
   public static final String SERIALIZED_JAVA_WINDOWFN_URN = "beam:windowfn:javasdk:v0.1";
+  public static final String GLOBAL_WINDOWS_URN =
+      BeamUrns.getUrn(GlobalWindowsPayload.Enum.PROPERTIES);
+  public static final String FIXED_WINDOWS_URN =
+      BeamUrns.getUrn(FixedWindowsPayload.Enum.PROPERTIES);
+  public static final String SLIDING_WINDOWS_URN =
+      BeamUrns.getUrn(SlidingWindowsPayload.Enum.PROPERTIES);
+  public static final String SESSION_WINDOWS_URN = BeamUrns.getUrn(SessionsPayload.Enum.PROPERTIES);
 
   /**
    * Converts a {@link WindowFn} into a {@link RunnerApi.MessageWithComponents} where {@link
@@ -210,7 +217,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
     if (windowFn instanceof GlobalWindows) {
       return SdkFunctionSpec.newBuilder()
           .setEnvironmentId(components.getOnlyEnvironmentId())
-          .setSpec(FunctionSpec.newBuilder().setUrn(getUrn(GlobalWindowsPayload.Enum.PROPERTIES)))
+          .setSpec(FunctionSpec.newBuilder().setUrn(GLOBAL_WINDOWS_URN))
           .build();
     } else if (windowFn instanceof FixedWindows) {
       FixedWindowsPayload fixedWindowsPayload =
@@ -222,7 +229,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
           .setEnvironmentId(components.getOnlyEnvironmentId())
           .setSpec(
               FunctionSpec.newBuilder()
-                  .setUrn(getUrn(FixedWindowsPayload.Enum.PROPERTIES))
+                  .setUrn(FIXED_WINDOWS_URN)
                   .setPayload(fixedWindowsPayload.toByteString()))
           .build();
     } else if (windowFn instanceof SlidingWindows) {
@@ -236,7 +243,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
           .setEnvironmentId(components.getOnlyEnvironmentId())
           .setSpec(
               FunctionSpec.newBuilder()
-                  .setUrn(getUrn(SlidingWindowsPayload.Enum.PROPERTIES))
+                  .setUrn(SLIDING_WINDOWS_URN)
                   .setPayload(slidingWindowsPayload.toByteString()))
           .build();
     } else if (windowFn instanceof Sessions) {
@@ -248,7 +255,7 @@ public static SdkFunctionSpec toProto(WindowFn<?, ?> windowFn, SdkComponents com
           .setEnvironmentId(components.getOnlyEnvironmentId())
           .setSpec(
               FunctionSpec.newBuilder()
-                  .setUrn(getUrn(SessionsPayload.Enum.PROPERTIES))
+                  .setUrn(SESSION_WINDOWS_URN)
                   .setPayload(sessionsPayload.toByteString()))
           .build();
     } else {
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
index c43680b0d96..3bee525f1cd 100644
--- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMappingFnRunner.java
@@ -32,9 +32,17 @@
 import org.apache.beam.sdk.values.KV;
 
 /**
- * Maps windows using a window mapping fn. The input is {@link KV} with the key being a nonce and
- * the value being a window, the output must be a {@link KV} with the key being the same nonce as
- * the input and the value being the mapped window.
+ * Represents mapping of main input window onto side input window.
+ *
+ * <p>Side input window mapping function:
+ *
+ * <ul>
+ *   <li>Input: {@code KV<nonce, MainInputWindow>}
+ *   <li>Output: {@code KV<nonce, SideInputWindow>}
+ * </ul>
+ *
+ * <p>For each main input window, the side input window is returned. The nonce is used by a runner
+ * to associate each input with its output. The nonce is represented as an opaque set of bytes.
  */
 public class WindowMappingFnRunner {
   static final String URN = BeamUrns.getUrn(StandardPTransforms.Primitives.MAP_WINDOWS);
diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
new file mode 100644
index 00000000000..261a9794836
--- /dev/null
+++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/WindowMergingFnRunner.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import com.google.auto.service.AutoService;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.Sets;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.PTransform;
+import org.apache.beam.model.pipeline.v1.RunnerApi.StandardPTransforms;
+import org.apache.beam.runners.core.construction.BeamUrns;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowFn.MergeContext;
+import org.apache.beam.sdk.values.KV;
+
+/**
+ * Merges windows using a {@link org.apache.beam.sdk.transforms.windowing.WindowFn}.
+ *
+ * <p>Window merging function:
+ *
+ * <ul>
+ *   <li>Input: {@code KV<nonce, iterable<OriginalWindow>>}
+ *   <li>Output: {@code KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow,
+ *       iterable<ConsumedOriginalWindow>>>>}
+ * </ul>
+ *
+ * <p>For each set of original windows, a list of all unmerged windows is output alongside a map of
+ * merged window to set of consumed windows. All original windows must be contained in either the
+ * unmerged original window set or one of the consumed original window sets. Each original window
+ * can only be part of one output set. The nonce is used by a runner to associate each input with
+ * its output. The nonce is represented as an opaque set of bytes.
+ */
+public abstract class WindowMergingFnRunner<T, W extends BoundedWindow> {
+  static final String URN = BeamUrns.getUrn(StandardPTransforms.Primitives.MERGE_WINDOWS);
+
+  /**
+   * A registrar which provides a factory to handle merging windows based upon the {@link WindowFn}.
+   */
+  @AutoService(PTransformRunnerFactory.Registrar.class)
+  public static class Registrar implements PTransformRunnerFactory.Registrar {
+
+    @Override
+    public Map<String, PTransformRunnerFactory> getPTransformRunnerFactories() {
+      return ImmutableMap.of(
+          URN,
+          MapFnRunners.forValueMapFnFactory(WindowMergingFnRunner::createMapFunctionForPTransform));
+    }
+  }
+
+  static <T, W extends BoundedWindow>
+      ThrowingFunction<KV<T, Iterable<W>>, KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>>>
+          createMapFunctionForPTransform(String ptransformId, PTransform ptransform)
+              throws IOException {
+    RunnerApi.SdkFunctionSpec payload =
+        RunnerApi.SdkFunctionSpec.parseFrom(ptransform.getSpec().getPayload());
+
+    WindowFn<?, W> windowFn =
+        (WindowFn<?, W>) WindowingStrategyTranslation.windowFnFromProto(payload);
+    return WindowMergingFnRunner.<T, W>create(windowFn)::mergeWindows;
+  }
+
+  static <T, W extends BoundedWindow> WindowMergingFnRunner<T, W> create(WindowFn<?, W> windowFn) {
+    if (windowFn.isNonMerging()) {
+      return new NonMergingWindowFnRunner();
+    } else {
+      return new MergingViaWindowFnRunner(windowFn);
+    }
+  }
+
+  /**
+   * Returns the set of unmerged windows and a mapping from merged windows to sets of original
+   * windows.
+   */
+  abstract KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
+      KV<T, Iterable<W>> windowsToMerge) throws Exception;
+
+  /////////////////////////////////////////////////////////////////////////////////////////////////
+
+  /**
+   * An optimized version of window merging where the {@link WindowFn} does not do any window
+   * merging.
+   *
+   * <p>Note that this is likely to never be invoked and the identity mapping will be handled
+   * directly by runners. We have this here because runners may not perform this optimization.
+   */
+  private static class NonMergingWindowFnRunner<T, W extends BoundedWindow>
+      extends WindowMergingFnRunner<T, W> {
+    @Override
+    KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
+        KV<T, Iterable<W>> windowsToMerge) {
+      return KV.of(
+          windowsToMerge.getKey(), KV.of(windowsToMerge.getValue(), Collections.emptyList()));
+    }
+  }
+
+  /** An implementation which uses a {@link WindowFn} to merge windows. */
+  private static class MergingViaWindowFnRunner<T, W extends BoundedWindow>
+      extends WindowMergingFnRunner<T, W> {
+    private final WindowFn<?, W> windowFn;
+    private final WindowFn<?, W>.MergeContext mergeContext;
+    private Collection<W> currentWindows;
+    private List<KV<W, Collection<W>>> mergedWindows;
+
+    private MergingViaWindowFnRunner(WindowFn<?, W> windowFn) {
+      this.windowFn = windowFn;
+      this.mergedWindows = new ArrayList<>();
+      this.currentWindows = new ArrayList<>();
+      this.mergeContext =
+          windowFn.new MergeContext() {
+
+            @Override
+            public Collection<W> windows() {
+              return currentWindows;
+            }
+
+            @Override
+            public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception {
+              mergedWindows.add(KV.of(mergeResult, toBeMerged));
+            }
+          };
+    }
+
+    @Override
+    KV<T, KV<Iterable<W>, Iterable<KV<W, Iterable<W>>>>> mergeWindows(
+        KV<T, Iterable<W>> windowsToMerge) throws Exception {
+      currentWindows = Sets.newHashSet(windowsToMerge.getValue());
+      windowFn.mergeWindows((MergeContext) mergeContext);
+      for (KV<W, Collection<W>> mergedWindow : mergedWindows) {
+        currentWindows.removeAll(mergedWindow.getValue());
+      }
+      return KV.of(windowsToMerge.getKey(), KV.of(currentWindows, (Iterable) mergedWindows));
+    }
+  }
+}
diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
new file mode 100644
index 00000000000..11220efa20a
--- /dev/null
+++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/WindowMergingFnRunnerTest.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.fn.harness;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThat;
+
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Sets;
+import java.util.Collections;
+import org.apache.beam.model.pipeline.v1.RunnerApi;
+import org.apache.beam.model.pipeline.v1.RunnerApi.Environment;
+import org.apache.beam.runners.core.construction.SdkComponents;
+import org.apache.beam.runners.core.construction.WindowingStrategyTranslation;
+import org.apache.beam.sdk.fn.function.ThrowingFunction;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
+import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
+import org.apache.beam.sdk.transforms.windowing.Sessions;
+import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.values.KV;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link WindowMergingFnRunner}. */
+@RunWith(JUnit4.class)
+public class WindowMergingFnRunnerTest {
+  @Test
+  public void testWindowMergingWithNonMergingWindowFn() throws Exception {
+    ThrowingFunction<
+            KV<Object, Iterable<BoundedWindow>>,
+            KV<
+                Object,
+                KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow, Iterable<BoundedWindow>>>>>>
+        mapFunction =
+            WindowMergingFnRunner.createMapFunctionForPTransform(
+                "ptransformId", createMergeTransformForWindowFn(new GlobalWindows()));
+
+    KV<Object, Iterable<BoundedWindow>> input =
+        KV.of(
+            "abc",
+            ImmutableList.of(new IntervalWindow(Instant.now(), Duration.standardMinutes(1))));
+
+    assertEquals(
+        KV.of(input.getKey(), KV.of(input.getValue(), Collections.emptyList())),
+        mapFunction.apply(input));
+  }
+
+  @Test
+  public void testWindowMergingWithMergingWindowFn() throws Exception {
+    ThrowingFunction<
+            KV<Object, Iterable<BoundedWindow>>,
+            KV<
+                Object,
+                KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow, Iterable<BoundedWindow>>>>>>
+        mapFunction =
+            WindowMergingFnRunner.createMapFunctionForPTransform(
+                "ptransformId",
+                createMergeTransformForWindowFn(Sessions.withGapDuration(Duration.millis(5L))));
+
+    // 7, 8 and 10 should all be merged. 1 and 20 should remain in the original set.
+    BoundedWindow[] expectedToBeMerged =
+        new BoundedWindow[] {
+          new IntervalWindow(new Instant(9L), new Instant(11L)),
+          new IntervalWindow(new Instant(10L), new Instant(10L)),
+          new IntervalWindow(new Instant(7L), new Instant(10L))
+        };
+    Iterable<BoundedWindow> expectedToBeUnmerged =
+        Sets.newHashSet(
+            new IntervalWindow(new Instant(1L), new Instant(1L)),
+            new IntervalWindow(new Instant(20L), new Instant(20L)));
+    KV<Object, Iterable<BoundedWindow>> input =
+        KV.of(
+            "abc",
+            ImmutableList.<BoundedWindow>builder()
+                .add(expectedToBeMerged)
+                .addAll(expectedToBeUnmerged)
+                .build());
+
+    KV<Object, KV<Iterable<BoundedWindow>, Iterable<KV<BoundedWindow, Iterable<BoundedWindow>>>>>
+        output = mapFunction.apply(input);
+    assertEquals(input.getKey(), output.getKey());
+    assertEquals(expectedToBeUnmerged, output.getValue().getKey());
+    KV<BoundedWindow, Iterable<BoundedWindow>> mergedOutput =
+        Iterables.getOnlyElement(output.getValue().getValue());
+    assertEquals(new IntervalWindow(new Instant(7L), new Instant(11L)), mergedOutput.getKey());
+    assertThat(mergedOutput.getValue(), containsInAnyOrder(expectedToBeMerged));
+  }
+
+  private static <W extends BoundedWindow> RunnerApi.PTransform createMergeTransformForWindowFn(
+      WindowFn<?, W> windowFn) throws Exception {
+    SdkComponents components = SdkComponents.create();
+    components.registerEnvironment(Environment.newBuilder().setUrl("java").build());
+    RunnerApi.FunctionSpec functionSpec =
+        RunnerApi.FunctionSpec.newBuilder()
+            .setUrn(WindowMergingFnRunner.URN)
+            .setPayload(WindowingStrategyTranslation.toProto(windowFn, components).toByteString())
+            .build();
+    return RunnerApi.PTransform.newBuilder().setSpec(functionSpec).build();
+  }
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


Issue Time Tracking
-------------------

    Worklog Id:     (was: 142850)
    Time Spent: 1h 50m  (was: 1h 40m)

> Add support to the Java SDK harness to merge windows
> ----------------------------------------------------
>
>                 Key: BEAM-5149
>                 URL: https://issues.apache.org/jira/browse/BEAM-5149
>             Project: Beam
>          Issue Type: Sub-task
>          Components: sdk-java-harness
>            Reporter: Luke Cwik
>            Assignee: Luke Cwik
>            Priority: Major
>              Labels: portability
>          Time Spent: 1h 50m
>  Remaining Estimate: 0h
>
> Window merging function:
>  
> {code:java}
> Input: KV<nonce, iterable<OriginalWindow>>
> Output: KV<nonce, KV<iterable<UnmergedOriginalWindow>, iterable<KV<MergedWindow, iterable<ConsumedOriginalWindow>>>>
> {code}
> For each set of original windows, a list of all unmerged windows is output alongside a map of merged window to set of consumed windows. All original windows must be contained in either the unmerged original window set or one of the consumed original window sets. Each original window can only be part of one output set. The nonce is used by a runner to associate each input with its output. The nonce is represented as an opaque set of bytes.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message