kafka-commits mailing list archives

From ewe...@apache.org
Subject [2/2] kafka git commit: KAFKA-3209: KIP-66: single message transforms
Date Fri, 13 Jan 2017 00:15:01 GMT
KAFKA-3209: KIP-66: single message transforms

Besides the API and runtime changes, this PR includes two data transformations (`InsertField`, `HoistToStruct`) and one routing transformation (`TimestampRouter`).

There is some gnarliness in `ConnectorConfig` / `ConfigDef` around creating, parsing and validating a dynamic `ConfigDef`.
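
A connector opts in via the new `transforms` config. For reference, a minimal sketch of the resulting properties in Java map form — only the `transforms` list and the per-alias `transforms.<alias>.type` key are defined by this patch; the surrounding connector keys are just a plausible example:

```java
import java.util.HashMap;
import java.util.Map;

public class SmtConfigSketch {
    // Connector properties that exercise the single message transform machinery.
    static Map<String, String> exampleProps() {
        Map<String, String> props = new HashMap<>();
        props.put("name", "example-source");
        props.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector");
        props.put("tasks.max", "1");
        // Transform aliases, applied in the order listed:
        props.put("transforms", "router");
        // Each alias is configured under the "transforms.<alias>." prefix,
        // with "type" selecting the Transformation class:
        props.put("transforms.router.type",
                "org.apache.kafka.connect.transforms.TimestampRouter");
        return props;
    }
}
```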

Author: Shikhar Bhushan <shikhar@confluent.io>

Reviewers: Ewen Cheslack-Postava <ewen@confluent.io>

Closes #2299 from shikhar/smt-2017


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/2f904883
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/2f904883
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/2f904883

Branch: refs/heads/trunk
Commit: 2f904883237c476fd82a202f7ddda93fe56eef36
Parents: a1e8b50
Author: Shikhar Bhushan <shikhar@confluent.io>
Authored: Thu Jan 12 16:14:53 2017 -0800
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Thu Jan 12 16:14:53 2017 -0800

----------------------------------------------------------------------
 bin/kafka-run-class.sh                          |   2 +-
 build.gradle                                    |  42 ++-
 checkstyle/import-control.xml                   |   7 +
 .../apache/kafka/common/config/ConfigDef.java   | 173 ++++++++---
 .../kafka/connect/connector/ConnectRecord.java  |   5 +-
 .../apache/kafka/connect/sink/SinkRecord.java   |   7 +-
 .../kafka/connect/source/SourceRecord.java      |   7 +-
 .../apache/kafka/connect/source/SourceTask.java |   3 +-
 .../connect/transforms/Transformation.java      |  48 +++
 .../connect/file/FileStreamSourceTask.java      |   3 +-
 .../kafka/connect/runtime/AbstractHerder.java   |  57 +---
 .../kafka/connect/runtime/ConnectorConfig.java  | 148 +++++++++-
 .../kafka/connect/runtime/PluginDiscovery.java  | 127 ++++++++
 .../connect/runtime/TransformationChain.java    |  69 +++++
 .../apache/kafka/connect/runtime/Worker.java    |  13 +-
 .../kafka/connect/runtime/WorkerSinkTask.java   |  22 +-
 .../kafka/connect/runtime/WorkerSourceTask.java |  15 +-
 .../resources/ConnectorPluginsResource.java     |   4 +-
 .../connect/runtime/ConnectorConfigTest.java    | 162 ++++++++++
 .../connect/runtime/WorkerSinkTaskTest.java     |  35 ++-
 .../runtime/WorkerSinkTaskThreadedTest.java     |  12 +-
 .../connect/runtime/WorkerSourceTaskTest.java   |  47 ++-
 .../kafka/connect/runtime/WorkerTest.java       |   3 +
 .../kafka/connect/transforms/HoistToStruct.java | 127 ++++++++
 .../kafka/connect/transforms/InsertField.java   | 296 +++++++++++++++++++
 .../connect/transforms/TimestampRouter.java     |  95 ++++++
 .../connect/transforms/util/SimpleConfig.java   |  34 +++
 .../connect/transforms/HoistToStructTest.java   |  44 +++
 .../connect/transforms/InsertFieldTest.java     | 113 +++++++
 .../connect/transforms/TimestampRouterTest.java |  43 +++
 .../scala/kafka/tools/ConsoleConsumer.scala     |   2 +
 settings.gradle                                 |   2 +-
 tests/kafkatest/services/console_consumer.py    |  10 +-
 .../tests/connect/connect_distributed_test.py   |  67 ++++-
 .../tests/connect/connect_rest_test.py          |   4 +-
 35 files changed, 1692 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/bin/kafka-run-class.sh
----------------------------------------------------------------------
diff --git a/bin/kafka-run-class.sh b/bin/kafka-run-class.sh
index 41c75eb..af10f61 100755
--- a/bin/kafka-run-class.sh
+++ b/bin/kafka-run-class.sh
@@ -111,7 +111,7 @@ do
   CLASSPATH="$CLASSPATH:$dir/*"
 done
 
-for cc_pkg in "api" "runtime" "file" "json" "tools"
+for cc_pkg in "api" "transforms" "runtime" "file" "json" "tools"
 do
   for file in "$base_dir"/connect/${cc_pkg}/build/libs/connect-${cc_pkg}*.jar;
   do

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 96bfa62..6f4d250 100644
--- a/build.gradle
+++ b/build.gradle
@@ -361,7 +361,7 @@ for ( sv in ['2_10', '2_11', '2_12'] ) {
   }
 }
 
-def connectPkgs = ['connect:api', 'connect:runtime', 'connect:json', 'connect:file']
+def connectPkgs = ['connect:api', 'connect:runtime', 'connect:transforms', 'connect:json', 'connect:file']
 def pkgs = ['clients', 'examples', 'log4j-appender', 'tools', 'streams', 'streams:examples'] + connectPkgs
 
 tasks.create(name: "jarConnect", dependsOn: connectPkgs.collect { it + ":jar" }) {}
@@ -533,6 +533,8 @@ project(':core') {
     from(project(':connect:api').configurations.runtime) { into("libs/") }
     from(project(':connect:runtime').jar) { into("libs/") }
     from(project(':connect:runtime').configurations.runtime) { into("libs/") }
+    from(project(':connect:transforms').jar) { into("libs/") }
+    from(project(':connect:transforms').configurations.runtime) { into("libs/") }
     from(project(':connect:json').jar) { into("libs/") }
     from(project(':connect:json').configurations.runtime) { into("libs/") }
     from(project(':connect:file').jar) { into("libs/") }
@@ -816,6 +818,42 @@ project(':connect:api') {
   }
 }
 
+project(':connect:transforms') {
+  archivesBaseName = "connect-transforms"
+
+  dependencies {
+    compile project(':connect:api')
+    compile libs.slf4jApi
+
+    testCompile libs.easymock
+    testCompile libs.junit
+    testCompile libs.powermock
+    testCompile libs.powermockEasymock
+
+    testRuntime libs.slf4jlog4j
+  }
+
+  javadoc {
+    enabled = false
+  }
+
+  tasks.create(name: "copyDependantLibs", type: Copy) {
+    from (configurations.testRuntime) {
+      include('slf4j-log4j12*')
+    }
+    from (configurations.runtime) {
+      exclude('kafka-clients*')
+      exclude('connect-*')
+    }
+    into "$buildDir/dependant-libs"
+    duplicatesStrategy 'exclude'
+  }
+
+  jar {
+    dependsOn copyDependantLibs
+  }
+}
+
 project(':connect:json') {
   archivesBaseName = "connect-json"
 
@@ -858,6 +896,7 @@ project(':connect:runtime') {
 
   dependencies {
     compile project(':connect:api')
+    compile project(":connect:transforms")
     compile project(':clients')
     compile project(':tools')
     compile libs.slf4jApi
@@ -877,6 +916,7 @@ project(':connect:runtime') {
     testCompile libs.junit
     testCompile libs.powermock
     testCompile libs.powermockEasymock
+
     testCompile project(":connect:json")
 
     testRuntime libs.slf4jlog4j

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 9fcd329..942dbdd 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -257,6 +257,13 @@
       <allow pkg="org.apache.kafka.tools" />
       <allow pkg="com.fasterxml.jackson" />
     </subpackage>
+
+    <subpackage name="transforms">
+      <allow class="org.apache.kafka.connect.connector.ConnectRecord" />
+      <allow class="org.apache.kafka.connect.source.SourceRecord" />
+      <allow class="org.apache.kafka.connect.sink.SinkRecord" />
+      <allow pkg="org.apache.kafka.connect.transforms.util" />
+    </subpackage>
   </subpackage>
 
 </import-control>

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 33f60a7..89feb9a 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -70,10 +70,22 @@ public class ConfigDef {
 
     public static final Object NO_DEFAULT_VALUE = new String("");
 
-    private final Map<String, ConfigKey> configKeys = new HashMap<>();
-    private final List<String> groups = new LinkedList<>();
+    private final Map<String, ConfigKey> configKeys;
+    private final List<String> groups;
     private Set<String> configsWithNoParent;
 
+    public ConfigDef() {
+        configKeys = new HashMap<>();
+        groups = new LinkedList<>();
+        configsWithNoParent = null;
+    }
+
+    public ConfigDef(ConfigDef base) {
+        configKeys = new HashMap<>(base.configKeys);
+        groups = new LinkedList<>(base.groups);
+        configsWithNoParent = base.configsWithNoParent == null ? null : new HashSet<>(base.configsWithNoParent);
+    }
+
     /**
      * Returns unmodifiable set of properties names defined in this {@linkplain ConfigDef}
      *
@@ -83,6 +95,17 @@ public class ConfigDef {
         return Collections.unmodifiableSet(configKeys.keySet());
     }
 
+    public ConfigDef define(ConfigKey key) {
+        if (configKeys.containsKey(key.name)) {
+            throw new ConfigException("Configuration " + key.name + " is defined twice.");
+        }
+        if (key.group != null && !groups.contains(key.group)) {
+            groups.add(key.group);
+        }
+        configKeys.put(key.name, key);
+        return this;
+    }
+
     /**
      * Define a new configuration
      * @param name          the name of the config parameter
@@ -101,15 +124,7 @@ public class ConfigDef {
      */
     public ConfigDef define(String name, Type type, Object defaultValue, Validator validator, Importance importance, String documentation,
                             String group, int orderInGroup, Width width, String displayName, List<String> dependents, Recommender recommender) {
-        if (configKeys.containsKey(name)) {
-            throw new ConfigException("Configuration " + name + " is defined twice.");
-        }
-        if (group != null && !groups.contains(group)) {
-            groups.add(group);
-        }
-        Object parsedDefault = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
-        configKeys.put(name, new ConfigKey(name, type, parsedDefault, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender));
-        return this;
+        return define(new ConfigKey(name, type, defaultValue, validator, importance, documentation, group, orderInGroup, width, displayName, dependents, recommender));
     }
 
     /**
@@ -584,7 +599,7 @@ public class ConfigDef {
      * @param type  The expected type
      * @return The parsed object
      */
-    private Object parseType(String name, Object value, Type type) {
+    public static Object parseType(String name, Object value, Type type) {
         try {
             if (value == null) return null;
 
@@ -882,11 +897,11 @@ public class ConfigDef {
                          List<String> dependents, Recommender recommender) {
             this.name = name;
             this.type = type;
-            this.defaultValue = defaultValue;
+            this.defaultValue = defaultValue == NO_DEFAULT_VALUE ? NO_DEFAULT_VALUE : parseType(name, defaultValue, type);
             this.validator = validator;
             this.importance = importance;
-            if (this.validator != null && this.hasDefault())
-                this.validator.ensureValid(name, defaultValue);
+            if (this.validator != null && hasDefault())
+                this.validator.ensureValid(name, this.defaultValue);
             this.documentation = documentation;
             this.dependents = dependents;
             this.group = group;
@@ -980,7 +995,7 @@ public class ConfigDef {
         StringBuilder b = new StringBuilder();
 
         String lastKeyGroupName = "";
-        for (ConfigKey def : sortedConfigsByGroup()) {
+        for (ConfigKey def : sortedConfigs()) {
             if (def.group != null) {
                 if (!lastKeyGroupName.equalsIgnoreCase(def.group)) {
                     b.append(def.group).append("\n");
@@ -1034,38 +1049,11 @@ public class ConfigDef {
     }
 
     /**
-     * Get a list of configs sorted into "natural" order: listing required fields first, then
-     * ordering by importance, and finally by name.
-     */
-    protected List<ConfigKey> sortedConfigs() {
-        // sort first required fields, then by importance, then name
-        List<ConfigKey> configs = new ArrayList<>(this.configKeys.values());
-        Collections.sort(configs, new Comparator<ConfigKey>() {
-            public int compare(ConfigKey k1, ConfigKey k2) {
-                // first take anything with no default value
-                if (!k1.hasDefault() && k2.hasDefault()) {
-                    return -1;
-                } else if (!k2.hasDefault() && k1.hasDefault()) {
-                    return 1;
-                }
-
-                // then sort by importance
-                int cmp = k1.importance.compareTo(k2.importance);
-                if (cmp == 0) {
-                    // then sort in alphabetical order
-                    return k1.name.compareTo(k2.name);
-                } else {
-                    return cmp;
-                }
-            }
-        });
-        return configs;
-    }
-
-    /**
      * Get a list of configs sorted taking the 'group' and 'orderInGroup' into account.
+     *
+     * If grouping is not specified, the result will reflect "natural" order: listing required fields first, then ordering by importance, and finally by name.
      */
-    protected List<ConfigKey> sortedConfigsByGroup() {
+    private List<ConfigKey> sortedConfigs() {
         final Map<String, Integer> groupOrd = new HashMap<>(groups.size());
         int ord = 0;
         for (String group: groups) {
@@ -1081,9 +1069,19 @@ public class ConfigDef {
                         : (k2.group == null ? 1 : Integer.compare(groupOrd.get(k1.group), groupOrd.get(k2.group)));
                 if (cmp == 0) {
                     cmp = Integer.compare(k1.orderInGroup, k2.orderInGroup);
-                }
-                if (cmp == 0) {
-                    cmp = k1.name.compareTo(k2.name);
+                    if (cmp == 0) {
+                        // first take anything with no default value
+                        if (!k1.hasDefault() && k2.hasDefault()) {
+                            cmp = -1;
+                        } else if (!k2.hasDefault() && k1.hasDefault()) {
+                            cmp = 1;
+                        } else {
+                            cmp = k1.importance.compareTo(k2.importance);
+                            if (cmp == 0) {
+                                return k1.name.compareTo(k2.name);
+                            }
+                        }
+                    }
                 }
                 return cmp;
             }
@@ -1091,4 +1089,81 @@ public class ConfigDef {
         return configs;
     }
 
+    public void embed(final String keyPrefix, final String groupPrefix, final int startingOrd, final ConfigDef child) {
+        int orderInGroup = startingOrd;
+        for (ConfigDef.ConfigKey key : child.sortedConfigs()) {
+            define(new ConfigDef.ConfigKey(
+                    keyPrefix + key.name,
+                    key.type,
+                    key.defaultValue,
+                    embeddedValidator(keyPrefix, key.validator),
+                    key.importance,
+                    key.documentation,
+                    groupPrefix + (key.group == null ? "" : ": " + key.group),
+                    orderInGroup++,
+                    key.width,
+                    key.displayName,
+                    embeddedDependents(keyPrefix, key.dependents),
+                    embeddedRecommender(keyPrefix, key.recommender)
+            ));
+        }
+    }
+
+    /**
+     * Returns a new validator instance that delegates to the base validator but unprefixes the config name along the way.
+     */
+    private static ConfigDef.Validator embeddedValidator(final String keyPrefix, final ConfigDef.Validator base) {
+        if (base == null) return null;
+        return new ConfigDef.Validator() {
+            @Override
+            public void ensureValid(String name, Object value) {
+                base.ensureValid(name.substring(keyPrefix.length()), value);
+            }
+        };
+    }
+
+    /**
+     * Returns an updated list of dependent configs with {@code keyPrefix} added to each.
+     */
+    private static List<String> embeddedDependents(final String keyPrefix, final List<String> dependents) {
+        if (dependents == null) return null;
+        final List<String> updatedDependents = new ArrayList<>(dependents.size());
+        for (String dependent : dependents) {
+            updatedDependents.add(keyPrefix + dependent);
+        }
+        return updatedDependents;
+    }
+
+    /**
+     * Returns a new recommender instance that delegates to the base recommender but unprefixes the input parameters along the way.
+     */
+    private static ConfigDef.Recommender embeddedRecommender(final String keyPrefix, final ConfigDef.Recommender base) {
+        if (base == null) return null;
+        return new ConfigDef.Recommender() {
+            private String unprefixed(String k) {
+                return k.substring(keyPrefix.length());
+            }
+
+            private Map<String, Object> unprefixed(Map<String, Object> parsedConfig) {
+                final Map<String, Object> unprefixedParsedConfig = new HashMap<>(parsedConfig.size());
+                for (Map.Entry<String, Object> e : parsedConfig.entrySet()) {
+                    if (e.getKey().startsWith(keyPrefix)) {
+                        unprefixedParsedConfig.put(unprefixed(e.getKey()), e.getValue());
+                    }
+                }
+                return unprefixedParsedConfig;
+            }
+
+            @Override
+            public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+                return base.validValues(unprefixed(name), unprefixed(parsedConfig));
+            }
+
+            @Override
+            public boolean visible(String name, Map<String, Object> parsedConfig) {
+                return base.visible(unprefixed(name), unprefixed(parsedConfig));
+            }
+        };
+    }
+
 }
\ No newline at end of file
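
The copy constructor, the public `define(ConfigKey)` overload, and `embed(...)` above combine to let a parent `ConfigDef` absorb a child's keys under a prefix, which is what the connector config enrichment later in this patch relies on. A small sketch of the mechanics (the `topic.format` option name is illustrative, not from this patch):

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class EmbedSketch {
    public static void main(String[] args) {
        // Child def, as a Transformation#config() might return it:
        ConfigDef child = new ConfigDef()
                .define("topic.format", Type.STRING, "${topic}-${timestamp}",
                        Importance.HIGH, "Format of the output topic name.");

        ConfigDef base = new ConfigDef()
                .define("transforms", Type.LIST, null, Importance.LOW, "Transform aliases.");

        // Copy the base def, then embed the child's keys under the alias prefix:
        ConfigDef enriched = new ConfigDef(base);
        enriched.embed("transforms.router.", "Transforms: router", 0, child);

        // Prints a set containing: transforms, transforms.router.topic.format
        System.out.println(enriched.names());
    }
}
```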

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
index d6319a1..6236f7e 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/connector/ConnectRecord.java
@@ -29,7 +29,7 @@ import org.apache.kafka.connect.data.Schema;
  * </p>
  */
 @InterfaceStability.Unstable
-public abstract class ConnectRecord {
+public abstract class ConnectRecord<R extends ConnectRecord<R>> {
     private final String topic;
     private final Integer kafkaPartition;
     private final Schema keySchema;
@@ -79,6 +79,9 @@ public abstract class ConnectRecord {
         return timestamp;
     }
 
+    /** Generate a new record of the same type as itself, with the specified parameter values. **/
+    public abstract R newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp);
+
     @Override
     public String toString() {
         return "ConnectRecord{" +

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
index fbe1bdc..ad1b2d5 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/sink/SinkRecord.java
@@ -31,7 +31,7 @@ import org.apache.kafka.connect.data.Schema;
  * timestamp, which may be {@code null}.
  */
 @InterfaceStability.Unstable
-public class SinkRecord extends ConnectRecord {
+public class SinkRecord extends ConnectRecord<SinkRecord> {
     private final long kafkaOffset;
     private final TimestampType timestampType;
 
@@ -55,6 +55,11 @@ public class SinkRecord extends ConnectRecord {
     }
 
     @Override
+    public SinkRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
+        return new SinkRecord(topic, kafkaPartition, keySchema, key, valueSchema, value, kafkaOffset(), timestamp, timestampType);
+    }
+
+    @Override
     public boolean equals(Object o) {
         if (this == o)
             return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
index 327b67b..444979a 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceRecord.java
@@ -42,7 +42,7 @@ import java.util.Map;
  * </p>
  */
 @InterfaceStability.Unstable
-public class SourceRecord extends ConnectRecord {
+public class SourceRecord extends ConnectRecord<SourceRecord> {
     private final Map<String, ?> sourcePartition;
     private final Map<String, ?> sourceOffset;
 
@@ -86,6 +86,11 @@ public class SourceRecord extends ConnectRecord {
     }
 
     @Override
+    public SourceRecord newRecord(String topic, Integer kafkaPartition, Schema keySchema, Object key, Schema valueSchema, Object value, Long timestamp) {
+        return new SourceRecord(sourcePartition, sourceOffset, topic, kafkaPartition, keySchema, key, valueSchema, value, timestamp);
+    }
+
+    @Override
     public boolean equals(Object o) {
         if (this == o)
             return true;

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
index c508ec1..c085085 100644
--- a/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
+++ b/connect/api/src/main/java/org/apache/kafka/connect/source/SourceTask.java
@@ -82,13 +82,14 @@ public abstract class SourceTask implements Task {
 
     /**
      * <p>
-     * Commit an individual {@link SourceRecord} when the callback from the producer client is received.
+     * Commit an individual {@link SourceRecord} when the callback from the producer client is received, or if a record is filtered by a transformation.
      * </p>
      * <p>
      * SourceTasks are not required to implement this functionality; Kafka Connect will record offsets
      * automatically. This hook is provided for systems that also need to store offsets internally
      * in their own system.
      * </p>
+     *
      * @param record {@link SourceRecord} that was successfully sent via the producer.
      * @throws InterruptedException
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
----------------------------------------------------------------------
diff --git a/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java b/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
new file mode 100644
index 0000000..b17119f
--- /dev/null
+++ b/connect/api/src/main/java/org/apache/kafka/connect/transforms/Transformation.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.Configurable;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.ConnectRecord;
+
+import java.io.Closeable;
+
+/**
+ * Single message transformation for Kafka Connect record types.
+ *
+ * Connectors can be configured with transformations to make lightweight message-at-a-time modifications.
+ */
+public interface Transformation<R extends ConnectRecord<R>> extends Configurable, Closeable {
+
+    /**
+     * Apply transformation to the {@code record} and return another record object (which may be {@code record} itself) or {@code null},
+     * corresponding to a map or filter operation respectively.
+     *
+     * The implementation must be thread-safe.
+     */
+    R apply(R record);
+
+    /** Configuration specification for this transformation. **/
+    ConfigDef config();
+
+    /** Signal that this transformation instance will no longer be used. **/
+    @Override
+    void close();
+
+}
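
For a sense of what implementing this interface looks like, here is a hypothetical transformation (not part of this patch) that uppercases the topic name and demonstrates the map-or-filter contract — returning `null` drops the record:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

import java.util.Map;

public class UppercaseTopic<R extends ConnectRecord<R>> implements Transformation<R> {

    @Override
    public void configure(Map<String, ?> configs) {
        // No options; a real transform would validate `configs` against config().
    }

    @Override
    public R apply(R record) {
        if (record.topic() == null)
            return null; // filter: drop records without a topic
        // map: produce a new record of the same concrete type via newRecord()
        return record.newRecord(record.topic().toUpperCase(), record.kafkaPartition(),
                record.keySchema(), record.key(), record.valueSchema(), record.value(),
                record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef(); // no configuration options
    }

    @Override
    public void close() {
        // nothing to release
    }
}
```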

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
index bab3f77..55bd0f9 100644
--- a/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
+++ b/connect/file/src/main/java/org/apache/kafka/connect/file/FileStreamSourceTask.java
@@ -144,7 +144,8 @@ public class FileStreamSourceTask extends SourceTask {
                             log.trace("Read a line from {}", logFilename());
                             if (records == null)
                                 records = new ArrayList<>();
-                            records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, VALUE_SCHEMA, line));
+                            records.add(new SourceRecord(offsetKey(filename), offsetValue(streamOffset), topic, null,
+                                    null, null, VALUE_SCHEMA, line, System.currentTimeMillis()));
                         }
                     } while (line != null);
                 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index ba4894b..cbcf14c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -27,37 +27,23 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
-import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.runtime.rest.errors.BadRequestException;
 import org.apache.kafka.connect.source.SourceConnector;
 import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
-import org.apache.kafka.connect.tools.MockConnector;
-import org.apache.kafka.connect.tools.MockSinkConnector;
-import org.apache.kafka.connect.tools.MockSourceConnector;
-import org.apache.kafka.connect.tools.SchemaSourceConnector;
-import org.apache.kafka.connect.tools.VerifiableSinkConnector;
-import org.apache.kafka.connect.tools.VerifiableSourceConnector;
 import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.ReflectionsUtil;
-import org.reflections.Reflections;
-import org.reflections.util.ClasspathHelper;
-import org.reflections.util.ConfigurationBuilder;
 
 import java.io.ByteArrayOutputStream;
 import java.io.PrintStream;
 import java.io.UnsupportedEncodingException;
-import java.lang.reflect.Modifier;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -89,13 +75,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     protected final ConfigBackingStore configBackingStore;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
-    private static List<ConnectorPluginInfo> validConnectorPlugins;
-    private static final Object LOCK = new Object();
     private Thread classPathTraverser;
-    private static final List<Class<? extends Connector>> EXCLUDES = Arrays.asList(
-        VerifiableSourceConnector.class, VerifiableSinkConnector.class,
-        MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
-        SchemaSourceConnector.class);
 
     public AbstractHerder(Worker worker,
                           String workerId,
@@ -251,12 +231,12 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
             throw new BadRequestException("Connector config " + connectorConfig + " contains no connector type");
 
         Connector connector = getConnector(connType);
-        ConfigDef connectorConfigDef;
-        if (connector instanceof SourceConnector) {
-            connectorConfigDef = SourceConnectorConfig.configDef();
-        } else {
-            connectorConfigDef = SinkConnectorConfig.configDef();
-        }
+
+        final ConfigDef connectorConfigDef = ConnectorConfig.enrich(
+                (connector instanceof SourceConnector) ? SourceConnectorConfig.configDef() : SinkConnectorConfig.configDef(),
+                connectorConfig,
+                false
+        );
 
         List<ConfigValue> configValues = new ArrayList<>();
         Map<String, ConfigKey> configKeys = new HashMap<>();
@@ -278,29 +258,6 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         return generateResult(connType, configKeys, configValues, allGroups);
     }
 
-    public static List<ConnectorPluginInfo> connectorPlugins() {
-        synchronized (LOCK) {
-            if (validConnectorPlugins != null) {
-                return validConnectorPlugins;
-            }
-            ReflectionsUtil.registerUrlTypes();
-            ConfigurationBuilder builder = new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath());
-            Reflections reflections = new Reflections(builder);
-
-            Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
-            connectorClasses.removeAll(EXCLUDES);
-            List<ConnectorPluginInfo> connectorPlugins = new LinkedList<>();
-            for (Class<? extends Connector> connectorClass : connectorClasses) {
-                int mod = connectorClass.getModifiers();
-                if (!Modifier.isAbstract(mod) && !Modifier.isInterface(mod)) {
-                    connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
-                }
-            }
-            validConnectorPlugins = connectorPlugins;
-            return connectorPlugins;
-        }
-    }
-
     // public for testing
     public static ConfigInfos generateResult(String connType, Map<String, ConfigKey> configKeys, List<ConfigValue> configValues, List<String> groups) {
         int errorCount = 0;
@@ -394,7 +351,7 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
         classPathTraverser = new Thread(new Runnable() {
             @Override
             public void run() {
-                connectorPlugins();
+                PluginDiscovery.scanClasspathForPlugins();
             }
         }, "CLASSPATH traversal thread.");
         classPathTraverser.start();

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
index 30869a4..7086da2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorConfig.java
@@ -22,8 +22,17 @@ import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.common.config.ConfigDef.Importance;
 import org.apache.kafka.common.config.ConfigDef.Type;
 import org.apache.kafka.common.config.ConfigDef.Width;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.transforms.Transformation;
 
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.List;
 import java.util.Map;
 
 import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
@@ -41,6 +50,7 @@ import static org.apache.kafka.common.config.ConfigDef.Range.atLeast;
  */
 public class ConnectorConfig extends AbstractConfig {
     protected static final String COMMON_GROUP = "Common";
+    protected static final String TRANSFORMS_GROUP = "Transforms";
 
     public static final String NAME_CONFIG = "name";
     private static final String NAME_DOC = "Globally unique name to use for this connector.";
@@ -48,7 +58,7 @@ public class ConnectorConfig extends AbstractConfig {
 
     public static final String CONNECTOR_CLASS_CONFIG = "connector.class";
     private static final String CONNECTOR_CLASS_DOC =
-                    "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " +
+            "Name or alias of the class for this connector. Must be a subclass of org.apache.kafka.connect.connector.Connector. " +
                     "If the connector is org.apache.kafka.connect.file.FileStreamSinkConnector, you can either specify this full name, " +
                     " or use \"FileStreamSink\" or \"FileStreamSinkConnector\" to make the configuration a bit shorter";
     private static final String CONNECTOR_CLASS_DISPLAY = "Connector class";
@@ -68,13 +78,27 @@ public class ConnectorConfig extends AbstractConfig {
 
     private static final String TASK_MAX_DISPLAY = "Tasks max";
 
+    public static final String TRANSFORMS_CONFIG = "transforms";
+    private static final String TRANSFORMS_DOC = "Aliases for the transformations to be applied to records.";
+    private static final String TRANSFORMS_DISPLAY = "Transforms";
+
     public static ConfigDef configDef() {
         return new ConfigDef()
-            .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
-            .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
-            .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
-            .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
-            .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY);
+                .define(NAME_CONFIG, Type.STRING, Importance.HIGH, NAME_DOC, COMMON_GROUP, 1, Width.MEDIUM, NAME_DISPLAY)
+                .define(CONNECTOR_CLASS_CONFIG, Type.STRING, Importance.HIGH, CONNECTOR_CLASS_DOC, COMMON_GROUP, 2, Width.LONG, CONNECTOR_CLASS_DISPLAY)
+                .define(TASKS_MAX_CONFIG, Type.INT, TASKS_MAX_DEFAULT, atLeast(TASKS_MIN_CONFIG), Importance.HIGH, TASKS_MAX_DOC, COMMON_GROUP, 3, Width.SHORT, TASK_MAX_DISPLAY)
+                .define(KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, KEY_CONVERTER_CLASS_DOC, COMMON_GROUP, 4, Width.SHORT, KEY_CONVERTER_CLASS_DISPLAY)
+                .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, null, Importance.LOW, VALUE_CONVERTER_CLASS_DOC, COMMON_GROUP, 5, Width.SHORT, VALUE_CONVERTER_CLASS_DISPLAY)
+                .define(TRANSFORMS_CONFIG, Type.LIST, null, new ConfigDef.Validator() {
+                    @Override
+                    public void ensureValid(String name, Object value) {
+                        if (value == null) return;
+                        final List<String> transformAliases = (List<String>) value;
+                        if (transformAliases.size() > new HashSet<>(transformAliases).size()) {
+                            throw new ConfigException(name, value, "Duplicate alias provided.");
+                        }
+                    }
+                }, Importance.LOW, TRANSFORMS_DOC, TRANSFORMS_GROUP, 6, Width.LONG, TRANSFORMS_DISPLAY);
     }
 
     public ConnectorConfig() {
@@ -82,10 +106,114 @@ public class ConnectorConfig extends AbstractConfig {
     }
 
     public ConnectorConfig(Map<String, String> props) {
-        super(configDef(), props);
+        this(configDef(), props);
+    }
+
+    public ConnectorConfig(ConfigDef configDef, Map<String, String> props) {
+        super(enrich(configDef, props, true), props);
     }
 
-    public ConnectorConfig(ConfigDef subClassConfig, Map<String, String> props) {
-        super(subClassConfig, props);
+    /**
+     * Returns the initialized list of {@link Transformation}s that are specified in {@link #TRANSFORMS_CONFIG}.
+     */
+    public <R extends ConnectRecord<R>> List<Transformation<R>> transformations() {
+        final List<String> transformAliases = getList(TRANSFORMS_CONFIG);
+        if (transformAliases == null || transformAliases.isEmpty()) {
+            return Collections.emptyList();
+        }
+
+        final List<Transformation<R>> transformations = new ArrayList<>(transformAliases.size());
+        for (String alias : transformAliases) {
+            final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+            final Transformation<R> transformation;
+            try {
+                transformation = getClass(prefix + "type").asSubclass(Transformation.class).newInstance();
+            } catch (Exception e) {
+                throw new ConnectException(e);
+            }
+            transformation.configure(originalsWithPrefix(prefix));
+            transformations.add(transformation);
+        }
+
+        return transformations;
     }
-}
+
+    /**
+     * Returns an enriched {@link ConfigDef} that builds on {@code baseConfigDef}, using the current configuration specified in {@code props} as input.
+     * <p>
+     * {@code requireFullConfig} specifies whether required config values that are missing should cause an exception to be thrown.
+     */
+    public static ConfigDef enrich(ConfigDef baseConfigDef, Map<String, String> props, boolean requireFullConfig) {
+        final List<String> transformAliases = (List<String>) ConfigDef.parseType(TRANSFORMS_CONFIG, props.get(TRANSFORMS_CONFIG), Type.LIST);
+        if (transformAliases == null || transformAliases.isEmpty()) {
+            return baseConfigDef;
+        }
+
+        final ConfigDef newDef = new ConfigDef(baseConfigDef);
+
+        for (String alias : new LinkedHashSet<>(transformAliases)) {
+            final String prefix = TRANSFORMS_CONFIG + "." + alias + ".";
+            final String group = TRANSFORMS_GROUP + ": " + alias;
+            int orderInGroup = 0;
+
+            final String transformationTypeConfig = prefix + "type";
+            final ConfigDef.Validator typeValidator = new ConfigDef.Validator() {
+                @Override
+                public void ensureValid(String name, Object value) {
+                    getConfigDefFromTransformation(transformationTypeConfig, (Class) value);
+                }
+            };
+            newDef.define(transformationTypeConfig, Type.CLASS, ConfigDef.NO_DEFAULT_VALUE, typeValidator, Importance.HIGH,
+                    "Class for the '" + alias + "' transformation.", group, orderInGroup++, Width.LONG, "Transformation type for " + alias,
+                    Collections.<String>emptyList(), new TransformationClassRecommender());
+
+            final ConfigDef transformationConfigDef;
+            try {
+                final String className = props.get(transformationTypeConfig);
+                final Class<?> cls = (Class<?>) ConfigDef.parseType(transformationTypeConfig, className, Type.CLASS);
+                transformationConfigDef = getConfigDefFromTransformation(transformationTypeConfig, cls);
+            } catch (ConfigException e) {
+                if (requireFullConfig) {
+                    throw e;
+                } else {
+                    continue;
+                }
+            }
+
+            newDef.embed(prefix, group, orderInGroup, transformationConfigDef);
+        }
+
+        return newDef;
+    }
+
+    /**
+     * Return {@link ConfigDef} from {@code transformationCls}, which is expected to be a non-null {@code Class<Transformation>},
+     * by instantiating it and invoking {@link Transformation#config()}.
+     */
+    static ConfigDef getConfigDefFromTransformation(String key, Class<?> transformationCls) {
+        if (transformationCls == null || !Transformation.class.isAssignableFrom(transformationCls)) {
+            throw new ConfigException(key, String.valueOf(transformationCls), "Not a Transformation");
+        }
+        try {
+            return (transformationCls.asSubclass(Transformation.class).newInstance()).config();
+        } catch (Exception e) {
+            throw new ConfigException(key, String.valueOf(transformationCls), "Error getting config definition from Transformation: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Recommend bundled transformations.
+     */
+    static final class TransformationClassRecommender implements ConfigDef.Recommender {
+        @Override
+        public List<Object> validValues(String name, Map<String, Object> parsedConfig) {
+            return (List) PluginDiscovery.transformationPlugins();
+        }
+
+        @Override
+        public boolean visible(String name, Map<String, Object> parsedConfig) {
+            return true;
+        }
+    }
+
+}
\ No newline at end of file
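
The entry point for the dynamic `ConfigDef` mentioned in the commit message is `enrich`: given a `transforms` list in the raw props, it defines `transforms.<alias>.type` plus the per-alias keys reported by each `Transformation#config()`. A sketch of driving it directly (assumes the connect-transforms jar is on the classpath so the type class resolves):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.runtime.ConnectorConfig;

public class EnrichSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("transforms", "router");
        props.put("transforms.router.type",
                "org.apache.kafka.connect.transforms.TimestampRouter");

        // requireFullConfig=false: aliases whose type cannot be resolved are
        // skipped rather than failing, which is what config validation uses.
        ConfigDef enriched = ConnectorConfig.enrich(ConnectorConfig.configDef(), props, false);
        System.out.println(enriched.names()); // includes the transforms.router.* keys
    }
}
```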

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
new file mode 100644
index 0000000..847f527
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/PluginDiscovery.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
+import org.apache.kafka.connect.tools.MockConnector;
+import org.apache.kafka.connect.tools.MockSinkConnector;
+import org.apache.kafka.connect.tools.MockSourceConnector;
+import org.apache.kafka.connect.tools.SchemaSourceConnector;
+import org.apache.kafka.connect.tools.VerifiableSinkConnector;
+import org.apache.kafka.connect.tools.VerifiableSourceConnector;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.util.ReflectionsUtil;
+import org.reflections.Reflections;
+import org.reflections.util.ClasspathHelper;
+import org.reflections.util.ConfigurationBuilder;
+
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Set;
+
+public class PluginDiscovery {
+
+    private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
+            VerifiableSourceConnector.class, VerifiableSinkConnector.class,
+            MockConnector.class, MockSourceConnector.class, MockSinkConnector.class,
+            SchemaSourceConnector.class
+    );
+
+    private static final List<Class<? extends Transformation>> TRANSFORMATION_EXCLUDES = Arrays.asList();
+
+    private static boolean scanned = false;
+    private static List<ConnectorPluginInfo> validConnectorPlugins;
+    private static List<Class<? extends Transformation>> validTransformationPlugins;
+
+    public static synchronized List<ConnectorPluginInfo> connectorPlugins() {
+        scanClasspathForPlugins();
+        return validConnectorPlugins;
+    }
+
+    public static synchronized List<Class<? extends Transformation>> transformationPlugins() {
+        scanClasspathForPlugins();
+        return validTransformationPlugins;
+    }
+
+    public static synchronized void scanClasspathForPlugins() {
+        if (scanned) return;
+        ReflectionsUtil.registerUrlTypes();
+        final Reflections reflections = new Reflections(new ConfigurationBuilder().setUrls(ClasspathHelper.forJavaClassPath()));
+        validConnectorPlugins = Collections.unmodifiableList(connectorPlugins(reflections));
+        validTransformationPlugins = Collections.unmodifiableList(transformationPlugins(reflections));
+        scanned = true;
+    }
+
+    private static List<ConnectorPluginInfo> connectorPlugins(Reflections reflections) {
+        final Set<Class<? extends Connector>> connectorClasses = reflections.getSubTypesOf(Connector.class);
+        connectorClasses.removeAll(CONNECTOR_EXCLUDES);
+
+        final List<ConnectorPluginInfo> connectorPlugins = new ArrayList<>(connectorClasses.size());
+        for (Class<? extends Connector> connectorClass : connectorClasses) {
+            if (isConcrete(connectorClass)) {
+                connectorPlugins.add(new ConnectorPluginInfo(connectorClass.getCanonicalName()));
+            }
+        }
+
+        Collections.sort(connectorPlugins, new Comparator<ConnectorPluginInfo>() {
+            @Override
+            public int compare(ConnectorPluginInfo a, ConnectorPluginInfo b) {
+                return a.clazz().compareTo(b.clazz());
+            }
+        });
+
+        return connectorPlugins;
+    }
+
+    private static List<Class<? extends Transformation>> transformationPlugins(Reflections reflections) {
+        final Set<Class<? extends Transformation>> transformationClasses = reflections.getSubTypesOf(Transformation.class);
+        transformationClasses.removeAll(TRANSFORMATION_EXCLUDES);
+
+        final List<Class<? extends Transformation>> transformationPlugins = new ArrayList<>(transformationClasses.size());
+        for (Class<? extends Transformation> transformationClass : transformationClasses) {
+            if (isConcrete(transformationClass)) {
+                transformationPlugins.add(transformationClass);
+            }
+        }
+
+        Collections.sort(transformationPlugins, new Comparator<Class<? extends Transformation>>() {
+            @Override
+            public int compare(Class<? extends Transformation> a, Class<? extends Transformation> b) {
+                return a.getCanonicalName().compareTo(b.getCanonicalName());
+            }
+        });
+
+        return transformationPlugins;
+    }
+
+    private static boolean isConcrete(Class<?> cls) {
+        final int mod = cls.getModifiers();
+        return !Modifier.isAbstract(mod) && !Modifier.isInterface(mod);
+    }
+
+    public static void main(String... args) {
+        System.out.println(connectorPlugins());
+        System.out.println(transformationPlugins());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
new file mode 100644
index 0000000..b5a7dc4
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TransformationChain.java
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.transforms.Transformation;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Objects;
+
+public class TransformationChain<R extends ConnectRecord<R>> {
+
+    private final List<Transformation<R>> transformations;
+
+    public TransformationChain(List<Transformation<R>> transformations) {
+        this.transformations = transformations;
+    }
+
+    public R apply(R record) {
+        if (transformations.isEmpty()) return record;
+
+        for (Transformation<R> transformation : transformations) {
+            record = transformation.apply(record);
+            if (record == null) break;
+        }
+
+        return record;
+    }
+
+    public void close() {
+        for (Transformation<R> transformation : transformations) {
+            transformation.close();
+        }
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+        if (o == null || getClass() != o.getClass()) return false;
+        TransformationChain that = (TransformationChain) o;
+        return Objects.equals(transformations, that.transformations);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(transformations);
+    }
+
+    public static <R extends ConnectRecord<R>> TransformationChain<R> noOp() {
+        return new TransformationChain<R>(Collections.<Transformation<R>>emptyList());
+    }
+
+}
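
At runtime the worker wraps the configured transformations in a chain; a sketch of the flow in isolation (hypothetical standalone usage — in this patch the chain is built by `Worker` from `ConnectorConfig.transformations()`):

```java
import java.util.Arrays;

import org.apache.kafka.connect.runtime.TransformationChain;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class ChainSketch {
    static SourceRecord process(SourceRecord in,
                                Transformation<SourceRecord> first,
                                Transformation<SourceRecord> second) {
        TransformationChain<SourceRecord> chain =
                new TransformationChain<>(Arrays.asList(first, second));
        // Applies each transform in order; a null from any transform
        // short-circuits the loop and the record is dropped.
        SourceRecord out = chain.apply(in);
        chain.close();
        return out;
    }
}
```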

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index c575d92..3703ed9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -25,7 +25,9 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.apache.kafka.connect.source.SourceRecord;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
 import org.apache.kafka.connect.storage.OffsetBackingStore;
@@ -325,7 +327,7 @@ public class Worker {
             else
                 valueConverter = defaultValueConverter;
 
-            workerTask = buildWorkerTask(id, task, statusListener, initialState, keyConverter, valueConverter);
+            workerTask = buildWorkerTask(connConfig, id, task, statusListener, initialState, keyConverter, valueConverter);
             workerTask.initialize(taskConfig);
         } catch (Throwable t) {
             log.error("Failed to start task {}", id, t);
@@ -344,7 +346,8 @@ public class Worker {
         return true;
     }
 
-    private WorkerTask buildWorkerTask(ConnectorTaskId id,
+    private WorkerTask buildWorkerTask(ConnectorConfig connConfig,
+                                       ConnectorTaskId id,
                                        Task task,
                                        TaskStatus.Listener statusListener,
                                        TargetState initialState,
@@ -352,16 +355,18 @@ public class Worker {
                                        Converter valueConverter) {
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
+            TransformationChain<SourceRecord> transformationChain = new TransformationChain<>(connConfig.<SourceRecord>transformations());
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(producerProps);
             return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
-                     valueConverter, producer, offsetReader, offsetWriter, config, time);
+                     valueConverter, transformationChain, producer, offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
+            TransformationChain<SinkRecord> transformationChain = new TransformationChain<>(connConfig.<SinkRecord>transformations());
             return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter,
-                    valueConverter, time);
+                    valueConverter, transformationChain, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
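
A side note on the call shape above: transformations() is generic in the
record type, and in the one-liner its result feeds a diamond-operator
constructor, where Java 7's inference cannot recover the element type; hence
the explicit <SourceRecord> witness. An equivalent two-step form, for
illustration:

    List<Transformation<SourceRecord>> xforms = connConfig.<SourceRecord>transformations();
    TransformationChain<SourceRecord> chain = new TransformationChain<>(xforms);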

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index a284ec7..5cc70d5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -62,6 +62,7 @@ class WorkerSinkTask extends WorkerTask {
     private final Time time;
     private final Converter keyConverter;
     private final Converter valueConverter;
+    private final TransformationChain<SinkRecord> transformationChain;
     private KafkaConsumer<byte[], byte[]> consumer;
     private WorkerSinkTaskContext context;
     private final List<SinkRecord> messageBatch;
@@ -82,6 +83,7 @@ class WorkerSinkTask extends WorkerTask {
                           WorkerConfig workerConfig,
                           Converter keyConverter,
                           Converter valueConverter,
+                          TransformationChain<SinkRecord> transformationChain,
                           Time time) {
         super(id, statusListener, initialState);
 
@@ -89,6 +91,7 @@ class WorkerSinkTask extends WorkerTask {
         this.task = task;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
+        this.transformationChain = transformationChain;
         this.time = time;
         this.messageBatch = new ArrayList<>();
         this.currentOffsets = new HashMap<>();
@@ -128,6 +131,7 @@ class WorkerSinkTask extends WorkerTask {
         task.stop();
         if (consumer != null)
             consumer.close();
+        transformationChain.close();
     }
 
     @Override
@@ -395,14 +399,16 @@ class WorkerSinkTask extends WorkerTask {
             log.trace("Consuming message with key {}, value {}", msg.key(), msg.value());
             SchemaAndValue keyAndSchema = keyConverter.toConnectData(msg.topic(), msg.key());
             SchemaAndValue valueAndSchema = valueConverter.toConnectData(msg.topic(), msg.value());
-            messageBatch.add(
-                    new SinkRecord(msg.topic(), msg.partition(),
-                            keyAndSchema.schema(), keyAndSchema.value(),
-                            valueAndSchema.schema(), valueAndSchema.value(),
-                            msg.offset(),
-                            (msg.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) ? null : msg.timestamp(),
-                            msg.timestampType())
-            );
+            SinkRecord record = new SinkRecord(msg.topic(), msg.partition(),
+                    keyAndSchema.schema(), keyAndSchema.value(),
+                    valueAndSchema.schema(), valueAndSchema.value(),
+                    msg.offset(),
+                    (msg.timestampType() == TimestampType.NO_TIMESTAMP_TYPE) ? null : msg.timestamp(),
+                    msg.timestampType());
+            record = transformationChain.apply(record);
+            if (record != null) {
+                messageBatch.add(record);
+            }
         }
     }
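
The null check added above exists because a transformation may legitimately
filter records out of the batch. A hypothetical predicate-style transform (not
part of this patch) shows the case it guards against:

    import java.util.Map;

    import org.apache.kafka.common.config.ConfigDef;
    import org.apache.kafka.connect.connector.ConnectRecord;
    import org.apache.kafka.connect.transforms.Transformation;

    // Hypothetical filter: returning null drops the record, which is exactly
    // what the new null check in convertMessages() handles.
    public class DropNullValues<R extends ConnectRecord<R>> implements Transformation<R> {
        @Override
        public void configure(Map<String, ?> props) { /* no configuration needed */ }

        @Override
        public R apply(R record) {
            return record.value() == null ? null : record;
        }

        @Override
        public ConfigDef config() {
            return new ConfigDef();
        }

        @Override
        public void close() { /* nothing to release */ }
    }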
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 05a07b8..092072d 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -55,6 +55,7 @@ class WorkerSourceTask extends WorkerTask {
     private final SourceTask task;
     private final Converter keyConverter;
     private final Converter valueConverter;
+    private final TransformationChain<SourceRecord> transformationChain;
     private KafkaProducer<byte[], byte[]> producer;
     private final OffsetStorageReader offsetReader;
     private final OffsetStorageWriter offsetWriter;
@@ -80,6 +81,7 @@ class WorkerSourceTask extends WorkerTask {
                             TargetState initialState,
                             Converter keyConverter,
                             Converter valueConverter,
+                            TransformationChain<SourceRecord> transformationChain,
                             KafkaProducer<byte[], byte[]> producer,
                             OffsetStorageReader offsetReader,
                             OffsetStorageWriter offsetWriter,
@@ -91,6 +93,7 @@ class WorkerSourceTask extends WorkerTask {
         this.task = task;
         this.keyConverter = keyConverter;
         this.valueConverter = valueConverter;
+        this.transformationChain = transformationChain;
         this.producer = producer;
         this.offsetReader = offsetReader;
         this.offsetWriter = offsetWriter;
@@ -116,6 +119,7 @@ class WorkerSourceTask extends WorkerTask {
 
     protected void close() {
         producer.close(30, TimeUnit.SECONDS);
+        transformationChain.close();
     }
 
     @Override
@@ -181,7 +185,14 @@ class WorkerSourceTask extends WorkerTask {
      */
     private boolean sendRecords() {
         int processed = 0;
-        for (final SourceRecord record : toSend) {
+        for (final SourceRecord preTransformRecord : toSend) {
+            final SourceRecord record = transformationChain.apply(preTransformRecord);
+
+            if (record == null) {
+                commitTaskRecord(preTransformRecord);
+                continue;
+            }
+
             byte[] key = keyConverter.fromConnectData(record.topic(), record.keySchema(), record.key());
             byte[] value = valueConverter.fromConnectData(record.topic(), record.valueSchema(), record.value());
             final ProducerRecord<byte[], byte[]> producerRecord = new ProducerRecord<>(record.topic(), record.kafkaPartition(), record.timestamp(), key, value);
@@ -219,7 +230,7 @@ class WorkerSourceTask extends WorkerTask {
                                     log.trace("Wrote record successfully: topic {} partition {} offset {}",
                                             recordMetadata.topic(), recordMetadata.partition(),
                                             recordMetadata.offset());
-                                    commitTaskRecord(record);
+                                    commitTaskRecord(preTransformRecord);
                                 }
                                 recordSent(producerRecord);
                             }
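
Two details of this send loop are easy to miss: a record dropped by the chain
is still committed, so the task's offset bookkeeping advances past it and the
record is not replayed on restart; and a successfully sent record is committed
via its pre-transform original, since a routing transform such as
TimestampRouter may have rewritten the topic. An annotated sketch mirroring
the hunk above:

    for (final SourceRecord preTransformRecord : toSend) {
        final SourceRecord record = transformationChain.apply(preTransformRecord);
        if (record == null) {
            // Filtered out: commit the original anyway so its source offset
            // is accounted for.
            commitTaskRecord(preTransformRecord);
            continue;
        }
        // ... convert and produce `record`; the ack callback likewise commits
        // preTransformRecord, not the (possibly re-topiced) transformed record.
    }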

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 519aa9a..0d20802 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -17,9 +17,9 @@
 
 package org.apache.kafka.connect.runtime.rest.resources;
 
-import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.PluginDiscovery;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 
@@ -60,6 +60,6 @@ public class ConnectorPluginsResource {
     @GET
     @Path("/")
     public List<ConnectorPluginInfo> listConnectorPlugins() {
-        return AbstractHerder.connectorPlugins();
+        return PluginDiscovery.connectorPlugins();
     }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
new file mode 100644
index 0000000..9c77b36
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ConnectorConfigTest.java
@@ -0,0 +1,162 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class ConnectorConfigTest<R extends ConnectRecord<R>> {
+
+    public static abstract class TestConnector extends Connector {
+    }
+
+    public static class SimpleTransformation<R extends ConnectRecord<R>> implements Transformation<R>  {
+
+        int magicNumber = 0;
+
+        @Override
+        public void configure(Map<String, ?> props) {
+            magicNumber = Integer.parseInt((String) props.get("magic.number"));
+        }
+
+        @Override
+        public R apply(R record) {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            magicNumber = 0;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return new ConfigDef()
+                    .define("magic.number", ConfigDef.Type.INT, ConfigDef.NO_DEFAULT_VALUE, ConfigDef.Range.atLeast(42), ConfigDef.Importance.HIGH, "");
+        }
+    }
+
+    @Test
+    public void noTransforms() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        new ConnectorConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void danglingTransformAlias() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "dangler");
+        new ConnectorConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void wrongTransformationType() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", "uninstantiable");
+        new ConnectorConfig(props);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void unconfiguredTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        new ConnectorConfig(props);
+    }
+
+    @Test
+    public void misconfiguredTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "40");
+        try {
+            new ConnectorConfig(props);
+            fail();
+        } catch (ConfigException e) {
+            assertTrue(e.getMessage().contains("Value must be at least 42"));
+        }
+    }
+
+    @Test
+    public void singleTransform() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        final ConnectorConfig config = new ConnectorConfig(props);
+        final List<Transformation<R>> transformations = config.transformations();
+        assertEquals(1, transformations.size());
+        final SimpleTransformation xform = (SimpleTransformation) transformations.get(0);
+        assertEquals(42, xform.magicNumber);
+    }
+
+    @Test(expected = ConfigException.class)
+    public void multipleTransformsOneDangling() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a, b");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        new ConnectorConfig(props);
+    }
+
+    @Test
+    public void multipleTransforms() {
+        Map<String, String> props = new HashMap<>();
+        props.put("name", "test");
+        props.put("connector.class", TestConnector.class.getName());
+        props.put("transforms", "a, b");
+        props.put("transforms.a.type", SimpleTransformation.class.getName());
+        props.put("transforms.a.magic.number", "42");
+        props.put("transforms.b.type", SimpleTransformation.class.getName());
+        props.put("transforms.b.magic.number", "84");
+        final ConnectorConfig config = new ConnectorConfig(props);
+        final List<Transformation<R>> transformations = config.transformations();
+        assertEquals(2, transformations.size());
+        assertEquals(42, ((SimpleTransformation) transformations.get(0)).magicNumber);
+        assertEquals(84, ((SimpleTransformation) transformations.get(1)).magicNumber);
+    }
+
+}
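
Rendered as a connector properties file, the configuration shape these tests
exercise looks like the following (connector name and class are illustrative;
TimestampRouter and its topic.format/timestamp.format settings ship in this
patch):

    name=demo-connector
    connector.class=com.example.DemoSourceConnector
    transforms=router
    transforms.router.type=org.apache.kafka.connect.transforms.TimestampRouter
    transforms.router.topic.format=${topic}-${timestamp}
    transforms.router.timestamp.format=yyyyMMdd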

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index d4427d1..f93e385 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -109,6 +109,8 @@ public class WorkerSinkTaskTest {
     @Mock
     private Converter valueConverter;
     @Mock
+    private TransformationChain<SinkRecord> transformationChain;
+    @Mock
     private TaskStatus.Listener statusListener;
     @Mock
     private KafkaConsumer<byte[], byte[]> consumer;
@@ -130,7 +132,7 @@ public class WorkerSinkTaskTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, transformationChain, time);
 
         recordsReturned = 0;
     }
@@ -139,7 +141,7 @@ public class WorkerSinkTaskTest {
     public void testStartPaused() throws Exception {
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, TargetState.PAUSED, workerConfig, keyConverter, valueConverter, transformationChain, time);
 
         expectInitializeTask();
         expectPollInitialAssignment();
@@ -164,7 +166,7 @@ public class WorkerSinkTaskTest {
         expectPollInitialAssignment();
 
         expectConsumerPoll(1);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
         sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
@@ -196,7 +198,7 @@ public class WorkerSinkTaskTest {
         PowerMock.expectLastCall();
 
         expectConsumerPoll(1);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
         sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
@@ -223,7 +225,7 @@ public class WorkerSinkTaskTest {
 
         // If a retriable exception is thrown, we should redeliver the same batch, pausing the consumer in the meantime
         expectConsumerPoll(1);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
         Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(records));
         EasyMock.expectLastCall().andThrow(new RetriableException("retry"));
@@ -308,7 +310,7 @@ public class WorkerSinkTaskTest {
         expectPollInitialAssignment();
 
         expectConsumerPoll(1);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
         sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
@@ -376,7 +378,7 @@ public class WorkerSinkTaskTest {
         expectPollInitialAssignment();
 
         expectConsumerPoll(1);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
         sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
@@ -429,7 +431,7 @@ public class WorkerSinkTaskTest {
 
         // iter 2
         expectConsumerPoll(2);
-        expectConvertMessages(2);
+        expectConversionAndTransformation(2);
         sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
@@ -492,7 +494,7 @@ public class WorkerSinkTaskTest {
 
         // iter 2
         expectConsumerPoll(1);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
         sinkTask.put(EasyMock.<Collection<SinkRecord>>anyObject());
         EasyMock.expectLastCall();
 
@@ -533,7 +535,7 @@ public class WorkerSinkTaskTest {
     public void testMissingTimestampPropagation() throws Exception {
         expectInitializeTask();
         expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
 
         Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
 
@@ -561,7 +563,7 @@ public class WorkerSinkTaskTest {
 
         expectInitializeTask();
         expectConsumerPoll(1, timestamp, timestampType);
-        expectConvertMessages(1);
+        expectConversionAndTransformation(1);
 
         Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
 
@@ -682,9 +684,18 @@ public class WorkerSinkTaskTest {
                 });
     }
 
-    private void expectConvertMessages(final int numMessages) {
+    private void expectConversionAndTransformation(final int numMessages) {
         EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).times(numMessages);
         EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).times(numMessages);
+
+        final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
+        EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)))
+                .andAnswer(new IAnswer<SinkRecord>() {
+                    @Override
+                    public SinkRecord answer() {
+                        return recordCapture.getValue();
+                    }
+                }).times(numMessages);
     }
 
     private abstract static class TestSinkTask extends SinkTask  {
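
The capture-and-answer pairing in expectConversionAndTransformation is a
general EasyMock idiom for stubbing a pass-through: the mock returns whatever
record it was handed. As a reusable helper (name illustrative; assumes the
Capture/IAnswer imports already present in this file) it would read:

    private static <R extends ConnectRecord<R>> void expectIdentityChain(TransformationChain<R> chain) {
        final Capture<R> recordCapture = EasyMock.newCapture();
        EasyMock.expect(chain.apply(EasyMock.capture(recordCapture)))
                .andAnswer(new IAnswer<R>() {
                    @Override
                    public R answer() {
                        // Echo back the captured record: an identity chain.
                        return recordCapture.getValue();
                    }
                }).anyTimes();
    }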

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
index 8fa62b6..9739c99 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskThreadedTest.java
@@ -104,6 +104,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
     private WorkerConfig workerConfig;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
+    @Mock private TransformationChain transformationChain;
     private WorkerSinkTask workerTask;
     @Mock private KafkaConsumer<byte[], byte[]> consumer;
     private Capture<ConsumerRebalanceListener> rebalanceListener = EasyMock.newCapture();
@@ -128,7 +129,7 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
         workerConfig = new StandaloneConfig(workerProps);
         workerTask = PowerMock.createPartialMock(
                 WorkerSinkTask.class, new String[]{"createConsumer"},
-                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, time);
+                taskId, sinkTask, statusListener, initialState, workerConfig, keyConverter, valueConverter, TransformationChain.<SinkRecord>noOp(), time);
 
         recordsReturned = 0;
     }
@@ -555,6 +556,15 @@ public class WorkerSinkTaskThreadedTest extends ThreadedTest {
                 });
         EasyMock.expect(keyConverter.toConnectData(TOPIC, RAW_KEY)).andReturn(new SchemaAndValue(KEY_SCHEMA, KEY)).anyTimes();
         EasyMock.expect(valueConverter.toConnectData(TOPIC, RAW_VALUE)).andReturn(new SchemaAndValue(VALUE_SCHEMA, VALUE)).anyTimes();
+
+        final Capture<SinkRecord> recordCapture = EasyMock.newCapture();
+        EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture))).andAnswer(new IAnswer<SinkRecord>() {
+            @Override
+            public SinkRecord answer() {
+                return recordCapture.getValue();
+            }
+        }).anyTimes();
+
         Capture<Collection<SinkRecord>> capturedRecords = EasyMock.newCapture(CaptureType.ALL);
         sinkTask.put(EasyMock.capture(capturedRecords));
         EasyMock.expectLastCall().anyTimes();

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
index 71b315f..8c07ba1 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSourceTaskTest.java
@@ -85,6 +85,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @Mock private SourceTask sourceTask;
     @Mock private Converter keyConverter;
     @Mock private Converter valueConverter;
+    @Mock private TransformationChain<SourceRecord> transformationChain;
     @Mock private KafkaProducer<byte[], byte[]> producer;
     @Mock private OffsetStorageReader offsetReader;
     @Mock private OffsetStorageWriter offsetWriter;
@@ -124,8 +125,8 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     }
 
     private void createWorkerTask(TargetState initialState) {
-        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter,
-                valueConverter, producer, offsetReader, offsetWriter, config, Time.SYSTEM);
+        workerTask = new WorkerSourceTask(taskId, sourceTask, statusListener, initialState, keyConverter, valueConverter, transformationChain,
+                producer, offsetReader, offsetWriter, config, Time.SYSTEM);
     }
 
     @Test
@@ -146,6 +147,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         statusListener.onShutdown(taskId);
         EasyMock.expectLastCall();
 
@@ -191,6 +195,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -237,6 +244,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -282,6 +292,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -322,6 +335,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -363,6 +379,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -509,6 +528,9 @@ public class WorkerSourceTaskTest extends ThreadedTest {
         producer.close(EasyMock.anyLong(), EasyMock.anyObject(TimeUnit.class));
         EasyMock.expectLastCall();
 
+        transformationChain.close();
+        EasyMock.expectLastCall();
+
         PowerMock.replayAll();
 
         workerTask.initialize(TASK_CONFIG);
@@ -553,6 +575,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
     @SuppressWarnings("unchecked")
     private void expectSendRecordSyncFailure(Throwable error) throws InterruptedException {
         expectConvertKeyValue(false);
+        expectApplyTransformationChain(false);
 
         offsetWriter.offset(PARTITION, OFFSET);
         PowerMock.expectLastCall();
@@ -581,6 +604,7 @@ public class WorkerSourceTaskTest extends ThreadedTest {
 
     private Capture<ProducerRecord<byte[], byte[]>> expectSendRecord(boolean anyTimes, boolean isRetry, boolean succeed) throws InterruptedException {
         expectConvertKeyValue(anyTimes);
+        expectApplyTransformationChain(anyTimes);
 
         Capture<ProducerRecord<byte[], byte[]>> sent = EasyMock.newCapture();
 
@@ -634,6 +658,25 @@ public class WorkerSourceTaskTest extends ThreadedTest {
             convertValueExpect.andReturn(SERIALIZED_RECORD);
     }
 
+    private void expectApplyTransformationChain(boolean anyTimes) {
+        final Capture<SourceRecord> recordCapture = EasyMock.newCapture();
+        IExpectationSetters<SourceRecord> convertKeyExpect = EasyMock.expect(transformationChain.apply(EasyMock.capture(recordCapture)));
+        if (anyTimes)
+            convertKeyExpect.andStubAnswer(new IAnswer<SourceRecord>() {
+                @Override
+                public SourceRecord answer() {
+                    return recordCapture.getValue();
+                }
+            });
+        else
+            convertKeyExpect.andAnswer(new IAnswer<SourceRecord>() {
+                @Override
+                public SourceRecord answer() {
+                    return recordCapture.getValue();
+                }
+            });
+    }
+
     private void expectTaskCommitRecord(boolean anyTimes, boolean succeed) throws InterruptedException {
         sourceTask.commitRecord(EasyMock.anyObject(SourceRecord.class));
         IExpectationSetters<Void> expect = EasyMock.expectLastCall();

http://git-wip-us.apache.org/repos/asf/kafka/blob/2f904883/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
index eac0520..4e90b45 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerTest.java
@@ -373,6 +373,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.anyObject(JsonConverter.class),
                 EasyMock.anyObject(JsonConverter.class),
+                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
@@ -446,6 +447,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.anyObject(JsonConverter.class),
                 EasyMock.anyObject(JsonConverter.class),
+                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
@@ -500,6 +502,7 @@ public class WorkerTest extends ThreadedTest {
                 EasyMock.eq(TargetState.STARTED),
                 EasyMock.capture(keyConverter),
                 EasyMock.capture(valueConverter),
+                EasyMock.eq(TransformationChain.<SourceRecord>noOp()),
                 EasyMock.anyObject(KafkaProducer.class),
                 EasyMock.anyObject(OffsetStorageReader.class),
                 EasyMock.anyObject(OffsetStorageWriter.class),
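
The EasyMock.eq(TransformationChain.<SourceRecord>noOp()) matchers here rely
on the equals/hashCode that TransformationChain defines over its
transformation list: every no-op chain wraps an empty list, so any two compare
equal. For instance (using org.junit.Assert.assertEquals):

    // Both sides wrap an empty transformation list, so the matcher succeeds.
    assertEquals(TransformationChain.<SourceRecord>noOp(),
                 TransformationChain.<SourceRecord>noOp());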

