kafka-commits mailing list archives

From: ewe...@apache.org
Subject: [kafka] branch trunk updated: KAFKA-5142: Add Connect support for message headers (KIP-145)
Date: Wed, 31 Jan 2018 18:40:38 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4c48942  KAFKA-5142: Add Connect support for message headers (KIP-145)
4c48942 is described below

commit 4c48942f9d9e1428e21f934746cb7ce22b3df746
Author: Randall Hauch <rhauch@gmail.com>
AuthorDate: Wed Jan 31 10:40:24 2018 -0800

    KAFKA-5142: Add Connect support for message headers (KIP-145)
    
    **[KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect) has been accepted, and this PR implements KIP-145 except for the SMTs.**
    
    Changed the Connect API and runtime to support message headers as described in [KIP-145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect).
    
    The new `Header` interface defines an immutable representation of a Kafka header (key-value pair) with support for the Connect value types and schemas. This interface provides methods for easily converting between many of the built-in primitive, structured, and logical data types.
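
    For illustration, a minimal sketch of reading a typed value out of a single header; it mirrors the `Values` javadoc example included in this patch, and the header key `retry.count` is purely hypothetical:

    ```java
    import org.apache.kafka.connect.data.Values;
    import org.apache.kafka.connect.header.Header;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class HeaderReadExample {
        public Long retryCountFor(SinkRecord record) {
            // lastWithName returns the most recently added header with this key, or null if there is none
            Header header = record.headers().lastWithName("retry.count");
            if (header == null) {
                return null;
            }
            // Values coerces whatever the header actually holds (e.g. an INT32, an INT64, or the string "42") to a long
            return Values.convertToLong(header.schema(), header.value());
        }
    }
    ```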
    
    The new `Headers` interface defines an ordered collection of headers and is used to track all headers associated with a `ConnectRecord` (and thus `SourceRecord` and `SinkRecord`). Multiple headers with the same key are allowed. The `Headers` interface contains methods for adding, removing, finding, and modifying headers, and convenience methods make it easy for connectors and transforms to read and modify a record's headers.
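
    As a sketch of the `Headers` API from a source task's perspective (the topic name, header keys, and values below are hypothetical, and the `SourceRecord` constructor used is the new one that accepts headers):

    ```java
    import java.util.Map;

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.header.ConnectHeaders;
    import org.apache.kafka.connect.header.Headers;
    import org.apache.kafka.connect.source.SourceRecord;

    public class HeaderedSourceRecordExample {
        public SourceRecord build(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
            Headers headers = new ConnectHeaders();
            headers.addString("source.system", "billing-db");
            headers.addLong("ingest.ts", System.currentTimeMillis());

            SourceRecord record = new SourceRecord(sourcePartition, sourceOffset, "orders", null,
                    null, null, Schema.STRING_SCHEMA, "some value",
                    System.currentTimeMillis(), headers);

            // Headers are ordered and may contain duplicate keys; retainLatest()
            // keeps only the most recently added value for each key
            record.headers().retainLatest();
            return record;
        }
    }
    ```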
    
    A new `HeaderConverter` interface is also defined so that the Connect runtime framework can serialize and deserialize headers between the in-memory representation and Kafka's byte[] representation. A new `SimpleHeaderConverter` implementation has been added; it serializes header values to strings and deserializes them by inferring the schemas (`Struct` header values are serialized without their schemas, so they can only be deserialized as `Map` instances without a schema). The `StringCon [...]
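
    A round-trip sketch with the `HeaderConverter` API (the topic and header names are hypothetical, and this assumes `SimpleHeaderConverter` needs no configuration before use); in a worker or connector the implementation is selected with the new `header.converter` property:

    ```java
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.HeaderConverter;
    import org.apache.kafka.connect.storage.SimpleHeaderConverter;

    public class HeaderRoundTripExample {
        public static void main(String[] args) {
            HeaderConverter converter = new SimpleHeaderConverter();

            // Serialize a header value to the byte[] form carried on the Kafka record ...
            byte[] raw = converter.fromConnectHeader("orders", "retry.count", Schema.INT32_SCHEMA, 42);

            // ... and deserialize it back; SimpleHeaderConverter infers the schema from the string form
            SchemaAndValue parsed = converter.toConnectHeader("orders", "retry.count", raw);
            System.out.println(parsed.schema() + " -> " + parsed.value());
        }
    }
    ```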
    
    Unit and integration tests are added for `ConnectHeader` and `ConnectHeaders`, the two implementation classes for headers. Additional test methods cover the new methods on the `Converter` implementations. Finally, because `ConnectRecord` is already used heavily, only a few new tests were needed; many of the existing tests already exercise the changes.
    
    Author: Randall Hauch <rhauch@gmail.com>
    
    Reviewers: Arjun Satish <arjun@confluent.io>, Ted Yu <yuzhihong@gmail.com>, Magesh Nandakumar <magesh.n.kumar@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #4319 from rhauch/kafka-5142-b
---
 checkstyle/import-control.xml                      |    1 +
 checkstyle/suppressions.xml                        |   10 +-
 .../kafka/connect/connector/ConnectRecord.java     |   61 +-
 .../java/org/apache/kafka/connect/data/Values.java | 1117 ++++++++++++++++++++
 .../apache/kafka/connect/header/ConnectHeader.java |   97 ++
 .../kafka/connect/header/ConnectHeaders.java       |  519 +++++++++
 .../org/apache/kafka/connect/header/Header.java    |   66 ++
 .../org/apache/kafka/connect/header/Headers.java   |  308 ++++++
 .../org/apache/kafka/connect/sink/SinkRecord.java  |   16 +-
 .../apache/kafka/connect/source/SourceRecord.java  |   19 +-
 .../kafka/connect/storage/ConverterConfig.java     |   58 +
 .../kafka/connect/storage/ConverterType.java       |   64 ++
 .../kafka/connect/storage/HeaderConverter.java     |   53 +
 .../connect/storage/SimpleHeaderConverter.java     |   85 ++
 .../kafka/connect/storage/StringConverter.java     |   59 +-
 .../connect/storage/StringConverterConfig.java     |   60 ++
 .../org/apache/kafka/connect/data/ValuesTest.java  |  350 ++++++
 .../kafka/connect/header/ConnectHeaderTest.java    |  108 ++
 .../kafka/connect/header/ConnectHeadersTest.java   |  547 ++++++++++
 .../apache/kafka/connect/sink/SinkRecordTest.java  |  128 +++
 .../kafka/connect/source/SourceRecordTest.java     |  129 +++
 .../kafka/connect/storage/ConverterTypeTest.java   |   31 +
 .../connect/storage/SimpleHeaderConverterTest.java |  220 ++++
 .../kafka/connect/storage/StringConverterTest.java |   18 +
 .../apache/kafka/connect/json/JsonConverter.java   |   61 +-
 .../kafka/connect/json/JsonConverterConfig.java    |   79 ++
 .../kafka/connect/json/JsonConverterTest.java      |   17 +
 .../connect/converters/ByteArrayConverter.java     |   32 +-
 .../kafka/connect/runtime/ConnectorConfig.java     |   19 +-
 .../org/apache/kafka/connect/runtime/Worker.java   |   48 +-
 .../apache/kafka/connect/runtime/WorkerConfig.java |   16 +-
 .../kafka/connect/runtime/WorkerSinkTask.java      |   23 +-
 .../kafka/connect/runtime/WorkerSourceTask.java    |   24 +-
 .../kafka/connect/runtime/isolation/Plugins.java   |   22 +-
 .../kafka/connect/runtime/AbstractHerderTest.java  |   56 +-
 .../runtime/SourceTaskOffsetCommitterTest.java     |   35 +-
 .../kafka/connect/runtime/WorkerSinkTaskTest.java  |    5 +-
 .../runtime/WorkerSinkTaskThreadedTest.java        |    4 +-
 .../connect/runtime/WorkerSourceTaskTest.java      |    6 +-
 .../kafka/connect/runtime/WorkerTaskTest.java      |   21 +-
 .../apache/kafka/connect/runtime/WorkerTest.java   |   40 +-
 docs/connect.html                                  |    4 +-
 docs/upgrade.html                                  |    1 +
 43 files changed, 4479 insertions(+), 158 deletions(-)

diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 18e76e7..5662552 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -277,6 +277,7 @@
     <allow pkg="org.apache.kafka.common" />
     <allow pkg="org.apache.kafka.connect.data" />
     <allow pkg="org.apache.kafka.connect.errors" />
+    <allow pkg="org.apache.kafka.connect.header" />
     <allow pkg="org.apache.kafka.clients" />
     <allow pkg="org.apache.kafka.test"/>
 
diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index c6c2d23..f06155f 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -57,7 +57,7 @@
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
 
     <suppress checks="NPathComplexity"
-              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent).java"/>
+              files="(BufferPool|MetricName|Node|ConfigDef|SslTransportLayer|MetadataResponse|KerberosLogin|Selector|Sender|Serdes|Agent|Values).java"/>
 
     <!-- clients tests -->
     <suppress checks="ClassDataAbstractionCoupling"
@@ -102,9 +102,13 @@
               files="DistributedHerder.java"/>
     <suppress checks="CyclomaticComplexity"
               files="KafkaConfigBackingStore.java"/>
+    <suppress checks="CyclomaticComplexity"
+              files="(Values|ConnectHeader|ConnectHeaders).java"/>
 
     <suppress checks="JavaNCSS"
               files="KafkaConfigBackingStore.java"/>
+    <suppress checks="JavaNCSS"
+              files="Values.java"/>
 
     <suppress checks="NPathComplexity"
               files="ConnectRecord.java"/>
@@ -116,6 +120,10 @@
               files="JsonConverter.java"/>
     <suppress checks="NPathComplexity"
               files="DistributedHerder.java"/>
+    <suppress checks="NPathComplexity"
+              files="ConnectHeaders.java"/>
+    <suppress checks="MethodLength"
+              files="Values.java"/>
 
     <!-- connect tests-->
     <suppress checks="ClassDataAbstractionCoupling"
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index 344e365..2ad8a04 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -17,6 +17,11 @@
 package org.apache.kafka.connect.connector;
 
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+
+import java.util.Objects;
 
 /**
  * <p>
@@ -34,11 +39,19 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
     private final Schema valueSchema;
     private final Object value;
     private final Long timestamp;
+    private final Headers headers;
 
     public ConnectRecord(String topic, Integer kafkaPartition,
                          Schema keySchema, Object key,
                          Schema valueSchema, Object value,
                          Long timestamp) {
+        this(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, new ConnectHeaders());
+    }
+
+    public ConnectRecord(String topic, Integer kafkaPartition,
+                         Schema keySchema, Object key,
+                         Schema valueSchema, Object value,
+                         Long timestamp, Iterable<Header> headers) {
         this.topic = topic;
         this.kafkaPartition = kafkaPartition;
         this.keySchema = keySchema;
@@ -46,6 +59,11 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
         this.valueSchema = valueSchema;
         this.value = value;
         this.timestamp = timestamp;
+        if (headers instanceof ConnectHeaders) {
+            this.headers = (ConnectHeaders) headers;
+        } else {
+            this.headers = new ConnectHeaders(headers);
+        }
     }
 
     public String topic() {
@@ -76,9 +94,46 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
         return timestamp;
     }
 
-    /** Generate a new record of the same type as itself, with the specified parameter values. **/
+    /**
+     * Get the headers for this record.
+     *
+     * @return the headers; never null
+     */
+    public Headers headers() {
+        return headers;
+    }
+
+    /**
+     * Create a new record of the same type as itself, with the specified parameter values. All other fields in this record will be copied
+     * over to the new record. Since the headers are mutable, the resulting record will have a copy of this record's headers.
+     *
+     * @param topic the name of the topic; may be null
+     * @param kafkaPartition the partition number for the Kafka topic; may be null
+     * @param keySchema the schema for the key; may be null
+     * @param key the key; may be null
+     * @param valueSchema the schema for the value; may be null
+     * @param value the value; may be null
+     * @param timestamp the timestamp; may be null
+     * @return the new record
+     */
     public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);
 
+    /**
+     * Create a new record of the same type as itself, with the specified parameter values. All other fields in this record will be copied
+     * over to the new record.
+     *
+     * @param topic the name of the topic; may be null
+     * @param kafkaPartition the partition number for the Kafka topic; may be null
+     * @param keySchema the schema for the key; may be null
+     * @param key the key; may be null
+     * @param valueSchema the schema for the value; may be null
+     * @param value the value; may be null
+     * @param timestamp the timestamp; may be null
+     * @param headers the headers; may be null or empty
+     * @return the new record
+     */
+    public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp, Iterable<Header> headers);
+
     @Override
     public String toString() {
         return "ConnectRecord{" +
@@ -87,6 +142,7 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
                 ", key=" + key +
                 ", value=" + value +
                 ", timestamp=" + timestamp +
+                ", headers=" + headers +
                 '}';
     }
 
@@ -113,6 +169,8 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
             return false;
         if (timestamp != null ? !timestamp.equals(that.timestamp) : that.timestamp != null)
             return false;
+        if (!Objects.equals(headers, that.headers))
+            return false;
 
         return true;
     }
@@ -126,6 +184,7 @@ public abstract class ConnectRecord<R extends ConnectRecord<R>> {
         result = 31 * result + (valueSchema != null ? valueSchema.hashCode() : 0);
         result = 31 * result + (value != null ? value.hashCode() : 0);
         result = 31 * result + (timestamp != null ? timestamp.hashCode() : 0);
+        result = 31 * result + headers.hashCode();
         return result;
     }
 }
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
new file mode 100644
index 0000000..41040c7
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/data/Values.java
@@ -0,0 +1,1117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.common.utils.Base64;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.text.CharacterIterator;
+import java.text.DateFormat;
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.text.StringCharacterIterator;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.TimeZone;
+
+/**
+ * Utility for converting from one Connect value to a different form. This is useful when the caller expects a value of a particular type
+ * but the actual value may be of a different type that can nevertheless be converted to the desired type.
+ *
+ * <p>For example, a caller might expect a particular {@link org.apache.kafka.connect.header.Header} to contain an {@link Type#INT64}
+ * value, when in fact that header contains a string representation of a 32-bit integer. Here, the caller can use the methods in this
+ * class to convert the value to the desired type:
+ * <pre>
+ *     Header header = ...
+ *     long value = Values.convertToLong(header.schema(), header.value());
+ * </pre>
+ *
+ * <p>This class is able to convert any value to a string representation as well as parse those string representations back into most of
+ * the types. The only exception is {@link Struct} values, which require a schema and thus cannot be parsed from a simple string.
+ */
+public class Values {
+
+    private static final Logger LOG = LoggerFactory.getLogger(Values.class);
+
+    private static final TimeZone UTC = TimeZone.getTimeZone("UTC");
+    private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
+    private static final SchemaAndValue TRUE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.TRUE);
+    private static final SchemaAndValue FALSE_SCHEMA_AND_VALUE = new SchemaAndValue(Schema.BOOLEAN_SCHEMA, Boolean.FALSE);
+    private static final Schema ARRAY_SELECTOR_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).build();
+    private static final Schema MAP_SELECTOR_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build();
+    private static final Schema STRUCT_SELECTOR_SCHEMA = SchemaBuilder.struct().build();
+    private static final String TRUE_LITERAL = Boolean.TRUE.toString();
+    private static final String FALSE_LITERAL = Boolean.FALSE.toString();
+    private static final long MILLIS_PER_DAY = 24 * 60 * 60 * 1000;
+    private static final String NULL_VALUE = "null";
+    private static final String ISO_8601_DATE_FORMAT_PATTERN = "yyyy-MM-dd";
+    private static final String ISO_8601_TIME_FORMAT_PATTERN = "HH:mm:ss.SSS'Z'";
+    private static final String ISO_8601_TIMESTAMP_FORMAT_PATTERN = ISO_8601_DATE_FORMAT_PATTERN + "'T'" + ISO_8601_TIME_FORMAT_PATTERN;
+
+    private static final String QUOTE_DELIMITER = "\"";
+    private static final String COMMA_DELIMITER = ",";
+    private static final String ENTRY_DELIMITER = ":";
+    private static final String ARRAY_BEGIN_DELIMITER = "[";
+    private static final String ARRAY_END_DELIMITER = "]";
+    private static final String MAP_BEGIN_DELIMITER = "{";
+    private static final String MAP_END_DELIMITER = "}";
+    private static final int ISO_8601_DATE_LENGTH = ISO_8601_DATE_FORMAT_PATTERN.length();
+    private static final int ISO_8601_TIME_LENGTH = ISO_8601_TIME_FORMAT_PATTERN.length() - 2; // subtract single quotes
+    private static final int ISO_8601_TIMESTAMP_LENGTH = ISO_8601_TIMESTAMP_FORMAT_PATTERN.length() - 4; // subtract single quotes
+
+    /**
+     * Convert the specified value to an {@link Type#BOOLEAN} value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to a boolean.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a boolean, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to a boolean
+     */
+    public static Boolean convertToBoolean(Schema schema, Object value) throws DataException {
+        return (Boolean) convertTo(Schema.OPTIONAL_BOOLEAN_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#INT8} byte value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to a byte.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a byte, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to a byte
+     */
+    public static Byte convertToByte(Schema schema, Object value) throws DataException {
+        return (Byte) convertTo(Schema.OPTIONAL_INT8_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#INT16} short value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to a short.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a short, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to a short
+     */
+    public static Short convertToShort(Schema schema, Object value) throws DataException {
+        return (Short) convertTo(Schema.OPTIONAL_INT16_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#INT32} int value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to an integer.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as an integer, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to an integer
+     */
+    public static Integer convertToInteger(Schema schema, Object value) throws DataException {
+        return (Integer) convertTo(Schema.OPTIONAL_INT32_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#INT64} long value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to a long.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a long, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to a long
+     */
+    public static Long convertToLong(Schema schema, Object value) throws DataException {
+        return (Long) convertTo(Schema.OPTIONAL_INT64_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#FLOAT32} float value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to a floating point number.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a float, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to a float
+     */
+    public static Float convertToFloat(Schema schema, Object value) throws DataException {
+        return (Float) convertTo(Schema.OPTIONAL_FLOAT32_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#FLOAT64} double value. The supplied schema is required when the value is a logical
+     * type, since the schema carries information needed to convert the value to a floating point number.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a double, or null if the supplied value was null
+     * @throws DataException if the value could not be converted to a double
+     */
+    public static Double convertToDouble(Schema schema, Object value) throws DataException {
+        return (Double) convertTo(Schema.OPTIONAL_FLOAT64_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#STRING} value.
+     * Not supplying a schema may limit the ability to convert to the desired type.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a string, or null if the supplied value was null
+     */
+    public static String convertToString(Schema schema, Object value) {
+        return (String) convertTo(Schema.OPTIONAL_STRING_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#ARRAY} value. If the value is a string representation of an array, this method
+     * will parse the string and its elements to infer the schemas for those elements. Thus, this method supports
+     * arrays of other primitives and structured types. If the value is already an array (or list), this method simply casts and
+     * returns it.
+     *
+     * <p>This method currently does not use the schema, though it may be used in the future.</p>
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a list, or null if the supplied value was null
+     * @throws DataException if the value cannot be converted to a list value
+     */
+    public static List<?> convertToList(Schema schema, Object value) {
+        return (List<?>) convertTo(ARRAY_SELECTOR_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#MAP} value. If the value is a string representation of a map, this method
+     * will parse the string and its entries to infer the schemas for those entries. Thus, this method supports
+     * maps with primitives and structured keys and values. If the value is already a map, this method simply casts and returns it.
+     *
+     * <p>This method currently does not use the schema, though it may be used in the future.</p>
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a map, or null if the supplied value was null
+     * @throws DataException if the value cannot be converted to a map value
+     */
+    public static Map<?, ?> convertToMap(Schema schema, Object value) {
+        return (Map<?, ?>) convertTo(MAP_SELECTOR_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Type#STRUCT} value. Structs cannot be converted from other types, so this method returns
+     * a struct only if the supplied value is a struct. If not a struct, this method throws an exception.
+     *
+     * <p>This method currently does not use the schema, though it may be used in the future.</p>
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a struct, or null if the supplied value was null
+     * @throws DataException if the value is not a struct
+     */
+    public static Struct convertToStruct(Schema schema, Object value) {
+        return (Struct) convertTo(STRUCT_SELECTOR_SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Time#SCHEMA time} value.
+     * Not supplying a schema may limit the ability to convert to the desired type.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a time, or null if the supplied value was null
+     * @throws DataException if the value cannot be converted to a time value
+     */
+    public static java.util.Date convertToTime(Schema schema, Object value) {
+        return (java.util.Date) convertTo(Time.SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Date#SCHEMA date} value.
+     * Not supplying a schema may limit the ability to convert to the desired type.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a date, or null if the supplied value was null
+     * @throws DataException if the value cannot be converted to a date value
+     */
+    public static java.util.Date convertToDate(Schema schema, Object value) {
+        return (java.util.Date) convertTo(Date.SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Timestamp#SCHEMA timestamp} value.
+     * Not supplying a schema may limit the ability to convert to the desired type.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a timestamp, or null if the supplied value was null
+     * @throws DataException if the value cannot be converted to a timestamp value
+     */
+    public static java.util.Date convertToTimestamp(Schema schema, Object value) {
+        return (java.util.Date) convertTo(Timestamp.SCHEMA, schema, value);
+    }
+
+    /**
+     * Convert the specified value to an {@link Decimal decimal} value.
+     * Not supplying a schema may limit the ability to convert to the desired type.
+     *
+     * @param schema the schema for the value; may be null
+     * @param value  the value to be converted; may be null
+     * @return the representation as a decimal, or null if the supplied value was null
+     * @throws DataException if the value cannot be converted to a decimal value
+     */
+    public static BigDecimal convertToDecimal(Schema schema, Object value, int scale) {
+        return (BigDecimal) convertTo(Decimal.schema(scale), schema, value);
+    }
+
+    /**
+     * If possible, infer a schema for the given value.
+     *
+     * @param value the value whose schema is to be inferred; may be null
+     * @return the inferred schema, or null if the value is null or no schema could be inferred
+     */
+    public static Schema inferSchema(Object value) {
+        if (value instanceof String) {
+            return Schema.STRING_SCHEMA;
+        }
+        if (value instanceof Boolean) {
+            return Schema.BOOLEAN_SCHEMA;
+        }
+        if (value instanceof Byte) {
+            return Schema.INT8_SCHEMA;
+        }
+        if (value instanceof Short) {
+            return Schema.INT16_SCHEMA;
+        }
+        if (value instanceof Integer) {
+            return Schema.INT32_SCHEMA;
+        }
+        if (value instanceof Long) {
+            return Schema.INT64_SCHEMA;
+        }
+        if (value instanceof Float) {
+            return Schema.FLOAT32_SCHEMA;
+        }
+        if (value instanceof Double) {
+            return Schema.FLOAT64_SCHEMA;
+        }
+        if (value instanceof byte[] || value instanceof ByteBuffer) {
+            return Schema.BYTES_SCHEMA;
+        }
+        if (value instanceof List) {
+            List<?> list = (List<?>) value;
+            if (list.isEmpty()) {
+                return null;
+            }
+            SchemaDetector detector = new SchemaDetector();
+            for (Object element : list) {
+                if (!detector.canDetect(element)) {
+                    return null;
+                }
+            }
+            return SchemaBuilder.array(detector.schema()).build();
+        }
+        if (value instanceof Map) {
+            Map<?, ?> map = (Map<?, ?>) value;
+            if (map.isEmpty()) {
+                return null;
+            }
+            SchemaDetector keyDetector = new SchemaDetector();
+            SchemaDetector valueDetector = new SchemaDetector();
+            for (Map.Entry<?, ?> entry : map.entrySet()) {
+                if (!keyDetector.canDetect(entry.getKey()) || !valueDetector.canDetect(entry.getValue())) {
+                    return null;
+                }
+            }
+            return SchemaBuilder.map(keyDetector.schema(), valueDetector.schema()).build();
+        }
+        if (value instanceof Struct) {
+            return ((Struct) value).schema();
+        }
+        return null;
+    }
+
+
+    /**
+     * Parse the specified string representation of a value into its schema and value.
+     *
+     * @param value the string form of the value
+     * @return the schema and value; never null, but whose schema and value may be null
+     * @see #convertToString
+     */
+    public static SchemaAndValue parseString(String value) {
+        if (value == null) {
+            return NULL_SCHEMA_AND_VALUE;
+        }
+        if (value.isEmpty()) {
+            return new SchemaAndValue(Schema.STRING_SCHEMA, value);
+        }
+        Parser parser = new Parser(value);
+        return parse(parser, false);
+    }
+
+    /**
+     * Convert the value to the desired type.
+     *
+     * @param toSchema   the schema for the desired type; may not be null
+     * @param fromSchema the schema for the supplied value; may be null if not known
+     * @return the converted value; may be null if the supplied value is null and the desired schema is optional
+     * @throws DataException if the value could not be converted to the desired type
+     */
+    protected static Object convertTo(Schema toSchema, Schema fromSchema, Object value) throws DataException {
+        if (value == null) {
+            if (toSchema.isOptional()) {
+                return null;
+            }
+            throw new DataException("Unable to convert a null value to a schema that requires a value");
+        }
+        switch (toSchema.type()) {
+            case BYTES:
+                if (Decimal.LOGICAL_NAME.equals(toSchema.name())) {
+                    if (value instanceof ByteBuffer) {
+                        value = Utils.toArray((ByteBuffer) value);
+                    }
+                    if (value instanceof byte[]) {
+                        return Decimal.toLogical(toSchema, (byte[]) value);
+                    }
+                    if (value instanceof BigDecimal) {
+                        return value;
+                    }
+                    if (value instanceof Number) {
+                        // Not already a decimal, so treat it as a double ...
+                        double converted = ((Number) value).doubleValue();
+                        return new BigDecimal(converted);
+                    }
+                    if (value instanceof String) {
+                        return new BigDecimal(value.toString());
+                    }
+                }
+                if (value instanceof ByteBuffer) {
+                    return Utils.toArray((ByteBuffer) value);
+                }
+                if (value instanceof byte[]) {
+                    return value;
+                }
+                if (value instanceof BigDecimal) {
+                    return Decimal.fromLogical(toSchema, (BigDecimal) value);
+                }
+                break;
+            case STRING:
+                StringBuilder sb = new StringBuilder();
+                append(sb, value, false);
+                return sb.toString();
+            case BOOLEAN:
+                if (value instanceof Boolean) {
+                    return value;
+                }
+                if (value instanceof String) {
+                    SchemaAndValue parsed = parseString(value.toString());
+                    if (parsed.value() instanceof Boolean) {
+                        return parsed.value();
+                    }
+                }
+                return asLong(value, fromSchema, null) == 0L ? Boolean.FALSE : Boolean.TRUE;
+            case INT8:
+                if (value instanceof Byte) {
+                    return value;
+                }
+                return (byte) asLong(value, fromSchema, null);
+            case INT16:
+                if (value instanceof Short) {
+                    return value;
+                }
+                return (short) asLong(value, fromSchema, null);
+            case INT32:
+                if (Date.LOGICAL_NAME.equals(toSchema.name())) {
+                    if (value instanceof String) {
+                        SchemaAndValue parsed = parseString(value.toString());
+                        value = parsed.value();
+                    }
+                    if (value instanceof java.util.Date) {
+                        if (fromSchema != null) {
+                            String fromSchemaName = fromSchema.name();
+                            if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
+                                return value;
+                            }
+                            if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
+                                // Just get the number of days from this timestamp
+                                long millis = ((java.util.Date) value).getTime();
+                                int days = (int) (millis / MILLIS_PER_DAY); // truncates
+                                return Date.toLogical(toSchema, days);
+                            }
+                        }
+                    }
+                    long numeric = asLong(value, fromSchema, null);
+                    return Date.toLogical(toSchema, (int) numeric);
+                }
+                if (Time.LOGICAL_NAME.equals(toSchema.name())) {
+                    if (value instanceof String) {
+                        SchemaAndValue parsed = parseString(value.toString());
+                        value = parsed.value();
+                    }
+                    if (value instanceof java.util.Date) {
+                        if (fromSchema != null) {
+                            String fromSchemaName = fromSchema.name();
+                            if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
+                                return value;
+                            }
+                            if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
+                                // Just get the time portion of this timestamp
+                                Calendar calendar = Calendar.getInstance(UTC);
+                                calendar.setTime((java.util.Date) value);
+                                calendar.set(Calendar.YEAR, 1970);
+                                calendar.set(Calendar.MONTH, Calendar.JANUARY); // calendar months are zero-based
+                                calendar.set(Calendar.DAY_OF_MONTH, 1);
+                                return Time.toLogical(toSchema, (int) calendar.getTimeInMillis());
+                            }
+                        }
+                    }
+                    long numeric = asLong(value, fromSchema, null);
+                    return Time.toLogical(toSchema, (int) numeric);
+                }
+                if (value instanceof Integer) {
+                    return value;
+                }
+                return (int) asLong(value, fromSchema, null);
+            case INT64:
+                if (Timestamp.LOGICAL_NAME.equals(toSchema.name())) {
+                    if (value instanceof String) {
+                        SchemaAndValue parsed = parseString(value.toString());
+                        value = parsed.value();
+                    }
+                    if (value instanceof java.util.Date) {
+                        java.util.Date date = (java.util.Date) value;
+                        if (fromSchema != null) {
+                            String fromSchemaName = fromSchema.name();
+                            if (Date.LOGICAL_NAME.equals(fromSchemaName)) {
+                                int days = Date.fromLogical(fromSchema, date);
+                                long millis = days * MILLIS_PER_DAY;
+                                return Timestamp.toLogical(toSchema, millis);
+                            }
+                            if (Time.LOGICAL_NAME.equals(fromSchemaName)) {
+                                long millis = Time.fromLogical(fromSchema, date);
+                                return Timestamp.toLogical(toSchema, millis);
+                            }
+                            if (Timestamp.LOGICAL_NAME.equals(fromSchemaName)) {
+                                return value;
+                            }
+                        }
+                    }
+                    long numeric = asLong(value, fromSchema, null);
+                    return Timestamp.toLogical(toSchema, numeric);
+                }
+                if (value instanceof Long) {
+                    return value;
+                }
+                return asLong(value, fromSchema, null);
+            case FLOAT32:
+                if (value instanceof Float) {
+                    return value;
+                }
+                return (float) asDouble(value, fromSchema, null);
+            case FLOAT64:
+                if (value instanceof Double) {
+                    return value;
+                }
+                return asDouble(value, fromSchema, null);
+            case ARRAY:
+                if (value instanceof String) {
+                    SchemaAndValue schemaAndValue = parseString(value.toString());
+                    value = schemaAndValue.value();
+                }
+                if (value instanceof List) {
+                    return value;
+                }
+                break;
+            case MAP:
+                if (value instanceof String) {
+                    SchemaAndValue schemaAndValue = parseString(value.toString());
+                    value = schemaAndValue.value();
+                }
+                if (value instanceof Map) {
+                    return value;
+                }
+                break;
+            case STRUCT:
+                if (value instanceof Struct) {
+                    Struct struct = (Struct) value;
+                    return struct;
+                }
+        }
+        throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + toSchema);
+    }
+
+    /**
+     * Convert the specified value to the desired scalar value type.
+     *
+     * @param value      the value to be converted; may not be null
+     * @param fromSchema the schema for the current value type; may be null if not known
+     * @param error      any previous error that should be included in an exception message; may be null
+     * @return the long value after conversion; never null
+     * @throws DataException if the value could not be converted to a long
+     */
+    protected static long asLong(Object value, Schema fromSchema, Throwable error) {
+        try {
+            if (value instanceof Number) {
+                Number number = (Number) value;
+                return number.longValue();
+            }
+            if (value instanceof String) {
+                return new BigDecimal(value.toString()).longValue();
+            }
+        } catch (NumberFormatException e) {
+            error = e;
+            // fall through
+        }
+        if (fromSchema != null) {
+            String schemaName = fromSchema.name();
+            if (value instanceof java.util.Date) {
+                if (Date.LOGICAL_NAME.equals(schemaName)) {
+                    return Date.fromLogical(fromSchema, (java.util.Date) value);
+                }
+                if (Time.LOGICAL_NAME.equals(schemaName)) {
+                    return Time.fromLogical(fromSchema, (java.util.Date) value);
+                }
+                if (Timestamp.LOGICAL_NAME.equals(schemaName)) {
+                    return Timestamp.fromLogical(fromSchema, (java.util.Date) value);
+                }
+            }
+            throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to " + fromSchema, error);
+        }
+        throw new DataException("Unable to convert " + value + " (" + value.getClass() + ") to a number", error);
+    }
+
+    /**
+     * Convert the specified value to the desired floating point type.
+     *
+     * @param value  the value to be converted; may not be null
+     * @param schema the schema for the current value type; may be null if not known
+     * @param error  any previous error that should be included in an exception message; may be null
+     * @return the double value after conversion; never null
+     * @throws DataException if the value could not be converted to a double
+     */
+    protected static double asDouble(Object value, Schema schema, Throwable error) {
+        try {
+            if (value instanceof Number) {
+                Number number = (Number) value;
+                return number.doubleValue();
+            }
+            if (value instanceof String) {
+                return new BigDecimal(value.toString()).doubleValue();
+            }
+        } catch (NumberFormatException e) {
+            error = e;
+            // fall through
+        }
+        return asLong(value, schema, error);
+    }
+
+    protected static void append(StringBuilder sb, Object value, boolean embedded) {
+        if (value == null) {
+            sb.append(NULL_VALUE);
+        } else if (value instanceof Number) {
+            sb.append(value);
+        } else if (value instanceof Boolean) {
+            sb.append(value);
+        } else if (value instanceof String) {
+            if (embedded) {
+                String escaped = escape((String) value);
+                sb.append('"').append(escaped).append('"');
+            } else {
+                sb.append(value);
+            }
+        } else if (value instanceof byte[]) {
+            value = Base64.encoder().encodeToString((byte[]) value);
+            if (embedded) {
+                sb.append('"').append(value).append('"');
+            } else {
+                sb.append(value);
+            }
+        } else if (value instanceof ByteBuffer) {
+            byte[] bytes = Utils.readBytes((ByteBuffer) value);
+            append(sb, bytes, embedded);
+        } else if (value instanceof List) {
+            List<?> list = (List<?>) value;
+            sb.append('[');
+            appendIterable(sb, list.iterator());
+            sb.append(']');
+        } else if (value instanceof Map) {
+            Map<?, ?> map = (Map<?, ?>) value;
+            sb.append('{');
+            appendIterable(sb, map.entrySet().iterator());
+            sb.append('}');
+        } else if (value instanceof Struct) {
+            Struct struct = (Struct) value;
+            Schema schema = struct.schema();
+            boolean first = true;
+            sb.append('{');
+            for (Field field : schema.fields()) {
+                if (first) {
+                    first = false;
+                } else {
+                    sb.append(',');
+                }
+                append(sb, field.name(), true);
+                sb.append(':');
+                append(sb, struct.get(field), true);
+            }
+            sb.append('}');
+        } else if (value instanceof Map.Entry) {
+            Map.Entry<?, ?> entry = (Map.Entry<?, ?>) value;
+            append(sb, entry.getKey(), true);
+            sb.append(':');
+            append(sb, entry.getValue(), true);
+        } else if (value instanceof java.util.Date) {
+            java.util.Date dateValue = (java.util.Date) value;
+            String formatted = dateFormatFor(dateValue).format(dateValue);
+            sb.append(formatted);
+        } else {
+            throw new DataException("Failed to serialize unexpected value type " + value.getClass().getName() + ": " + value);
+        }
+    }
+
+    protected static void appendIterable(StringBuilder sb, Iterator<?> iter) {
+        if (iter.hasNext()) {
+            append(sb, iter.next(), true);
+            while (iter.hasNext()) {
+                sb.append(',');
+                append(sb, iter.next(), true);
+            }
+        }
+    }
+
+    protected static String escape(String value) {
+        return value.replaceAll("\\\\", "\\\\\\\\").replaceAll("\"", "\\\\\"");
+    }
+
+    protected static DateFormat dateFormatFor(java.util.Date value) {
+        if (value.getTime() < MILLIS_PER_DAY) {
+            return new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN);
+        }
+        if (value.getTime() % MILLIS_PER_DAY == 0) {
+            return new SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN);
+        }
+        return new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN);
+    }
+
+    protected static SchemaAndValue parse(Parser parser, boolean embedded) throws NoSuchElementException {
+        if (!parser.hasNext()) {
+            return null;
+        }
+        if (embedded) {
+            if (parser.canConsume(NULL_VALUE)) {
+                return null;
+            }
+            if (parser.canConsume(QUOTE_DELIMITER)) {
+                StringBuilder sb = new StringBuilder();
+                while (parser.hasNext()) {
+                    if (parser.canConsume(QUOTE_DELIMITER)) {
+                        break;
+                    }
+                    sb.append(parser.next());
+                }
+                return new SchemaAndValue(Schema.STRING_SCHEMA, sb.toString());
+            }
+        }
+        if (parser.canConsume(TRUE_LITERAL)) {
+            return TRUE_SCHEMA_AND_VALUE;
+        }
+        if (parser.canConsume(FALSE_LITERAL)) {
+            return FALSE_SCHEMA_AND_VALUE;
+        }
+        int startPosition = parser.mark();
+        try {
+            if (parser.canConsume(ARRAY_BEGIN_DELIMITER)) {
+                List<Object> result = new ArrayList<>();
+                Schema elementSchema = null;
+                while (parser.hasNext()) {
+                    if (parser.canConsume(ARRAY_END_DELIMITER)) {
+                        Schema listSchema = elementSchema == null ? null : SchemaBuilder.array(elementSchema).schema();
+                        result = alignListEntriesWithSchema(listSchema, result);
+                        return new SchemaAndValue(listSchema, result);
+                    }
+                    SchemaAndValue element = parse(parser, true);
+                    elementSchema = commonSchemaFor(elementSchema, element);
+                    result.add(element.value());
+                    parser.canConsume(COMMA_DELIMITER);
+                }
+                // Missing either a comma or an end delimiter
+                if (COMMA_DELIMITER.equals(parser.previous())) {
+                    throw new DataException("Malformed array: missing element after ','");
+                }
+                throw new DataException("Malformed array: missing terminating ']'");
+            }
+
+            if (parser.canConsume(MAP_BEGIN_DELIMITER)) {
+                Map<Object, Object> result = new LinkedHashMap<>();
+                Schema keySchema = null;
+                Schema valueSchema = null;
+                while (parser.hasNext()) {
+                    if (parser.canConsume(MAP_END_DELIMITER)) {
+                        Schema mapSchema =
+                                keySchema == null || valueSchema == null ? null : SchemaBuilder.map(keySchema, valueSchema).schema();
+                        result = alignMapKeysAndValuesWithSchema(mapSchema, result);
+                        return new SchemaAndValue(mapSchema, result);
+                    }
+                    SchemaAndValue key = parse(parser, true);
+                    if (key == null || key.value() == null) {
+                        throw new DataException("Malformed map entry: null key");
+                    }
+                    if (!parser.canConsume(ENTRY_DELIMITER)) {
+                        throw new DataException("Malformed map entry: missing ':'");
+                    }
+                    SchemaAndValue value = parse(parser, true);
+                    Object entryValue = value != null ? value.value() : null;
+                    result.put(key.value(), entryValue);
+                    parser.canConsume(COMMA_DELIMITER);
+                    keySchema = commonSchemaFor(keySchema, key);
+                    valueSchema = commonSchemaFor(valueSchema, value);
+                }
+                // Missing either a comma or an end delimiter
+                if (COMMA_DELIMITER.equals(parser.previous())) {
+                    throw new DataException("Malformed map: missing element after ','");
+                }
+                throw new DataException("Malformed map: missing terminating '}'");
+            }
+        } catch (DataException e) {
+            LOG.debug("Unable to parse the value as a map; reverting to string", e);
+            parser.rewindTo(startPosition);
+        }
+        String token = parser.next().trim();
+        assert !token.isEmpty(); // original can be empty string but is handled right away; no way for token to be empty here
+        char firstChar = token.charAt(0);
+        boolean firstCharIsDigit = Character.isDigit(firstChar);
+        if (firstCharIsDigit || firstChar == '+' || firstChar == '-') {
+            try {
+                // Try to parse as a number ...
+                BigDecimal decimal = new BigDecimal(token);
+                try {
+                    return new SchemaAndValue(Schema.INT8_SCHEMA, decimal.byteValueExact());
+                } catch (ArithmeticException e) {
+                    // continue
+                }
+                try {
+                    return new SchemaAndValue(Schema.INT16_SCHEMA, decimal.shortValueExact());
+                } catch (ArithmeticException e) {
+                    // continue
+                }
+                try {
+                    return new SchemaAndValue(Schema.INT32_SCHEMA, decimal.intValueExact());
+                } catch (ArithmeticException e) {
+                    // continue
+                }
+                try {
+                    return new SchemaAndValue(Schema.INT64_SCHEMA, decimal.longValueExact());
+                } catch (ArithmeticException e) {
+                    // continue
+                }
+                double dValue = decimal.doubleValue();
+                if (dValue != Double.NEGATIVE_INFINITY && dValue != Double.POSITIVE_INFINITY) {
+                    return new SchemaAndValue(Schema.FLOAT64_SCHEMA, dValue);
+                }
+                Schema schema = Decimal.schema(decimal.scale());
+                return new SchemaAndValue(schema, decimal);
+            } catch (NumberFormatException e) {
+                // can't parse as a number
+            }
+        }
+        if (firstCharIsDigit) {
+            // Check for a date, time, or timestamp ...
+            int tokenLength = token.length();
+            if (tokenLength == ISO_8601_DATE_LENGTH) {
+                try {
+                    return new SchemaAndValue(Date.SCHEMA, new SimpleDateFormat(ISO_8601_DATE_FORMAT_PATTERN).parse(token));
+                } catch (ParseException e) {
+                    // not a valid date
+                }
+            } else if (tokenLength == ISO_8601_TIME_LENGTH) {
+                try {
+                    return new SchemaAndValue(Time.SCHEMA, new SimpleDateFormat(ISO_8601_TIME_FORMAT_PATTERN).parse(token));
+                } catch (ParseException e) {
+                    // not a valid date
+                }
+            } else if (tokenLength == ISO_8601_TIMESTAMP_LENGTH) {
+                try {
+                    return new SchemaAndValue(Timestamp.SCHEMA, new SimpleDateFormat(ISO_8601_TIMESTAMP_FORMAT_PATTERN).parse(token));
+                } catch (ParseException e) {
+                    // not a valid date
+                }
+            }
+        }
+        // At this point, the only thing this can be is a string. Embedded strings were processed above,
+        // so this is not embedded and we can use the original string...
+        return new SchemaAndValue(Schema.STRING_SCHEMA, parser.original());
+    }
+
+    protected static Schema commonSchemaFor(Schema previous, SchemaAndValue latest) {
+        if (latest == null) {
+            return previous;
+        }
+        if (previous == null) {
+            return latest.schema();
+        }
+        Schema newSchema = latest.schema();
+        Type previousType = previous.type();
+        Type newType = newSchema.type();
+        if (previousType != newType) {
+            switch (previous.type()) {
+                case INT8:
+                    if (newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64 || newType == Type.FLOAT32 || newType ==
+                                                                                                                              Type.FLOAT64) {
+                        return newSchema;
+                    }
+                    break;
+                case INT16:
+                    if (newType == Type.INT8) {
+                        return previous;
+                    }
+                    if (newType == Type.INT32 || newType == Type.INT64 || newType == Type.FLOAT32 || newType == Type.FLOAT64) {
+                        return newSchema;
+                    }
+                    break;
+                case INT32:
+                    if (newType == Type.INT8 || newType == Type.INT16) {
+                        return previous;
+                    }
+                    if (newType == Type.INT64 || newType == Type.FLOAT32 || newType == Type.FLOAT64) {
+                        return newSchema;
+                    }
+                    break;
+                case INT64:
+                    if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32) {
+                        return previous;
+                    }
+                    if (newType == Type.FLOAT32 || newType == Type.FLOAT64) {
+                        return newSchema;
+                    }
+                    break;
+                case FLOAT32:
+                    if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64) {
+                        return previous;
+                    }
+                    if (newType == Type.FLOAT64) {
+                        return newSchema;
+                    }
+                    break;
+                case FLOAT64:
+                    if (newType == Type.INT8 || newType == Type.INT16 || newType == Type.INT32 || newType == Type.INT64
+                            || newType == Type.FLOAT32) {
+                        return previous;
+                    }
+                    break;
+            }
+            return null;
+        }
+        if (previous.isOptional() == newSchema.isOptional()) {
+            // Both schemas have the same optionality; keep the previous schema when optional, otherwise use the latest
+            return previous.isOptional() ? previous : newSchema;
+        }
+        if (!previous.equals(newSchema)) {
+            return null;
+        }
+        return previous;
+    }
+
+    protected static List<Object> alignListEntriesWithSchema(Schema schema, List<Object> input) {
+        if (schema == null) {
+            return input;
+        }
+        Schema valueSchema = schema.valueSchema();
+        List<Object> result = new ArrayList<>();
+        for (Object value : input) {
+            Object newValue = convertTo(valueSchema, null, value);
+            result.add(newValue);
+        }
+        return result;
+    }
+
+    protected static Map<Object, Object> alignMapKeysAndValuesWithSchema(Schema mapSchema, Map<Object, Object> input) {
+        if (mapSchema == null) {
+            return input;
+        }
+        Schema keySchema = mapSchema.keySchema();
+        Schema valueSchema = mapSchema.valueSchema();
+        Map<Object, Object> result = new LinkedHashMap<>();
+        for (Map.Entry<?, ?> entry : input.entrySet()) {
+            Object newKey = convertTo(keySchema, null, entry.getKey());
+            Object newValue = convertTo(valueSchema, null, entry.getValue());
+            result.put(newKey, newValue);
+        }
+        return result;
+    }
+
+    protected static class SchemaDetector {
+        private Type knownType = null;
+        private boolean optional = false;
+
+        public SchemaDetector() {
+        }
+
+        public boolean canDetect(Object value) {
+            if (value == null) {
+                optional = true;
+                return true;
+            }
+            Schema schema = inferSchema(value);
+            if (schema == null) {
+                return false;
+            }
+            if (knownType == null) {
+                knownType = schema.type();
+            } else if (knownType != schema.type()) {
+                return false;
+            }
+            return true;
+        }
+
+        public Schema schema() {
+            SchemaBuilder builder = SchemaBuilder.type(knownType);
+            if (optional) {
+                builder.optional();
+            }
+            return builder.schema();
+        }
+    }
+
+    protected static class Parser {
+        private final String original;
+        private final CharacterIterator iter;
+        private String nextToken = null;
+        private String previousToken = null;
+
+        public Parser(String original) {
+            this.original = original;
+            this.iter = new StringCharacterIterator(this.original);
+        }
+
+        public int position() {
+            return iter.getIndex();
+        }
+
+        public int mark() {
+            return iter.getIndex() - (nextToken != null ? nextToken.length() : 0);
+        }
+
+        public void rewindTo(int position) {
+            iter.setIndex(position);
+            nextToken = null;
+        }
+
+        public String original() {
+            return original;
+        }
+
+        public boolean hasNext() {
+            return nextToken != null || canConsumeNextToken();
+        }
+
+        protected boolean canConsumeNextToken() {
+            return iter.getEndIndex() > iter.getIndex();
+        }
+
+        public String next() {
+            if (nextToken != null) {
+                previousToken = nextToken;
+                nextToken = null;
+            } else {
+                previousToken = consumeNextToken();
+            }
+            return previousToken;
+        }
+
+        private String consumeNextToken() throws NoSuchElementException {
+            boolean escaped = false;
+            int start = iter.getIndex();
+            char c = iter.current();
+            while (c != CharacterIterator.DONE) {
+                switch (c) {
+                    case '\\':
+                        escaped = !escaped;
+                        break;
+                    case ':':
+                    case ',':
+                    case '{':
+                    case '}':
+                    case '[':
+                    case ']':
+                    case '\"':
+                        if (!escaped) {
+                            if (start < iter.getIndex()) {
+                                // Return the previous token
+                                return original.substring(start, iter.getIndex());
+                            }
+                            // Consume and return this delimiter as a token
+                            iter.next();
+                            return original.substring(start, start + 1);
+                        }
+                        // escaped, so continue
+                        escaped = false;
+                        break;
+                    default:
+                        // If escaped, then we don't care what was escaped
+                        escaped = false;
+                        break;
+                }
+                c = iter.next();
+            }
+            return original.substring(start, iter.getIndex());
+        }
+
+        public String previous() {
+            return previousToken;
+        }
+
+        public boolean canConsume(String expected) {
+            return canConsume(expected, true);
+        }
+
+        public boolean canConsume(String expected, boolean ignoreLeadingAndTrailingWhitespace) {
+            if (isNext(expected, ignoreLeadingAndTrailingWhitespace)) {
+                // consume this token ...
+                nextToken = null;
+                return true;
+            }
+            return false;
+        }
+
+        protected boolean isNext(String expected, boolean ignoreLeadingAndTrailingWhitespace) {
+            if (nextToken == null) {
+                if (!hasNext()) {
+                    return false;
+                }
+                // There's another token, so consume it
+                nextToken = consumeNextToken();
+            }
+            if (ignoreLeadingAndTrailingWhitespace) {
+                nextToken = nextToken.trim();
+                while (nextToken.isEmpty() && canConsumeNextToken()) {
+                    nextToken = consumeNextToken().trim();
+                }
+            }
+            return nextToken.equals(expected);
+        }
+    }
+}
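
For a sense of how the inference above behaves end to end, here is a small, illustrative sketch driven through `Values.parseString(...)`; the literal inputs and the expected schema types in the comments are examples derived from the parsing rules in this class, not captured test output:

```java
import org.apache.kafka.connect.data.SchemaAndValue;
import org.apache.kafka.connect.data.Values;

public class ValuesInferenceExample {
    public static void main(String[] args) {
        // Booleans, numbers, dates/times, arrays, and maps are inferred from the string form;
        // anything that cannot be parsed falls back to a plain string value.
        SchemaAndValue bool = Values.parseString("true");        // expected: BOOLEAN
        SchemaAndValue num  = Values.parseString("3.14159");     // expected: FLOAT64 (fits in a double)
        SchemaAndValue list = Values.parseString("[1,2,3]");     // expected: ARRAY with a common integer element schema
        SchemaAndValue text = Values.parseString("hello world"); // expected: STRING (no embedded structure)

        System.out.println(bool.schema().type() + " -> " + bool.value());
        System.out.println(num.schema().type() + " -> " + num.value());
        System.out.println(list.schema().type() + " -> " + list.value());
        System.out.println(text.schema().type() + " -> " + text.value());
    }
}
```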
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeader.java b/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeader.java
new file mode 100644
index 0000000..ed4be09
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeader.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.header;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+
+import java.util.Objects;
+
+/**
+ * A {@link Header} implementation.
+ */
+class ConnectHeader implements Header {
+
+    private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
+
+    private final String key;
+    private final SchemaAndValue schemaAndValue;
+
+    protected ConnectHeader(String key, SchemaAndValue schemaAndValue) {
+        Objects.requireNonNull(key, "Null header keys are not permitted");
+        this.key = key;
+        this.schemaAndValue = schemaAndValue != null ? schemaAndValue : NULL_SCHEMA_AND_VALUE;
+        assert this.schemaAndValue != null;
+    }
+
+    @Override
+    public String key() {
+        return key;
+    }
+
+    @Override
+    public Object value() {
+        return schemaAndValue.value();
+    }
+
+    @Override
+    public Schema schema() {
+        Schema schema = schemaAndValue.schema();
+        if (schema == null && value() instanceof Struct) {
+            schema = ((Struct) value()).schema();
+        }
+        return schema;
+    }
+
+    @Override
+    public Header rename(String key) {
+        Objects.requireNonNull(key, "Null header keys are not permitted");
+        if (this.key.equals(key)) {
+            return this;
+        }
+        return new ConnectHeader(key, schemaAndValue);
+    }
+
+    @Override
+    public Header with(Schema schema, Object value) {
+        return new ConnectHeader(key, new SchemaAndValue(schema, value));
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(key, schemaAndValue);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Header) {
+            Header that = (Header) obj;
+            return Objects.equals(this.key, that.key()) && Objects.equals(this.schema(), that.schema())
+                    && Objects.equals(this.value(), that.value());
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectHeader(key=" + key + ", value=" + value() + ", schema=" + schema() + ")";
+    }
+}
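
A short illustrative sketch of the immutability contract: `with(...)` and `rename(...)` return new `Header` instances and leave the original untouched. Because `ConnectHeader` itself is package-private, the example obtains instances through `ConnectHeaders`; the header key and values are made up for the example:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;

public class HeaderImmutabilityExample {
    public static void main(String[] args) {
        // Headers are created through the ConnectHeaders collection
        Header original = new ConnectHeaders().addInt("retry-count", 3).lastWithName("retry-count");

        // with(...) and rename(...) each return a new Header; the original is unchanged
        Header bumped = original.with(Schema.INT32_SCHEMA, 4);
        Header renamed = bumped.rename("attempts");

        System.out.println(original.key() + " = " + original.value()); // retry-count = 3
        System.out.println(renamed.key() + " = " + renamed.value());   // attempts = 4
    }
}
```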
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java b/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java
new file mode 100644
index 0000000..e3b5e72
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/header/ConnectHeaders.java
@@ -0,0 +1,519 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.header;
+
+import org.apache.kafka.common.utils.AbstractIterator;
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * A basic {@link Headers} implementation.
+ */
+public class ConnectHeaders implements Headers {
+
+    private static final int EMPTY_HASH = Objects.hash(new LinkedList<>());
+
+    /**
+     * An immutable and therefore sharable empty iterator.
+     */
+    private static final Iterator<Header> EMPTY_ITERATOR = new Iterator<Header>() {
+        @Override
+        public boolean hasNext() {
+            return false;
+        }
+
+        @Override
+        public Header next() {
+            throw new NoSuchElementException();
+        }
+
+        @Override
+        public void remove() {
+            throw new IllegalStateException();
+        }
+    };
+
+
+    // This field is set lazily, but once set to a list it is never set back to null
+    private LinkedList<Header> headers;
+
+    public ConnectHeaders() {
+    }
+
+    public ConnectHeaders(Iterable<Header> original) {
+        if (original == null) {
+            return;
+        }
+        if (original instanceof ConnectHeaders) {
+            ConnectHeaders originalHeaders = (ConnectHeaders) original;
+            if (!originalHeaders.isEmpty()) {
+                headers = new LinkedList<>(originalHeaders.headers);
+            }
+        } else {
+            headers = new LinkedList<>();
+            for (Header header : original) {
+                headers.add(header);
+            }
+        }
+    }
+
+    @Override
+    public int size() {
+        return headers == null ? 0 : headers.size();
+    }
+
+    @Override
+    public boolean isEmpty() {
+        return headers == null || headers.isEmpty();
+    }
+
+    @Override
+    public Headers clear() {
+        if (headers != null) {
+            headers.clear();
+        }
+        return this;
+    }
+
+    @Override
+    public Headers add(Header header) {
+        Objects.requireNonNull(header, "Unable to add a null header.");
+        if (headers == null) {
+            headers = new LinkedList<>();
+        }
+        headers.add(header);
+        return this;
+    }
+
+    protected Headers addWithoutValidating(String key, Object value, Schema schema) {
+        return add(new ConnectHeader(key, new SchemaAndValue(schema, value)));
+    }
+
+    @Override
+    public Headers add(String key, SchemaAndValue schemaAndValue) {
+        checkSchemaMatches(schemaAndValue);
+        return add(new ConnectHeader(key, schemaAndValue != null ? schemaAndValue : SchemaAndValue.NULL));
+    }
+
+    @Override
+    public Headers add(String key, Object value, Schema schema) {
+        return add(key, value != null || schema != null ? new SchemaAndValue(schema, value) : SchemaAndValue.NULL);
+    }
+
+    @Override
+    public Headers addString(String key, String value) {
+        return addWithoutValidating(key, value, value != null ? Schema.STRING_SCHEMA : Schema.OPTIONAL_STRING_SCHEMA);
+    }
+
+    @Override
+    public Headers addBytes(String key, byte[] value) {
+        return addWithoutValidating(key, value, value != null ? Schema.BYTES_SCHEMA : Schema.OPTIONAL_BYTES_SCHEMA);
+    }
+
+    @Override
+    public Headers addBoolean(String key, boolean value) {
+        return addWithoutValidating(key, value, Schema.BOOLEAN_SCHEMA);
+    }
+
+    @Override
+    public Headers addByte(String key, byte value) {
+        return addWithoutValidating(key, value, Schema.INT8_SCHEMA);
+    }
+
+    @Override
+    public Headers addShort(String key, short value) {
+        return addWithoutValidating(key, value, Schema.INT16_SCHEMA);
+    }
+
+    @Override
+    public Headers addInt(String key, int value) {
+        return addWithoutValidating(key, value, Schema.INT32_SCHEMA);
+    }
+
+    @Override
+    public Headers addLong(String key, long value) {
+        return addWithoutValidating(key, value, Schema.INT64_SCHEMA);
+    }
+
+    @Override
+    public Headers addFloat(String key, float value) {
+        return addWithoutValidating(key, value, Schema.FLOAT32_SCHEMA);
+    }
+
+    @Override
+    public Headers addDouble(String key, double value) {
+        return addWithoutValidating(key, value, Schema.FLOAT64_SCHEMA);
+    }
+
+    @Override
+    public Headers addList(String key, List<?> value, Schema schema) {
+        if (value == null) {
+            return add(key, null, null);
+        }
+        checkSchemaType(schema, Type.ARRAY);
+        return addWithoutValidating(key, value, schema);
+    }
+
+    @Override
+    public Headers addMap(String key, Map<?, ?> value, Schema schema) {
+        if (value == null) {
+            return add(key, null, null);
+        }
+        checkSchemaType(schema, Type.MAP);
+        return addWithoutValidating(key, value, schema);
+    }
+
+    @Override
+    public Headers addStruct(String key, Struct value) {
+        if (value == null) {
+            return add(key, null, null);
+        }
+        checkSchemaType(value.schema(), Type.STRUCT);
+        return addWithoutValidating(key, value, value.schema());
+    }
+
+    @Override
+    public Headers addDecimal(String key, BigDecimal value) {
+        if (value == null) {
+            return add(key, null, null);
+        }
+        // Check that this is a decimal ...
+        Schema schema = Decimal.schema(value.scale());
+        Decimal.fromLogical(schema, value);
+        return addWithoutValidating(key, value, schema);
+    }
+
+    @Override
+    public Headers addDate(String key, java.util.Date value) {
+        if (value != null) {
+            // Check that this is a date ...
+            Date.fromLogical(Date.SCHEMA, value);
+        }
+        return addWithoutValidating(key, value, Date.SCHEMA);
+    }
+
+    @Override
+    public Headers addTime(String key, java.util.Date value) {
+        if (value != null) {
+            // Check that this is a time ...
+            Time.fromLogical(Time.SCHEMA, value);
+        }
+        return addWithoutValidating(key, value, Time.SCHEMA);
+    }
+
+    @Override
+    public Headers addTimestamp(String key, java.util.Date value) {
+        if (value != null) {
+            // Check that this is a timestamp ...
+            Timestamp.fromLogical(Timestamp.SCHEMA, value);
+        }
+        return addWithoutValidating(key, value, Timestamp.SCHEMA);
+    }
+
+    @Override
+    public Header lastWithName(String key) {
+        checkKey(key);
+        if (headers != null) {
+            ListIterator<Header> iter = headers.listIterator(headers.size());
+            while (iter.hasPrevious()) {
+                Header header = iter.previous();
+                if (key.equals(header.key())) {
+                    return header;
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public Iterator<Header> allWithName(String key) {
+        return new FilterByKeyIterator(iterator(), key);
+    }
+
+    @Override
+    public Iterator<Header> iterator() {
+        if (headers != null) {
+            return headers.iterator();
+        }
+        return EMPTY_ITERATOR;
+    }
+
+    @Override
+    public Headers remove(String key) {
+        checkKey(key);
+        if (!isEmpty()) {
+            Iterator<Header> iterator = iterator();
+            while (iterator.hasNext()) {
+                if (iterator.next().key().equals(key)) {
+                    iterator.remove();
+                }
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public Headers retainLatest() {
+        if (!isEmpty()) {
+            Set<String> keys = new HashSet<>();
+            ListIterator<Header> iter = headers.listIterator(headers.size());
+            while (iter.hasPrevious()) {
+                Header header = iter.previous();
+                String key = header.key();
+                if (!keys.add(key)) {
+                    iter.remove();
+                }
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public Headers retainLatest(String key) {
+        checkKey(key);
+        if (!isEmpty()) {
+            boolean found = false;
+            ListIterator<Header> iter = headers.listIterator(headers.size());
+            while (iter.hasPrevious()) {
+                String headerKey = iter.previous().key();
+                if (key.equals(headerKey)) {
+                    if (found)
+                        iter.remove();
+                    found = true;
+                }
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public Headers apply(String key, HeaderTransform transform) {
+        checkKey(key);
+        if (!isEmpty()) {
+            ListIterator<Header> iter = headers.listIterator();
+            while (iter.hasNext()) {
+                Header orig = iter.next();
+                if (orig.key().equals(key)) {
+                    Header updated = transform.apply(orig);
+                    if (updated != null) {
+                        iter.set(updated);
+                    } else {
+                        iter.remove();
+                    }
+                }
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public Headers apply(HeaderTransform transform) {
+        if (!isEmpty()) {
+            ListIterator<Header> iter = headers.listIterator();
+            while (iter.hasNext()) {
+                Header orig = iter.next();
+                Header updated = transform.apply(orig);
+                if (updated != null) {
+                    iter.set(updated);
+                } else {
+                    iter.remove();
+                }
+            }
+        }
+        return this;
+    }
+
+    @Override
+    public int hashCode() {
+        return isEmpty() ? EMPTY_HASH : Objects.hash(headers);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj == this) {
+            return true;
+        }
+        if (obj instanceof Headers) {
+            Headers that = (Headers) obj;
+            Iterator<Header> thisIter = this.iterator();
+            Iterator<Header> thatIter = that.iterator();
+            while (thisIter.hasNext() && thatIter.hasNext()) {
+                if (!Objects.equals(thisIter.next(), thatIter.next()))
+                    return false;
+            }
+            return !thisIter.hasNext() && !thatIter.hasNext();
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectHeaders(headers=" + (headers != null ? headers : "") + ")";
+    }
+
+    @Override
+    public ConnectHeaders duplicate() {
+        return new ConnectHeaders(this);
+    }
+
+    /**
+     * Check that the key is not null
+     *
+     * @param key the key; may not be null
+     * @throws NullPointerException if the supplied key is null
+     */
+    private void checkKey(String key) {
+        Objects.requireNonNull(key, "Header key cannot be null");
+    }
+
+    /**
+     * Check the {@link Schema#type() schema's type} matches the specified type.
+     *
+     * @param schema the schema; never null
+     * @param type   the expected type
+     * @throws DataException if the schema's type does not match the expected type
+     */
+    private void checkSchemaType(Schema schema, Type type) {
+        if (schema.type() != type) {
+            throw new DataException("Expecting " + type + " but instead found " + schema.type());
+        }
+    }
+
+    /**
+     * Check that the value and its schema are compatible.
+     *
+     * @param schemaAndValue the schema and value pair
+     * @throws DataException if the schema is not compatible with the value
+     */
+    // visible for testing
+    void checkSchemaMatches(SchemaAndValue schemaAndValue) {
+        if (schemaAndValue != null) {
+            Schema schema = schemaAndValue.schema();
+            if (schema == null)
+                return;
+            schema = schema.schema(); // in case a SchemaBuilder is used
+            Object value = schemaAndValue.value();
+            if (value == null && !schema.isOptional()) {
+                throw new DataException("A null value requires an optional schema but was " + schema);
+            }
+            if (value != null) {
+                switch (schema.type()) {
+                    case BYTES:
+                        if (value instanceof ByteBuffer)
+                            return;
+                        if (value instanceof byte[])
+                            return;
+                        if (value instanceof BigDecimal && Decimal.LOGICAL_NAME.equals(schema.name()))
+                            return;
+                        break;
+                    case STRING:
+                        if (value instanceof String)
+                            return;
+                        break;
+                    case BOOLEAN:
+                        if (value instanceof Boolean)
+                            return;
+                        break;
+                    case INT8:
+                        if (value instanceof Byte)
+                            return;
+                        break;
+                    case INT16:
+                        if (value instanceof Short)
+                            return;
+                        break;
+                    case INT32:
+                        if (value instanceof Integer)
+                            return;
+                        if (value instanceof java.util.Date && Date.LOGICAL_NAME.equals(schema.name()))
+                            return;
+                        if (value instanceof java.util.Date && Time.LOGICAL_NAME.equals(schema.name()))
+                            return;
+                        break;
+                    case INT64:
+                        if (value instanceof Long)
+                            return;
+                        if (value instanceof java.util.Date && Timestamp.LOGICAL_NAME.equals(schema.name()))
+                            return;
+                        break;
+                    case FLOAT32:
+                        if (value instanceof Float)
+                            return;
+                        break;
+                    case FLOAT64:
+                        if (value instanceof Double)
+                            return;
+                        break;
+                    case ARRAY:
+                        if (value instanceof List)
+                            return;
+                        break;
+                    case MAP:
+                        if (value instanceof Map)
+                            return;
+                        break;
+                    case STRUCT:
+                        if (value instanceof Struct)
+                            return;
+                        break;
+                }
+                throw new DataException("The value " + value + " is not compatible with the schema " + schema);
+            }
+        }
+    }
+
+    private static final class FilterByKeyIterator extends AbstractIterator<Header> {
+
+        private final Iterator<Header> original;
+        private final String key;
+
+        private FilterByKeyIterator(Iterator<Header> original, String key) {
+            this.original = original;
+            this.key = key;
+        }
+
+        protected Header makeNext() {
+            while (original.hasNext()) {
+                Header header = original.next();
+                if (!header.key().equals(key)) {
+                    continue;
+                }
+                return header;
+            }
+            return this.allDone();
+        }
+    }
+}
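
To see the collection in action, a minimal usage sketch of `ConnectHeaders`; the keys and values below are illustrative only:

```java
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;

public class ConnectHeadersExample {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders()
                .addString("trace-id", "abc-123")
                .addInt("retry", 1)
                .addInt("retry", 2);              // duplicate keys are allowed and kept in insertion order

        // The last header wins when several share a key
        Header latestRetry = headers.lastWithName("retry");
        System.out.println(latestRetry.value());  // 2

        // Drop all but the most recent header for each key
        headers.retainLatest();
        System.out.println(headers.size());       // 2

        // duplicate() produces an independent copy that can be mutated separately
        Headers copy = headers.duplicate();
        copy.addString("extra", "only-on-copy");
        System.out.println(headers.size() + " vs " + copy.size());  // 2 vs 3
    }
}
```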
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/header/Header.java b/connect/api/src/main/java/org/apache/kafka/connect/header/Header.java
new file mode 100644
index 0000000..a70d1dc
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/header/Header.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.header;
+
+import org.apache.kafka.connect.data.Schema;
+
+/**
+ * A {@link Header} is a key-value pair, and multiple headers can be included with the key, value, and timestamp in each Kafka message.
+ * If the value contains schema information, then the header will have a non-null {@link #schema() schema}.
+ * <p>
+ * This is an immutable interface.
+ */
+public interface Header {
+
+    /**
+     * The header's key, which is not necessarily unique within the set of headers on a Kafka message.
+     *
+     * @return the header's key; never null
+     */
+    String key();
+
+    /**
+     * Return the {@link Schema} associated with this header, if there is one. Not all headers will have schemas.
+     *
+     * @return the header's schema, or null if no schema is associated with this header
+     */
+    Schema schema();
+
+    /**
+     * Get the header's value as deserialized by Connect's header converter.
+     *
+     * @return the deserialized object representation of the header's value; may be null
+     */
+    Object value();
+
+    /**
+     * Return a new {@link Header} object that has the same key but with the supplied value.
+     *
+     * @param schema the schema for the new value; may be null
+     * @param value  the new value
+     * @return the new {@link Header}; never null
+     */
+    Header with(Schema schema, Object value);
+
+    /**
+     * Return a new {@link Header} object that has the same schema and value but with the supplied key.
+     *
+     * @param key the key for the new header; may not be null
+     * @return the new {@link Header}; never null
+     */
+    Header rename(String key);
+}
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java b/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java
new file mode 100644
index 0000000..761abe3
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/header/Headers.java
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.header;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.DataException;
+
+import java.math.BigDecimal;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A mutable ordered collection of {@link Header} objects. Note that multiple headers may have the same {@link Header#key() key}.
+ */
+public interface Headers extends Iterable<Header> {
+
+    /**
+     * Get the number of headers in this object.
+     *
+     * @return the number of headers; never negative
+     */
+    int size();
+
+    /**
+     * Determine whether this object has no headers.
+     *
+     * @return true if there are no headers, or false if there is at least one header
+     */
+    boolean isEmpty();
+
+    /**
+     * Get the collection of {@link Header} objects whose {@link Header#key() keys} all match the specified key.
+     *
+     * @param key the key; may not be null
+     * @return the iterator over headers with the specified key; never null, though the iterator is empty if there are no headers with the specified key
+     */
+    Iterator<Header> allWithName(String key);
+
+    /**
+     * Return the last {@link Header} with the specified key.
+     *
+     * @param key the key for the header; may not be null
+     * @return the last Header, or null if there are no headers with the specified key
+     */
+    Header lastWithName(String key);
+
+    /**
+     * Add the given {@link Header} to this collection.
+     *
+     * @param header the header; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers add(Header header);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key            the header's key; may not be null
+     * @param schemaAndValue the {@link SchemaAndValue} for the header; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers add(String key, SchemaAndValue schemaAndValue);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key    the header's key; may not be null
+     * @param value  the header's value; may be null
+     * @param schema the schema for the header's value; may not be null if the value is not null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers add(String key, Object value, Schema schema);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addString(String key, String value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addBoolean(String key, boolean value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addByte(String key, byte value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addShort(String key, short value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addInt(String key, int value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addLong(String key, long value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addFloat(String key, float value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addDouble(String key, double value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addBytes(String key, byte[] value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key    the header's key; may not be null
+     * @param value  the header's value; may be null
+     * @param schema the schema describing the list value; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     * @throws DataException if the header's value is invalid
+     */
+    Headers addList(String key, List<?> value, Schema schema);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key    the header's key; may not be null
+     * @param value  the header's value; may be null
+     * @param schema the schema describing the map value; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     * @throws DataException if the header's value is invalid
+     */
+    Headers addMap(String key, Map<?, ?> value, Schema schema);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     * @throws DataException if the header's value is invalid
+     */
+    Headers addStruct(String key, Struct value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Decimal} value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's {@link org.apache.kafka.connect.data.Decimal} value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addDecimal(String key, BigDecimal value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Date} value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's {@link org.apache.kafka.connect.data.Date} value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addDate(String key, java.util.Date value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Time} value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's {@link org.apache.kafka.connect.data.Time} value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addTime(String key, java.util.Date value);
+
+    /**
+     * Add to this collection a {@link Header} with the given key and {@link org.apache.kafka.connect.data.Timestamp} value.
+     *
+     * @param key   the header's key; may not be null
+     * @param value the header's {@link org.apache.kafka.connect.data.Timestamp} value; may be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers addTimestamp(String key, java.util.Date value);
+
+    /**
+     * Removes all {@link Header} objects whose {@link Header#key() key} matches the specified key.
+     *
+     * @param key the key; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers remove(String key);
+
+    /**
+     * Removes all but the latest {@link Header} objects whose {@link Header#key() key} matches the specified key.
+     *
+     * @param key the key; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers retainLatest(String key);
+
+    /**
+     * Removes all but the last {@link Header} object with each key.
+     *
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers retainLatest();
+
+    /**
+     * Removes all headers from this object.
+     *
+     * @return this object to facilitate chaining multiple methods; never null
+     */
+    Headers clear();
+
+    /**
+     * Create a copy of this {@link Headers} object. The new copy will contain all of the same {@link Header} objects as this object.
+     * @return the copy; never null
+     */
+    Headers duplicate();
+
+    /**
+     * Get all {@link Header}s, apply the transform to each and store the result in place of the original.
+     *
+     * @param transform the transform to apply; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     * @throws DataException if the header's value is invalid
+     */
+    Headers apply(HeaderTransform transform);
+
+    /**
+     * Get all {@link Header}s with the given key, apply the transform to each and store the result in place of the original.
+     *
+     * @param key       the header's key; may not be null
+     * @param transform the transform to apply; may not be null
+     * @return this object to facilitate chaining multiple methods; never null
+     * @throws DataException if the header's value is invalid
+     */
+    Headers apply(String key, HeaderTransform transform);
+
+    /**
+     * A function to transform the supplied {@link Header}. Implementations will likely need to use {@link Header#with(Schema, Object)}
+     * to create the new instance.
+     */
+    interface HeaderTransform {
+        /**
+         * Transform the given {@link Header} and return the updated {@link Header}.
+         *
+         * @param header the input header; never null
+         * @return the new header, or null if the supplied {@link Header} is to be removed
+         */
+        Header apply(Header header);
+    }
+}
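
A brief sketch of the `HeaderTransform` hook defined above, applied to every header with a given key; the header names and values are illustrative, and returning null from the transform removes the header instead of replacing it:

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Header;
import org.apache.kafka.connect.header.Headers;

public class HeaderTransformExample {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders()
                .addString("app", "orders")
                .addString("app", "payments")
                .addInt("attempt", 3);

        // Upper-case every "app" header in place; other headers are left untouched
        headers.apply("app", header ->
                header.with(Schema.STRING_SCHEMA, String.valueOf(header.value()).toUpperCase()));

        for (Header header : headers) {
            System.out.println(header.key() + " = " + header.value());
        }
    }
}
```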
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
index e03a1f1..642b716 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.sink;
 import org.apache.kafka.common.record.TimestampType;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
 
 /**
  * SinkRecord is a {@link ConnectRecord} that has been read from Kafka and includes the kafkaOffset of
@@ -38,7 +39,12 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
 
     public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
                       Long timestamp, TimestampType timestampType) {
-        super(topic, partition, keySchema, key, valueSchema, value, timestamp);
+        this(topic, partition, keySchema, key, valueSchema, value, kafkaOffset, timestamp, timestampType, null);
+    }
+
+    public SinkRecord(String topic, int partition, Schema keySchema, Object key, Schema valueSchema, Object value, long kafkaOffset,
+                      Long timestamp, TimestampType timestampType, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
         this.kafkaOffset = kafkaOffset;
         this.timestampType = timestampType;
     }
@@ -53,7 +59,13 @@ public class SinkRecord extends ConnectRecord<SinkRecord> {
 
     @Override
     public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
-        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType);
+        return newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers().duplicate());
+    }
+
+    @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
+                                Long timestamp, Iterable<Header> headers) {
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType, headers);
     }
 
     @Override
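
For sink-side code, a hedged sketch of the new constructor and `newRecord(...)` overloads that carry headers; the topic names, offsets, and timestamps below are placeholders invented for the example:

```java
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.sink.SinkRecord;

public class SinkRecordHeadersExample {
    public static void main(String[] args) {
        Headers headers = new ConnectHeaders().addString("source-system", "billing");

        // The new constructor overload carries headers alongside the key, value, and timestamp
        SinkRecord record = new SinkRecord("orders", 0,
                Schema.STRING_SCHEMA, "order-42",
                Schema.STRING_SCHEMA, "{\"total\": 10.0}",
                123L, 1517400000000L, TimestampType.CREATE_TIME, headers);

        // newRecord(...) without an explicit headers argument duplicates the existing headers
        SinkRecord rerouted = record.newRecord("orders-dlq", 0,
                record.keySchema(), record.key(),
                record.valueSchema(), record.value(),
                record.timestamp());

        System.out.println(rerouted.headers().lastWithName("source-system").value()); // billing
    }
}
```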
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
index 2f3e5e4..315f0f3 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -18,6 +18,7 @@ package org.apache.kafka.connect.source;
 
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.header.Header;
 
 import java.util.Map;
 
@@ -69,7 +70,15 @@ public class SourceRecord extends ConnectRecord<SourceRecord> {
                         Schema keySchema, Object key,
                         Schema valueSchema, Object value,
                         Long timestamp) {
-        super(topic, partition, keySchema, key, valueSchema, value, timestamp);
+        this(sourcePartition, sourceOffset, topic, partition, keySchema, key, valueSchema, value, timestamp, null);
+    }
+
+    public SourceRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset,
+                        String topic, Integer partition,
+                        Schema keySchema, Object key,
+                        Schema valueSchema, Object value,
+                        Long timestamp, Iterable<Header> headers) {
+        super(topic, partition, keySchema, key, valueSchema, value, timestamp, headers);
         this.sourcePartition = sourcePartition;
         this.sourceOffset = sourceOffset;
     }
@@ -84,7 +93,13 @@ public class SourceRecord extends ConnectRecord<SourceRecord> {
 
     @Override
     public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
-        return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);
+        return newRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers().duplicate());
+    }
+
+    @Override
+    public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value,
+                                  Long timestamp, Iterable<Header> headers) {
+        return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp, headers);
     }
 
     @Override
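
And the corresponding source-side sketch: a source task can now attach headers when it builds a `SourceRecord`. The source partition/offset maps, topic, and values are illustrative:

```java
import java.util.Collections;
import java.util.Map;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.header.ConnectHeaders;
import org.apache.kafka.connect.header.Headers;
import org.apache.kafka.connect.source.SourceRecord;

public class SourceRecordHeadersExample {
    public static void main(String[] args) {
        Map<String, ?> sourcePartition = Collections.singletonMap("file", "orders.csv");
        Map<String, ?> sourceOffset = Collections.singletonMap("line", 42);

        Headers headers = new ConnectHeaders().addString("ingested-by", "file-connector");

        // The new overload lets a source task attach headers when it emits a record
        SourceRecord record = new SourceRecord(sourcePartition, sourceOffset,
                "orders", null,
                Schema.STRING_SCHEMA, "order-42",
                Schema.STRING_SCHEMA, "line contents",
                System.currentTimeMillis(), headers);

        System.out.println(record.headers().size()); // 1
    }
}
```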
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/ConverterConfig.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/ConverterConfig.java
new file mode 100644
index 0000000..cea6995
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/ConverterConfig.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.ValidString.in;
+
+/**
+ * Abstract class that defines the configuration options for {@link Converter} and {@link HeaderConverter} instances.
+ */
+public abstract class ConverterConfig extends AbstractConfig {
+
+    public static final String TYPE_CONFIG = "converter.type";
+    private static final String TYPE_DOC = "How this converter will be used.";
+
+    /**
+     * Create a new {@link ConfigDef} instance containing the configurations defined by ConverterConfig. This can be called by subclasses.
+     *
+     * @return the ConfigDef; never null
+     */
+    public static ConfigDef newConfigDef() {
+        return new ConfigDef().define(TYPE_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE,
+                                      in(ConverterType.KEY.getName(), ConverterType.VALUE.getName(), ConverterType.HEADER.getName()),
+                                      Importance.LOW, TYPE_DOC);
+    }
+
+    protected ConverterConfig(ConfigDef configDef, Map<String, ?> props) {
+        super(configDef, props, true);
+    }
+
+    /**
+     * Get the type of converter as defined by the {@link #TYPE_CONFIG} configuration.
+     * @return the converter type; never null
+     */
+    public ConverterType type() {
+        return ConverterType.withName(getString(TYPE_CONFIG));
+    }
+}
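
A minimal illustrative subclass showing how `newConfigDef()` and `type()` are meant to be reused; the class name `ExampleConverterConfig` is made up for the example and is not part of this change:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.connect.storage.ConverterConfig;
import org.apache.kafka.connect.storage.ConverterType;

/**
 * Illustrative subclass that reuses the shared "converter.type" definition.
 */
public class ExampleConverterConfig extends ConverterConfig {

    public ExampleConverterConfig(Map<String, ?> props) {
        super(ConverterConfig.newConfigDef(), props);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
        ExampleConverterConfig config = new ExampleConverterConfig(props);
        System.out.println(config.type()); // HEADER
    }
}
```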
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/ConverterType.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/ConverterType.java
new file mode 100644
index 0000000..446ff8b
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/ConverterType.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Locale;
+import java.util.Map;
+
+/**
+ * The type of {@link Converter} and {@link HeaderConverter}.
+ */
+public enum ConverterType {
+    KEY,
+    VALUE,
+    HEADER;
+
+    private static final Map<String, ConverterType> NAME_TO_TYPE;
+
+    static {
+        ConverterType[] types = ConverterType.values();
+        Map<String, ConverterType> nameToType = new HashMap<>(types.length);
+        for (ConverterType type : types) {
+            nameToType.put(type.name, type);
+        }
+        NAME_TO_TYPE = Collections.unmodifiableMap(nameToType);
+    }
+
+    /**
+     * Find the ConverterType with the given name, using a case-insensitive match.
+     * @param name the name of the converter type; may be null
+     * @return the matching converter type, or null if the supplied name is null or does not match the name of the known types
+     */
+    public static ConverterType withName(String name) {
+        if (name == null) {
+            return null;
+        }
+        return NAME_TO_TYPE.get(name.toLowerCase(Locale.ROOT));
+    }
+
+    private String name;
+
+    ConverterType() {
+        this.name = this.name().toLowerCase(Locale.ROOT);
+    }
+
+    public String getName() {
+        return name;
+    }
+}
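
A quick illustration of the case-insensitive lookup; the expected results are shown in the comments:

```java
import org.apache.kafka.connect.storage.ConverterType;

public class ConverterTypeExample {
    public static void main(String[] args) {
        // Lookup is case-insensitive; unknown names return null rather than throwing
        System.out.println(ConverterType.withName("header"));  // HEADER
        System.out.println(ConverterType.withName("VALUE"));   // VALUE
        System.out.println(ConverterType.withName("unknown")); // null
        System.out.println(ConverterType.KEY.getName());       // key
    }
}
```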
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
new file mode 100644
index 0000000..b8455e1
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/HeaderConverter.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.header.Header;
+
+import java.io.Closeable;
+
+public interface HeaderConverter extends Configurable, Closeable {
+
+    /**
+     * Convert the header name and byte array value into a {@link SchemaAndValue} for the header's value.
+     * @param topic the name of the topic for the record containing the header
+     * @param headerKey the header's key; may not be null
+     * @param value the header's raw value; may be null
+     * @return the {@link SchemaAndValue}; may not be null
+     */
+    SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value);
+
+    /**
+     * Convert the {@link Header}'s {@link Header#value() value} into its byte array representation.
+     * @param topic the name of the topic for the record containing the header
+     * @param headerKey the header's key; may not be null
+     * @param schema the schema for the header's value; may be null
+     * @param value the header's value to convert; may be null
+     * @return the byte array form of the Header's value; may be null if the value is null
+     */
+    byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value);
+
+    /**
+     * Configuration specification for this header converter.
+     * @return the configuration specification; may not be null
+     */
+    ConfigDef config();
+}
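
To make the new contract concrete, here is a minimal sketch (not part of the patch) of a pass-through implementation that keeps header values as raw byte arrays; the class name BytesHeaderConverter is hypothetical and it assumes callers only ever supply byte[] values:

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.HeaderConverter;

    import java.util.Map;

    public class BytesHeaderConverter implements HeaderConverter {
        @Override
        public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
            // A null raw value maps to a null value with an optional schema
            return value == null
                    ? new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, null)
                    : new SchemaAndValue(Schema.BYTES_SCHEMA, value);
        }

        @Override
        public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
            return (byte[]) value;   // assumes the value is already a byte array
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef();  // no configuration options
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // nothing to configure
        }

        @Override
        public void close() {
            // nothing to close
        }
    }
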
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
new file mode 100644
index 0000000..69c4b86
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/SimpleHeaderConverter.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+/**
+ * A {@link HeaderConverter} that serializes header values as strings and that deserializes header values to the most appropriate
+ * numeric, boolean, array, or map representation. Schemas are not serialized, but are inferred upon deserialization when possible.
+ */
+public class SimpleHeaderConverter implements HeaderConverter {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SimpleHeaderConverter.class);
+    private static final ConfigDef CONFIG_DEF = new ConfigDef();
+    private static final SchemaAndValue NULL_SCHEMA_AND_VALUE = new SchemaAndValue(null, null);
+    private static final Charset UTF_8 = StandardCharsets.UTF_8;
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        // do nothing
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+        if (value == null) {
+            return NULL_SCHEMA_AND_VALUE;
+        }
+        try {
+            String str = new String(value, UTF_8);
+            if (str.isEmpty()) {
+                return new SchemaAndValue(Schema.STRING_SCHEMA, str);
+            }
+            return Values.parseString(str);
+        } catch (NoSuchElementException e) {
+            throw new DataException("Failed to deserialize value for header '" + headerKey + "' on topic '" + topic + "'", e);
+        } catch (Throwable t) {
+            LOG.warn("Failed to deserialize value for header '{}' on topic '{}', so using byte array", headerKey, topic, t);
+            return new SchemaAndValue(Schema.BYTES_SCHEMA, value);
+        }
+    }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+        if (value == null) {
+            return null;
+        }
+        return Values.convertToString(schema, value).getBytes(UTF_8);
+    }
+
+    @Override
+    public void close() throws IOException {
+        // do nothing
+    }
+}
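
A short usage sketch (illustrative only, not part of the patch) showing the round trip described in the class javadoc: values are serialized as strings and schemas are inferred on deserialization. The topic and header names are made up:

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.SimpleHeaderConverter;

    import java.util.Collections;

    public class SimpleHeaderConverterExample {
        public static void main(String[] args) {
            SimpleHeaderConverter converter = new SimpleHeaderConverter();
            converter.configure(Collections.emptyMap());

            // Serialize an integer header value to its string form ...
            byte[] raw = converter.fromConnectHeader("my-topic", "retries", Schema.INT32_SCHEMA, 3);

            // ... and deserialize it again; the schema is inferred from the string, so a small
            // number is expected to come back with the narrowest integer schema that fits
            SchemaAndValue parsed = converter.toConnectHeader("my-topic", "retries", raw);
            System.out.println(parsed.schema().type());
            System.out.println(parsed.value());
        }
    }
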
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
index 85fef84..534cddd 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverter.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.connect.storage;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.kafka.common.serialization.StringSerializer;
@@ -27,16 +28,19 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * {@link Converter} implementation that only supports serializing to strings. When converting Kafka Connect data to bytes,
- * the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
+ * {@link Converter} and {@link HeaderConverter} implementation that only supports serializing to strings. When converting Kafka Connect
+ * data to bytes, the schema will be ignored and {@link Object#toString()} will always be invoked to convert the data to a String.
  * When converting from bytes to Kafka Connect format, the converter will only ever return an optional string schema and
  * a string or null.
  *
  * Encoding configuration is identical to {@link StringSerializer} and {@link StringDeserializer}, but for convenience
- * this class can also be configured to use the same encoding for both encoding and decoding with the converter.encoding
- * setting.
+ * this class can also be configured to use the same encoding for both encoding and decoding with the
+ * {@link StringConverterConfig#ENCODING_CONFIG converter.encoding} setting.
+ *
+ * This implementation currently does nothing with the topic names or header names.
  */
-public class StringConverter implements Converter {
+public class StringConverter implements Converter, HeaderConverter {
+
     private final StringSerializer serializer = new StringSerializer();
     private final StringDeserializer deserializer = new StringDeserializer();
 
@@ -44,23 +48,33 @@ public class StringConverter implements Converter {
     }
 
     @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        Map<String, Object> serializerConfigs = new HashMap<>();
-        serializerConfigs.putAll(configs);
-        Map<String, Object> deserializerConfigs = new HashMap<>();
-        deserializerConfigs.putAll(configs);
+    public ConfigDef config() {
+        return StringConverterConfig.configDef();
+    }
 
-        Object encodingValue = configs.get("converter.encoding");
-        if (encodingValue != null) {
-            serializerConfigs.put("serializer.encoding", encodingValue);
-            deserializerConfigs.put("deserializer.encoding", encodingValue);
-        }
+    @Override
+    public void configure(Map<String, ?> configs) {
+        StringConverterConfig conf = new StringConverterConfig(configs);
+        String encoding = conf.encoding();
+
+        Map<String, Object> serializerConfigs = new HashMap<>(configs);
+        Map<String, Object> deserializerConfigs = new HashMap<>(configs);
+        serializerConfigs.put("serializer.encoding", encoding);
+        deserializerConfigs.put("deserializer.encoding", encoding);
 
+        boolean isKey = conf.type() == ConverterType.KEY;
         serializer.configure(serializerConfigs, isKey);
         deserializer.configure(deserializerConfigs, isKey);
     }
 
     @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Map<String, Object> conf = new HashMap<>(configs);
+        conf.put(StringConverterConfig.TYPE_CONFIG, isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName());
+        configure(conf);
+    }
+
+    @Override
     public byte[] fromConnectData(String topic, Schema schema, Object value) {
         try {
             return serializer.serialize(topic, value == null ? null : value.toString());
@@ -77,4 +91,19 @@ public class StringConverter implements Converter {
             throw new DataException("Failed to deserialize string: ", e);
         }
     }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+        return fromConnectData(topic, schema, value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+        return toConnectData(topic, value);
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
 }
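
Illustrative only (not part of the patch): configuring the converter through the new Configurable path and using it for headers. The topic and header names are made up, and the type and encoding settings are referenced via the StringConverterConfig constants rather than their literal keys:

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.ConverterType;
    import org.apache.kafka.connect.storage.StringConverter;
    import org.apache.kafka.connect.storage.StringConverterConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class StringConverterExample {
        public static void main(String[] args) {
            StringConverter converter = new StringConverter();

            Map<String, Object> config = new HashMap<>();
            config.put(StringConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
            config.put(StringConverterConfig.ENCODING_CONFIG, "UTF-8");
            converter.configure(config);

            // Header values are serialized with Object#toString() ...
            byte[] raw = converter.fromConnectHeader("my-topic", "trace-id", null, 12345);

            // ... and always come back as a string (or null) with an optional string schema
            SchemaAndValue parsed = converter.toConnectHeader("my-topic", "trace-id", raw);
            System.out.println(parsed.value()); // "12345"
        }
    }
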
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverterConfig.java b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverterConfig.java
new file mode 100644
index 0000000..26b0133
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/storage/StringConverterConfig.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+
+import java.util.Map;
+
+/**
+ * Configuration options for {@link StringConverter} instances.
+ */
+public class StringConverterConfig extends ConverterConfig {
+
+    public static final String ENCODING_CONFIG = "converter.encoding";
+    public static final String ENCODING_DEFAULT = "UTF8";
+    private static final String ENCODING_DOC = "The name of the Java character set to use for encoding strings as byte arrays.";
+    private static final String ENCODING_DISPLAY = "Encoding";
+
+    private static final ConfigDef CONFIG;
+
+    static {
+        CONFIG = ConverterConfig.newConfigDef();
+        CONFIG.define(ENCODING_CONFIG, Type.STRING, ENCODING_DEFAULT, Importance.HIGH, ENCODING_DOC, null, -1, Width.MEDIUM,
+                      ENCODING_DISPLAY);
+    }
+
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
+    public StringConverterConfig(Map<String, ?> props) {
+        super(CONFIG, props);
+    }
+
+    /**
+     * Get the string encoding.
+     *
+     * @return the encoding; never null
+     */
+    public String encoding() {
+        return getString(ENCODING_CONFIG);
+    }
+}
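
A small illustration (not part of the patch) of reading the default encoding; the class name is hypothetical:

    import org.apache.kafka.connect.storage.ConverterType;
    import org.apache.kafka.connect.storage.StringConverterConfig;

    import java.util.HashMap;
    import java.util.Map;

    public class StringConverterConfigExample {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            // Identify how the converter will be used (key, value, or header)
            props.put(StringConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());

            StringConverterConfig config = new StringConverterConfig(props);
            System.out.println(config.encoding()); // "UTF8" -- the default when converter.encoding is unset
        }
    }
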
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
new file mode 100644
index 0000000..70835c8
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/data/ValuesTest.java
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.data;
+
+import org.apache.kafka.connect.data.Values.Parser;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ValuesTest {
+
+    private static final Map<String, String> STRING_MAP = new LinkedHashMap<>();
+    private static final Schema STRING_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).schema();
+
+    private static final Map<String, Short> STRING_SHORT_MAP = new LinkedHashMap<>();
+    private static final Schema STRING_SHORT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT16_SCHEMA).schema();
+
+    private static final Map<String, Integer> STRING_INT_MAP = new LinkedHashMap<>();
+    private static final Schema STRING_INT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).schema();
+
+    private static final List<Integer> INT_LIST = new ArrayList<>();
+    private static final Schema INT_LIST_SCHEMA = SchemaBuilder.array(Schema.INT32_SCHEMA).schema();
+
+    private static final List<String> STRING_LIST = new ArrayList<>();
+    private static final Schema STRING_LIST_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).schema();
+
+    static {
+        STRING_MAP.put("foo", "123");
+        STRING_MAP.put("bar", "baz");
+        STRING_SHORT_MAP.put("foo", (short) 12345);
+        STRING_SHORT_MAP.put("bar", (short) 0);
+        STRING_SHORT_MAP.put("baz", (short) -4321);
+        STRING_INT_MAP.put("foo", 1234567890);
+        STRING_INT_MAP.put("bar", 0);
+        STRING_INT_MAP.put("baz", -987654321);
+        STRING_LIST.add("foo");
+        STRING_LIST.add("bar");
+        INT_LIST.add(1234567890);
+        INT_LIST.add(-987654321);
+    }
+
+    @Test
+    public void shouldEscapeStringsWithEmbeddedQuotesAndBackslashes() {
+        String original = "three\"blind\\\"mice";
+        String expected = "three\\\"blind\\\\\\\"mice";
+        assertEquals(expected, Values.escape(original));
+    }
+
+    @Test
+    public void shouldConvertNullValue() {
+        assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, null);
+        assertRoundTrip(Schema.OPTIONAL_STRING_SCHEMA, Schema.STRING_SCHEMA, null);
+    }
+
+    @Test
+    public void shouldConvertSimpleString() {
+        assertRoundTrip(Schema.STRING_SCHEMA,  "simple");
+    }
+
+    @Test
+    public void shouldConvertEmptyString() {
+        assertRoundTrip(Schema.STRING_SCHEMA, "");
+    }
+
+    @Test
+    public void shouldConvertStringWithQuotesAndOtherDelimiterCharacters() {
+        assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, "three\"blind\\\"mice");
+        assertRoundTrip(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA, "string with delimiters: <>?,./\\=+-!@#$%^&*(){}[]|;':");
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeys() {
+        assertRoundTrip(STRING_MAP_SCHEMA, STRING_MAP_SCHEMA, STRING_MAP);
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithStringValuesWithoutWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(STRING_MAP_SCHEMA, "{\"foo\":\"123\",\"bar\":\"baz\"}");
+        assertEquals(STRING_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_MAP, result.value());
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithStringValuesWithWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(STRING_MAP_SCHEMA, "{ \"foo\" : \"123\", \n\"bar\" : \"baz\" } ");
+        assertEquals(STRING_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_MAP, result.value());
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeysAndShortValues() {
+        assertRoundTrip(STRING_SHORT_MAP_SCHEMA, STRING_SHORT_MAP_SCHEMA, STRING_SHORT_MAP);
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithShortValuesWithoutWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(STRING_SHORT_MAP_SCHEMA, "{\"foo\":12345,\"bar\":0,\"baz\":-4321}");
+        assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_SHORT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithShortValuesWithWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(STRING_SHORT_MAP_SCHEMA, " { \"foo\" :  12345 , \"bar\" : 0,  \"baz\" : -4321 }  ");
+        assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_SHORT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeysAndIntegerValues() {
+        assertRoundTrip(STRING_INT_MAP_SCHEMA, STRING_INT_MAP_SCHEMA, STRING_INT_MAP);
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithIntValuesWithoutWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(STRING_INT_MAP_SCHEMA, "{\"foo\":1234567890,\"bar\":0,\"baz\":-987654321}");
+        assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_INT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithIntValuesWithWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(STRING_INT_MAP_SCHEMA, " { \"foo\" :  1234567890 , \"bar\" : 0,  \"baz\" : -987654321 }  ");
+        assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_INT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldConvertListWithStringValues() {
+        assertRoundTrip(STRING_LIST_SCHEMA, STRING_LIST_SCHEMA, STRING_LIST);
+    }
+
+    @Test
+    public void shouldConvertListWithIntegerValues() {
+        assertRoundTrip(INT_LIST_SCHEMA, INT_LIST_SCHEMA, INT_LIST);
+    }
+
+    @Test
+    public void shouldParseStringsWithoutDelimiters() {
+        //assertParsed("");
+        assertParsed("  ");
+        assertParsed("simple");
+        assertParsed("simple string");
+        assertParsed("simple \n\t\bstring");
+        assertParsed("'simple' string");
+        assertParsed("si\\mple");
+        assertParsed("si\\\\mple");
+    }
+
+    @Test
+    public void shouldParseStringsWithEscapedDelimiters() {
+        assertParsed("si\\\"mple");
+        assertParsed("si\\{mple");
+        assertParsed("si\\}mple");
+        assertParsed("si\\]mple");
+        assertParsed("si\\[mple");
+        assertParsed("si\\:mple");
+        assertParsed("si\\,mple");
+    }
+
+    @Test
+    public void shouldParseStringsWithSingleDelimiter() {
+        assertParsed("a{b", "a", "{", "b");
+        assertParsed("a}b", "a", "}", "b");
+        assertParsed("a[b", "a", "[", "b");
+        assertParsed("a]b", "a", "]", "b");
+        assertParsed("a:b", "a", ":", "b");
+        assertParsed("a,b", "a", ",", "b");
+        assertParsed("a\"b", "a", "\"", "b");
+        assertParsed("{b", "{", "b");
+        assertParsed("}b", "}", "b");
+        assertParsed("[b", "[", "b");
+        assertParsed("]b", "]", "b");
+        assertParsed(":b", ":", "b");
+        assertParsed(",b", ",", "b");
+        assertParsed("\"b", "\"", "b");
+        assertParsed("{", "{");
+        assertParsed("}", "}");
+        assertParsed("[", "[");
+        assertParsed("]", "]");
+        assertParsed(":", ":");
+        assertParsed(",", ",");
+        assertParsed("\"", "\"");
+    }
+
+    @Test
+    public void shouldParseStringsWithMultipleDelimiters() {
+        assertParsed("\"simple\" string", "\"", "simple", "\"", " string");
+        assertParsed("a{bc}d", "a", "{", "bc", "}", "d");
+        assertParsed("a { b c } d", "a ", "{", " b c ", "}", " d");
+        assertParsed("a { b c } d", "a ", "{", " b c ", "}", " d");
+    }
+
+    @Test
+    public void canConsume() {
+    }
+
+    protected void assertParsed(String input) {
+        assertParsed(input, input);
+    }
+
+    protected void assertParsed(String input, String... expectedTokens) {
+        Parser parser = new Parser(input);
+        if (!parser.hasNext()) {
+            assertEquals(1, expectedTokens.length);
+            assertTrue(expectedTokens[0].isEmpty());
+            return;
+        }
+
+        for (String expectedToken : expectedTokens) {
+            assertTrue(parser.hasNext());
+            int position = parser.mark();
+            assertEquals(expectedToken, parser.next());
+            assertEquals(position + expectedToken.length(), parser.position());
+            assertEquals(expectedToken, parser.previous());
+            parser.rewindTo(position);
+            assertEquals(position, parser.position());
+            assertEquals(expectedToken, parser.next());
+            int newPosition = parser.mark();
+            assertEquals(position + expectedToken.length(), newPosition);
+            assertEquals(expectedToken, parser.previous());
+        }
+        assertFalse(parser.hasNext());
+
+        // Rewind and try consuming expected tokens ...
+        parser.rewindTo(0);
+        assertConsumable(parser, expectedTokens);
+
+        // Parse again and try consuming expected tokens ...
+        parser = new Parser(input);
+        assertConsumable(parser, expectedTokens);
+    }
+
+    protected void assertConsumable(Parser parser, String ... expectedTokens) {
+        for (String expectedToken : expectedTokens) {
+            if (!expectedToken.trim().isEmpty()) {
+                int position = parser.mark();
+                assertTrue(parser.canConsume(expectedToken.trim()));
+                parser.rewindTo(position);
+                assertTrue(parser.canConsume(expectedToken.trim(), true));
+                parser.rewindTo(position);
+                assertTrue(parser.canConsume(expectedToken, false));
+            }
+        }
+    }
+
+    protected SchemaAndValue roundTrip(Schema desiredSchema, String currentValue) {
+        return roundTrip(desiredSchema, new SchemaAndValue(Schema.STRING_SCHEMA, currentValue));
+    }
+
+
+    protected SchemaAndValue roundTrip(Schema desiredSchema, SchemaAndValue input) {
+        String serialized = Values.convertToString(input.schema(), input.value());
+        if (input.value() != null) {
+            assertNotNull(serialized);
+        }
+        if (desiredSchema == null) {
+            desiredSchema = Values.inferSchema(input);
+            assertNotNull(desiredSchema);
+        }
+        Object newValue = null;
+        Schema newSchema = null;
+        switch (desiredSchema.type()) {
+            case STRING:
+                newValue = Values.convertToString(Schema.STRING_SCHEMA, serialized);
+                break;
+            case INT8:
+                newValue = Values.convertToByte(Schema.STRING_SCHEMA, serialized);
+                break;
+            case INT16:
+                newValue = Values.convertToShort(Schema.STRING_SCHEMA, serialized);
+                break;
+            case INT32:
+                newValue = Values.convertToInteger(Schema.STRING_SCHEMA, serialized);
+                break;
+            case INT64:
+                newValue = Values.convertToLong(Schema.STRING_SCHEMA, serialized);
+                break;
+            case FLOAT32:
+                newValue = Values.convertToFloat(Schema.STRING_SCHEMA, serialized);
+                break;
+            case FLOAT64:
+                newValue = Values.convertToDouble(Schema.STRING_SCHEMA, serialized);
+                break;
+            case BOOLEAN:
+                newValue = Values.convertToBoolean(Schema.STRING_SCHEMA, serialized);
+                break;
+            case ARRAY:
+                newValue = Values.convertToList(Schema.STRING_SCHEMA, serialized);
+                break;
+            case MAP:
+                newValue = Values.convertToMap(Schema.STRING_SCHEMA, serialized);
+                break;
+            case STRUCT:
+                newValue = Values.convertToStruct(Schema.STRING_SCHEMA, serialized);
+                break;
+            case BYTES:
+                fail("unexpected schema type");
+                break;
+        }
+        newSchema = Values.inferSchema(newValue);
+        return new SchemaAndValue(newSchema, newValue);
+    }
+
+    protected void assertRoundTrip(Schema schema, String value) {
+        assertRoundTrip(schema, Schema.STRING_SCHEMA, value);
+    }
+
+    protected void assertRoundTrip(Schema schema, Schema currentSchema, Object value) {
+        SchemaAndValue result = roundTrip(schema, new SchemaAndValue(currentSchema, value));
+
+        if (value == null) {
+            assertNull(result.schema());
+            assertNull(result.value());
+        } else {
+            assertEquals(value, result.value());
+            assertEquals(schema, result.schema());
+
+            SchemaAndValue result2 = roundTrip(result.schema(), result);
+            assertEquals(schema, result2.schema());
+            assertEquals(value, result2.value());
+            assertEquals(result, result2);
+        }
+    }
+
+}
\ No newline at end of file
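
For readers unfamiliar with the new Values API exercised above, a brief illustration (not part of the patch) of schema inference when parsing strings; the integer case is expected to come back with the narrowest schema that fits:

    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.data.Values;

    public class ValuesParseExample {
        public static void main(String[] args) {
            SchemaAndValue bool = Values.parseString("true");        // boolean
            SchemaAndValue list = Values.parseString("[1, 2, 3]");   // array
            SchemaAndValue num  = Values.parseString("1234567890");  // integer

            System.out.println(bool.schema().type());
            System.out.println(list.schema().type());
            System.out.println(num.schema().type());
        }
    }
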
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java
new file mode 100644
index 0000000..e30109f
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeaderTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.header;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+
+public class ConnectHeaderTest {
+
+    private String key;
+    private ConnectHeader header;
+
+    @Before
+    public void beforeEach() {
+        key = "key";
+        withString("value");
+    }
+
+    protected Header withValue(Schema schema, Object value) {
+        header = new ConnectHeader(key, new SchemaAndValue(schema, value));
+        return header;
+    }
+
+    protected Header withString(String value) {
+        return withValue(Schema.STRING_SCHEMA, value);
+    }
+
+    @Test
+    public void shouldAllowNullValues() {
+        withValue(Schema.OPTIONAL_STRING_SCHEMA, null);
+    }
+
+    @Test
+    public void shouldAllowNullSchema() {
+        withValue(null, null);
+        assertNull(header.schema());
+        assertNull(header.value());
+
+        String value = "non-null value";
+        withValue(null, value);
+        assertNull(header.schema());
+        assertSame(value, header.value());
+    }
+
+    @Test
+    public void shouldAllowNonNullValue() {
+        String value = "non-null value";
+        withValue(Schema.STRING_SCHEMA, value);
+        assertSame(Schema.STRING_SCHEMA, header.schema());
+        assertEquals(value, header.value());
+
+        withValue(Schema.BOOLEAN_SCHEMA, true);
+        assertSame(Schema.BOOLEAN_SCHEMA, header.schema());
+        assertEquals(true, header.value());
+    }
+
+    @Test
+    public void shouldGetSchemaFromStruct() {
+        Schema schema = SchemaBuilder.struct()
+                                     .field("foo", Schema.STRING_SCHEMA)
+                                     .field("bar", Schema.INT32_SCHEMA)
+                                     .build();
+        Struct value = new Struct(schema);
+        value.put("foo", "value");
+        value.put("bar", 100);
+        withValue(null, value);
+        assertSame(schema, header.schema());
+        assertSame(value, header.value());
+    }
+
+    @Test
+    public void shouldSatisfyEquals() {
+        String value = "non-null value";
+        Header h1 = withValue(Schema.STRING_SCHEMA, value);
+        assertSame(Schema.STRING_SCHEMA, header.schema());
+        assertEquals(value, header.value());
+
+        Header h2 = withValue(Schema.STRING_SCHEMA, value);
+        assertEquals(h1, h2);
+        assertEquals(h1.hashCode(), h2.hashCode());
+
+        Header h3 = withValue(Schema.INT8_SCHEMA, 100);
+        assertNotEquals(h3, h2);
+    }
+}
\ No newline at end of file
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java
new file mode 100644
index 0000000..343bc5d
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/header/ConnectHeadersTest.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.header;
+
+import org.apache.kafka.connect.data.Date;
+import org.apache.kafka.connect.data.Decimal;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Schema.Type;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.data.Time;
+import org.apache.kafka.connect.data.Timestamp;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.header.Headers.HeaderTransform;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.math.RoundingMode;
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Collections;
+import java.util.GregorianCalendar;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConnectHeadersTest {
+
+    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_DAYS;
+    private static final GregorianCalendar EPOCH_PLUS_TEN_THOUSAND_MILLIS;
+
+    static {
+        EPOCH_PLUS_TEN_THOUSAND_DAYS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_TEN_THOUSAND_DAYS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        EPOCH_PLUS_TEN_THOUSAND_DAYS.add(Calendar.DATE, 10000);
+
+        EPOCH_PLUS_TEN_THOUSAND_MILLIS = new GregorianCalendar(1970, Calendar.JANUARY, 1, 0, 0, 0);
+        EPOCH_PLUS_TEN_THOUSAND_MILLIS.setTimeZone(TimeZone.getTimeZone("UTC"));
+        EPOCH_PLUS_TEN_THOUSAND_MILLIS.add(Calendar.MILLISECOND, 10000);
+    }
+
+    private ConnectHeaders headers;
+    private Iterator<Header> iter;
+    private String key;
+    private String other;
+
+    @Before
+    public void beforeEach() {
+        headers = new ConnectHeaders();
+        key = "k1";
+        other = "other key";
+    }
+
+    @Test(expected = NullPointerException.class)
+    public void shouldNotAllowNullKey() {
+        headers.add(null, "value", Schema.STRING_SCHEMA);
+    }
+
+    protected void populate(Headers headers) {
+        headers.addBoolean(key, true);
+        headers.addInt(key, 0);
+        headers.addString(other, "other value");
+        headers.addString(key, null);
+        headers.addString(key, "third");
+    }
+
+    @Test
+    public void shouldBeEquals() {
+        Headers other = new ConnectHeaders();
+        assertEquals(headers, other);
+        assertEquals(headers.hashCode(), other.hashCode());
+
+        populate(headers);
+        assertNotEquals(headers, other);
+        assertNotEquals(headers.hashCode(), other.hashCode());
+
+        populate(other);
+        assertEquals(headers, other);
+        assertEquals(headers.hashCode(), other.hashCode());
+
+        headers.addString("wow", "some value");
+        assertNotEquals(headers, other);
+    }
+
+    @Test
+    public void shouldHaveToString() {
+        // empty
+        assertNotNull(headers.toString());
+
+        // not empty
+        populate(headers);
+        assertNotNull(headers.toString());
+    }
+
+    @Test
+    public void shouldAddMultipleHeadersWithSameKeyAndRetainLatest() {
+        populate(headers);
+
+        Header header = headers.lastWithName(key);
+        assertHeader(header, key, Schema.STRING_SCHEMA, "third");
+
+        iter = headers.allWithName(key);
+        assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
+        assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
+        assertNoNextHeader(iter);
+
+        iter = headers.allWithName(other);
+        assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
+
+        headers.retainLatest(other);
+        assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
+
+        headers.retainLatest(key);
+        assertOnlySingleHeader(key, Schema.STRING_SCHEMA, "third");
+
+        headers.retainLatest();
+        assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
+        assertOnlySingleHeader(key, Schema.STRING_SCHEMA, "third");
+    }
+
+    @Test
+    public void shouldAddHeadersWithPrimitiveValues() {
+        String key = "k1";
+        headers.addBoolean(key, true);
+        headers.addByte(key, (byte) 0);
+        headers.addShort(key, (short) 0);
+        headers.addInt(key, 0);
+        headers.addLong(key, 0);
+        headers.addFloat(key, 1.0f);
+        headers.addDouble(key, 1.0d);
+        headers.addString(key, null);
+        headers.addString(key, "third");
+    }
+
+    @Test
+    public void shouldAddHeadersWithNullObjectValuesWithOptionalSchema() {
+        addHeader("k1", Schema.BOOLEAN_SCHEMA, true);
+        addHeader("k2", Schema.STRING_SCHEMA, "hello");
+        addHeader("k3", Schema.OPTIONAL_STRING_SCHEMA, null);
+    }
+
+    @Test
+    public void shouldNotAddHeadersWithNullObjectValuesWithNonOptionalSchema() {
+        attemptAndFailToAddHeader("k1", Schema.BOOLEAN_SCHEMA, null);
+        attemptAndFailToAddHeader("k2", Schema.STRING_SCHEMA, null);
+    }
+
+    @Test
+    public void shouldNotAddHeadersWithObjectValuesAndMismatchedSchema() {
+        attemptAndFailToAddHeader("k1", Schema.BOOLEAN_SCHEMA, "wrong");
+        attemptAndFailToAddHeader("k2", Schema.OPTIONAL_STRING_SCHEMA, 0L);
+    }
+
+    @Test
+    public void shouldRemoveAllHeadersWithSameKey() {
+        populate(headers);
+
+        iter = headers.allWithName(key);
+        assertContainsHeader(key, Schema.BOOLEAN_SCHEMA, true);
+        assertContainsHeader(key, Schema.INT32_SCHEMA, 0);
+        assertContainsHeader(key, Schema.STRING_SCHEMA, "third");
+        assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
+
+        headers.remove(key);
+        assertNoHeaderWithKey(key);
+        assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
+    }
+
+    @Test
+    public void shouldRemoveAllHeaders() {
+        populate(headers);
+
+        iter = headers.allWithName(key);
+        assertContainsHeader(key, Schema.BOOLEAN_SCHEMA, true);
+        assertContainsHeader(key, Schema.INT32_SCHEMA, 0);
+        assertContainsHeader(key, Schema.STRING_SCHEMA, "third");
+        assertOnlySingleHeader(other, Schema.STRING_SCHEMA, "other value");
+
+        headers.clear();
+        assertNoHeaderWithKey(key);
+        assertNoHeaderWithKey(other);
+        assertEquals(0, headers.size());
+        assertTrue(headers.isEmpty());
+    }
+
+    @Test
+    public void shouldTransformHeaders() {
+        populate(headers);
+
+        iter = headers.allWithName(key);
+        assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
+        assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
+        assertNoNextHeader(iter);
+
+        iter = headers.allWithName(other);
+        assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
+
+        // Transform the headers
+        assertEquals(5, headers.size());
+        headers.apply(appendToKey("-suffix"));
+        assertEquals(5, headers.size());
+
+        assertNoHeaderWithKey(key);
+        assertNoHeaderWithKey(other);
+
+        String altKey = key + "-suffix";
+        iter = headers.allWithName(altKey);
+        assertNextHeader(iter, altKey, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, altKey, Schema.INT32_SCHEMA, 0);
+        assertNextHeader(iter, altKey, Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertNextHeader(iter, altKey, Schema.STRING_SCHEMA, "third");
+        assertNoNextHeader(iter);
+
+        iter = headers.allWithName(other + "-suffix");
+        assertOnlyNextHeader(iter, other + "-suffix", Schema.STRING_SCHEMA, "other value");
+    }
+
+    @Test
+    public void shouldTransformHeadersWithKey() {
+        populate(headers);
+
+        iter = headers.allWithName(key);
+        assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
+        assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
+        assertNoNextHeader(iter);
+
+        iter = headers.allWithName(other);
+        assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
+
+        // Transform the headers
+        assertEquals(5, headers.size());
+        headers.apply(key, appendToKey("-suffix"));
+        assertEquals(5, headers.size());
+
+        assertNoHeaderWithKey(key);
+
+        String altKey = key + "-suffix";
+        iter = headers.allWithName(altKey);
+        assertNextHeader(iter, altKey, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, altKey, Schema.INT32_SCHEMA, 0);
+        assertNextHeader(iter, altKey, Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertNextHeader(iter, altKey, Schema.STRING_SCHEMA, "third");
+        assertNoNextHeader(iter);
+
+        iter = headers.allWithName(other);
+        assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
+    }
+
+    @Test
+    public void shouldTransformAndRemoveHeaders() {
+        populate(headers);
+
+        iter = headers.allWithName(key);
+        assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
+        assertNextHeader(iter, key, Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertNextHeader(iter, key, Schema.STRING_SCHEMA, "third");
+        assertNoNextHeader(iter);
+
+        iter = headers.allWithName(other);
+        assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
+
+        // Transform the headers
+        assertEquals(5, headers.size());
+        headers.apply(key, removeHeadersOfType(Type.STRING));
+        assertEquals(3, headers.size());
+
+        iter = headers.allWithName(key);
+        assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
+        assertNoNextHeader(iter);
+
+        assertHeader(headers.lastWithName(key), key, Schema.INT32_SCHEMA, 0);
+
+        iter = headers.allWithName(other);
+        assertOnlyNextHeader(iter, other, Schema.STRING_SCHEMA, "other value");
+
+        // Transform the headers
+        assertEquals(3, headers.size());
+        headers.apply(removeHeadersOfType(Type.STRING));
+        assertEquals(2, headers.size());
+
+        assertNoHeaderWithKey(other);
+
+        iter = headers.allWithName(key);
+        assertNextHeader(iter, key, Schema.BOOLEAN_SCHEMA, true);
+        assertNextHeader(iter, key, Schema.INT32_SCHEMA, 0);
+        assertNoNextHeader(iter);
+    }
+
+    protected HeaderTransform appendToKey(final String suffix) {
+        return new HeaderTransform() {
+            @Override
+            public Header apply(Header header) {
+                return header.rename(header.key() + suffix);
+            }
+        };
+    }
+
+    protected HeaderTransform removeHeadersOfType(final Type type) {
+        return new HeaderTransform() {
+            @Override
+            public Header apply(Header header) {
+                Schema schema = header.schema();
+                if (schema != null && schema.type() == type) {
+                    return null;
+                }
+                return header;
+            }
+        };
+    }
+
+    @Test
+    public void shouldValidateBuiltInTypes() {
+        assertSchemaMatches(Schema.OPTIONAL_BOOLEAN_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_BYTES_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_INT8_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_INT16_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_INT32_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_INT64_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_FLOAT32_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_FLOAT64_SCHEMA, null);
+        assertSchemaMatches(Schema.OPTIONAL_STRING_SCHEMA, null);
+        assertSchemaMatches(Schema.BOOLEAN_SCHEMA, true);
+        assertSchemaMatches(Schema.BYTES_SCHEMA, new byte[]{});
+        assertSchemaMatches(Schema.INT8_SCHEMA, (byte) 0);
+        assertSchemaMatches(Schema.INT16_SCHEMA, (short) 0);
+        assertSchemaMatches(Schema.INT32_SCHEMA, 0);
+        assertSchemaMatches(Schema.INT64_SCHEMA, 0L);
+        assertSchemaMatches(Schema.FLOAT32_SCHEMA, 1.0f);
+        assertSchemaMatches(Schema.FLOAT64_SCHEMA, 1.0d);
+        assertSchemaMatches(Schema.STRING_SCHEMA, "value");
+        assertSchemaMatches(SchemaBuilder.array(Schema.STRING_SCHEMA), new ArrayList<String>());
+        assertSchemaMatches(SchemaBuilder.array(Schema.STRING_SCHEMA), Collections.singletonList("value"));
+        assertSchemaMatches(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), new HashMap<String, Integer>());
+        assertSchemaMatches(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), Collections.singletonMap("a", 0));
+        Schema emptyStructSchema = SchemaBuilder.struct();
+        assertSchemaMatches(emptyStructSchema, new Struct(emptyStructSchema));
+        Schema structSchema = SchemaBuilder.struct().field("foo", Schema.OPTIONAL_BOOLEAN_SCHEMA).field("bar", Schema.STRING_SCHEMA)
+                                           .schema();
+        assertSchemaMatches(structSchema, new Struct(structSchema).put("foo", true).put("bar", "v"));
+    }
+
+    @Test
+    public void shouldValidateLogicalTypes() {
+        assertSchemaMatches(Decimal.schema(3), new BigDecimal(100.00));
+        assertSchemaMatches(Time.SCHEMA, new java.util.Date());
+        assertSchemaMatches(Date.SCHEMA, new java.util.Date());
+        assertSchemaMatches(Timestamp.SCHEMA, new java.util.Date());
+    }
+
+    @Test
+    public void shouldNotValidateNullValuesWithBuiltInTypes() {
+        assertSchemaDoesNotMatch(Schema.BOOLEAN_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.BYTES_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.INT8_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.INT16_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.INT32_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.INT64_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.FLOAT32_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.FLOAT64_SCHEMA, null);
+        assertSchemaDoesNotMatch(Schema.STRING_SCHEMA, null);
+        assertSchemaDoesNotMatch(SchemaBuilder.array(Schema.STRING_SCHEMA), null);
+        assertSchemaDoesNotMatch(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), null);
+        assertSchemaDoesNotMatch(SchemaBuilder.struct(), null);
+    }
+
+    @Test
+    public void shouldNotValidateMismatchedValuesWithBuiltInTypes() {
+        assertSchemaDoesNotMatch(Schema.BOOLEAN_SCHEMA, 0L);
+        assertSchemaDoesNotMatch(Schema.BYTES_SCHEMA, "oops");
+        assertSchemaDoesNotMatch(Schema.INT8_SCHEMA, 1.0f);
+        assertSchemaDoesNotMatch(Schema.INT16_SCHEMA, 1.0f);
+        assertSchemaDoesNotMatch(Schema.INT32_SCHEMA, 0L);
+        assertSchemaDoesNotMatch(Schema.INT64_SCHEMA, 1.0f);
+        assertSchemaDoesNotMatch(Schema.FLOAT32_SCHEMA, 1L);
+        assertSchemaDoesNotMatch(Schema.FLOAT64_SCHEMA, 1L);
+        assertSchemaDoesNotMatch(Schema.STRING_SCHEMA, true);
+        assertSchemaDoesNotMatch(SchemaBuilder.array(Schema.STRING_SCHEMA), "value");
+        assertSchemaDoesNotMatch(SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA), "value");
+        assertSchemaDoesNotMatch(SchemaBuilder.struct(), new ArrayList<String>());
+    }
+
+    @Test
+    public void shouldAddDate() {
+        java.util.Date dateObj = EPOCH_PLUS_TEN_THOUSAND_DAYS.getTime();
+        int days = Date.fromLogical(Date.SCHEMA, dateObj);
+        headers.addDate(key, dateObj);
+        Header header = headers.lastWithName(key);
+        assertEquals(days, (int) Values.convertToInteger(header.schema(), header.value()));
+        assertSame(dateObj, Values.convertToDate(header.schema(), header.value()));
+
+        headers.addInt(other, days);
+        header = headers.lastWithName(other);
+        assertEquals(days, (int) Values.convertToInteger(header.schema(), header.value()));
+        assertEquals(dateObj, Values.convertToDate(header.schema(), header.value()));
+    }
+
+    @Test
+    public void shouldAddTime() {
+        java.util.Date dateObj = EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime();
+        long millis = Time.fromLogical(Time.SCHEMA, dateObj);
+        headers.addTime(key, dateObj);
+        Header header = headers.lastWithName(key);
+        assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
+        assertSame(dateObj, Values.convertToTime(header.schema(), header.value()));
+
+        headers.addLong(other, millis);
+        header = headers.lastWithName(other);
+        assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
+        assertEquals(dateObj, Values.convertToTime(header.schema(), header.value()));
+    }
+
+    @Test
+    public void shouldAddTimestamp() {
+        java.util.Date dateObj = EPOCH_PLUS_TEN_THOUSAND_MILLIS.getTime();
+        long millis = Timestamp.fromLogical(Timestamp.SCHEMA, dateObj);
+        headers.addTimestamp(key, dateObj);
+        Header header = headers.lastWithName(key);
+        assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
+        assertSame(dateObj, Values.convertToTimestamp(header.schema(), header.value()));
+
+        headers.addLong(other, millis);
+        header = headers.lastWithName(other);
+        assertEquals(millis, (long) Values.convertToLong(header.schema(), header.value()));
+        assertEquals(dateObj, Values.convertToTimestamp(header.schema(), header.value()));
+    }
+
+    @Test
+    public void shouldAddDecimal() {
+        BigDecimal value = new BigDecimal("3.038573478e+3");
+        headers.addDecimal(key, value);
+        Header header = headers.lastWithName(key);
+        assertEquals(value.doubleValue(), Values.convertToDouble(header.schema(), header.value()), 0.00001d);
+        assertEquals(value, Values.convertToDecimal(header.schema(), header.value(), value.scale()));
+
+        value = value.setScale(3, RoundingMode.DOWN);
+        BigDecimal decimal = Values.convertToDecimal(header.schema(), header.value(), value.scale());
+        assertEquals(value, decimal.setScale(value.scale(), RoundingMode.DOWN));
+    }
+
+    @Test
+    public void shouldDuplicateAndAlwaysReturnEquivalentButDifferentObject() {
+        assertEquals(headers, headers.duplicate());
+        assertNotSame(headers, headers.duplicate());
+    }
+
+    protected void assertSchemaMatches(Schema schema, Object value) {
+        headers.checkSchemaMatches(new SchemaAndValue(schema.schema(), value));
+    }
+
+    protected void assertSchemaDoesNotMatch(Schema schema, Object value) {
+        try {
+            assertSchemaMatches(schema, value);
+            fail("Should have failed to validate value '" + value + "' and schema: " + schema);
+        } catch (DataException e) {
+            // expected
+        }
+    }
+
+    protected void attemptAndFailToAddHeader(String key, Schema schema, Object value) {
+        try {
+            headers.add(key, value, schema);
+            fail("Should have failed to add header with key '" + key + "', value '" + value + "', and schema: " + schema);
+        } catch (DataException e) {
+            // expected
+        }
+    }
+
+    protected void addHeader(String key, Schema schema, Object value) {
+        headers.add(key, value, schema);
+        Header header = headers.lastWithName(key);
+        assertNotNull(header);
+        assertHeader(header, key, schema, value);
+    }
+
+    protected void assertNoHeaderWithKey(String key) {
+        assertNoNextHeader(headers.allWithName(key));
+    }
+
+    protected void assertContainsHeader(String key, Schema schema, Object value) {
+        Header expected = new ConnectHeader(key, new SchemaAndValue(schema, value));
+        Iterator<Header> iter = headers.allWithName(key);
+        while (iter.hasNext()) {
+            Header header = iter.next();
+            if (header.equals(expected))
+                return;
+        }
+        fail("Should have found header " + expected);
+    }
+
+    protected void assertOnlySingleHeader(String key, Schema schema, Object value) {
+        assertOnlyNextHeader(headers.allWithName(key), key, schema, value);
+    }
+
+    protected void assertOnlyNextHeader(Iterator<Header> iter, String key, Schema schema, Object value) {
+        assertNextHeader(iter, key, schema, value);
+        assertNoNextHeader(iter);
+    }
+
+    protected void assertNextHeader(Iterator<Header> iter, String key, Schema schema, Object value) {
+        Header header = iter.next();
+        assertHeader(header, key, schema, value);
+    }
+
+    protected void assertNoNextHeader(Iterator<Header> iter) {
+        assertFalse(iter.hasNext());
+    }
+
+    protected void assertHeader(Header header, String key, Schema schema, Object value) {
+        assertNotNull(header);
+        assertSame(schema, header.schema());
+        assertSame(value, header.value());
+    }
+}
\ No newline at end of file
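
Putting the Headers API from these tests together, a sketch (illustrative only, not part of the patch) of how a connector or transform might de-duplicate and rename headers; the header names are made up:

    import org.apache.kafka.connect.header.ConnectHeaders;
    import org.apache.kafka.connect.header.Header;
    import org.apache.kafka.connect.header.Headers;

    public class HeaderTransformExample {
        public static void main(String[] args) {
            Headers headers = new ConnectHeaders()
                    .addString("source", "orders-db")
                    .addInt("attempt", 1)
                    .addInt("attempt", 2);

            // Keep only the most recent "attempt" header
            headers.retainLatest("attempt");

            // Rename every header using the same HeaderTransform hook the tests exercise
            headers.apply(new Headers.HeaderTransform() {
                @Override
                public Header apply(Header header) {
                    return header.rename("app." + header.key());
                }
            });

            Header attempt = headers.lastWithName("app.attempt");
            System.out.println(attempt.value()); // 2
        }
    }
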
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
new file mode 100644
index 0000000..fbfbb32
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/sink/SinkRecordTest.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.sink;
+
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.junit.Before;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class SinkRecordTest {
+
+    private static final String TOPIC_NAME = "myTopic";
+    private static final Integer PARTITION_NUMBER = 0;
+    private static final long KAFKA_OFFSET = 0L;
+    private static final Long KAFKA_TIMESTAMP = 0L;
+    private static final TimestampType TS_TYPE = TimestampType.CREATE_TIME;
+
+    private SinkRecord record;
+
+    @Before
+    public void beforeEach() {
+        record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, KAFKA_OFFSET,
+                                KAFKA_TIMESTAMP, TS_TYPE, null);
+    }
+
+    @Test
+    public void shouldCreateSinkRecordWithHeaders() {
+        Headers headers = new ConnectHeaders().addString("h1", "hv1").addBoolean("h2", true);
+        record = new SinkRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, KAFKA_OFFSET,
+                                KAFKA_TIMESTAMP, TS_TYPE, headers);
+        assertNotNull(record.headers());
+        assertSame(headers, record.headers());
+        assertFalse(record.headers().isEmpty());
+    }
+
+    @Test
+    public void shouldCreateSinkRecordWithEmptyHeaders() {
+        assertEquals(TOPIC_NAME, record.topic());
+        assertEquals(PARTITION_NUMBER, record.kafkaPartition());
+        assertEquals(Schema.STRING_SCHEMA, record.keySchema());
+        assertEquals("key", record.key());
+        assertEquals(Schema.BOOLEAN_SCHEMA, record.valueSchema());
+        assertEquals(false, record.value());
+        assertEquals(KAFKA_OFFSET, record.kafkaOffset());
+        assertEquals(KAFKA_TIMESTAMP, record.timestamp());
+        assertEquals(TS_TYPE, record.timestampType());
+        assertNotNull(record.headers());
+        assertTrue(record.headers().isEmpty());
+    }
+
+    @Test
+    public void shouldDuplicateRecordAndCloneHeaders() {
+        SinkRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
+                                                KAFKA_TIMESTAMP);
+
+        assertEquals(TOPIC_NAME, duplicate.topic());
+        assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
+        assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
+        assertEquals("key", duplicate.key());
+        assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
+        assertEquals(false, duplicate.value());
+        assertEquals(KAFKA_OFFSET, duplicate.kafkaOffset());
+        assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
+        assertEquals(TS_TYPE, duplicate.timestampType());
+        assertNotNull(duplicate.headers());
+        assertTrue(duplicate.headers().isEmpty());
+        assertNotSame(record.headers(), duplicate.headers());
+        assertEquals(record.headers(), duplicate.headers());
+    }
+
+
+    @Test
+    public void shouldDuplicateRecordUsingNewHeaders() {
+        Headers newHeaders = new ConnectHeaders().addString("h3", "hv3");
+        SinkRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
+                                                KAFKA_TIMESTAMP, newHeaders);
+
+        assertEquals(TOPIC_NAME, duplicate.topic());
+        assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
+        assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
+        assertEquals("key", duplicate.key());
+        assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
+        assertEquals(false, duplicate.value());
+        assertEquals(KAFKA_OFFSET, duplicate.kafkaOffset());
+        assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
+        assertEquals(TS_TYPE, duplicate.timestampType());
+        assertNotNull(duplicate.headers());
+        assertEquals(newHeaders, duplicate.headers());
+        assertSame(newHeaders, duplicate.headers());
+        assertNotSame(record.headers(), duplicate.headers());
+        assertNotEquals(record.headers(), duplicate.headers());
+    }
+
+    @Test
+    public void shouldModifyRecordHeader() {
+        assertTrue(record.headers().isEmpty());
+        record.headers().addInt("intHeader", 100);
+        assertEquals(1, record.headers().size());
+        Header header = record.headers().lastWithName("intHeader");
+        assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
+    }
+}
\ No newline at end of file
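Illustration only, not part of the patch: given the accessors the tests above exercise, a sink task can read a header off each delivered record roughly as follows. The header name "retry-count" is made up; Values.convertToInteger is the same helper the last test uses.

    import org.apache.kafka.connect.data.Values;
    import org.apache.kafka.connect.header.Header;
    import org.apache.kafka.connect.sink.SinkRecord;

    public class HeaderReadingSketch {
        // Returns the hypothetical "retry-count" header as an Integer, or null if the header is absent.
        static Integer retryCount(SinkRecord record) {
            Header header = record.headers().lastWithName("retry-count");
            return header == null ? null : Values.convertToInteger(header.schema(), header.value());
        }
    }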
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java
new file mode 100644
index 0000000..275109a
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/source/SourceRecordTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.source;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.Values;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.Collections;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+
+public class SourceRecordTest {
+
+    private static final Map<String, ?> SOURCE_PARTITION = Collections.singletonMap("src", "abc");
+    private static final Map<String, ?> SOURCE_OFFSET = Collections.singletonMap("offset", "1");
+    private static final String TOPIC_NAME = "myTopic";
+    private static final Integer PARTITION_NUMBER = 0;
+    private static final Long KAFKA_TIMESTAMP = 0L;
+
+    private SourceRecord record;
+
+    @Before
+    public void beforeEach() {
+        record = new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key",
+                                  Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP, null);
+    }
+
+    @Test
+    public void shouldCreateSourceRecordWithHeaders() {
+        Headers headers = new ConnectHeaders().addString("h1", "hv1").addBoolean("h2", true);
+        record = new SourceRecord(SOURCE_PARTITION, SOURCE_OFFSET, TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key",
+                                  Schema.BOOLEAN_SCHEMA, false, KAFKA_TIMESTAMP, headers);
+        assertNotNull(record.headers());
+        assertSame(headers, record.headers());
+        assertFalse(record.headers().isEmpty());
+    }
+
+    @Test
+    public void shouldCreateSourceRecordWithEmptyHeaders() {
+        assertEquals(SOURCE_PARTITION, record.sourcePartition());
+        assertEquals(SOURCE_OFFSET, record.sourceOffset());
+        assertEquals(TOPIC_NAME, record.topic());
+        assertEquals(PARTITION_NUMBER, record.kafkaPartition());
+        assertEquals(Schema.STRING_SCHEMA, record.keySchema());
+        assertEquals("key", record.key());
+        assertEquals(Schema.BOOLEAN_SCHEMA, record.valueSchema());
+        assertEquals(false, record.value());
+        assertEquals(KAFKA_TIMESTAMP, record.timestamp());
+        assertNotNull(record.headers());
+        assertTrue(record.headers().isEmpty());
+    }
+
+    @Test
+    public void shouldDuplicateRecordAndCloneHeaders() {
+        SourceRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
+                                                  KAFKA_TIMESTAMP);
+
+        assertEquals(SOURCE_PARTITION, duplicate.sourcePartition());
+        assertEquals(SOURCE_OFFSET, duplicate.sourceOffset());
+        assertEquals(TOPIC_NAME, duplicate.topic());
+        assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
+        assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
+        assertEquals("key", duplicate.key());
+        assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
+        assertEquals(false, duplicate.value());
+        assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
+        assertNotNull(duplicate.headers());
+        assertTrue(duplicate.headers().isEmpty());
+        assertNotSame(record.headers(), duplicate.headers());
+        assertEquals(record.headers(), duplicate.headers());
+    }
+
+    @Test
+    public void shouldDuplicateRecordUsingNewHeaders() {
+        Headers newHeaders = new ConnectHeaders().addString("h3", "hv3");
+        SourceRecord duplicate = record.newRecord(TOPIC_NAME, PARTITION_NUMBER, Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false,
+                                                  KAFKA_TIMESTAMP, newHeaders);
+
+        assertEquals(SOURCE_PARTITION, duplicate.sourcePartition());
+        assertEquals(SOURCE_OFFSET, duplicate.sourceOffset());
+        assertEquals(TOPIC_NAME, duplicate.topic());
+        assertEquals(PARTITION_NUMBER, duplicate.kafkaPartition());
+        assertEquals(Schema.STRING_SCHEMA, duplicate.keySchema());
+        assertEquals("key", duplicate.key());
+        assertEquals(Schema.BOOLEAN_SCHEMA, duplicate.valueSchema());
+        assertEquals(false, duplicate.value());
+        assertEquals(KAFKA_TIMESTAMP, duplicate.timestamp());
+        assertNotNull(duplicate.headers());
+        assertEquals(newHeaders, duplicate.headers());
+        assertSame(newHeaders, duplicate.headers());
+        assertNotSame(record.headers(), duplicate.headers());
+        assertNotEquals(record.headers(), duplicate.headers());
+    }
+
+    @Test
+    public void shouldModifyRecordHeader() {
+        assertTrue(record.headers().isEmpty());
+        record.headers().addInt("intHeader", 100);
+        assertEquals(1, record.headers().size());
+        Header header = record.headers().lastWithName("intHeader");
+        assertEquals(100, (int) Values.convertToInteger(header.schema(), header.value()));
+    }
+}
\ No newline at end of file
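Illustration only, not part of the patch: the source-side counterpart, attaching headers via the ten-argument SourceRecord constructor and the fluent ConnectHeaders builder covered by the tests above. Topic, partition map keys, and header names are made up.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.header.ConnectHeaders;
    import org.apache.kafka.connect.header.Headers;
    import org.apache.kafka.connect.source.SourceRecord;

    import java.util.Collections;
    import java.util.Map;

    public class HeaderAttachingSketch {
        static SourceRecord recordWithHeaders() {
            Map<String, ?> sourcePartition = Collections.singletonMap("src", "example");
            Map<String, ?> sourceOffset = Collections.singletonMap("offset", "1");
            // Headers are added fluently; each add records the value together with its Connect schema.
            Headers headers = new ConnectHeaders()
                    .addString("origin", "example-connector")
                    .addBoolean("replayed", false);
            return new SourceRecord(sourcePartition, sourceOffset, "exampleTopic", 0,
                    Schema.STRING_SCHEMA, "key", Schema.BOOLEAN_SCHEMA, false, 0L, headers);
        }
    }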
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/ConverterTypeTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/ConverterTypeTest.java
new file mode 100644
index 0000000..cbaa8b2
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/ConverterTypeTest.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class ConverterTypeTest {
+
+    @Test
+    public void shouldFindByName() {
+        for (ConverterType type : ConverterType.values()) {
+            assertEquals(type, ConverterType.withName(type.getName()));
+        }
+    }
+}
\ No newline at end of file
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
new file mode 100644
index 0000000..fdfdd32
--- /dev/null
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/SimpleHeaderConverterTest.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+public class SimpleHeaderConverterTest {
+
+    private static final String TOPIC = "topic";
+    private static final String HEADER = "header";
+
+    private static final Map<String, String> STRING_MAP = new LinkedHashMap<>();
+    private static final Schema STRING_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).schema();
+
+    private static final Map<String, Short> STRING_SHORT_MAP = new LinkedHashMap<>();
+    private static final Schema STRING_SHORT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT16_SCHEMA).schema();
+
+    private static final Map<String, Integer> STRING_INT_MAP = new LinkedHashMap<>();
+    private static final Schema STRING_INT_MAP_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).schema();
+
+    private static final List<Integer> INT_LIST = new ArrayList<>();
+    private static final Schema INT_LIST_SCHEMA = SchemaBuilder.array(Schema.INT32_SCHEMA).schema();
+
+    private static final List<String> STRING_LIST = new ArrayList<>();
+    private static final Schema STRING_LIST_SCHEMA = SchemaBuilder.array(Schema.STRING_SCHEMA).schema();
+
+    static {
+        STRING_MAP.put("foo", "123");
+        STRING_MAP.put("bar", "baz");
+        STRING_SHORT_MAP.put("foo", (short) 12345);
+        STRING_SHORT_MAP.put("bar", (short) 0);
+        STRING_SHORT_MAP.put("baz", (short) -4321);
+        STRING_INT_MAP.put("foo", 1234567890);
+        STRING_INT_MAP.put("bar", 0);
+        STRING_INT_MAP.put("baz", -987654321);
+        STRING_LIST.add("foo");
+        STRING_LIST.add("bar");
+        INT_LIST.add(1234567890);
+        INT_LIST.add(-987654321);
+    }
+
+    private SimpleHeaderConverter converter;
+
+    @Before
+    public void beforeEach() {
+        converter = new SimpleHeaderConverter();
+    }
+
+    @Test
+    public void shouldConvertNullValue() {
+        assertRoundTrip(Schema.STRING_SCHEMA, null);
+        assertRoundTrip(Schema.OPTIONAL_STRING_SCHEMA, null);
+    }
+
+    @Test
+    public void shouldConvertSimpleString() {
+        assertRoundTrip(Schema.STRING_SCHEMA, "simple");
+    }
+
+    @Test
+    public void shouldConvertEmptyString() {
+        assertRoundTrip(Schema.STRING_SCHEMA, "");
+    }
+
+    @Test
+    public void shouldConvertStringWithQuotesAndOtherDelimiterCharacters() {
+        assertRoundTrip(Schema.STRING_SCHEMA, "three\"blind\\\"mice");
+        assertRoundTrip(Schema.STRING_SCHEMA, "string with delimiters: <>?,./\\=+-!@#$%^&*(){}[]|;':");
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeys() {
+        assertRoundTrip(STRING_MAP_SCHEMA, STRING_MAP);
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithStringValuesWithoutWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{\"foo\":\"123\",\"bar\":\"baz\"}");
+        assertEquals(STRING_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_MAP, result.value());
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithStringValuesWithWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{ \"foo\" : \"123\", \n\"bar\" : \"baz\" } ");
+        assertEquals(STRING_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_MAP, result.value());
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeysAndShortValues() {
+        assertRoundTrip(STRING_SHORT_MAP_SCHEMA, STRING_SHORT_MAP);
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithShortValuesWithoutWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{\"foo\":12345,\"bar\":0,\"baz\":-4321}");
+        assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_SHORT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithShortValuesWithWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, " { \"foo\" :  12345 , \"bar\" : 0,  \"baz\" : -4321 }  ");
+        assertEquals(STRING_SHORT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_SHORT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeysAndIntegerValues() {
+        assertRoundTrip(STRING_INT_MAP_SCHEMA, STRING_INT_MAP);
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithIntValuesWithoutWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, "{\"foo\":1234567890,\"bar\":0,\"baz\":-987654321}");
+        assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_INT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldParseStringOfMapWithIntValuesWithWhitespaceAsMap() {
+        SchemaAndValue result = roundTrip(Schema.STRING_SCHEMA, " { \"foo\" :  1234567890 , \"bar\" : 0,  \"baz\" : -987654321 }  ");
+        assertEquals(STRING_INT_MAP_SCHEMA, result.schema());
+        assertEquals(STRING_INT_MAP, result.value());
+    }
+
+    @Test
+    public void shouldConvertListWithStringValues() {
+        assertRoundTrip(STRING_LIST_SCHEMA, STRING_LIST);
+    }
+
+    @Test
+    public void shouldConvertListWithIntegerValues() {
+        assertRoundTrip(INT_LIST_SCHEMA, INT_LIST);
+    }
+
+    @Test
+    public void shouldConvertMapWithStringKeysAndMixedValuesToMapWithoutSchema() {
+        Map<String, Object> map = new LinkedHashMap<>();
+        map.put("foo", "bar");
+        map.put("baz", (short) 3456);
+        assertRoundTrip(null, map);
+    }
+
+    @Test
+    public void shouldConvertListWithMixedValuesToListWithoutSchema() {
+        List<Object> list = new ArrayList<>();
+        list.add("foo");
+        list.add((short) 13344);
+        assertRoundTrip(null, list);
+    }
+
+    @Test
+    public void shouldConvertEmptyMapToMapWithoutSchema() {
+        assertRoundTrip(null, new LinkedHashMap<>());
+    }
+
+    @Test
+    public void shouldConvertEmptyListToListWithoutSchema() {
+        assertRoundTrip(null, new ArrayList<>());
+    }
+
+    protected SchemaAndValue roundTrip(Schema schema, Object input) {
+        byte[] serialized = converter.fromConnectHeader(TOPIC, HEADER, schema, input);
+        return converter.toConnectHeader(TOPIC, HEADER, serialized);
+    }
+
+    protected void assertRoundTrip(Schema schema, Object value) {
+        byte[] serialized = converter.fromConnectHeader(TOPIC, HEADER, schema, value);
+        SchemaAndValue result = converter.toConnectHeader(TOPIC, HEADER, serialized);
+
+        if (value == null) {
+            assertNull(serialized);
+            assertNull(result.schema());
+            assertNull(result.value());
+        } else {
+            assertNotNull(serialized);
+            assertEquals(value, result.value());
+            assertEquals(schema, result.schema());
+
+            byte[] serialized2 = converter.fromConnectHeader(TOPIC, HEADER, result.schema(), result.value());
+            SchemaAndValue result2 = converter.toConnectHeader(TOPIC, HEADER, serialized2);
+            assertNotNull(serialized2);
+            assertEquals(schema, result2.schema());
+            assertEquals(value, result2.value());
+            assertEquals(result, result2);
+            assertArrayEquals(serialized, serialized2);
+        }
+    }
+
+}
\ No newline at end of file
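Illustration only, not part of the patch: the round trip these tests perform, condensed into a standalone sketch. SimpleHeaderConverter writes the value in its string form and infers the schema when parsing it back; topic and header names are arbitrary.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.storage.SimpleHeaderConverter;

    public class SimpleHeaderConverterSketch {
        public static void main(String[] args) {
            SimpleHeaderConverter converter = new SimpleHeaderConverter();
            // Serialize a header value to bytes, then parse it back; the schema is inferred on the way in.
            byte[] bytes = converter.fromConnectHeader("exampleTopic", "h1", Schema.STRING_SCHEMA, "simple");
            SchemaAndValue parsed = converter.toConnectHeader("exampleTopic", "h1", bytes);
            System.out.println(parsed.schema().type() + " -> " + parsed.value());
        }
    }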
diff --git a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
index e860fbb..49ffc74 100644
--- a/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
+++ b/connect/api/src/test/java/org/apache/kafka/connect/storage/StringConverterTest.java
@@ -79,4 +79,22 @@ public class StringConverterTest {
         assertEquals(Schema.OPTIONAL_STRING_SCHEMA, data.schema());
         assertEquals(SAMPLE_STRING, data.value());
     }
+
+    // Note: the header conversion methods delegate to the data conversion methods, which are tested above.
+    // The following simply verify that the delegation works.
+
+    @Test
+    public void testStringHeaderValueToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals(SAMPLE_STRING.getBytes("UTF8"), converter.fromConnectHeader(TOPIC, "hdr", Schema.STRING_SCHEMA, SAMPLE_STRING));
+    }
+
+    @Test
+    public void testNonStringHeaderValueToBytes() throws UnsupportedEncodingException {
+        assertArrayEquals("true".getBytes("UTF8"), converter.fromConnectHeader(TOPIC, "hdr", Schema.BOOLEAN_SCHEMA, true));
+    }
+
+    @Test
+    public void testNullHeaderValueToBytes() {
+        assertEquals(null, converter.fromConnectHeader(TOPIC, "hdr", Schema.OPTIONAL_STRING_SCHEMA, null));
+    }
 }
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
index 01b0abb..32ded44 100644
--- a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverter.java
@@ -23,6 +23,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
 import org.apache.kafka.common.cache.Cache;
 import org.apache.kafka.common.cache.LRUCache;
 import org.apache.kafka.common.cache.SynchronizedCache;
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.errors.SerializationException;
 import org.apache.kafka.connect.data.SchemaBuilder;
 import org.apache.kafka.connect.data.Schema;
@@ -36,6 +37,9 @@ import org.apache.kafka.connect.data.Decimal;
 import org.apache.kafka.connect.data.Date;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.apache.kafka.connect.storage.HeaderConverter;
+import org.apache.kafka.connect.storage.StringConverterConfig;
 
 import java.io.IOException;
 import java.math.BigDecimal;
@@ -48,13 +52,13 @@ import java.util.Iterator;
 import java.util.Map;
 
 /**
- * Implementation of Converter that uses JSON to store schemas and objects.
+ * Implementation of Converter that uses JSON to store schemas and objects. By default this converter will serialize Connect keys, values,
+ * and headers with schemas, although this can be disabled with the {@link JsonConverterConfig#SCHEMAS_ENABLE_CONFIG schemas.enable}
+ * configuration option.
+ *
+ * This implementation currently does nothing with the topic names or header names.
  */
-public class JsonConverter implements Converter {
-    private static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
-    private static final boolean SCHEMAS_ENABLE_DEFAULT = true;
-    private static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
-    private static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
+public class JsonConverter implements Converter, HeaderConverter {
 
     private static final Map<Schema.Type, JsonToConnectTypeConverter> TO_CONNECT_CONVERTERS = new EnumMap<>(Schema.Type.class);
 
@@ -262,8 +266,8 @@ public class JsonConverter implements Converter {
     }
 
 
-    private boolean enableSchemas = SCHEMAS_ENABLE_DEFAULT;
-    private int cacheSize = SCHEMAS_CACHE_SIZE_DEFAULT;
+    private boolean enableSchemas = JsonConverterConfig.SCHEMAS_ENABLE_DEFAULT;
+    private int cacheSize = JsonConverterConfig.SCHEMAS_CACHE_SIZE_DEFAULT;
     private Cache<Schema, ObjectNode> fromConnectSchemaCache;
     private Cache<JsonNode, Schema> toConnectSchemaCache;
 
@@ -271,22 +275,47 @@ public class JsonConverter implements Converter {
     private final JsonDeserializer deserializer = new JsonDeserializer();
 
     @Override
-    public void configure(Map<String, ?> configs, boolean isKey) {
-        Object enableConfigsVal = configs.get(SCHEMAS_ENABLE_CONFIG);
-        if (enableConfigsVal != null)
-            enableSchemas = enableConfigsVal.toString().equals("true");
+    public ConfigDef config() {
+        return JsonConverterConfig.configDef();
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+        JsonConverterConfig config = new JsonConverterConfig(configs);
+        enableSchemas = config.schemasEnabled();
+        cacheSize = config.schemaCacheSize();
 
+        boolean isKey = config.type() == ConverterType.KEY;
         serializer.configure(configs, isKey);
         deserializer.configure(configs, isKey);
 
-        Object cacheSizeVal = configs.get(SCHEMAS_CACHE_SIZE_CONFIG);
-        if (cacheSizeVal != null)
-            cacheSize = Integer.parseInt((String) cacheSizeVal);
         fromConnectSchemaCache = new SynchronizedCache<>(new LRUCache<Schema, ObjectNode>(cacheSize));
         toConnectSchemaCache = new SynchronizedCache<>(new LRUCache<JsonNode, Schema>(cacheSize));
     }
 
     @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+        Map<String, Object> conf = new HashMap<>(configs);
+        conf.put(StringConverterConfig.TYPE_CONFIG, isKey ? ConverterType.KEY.getName() : ConverterType.VALUE.getName());
+        configure(conf);
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
+
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+        return fromConnectData(topic, schema, value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+        return toConnectData(topic, value);
+    }
+
+    @Override
     public byte[] fromConnectData(String topic, Schema schema, Object value) {
         JsonNode jsonValue = enableSchemas ? convertToJsonWithEnvelope(schema, value) : convertToJsonWithoutEnvelope(schema, value);
         try {
@@ -456,7 +485,7 @@ public class JsonConverter implements Converter {
                 break;
             case JsonSchema.ARRAY_TYPE_NAME:
                 JsonNode elemSchema = jsonSchema.get(JsonSchema.ARRAY_ITEMS_FIELD_NAME);
-                if (elemSchema == null)
+                if (elemSchema == null || elemSchema.isNull())
                     throw new DataException("Array schema did not specify the element type");
                 builder = SchemaBuilder.array(asConnectSchema(elemSchema));
                 break;
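Illustration only, not part of the patch: with the changes above, JsonConverter can be configured directly as a header converter. The sketch below sets converter.type the same way the Worker change later in this patch does; topic and header names are arbitrary.

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;
    import org.apache.kafka.connect.json.JsonConverter;
    import org.apache.kafka.connect.storage.ConverterConfig;
    import org.apache.kafka.connect.storage.ConverterType;

    import java.util.HashMap;
    import java.util.Map;

    public class JsonHeaderConverterSketch {
        public static void main(String[] args) {
            JsonConverter converter = new JsonConverter();
            Map<String, Object> config = new HashMap<>();
            // Mark the converter as a header converter, as the runtime does before configuring it.
            config.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
            converter.configure(config);

            // Header values are serialized exactly like record values, schema envelope included by default.
            byte[] bytes = converter.fromConnectHeader("exampleTopic", "h1", Schema.STRING_SCHEMA, "example");
            SchemaAndValue parsed = converter.toConnectHeader("exampleTopic", "h1", bytes);
            System.out.println(parsed.value());
        }
    }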
diff --git a/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
new file mode 100644
index 0000000..7f1dda2
--- /dev/null
+++ b/connect/json/src/main/java/org/apache/kafka/connect/json/JsonConverterConfig.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.json;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.connect.storage.ConverterConfig;
+
+import java.util.Map;
+
+/**
+ * Configuration options for {@link JsonConverter} instances.
+ */
+public class JsonConverterConfig extends ConverterConfig {
+
+    public static final String SCHEMAS_ENABLE_CONFIG = "schemas.enable";
+    public static final boolean SCHEMAS_ENABLE_DEFAULT = true;
+    private static final String SCHEMAS_ENABLE_DOC = "Include schemas within each of the serialized values and keys.";
+    private static final String SCHEMAS_ENABLE_DISPLAY = "Enable Schemas";
+
+    public static final String SCHEMAS_CACHE_SIZE_CONFIG = "schemas.cache.size";
+    public static final int SCHEMAS_CACHE_SIZE_DEFAULT = 1000;
+    private static final String SCHEMAS_CACHE_SIZE_DOC = "The maximum number of schemas that can be cached in this converter instance.";
+    private static final String SCHEMAS_CACHE_SIZE_DISPLAY = "Schema Cache Size";
+
+    private final static ConfigDef CONFIG;
+
+    static {
+        String group = "Schemas";
+        int orderInGroup = 0;
+        CONFIG = ConverterConfig.newConfigDef();
+        CONFIG.define(SCHEMAS_ENABLE_CONFIG, Type.BOOLEAN, SCHEMAS_ENABLE_DEFAULT, Importance.HIGH, SCHEMAS_ENABLE_DOC, group,
+                      orderInGroup++, Width.MEDIUM, SCHEMAS_ENABLE_DISPLAY);
+        CONFIG.define(SCHEMAS_CACHE_SIZE_CONFIG, Type.INT, SCHEMAS_CACHE_SIZE_DEFAULT, Importance.HIGH, SCHEMAS_CACHE_SIZE_DOC, group,
+                      orderInGroup++, Width.MEDIUM, SCHEMAS_CACHE_SIZE_DISPLAY);
+    }
+
+    public static ConfigDef configDef() {
+        return CONFIG;
+    }
+
+    public JsonConverterConfig(Map<String, ?> props) {
+        super(CONFIG, props);
+    }
+
+    /**
+     * Return whether schemas are enabled.
+     *
+     * @return true if enabled, or false otherwise
+     */
+    public boolean schemasEnabled() {
+        return getBoolean(SCHEMAS_ENABLE_CONFIG);
+    }
+
+    /**
+     * Get the cache size.
+     *
+     * @return the cache size
+     */
+    public int schemaCacheSize() {
+        return getInt(SCHEMAS_CACHE_SIZE_CONFIG);
+    }
+}
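Illustration only, not part of the patch: a quick sketch of the new config class on its own. The converter.type entry is supplied explicitly here, matching what the runtime sets before handing the properties to a converter.

    import org.apache.kafka.connect.json.JsonConverterConfig;
    import org.apache.kafka.connect.storage.ConverterConfig;
    import org.apache.kafka.connect.storage.ConverterType;

    import java.util.HashMap;
    import java.util.Map;

    public class JsonConverterConfigSketch {
        public static void main(String[] args) {
            Map<String, Object> props = new HashMap<>();
            props.put(ConverterConfig.TYPE_CONFIG, ConverterType.VALUE.getName());
            props.put(JsonConverterConfig.SCHEMAS_ENABLE_CONFIG, false);
            props.put(JsonConverterConfig.SCHEMAS_CACHE_SIZE_CONFIG, 100);
            JsonConverterConfig config = new JsonConverterConfig(props);
            // Read back the parsed settings.
            System.out.println(config.schemasEnabled() + " / " + config.schemaCacheSize());
        }
    }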
diff --git a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
index 62b52a0..0a71044 100644
--- a/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
+++ b/connect/json/src/test/java/org/apache/kafka/connect/json/JsonConverterTest.java
@@ -739,6 +739,23 @@ public class JsonConverterTest {
     }
 
 
+    // Note: the header conversion methods delegate to the data conversion methods, which are tested above.
+    // The following simply verify that the delegation works.
+
+    @Test
+    public void testStringHeaderToJson() throws UnsupportedEncodingException {
+        JsonNode converted = parse(converter.fromConnectHeader(TOPIC, "headerName", Schema.STRING_SCHEMA, "test-string"));
+        validateEnvelope(converted);
+        assertEquals(parse("{ \"type\": \"string\", \"optional\": false }"), converted.get(JsonSchema.ENVELOPE_SCHEMA_FIELD_NAME));
+        assertEquals("test-string", converted.get(JsonSchema.ENVELOPE_PAYLOAD_FIELD_NAME).textValue());
+    }
+
+    @Test
+    public void stringHeaderToConnect() {
+        assertEquals(new SchemaAndValue(Schema.STRING_SCHEMA, "foo-bar-baz"), converter.toConnectHeader(TOPIC, "headerName", "{ \"schema\": { \"type\": \"string\" }, \"payload\": \"foo-bar-baz\" }".getBytes()));
+    }
+
+
     private JsonNode parse(byte[] json) {
         try {
             return objectMapper.readTree(json);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
index 05dff27..34c552e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/converters/ByteArrayConverter.java
@@ -17,17 +17,33 @@
 
 package org.apache.kafka.connect.converters;
 
+import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.DataException;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.HeaderConverter;
 
 import java.util.Map;
 
 /**
  * Pass-through converter for raw byte data.
+ *
+ * This implementation currently does nothing with the topic names or header names.
  */
-public class ByteArrayConverter implements Converter {
+public class ByteArrayConverter implements Converter, HeaderConverter {
+
+    private static final ConfigDef CONFIG_DEF = ConverterConfig.newConfigDef();
+
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
+
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
 
     @Override
     public void configure(Map<String, ?> configs, boolean isKey) {
@@ -49,4 +65,18 @@ public class ByteArrayConverter implements Converter {
         return new SchemaAndValue(Schema.OPTIONAL_BYTES_SCHEMA, value);
     }
 
+    @Override
+    public byte[] fromConnectHeader(String topic, String headerKey, Schema schema, Object value) {
+        return fromConnectData(topic, schema, value);
+    }
+
+    @Override
+    public SchemaAndValue toConnectHeader(String topic, String headerKey, byte[] value) {
+        return toConnectData(topic, value);
+    }
+
+    @Override
+    public void close() {
+        // do nothing
+    }
 }
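Illustration only, not part of the patch: header values run through ByteArrayConverter come back byte-for-byte, paired with the optional-bytes schema returned by toConnectData above. A small sketch, with arbitrary topic and header names:

    import org.apache.kafka.connect.converters.ByteArrayConverter;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaAndValue;

    public class ByteArrayHeaderSketch {
        public static void main(String[] args) {
            ByteArrayConverter converter = new ByteArrayConverter();
            byte[] raw = new byte[]{1, 2, 3};
            // Bytes pass straight through in both directions.
            byte[] serialized = converter.fromConnectHeader("exampleTopic", "h1", Schema.OPTIONAL_BYTES_SCHEMA, raw);
            SchemaAndValue parsed = converter.toConnectHeader("exampleTopic", "h1", serialized);
            System.out.println(parsed.schema().type()); // BYTES
        }
    }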
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index aad12c3..0a895f6 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -73,6 +73,11 @@ public class ConnectorConfig extends AbstractConfig {
     public static final String VALUE_CONVERTER_CLASS_DOC = WorkerConfig.VALUE_CONVERTER_CLASS_DOC;
     public static final String VALUE_CONVERTER_CLASS_DISPLAY = "Value converter class";
 
+    public static final String HEADER_CONVERTER_CLASS_CONFIG = WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG;
+    public static final String HEADER_CONVERTER_CLASS_DOC = WorkerConfig.HEADER_CONVERTER_CLASS_DOC;
+    public static final String HEADER_CONVERTER_CLASS_DISPLAY = "Header converter class";
+    public static final String HEADER_CONVERTER_CLASS_DEFAULT = WorkerConfig.HEADER_CONVERTER_CLASS_DEFAULT;
+
     public static final String TASKS_MAX_CONFIG = "tasks.max";
     private static final String TASKS_MAX_DOC = "Maximum number of tasks to use for this connector.";
     public static final int TASKS_MAX_DEFAULT = 1;
@@ -96,12 +101,14 @@ public class ConnectorConfig extends AbstractConfig {
     }
 
     public static ConfigDef configDef() {
+        int orderInGroup = 0;
         return new ConfigDef()
-                .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
-                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
-                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
-                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
-                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
+                .define(NAME_CONFIG, Type.STRING, ConfigDef.NO_DEFAULT_VALUE, nonEmptyStringWithoutControlChars(), Importance.HIGH, NAME_DOC, COMMON_GROUP, ++orderInGroup, Width.MEDIUM, NAME_DISPLAY)
+                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.LONG, CONNECTOR_CLASS_DISPLAY)
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, TASK_MAX_DISPLAY)
+                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
+                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
+                .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS, HEADER_CONVERTER_CLASS_DEFAULT, Importance.LOW, HEADER_CONVERTER_CLASS_DOC, COMMON_GROUP, ++orderInGroup, Width.SHORT, HEADER_CONVERTER_CLASS_DISPLAY)
                 .define(TRANSFORMS_CONFIG, Type.LIST, Collections.emptyList(), ConfigDef.CompositeValidator.of(new ConfigDef.NonNullValidator(), new ConfigDef.Validator() {
                     @Override
                     public void ensureValid(String name, Object value) {
@@ -110,7 +117,7 @@ public class ConnectorConfig extends AbstractConfig {
                             throw new ConfigException(name, value, "Duplicate alias provided.");
                         }
                     }
-                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
+                }), Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, ++orderInGroup, Width.LONG, TRANSFORMS_DISPLAY);
     }
 
     public ConnectorConfig(Plugins plugins) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 992825c..834eb39 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -36,6 +36,9 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.ConverterConfig;
+import org.apache.kafka.connect.storage.ConverterType;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageReaderImpl;
@@ -383,29 +386,29 @@ public class Worker {
             // search for converters within the connector dependencies, and if not found the
             // plugin class loader delegates loading to the delegating classloader.
             Converter keyConverter = connConfig.getConfiguredInstance(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG, Converter.class);
-            if (keyConverter != null)
-                keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
-            else {
-                Converter defaultKeyConverter = plugins.newConverter(
-                        config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName(),
-                        config
-                );
-                defaultKeyConverter.configure(config.originalsWithPrefix("key.converter."), true);
-                keyConverter = defaultKeyConverter;
+            if (keyConverter == null) {
+                String className = config.getClass(WorkerConfig.KEY_CONVERTER_CLASS_CONFIG).getName();
+                keyConverter = plugins.newConverter(className, config);
             }
+            keyConverter.configure(connConfig.originalsWithPrefix("key.converter."), true);
+
             Converter valueConverter = connConfig.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class);
-            if (valueConverter != null)
-                valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
-            else {
-                Converter defaultValueConverter = plugins.newConverter(
-                        config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName(),
-                        config
-                );
-                defaultValueConverter.configure(config.originalsWithPrefix("value.converter."), false);
-                valueConverter = defaultValueConverter;
+            if (valueConverter == null) {
+                String className = config.getClass(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG).getName();
+                valueConverter = plugins.newConverter(className, config);
+            }
+            valueConverter.configure(connConfig.originalsWithPrefix("value.converter."), false);
+
+            HeaderConverter headerConverter = connConfig.getConfiguredInstance(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG, HeaderConverter.class);
+            if (headerConverter == null) {
+                String className = config.getClass(WorkerConfig.HEADER_CONVERTER_CLASS_CONFIG).getName();
+                headerConverter = plugins.newHeaderConverter(className, config);
             }
+            Map<String, Object> converterConfig = connConfig.originalsWithPrefix("header.converter.");
+            converterConfig.put(ConverterConfig.TYPE_CONFIG, ConverterType.HEADER.getName());
+            headerConverter.configure(converterConfig);
 
-            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, connectorLoader);
+            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter, headerConverter, connectorLoader);
             workerTask.initialize(taskConfig);
             Plugins.compareAndSwapLoaders(savedLoader);
         } catch (Throwable t) {
@@ -437,6 +440,7 @@ public class Worker {
                                        TargetState initialState,
                                        Converter keyConverter,
                                        Converter valueConverter,
+                                       HeaderConverter headerConverter,
                                        ClassLoader loader) {
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
@@ -446,12 +450,12 @@ public class Worker {
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
-            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
-                    valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter, valueConverter,
+                    headerConverter, transformationChain, producer, offsetReader, offsetWriter, config, metrics, loader, time);
         } else if (task instanceof SinkTask) {
             TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, metrics, keyConverter,
-                    valueConverter, transformationChain, loader, time);
+                    valueConverter, headerConverter, transformationChain, loader, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
index 3b99c6b..b857b0e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
@@ -23,6 +23,7 @@ import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.connect.storage.SimpleHeaderConverter;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,6 +64,15 @@ public class WorkerConfig extends AbstractConfig {
                     " independent of connectors it allows any connector to work with any serialization format." +
                     " Examples of common formats include JSON and Avro.";
 
+    public static final String HEADER_CONVERTER_CLASS_CONFIG = "header.converter";
+    public static final String HEADER_CONVERTER_CLASS_DOC =
+            "HeaderConverter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
+                    " This controls the format of the header values in messages written to or read from Kafka, and since this is" +
+                    " independent of connectors it allows any connector to work with any serialization format." +
+                    " Examples of common formats include JSON and Avro. By default, the SimpleHeaderConverter is used to serialize" +
+                    " header values to strings and deserialize them by inferring the schemas.";
+    public static final String HEADER_CONVERTER_CLASS_DEFAULT = SimpleHeaderConverter.class.getName();
+
     public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter";
     public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC =
             "Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka." +
@@ -222,7 +232,11 @@ public class WorkerConfig extends AbstractConfig {
                 .define(METRIC_REPORTER_CLASSES_CONFIG, Type.LIST,
                         "", Importance.LOW,
                         CommonClientConfigs.METRIC_REPORTER_CLASSES_DOC)
-                .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG, ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC);
+                .define(BrokerSecurityConfigs.SSL_CLIENT_AUTH_CONFIG,
+                        ConfigDef.Type.STRING, "none", ConfigDef.Importance.LOW, BrokerSecurityConfigs.SSL_CLIENT_AUTH_DOC)
+                .define(HEADER_CONVERTER_CLASS_CONFIG, Type.CLASS,
+                        HEADER_CONVERTER_CLASS_DEFAULT,
+                        Importance.LOW, HEADER_CONVERTER_CLASS_DOC);
     }
 
     @Override
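Illustration only, not part of the patch: in deployment terms the new setting above is one optional line in the worker properties file. When omitted it defaults to SimpleHeaderConverter, and a connector can override it through the identically named property added to ConnectorConfig earlier in this patch. A sketch assuming the JSON converter is wanted for headers:

    # Connect worker properties (illustrative)
    header.converter=org.apache.kafka.connect.json.JsonConverter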
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 6961494..85695bb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -38,10 +38,13 @@ import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.data.SchemaAndValue;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectUtils;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.connect.util.SinkUtils;
@@ -70,6 +73,7 @@ class WorkerSinkTask extends WorkerTask {
     private final Time time;
     private final Converter keyConverter;
     private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
     private final TransformationChain<SinkRecord> transformationChain;
     private final SinkTaskMetricsGroup sinkTaskMetricsGroup;
     private KafkaConsumer<byte[], byte[]> consumer;
@@ -94,6 +98,7 @@ class WorkerSinkTask extends WorkerTask {
                           ConnectMetrics connectMetrics,
                           Converter keyConverter,
                           Converter valueConverter,
+                          HeaderConverter headerConverter,
                           TransformationChain<SinkRecord> transformationChain,
                           ClassLoader loader,
                           Time time) {
@@ -103,6 +108,7 @@ class WorkerSinkTask extends WorkerTask {
         this.task = task;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
         this.transformationChain = transformationChain;
         this.time = time;
         this.messageBatch = new ArrayList<>();
@@ -474,13 +480,15 @@ class WorkerSinkTask extends WorkerTask {
                     this, msg.topic(), msg.partition(), msg.offset(), msg.timestamp());
             SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
             SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
+            Headers headers = convertHeadersFor(msg);
             Long timestamp = ConnectUtils.checkAndConvertTimestamp(msg.timestamp());
             SinkRecord origRecord = new SinkRecord(msg.topic(), msg.partition(),
                     keyAndSchema.schema(), keyAndSchema.value(),
                     valueAndSchema.schema(), valueAndSchema.value(),
                     msg.offset(),
                     timestamp,
-                    msg.timestampType());
+                    msg.timestampType(),
+                    headers);
             log.trace("{} Applying transformations to record in topic '{}' partition {} at offset {} and timestamp {} with key {} and value {}",
                     this, msg.topic(), msg.partition(), msg.offset(), timestamp, keyAndSchema.value(), valueAndSchema.value());
             SinkRecord transRecord = transformationChain.apply(origRecord);
@@ -498,6 +506,19 @@ class WorkerSinkTask extends WorkerTask {
         sinkTaskMetricsGroup.recordConsumedOffsets(origOffsets);
     }
 
+    private Headers convertHeadersFor(ConsumerRecord<byte[], byte[]> record) {
+        Headers result = new ConnectHeaders();
+        org.apache.kafka.common.header.Headers recordHeaders = record.headers();
+        if (recordHeaders != null) {
+            String topic = record.topic();
+            for (org.apache.kafka.common.header.Header recordHeader : recordHeaders) {
+                SchemaAndValue schemaAndValue = headerConverter.toConnectHeader(topic, recordHeader.key(), recordHeader.value());
+                result.add(recordHeader.key(), schemaAndValue);
+            }
+        }
+        return result;
+    }
+
     private void resumeAll() {
         for (TopicPartition tp : consumer.assignment())
             if (!context.pausedPartitions().contains(tp))
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index a172cdb..6288767 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -22,6 +22,7 @@ import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.header.internals.RecordHeaders;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
@@ -30,10 +31,13 @@ import org.apache.kafka.common.metrics.stats.Total;
 import org.apache.kafka.common.metrics.stats.Value;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.runtime.ConnectMetrics.MetricGroup;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.ConnectUtils;
@@ -62,6 +66,7 @@ class WorkerSourceTask extends WorkerTask {
     private final SourceTask task;
     private final Converter keyConverter;
     private final Converter valueConverter;
+    private final HeaderConverter headerConverter;
     private final TransformationChain<SourceRecord> transformationChain;
     private KafkaProducer<byte[], byte[]> producer;
     private final OffsetStorageReader offsetReader;
@@ -89,6 +94,7 @@ class WorkerSourceTask extends WorkerTask {
                             TargetState initialState,
                             Converter keyConverter,
                             Converter valueConverter,
+                            HeaderConverter headerConverter,
                             TransformationChain<SourceRecord> transformationChain,
                             KafkaProducer<byte[], byte[]> producer,
                             OffsetStorageReader offsetReader,
@@ -103,6 +109,7 @@ class WorkerSourceTask extends WorkerTask {
         this.task = task;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
+        this.headerConverter = headerConverter;
         this.transformationChain = transformationChain;
         this.producer = producer;
         this.offsetReader = offsetReader;
@@ -216,10 +223,11 @@ class WorkerSourceTask extends WorkerTask {
                 continue;
             }
 
+            RecordHeaders headers = convertHeaderFor(record);
             byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
             byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
             final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(),
-                    ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value);
+                    ConnectUtils.checkAndConvertTimestamp(record.timestamp()), key, value, headers);
             log.trace("{} Appending record with key {}, value {}", this, record.key(), record.value());
             // We need this queued first since the callback could happen immediately (even synchronously in some cases).
             // Because of this we need to be careful about handling retries -- we always save the previously attempted
@@ -278,6 +286,20 @@ class WorkerSourceTask extends WorkerTask {
         return true;
     }
 
+    private RecordHeaders convertHeaderFor(SourceRecord record) {
+        Headers headers = record.headers();
+        RecordHeaders result = new RecordHeaders();
+        if (headers != null) {
+            String topic = record.topic();
+            for (Header header : headers) {
+                String key = header.key();
+                byte[] rawHeader = headerConverter.fromConnectHeader(topic, key, header.schema(), header.value());
+                result.add(key, rawHeader);
+            }
+        }
+        return result;
+    }
+
     private void commitTaskRecord(SourceRecord record) {
         try {
             task.commitRecord(record);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
index f7a5553..bcf2afb 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/isolation/Plugins.java
@@ -25,6 +25,7 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.transforms.Transformation;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -200,7 +201,26 @@ public class Plugins {
             throw new ConnectException(
                     "Failed to find any class that implements Converter and which name matches "
                             + converterClassOrAlias
-                            + ", available connectors are: "
+                            + ", available converters are: "
+                            + pluginNames(delegatingLoader.converters())
+            );
+        }
+        return config != null ? newConfiguredPlugin(config, klass) : newPlugin(klass);
+    }
+
+    public HeaderConverter newHeaderConverter(String converterClassOrAlias, AbstractConfig config) {
+        Class<? extends HeaderConverter> klass;
+        try {
+            klass = pluginClass(
+                    delegatingLoader,
+                    converterClassOrAlias,
+                    HeaderConverter.class
+            );
+        } catch (ClassNotFoundException e) {
+            throw new ConnectException(
+                    "Failed to find any class that implements HeaderConverter and which name matches "
+                            + converterClassOrAlias
+                            + ", available header converters are: "
                             + pluginNames(delegatingLoader.converters())
             );
         }
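
The new Plugins#newHeaderConverter mirrors the existing newConverter lookup: it resolves the class by fully-qualified name or alias on the plugin path and, when a config is supplied, instantiates and configures it. A minimal sketch, assuming a Plugins instance and a worker AbstractConfig are already in scope (the variable names are hypothetical):

    import org.apache.kafka.connect.storage.HeaderConverter;

    // Resolve and configure the default header converter by class name.
    HeaderConverter headerConverter = plugins.newHeaderConverter(
            "org.apache.kafka.connect.storage.SimpleHeaderConverter", workerConfig);
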
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
index 500d31f..dac1392 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/AbstractHerderTest.java
@@ -31,9 +31,13 @@ import org.apache.kafka.connect.transforms.Transformation;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.easymock.Capture;
 import org.easymock.EasyMock;
-import org.easymock.EasyMockSupport;
 import org.easymock.IAnswer;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.MockStrict;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.Arrays;
 import java.util.Collections;
@@ -43,27 +47,34 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
+import static org.powermock.api.easymock.PowerMock.verifyAll;
+import static org.powermock.api.easymock.PowerMock.replayAll;
+import static org.easymock.EasyMock.strictMock;
+import static org.easymock.EasyMock.partialMockBuilder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 
-public class AbstractHerderTest extends EasyMockSupport {
-    private final Worker worker = strictMock(Worker.class);
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({AbstractHerder.class})
+public class AbstractHerderTest {
+
     private final String workerId = "workerId";
     private final String kafkaClusterId = "I4ZmrWqfT2e-upky_4fdPA";
     private final int generation = 5;
     private final String connector = "connector";
-    private final Plugins plugins = strictMock(Plugins.class);
-    private final ClassLoader classLoader = strictMock(ClassLoader.class);
+
+    @MockStrict private Worker worker;
+    @MockStrict private Plugins plugins;
+    @MockStrict private ClassLoader classLoader;
+    @MockStrict private ConfigBackingStore configStore;
+    @MockStrict private StatusBackingStore statusStore;
 
     @Test
     public void connectorStatus() {
         ConnectorTaskId taskId = new ConnectorTaskId(connector, 0);
 
-        ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
-        StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
-
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
                 .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
                 .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
@@ -96,7 +107,7 @@ public class AbstractHerderTest extends EasyMockSupport {
         assertEquals("UNASSIGNED", taskState.state());
         assertEquals(workerId, taskState.workerId());
 
-        verifyAll();
+        PowerMock.verifyAll();
     }
 
     @Test
@@ -104,9 +115,6 @@ public class AbstractHerderTest extends EasyMockSupport {
         ConnectorTaskId taskId = new ConnectorTaskId("connector", 0);
         String workerId = "workerId";
 
-        ConfigBackingStore configStore = strictMock(ConfigBackingStore.class);
-        StatusBackingStore statusStore = strictMock(StatusBackingStore.class);
-
         AbstractHerder herder = partialMockBuilder(AbstractHerder.class)
                 .withConstructor(Worker.class, String.class, String.class, StatusBackingStore.class, ConfigBackingStore.class)
                 .withArgs(worker, workerId, kafkaClusterId, statusStore, configStore)
@@ -163,14 +171,14 @@ public class AbstractHerderTest extends EasyMockSupport {
         assertEquals(TestSourceConnector.class.getName(), result.name());
         assertEquals(Arrays.asList(ConnectorConfig.COMMON_GROUP, ConnectorConfig.TRANSFORMS_GROUP), result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 6 fields, connector's configs add 2
-        assertEquals(8, result.values().size());
+        // Base connector config has 7 fields, connector's configs add 2
+        assertEquals(9, result.values().size());
         // Missing name should generate an error
         assertEquals(ConnectorConfig.NAME_CONFIG, result.values().get(0).configValue().name());
         assertEquals(1, result.values().get(0).configValue().errors().size());
         // "required" config from connector should generate an error
-        assertEquals("required", result.values().get(6).configValue().name());
-        assertEquals(1, result.values().get(6).configValue().errors().size());
+        assertEquals("required", result.values().get(7).configValue().name());
+        assertEquals(1, result.values().get(7).configValue().errors().size());
 
         verifyAll();
     }
@@ -209,15 +217,15 @@ public class AbstractHerderTest extends EasyMockSupport {
         );
         assertEquals(expectedGroups, result.groups());
         assertEquals(2, result.errorCount());
-        // Base connector config has 6 fields, connector's configs add 2, 2 type fields from the transforms, and
+        // Base connector config has 7 fields, connector's configs add 2, 2 type fields from the transforms, and
         // 1 from the valid transformation's config
-        assertEquals(11, result.values().size());
+        assertEquals(12, result.values().size());
         // Should get 2 type fields from the transforms, first adds its own config since it has a valid class
-        assertEquals("transforms.xformA.type", result.values().get(6).configValue().name());
-        assertTrue(result.values().get(6).configValue().errors().isEmpty());
-        assertEquals("transforms.xformA.subconfig", result.values().get(7).configValue().name());
-        assertEquals("transforms.xformB.type", result.values().get(8).configValue().name());
-        assertFalse(result.values().get(8).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.type", result.values().get(7).configValue().name());
+        assertTrue(result.values().get(7).configValue().errors().isEmpty());
+        assertEquals("transforms.xformA.subconfig", result.values().get(8).configValue().name());
+        assertEquals("transforms.xformB.type", result.values().get(9).configValue().name());
+        assertFalse(result.values().get(9).configValue().errors().isEmpty());
 
         verifyAll();
     }
@@ -262,7 +270,7 @@ public class AbstractHerderTest extends EasyMockSupport {
         @Override
         public ConfigDef config() {
             return new ConfigDef()
-                    .define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
+                           .define("subconfig", ConfigDef.Type.STRING, "default", ConfigDef.Importance.LOW, "docs");
         }
 
         @Override
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
index 99f4ded..2d5da98 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitterTest.java
@@ -54,6 +54,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
     private ConcurrentHashMap committers;
     @Mock
     private Logger mockLog;
+    @Mock private ScheduledFuture commitFuture;
+    @Mock private ScheduledFuture taskFuture;
+    @Mock private ConnectorTaskId taskId;
+    @Mock private WorkerSourceTask task;
 
     private SourceTaskOffsetCommitter committer;
 
@@ -81,15 +85,11 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
     public void testSchedule() throws Exception {
         Capture<Runnable> taskWrapper = EasyMock.newCapture();
 
-        ScheduledFuture commitFuture = PowerMock.createMock(ScheduledFuture.class);
         EasyMock.expect(executor.scheduleWithFixedDelay(
                 EasyMock.capture(taskWrapper), eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS),
                 eq(DEFAULT_OFFSET_COMMIT_INTERVAL_MS), eq(TimeUnit.MILLISECONDS))
         ).andReturn(commitFuture);
 
-        ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
-        WorkerSourceTask task = PowerMock.createMock(WorkerSourceTask.class);
-
         EasyMock.expect(committers.put(taskId, commitFuture)).andReturn(null);
 
         PowerMock.replayAll();
@@ -135,9 +135,6 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
 
     @Test
     public void testRemove() throws Exception {
-        ConnectorTaskId taskId = PowerMock.createMock(ConnectorTaskId.class);
-        ScheduledFuture task = PowerMock.createMock(ScheduledFuture.class);
-
         // Try to remove a non-existing task
         EasyMock.expect(committers.remove(taskId)).andReturn(null);
         PowerMock.replayAll();
@@ -148,10 +145,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
         PowerMock.resetAll();
 
         // Try to remove an existing task
-        EasyMock.expect(committers.remove(taskId)).andReturn(task);
-        EasyMock.expect(task.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(task.isDone()).andReturn(false);
-        EasyMock.expect(task.get()).andReturn(null);
+        EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
+        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
+        EasyMock.expect(taskFuture.isDone()).andReturn(false);
+        EasyMock.expect(taskFuture.get()).andReturn(null);
         PowerMock.replayAll();
 
         committer.remove(taskId);
@@ -160,10 +157,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
         PowerMock.resetAll();
 
         // Try to remove a cancelled task
-        EasyMock.expect(committers.remove(taskId)).andReturn(task);
-        EasyMock.expect(task.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(task.isDone()).andReturn(false);
-        EasyMock.expect(task.get()).andThrow(new CancellationException());
+        EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
+        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
+        EasyMock.expect(taskFuture.isDone()).andReturn(false);
+        EasyMock.expect(taskFuture.get()).andThrow(new CancellationException());
         mockLog.trace(EasyMock.anyString(), EasyMock.<Object>anyObject());
         PowerMock.expectLastCall();
         PowerMock.replayAll();
@@ -174,10 +171,10 @@ public class SourceTaskOffsetCommitterTest extends ThreadedTest {
         PowerMock.resetAll();
 
         // Try to remove an interrupted task
-        EasyMock.expect(committers.remove(taskId)).andReturn(task);
-        EasyMock.expect(task.cancel(eq(false))).andReturn(false);
-        EasyMock.expect(task.isDone()).andReturn(false);
-        EasyMock.expect(task.get()).andThrow(new InterruptedException());
+        EasyMock.expect(committers.remove(taskId)).andReturn(taskFuture);
+        EasyMock.expect(taskFuture.cancel(eq(false))).andReturn(false);
+        EasyMock.expect(taskFuture.isDone()).andReturn(false);
+        EasyMock.expect(taskFuture.get()).andThrow(new InterruptedException());
         PowerMock.replayAll();
 
         try {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index b714dcc..4ff4248 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -36,6 +36,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
 import org.easymock.Capture;
@@ -122,6 +123,8 @@ public class WorkerSinkTaskTest {
     @Mock
     private Converter valueConverter;
     @Mock
+    private HeaderConverter headerConverter;
+    @Mock
     private TransformationChain<SinkRecord> transformationChain;
     @Mock
     private TaskStatus.Listener statusListener;
@@ -154,7 +157,7 @@ public class WorkerSinkTaskTest {
     private void createTask(TargetState initialState) {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, transformationChain, pluginLoader, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter, valueConverter, headerConverter, transformationChain, pluginLoader, time);
     }
 
     @After
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 4b32f0c..b09f847 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -34,6 +34,7 @@ import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
 import org.apache.kafka.connect.util.ThreadedTest;
@@ -108,6 +109,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private PluginClassLoader pluginLoader;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
+    @Mock private HeaderConverter headerConverter;
     @Mock private TransformationChain transformationChain;
     private WorkerSinkTask workerTask;
     @Mock private KafkaConsumer<byte[], byte[]> consumer;
@@ -135,7 +137,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
                 taskId, sinkTask, statusListener, initialState, workerConfig, metrics, keyConverter,
-                valueConverter, TransformationChain.noOp(), pluginLoader, time);
+                valueConverter, headerConverter, TransformationChain.noOp(), pluginLoader, time);
 
         recordsReturned = 0;
     }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 4f0d243..266c6ad 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -31,6 +31,7 @@ import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.source.SourceTaskContext;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
 import org.apache.kafka.connect.util.Callback;
@@ -93,6 +94,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Mock private SourceTask sourceTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
+    @Mock private HeaderConverter headerConverter;
     @Mock private TransformationChain<SourceRecord> transformationChain;
     @Mock private KafkaProducer<byte[], byte[]> producer;
     @Mock private OffsetStorageReader offsetReader;
@@ -140,8 +142,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private void createWorkerTask(TargetState initialState) {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain,
-                producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, headerConverter,
+                transformationChain, producer, offsetReader, offsetWriter, config, metrics, plugins.delegatingLoader(), Time.SYSTEM);
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
index 8b86dee..78f2836 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTaskTest.java
@@ -23,9 +23,14 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.apache.kafka.common.utils.MockTime;
 import org.easymock.EasyMock;
 import org.easymock.IAnswer;
+import org.easymock.Mock;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -37,6 +42,9 @@ import static org.easymock.EasyMock.replay;
 import static org.easymock.EasyMock.verify;
 import static org.junit.Assert.assertEquals;
 
+@RunWith(PowerMockRunner.class)
+@PrepareForTest({WorkerTask.class})
+@PowerMockIgnore("javax.management.*")
 public class WorkerTaskTest {
 
     private static final Map<String, String> TASK_PROPS = new HashMap<>();
@@ -46,6 +54,8 @@ public class WorkerTaskTest {
     private static final TaskConfig TASK_CONFIG = new TaskConfig(TASK_PROPS);
 
     private ConnectMetrics metrics;
+    @Mock private TaskStatus.Listener statusListener;
+    @Mock private ClassLoader loader;
 
     @Before
     public void setup() {
@@ -61,9 +71,6 @@ public class WorkerTaskTest {
     public void standardStartup() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
-        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
-
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
                 .withConstructor(
                         ConnectorTaskId.class,
@@ -110,9 +117,6 @@ public class WorkerTaskTest {
     public void stopBeforeStarting() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
-        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
-
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
                 .withConstructor(
                         ConnectorTaskId.class,
@@ -152,9 +156,6 @@ public class WorkerTaskTest {
     public void cancelBeforeStopping() throws Exception {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
 
-        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
-        ClassLoader loader = EasyMock.createMock(ClassLoader.class);
-
         WorkerTask workerTask = partialMockBuilder(WorkerTask.class)
                 .withConstructor(
                         ConnectorTaskId.class,
@@ -220,7 +221,6 @@ public class WorkerTaskTest {
     public void updateMetricsOnListenerEventsForStartupPauseResumeAndShutdown() {
         ConnectorTaskId taskId = new ConnectorTaskId("foo", 0);
         ConnectMetrics metrics = new MockConnectMetrics();
-        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
         TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
 
         statusListener.onStartup(taskId);
@@ -255,7 +255,6 @@ public class WorkerTaskTest {
         MockConnectMetrics metrics = new MockConnectMetrics();
         MockTime time = metrics.time();
         ConnectException error = new ConnectException("error");
-        TaskStatus.Listener statusListener = EasyMock.createMock(TaskStatus.Listener.class);
         TaskMetricsGroup group = new TaskMetricsGroup(taskId, metrics, statusListener);
 
         statusListener.onStartup(taskId);
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index b2af1de..2c04b88 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -38,6 +38,7 @@ import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
+import org.apache.kafka.connect.storage.HeaderConverter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
 import org.apache.kafka.connect.storage.OffsetStorageReader;
 import org.apache.kafka.connect.storage.OffsetStorageWriter;
@@ -92,6 +93,13 @@ public class WorkerTest extends ThreadedTest {
     @MockStrict
     private ConnectorStatus.Listener connectorStatusListener;
 
+    @Mock private Connector connector;
+    @Mock private ConnectorContext ctx;
+    @Mock private TestSourceTask task;
+    @Mock private WorkerSourceTask workerTask;
+    @Mock private Converter keyConverter;
+    @Mock private Converter valueConverter;
+
     @Before
     public void setup() {
         super.setup();
@@ -116,9 +124,6 @@ public class WorkerTest extends ThreadedTest {
         expectStartStorage();
 
         // Create
-        Connector connector = PowerMock.createMock(Connector.class);
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
                 .andReturn(connector);
@@ -187,8 +192,6 @@ public class WorkerTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
 
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
         Map<String, String> props = new HashMap<>();
         props.put(SinkConnectorConfig.TOPICS_CONFIG, "foo,bar");
         props.put(ConnectorConfig.TASKS_MAX_CONFIG, "1");
@@ -233,10 +236,6 @@ public class WorkerTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
 
-        // Create
-        Connector connector = PowerMock.createMock(Connector.class);
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector("WorkerTestConnector")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
@@ -301,10 +300,6 @@ public class WorkerTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
 
-        // Create
-        Connector connector = PowerMock.createMock(Connector.class);
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         EasyMock.expect(plugins.newConnector("WorkerTest")).andReturn(connector);
         EasyMock.expect(connector.version()).andReturn("1.0");
@@ -381,10 +376,6 @@ public class WorkerTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
 
-        // Create
-        Connector connector = PowerMock.createMock(Connector.class);
-        ConnectorContext ctx = PowerMock.createMock(ConnectorContext.class);
-
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(3);
         EasyMock.expect(plugins.newConnector(WorkerTestConnector.class.getName()))
                 .andReturn(connector);
@@ -473,9 +464,6 @@ public class WorkerTest extends ThreadedTest {
         expectConverters(true);
         expectStartStorage();
 
-        // Create
-        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
-        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
@@ -486,6 +474,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.anyObject(JsonConverter.class),
                 EasyMock.anyObject(JsonConverter.class),
+                EasyMock.anyObject(JsonConverter.class),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
@@ -609,9 +598,6 @@ public class WorkerTest extends ThreadedTest {
         expectConverters(true);
         expectStartStorage();
 
-        // Create
-        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
-        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
@@ -622,6 +608,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.anyObject(JsonConverter.class),
                 EasyMock.anyObject(JsonConverter.class),
+                EasyMock.anyObject(JsonConverter.class),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
@@ -692,12 +679,11 @@ public class WorkerTest extends ThreadedTest {
         expectConverters();
         expectStartStorage();
 
-        TestSourceTask task = PowerMock.createMock(TestSourceTask.class);
-        WorkerSourceTask workerTask = PowerMock.createMock(WorkerSourceTask.class);
         EasyMock.expect(workerTask.id()).andStubReturn(TASK_ID);
 
         Capture<TestConverter> keyConverter = EasyMock.newCapture();
         Capture<TestConverter> valueConverter = EasyMock.newCapture();
+        Capture<HeaderConverter> headerConverter = EasyMock.newCapture();
 
         EasyMock.expect(plugins.currentThreadLoader()).andReturn(delegatingLoader).times(2);
         PowerMock.expectNew(
@@ -707,6 +693,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
+                EasyMock.capture(headerConverter),
                 EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
@@ -843,9 +830,6 @@ public class WorkerTest extends ThreadedTest {
     private void expectConverters(Class<? extends Converter> converterClass, Boolean expectDefaultConverters) {
         // As default converters are instantiated when a task starts, they are expected only if the `startTask` method is called
         if (expectDefaultConverters) {
-            // connector default
-            Converter keyConverter = PowerMock.createMock(converterClass);
-            Converter valueConverter = PowerMock.createMock(converterClass);
 
             // Instantiate and configure default
             EasyMock.expect(plugins.newConverter(JsonConverter.class.getName(), config))
diff --git a/docs/connect.html b/docs/connect.html
index 1230a7a..d5e2fc9 100644
--- a/docs/connect.html
+++ b/docs/connect.html
@@ -394,7 +394,7 @@
     }
     </pre>
 
-    <p>Again, we've omitted some details, but we can see the important steps: the <code>poll()</code> method is going to be called repeatedly, and for each call it will loop trying to read records from the file. For each line it reads, it also tracks the file offset. It uses this information to create an output <code>SourceRecord</code> with four pieces of information: the source partition (there is only one, the single file being read), source offset (byte offset in the file), output to [...]
+    <p>Again, we've omitted some details, but we can see the important steps: the <code>poll()</code> method is going to be called repeatedly, and for each call it will loop trying to read records from the file. For each line it reads, it also tracks the file offset. It uses this information to create an output <code>SourceRecord</code> with four pieces of information: the source partition (there is only one, the single file being read), source offset (byte offset in the file), output to [...]
 
     <p>Note that this implementation uses the normal Java <code>InputStream</code> interface and may sleep if data is not available. This is acceptable because Kafka Connect provides each task with a dedicated thread. While task implementations have to conform to the basic <code>poll()</code> interface, they have a lot of flexibility in how they are implemented. In this case, an NIO-based implementation would be more efficient, but this simple approach works, is quick to implement, and i [...]
 
@@ -414,7 +414,7 @@
         }
     </pre>
 
-    <p>The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering [...]
+    <p>The <code>SinkTask</code> documentation contains full details, but this interface is nearly as simple as the <code>SourceTask</code>. The <code>put()</code> method should contain most of the implementation, accepting sets of <code>SinkRecords</code>, performing any required translation, and storing them in the destination system. This method does not need to ensure the data has been fully written to the destination system before returning. In fact, in many cases internal buffering [...]
 
     <p>The <code>flush()</code> method is used during the offset commit process, which allows tasks to recover from failures and resume from a safe point such that no events will be missed. The method should push any outstanding data to the destination system and then block until the write has been acknowledged. The <code>offsets</code> parameter can often be ignored, but is useful in some cases where implementations want to store offset information in the destination store to provide ex [...]
     delivery. For example, an HDFS connector could do this and use atomic move operations to make sure the <code>flush()</code> operation atomically commits the data and offsets to a final location in HDFS.</p>
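
Since the documentation changes above touch the SourceTask and SinkTask walkthroughs, a short sketch of the consuming side may help: a sink task can now inspect the headers carried on each SinkRecord. This is illustrative only and assumes a made-up header key; it is not part of the commit:

    import org.apache.kafka.connect.data.Values;
    import org.apache.kafka.connect.header.Header;
    import org.apache.kafka.connect.sink.SinkRecord;

    // Inside a SinkTask.put(Collection<SinkRecord>) implementation:
    for (SinkRecord record : records) {
        Header header = record.headers().lastWithName("source.file");  // hypothetical key
        String file = header == null
                ? null
                : Values.convertToString(header.schema(), header.value());
        // ... route or annotate the write based on the header value ...
    }
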
diff --git a/docs/upgrade.html b/docs/upgrade.html
index 02acdff..3c2d976 100644
--- a/docs/upgrade.html
+++ b/docs/upgrade.html
@@ -74,6 +74,7 @@
 	Kafka Streams retries and can <a href="/{{version}}/documentation/streams/developer-guide/config-streams">configure<a/>
 	fine-grained timeouts (instead of hard coded retries as in older version).</li>
     <li>Kafka Streams rebalance time was reduced further making Kafka Streams more responsive.</li>
+    <li>Kafka Connect now supports message headers in both sink and source connectors, and allows them to be manipulated via simple message transforms. Connectors must be changed to explicitly use them. A new <code>HeaderConverter</code> is introduced to control how headers are (de)serialized, and the new <code>SimpleHeaderConverter</code> is used by default to represent header values as strings.</li>
 </ul>
 
 <h5><a id="upgrade_110_new_protocols" href="#upgrade_110_new_protocols">New Protocol Versions</a></h5>
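
To expand slightly on the upgrade note: headers are exposed on every ConnectRecord through the new Headers collection, which preserves insertion order and permits duplicate keys. A minimal sketch of the API, assuming a record variable is in scope and using made-up header keys:

    import org.apache.kafka.connect.header.Headers;

    Headers headers = record.headers();
    headers.addString("trace.id", "abc-123");                  // add a string-typed header
    headers.addLong("ingest.ts", System.currentTimeMillis());  // add an int64-typed header
    headers.remove("obsolete.key");                            // drop all headers with this key
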
