beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From bchamb...@apache.org
Subject [2/2] incubator-beam git commit: Replace ParDo with simpler transforms where possible
Date Wed, 17 Aug 2016 23:22:50 GMT
Replace ParDo with simpler transforms where possible

There are a number of places in the Java SDK where we use
ParDo.of(DoFn) when MapElements or other higher-level
composites are applicable and readable. This change
alters a number of those.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/236945d2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/236945d2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/236945d2

Branch: refs/heads/master
Commit: 236945d2504b73de91f7292219e0b15a53e062f5
Parents: 89367cf
Author: Kenneth Knowles <klk@google.com>
Authored: Wed Jul 27 14:23:15 2016 -0700
Committer: bchambers <bchambers@google.com>
Committed: Wed Aug 17 16:09:01 2016 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/transforms/Combine.java | 28 ++++++++++----------
 .../org/apache/beam/sdk/transforms/Count.java   |  8 +++---
 .../beam/sdk/transforms/FlatMapElements.java    |  4 +--
 .../org/apache/beam/sdk/transforms/Flatten.java | 12 ++++-----
 .../org/apache/beam/sdk/transforms/Keys.java    |  8 +++---
 .../org/apache/beam/sdk/transforms/KvSwap.java  |  9 +++----
 .../apache/beam/sdk/transforms/MapElements.java | 16 ++++++++---
 .../beam/sdk/transforms/RemoveDuplicates.java   |  8 +++---
 .../org/apache/beam/sdk/transforms/Values.java  |  8 +++---
 .../apache/beam/sdk/transforms/WithKeys.java    |  9 +++----
 .../beam/sdk/transforms/windowing/Window.java   | 11 ++++----
 .../java/org/apache/beam/sdk/PipelineTest.java  | 12 ++++-----
 .../java/org/apache/beam/sdk/io/WriteTest.java  |  4 ++-
 .../beam/sdk/transforms/MapElementsTest.java    |  8 +++---
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 10 ++++---
 15 files changed, 81 insertions(+), 74 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
index 6ba3f8a..56c0bc4 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Combine.java
@@ -2121,14 +2121,14 @@ public class Combine {
                                inputCoder.getValueCoder()))
           .setWindowingStrategyInternal(preCombineStrategy)
           .apply("PreCombineHot", Combine.perKey(hotPreCombine))
-          .apply("StripNonce", ParDo.of(
-              new DoFn<KV<KV<K, Integer>, AccumT>,
-                                     KV<K, InputOrAccum<InputT, AccumT>>>()
{
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  c.output(KV.of(
-                      c.element().getKey().getKey(),
-                      InputOrAccum.<InputT, AccumT>accum(c.element().getValue())));
+          .apply("StripNonce", MapElements.via(
+              new SimpleFunction<KV<KV<K, Integer>, AccumT>,
+                       KV<K, InputOrAccum<InputT, AccumT>>>() {
+                @Override
+                public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<KV<K,
Integer>, AccumT> elem) {
+                  return KV.of(
+                      elem.getKey().getKey(),
+                      InputOrAccum.<InputT, AccumT>accum(elem.getValue()));
                 }
               }))
           .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder))
