beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From amits...@apache.org
Subject [19/23] beam git commit: Move LateDataUtils and UnsupportedSideInputReader to runners-core.
Date Tue, 28 Feb 2017 22:35:26 GMT
Move LateDataUtils and UnsupportedSideInputReader to runners-core.

Add missing licenses.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/123f4820
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/123f4820
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/123f4820

Branch: refs/heads/master
Commit: 123f4820b68dd13690be59a7b8b946e81031b6a7
Parents: 4ca5680
Author: Sela <ansela@paypal.com>
Authored: Mon Feb 27 20:24:54 2017 +0200
Committer: Sela <ansela@paypal.com>
Committed: Wed Mar 1 00:18:07 2017 +0200

----------------------------------------------------------------------
 .../apache/beam/runners/core/LateDataUtils.java | 88 +++++++++++++++++++
 .../core/UnsupportedSideInputReader.java        | 52 +++++++++++
 .../runners/spark/TestSparkPipelineOptions.java | 17 ++++
 .../SparkGroupAlsoByWindowViaWindowSet.java     |  4 +-
 .../beam/runners/spark/util/LateDataUtils.java  | 92 --------------------
 .../spark/util/UnsupportedSideInputReader.java  | 52 -----------
 .../apache/beam/runners/spark/PipelineRule.java | 17 ++++
 7 files changed, 176 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
