beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/9] beam git commit: BEAM-1421 Latest should comply with PTransform style guide
Date Wed, 01 Mar 2017 04:10:48 GMT
Repository: beam
Updated Branches:
  refs/heads/master 63b63f0b5 -> d66029caf


BEAM-1421 Latest should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/0b9afe8b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/0b9afe8b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/0b9afe8b

Branch: refs/heads/master
Commit: 0b9afe8bf59245d437a33ee11fdcaa8e57ad4ffe
Parents: a7c60cc
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Tue Feb 7 16:18:26 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Feb 28 20:10:36 2017 -0800

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Latest.java  | 80 ++++++++++++--------
 1 file changed, 48 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/0b9afe8b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
index 9c2d715..e6892c5 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Latest.java
@@ -21,6 +21,7 @@ import static com.google.common.base.Preconditions.checkArgument;
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Preconditions.checkState;
 
+import com.google.common.annotations.VisibleForTesting;
 import java.util.Iterator;
 import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
@@ -58,12 +59,43 @@ import org.apache.beam.sdk.values.TimestampedValue;
  * }
  * }</pre>
  *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with
the
+ * {@link Combine} transform.
+ *
  * <p>For elements with the same timestamp, the element chosen for output is arbitrary.
  */
 public class Latest {
   // Do not instantiate
   private Latest() {}
 
+  /** Returns a {@link Combine.CombineFn} that selects the latest element among its inputs.
*/
+  public static <T> Combine.CombineFn<TimestampedValue<T>, ?, T> combineFn()
{
+    return new LatestFn<>();
+  }
+
+  /**
+   * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and
returns a
+   * {@code PCollection<T>} whose contents is the latest element according to its event
time, or
+   * {@literal null} if there are no elements.
+   *
+   * @param <T> The type of the elements being combined.
+   */
+  public static <T> PTransform<PCollection<T>, PCollection<T>> globally()
{
+    return new Globally<>();
+  }
+
+  /**
+   * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>}
and returns a
+   * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key
according to its
+   * event time.
+   *
+   * @param <K> The key type of the elements being combined.
+   * @param <V> The value type of the elements being combined.
+   */
+  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
V>>> perKey() {
+    return new PerKey<>();
+  }
+
   /**
    * A {@link Combine.CombineFn} that computes the latest element from a set of inputs. This
is
    * particularly useful as an {@link Aggregator}.
@@ -71,7 +103,8 @@ public class Latest {
    * @param <T> Type of input element.
    * @see Latest
    */
-  public static class LatestFn<T>
+  @VisibleForTesting
+  static class LatestFn<T>
       extends Combine.CombineFn<TimestampedValue<T>, TimestampedValue<T>,
T> {
     /** Construct a new {@link LatestFn} instance. */
     public LatestFn() {}
@@ -82,8 +115,8 @@ public class Latest {
     }
 
     @Override
-    public TimestampedValue<T> addInput(TimestampedValue<T> accumulator,
-        TimestampedValue<T> input) {
+    public TimestampedValue<T> addInput(
+        TimestampedValue<T> accumulator, TimestampedValue<T> input) {
       checkNotNull(accumulator, "accumulator must be non-null");
       checkNotNull(input, "input must be non-null");
 
@@ -95,16 +128,20 @@ public class Latest {
     }
 
     @Override
-    public Coder<TimestampedValue<T>> getAccumulatorCoder(CoderRegistry registry,
-        Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException
{
+    public Coder<TimestampedValue<T>> getAccumulatorCoder(
+        CoderRegistry registry, Coder<TimestampedValue<T>> inputCoder)
+        throws CannotProvideCoderException {
       return NullableCoder.of(inputCoder);
     }
 
     @Override
-    public Coder<T> getDefaultOutputCoder(CoderRegistry registry,
-        Coder<TimestampedValue<T>> inputCoder) throws CannotProvideCoderException
{
-      checkState(inputCoder instanceof TimestampedValue.TimestampedValueCoder,
-          "inputCoder must be a TimestampedValueCoder, but was %s", inputCoder);
+    public Coder<T> getDefaultOutputCoder(
+        CoderRegistry registry, Coder<TimestampedValue<T>> inputCoder)
+        throws CannotProvideCoderException {
+      checkState(
+          inputCoder instanceof TimestampedValue.TimestampedValueCoder,
+          "inputCoder must be a TimestampedValueCoder, but was %s",
+          inputCoder);
 
       TimestampedValue.TimestampedValueCoder<T> inputTVCoder =
           (TimestampedValue.TimestampedValueCoder<T>) inputCoder;
@@ -134,29 +171,7 @@ public class Latest {
     }
   }
 
-  /**
-   * Returns a {@link PTransform} that takes as input a {@code PCollection<T>} and
returns a
-   * {@code PCollection<T>} whose contents is the latest element according to its event
time, or
-   * {@literal null} if there are no elements.
-   *
-   * @param <T> The type of the elements being combined.
-   */
-  public static <T> PTransform<PCollection<T>, PCollection<T>> globally()
{
-    return new Globally<>();
-  }
-
-  /**
-   * Returns a {@link PTransform} that takes as input a {@code PCollection<KV<K, V>>}
and returns a
-   * {@code PCollection<KV<K, V>>} whose contents is the latest element per-key
according to its
-   * event time.
-   *
-   * @param <K> The key type of the elements being combined.
-   * @param <V> The value type of the elements being combined.
-   */
-  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K,
V>>> perKey() {
-    return new PerKey<>();
-  }
-
+  /** Implementation of {@link #globally()}. */
   private static class Globally<T> extends PTransform<PCollection<T>, PCollection<T>>
{
     @Override
     public PCollection<T> expand(PCollection<T> input) {
@@ -175,6 +190,7 @@ public class Latest {
     }
   }
 
+  /** Implementation of {@link #perKey()}. */
   private static class PerKey<K, V>
       extends PTransform<PCollection<KV<K, V>>, PCollection<KV<K, V>>>
{
     @Override


Mime
View raw message