beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [7/9] beam git commit: BEAM-1417 Count should comply with PTransform style guide
Date Wed, 01 Mar 2017 04:10:54 GMT
BEAM-1417 Count should comply with PTransform style guide


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9957c895
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9957c895
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9957c895

Branch: refs/heads/master
Commit: 9957c895b1b1c3e491f288d17e70445c9864742a
Parents: f5056ef
Author: Eugene Kirpichov <kirpichov@google.com>
Authored: Wed Feb 8 16:27:06 2017 -0800
Committer: Dan Halperin <dhalperi@google.com>
Committed: Tue Feb 28 20:10:37 2017 -0800

----------------------------------------------------------------------
 .../beam/examples/complete/AutoComplete.java    |  2 +-
 .../flink/examples/streaming/AutoComplete.java  |  2 +-
 .../translation/streaming/CreateStreamTest.java |  4 +++-
 .../org/apache/beam/sdk/transforms/Count.java   | 24 +++++++++++++-------
 .../apache/beam/sdk/testing/TestStreamTest.java |  4 +++-
 5 files changed, 24 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
----------------------------------------------------------------------
diff --git a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
index c815f27..861a292 100644
--- a/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
+++ b/examples/java/src/main/java/org/apache/beam/examples/complete/AutoComplete.java
@@ -116,7 +116,7 @@ public class AutoComplete {
     public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
-        .apply(new Count.PerElement<String>())
+        .apply(Count.<String>perElement())
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
----------------------------------------------------------------------
diff --git a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
index f33e616..d07df29 100644
--- a/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
+++ b/runners/flink/examples/src/main/java/org/apache/beam/runners/flink/examples/streaming/AutoComplete.java
@@ -85,7 +85,7 @@ public class AutoComplete {
     public PCollection<KV<String, List<CompletionCandidate>>> expand(PCollection<String> input) {
       PCollection<CompletionCandidate> candidates = input
         // First count how often each token appears.
-        .apply(new Count.PerElement<String>())
+        .apply(Count.<String>perElement())
 
         // Map the KV outputs of Count into our own CompletionCandiate class.
         .apply("CreateCompletionCandidates", ParDo.of(

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
index ff77535..b32f5f3 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/CreateStreamTest.java
@@ -31,6 +31,7 @@ import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -114,7 +115,8 @@ public class CreateStreamTest implements Serializable {
         .apply(GroupByKey.<Integer, Integer>create())
         .apply(Values.<Iterable<Integer>>create())
         .apply(Flatten.<Integer>iterables());
-    PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+    PCollection<Long> count =
+        windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
     PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
 
     IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index d164978..fd91430 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -34,31 +34,39 @@ import org.apache.beam.sdk.values.PCollection;
 
 
 /**
- * {@code PTransorm}s to count the elements in a {@link PCollection}.
+ * {@link PTransform PTransforms} to count the elements in a {@link PCollection}.
  *
  * <p>{@link Count#perElement()} can be used to count the number of occurrences of each
  * distinct element in the PCollection, {@link Count#perKey()} can be used to count the
  * number of values per key, and {@link Count#globally()} can be used to count the total
  * number of elements in a PCollection.
+ *
+ * <p>{@link #combineFn} can also be used manually, in combination with state and with the
+ * {@link Combine} transform.
  */
 public class Count {
   private Count() {
     // do not instantiate
   }
 
+  /** Returns a {@link CombineFn} that counts the number of its inputs. */
+  public static <T> CombineFn<T, ?, Long> combineFn() {
+    return new CountFn<T>();
+  }
+
   /**
-   * Returns a {@link Combine.Globally} {@link PTransform} that counts the number of elements in
+   * Returns a {@link PTransform} that counts the number of elements in
    * its input {@link PCollection}.
    */
-  public static <T> Combine.Globally<T, Long> globally() {
+  public static <T> PTransform<PCollection<T>, PCollection<Long>> globally() {
     return Combine.globally(new CountFn<T>());
   }
 
   /**
-   * Returns a {@link Combine.PerKey} {@link PTransform} that counts the number of elements
+   * Returns a {@link PTransform} that counts the number of elements
    * associated with each key of its input {@link PCollection}.
    */
-  public static <K, V> Combine.PerKey<K, V, Long> perKey() {
+  public static <K, V> PTransform<PCollection<KV<K, V>>, PCollection<KV<K, Long>>> perKey() {
     return Combine.<K, V, Long>perKey(new CountFn<V>());
   }
 
@@ -68,7 +76,7 @@ public class Count {
    *
    * <p>See {@link PerElement Count.PerElement} for more details.
    */
-  public static <T> PerElement<T> perElement() {
+  public static <T> PTransform<PCollection<T>, PCollection<KV<T, Long>>> perElement() {
     return new PerElement<>();
   }
 
@@ -97,10 +105,10 @@ public class Count {
    * @param <T> the type of the elements of the input {@code PCollection}, and the type of the keys
    * of the output {@code PCollection}
    */
-  public static class PerElement<T>
+  private static class PerElement<T>
       extends PTransform<PCollection<T>, PCollection<KV<T, Long>>> {
 
-    public PerElement() { }
+    private PerElement() { }
 
     @Override
     public PCollection<KV<T, Long>> expand(PCollection<T> input) {

http://git-wip-us.apache.org/repos/asf/beam/blob/9957c895/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
index 1514601..614831d 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/TestStreamTest.java
@@ -28,6 +28,7 @@ import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.coders.VarIntCoder;
 import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.testing.TestStream.Builder;
+import org.apache.beam.sdk.transforms.Combine;
 import org.apache.beam.sdk.transforms.Count;
 import org.apache.beam.sdk.transforms.Flatten;
 import org.apache.beam.sdk.transforms.GroupByKey;
@@ -96,7 +97,8 @@ public class TestStreamTest implements Serializable {
         .apply(GroupByKey.<Integer, Integer>create())
         .apply(Values.<Iterable<Integer>>create())
         .apply(Flatten.<Integer>iterables());
-    PCollection<Long> count = windowed.apply(Count.<Integer>globally().withoutDefaults());
+    PCollection<Long> count =
+        windowed.apply(Combine.globally(Count.<Integer>combineFn()).withoutDefaults());
     PCollection<Integer> sum = windowed.apply(Sum.integersGlobally().withoutDefaults());
 
     IntervalWindow window = new IntervalWindow(instant, instant.plus(Duration.standardMinutes(5L)));


Mime
View raw message