new file mode 100644
index 0000000..17bd360
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/LateDataUtils.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.Iterables;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.WindowTracing;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+
+
+/**
+ * Utils to handle late data.
+ */
+public class LateDataUtils {
+
+  /**
+   * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input elements.
+   */
+  public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows(
+      final K key,
+      Iterable<WindowedValue<V>> elements,
+      final TimerInternals timerInternals,
+      final WindowingStrategy<?, ?> windowingStrategy,
+      final Aggregator<Long, Long> droppedDueToLateness) {
+    return FluentIterable.from(elements)
+        .transformAndConcat(
+            // Explode windows to filter out expired ones
+            new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() {
+              @Override
+              public Iterable<WindowedValue<V>> apply(@Nullable WindowedValue<V> input) {
+                if (input == null) {
+                  return null;
+                }
+                return input.explodeWindows();
+              }
+            })
+        .filter(
+            new Predicate<WindowedValue<V>>() {
+              @Override
+              public boolean apply(@Nullable WindowedValue<V> input) {
+                if (input == null) {
+                  // drop null elements.
+                  return false;
+                }
+                BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
+                boolean expired =
+                    window
+                        .maxTimestamp()
+                        .plus(windowingStrategy.getAllowedLateness())
+                        .isBefore(timerInternals.currentInputWatermarkTime());
+                if (expired) {
+                  // The element is too late for this window.
+                  droppedDueToLateness.addValue(1L);
+                  WindowTracing.debug(
+                      "GroupAlsoByWindow: Dropping element at {} for key: {}; "
+                          + "window: {} since it is too far behind inputWatermark: {}",
+                      input.getTimestamp(),
+                      key,
+                      window,
+                      timerInternals.currentInputWatermarkTime());
+                }
+                // Keep the element if the window is not expired.
+                return !expired;
+              }
+            });
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
new file mode 100644
index 0000000..4230f8c
--- /dev/null
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/UnsupportedSideInputReader.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.core;
+
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.util.SideInputReader;
+import org.apache.beam.sdk.values.PCollectionView;
+
+
+/**
+ * An implementation of a {@link SideInputReader} that actually does not support side-inputs.
+ */
+public class UnsupportedSideInputReader implements SideInputReader {
+  private final String transformName;
+
+  public UnsupportedSideInputReader(String transformName) {
+    this.transformName = transformName;
+  }
+
+  @Override
+  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
+    throw new UnsupportedOperationException(
+        String.format("%s does not support side inputs.", transformName));
+  }
+
+  @Override
+  public <T> boolean contains(PCollectionView<T> view) {
+    throw new UnsupportedOperationException(
+        String.format("%s does not support side inputs.", transformName));
+  }
+
+  @Override
+  public boolean isEmpty() {
+    throw new UnsupportedOperationException(
+        String.format("%s does not support side inputs.", transformName));
+  }
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java
index 2cb58d8..d50b652 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/TestSparkPipelineOptions.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark;
 
 import org.apache.beam.sdk.options.Default;

http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
index 24b8508..2f1713a 100644
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -24,10 +24,12 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.LateDataUtils;
 import org.apache.beam.runners.core.OutputWindowedValue;
 import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
 import org.apache.beam.runners.core.TimerInternals;
+import org.apache.beam.runners.core.UnsupportedSideInputReader;
 import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
 import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.spark.SparkPipelineOptions;
@@ -37,8 +39,6 @@ import org.apache.beam.runners.spark.translation.TranslationUtils;
 import org.apache.beam.runners.spark.translation.WindowingHelpers;
 import org.apache.beam.runners.spark.util.ByteArray;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
-import org.apache.beam.runners.spark.util.LateDataUtils;
-import org.apache.beam.runners.spark.util.UnsupportedSideInputReader;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.IterableCoder;
 import org.apache.beam.sdk.coders.KvCoder;

http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
deleted file mode 100644
index 18689bd..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/LateDataUtils.java
+++ /dev/null
@@ -1,92 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import com.google.common.base.Function;
-import com.google.common.base.Predicate;
-import com.google.common.collect.FluentIterable;
-import com.google.common.collect.Iterables;
-import javax.annotation.Nullable;
-import org.apache.beam.runners.core.TimerInternals;
-import org.apache.beam.sdk.transforms.Aggregator;
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.WindowTracing;
-import org.apache.beam.sdk.util.WindowedValue;
-import org.apache.beam.sdk.util.WindowingStrategy;
-
-
-/**
- * Utils to handle late data.
- */
-public class LateDataUtils {
-
-  /**
-   * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input
-   * elements.
-   * Taken from Thomas Groh's implementation in the DirectRunner's
-   * GroupAlsoByWindowEvaluatorFactory.
-   */
-  public static <K, V> Iterable<WindowedValue<V>> dropExpiredWindows(
-      final K key,
-      Iterable<WindowedValue<V>> elements,
-      final TimerInternals timerInternals,
-      final WindowingStrategy<?, ?> windowingStrategy,
-      final Aggregator<Long, Long> droppedDueToLateness) {
-    return FluentIterable.from(elements)
-        .transformAndConcat(
-            // Explode windows to filter out expired ones
-            new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() {
-              @Override
-              public Iterable<WindowedValue<V>> apply(@Nullable WindowedValue<V> input) {
-                if (input == null) {
-                  return null;
-                }
-                return input.explodeWindows();
-              }
-            })
-        .filter(
-            new Predicate<WindowedValue<V>>() {
-              @Override
-              public boolean apply(@Nullable WindowedValue<V> input) {
-                if (input == null) {
-                  // drop null elements.
-                  return false;
-                }
-                BoundedWindow window = Iterables.getOnlyElement(input.getWindows());
-                boolean expired =
-                    window
-                        .maxTimestamp()
-                        .plus(windowingStrategy.getAllowedLateness())
-                        .isBefore(timerInternals.currentInputWatermarkTime());
-                if (expired) {
-                  // The element is too late for this window.
-                  droppedDueToLateness.addValue(1L);
-                  WindowTracing.debug(
-                      "GroupAlsoByWindow: Dropping element at {} for key: {}; "
-                          + "window: {} since it is too far behind inputWatermark: {}",
-                      input.getTimestamp(),
-                      key,
-                      window,
-                      timerInternals.currentInputWatermarkTime());
-                }
-                // Keep the element if the window is not expired.
-                return !expired;
-              }
-            });
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
deleted file mode 100644
index 96d889d..0000000
--- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/UnsupportedSideInputReader.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.beam.runners.spark.util;
-
-import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.util.SideInputReader;
-import org.apache.beam.sdk.values.PCollectionView;
-
-
-/**
- * An implementation of a {@link SideInputReader} that actually does not support side-inputs.
- */
-public class UnsupportedSideInputReader implements SideInputReader {
-  private final String transformName;
-
-  public UnsupportedSideInputReader(String transformName) {
-    this.transformName = transformName;
-  }
-
-  @Override
-  public <T> T get(PCollectionView<T> view, BoundedWindow window) {
-    throw new UnsupportedOperationException(
-        String.format("%s does not support side inputs.", transformName));
-  }
-
-  @Override
-  public <T> boolean contains(PCollectionView<T> view) {
-    throw new UnsupportedOperationException(
-        String.format("%s does not support side inputs.", transformName));
-  }
-
-  @Override
-  public boolean isEmpty() {
-    throw new UnsupportedOperationException(
-        String.format("%s does not support side inputs.", transformName));
-  }
-}

http://git-wip-us.apache.org/repos/asf/beam/blob/123f4820/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
index bb42510..77519cd 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/PipelineRule.java
@@ -1,3 +1,20 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 package org.apache.beam.runners.spark;
 
 import org.apache.beam.sdk.Pipeline;


Mime
View raw message