kafka-commits mailing list archives

From ij...@apache.org
Subject kafka git commit: KAFKA-5311: Support ExtendedDeserializer in Kafka Streams
Date Fri, 02 Jun 2017 08:26:34 GMT
Repository: kafka
Updated Branches:
  refs/heads/0.11.0 da6da5356 -> 963c423c1


KAFKA-5311: Support ExtendedDeserializer in Kafka Streams

Author: Dale Peakall <dale@peakall.net>

Reviewers: Michael André Pearce <michael.andre.pearce@me.com>, Matthias J. Sax <matthias@confluent.io>, Bill Bejeck <bbejeck@gmail.com>, Damian Guy <damian.guy@gmail.com>, Ismael Juma <ismael@juma.me.uk>

Closes #3199 from subnova/streams-extendeddeserializer

(cherry picked from commit 1188db5657e1db6944b59b1c91a75e47856f4e5c)
Signed-off-by: Ismael Juma <ismael@juma.me.uk>
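For background (illustration only, not part of the commit message): this change lets Kafka Streams hand record headers to deserializers that implement ExtendedDeserializer, which adds a three-argument deserialize(topic, headers, data). A minimal sketch of such a deserializer follows; the class name HeaderAwareDeserializer and the "charset" header key are hypothetical. Plain Deserializers keep working because SourceNode wraps them via ExtendedDeserializer.Wrapper.ensureExtended.

    import org.apache.kafka.common.header.Header;
    import org.apache.kafka.common.header.Headers;
    import org.apache.kafka.common.serialization.ExtendedDeserializer;

    import java.nio.charset.Charset;
    import java.nio.charset.StandardCharsets;
    import java.util.Map;

    // Hypothetical example; name and header key are illustrative, not part of this commit.
    public class HeaderAwareDeserializer implements ExtendedDeserializer<String> {
        @Override
        public String deserialize(final String topic, final Headers headers, final byte[] data) {
            // Pick a decoding hint out of the record headers when present.
            final Header hint = headers == null ? null : headers.lastHeader("charset");
            final Charset charset = hint == null
                    ? StandardCharsets.UTF_8
                    : Charset.forName(new String(hint.value(), StandardCharsets.UTF_8));
            return new String(data, charset);
        }

        @Override
        public String deserialize(final String topic, final byte[] data) {
            return deserialize(topic, null, data); // no headers available on this path
        }

        @Override
        public void configure(final Map<String, ?> configs, final boolean isKey) { }

        @Override
        public void close() { }
    }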


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/963c423c
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/963c423c
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/963c423c

Branch: refs/heads/0.11.0
Commit: 963c423c1923add6c7e71c3bf8a34df6d12c385a
Parents: da6da53
Author: Dale Peakall <dale@peakall.net>
Authored: Fri Jun 2 09:25:58 2017 +0100
Committer: Ismael Juma <ismael@juma.me.uk>
Committed: Fri Jun 2 09:26:45 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  5 +-
 .../kafka/clients/producer/KafkaProducer.java   |  6 +-
 .../kafka/clients/producer/MockProducer.java    |  6 +-
 .../serialization/ExtendedDeserializer.java     |  4 ++
 .../serialization/ExtendedSerializer.java       |  4 ++
 .../kstream/internals/ChangedDeserializer.java  | 24 +++++---
 .../kstream/internals/ChangedSerializer.java    | 22 ++++---
 .../streams/processor/internals/SourceNode.java | 24 +++++---
 .../internals/SourceNodeRecordDeserializer.java |  4 +-
 .../SourceNodeRecordDeserializerTest.java       |  5 +-
 .../processor/internals/SourceNodeTest.java     | 65 ++++++++++++++++++++
 11 files changed, 128 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index defbbb7..9be8aa3 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -85,6 +85,7 @@ import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static java.util.Collections.emptyList;
+import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
 
 /**
  * This class manage the fetching process with the brokers.
@@ -150,10 +151,6 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
         subscriptions.addListener(this);
     }
 
-    private <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
-        return deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
-    }
-
     /**
      * Represents data about an offset returned by a broker.
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 89a18e3..1d16721 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -73,6 +73,8 @@ import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
+import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
+
 /**
  * A Kafka client that publishes records to the Kafka cluster.
  * <P>
@@ -335,10 +337,6 @@ public class KafkaProducer<K, V> implements Producer<K, V> {
         }
     }
 
-    private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
-        return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
-    }
-
     private static TransactionManager configureTransactionState(ProducerConfig config) {
 
         TransactionManager transactionManager = null;

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
index 566e43a..210c2bb 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/MockProducer.java
@@ -41,6 +41,8 @@ import java.util.Objects;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 
+import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
+
 /**
  * A mock of the producer interface you can use for testing code that uses Kafka.
  * <p>
@@ -131,10 +133,6 @@ public class MockProducer<K, V> implements Producer<K, V> {
         this(Cluster.empty(), false, null, null, null);
     }
 
-    private <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
-        return serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
-    }
-
     @Override
     public void initTransactions() {
         verifyProducerState();

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
index 5de154a..8e95d8d 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedDeserializer.java
@@ -52,5 +52,9 @@ public interface ExtendedDeserializer<T> extends Deserializer<T> {
         public void close() {
             deserializer.close();
         }
+
+        public static <T> ExtendedDeserializer<T> ensureExtended(Deserializer<T> deserializer) {
+            return deserializer == null ?  null : deserializer instanceof ExtendedDeserializer ? (ExtendedDeserializer<T>) deserializer : new ExtendedDeserializer.Wrapper<>(deserializer);
+        }
     }
 }
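For reference, a minimal sketch (illustration only, not part of the commit) of how the now-public helper behaves: null stays null, an ExtendedDeserializer is returned unchanged, and any other Deserializer is wrapped so that the three-argument deserialize ignores the headers and delegates to the two-argument one. The serializer side gains the same helper on ExtendedSerializer.Wrapper.

    import org.apache.kafka.common.serialization.Deserializer;
    import org.apache.kafka.common.serialization.ExtendedDeserializer;
    import org.apache.kafka.common.serialization.StringDeserializer;

    import java.nio.charset.StandardCharsets;

    // Standalone example; the class name is illustrative.
    public class EnsureExtendedExample {
        public static void main(final String[] args) {
            final Deserializer<String> plain = new StringDeserializer();
            // Wraps the plain deserializer; an ExtendedDeserializer would be returned as-is.
            final ExtendedDeserializer<String> extended =
                    ExtendedDeserializer.Wrapper.ensureExtended(plain);
            final byte[] payload = "value".getBytes(StandardCharsets.UTF_8);
            // The wrapper ignores the headers argument and delegates to plain.deserialize(topic, data).
            System.out.println(extended.deserialize("topic", null, payload)); // prints: value
        }
    }
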

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
index 8740631..32659b4 100644
--- a/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
+++ b/clients/src/main/java/org/apache/kafka/common/serialization/ExtendedSerializer.java
@@ -51,5 +51,9 @@ public interface ExtendedSerializer<T> extends Serializer<T> {
         public void close() {
             serializer.close();
         }
+
+        public static <T> ExtendedSerializer<T> ensureExtended(Serializer<T> serializer) {
+            return serializer == null ? null : serializer instanceof ExtendedSerializer ? (ExtendedSerializer<T>) serializer : new ExtendedSerializer.Wrapper<>(serializer);
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
index 768bfd8..1363a0b 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedDeserializer.java
@@ -16,27 +16,31 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.ExtendedDeserializer;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
+import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
+
+public class ChangedDeserializer<T> implements ExtendedDeserializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private Deserializer<T> inner;
+    private ExtendedDeserializer<T> inner;
 
     public ChangedDeserializer(Deserializer<T> inner) {
-        this.inner = inner;
+        this.inner = ensureExtended(inner);
     }
 
-    public Deserializer<T> inner() {
+    public ExtendedDeserializer<T> inner() {
         return inner;
     }
 
     public void setInner(Deserializer<T> inner) {
-        this.inner = inner;
+        this.inner = ensureExtended(inner);
     }
 
     @Override
@@ -45,19 +49,23 @@ public class ChangedDeserializer<T> implements Deserializer<Change<T>> {
     }
 
     @Override
-    public Change<T> deserialize(String topic, byte[] data) {
+    public Change<T> deserialize(String topic, Headers headers, byte[] data) {
 
         byte[] bytes = new byte[data.length - NEWFLAG_SIZE];
 
         System.arraycopy(data, 0, bytes, 0, bytes.length);
 
         if (ByteBuffer.wrap(data).get(data.length - NEWFLAG_SIZE) != 0) {
-            return new Change<>(inner.deserialize(topic, bytes), null);
+            return new Change<>(inner.deserialize(topic, headers, bytes), null);
         } else {
-            return new Change<>(null, inner.deserialize(topic, bytes));
+            return new Change<>(null, inner.deserialize(topic, headers, bytes));
         }
     }
 
+    @Override
+    public Change<T> deserialize(String topic, byte[] data) {
+        return deserialize(topic, null, data);
+    }
 
     @Override
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
index 961e6ff..fa261cb 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/ChangedSerializer.java
@@ -16,20 +16,24 @@
  */
 package org.apache.kafka.streams.kstream.internals;
 
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.ExtendedSerializer;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.errors.StreamsException;
 
 import java.nio.ByteBuffer;
 import java.util.Map;
 
-public class ChangedSerializer<T> implements Serializer<Change<T>> {
+import static org.apache.kafka.common.serialization.ExtendedSerializer.Wrapper.ensureExtended;
+
+public class ChangedSerializer<T> implements ExtendedSerializer<Change<T>> {
 
     private static final int NEWFLAG_SIZE = 1;
 
-    private Serializer<T> inner;
+    private ExtendedSerializer<T> inner;
 
     public ChangedSerializer(Serializer<T> inner) {
-        this.inner = inner;
+        this.inner = ensureExtended(inner);
     }
 
     public Serializer<T> inner() {
@@ -37,7 +41,7 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
     }
 
     public void setInner(Serializer<T> inner) {
-        this.inner = inner;
+        this.inner = ensureExtended(inner);
     }
 
     @Override
@@ -50,7 +54,7 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
      * both values are not null
      */
     @Override
-    public byte[] serialize(String topic, Change<T> data) {
+    public byte[] serialize(String topic, Headers headers, Change<T> data) {
         byte[] serializedKey;
 
         // only one of the old / new values would be not null
@@ -59,12 +63,12 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
                 throw new StreamsException("Both old and new values are not null (" + data.oldValue
                         + " : " + data.newValue + ") in ChangeSerializer, which is not allowed.");
 
-            serializedKey = inner.serialize(topic, data.newValue);
+            serializedKey = inner.serialize(topic, headers, data.newValue);
         } else {
             if (data.oldValue == null)
                 throw new StreamsException("Both old and new values are null in ChangeSerializer,
which is not allowed.");
 
-            serializedKey = inner.serialize(topic, data.oldValue);
+            serializedKey = inner.serialize(topic, headers, data.oldValue);
         }
 
         ByteBuffer buf = ByteBuffer.allocate(serializedKey.length + NEWFLAG_SIZE);
@@ -74,6 +78,10 @@ public class ChangedSerializer<T> implements Serializer<Change<T>> {
         return buf.array();
     }
 
+    @Override
+    public byte[] serialize(String topic, Change<T> data) {
+        return serialize(topic, null, data);
+    }
 
     @Override
     public void close() {

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
index 486a2ea..6d450dd 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNode.java
@@ -16,40 +16,44 @@
  */
 package org.apache.kafka.streams.processor.internals;
 
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.ExtendedDeserializer;
 import org.apache.kafka.streams.kstream.internals.ChangedDeserializer;
 import org.apache.kafka.streams.processor.ProcessorContext;
 import org.apache.kafka.streams.processor.TimestampExtractor;
 
 import java.util.List;
 
+import static org.apache.kafka.common.serialization.ExtendedDeserializer.Wrapper.ensureExtended;
+
 public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
     private final List<String> topics;
 
     private ProcessorContext context;
-    private Deserializer<K> keyDeserializer;
-    private Deserializer<V> valDeserializer;
+    private ExtendedDeserializer<K> keyDeserializer;
+    private ExtendedDeserializer<V> valDeserializer;
     private final TimestampExtractor timestampExtractor;
 
     public SourceNode(String name, List<String> topics, TimestampExtractor timestampExtractor, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
         super(name);
         this.topics = topics;
         this.timestampExtractor = timestampExtractor;
-        this.keyDeserializer = keyDeserializer;
-        this.valDeserializer = valDeserializer;
+        this.keyDeserializer = ensureExtended(keyDeserializer);
+        this.valDeserializer = ensureExtended(valDeserializer);
     }
 
     public SourceNode(String name, List<String> topics, Deserializer<K> keyDeserializer, Deserializer<V> valDeserializer) {
         this(name, topics, null, keyDeserializer, valDeserializer);
     }
 
-    K deserializeKey(String topic, byte[] data) {
-        return keyDeserializer.deserialize(topic, data);
+    K deserializeKey(String topic, Headers headers, byte[] data) {
+        return keyDeserializer.deserialize(topic, headers, data);
     }
 
-    V deserializeValue(String topic, byte[] data) {
-        return valDeserializer.deserialize(topic, data);
+    V deserializeValue(String topic, Headers headers, byte[] data) {
+        return valDeserializer.deserialize(topic, headers, data);
     }
 
     @SuppressWarnings("unchecked")
@@ -60,9 +64,9 @@ public class SourceNode<K, V> extends ProcessorNode<K, V> {
 
         // if deserializers are null, get the default ones from the context
         if (this.keyDeserializer == null)
-            this.keyDeserializer = (Deserializer<K>) context.keySerde().deserializer();
+            this.keyDeserializer = ensureExtended((Deserializer<K>) context.keySerde().deserializer());
         if (this.valDeserializer == null)
-            this.valDeserializer = (Deserializer<V>) context.valueSerde().deserializer();
+            this.valDeserializer = ensureExtended((Deserializer<V>) context.valueSerde().deserializer());
 
         // if value deserializers are for {@code Change} values, set the inner deserializer when necessary
         if (this.valDeserializer instanceof ChangedDeserializer &&

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
index 0e9f1ec..f66c0d9 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializer.java
@@ -33,7 +33,7 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
     public ConsumerRecord<Object, Object> deserialize(final ConsumerRecord<byte[], byte[]> rawRecord) {
         final Object key;
         try {
-            key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.key());
+            key = sourceNode.deserializeKey(rawRecord.topic(), rawRecord.headers(), rawRecord.key());
         } catch (Exception e) {
             throw new StreamsException(format("Failed to deserialize key for record. topic=%s,
partition=%d, offset=%d",
                                               rawRecord.topic(), rawRecord.partition(), rawRecord.offset()),
e);
@@ -41,7 +41,7 @@ class SourceNodeRecordDeserializer implements RecordDeserializer {
 
         final Object value;
         try {
-            value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.value());
+            value = sourceNode.deserializeValue(rawRecord.topic(), rawRecord.headers(), rawRecord.value());
         } catch (Exception e) {
             throw new StreamsException(format("Failed to deserialize value for record. topic=%s,
partition=%d, offset=%d",
                                               rawRecord.topic(), rawRecord.partition(), rawRecord.offset()),
e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
index b2b1016..a9f41e7 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeRecordDeserializerTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.streams.processor.internals;
 
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.header.Headers;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.streams.errors.StreamsException;
 import org.junit.Test;
@@ -91,7 +92,7 @@ public class SourceNodeRecordDeserializerTest {
         }
 
         @Override
-        public Object deserializeKey(final String topic, final byte[] data) {
+        public Object deserializeKey(final String topic, final Headers headers, final byte[] data) {
             if (keyThrowsException) {
                 throw new RuntimeException();
             }
@@ -99,7 +100,7 @@ public class SourceNodeRecordDeserializerTest {
         }
 
         @Override
-        public Object deserializeValue(final String topic, final byte[] data) {
+        public Object deserializeValue(final String topic, final Headers headers, final byte[] data) {
             if (valueThrowsException) {
                 throw new RuntimeException();
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/963c423c/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
----------------------------------------------------------------------
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
new file mode 100644
index 0000000..4c7c3a6
--- /dev/null
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/SourceNodeTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.serialization.ExtendedDeserializer;
+import org.apache.kafka.test.MockSourceNode;
+import org.junit.Test;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+public class SourceNodeTest {
+    @Test
+    public void shouldProvideTopicHeadersAndDataToKeyDeserializer() {
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new String[]{""}, new TheExtendedDeserializer(), new TheExtendedDeserializer());
+        final RecordHeaders headers = new RecordHeaders();
+        final String deserializeKey = sourceNode.deserializeKey("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
+        assertThat(deserializeKey, is("topic" + headers + "data"));
+    }
+
+    @Test
+    public void shouldProvideTopicHeadersAndDataToValueDeserializer() {
+        final SourceNode<String, String> sourceNode = new MockSourceNode<>(new String[]{""}, new TheExtendedDeserializer(), new TheExtendedDeserializer());
+        final RecordHeaders headers = new RecordHeaders();
+        final String deserializedValue = sourceNode.deserializeValue("topic", headers, "data".getBytes(StandardCharsets.UTF_8));
+        assertThat(deserializedValue, is("topic" + headers + "data"));
+    }
+
+    public static class TheExtendedDeserializer implements ExtendedDeserializer<String> {
+        @Override
+        public String deserialize(final String topic, final Headers headers, final byte[] data) {
+            return topic + headers + new String(data, StandardCharsets.UTF_8);
+        }
+
+        @Override
+        public void configure(final Map<String, ?> configs, final boolean isKey) { }
+
+        @Override
+        public String deserialize(final String topic, final byte[] data) {
+            return deserialize(topic, null, data);
+        }
+
+        @Override
+        public void close() { }
+    }
+}
\ No newline at end of file

