beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] beam git commit: [BEAM-2114] Wrap inferred KafkaIO Coders with NullableCoder
Date Fri, 28 Apr 2017 21:14:12 GMT
Repository: beam
Updated Branches:
  refs/heads/master dbd44faf3 -> a3e7383cc


[BEAM-2114] Wrap inferred KafkaIO Coders with NullableCoder


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b16e29cb
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b16e29cb
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b16e29cb

Branch: refs/heads/master
Commit: b16e29cbb7edbc2e06dee1ec8512625731980584
Parents: dbd44fa
Author: Devon Meunier <devon.meunier@shopify.com>
Authored: Fri Apr 28 15:24:36 2017 -0400
Committer: Dan Halperin <dhalperi@google.com>
Committed: Fri Apr 28 14:14:04 2017 -0700

----------------------------------------------------------------------
 .../main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java |  5 +++--
 .../java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java  | 12 ++++++------
 2 files changed, 9 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b16e29cb/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index a0977b7..47d8281 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -61,6 +61,7 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.coders.NullableCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
@@ -270,7 +271,7 @@ public class KafkaIO {
    * deserializer argument using the {@link Coder} registry.
    */
   @VisibleForTesting
-  static <T> Coder<T> inferCoder(
+  static <T> NullableCoder<T> inferCoder(
       CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer)
{
     checkNotNull(deserializer);
 
@@ -289,7 +290,7 @@ public class KafkaIO {
         try {
           @SuppressWarnings("unchecked")
           Class<T> clazz = (Class<T>) parameter;
-          return coderRegistry.getDefaultCoder(clazz);
+          return NullableCoder.of(coderRegistry.getDefaultCoder(clazz));
         } catch (CannotProvideCoderException e) {
           LOG.warn("Could not infer coder from deserializer type", e);
         }

http://git-wip-us.apache.org/repos/asf/beam/blob/b16e29cb/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index e6ed2f7..d713d90 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -183,7 +183,7 @@ public class KafkaIOTest {
     // our responsibility to make sure currently enqueued records sync with partition offsets.
     // The following task will be called inside each invocation to MockConsumer.poll().
     // We enqueue only the records with the offset >= partition's current position.
-    Runnable recordEnquerTask = new Runnable() {
+    Runnable recordEnqueueTask = new Runnable() {
       @Override
       public void run() {
         // add all the records with offset >= current partition position.
@@ -199,7 +199,7 @@ public class KafkaIOTest {
       }
     };
 
-    consumer.schedulePollTask(recordEnquerTask);
+    consumer.schedulePollTask(recordEnqueueTask);
     return consumer;
   }
 
@@ -739,16 +739,16 @@ public class KafkaIOTest {
   public void testInferKeyCoder() {
     CoderRegistry registry = CoderRegistry.createDefault();
 
-    assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class)
+    assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class).getValueCoder()
             instanceof VarLongCoder);
 
-    assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class)
+    assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class).getValueCoder()
             instanceof StringUtf8Coder);
 
-    assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class)
+    assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class).getValueCoder()
             instanceof InstantCoder);
 
-    assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class)
+    assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class).getValueCoder()
             instanceof VarLongCoder);
   }
 


Mime
View raw message