@@ -2137,12 +2137,12 @@ public class Combine {
       PCollection<KV<K, InputOrAccum<InputT, AccumT>>> preprocessedCold
= split
           .get(cold)
           .setCoder(inputCoder)
-          .apply("PrepareCold", ParDo.of(
-              new DoFn<KV<K, InputT>, KV<K, InputOrAccum<InputT, AccumT>>>()
{
-                @ProcessElement
-                public void processElement(ProcessContext c) {
-                  c.output(KV.of(c.element().getKey(),
-                                 InputOrAccum.<InputT, AccumT>input(c.element().getValue())));
+          .apply("PrepareCold", MapElements.via(
+              new SimpleFunction<KV<K, InputT>, KV<K, InputOrAccum<InputT,
AccumT>>>() {
+                @Override
+                public KV<K, InputOrAccum<InputT, AccumT>> apply(KV<K, InputT>
element) {
+                  return KV.of(element.getKey(),
+                                 InputOrAccum.<InputT, AccumT>input(element.getValue()));
                 }
               }))
           .setCoder(KvCoder.of(inputCoder.getKeyCoder(), inputOrAccumCoder));

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
index ac59c76..195c5d1 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java
@@ -107,10 +107,10 @@ public class Count {
     public PCollection<KV<T, Long>> apply(PCollection<T> input) {
       return
           input
-          .apply("Init", ParDo.of(new DoFn<T, KV<T, Void>>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(KV.of(c.element(), (Void) null));
+          .apply("Init", MapElements.via(new SimpleFunction<T, KV<T, Void>>()
{
+            @Override
+            public KV<T, Void> apply(T element) {
+              return KV.of(element, (Void) null);
             }
           }))
           .apply(Count.<T, Void>perKey());

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
index 6f9e3d8..2837c40 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/FlatMapElements.java
@@ -29,7 +29,7 @@ import java.lang.reflect.ParameterizedType;
  * {@link PCollection} and merging the results.
  */
 public class FlatMapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>>
{
   /**
    * For a {@code SerializableFunction<InputT, ? extends Iterable<OutputT>>}
{@code fn},
    * returns a {@link PTransform} that applies {@code fn} to every element of the input
@@ -130,7 +130,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>>
{
   }
 
   @Override
-  public PCollection<OutputT> apply(PCollection<InputT> input) {
+  public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
     return input.apply(
         "FlatMap",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
index 7e09d7e..f3f4f88 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Flatten.java
@@ -173,13 +173,11 @@ public class Flatten {
       @SuppressWarnings("unchecked")
       Coder<T> elemCoder = ((IterableLikeCoder<T, ?>) inCoder).getElemCoder();
 
-      return in.apply("FlattenIterables", ParDo.of(
-          new DoFn<Iterable<T>, T>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              for (T i : c.element()) {
-                c.output(i);
-              }
+      return in.apply("FlattenIterables", FlatMapElements.via(
+          new SimpleFunction<Iterable<T>, Iterable<T>>() {
+            @Override
+            public Iterable<T> apply(Iterable<T> element) {
+              return element;
             }
           }))
           .setCoder(elemCoder);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
index 5ac1866..2405adf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Keys.java
@@ -58,10 +58,10 @@ public class Keys<K> extends PTransform<PCollection<? extends
KV<K, ?>>,
   @Override
   public PCollection<K> apply(PCollection<? extends KV<K, ?>> in) {
     return
-        in.apply("Keys", ParDo.of(new DoFn<KV<K, ?>, K>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(c.element().getKey());
+        in.apply("Keys", MapElements.via(new SimpleFunction<KV<K, ?>, K>() {
+          @Override
+          public K apply(KV<K, ?> kv) {
+            return kv.getKey();
           }
         }));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
index d4386d2..2b81ebf 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/KvSwap.java
@@ -62,11 +62,10 @@ public class KvSwap<K, V> extends PTransform<PCollection<KV<K,
V>>,
   @Override
   public PCollection<KV<V, K>> apply(PCollection<KV<K, V>> in) {
     return
-        in.apply("KvSwap", ParDo.of(new DoFn<KV<K, V>, KV<V, K>>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            KV<K, V> e = c.element();
-            c.output(KV.of(e.getValue(), e.getKey()));
+        in.apply("KvSwap", MapElements.via(new SimpleFunction<KV<K, V>, KV<V,
K>>() {
+          @Override
+          public KV<V, K> apply(KV<K, V> kv) {
+            return KV.of(kv.getValue(), kv.getKey());
           }
         }));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
index 17ad6e7..73e4359 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/MapElements.java
@@ -25,7 +25,7 @@ import org.apache.beam.sdk.values.TypeDescriptor;
  * {@code PTransform}s for mapping a simple function over the elements of a {@link PCollection}.
  */
 public class MapElements<InputT, OutputT>
-extends PTransform<PCollection<InputT>, PCollection<OutputT>> {
+extends PTransform<PCollection<? extends InputT>, PCollection<OutputT>>
{
 
   /**
    * For a {@code SerializableFunction<InputT, OutputT>} {@code fn} and output type
descriptor,
@@ -44,8 +44,16 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>>
{
    * descriptor need not be provided.
    */
   public static <InputT, OutputT> MissingOutputTypeDescriptor<InputT, OutputT>
-  via(SerializableFunction<InputT, OutputT> fn) {
-    return new MissingOutputTypeDescriptor<>(fn);
+  via(SerializableFunction<? super InputT, OutputT> fn) {
+
+    // TypeDescriptor interacts poorly with the wildcards needed to correctly express
+    // covariance and contravariance in Java, so instead we cast it to an invariant
+    // function here.
+    @SuppressWarnings("unchecked") // safe covariant cast
+        SerializableFunction<InputT, OutputT> simplerFn =
+        (SerializableFunction<InputT, OutputT>) fn;
+
+    return new MissingOutputTypeDescriptor<>(simplerFn);
   }
 
   /**
@@ -103,7 +111,7 @@ extends PTransform<PCollection<InputT>, PCollection<OutputT>>
{
   }
 
   @Override
-  public PCollection<OutputT> apply(PCollection<InputT> input) {
+  public PCollection<OutputT> apply(PCollection<? extends InputT> input) {
     return input.apply(
         "Map",
         ParDo.of(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
index bba4b51..2744b14 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/RemoveDuplicates.java
@@ -85,10 +85,10 @@ public class RemoveDuplicates<T> extends PTransform<PCollection<T>,
   @Override
   public PCollection<T> apply(PCollection<T> in) {
     return in
-        .apply("CreateIndex", ParDo.of(new DoFn<T, KV<T, Void>>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(KV.of(c.element(), (Void) null));
+        .apply("CreateIndex", MapElements.via(new SimpleFunction<T, KV<T, Void>>()
{
+          @Override
+          public KV<T, Void> apply(T element) {
+            return KV.of(element, (Void) null);
           }
         }))
         .apply(Combine.<T, Void>perKey(

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
index 34342db..d21d100 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Values.java
@@ -58,10 +58,10 @@ public class Values<V> extends PTransform<PCollection<? extends
KV<?, V>>,
   @Override
   public PCollection<V> apply(PCollection<? extends KV<?, V>> in) {
     return
-        in.apply("Values", ParDo.of(new DoFn<KV<?, V>, V>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(c.element().getValue());
+        in.apply("Values", MapElements.via(new SimpleFunction<KV<?, V>, V>()
{
+          @Override
+          public V apply(KV<?, V> kv) {
+            return kv.getValue();
           }
         }));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
index 2a44963..8b061f6 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/WithKeys.java
@@ -113,11 +113,10 @@ public class WithKeys<K, V> extends PTransform<PCollection<V>,
   @Override
   public PCollection<KV<K, V>> apply(PCollection<V> in) {
     PCollection<KV<K, V>> result =
-        in.apply("AddKeys", ParDo.of(new DoFn<V, KV<K, V>>() {
-          @ProcessElement
-          public void processElement(ProcessContext c) {
-            c.output(KV.of(fn.apply(c.element()),
-                c.element()));
+        in.apply("AddKeys", MapElements.via(new SimpleFunction<V, KV<K, V>>()
{
+          @Override
+          public KV<K, V> apply(V element) {
+            return KV.of(fn.apply(element), element);
           }
         }));
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index c1b0237..9dd069c 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -21,10 +21,10 @@ import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.annotations.Experimental.Kind;
 import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.Coder.NonDeterministicException;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.GroupByKey;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
@@ -645,10 +645,9 @@ public class Window {
           // We first apply a (trivial) transform to the input PCollection to produce a new
           // PCollection. This ensures that we don't modify the windowing strategy of the
input
           // which may be used elsewhere.
-          .apply("Identity", ParDo.of(new DoFn<T, T>() {
-            @ProcessElement
-            public void processElement(ProcessContext c) {
-              c.output(c.element());
+          .apply("Identity", MapElements.via(new SimpleFunction<T, T>() {
+            @Override public T apply(T element) {
+              return element;
             }
           }))
           // Then we modify the windowing strategy.

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
index 8b86499..d7b3ac5 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/PipelineTest.java
@@ -36,10 +36,10 @@ import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.RunnableOnService;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
-import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.Flatten;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
-import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.util.UserCodeException;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
@@ -146,10 +146,10 @@ public class PipelineTest {
 
   private static PTransform<PCollection<? extends String>, PCollection<String>>
addSuffix(
       final String suffix) {
-    return ParDo.of(new DoFn<String, String>() {
-      @ProcessElement
-      public void processElement(DoFn<String, String>.ProcessContext c) {
-        c.output(c.element() + suffix);
+    return MapElements.via(new SimpleFunction<String, String>() {
+      @Override
+      public String apply(String input) {
+        return input + suffix;
       }
     });
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
index f9bf472..b9ba53b 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/io/WriteTest.java
@@ -91,7 +91,9 @@ public class WriteTest {
   // Static counts of the number of records per shard.
   private static List<Integer> recordsPerShard = new ArrayList<>();
 
-  private static final MapElements<String, String> IDENTITY_MAP =
+  @SuppressWarnings("unchecked") // covariant cast
+  private static final PTransform<PCollection<String>, PCollection<String>>
IDENTITY_MAP =
+      (PTransform)
       MapElements.via(new SimpleFunction<String, String>() {
         @Override
         public String apply(String input) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
index e86a128..7217bca 100644
--- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
+++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/MapElementsTest.java
@@ -233,7 +233,7 @@ public class MapElementsTest implements Serializable {
   }
   @Test
   public void testSimpleFunctionDisplayData() {
-    SimpleFunction<?, ?> simpleFn = new SimpleFunction<Integer, Integer>() {
+    SimpleFunction<Integer, ?> simpleFn = new SimpleFunction<Integer, Integer>()
{
       @Override
       public Integer apply(Integer input) {
         return input;
@@ -255,17 +255,17 @@ public class MapElementsTest implements Serializable {
   @Test
   @Category(RunnableOnService.class)
   public void testPrimitiveDisplayData() {
-    SimpleFunction<?, ?> mapFn = new SimpleFunction<Integer, Integer>() {
+    SimpleFunction<Integer, ?> mapFn = new SimpleFunction<Integer, Integer>()
{
       @Override
       public Integer apply(Integer input) {
         return input;
       }
     };
 
-    MapElements<?, ?> map = MapElements.via(mapFn);
+    MapElements<Integer, ?> map = MapElements.via(mapFn);
     DisplayDataEvaluator evaluator = DisplayDataEvaluator.create();
 
-    Set<DisplayData> displayData = evaluator.displayDataForPrimitiveTransforms(map);
+    Set<DisplayData> displayData = evaluator.<Integer>displayDataForPrimitiveTransforms(map);
     assertThat("MapElements should include the mapFn in its primitive display data",
         displayData, hasItem(hasDisplayItem("mapFn", mapFn.getClass())));
   }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/236945d2/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 2383105..8a0c788 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -34,9 +34,11 @@ import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.util.CoderUtils;
 import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
 import org.apache.beam.sdk.values.KV;
@@ -1314,10 +1316,10 @@ public class KafkaIO {
     public PDone apply(PCollection<V> input) {
       return input
         .apply("Kafka values with default key",
-          ParDo.of(new DoFn<V, KV<Void, V>>() {
-            @ProcessElement
-            public void processElement(ProcessContext ctx) throws Exception {
-              ctx.output(KV.<Void, V>of(null, ctx.element()));
+          MapElements.via(new SimpleFunction<V, KV<Void, V>>() {
+            @Override
+            public KV<Void, V> apply(V element) {
+              return KV.<Void, V>of(null, element);
             }
           }))
         .setCoder(KvCoder.of(VoidCoder.of(), kvWriteTransform.valueCoder))


Mime
View raw message