beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [1/2] beam git commit: [BEAM-1573] Use Kafka serializers instead of coders in KafkaIO
Date Wed, 26 Apr 2017 21:38:27 GMT
Repository: beam
Updated Branches:
  refs/heads/master a8d76603b -> af8ead44e


[BEAM-1573] Use Kafka serializers instead of coders in KafkaIO


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d841e5db
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d841e5db
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d841e5db

Branch: refs/heads/master
Commit: d841e5dbb3b23d9eb81f7a380daf401bad3367be
Parents: a8d7660
Author: peay <peay@protonmail.com>
Authored: Sun Mar 26 10:51:59 2017 -0400
Committer: Eugene Kirpichov <kirpichov@google.com>
Committed: Wed Apr 26 14:35:43 2017 -0700

----------------------------------------------------------------------
 .../runners/spark/SparkRunnerDebuggerTest.java  |  10 +-
 .../ResumeFromCheckpointStreamingTest.java      |  22 +-
 .../org/apache/beam/sdk/io/kafka/KafkaIO.java   | 394 +++++++++++++------
 .../CoderBasedKafkaDeserializer.java            |  71 ++++
 .../CoderBasedKafkaSerializer.java              |  73 ++++
 .../serialization/InstantDeserializer.java      |  45 +++
 .../kafka/serialization/InstantSerializer.java  |  45 +++
 .../io/kafka/serialization/package-info.java    |  22 ++
 .../apache/beam/sdk/io/kafka/KafkaIOTest.java   | 119 ++++--
 9 files changed, 640 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
index 905b30e..ff43fa6 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/SparkRunnerDebuggerTest.java
@@ -44,6 +44,8 @@ import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionList;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
 import org.junit.Rule;
@@ -118,14 +120,14 @@ public class SparkRunnerDebuggerTest {
     KafkaIO.Read<String, String> read = KafkaIO.<String, String>read()
         .withBootstrapServers("mykafka:9092")
         .withTopics(Collections.singletonList("my_input_topic"))
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(StringUtf8Coder.of());
+        .withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializer(StringDeserializer.class);
 
     KafkaIO.Write<String, String> write = KafkaIO.<String, String>write()
         .withBootstrapServers("myotherkafka:9092")
         .withTopic("my_output_topic")
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(StringUtf8Coder.of());
+        .withKeySerializer(StringSerializer.class)
+        .withValueSerializer(StringSerializer.class);
 
     KvCoder<String, String> stringKvCoder = KvCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());
 

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
index 1aa76a3..7d7fd08 100644
--- a/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
+++ b/runners/spark/src/test/java/org/apache/beam/runners/spark/translation/streaming/ResumeFromCheckpointStreamingTest.java
@@ -39,15 +39,15 @@ import org.apache.beam.runners.spark.SparkPipelineResult;
 import org.apache.beam.runners.spark.TestSparkPipelineOptions;
 import org.apache.beam.runners.spark.UsesCheckpointRecovery;
 import org.apache.beam.runners.spark.aggregators.AggregatorsAccumulator;
-import org.apache.beam.runners.spark.coders.CoderHelpers;
 import org.apache.beam.runners.spark.io.MicrobatchSource;
 import org.apache.beam.runners.spark.metrics.MetricsAccumulator;
 import org.apache.beam.runners.spark.translation.streaming.utils.EmbeddedKafkaCluster;
 import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
 import org.apache.beam.sdk.Pipeline;
-import org.apache.beam.sdk.coders.InstantCoder;
 import org.apache.beam.sdk.coders.StringUtf8Coder;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
+import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
+import org.apache.beam.sdk.io.kafka.serialization.InstantSerializer;
 import org.apache.beam.sdk.metrics.Counter;
 import org.apache.beam.sdk.metrics.MetricNameFilter;
 import org.apache.beam.sdk.metrics.MetricResult;
