beam-commits mailing list archives

From: j...@apache.org
Subject: [1/2] beam git commit: Fix NPE in Kafka value writer.
Date: Thu, 30 Mar 2017 21:02:02 GMT
Repository: beam
Updated Branches:
  refs/heads/master ffd87553f -> 66f249933


Fix NPE in Kafka value writer.

KafkaIO.write()...values() does not require the user to set a key coder, since the key is
always null. Validation passes, but it results in an NPE at runtime when the writer tries
to instantiate the producer. Set the key coder to 'NullOnlyCoder'.
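
For context, this is the values-only usage that previously passed validation but failed at
runtime. It is a minimal sketch assuming the coder-based KafkaIO.Write API in this version of
the SDK; the broker address and topic are placeholders:

  PCollection<String> strings = ...;
  strings.apply(KafkaIO.<Void, String>write()
      .withBootstrapServers("broker_1:9092")
      .withTopic("results")
      .withValueCoder(StringUtf8Coder.of())
      .values());  // no key coder is set here; the fix supplies NullOnlyCoder internally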


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d0462f59
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d0462f59
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d0462f59

Branch: refs/heads/master
Commit: d0462f59548ebed0dd7ae744b138ff956b742cad
Parents: ffd8755
Author: Raghu Angadi <rangadi@google.com>
Authored: Wed Mar 29 23:21:54 2017 -0700
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Thu Mar 30 13:57:26 2017 -0700

----------------------------------------------------------------------
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 22 +++++++++-----------
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 16 ++++++++++++++
 2 files changed, 26 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 7880cbc..bb7d971 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -254,7 +254,6 @@ public class KafkaIO {
   public static <K, V> Write<K, V> write() {
     return new AutoValue_KafkaIO_Write.Builder<K, V>()
         .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
-        .setValueOnly(false)
         .build();
   }
 
@@ -1159,7 +1158,6 @@ public class KafkaIO {
     @Nullable abstract String getTopic();
     @Nullable abstract Coder<K> getKeyCoder();
     @Nullable abstract Coder<V> getValueCoder();
-    abstract boolean getValueOnly();
     abstract Map<String, Object> getProducerConfig();
     @Nullable
    abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
@@ -1171,7 +1169,6 @@ public class KafkaIO {
       abstract Builder<K, V> setTopic(String topic);
       abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
-      abstract Builder<K, V> setValueOnly(boolean valueOnly);
       abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
       abstract Builder<K, V> setProducerFactoryFn(
           SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
@@ -1231,7 +1228,7 @@ public class KafkaIO {
     * collections of values rather than {@link KV}s.
      */
     public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<>(toBuilder().setValueOnly(true).build());
+      return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
     }
 
     @Override
@@ -1245,9 +1242,7 @@ public class KafkaIO {
       checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkNotNull(getTopic(), "Kafka topic should be set");
-      if (!getValueOnly()) {
-        checkNotNull(getKeyCoder(), "Key coder should be set");
-      }
+      checkNotNull(getKeyCoder(), "Key coder should be set");
       checkNotNull(getValueCoder(), "Value coder should be set");
     }
 
@@ -1255,11 +1250,12 @@ public class KafkaIO {
     private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
         ImmutableMap.<String, Object>of(
             ProducerConfig.RETRIES_CONFIG, 3,
+            // See comment about custom serializers in KafkaWriter constructor.
             ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
             ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);
 
     /**
-     * A set of properties that are not required or don't make sense for our consumer.
+     * A set of properties that are not required or don't make sense for our producer.
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
         ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
@@ -1373,11 +1369,13 @@ public class KafkaIO {
     KafkaWriter(Write<K, V> spec) {
       this.spec = spec;
 
-      // Set custom kafka serializers. We can not serialize user objects then pass the bytes to
-      // producer. The key and value objects are used in kafka Partitioner interface.
+      // Set custom kafka serializers. We do not want to serialize user objects and then pass the
+      // bytes to the producer, since the key and value objects are used in Kafka's Partitioner interface.
       // This does not matter for default partitioner in Kafka as it uses just the serialized
-      // key bytes to pick a partition. But are making sure user's custom partitioner would work
-      // as expected.
+      // key bytes to pick a partition. But we don't want to limit the use of custom partitioners.
+      // We pass the key and value objects the user writes directly to Kafka, and the user-supplied
+      // coders to serialize them are invoked inside CoderBasedKafkaSerializer.
+      // Use case: write all the events for a single session to the same Kafka partition.
 
       this.producerConfig = new HashMap<>(spec.getProducerConfig());
       this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());
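
The CoderBasedKafkaSerializer referenced above is where the user-supplied coders actually run.
The following is only an illustrative sketch of that pattern, not the class from this commit;
the class name and config key names are hypothetical, and KafkaIO wires the real ones through
configForKeySerializer()/configForValueSerializer():

  import java.util.Map;
  import org.apache.beam.sdk.coders.Coder;
  import org.apache.beam.sdk.coders.CoderException;
  import org.apache.beam.sdk.util.CoderUtils;
  import org.apache.kafka.common.serialization.Serializer;

  public class CoderDelegatingSerializer<T> implements Serializer<T> {
    private Coder<T> coder;

    @SuppressWarnings("unchecked")
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
      // The coder is handed over through the producer config. Before this fix, the
      // values() path never put a key coder into the config, so the key side ended
      // up with a null coder, which is the failure the commit message describes.
      coder = (Coder<T>) configs.get(isKey ? "beam.coder.key" : "beam.coder.value");
    }

    @Override
    public byte[] serialize(String topic, T data) {
      try {
        // Run the user-supplied coder on the original object, so custom Kafka
        // partitioners still see the un-serialized key/value objects.
        return CoderUtils.encodeToByteArray(coder, data);
      } catch (CoderException e) {
        throw new RuntimeException(e);
      }
    }

    @Override
    public void close() {}
  }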

http://git-wip-us.apache.org/repos/asf/beam/blob/d0462f59/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 1897127..d1696d0 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -71,9 +71,12 @@ import org.apache.kafka.clients.consumer.MockConsumer;
 import org.apache.kafka.clients.consumer.OffsetResetStrategy;
 import org.apache.kafka.clients.producer.MockProducer;
 import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
 import org.junit.Rule;
@@ -728,8 +731,21 @@ public class KafkaIOTest {
   private static class ProducerFactoryFn
      implements SerializableFunction<Map<String, Object>, Producer<Integer, Long>> {
 
+    @SuppressWarnings("unchecked")
     @Override
     public Producer<Integer, Long> apply(Map<String, Object> config) {
+
+      // Make sure the config is correctly set up for serializers.
+      Utils.newInstance(
+          ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+              .asSubclass(Serializer.class)
+      ).configure(config, true);
+
+      Utils.newInstance(
+          ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))
+              .asSubclass(Serializer.class)
+      ).configure(config, false);
+
       return MOCK_PRODUCER;
     }
   }
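
The KafkaWriter comment above keeps the original key/value objects visible to custom
partitioners. As an illustration of the use case it mentions (all events of a session going to
the same partition), a user-side partitioner could look roughly like this; the class is
hypothetical and assumes the record key is the session id as a String:

  import java.util.Map;
  import org.apache.kafka.clients.producer.Partitioner;
  import org.apache.kafka.common.Cluster;

  public class SessionPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes, Cluster cluster) {
      // The un-serialized key object is available here because KafkaIO passes the
      // original objects (plus coder-based serializers) to the producer.
      String sessionId = (String) key;
      int numPartitions = cluster.partitionsForTopic(topic).size();
      return Math.floorMod(sessionId.hashCode(), numPartitions);
    }

    @Override
    public void close() {}

    @Override
    public void configure(Map<String, ?> configs) {}
  }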