@@ -75,6 +75,7 @@ import org.apache.beam.sdk.values.PDone;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -119,18 +120,7 @@ public class ResumeFromCheckpointStreamingTest {
     producerProps.put("request.required.acks", 1);
     producerProps.put("bootstrap.servers", EMBEDDED_KAFKA_CLUSTER.getBrokerList());
     Serializer<String> stringSerializer = new StringSerializer();
-    Serializer<Instant> instantSerializer = new Serializer<Instant>() {
-      @Override
-      public void configure(Map<String, ?> configs, boolean isKey) { }
-
-      @Override
-      public byte[] serialize(String topic, Instant data) {
-        return CoderHelpers.toByteArray(data, InstantCoder.of());
-      }
-
-      @Override
-      public void close() { }
-    };
+    Serializer<Instant> instantSerializer = new InstantSerializer();
 
     try (@SuppressWarnings("unchecked") KafkaProducer<String, Instant> kafkaProducer =
         new KafkaProducer(producerProps, stringSerializer, instantSerializer)) {
@@ -232,8 +222,8 @@ public class ResumeFromCheckpointStreamingTest {
     KafkaIO.Read<String, Instant> read = KafkaIO.<String, Instant>read()
         .withBootstrapServers(EMBEDDED_KAFKA_CLUSTER.getBrokerList())
         .withTopics(Collections.singletonList(TOPIC))
-        .withKeyCoder(StringUtf8Coder.of())
-        .withValueCoder(InstantCoder.of())
+        .withKeyDeserializer(StringDeserializer.class)
+        .withValueDeserializer(InstantDeserializer.class)
         .updateConsumerProperties(ImmutableMap.<String, Object>of("auto.offset.reset", "earliest"))
         .withTimestampFn(new SerializableFunction<KV<String, Instant>, Instant>() {
           @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
index 68efb9a..a0977b7 100644
--- a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaIO.java
@@ -34,6 +34,8 @@ import com.google.common.io.Closeables;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.lang.reflect.ParameterizedType;
+import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
@@ -54,9 +56,9 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.annotations.Experimental;
 import org.apache.beam.sdk.coders.AvroCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
 import org.apache.beam.sdk.coders.Coder;
-import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.CoderRegistry;
 import org.apache.beam.sdk.coders.CustomCoder;
 import org.apache.beam.sdk.coders.KvCoder;
 import org.apache.beam.sdk.io.Read.Unbounded;
@@ -64,6 +66,8 @@ import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.CheckpointMark;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
 import org.apache.beam.sdk.io.kafka.KafkaCheckpointMark.PartitionMark;
+import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaDeserializer;
+import org.apache.beam.sdk.io.kafka.serialization.CoderBasedKafkaSerializer;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.options.ValueProvider;
 import org.apache.beam.sdk.transforms.DoFn;
@@ -73,8 +77,6 @@ import org.apache.beam.sdk.transforms.ParDo;
 import org.apache.beam.sdk.transforms.SerializableFunction;
 import org.apache.beam.sdk.transforms.SimpleFunction;
 import org.apache.beam.sdk.transforms.display.DisplayData;
-import org.apache.beam.sdk.util.CoderUtils;
-import org.apache.beam.sdk.util.ExposedByteArrayInputStream;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PBegin;
 import org.apache.beam.sdk.values.PCollection;
@@ -94,6 +96,7 @@ import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
@@ -116,9 +119,8 @@ import org.slf4j.LoggerFactory;
  * <p>Although most applications consume a single topic, the source can be configured to consume
  * multiple topics or even a specific set of {@link TopicPartition}s.
  *
- * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>
- * and one or more topics to consume. The following example illustrates various options for
- * configuring the source :
+ * <p>To configure a Kafka source, you must specify at the minimum Kafka <tt>bootstrapServers</tt>,
+ * one or more topics to consume, and key and value deserializers. For example:
  *
  * <pre>{@code
  *
@@ -126,9 +128,9 @@ import org.slf4j.LoggerFactory;
  *    .apply(KafkaIO.<Long, String>read()
  *       .withBootstrapServers("broker_1:9092,broker_2:9092")
  *       .withTopic("my_topic")  // use withTopics(List<String>) to read from multiple topics.
- *       // set a Coder for Key and Value
- *       .withKeyCoder(BigEndianLongCoder.of())
- *       .withValueCoder(StringUtf8Coder.of())
+ *       .withKeyDeserializer(LongDeserializer.class)
+ *       .withValueDeserializer(StringDeserializer.class)
+ *
  *       // above four are required configuration. returns PCollection<KafkaRecord<Long, String>>
  *
  *       // rest of the settings are optional :
@@ -150,6 +152,51 @@ import org.slf4j.LoggerFactory;
  *     ...
  * }</pre>
  *
+ * <p>Kafka provides deserializers for common types in
+ * {@link org.apache.kafka.common.serialization}.
+ *
+ * <p>To read Avro data, {@code fromAvro} can be used. This does not require manually specifying
+ * a {@link Coder} or {@link Deserializer}.
+ *
+ * <p>It's also possible to deserialize data using a Beam {@link Coder} via
+ * {@link #readWithCoders(Coder, Coder)}, though this is discouraged because the particular
+ * binary format is not guaranteed by coders. However, this can be useful
+ * when exchanging data with a Beam pipeline that uses the same coder:
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.<MyKey, MyValue>readWithCoders(MyKeyCoder.of(), MyValueCoder.of())
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopic("my_topic")
+ *    )
+ *    ...
+ * }</pre>
+ *
+ * <p>In most cases, you don't need to specify {@link Coder} for key and value in the resulting
+ * collection because the coders are inferred from deserializer types. However, in cases when
+ * coder inference fails, they can be specified manually using {@link Read#withKeyCoder} and
+ * {@link Read#withValueCoder}. Note that the payloads of Kafka messages is interpreted using
+ * key and value <i>deserializers</i>; coders are a Beam implementation detail to help runners
+ * materialize the data for intermediate storage if necessary.
+ *
+ * <pre>{@code
+ *
+ *  pipeline
+ *    .apply(KafkaIO.<Long, Foo>read()
+ *       .withBootstrapServers("broker_1:9092,broker_2:9092")
+ *       .withTopic("my_topic")
+ *
+ *       // infer coder from deserializer
+ *       .withKeyDeserializer(LongDeserializer.class)
+ *
+ *       // explicitly specify coder
+ *       .withValueDeserializer(FooDeserializer.class)
+ *       .withValueCoder(FooCoder.of())
+ *     )
+ *     ...
+ * }</pre>
+ *
  * <h3>Partition Assignment and Checkpointing</h3>
  * The Kafka partitions are evenly distributed among splits (workers).
  * Checkpointing is fully supported and each split can resume from previous checkpoint. See
@@ -165,8 +212,7 @@ import org.slf4j.LoggerFactory;
  *
  * <p>KafkaIO sink supports writing key-value pairs to a Kafka topic. Users can also write
  * just the values. To configure a Kafka sink, you must specify at the minimum Kafka
- * <tt>bootstrapServers</tt> and the topic to write to. The following example illustrates various
- * options for configuring the sink:
+ * <tt>bootstrapServers</tt>, the topic to write to, and key and value serializers. For example:
  *
  * <pre>{@code
  *
@@ -175,9 +221,8 @@ import org.slf4j.LoggerFactory;
  *       .withBootstrapServers("broker_1:9092,broker_2:9092")
  *       .withTopic("results")
  *
- *       // set Coder for Key and Value
- *       .withKeyCoder(BigEndianLongCoder.of())
- *       .withValueCoder(StringUtf8Coder.of())
+ *       .withKeySerializer(LongSerializer.class)
+ *       .withValueSerializer(StringSerializer.class)
  *
  *       // you can further customize KafkaProducer used to write the records by adding more
  *       // settings for ProducerConfig. e.g, to enable compression :
@@ -193,11 +238,16 @@ import org.slf4j.LoggerFactory;
  *  strings.apply(KafkaIO.<Void, String>write()
  *      .withBootstrapServers("broker_1:9092,broker_2:9092")
  *      .withTopic("results")
- *      .withValueCoder(StringUtf8Coder.of()) // just need coder for value
+ *      .withValueSerializer(new StringSerializer()) // just need serializer for value
  *      .values()
  *    );
  * }</pre>
  *
+ * <p>Same notes on coders vs. serializers apply as above for {@link Read}.
+ *
+ * <p>To write Avro data, {@code toAvro} can be used. This does not require specifying serializers
+ * or coders.
+ *
  * <h3>Advanced Kafka Configuration</h3>
  * KafkaIO allows setting most of the properties in {@link ConsumerConfig} for source or in
  * {@link ProducerConfig} for sink. E.g. if you would like to enable offset
@@ -214,18 +264,53 @@ import org.slf4j.LoggerFactory;
  */
 @Experimental
 public class KafkaIO {
+
+  /**
+   * Attempt to infer a {@link Coder} by extracting the type of the deserialized-class from the
+   * deserializer argument using the {@link Coder} registry.
+   */
+  @VisibleForTesting
+  static <T> Coder<T> inferCoder(
+      CoderRegistry coderRegistry, Class<? extends Deserializer<T>> deserializer) {
+    checkNotNull(deserializer);
+
+    for (Type type : deserializer.getGenericInterfaces()) {
+      if (!(type instanceof ParameterizedType)) {
+        continue;
+      }
+
+      // This does not recurse: we will not infer from a class that extends
+      // a class that extends Deserializer<T>.
+      ParameterizedType parameterizedType = (ParameterizedType) type;
+
+      if (parameterizedType.getRawType() == Deserializer.class) {
+        Type parameter = parameterizedType.getActualTypeArguments()[0];
+
+        try {
+          @SuppressWarnings("unchecked")
+          Class<T> clazz = (Class<T>) parameter;
+          return coderRegistry.getDefaultCoder(clazz);
+        } catch (CannotProvideCoderException e) {
+          LOG.warn("Could not infer coder from deserializer type", e);
+        }
+      }
+    }
+
+    throw new RuntimeException("Could not extract deserializer type from " + deserializer);
+  }
+
   /**
    * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
    * configuration should set with {@link Read#withBootstrapServers(String)} and
-   * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
-   * custom timestamp and watermark functions.
+   * {@link Read#withTopics(List)}. Other optional settings include key and value
+   * {@link Deserializer}s, custom timestamp and watermark functions.
    */
   public static Read<byte[], byte[]> readBytes() {
     return new AutoValue_KafkaIO_Read.Builder<byte[], byte[]>()
         .setTopics(new ArrayList<String>())
         .setTopicPartitions(new ArrayList<TopicPartition>())
-        .setKeyCoder(ByteArrayCoder.of())
-        .setValueCoder(ByteArrayCoder.of())
+        .setKeyDeserializer(ByteArrayDeserializer.class)
+        .setValueDeserializer(ByteArrayDeserializer.class)
         .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
         .setConsumerConfig(Read.DEFAULT_CONSUMER_PROPERTIES)
         .setMaxNumRecords(Long.MAX_VALUE)
@@ -235,8 +320,8 @@ public class KafkaIO {
   /**
    * Creates an uninitialized {@link Read} {@link PTransform}. Before use, basic Kafka
    * configuration should set with {@link Read#withBootstrapServers(String)} and
-   * {@link Read#withTopics(List)}. Other optional settings include key and value coders,
-   * custom timestamp and watermark functions.
+   * {@link Read#withTopics(List)}. Other optional settings include key and value
+   * {@link Deserializer}s, custom timestamp and watermark functions.
    */
   public static <K, V> Read<K, V> read() {
     return new AutoValue_KafkaIO_Read.Builder<K, V>()
@@ -249,15 +334,87 @@ public class KafkaIO {
   }
 
   /**
+   * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
+   * based on {@link Coder} instances.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Read<K, V> readWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
+    // Kafka constructs deserializers directly. Pass coder through consumer
+    // configuration.
+    ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+    Map<String, Object> config = builder
+            .putAll(Read.DEFAULT_CONSUMER_PROPERTIES)
+            .put(CoderBasedKafkaDeserializer.configForKeyDeserializer(), keyCoder)
+            .put(CoderBasedKafkaDeserializer.configForValueDeserializer(), valueCoder)
+            .build();
+
+    return new AutoValue_KafkaIO_Read.Builder<K, V>()
+            .setTopics(new ArrayList<String>())
+            .setTopicPartitions(new ArrayList<TopicPartition>())
+            .setKeyCoder(keyCoder)
+            .setValueCoder(valueCoder)
+            .setKeyDeserializer((Class) CoderBasedKafkaDeserializer.class)
+            .setValueDeserializer((Class) CoderBasedKafkaDeserializer.class)
+            .setConsumerFactoryFn(Read.KAFKA_CONSUMER_FACTORY_FN)
+            .setConsumerConfig(config)
+            .setMaxNumRecords(Long.MAX_VALUE)
+            .build();
+  }
+
+  /**
+   * Creates an uninitialized {@link Read} {@link PTransform}, using Kafka {@link Deserializer}s
+   * based on {@link AvroCoder}. This reads data in the Avro binary format directly without using
+   * an Avro object container.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Read<K, V> fromAvro(Class<K> keyClass, Class<V> valueClass) {
+    return readWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
+  }
+
+  /**
    * Creates an uninitialized {@link Write} {@link PTransform}. Before use, Kafka configuration
    * should be set with {@link Write#withBootstrapServers(String)} and {@link Write#withTopic}
-   * along with {@link Coder}s for (optional) key and values.
+   * along with {@link Deserializer}s for (optional) key and values.
    */
   public static <K, V> Write<K, V> write() {
     return new AutoValue_KafkaIO_Write.Builder<K, V>()
         .setProducerConfig(Write.DEFAULT_PRODUCER_PROPERTIES)
         .build();
   }
+  /**
+   * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
+   * based on {@link Coder} instances.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Write<K, V> writeWithCoders(Coder<K> keyCoder, Coder<V> valueCoder) {
+    // Kafka constructs serializers directly. Pass coder through consumer
+    // configuration.
+    ImmutableMap.Builder<String, Object> builder = new ImmutableMap.Builder<>();
+    Map<String, Object> config = builder
+            .putAll(Write.DEFAULT_PRODUCER_PROPERTIES)
+            .put(CoderBasedKafkaSerializer.configForKeySerializer(), keyCoder)
+            .put(CoderBasedKafkaSerializer.configForValueSerializer(), valueCoder)
+            .build();
+
+    CoderBasedKafkaSerializer<K> keySerializer = new CoderBasedKafkaSerializer<K>();
+    CoderBasedKafkaSerializer<V> valueSerializer = new CoderBasedKafkaSerializer<V>();
+
+    return new AutoValue_KafkaIO_Write.Builder<K, V>()
+            .setProducerConfig(config)
+            .setKeySerializer((Class) CoderBasedKafkaSerializer.class)
+            .setValueSerializer((Class) CoderBasedKafkaSerializer.class)
+            .build();
+  }
+
+  /**
+   * Creates an uninitialized {@link Write} {@link PTransform}, using Kafka {@link Serializer}s
+   * based on {@link AvroCoder}. The coder writes Avro data directly without using an Avro object
+   * container.
+   */
+  @SuppressWarnings("unchecked")
+  public static <K, V> Write<K, V> toAvro(Class<K> keyClass, Class<V> valueClass) {
+    return writeWithCoders(AvroCoder.of(keyClass), AvroCoder.of(valueClass));
+  }
 
   ///////////////////////// Read Support \\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\\
 
@@ -273,6 +430,8 @@ public class KafkaIO {
     abstract List<TopicPartition> getTopicPartitions();
     @Nullable abstract Coder<K> getKeyCoder();
     @Nullable abstract Coder<V> getValueCoder();
+    @Nullable abstract Class<? extends Deserializer<K>> getKeyDeserializer();
+    @Nullable abstract Class<? extends Deserializer<V>> getValueDeserializer();
     abstract SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>>
         getConsumerFactoryFn();
     @Nullable abstract SerializableFunction<KafkaRecord<K, V>, Instant> getTimestampFn();
@@ -290,6 +449,9 @@ public class KafkaIO {
       abstract Builder<K, V> setTopicPartitions(List<TopicPartition> topicPartitions);
       abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
       abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
+      abstract Builder<K, V> setKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer);
+      abstract Builder<K, V> setValueDeserializer(
+          Class<? extends Deserializer<V>> valueDeserializer);
       abstract Builder<K, V> setConsumerFactoryFn(
           SerializableFunction<Map<String, Object>, Consumer<byte[], byte[]>> consumerFactoryFn);
       abstract Builder<K, V> setTimestampFn(SerializableFunction<KafkaRecord<K, V>, Instant> fn);
@@ -342,14 +504,28 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Read} with {@link Coder} for key bytes.
+     * Returns a new {@link Read} with a Kafka {@link Deserializer} for key bytes.
+     */
+    public Read<K, V> withKeyDeserializer(Class<? extends Deserializer<K>> keyDeserializer) {
+      return toBuilder().setKeyDeserializer(keyDeserializer).build();
+    }
+
+    /**
+     * Returns a new {@link Read} with a {@link Coder} for the key.
      */
     public Read<K, V> withKeyCoder(Coder<K> keyCoder) {
       return toBuilder().setKeyCoder(keyCoder).build();
     }
 
     /**
-     * Returns a new {@link Read} with {@link Coder} for value bytes.
+     * Returns a new {@link Read} with a Kafka {@link Deserializer} for value bytes.
+     */
+    public Read<K, V> withValueDeserializer(Class<? extends Deserializer<V>> valueDeserializer) {
+      return toBuilder().setValueDeserializer(valueDeserializer).build();
+    }
+
+    /**
+     * Returns a new {@link Read} with a {@link Coder} for values.
      */
     public Read<K, V> withValueCoder(Coder<V> valueCoder) {
       return toBuilder().setValueCoder(valueCoder).build();
@@ -436,20 +612,51 @@ public class KafkaIO {
     }
 
     @Override
-    public void validate(PBegin input) {
+    public void validate(PBegin input)  {
       checkNotNull(getConsumerConfig().get(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkArgument(getTopics().size() > 0 || getTopicPartitions().size() > 0,
           "Kafka topics or topic_partitions are required");
-      checkNotNull(getKeyCoder(), "Key coder must be set");
-      checkNotNull(getValueCoder(), "Value coder must be set");
+      checkNotNull(getKeyDeserializer(), "Key deserializer must be set");
+      checkNotNull(getValueDeserializer(), "Value deserializer must be set");
+
+      if (input != null) {
+        CoderRegistry registry = input.getPipeline().getCoderRegistry();
+
+        checkNotNull(getKeyCoder() != null
+                        ? getKeyCoder()
+                        : inferCoder(registry, getKeyDeserializer()),
+                "Key coder must be set");
+
+        checkNotNull(getValueCoder() != null
+                        ? getValueCoder()
+                        : inferCoder(registry, getValueDeserializer()),
+                "Value coder must be set");
+      } else {
+        checkNotNull(getKeyCoder(), "Key coder must be set");
+        checkNotNull(getValueCoder(), "Value coder must be set");
+      }
     }
 
     @Override
     public PCollection<KafkaRecord<K, V>> expand(PBegin input) {
-     // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
+      // Infer key/value coders if not specified explicitly
+      CoderRegistry registry = input.getPipeline().getCoderRegistry();
+
+      Coder<K> keyCoder = getKeyCoder() != null
+              ? getKeyCoder()
+              : inferCoder(registry, getKeyDeserializer());
+
+      Coder<V> valueCoder = getValueCoder() != null
+              ? getValueCoder()
+              : inferCoder(registry, getValueDeserializer());
+
+      // Handles unbounded source to bounded conversion if maxNumRecords or maxReadTime is set.
       Unbounded<KafkaRecord<K, V>> unbounded =
-          org.apache.beam.sdk.io.Read.from(makeSource());
+          org.apache.beam.sdk.io.Read.from(this
+                  .withKeyCoder(keyCoder)
+                  .withValueCoder(valueCoder)
+                  .makeSource());
 
       PTransform<PBegin, PCollection<KafkaRecord<K, V>>> transform = unbounded;
 
@@ -488,8 +695,8 @@ public class KafkaIO {
      * A set of properties that are not required or don't make sense for our consumer.
      */
     private static final Map<String, String> IGNORED_CONSUMER_PROPERTIES = ImmutableMap.of(
-        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
-        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueCoder instead"
+        ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "Set keyDeserializer instead",
+        ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "Set valueDeserializer instead"
         // "group.id", "enable.auto.commit", "auto.commit.interval.ms" :
         //     lets allow these, applications can have better resume point for restarts.
         );
@@ -739,6 +946,9 @@ public class KafkaIO {
     private Instant curTimestamp;
     private Iterator<PartitionState> curBatch = Collections.emptyIterator();
 
+    private Deserializer<K> keyDeserializerInstance = null;
+    private Deserializer<V> valueDeserializerInstance = null;
+
     private static final Duration KAFKA_POLL_TIMEOUT = Duration.millis(1000);
     private static final Duration NEW_RECORDS_POLL_TIMEOUT = Duration.millis(10);
 
@@ -912,6 +1122,16 @@ public class KafkaIO {
       consumer = spec.getConsumerFactoryFn().apply(spec.getConsumerConfig());
       consumerSpEL.evaluateAssign(consumer, spec.getTopicPartitions());
 
+      try {
+        keyDeserializerInstance = source.spec.getKeyDeserializer().newInstance();
+        valueDeserializerInstance = source.spec.getValueDeserializer().newInstance();
+      } catch (InstantiationException | IllegalAccessException e) {
+        throw new IOException("Could not instantiate deserializers", e);
+      }
+
+      keyDeserializerInstance.configure(spec.getConsumerConfig(), true);
+      valueDeserializerInstance.configure(spec.getConsumerConfig(), false);
+
       for (PartitionState p : partitionStates) {
         if (p.nextOffset != UNINITIALIZED_OFFSET) {
           consumer.seek(p.topicPartition, p.nextOffset);
@@ -1003,15 +1223,15 @@ public class KafkaIO {
 
           curRecord = null; // user coders below might throw.
 
-          // apply user coders. might want to allow skipping records that fail to decode.
-          // TODO: wrap exceptions from coders to make explicit to users
+          // apply user deserializers.
+          // TODO: write records that can't be deserialized to a "dead-letter" additional output.
           KafkaRecord<K, V> record = new KafkaRecord<K, V>(
               rawRecord.topic(),
               rawRecord.partition(),
               rawRecord.offset(),
               consumerSpEL.getRecordTimestamp(rawRecord),
-              decode(rawRecord.key(), source.spec.getKeyCoder()),
-              decode(rawRecord.value(), source.spec.getValueCoder()));
+              keyDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.key()),
+              valueDeserializerInstance.deserialize(rawRecord.topic(), rawRecord.value()));
 
           curTimestamp = (source.spec.getTimestampFn() == null)
               ? Instant.now() : source.spec.getTimestampFn().apply(record);
@@ -1032,16 +1252,6 @@ public class KafkaIO {
       }
     }
 
-    private static byte[] nullBytes = new byte[0];
-    private static <T> T decode(byte[] bytes, Coder<T> coder) throws IOException {
-      // If 'bytes' is null, use byte[0]. It is common for key in Kakfa record to be null.
-      // This makes it impossible for user to distinguish between zero length byte and null.
-      // Alternately, we could have a ByteArrayCoder that handles nulls, and use that for default
-      // coder.
-      byte[] toDecode = bytes == null ? nullBytes : bytes;
-      return coder.decode(new ExposedByteArrayInputStream(toDecode), Coder.Context.OUTER);
-    }
-
     // update latest offset for each partition.
     // called from offsetFetcher thread
     private void updateLatestOffsets() {
@@ -1153,6 +1363,9 @@ public class KafkaIO {
         }
       }
 
+      Closeables.close(keyDeserializerInstance, true);
+      Closeables.close(valueDeserializerInstance, true);
+
       Closeables.close(offsetConsumer, true);
       Closeables.close(consumer, true);
     }
@@ -1167,22 +1380,23 @@ public class KafkaIO {
   @AutoValue
   public abstract static class Write<K, V> extends PTransform<PCollection<KV<K, V>>, PDone> {
     @Nullable abstract String getTopic();
-    @Nullable abstract Coder<K> getKeyCoder();
-    @Nullable abstract Coder<V> getValueCoder();
     abstract Map<String, Object> getProducerConfig();
     @Nullable
     abstract SerializableFunction<Map<String, Object>, Producer<K, V>> getProducerFactoryFn();
 
+    @Nullable abstract Class<? extends Serializer<K>> getKeySerializer();
+    @Nullable abstract Class<? extends Serializer<V>> getValueSerializer();
+
     abstract Builder<K, V> toBuilder();
 
     @AutoValue.Builder
     abstract static class Builder<K, V> {
       abstract Builder<K, V> setTopic(String topic);
-      abstract Builder<K, V> setKeyCoder(Coder<K> keyCoder);
-      abstract Builder<K, V> setValueCoder(Coder<V> valueCoder);
       abstract Builder<K, V> setProducerConfig(Map<String, Object> producerConfig);
       abstract Builder<K, V> setProducerFactoryFn(
           SerializableFunction<Map<String, Object>, Producer<K, V>> fn);
+      abstract Builder<K, V> setKeySerializer(Class<? extends Serializer<K>> serializer);
+      abstract Builder<K, V> setValueSerializer(Class<? extends Serializer<V>> serializer);
       abstract Write<K, V> build();
     }
 
@@ -1204,19 +1418,19 @@ public class KafkaIO {
     }
 
     /**
-     * Returns a new {@link Write} with {@link Coder} for serializing key (if any) to bytes.
+     * Returns a new {@link Write} with {@link Serializer} for serializing key (if any) to bytes.
      * A key is optional while writing to Kafka. Note when a key is set, its hash is used to
      * determine partition in Kafka (see {@link ProducerRecord} for more details).
      */
-    public Write<K, V> withKeyCoder(Coder<K> keyCoder) {
-      return toBuilder().setKeyCoder(keyCoder).build();
+    public Write<K, V> withKeySerializer(Class<? extends Serializer<K>> keySerializer) {
+      return toBuilder().setKeySerializer(keySerializer).build();
     }
 
     /**
-     * Returns a new {@link Write} with {@link Coder} for serializing value to bytes.
+     * Returns a new {@link Write} with {@link Serializer} for serializing value to bytes.
      */
-    public Write<K, V> withValueCoder(Coder<V> valueCoder) {
-      return toBuilder().setValueCoder(valueCoder).build();
+    public Write<K, V> withValueSerializer(Class<? extends Serializer<V>> valueSerializer) {
+      return toBuilder().setValueSerializer(valueSerializer).build();
     }
 
     public Write<K, V> updateProducerProperties(Map<String, Object> configUpdates) {
@@ -1239,7 +1453,7 @@ public class KafkaIO {
      * collections of values rather thank {@link KV}s.
      */
     public PTransform<PCollection<V>, PDone> values() {
-      return new KafkaValueWrite<>(withKeyCoder(new NullOnlyCoder<K>()).toBuilder().build());
+      return new KafkaValueWrite<>(toBuilder().build());
     }
 
     @Override
@@ -1253,26 +1467,19 @@ public class KafkaIO {
       checkNotNull(getProducerConfig().get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG),
           "Kafka bootstrap servers should be set");
       checkNotNull(getTopic(), "Kafka topic should be set");
-      checkNotNull(getKeyCoder(), "Key coder should be set");
-      checkNotNull(getValueCoder(), "Value coder should be set");
     }
 
     // set config defaults
     private static final Map<String, Object> DEFAULT_PRODUCER_PROPERTIES =
         ImmutableMap.<String, Object>of(
-            ProducerConfig.RETRIES_CONFIG, 3,
-            // See comment about custom serializers in KafkaWriter constructor.
-            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class,
-            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, CoderBasedKafkaSerializer.class);
+            ProducerConfig.RETRIES_CONFIG, 3);
 
     /**
      * A set of properties that are not required or don't make sense for our producer.
      */
     private static final Map<String, String> IGNORED_PRODUCER_PROPERTIES = ImmutableMap.of(
-        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Set keyCoder instead",
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Set valueCoder instead",
-        configForKeySerializer(), "Reserved for internal serializer",
-        configForValueSerializer(), "Reserved for internal serializer"
+        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "Use withKeySerializer instead",
+        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "Use withValueSerializer instead"
      );
 
     @Override
@@ -1310,7 +1517,7 @@ public class KafkaIO {
               return KV.of(null, element);
             }
           }))
-        .setCoder(KvCoder.of(new NullOnlyCoder<K>(), kvWriteTransform.getValueCoder()))
+        .setCoder(KvCoder.of(new NullOnlyCoder<K>(), input.getCoder()))
         .apply(kvWriteTransform);
     }
 
@@ -1389,8 +1596,11 @@ public class KafkaIO {
       // Use case : write all the events for a single session to same Kafka partition.
 
       this.producerConfig = new HashMap<>(spec.getProducerConfig());
-      this.producerConfig.put(configForKeySerializer(), spec.getKeyCoder());
-      this.producerConfig.put(configForValueSerializer(), spec.getValueCoder());
+
+      this.producerConfig.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+                              spec.getKeySerializer());
+      this.producerConfig.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+                              spec.getValueSerializer());
     }
 
     private synchronized void checkForFailures() throws IOException {
@@ -1427,48 +1637,4 @@ public class KafkaIO {
       }
     }
   }
-
-  /**
-   * Implements Kafka's {@link Serializer} with a {@link Coder}. The coder is stored as serialized
-   * value in producer configuration map.
-   */
-  public static class CoderBasedKafkaSerializer<T> implements Serializer<T> {
-
-    @SuppressWarnings("unchecked")
-    @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-      String configKey = isKey ? configForKeySerializer() : configForValueSerializer();
-      coder = (Coder<T>) configs.get(configKey);
-      checkNotNull(coder, "could not instantiate coder for Kafka serialization");
-    }
-
-    @Override
-    public byte[] serialize(String topic, @Nullable T data) {
-      if (data == null) {
-        return null; // common for keys to be null
-      }
-
-      try {
-        return CoderUtils.encodeToByteArray(coder, data);
-      } catch (CoderException e) {
-        throw new RuntimeException(e);
-      }
-    }
-
-    @Override
-    public void close() {
-    }
-
-    private Coder<T> coder = null;
-    private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer";
-  }
-
-
-  private static String configForKeySerializer() {
-    return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key");
-  }
-
-  private static String configForValueSerializer() {
-    return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value");
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
new file mode 100644
index 0000000..ca552fb
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaDeserializer.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * Implements a Kafka {@link Deserializer} with a {@link Coder}.
+ *
+ * <p>As Kafka instantiates serializers directly, the coder must be stored as serialized value in
+ * the producer configuration map.
+ */
+public class CoderBasedKafkaDeserializer<T> implements Deserializer<T> {
+  @SuppressWarnings("unchecked")
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    String configKey = isKey ? configForKeyDeserializer() : configForValueDeserializer();
+    coder = (Coder<T>) configs.get(configKey);
+    checkNotNull(coder, "could not instantiate coder for Kafka deserialization");
+  }
+
+  @Override
+  public T deserialize(String topic, byte[] data) {
+    if (data == null) {
+      return null;
+    }
+
+    try {
+      return CoderUtils.decodeFromByteArray(coder, data);
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {}
+
+  public static String configForKeyDeserializer() {
+    return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "key");
+  }
+
+  public static String configForValueDeserializer() {
+    return String.format(CoderBasedKafkaDeserializer.CONFIG_FORMAT, "value");
+  }
+
+  private Coder<T> coder = null;
+  private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.deserializer";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
new file mode 100644
index 0000000..1044d6f
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/CoderBasedKafkaSerializer.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import static com.google.common.base.Preconditions.checkNotNull;
+
+import java.util.Map;
+import javax.annotation.Nullable;
+
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.util.CoderUtils;
+import org.apache.kafka.common.serialization.Serializer;
+
+/**
+ * Implements Kafka's {@link Serializer} with a {@link Coder}.
+ *
+ * <p>As Kafka instantiates serializers directly, the coder
+ * must be stored as serialized value in the producer configuration map.
+ */
+public class CoderBasedKafkaSerializer<T> implements Serializer<T> {
+  @SuppressWarnings("unchecked")
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {
+    String configKey = isKey ? configForKeySerializer() : configForValueSerializer();
+    coder = (Coder<T>) configs.get(configKey);
+    checkNotNull(coder, "could not instantiate coder for Kafka serialization");
+  }
+
+  @Override
+  public byte[] serialize(String topic, @Nullable T data) {
+    if (data == null) {
+      return null; // common for keys to be null
+    }
+
+    try {
+      return CoderUtils.encodeToByteArray(coder, data);
+    } catch (CoderException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public void close() {
+  }
+
+  public static String configForKeySerializer() {
+    return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "key");
+  }
+
+  public static String configForValueSerializer() {
+    return String.format(CoderBasedKafkaSerializer.CONFIG_FORMAT, "value");
+  }
+
+  private Coder<T> coder = null;
+  private static final String CONFIG_FORMAT = "beam.coder.based.kafka.%s.serializer";
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java
new file mode 100644
index 0000000..fe4749f
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantDeserializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.joda.time.Instant;
+
+/**
+ * Kafka {@link Deserializer} for {@link Instant}.
+ *
+ * <p>This decodes the number of milliseconds since epoch using {@link LongDeserializer}.
+ */
+public class InstantDeserializer implements Deserializer<Instant> {
+  private static final LongDeserializer LONG_DESERIALIZER = new LongDeserializer();
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public Instant deserialize(String topic, byte[] bytes) {
+    return new Instant(LONG_DESERIALIZER.deserialize(topic, bytes));
+  }
+
+  @Override
+  public void close() {}
+}

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java
new file mode 100644
index 0000000..8fa4429
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/InstantSerializer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.beam.sdk.io.kafka.serialization;
+
+import java.util.Map;
+
+import org.apache.kafka.common.serialization.LongSerializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.joda.time.Instant;
+
+/**
+ * Kafka {@link Serializer} for {@link Instant}.
+ *
+ * <p>This encodes the number of milliseconds since epoch using {@link LongSerializer}.
+ */
+public class InstantSerializer implements Serializer<Instant> {
+  private static final LongSerializer LONG_SERIALIZER = new LongSerializer();
+
+  @Override
+  public void configure(Map<String, ?> configs, boolean isKey) {}
+
+  @Override
+  public byte[] serialize(String topic, Instant instant) {
+    return LONG_SERIALIZER.serialize(topic, instant.getMillis());
+  }
+
+  @Override
+  public void close() {}
+};

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
new file mode 100644
index 0000000..747d64c
--- /dev/null
+++ b/sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/serialization/package-info.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/**
+ * Kafka serializers and deserializers.
+ */
+package org.apache.beam.sdk.io.kafka.serialization;

http://git-wip-us.apache.org/repos/asf/beam/blob/d841e5db/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
----------------------------------------------------------------------
diff --git a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
index 2b11162..e6ed2f7 100644
--- a/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
+++ b/sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaIOTest.java
@@ -41,12 +41,15 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import javax.annotation.Nullable;
 import org.apache.beam.sdk.Pipeline.PipelineExecutionException;
-import org.apache.beam.sdk.coders.BigEndianIntegerCoder;
-import org.apache.beam.sdk.coders.BigEndianLongCoder;
-import org.apache.beam.sdk.coders.ByteArrayCoder;
+import org.apache.beam.sdk.coders.CoderRegistry;
+import org.apache.beam.sdk.coders.InstantCoder;
+import org.apache.beam.sdk.coders.StringUtf8Coder;
+import org.apache.beam.sdk.coders.VarIntCoder;
+import org.apache.beam.sdk.coders.VarLongCoder;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.UnboundedSource;
 import org.apache.beam.sdk.io.UnboundedSource.UnboundedReader;
+import org.apache.beam.sdk.io.kafka.serialization.InstantDeserializer;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
 import org.apache.beam.sdk.testing.TestPipeline;
@@ -74,7 +77,14 @@ import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.ByteArrayDeserializer;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.LongDeserializer;
+import org.apache.kafka.common.serialization.LongSerializer;
 import org.apache.kafka.common.serialization.Serializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.utils.Utils;
 import org.hamcrest.collection.IsIterableContainingInAnyOrder;
 import org.joda.time.Instant;
@@ -231,8 +241,8 @@ public class KafkaIOTest {
         .withTopics(topics)
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 20 partitions
-        .withKeyCoder(BigEndianIntegerCoder.of())
-        .withValueCoder(BigEndianLongCoder.of())
+        .withKeyDeserializer(IntegerDeserializer.class)
+        .withValueDeserializer(LongDeserializer.class)
         .withMaxNumRecords(numElements);
 
     if (timestampFn != null) {
@@ -303,9 +313,9 @@ public class KafkaIOTest {
         .withTopic("my_topic")
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             ImmutableList.of(topic), 10, numElements, OffsetResetStrategy.EARLIEST))
-        .withKeyCoder(BigEndianIntegerCoder.of())
-        .withValueCoder(BigEndianLongCoder.of())
-        .withMaxNumRecords(numElements);
+        .withMaxNumRecords(numElements)
+        .withKeyDeserializer(IntegerDeserializer.class)
+        .withValueDeserializer(LongDeserializer.class);
 
     PCollection<Long> input = p
         .apply(reader.withoutMetadata())
@@ -326,8 +336,8 @@ public class KafkaIOTest {
         .withTopicPartitions(ImmutableList.of(new TopicPartition("test", 5)))
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             topics, 10, numElements, OffsetResetStrategy.EARLIEST)) // 10 partitions
-        .withKeyCoder(ByteArrayCoder.of())
-        .withValueCoder(BigEndianLongCoder.of())
+        .withKeyDeserializer(ByteArrayDeserializer.class)
+        .withValueDeserializer(LongDeserializer.class)
         .withMaxNumRecords(numElements / 10);
 
     PCollection<Long> input = p
@@ -386,8 +396,14 @@ public class KafkaIOTest {
     int numElements = 1000;
     int numSplits = 10;
 
+    // Coders must be specified explicitly here due to the way the transform
+    // is used in the test.
     UnboundedSource<KafkaRecord<Integer, Long>, ?> initial =
-        mkKafkaReadTransform(numElements, null).makeSource();
+        mkKafkaReadTransform(numElements, null)
+                .withKeyCoder(VarIntCoder.of())
+                .withValueCoder(VarLongCoder.of())
+                .makeSource();
+
     List<? extends UnboundedSource<KafkaRecord<Integer, Long>, ?>> splits =
         initial.split(numSplits, p.getOptions());
     assertEquals("Expected exact splitting", numSplits, splits.size());
@@ -510,8 +526,8 @@ public class KafkaIOTest {
         .withTopics(topics)
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             topics, 10, numElements, OffsetResetStrategy.LATEST))
-        .withKeyCoder(BigEndianIntegerCoder.of())
-        .withValueCoder(BigEndianLongCoder.of())
+        .withKeyDeserializer(IntegerDeserializer.class)
+        .withValueDeserializer(LongDeserializer.class)
         .withMaxNumRecords(numElements)
         .withTimestampFn(new ValueAsTimestampFn())
         .makeSource()
@@ -554,8 +570,8 @@ public class KafkaIOTest {
         .apply(KafkaIO.<Integer, Long>write()
             .withBootstrapServers("none")
             .withTopic(topic)
-            .withKeyCoder(BigEndianIntegerCoder.of())
-            .withValueCoder(BigEndianLongCoder.of())
+            .withKeySerializer(IntegerSerializer.class)
+            .withValueSerializer(LongSerializer.class)
             .withProducerFactoryFn(new ProducerFactoryFn()));
 
       p.run();
@@ -587,7 +603,7 @@ public class KafkaIOTest {
         .apply(KafkaIO.<Integer, Long>write()
             .withBootstrapServers("none")
             .withTopic(topic)
-            .withValueCoder(BigEndianLongCoder.of())
+            .withValueSerializer(LongSerializer.class)
             .withProducerFactoryFn(new ProducerFactoryFn())
             .values());
 
@@ -628,8 +644,8 @@ public class KafkaIOTest {
         .apply(KafkaIO.<Integer, Long>write()
             .withBootstrapServers("none")
             .withTopic(topic)
-            .withKeyCoder(BigEndianIntegerCoder.of())
-            .withValueCoder(BigEndianLongCoder.of())
+            .withKeySerializer(IntegerSerializer.class)
+            .withValueSerializer(LongSerializer.class)
             .withProducerFactoryFn(new ProducerFactoryFn()));
 
       try {
@@ -664,8 +680,8 @@ public class KafkaIOTest {
             new TopicPartition("test", 6)))
         .withConsumerFactoryFn(new ConsumerFactoryFn(
             Lists.newArrayList("test"), 10, 10, OffsetResetStrategy.EARLIEST)) // 10 partitions
-        .withKeyCoder(ByteArrayCoder.of())
-        .withValueCoder(BigEndianLongCoder.of());
+        .withKeyDeserializer(ByteArrayDeserializer.class)
+        .withValueDeserializer(LongDeserializer.class);
 
     DisplayData displayData = DisplayData.from(read);
 
@@ -681,7 +697,7 @@ public class KafkaIOTest {
     KafkaIO.Write<Integer, Long> write = KafkaIO.<Integer, Long>write()
         .withBootstrapServers("myServerA:9092,myServerB:9092")
         .withTopic("myTopic")
-        .withValueCoder(BigEndianLongCoder.of())
+        .withValueSerializer(LongSerializer.class)
         .withProducerFactoryFn(new ProducerFactoryFn());
 
     DisplayData displayData = DisplayData.from(write);
@@ -691,6 +707,51 @@ public class KafkaIOTest {
     assertThat(displayData, hasDisplayItem("retries", 3));
   }
 
+  // interface for testing coder inference
+  private interface DummyInterface<T> {
+  }
+
+  // interface for testing coder inference
+  private interface DummyNonparametricInterface {
+  }
+
+  // class for testing coder inference
+  private static class DeserializerWithInterfaces
+          implements DummyInterface<String>, DummyNonparametricInterface,
+            Deserializer<Long> {
+
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+    }
+
+    @Override
+    public Long deserialize(String topic, byte[] bytes) {
+      return 0L;
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  @Test
+  public void testInferKeyCoder() {
+    CoderRegistry registry = CoderRegistry.createDefault();
+
+    assertTrue(KafkaIO.inferCoder(registry, LongDeserializer.class)
+            instanceof VarLongCoder);
+
+    assertTrue(KafkaIO.inferCoder(registry, StringDeserializer.class)
+            instanceof StringUtf8Coder);
+
+    assertTrue(KafkaIO.inferCoder(registry, InstantDeserializer.class)
+            instanceof InstantCoder);
+
+    assertTrue(KafkaIO.inferCoder(registry, DeserializerWithInterfaces.class)
+            instanceof VarLongCoder);
+  }
+
   private static void verifyProducerRecords(String topic, int numElements, boolean keyIsAbsent) {
 
     // verify that appropriate messages are written to kafka
@@ -724,8 +785,8 @@ public class KafkaIOTest {
   private static final MockProducer<Integer, Long> MOCK_PRODUCER =
     new MockProducer<Integer, Long>(
       false, // disable synchronous completion of send. see ProducerSendCompletionThread below.
-      new KafkaIO.CoderBasedKafkaSerializer<Integer>(),
-      new KafkaIO.CoderBasedKafkaSerializer<Long>()) {
+      new IntegerSerializer(),
+      new LongSerializer()) {
 
       // override flush() so that it does not complete all the waiting sends, giving a chance to
       // ProducerCompletionThread to inject errors.
@@ -754,10 +815,14 @@ public class KafkaIOTest {
     public Producer<Integer, Long> apply(Map<String, Object> config) {
 
       // Make sure the config is correctly set up for serializers.
-      Utils.newInstance(
-          ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
-              .asSubclass(Serializer.class)
-      ).configure(config, true);
+
+      // There may not be a key serializer if we're interested only in values.
+      if (config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG) != null) {
+        Utils.newInstance(
+                ((Class<?>) config.get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG))
+                        .asSubclass(Serializer.class)
+        ).configure(config, true);
+      }
 
       Utils.newInstance(
           ((Class<?>) config.get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG))


Mime
View raw message