kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [kafka] branch 2.1 updated: KAFKA-7503: Connect integration test harness
Date Mon, 14 Jan 2019 22:10:26 GMT
This is an automated email from the ASF dual-hosted git repository.

ewencp pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.1 by this push:
     new 672cc57  KAFKA-7503: Connect integration test harness
672cc57 is described below

commit 672cc578ef0407ff0fe0ae3a8ed33a4d9683635e
Author: Arjun Satish <arjun@confluent.io>
AuthorDate: Mon Jan 14 13:50:23 2019 -0800

    KAFKA-7503: Connect integration test harness
    
    Expose a programmatic way to bring up a Kafka and ZooKeeper cluster through a Java API to facilitate integration tests for framework-level changes in Kafka Connect. The Kafka classes are similar to KafkaEmbedded in Kafka Streams. The new classes reuse the kafka.server.KafkaServer classes from :core and provide a simple interface for bringing up brokers in integration tests.
    
    Signed-off-by: Arjun Satish <arjun@confluent.io>
    
    Author: Arjun Satish <arjun@confluent.io>
    Author: Arjun Satish <wicknicks@users.noreply.github.com>
    
    Reviewers: Randall Hauch <rhauch@gmail.com>, Konstantine Karantasis <konstantine@confluent.io>, Ewen Cheslack-Postava <ewen@confluent.io>
    
    Closes #5516 from wicknicks/connect-integration-test
    
    (cherry picked from commit 69d8d2ea11c5e08884ab4c7b8079af5fd21247be)
    Signed-off-by: Ewen Cheslack-Postava <me@ewencp.org>
---
 build.gradle                                       |   2 +
 checkstyle/import-control.xml                      |  14 +-
 .../kafka/connect/cli/ConnectDistributed.java      | 109 ++++---
 .../org/apache/kafka/connect/runtime/Connect.java  |   7 +-
 .../kafka/connect/runtime/HerderProvider.java      |  68 +++++
 .../runtime/health/ConnectClusterStateImpl.java    |  12 +-
 .../kafka/connect/runtime/rest/RestServer.java     |  31 +-
 .../runtime/rest/entities/ConnectorStateInfo.java  |  11 +-
 .../rest/resources/ConnectorPluginsResource.java   |  12 +-
 .../runtime/rest/resources/ConnectorsResource.java |  39 +--
 .../runtime/rest/resources/RootResource.java       |   8 +-
 .../kafka/connect/integration/ConnectorHandle.java | 116 +++++++
 .../integration/ErrorHandlingIntegrationTest.java  | 231 ++++++++++++++
 .../integration/ExampleConnectIntegrationTest.java | 137 +++++++++
 .../integration/MonitorableSinkConnector.java      | 115 +++++++
 .../kafka/connect/integration/RuntimeHandles.java  |  63 ++++
 .../kafka/connect/integration/TaskHandle.java      | 111 +++++++
 .../kafka/connect/runtime/rest/RestServerTest.java |   5 +-
 .../resources/ConnectorPluginsResourceTest.java    |   3 +-
 .../rest/resources/ConnectorsResourceTest.java     |   3 +-
 .../runtime/rest/resources/RootResourceTest.java   |   3 +-
 .../util/clusters/EmbeddedConnectCluster.java      | 280 +++++++++++++++++
 .../util/clusters/EmbeddedKafkaCluster.java        | 339 +++++++++++++++++++++
 .../runtime/src/test/resources/log4j.properties    |   3 +-
 24 files changed, 1617 insertions(+), 105 deletions(-)

diff --git a/build.gradle b/build.gradle
index 064bd2c..6ed1a87 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1409,6 +1409,8 @@ project(':connect:runtime') {
     testCompile libs.powermockEasymock
 
     testCompile project(':clients').sourceSets.test.output
+    testCompile project(':core')
+    testCompile project(':core').sourceSets.test.output
 
     testRuntime libs.slf4jlog4j
   }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 91d23f6..3927a25 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -335,8 +335,6 @@
       </subpackage>
     </subpackage>
 
-
-
     <subpackage name="cli">
       <allow pkg="org.apache.kafka.connect.runtime" />
       <allow pkg="org.apache.kafka.connect.storage" />
@@ -354,6 +352,18 @@
       <allow pkg="org.reflections.vfs" />
       <!-- for annotations to avoid code duplication -->
       <allow pkg="com.fasterxml.jackson.annotation" />
+      <allow pkg="com.fasterxml.jackson.databind" />
+      <subpackage name="clusters">
+        <allow pkg="kafka.server" />
+        <allow pkg="kafka.zk" />
+        <allow pkg="kafka.utils" />
+        <allow class="javax.servlet.http.HttpServletResponse" />
+      </subpackage>
+    </subpackage>
+
+    <subpackage name="integration">
+      <allow pkg="org.apache.kafka.connect.util.clusters" />
+      <allow pkg="org.apache.kafka.connect" />
     </subpackage>
 
     <subpackage name="json">
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index f8c15de..a6c6d98 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.utils.Exit;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerInfo;
@@ -38,6 +39,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.net.URI;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Map;
 
@@ -53,62 +55,26 @@ import java.util.Map;
 public class ConnectDistributed {
     private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
 
-    public static void main(String[] args) throws Exception {
-        if (args.length < 1) {
+    private final Time time = Time.SYSTEM;
+    private final long initStart = time.hiResClockMs();
+
+    public static void main(String[] args) {
+
+        if (args.length < 1 || Arrays.asList(args).contains("--help")) {
             log.info("Usage: ConnectDistributed worker.properties");
             Exit.exit(1);
         }
 
         try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect distributed worker initializing ...");
-            long initStart = time.hiResClockMs();
             WorkerInfo initInfo = new WorkerInfo();
             initInfo.logAll();
 
             String workerPropsFile = args[0];
             Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment ...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            DistributedConfig config = new DistributedConfig(workerProps);
-
-            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-            RestServer rest = new RestServer(config);
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-
-            KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
-            offsetBackingStore.configure(config);
-
-            Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
-            WorkerConfigTransformer configTransformer = worker.configTransformer();
-
-            Converter internalValueConverter = worker.getInternalValueConverter();
-            StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
-            statusBackingStore.configure(config);
-
-            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
-                    internalValueConverter,
-                    config,
-                    configTransformer);
-
-            DistributedHerder herder = new DistributedHerder(config, time, worker,
-                    kafkaClusterId, statusBackingStore, configBackingStore,
-                    advertisedUrl.toString());
-            final Connect connect = new Connect(herder, rest);
-            log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
-            try {
-                connect.start();
-            } catch (Exception e) {
-                log.error("Failed to start Connect", e);
-                connect.stop();
-                Exit.exit(3);
-            }
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
+
+            ConnectDistributed connectDistributed = new ConnectDistributed();
+            Connect connect = connectDistributed.startConnect(workerProps);
 
             // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
             connect.awaitStop();
@@ -118,4 +84,55 @@ public class ConnectDistributed {
             Exit.exit(2);
         }
     }
+
+    public Connect startConnect(Map<String, String> workerProps) {
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+
+        RestServer rest = new RestServer(config);
+        HerderProvider provider = new HerderProvider();
+        rest.start(provider, plugins);
+
+        URI advertisedUrl = rest.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+        WorkerConfigTransformer configTransformer = worker.configTransformer();
+
+        Converter internalValueConverter = worker.getInternalValueConverter();
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        statusBackingStore.configure(config);
+
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
+                internalValueConverter,
+                config,
+                configTransformer);
+
+        DistributedHerder herder = new DistributedHerder(config, time, worker,
+                kafkaClusterId, statusBackingStore, configBackingStore,
+                advertisedUrl.toString());
+
+        final Connect connect = new Connect(herder, rest);
+        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+            // herder has initialized now, and ready to be used by the RestServer.
+            provider.setHerder(herder);
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        return connect;
+    }
+
 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 846ed1a..965046c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -20,6 +20,7 @@ import org.apache.kafka.connect.runtime.rest.RestServer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -50,7 +51,6 @@ public class Connect {
             Runtime.getRuntime().addShutdownHook(shutdownHook);
 
             herder.start();
-            rest.start(herder);
 
             log.info("Kafka Connect started");
         } finally {
@@ -82,6 +82,11 @@ public class Connect {
         }
     }
 
+    // Visible for testing
+    public URI restUrl() {
+        return rest.serverUrl();
+    }
+
     private class ShutdownHook extends Thread {
         @Override
         public void run() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
new file mode 100644
index 0000000..42c0925
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A supplier for {@link Herder}s.
+ */
+public class HerderProvider {
+
+    private final CountDownLatch initialized = new CountDownLatch(1);
+    volatile Herder herder = null;
+
+    public HerderProvider() {
+    }
+
+    /**
+     * Create a herder provider with a herder.
+     * @param herder the herder that will be supplied to threads waiting on this provider
+     */
+    public HerderProvider(Herder herder) {
+        this.herder = herder;
+        initialized.countDown();
+    }
+
+    /**
+     * @return the contained herder.
+     * @throws ConnectException if a herder was not available within a duration of calling this method
+     */
+    public Herder get() {
+        try {
+            // wait for herder to be initialized
+            if (!initialized.await(1, TimeUnit.MINUTES)) {
+                throw new ConnectException("Timed out waiting for herder to be initialized.");
+            }
+        } catch (InterruptedException e) {
+            throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
+        }
+        return herder;
+    }
+
+    /**
+     * @param herder set a herder, and signal to all threads waiting on get().
+     */
+    public void setHerder(Herder herder) {
+        this.herder = herder;
+        initialized.countDown();
+    }
+
+}
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index a0f7fde..ea93a72 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -22,7 +22,7 @@ import org.apache.kafka.connect.health.ConnectorHealth;
 import org.apache.kafka.connect.health.ConnectorState;
 import org.apache.kafka.connect.health.ConnectorType;
 import org.apache.kafka.connect.health.TaskState;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.util.Callback;
 
@@ -34,16 +34,16 @@ import java.util.Map;
 
 public class ConnectClusterStateImpl implements ConnectClusterState {
 
-    private Herder herder;
+    private HerderProvider herderProvider;
 
-    public ConnectClusterStateImpl(Herder herder) {
-        this.herder = herder;
+    public ConnectClusterStateImpl(HerderProvider herderProvider) {
+        this.herderProvider = herderProvider;
     }
 
     @Override
     public Collection<String> connectors() {
         final Collection<String> connectors = new ArrayList<>();
-        herder.connectors(new Callback<java.util.Collection<String>>() {
+        herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
             @Override
             public void onCompletion(Throwable error, Collection<String> result) {
                 connectors.addAll(result);
@@ -55,7 +55,7 @@ public class ConnectClusterStateImpl implements ConnectClusterState {
     @Override
     public ConnectorHealth connectorHealth(String connName) {
 
-        ConnectorStateInfo state = herder.connectorStatus(connName);
+        ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
         ConnectorState connectorState = new ConnectorState(
             state.connector().state(),
             state.connector().workerId(),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index 73997a5..2f07455 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -17,14 +17,14 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -50,6 +50,8 @@ import org.glassfish.jersey.servlet.ServletContainer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -60,9 +62,6 @@ import java.util.Locale;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.UriBuilder;
-
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
  */
@@ -160,20 +159,20 @@ public class RestServer {
         return connector;
     }
 
-    public void start(Herder herder) {
+    public void start(HerderProvider herderProvider, Plugins plugins) {
         log.info("Starting REST server");
 
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        resourceConfig.register(new RootResource(herder));
-        resourceConfig.register(new ConnectorsResource(herder, config));
-        resourceConfig.register(new ConnectorPluginsResource(herder));
+        resourceConfig.register(new RootResource(herderProvider));
+        resourceConfig.register(new ConnectorsResource(herderProvider, config));
+        resourceConfig.register(new ConnectorPluginsResource(herderProvider));
 
         resourceConfig.register(ConnectExceptionMapper.class);
         resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
 
-        registerRestExtensions(herder, resourceConfig);
+        registerRestExtensions(herderProvider, plugins, resourceConfig);
 
         ServletContainer servletContainer = new ServletContainer(resourceConfig);
         ServletHolder servletHolder = new ServletHolder(servletContainer);
@@ -219,7 +218,9 @@ public class RestServer {
         log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
     }
 
-
+    public URI serverUrl() {
+        return jettyServer.getURI();
+    }
 
     public void stop() {
         log.info("Stopping REST server");
@@ -263,7 +264,7 @@ public class RestServer {
         Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
         if (advertisedPort != null)
             builder.port(advertisedPort);
-        else if (serverConnector != null)
+        else if (serverConnector != null && serverConnector.getPort() > 0)
             builder.port(serverConnector.getPort());
 
         log.info("Advertised URI: {}", builder.build());
@@ -301,15 +302,15 @@ public class RestServer {
         return null;
     }
 
-    void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
-        connectRestExtensions = herder.plugins().newPlugins(
+    void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) {
+        connectRestExtensions = plugins.newPlugins(
             config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
             config, ConnectRestExtension.class);
 
         ConnectRestExtensionContext connectRestExtensionContext =
             new ConnectRestExtensionContextImpl(
                 new ConnectRestConfigurable(resourceConfig),
-                new ConnectClusterStateImpl(herder)
+                new ConnectClusterStateImpl(provider)
             );
         for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
             connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index 80192ca..6280473 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -90,7 +90,10 @@ public class ConnectorStateInfo {
     }
 
     public static class ConnectorState extends AbstractState {
-        public ConnectorState(String state, String worker, String msg) {
+        @JsonCreator
+        public ConnectorState(@JsonProperty("state") String state,
+                              @JsonProperty("worker_id") String worker,
+                              @JsonProperty("msg") String msg) {
             super(state, worker, msg);
         }
     }
@@ -98,7 +101,11 @@ public class ConnectorStateInfo {
     public static class TaskState extends AbstractState implements Comparable<TaskState> {
         private final int id;
 
-        public TaskState(int id, String state, String worker, String msg) {
+        @JsonCreator
+        public TaskState(@JsonProperty("id") int id,
+                         @JsonProperty("state") String state,
+                         @JsonProperty("worker_id") String worker,
+                         @JsonProperty("msg") String msg) {
             super(state, worker, msg);
             this.id = id;
         }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 24eb93b..87f25b2 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -18,7 +18,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
@@ -49,7 +49,7 @@ import java.util.Map;
 public class ConnectorPluginsResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
-    private final Herder herder;
+    private final HerderProvider herderProvider;
     private final List<ConnectorPluginInfo> connectorPlugins;
 
     private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
@@ -58,8 +58,8 @@ public class ConnectorPluginsResource {
             SchemaSourceConnector.class
     );
 
-    public ConnectorPluginsResource(Herder herder) {
-        this.herder = herder;
+    public ConnectorPluginsResource(HerderProvider herderProvider) {
+        this.herderProvider = herderProvider;
         this.connectorPlugins = new ArrayList<>();
     }
 
@@ -78,7 +78,7 @@ public class ConnectorPluginsResource {
             );
         }
 
-        return herder.validateConnectorConfig(connectorConfig);
+        return herderProvider.get().validateConnectorConfig(connectorConfig);
     }
 
     @GET
@@ -90,7 +90,7 @@ public class ConnectorPluginsResource {
     // TODO: improve once plugins are allowed to be added/removed during runtime.
     private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
         if (connectorPlugins.isEmpty()) {
-            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            for (PluginDesc<Connector> plugin : herderProvider.get().plugins().connectors()) {
                 if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
                     connectorPlugins.add(new ConnectorPluginInfo(plugin));
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index e966104..29a8c39 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
 import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
@@ -67,21 +68,25 @@ public class ConnectorsResource {
     // but currently a worker simply leaving the group can take this long as well.
     private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
 
-    private final Herder herder;
+    private final HerderProvider herderProvider;
     private final WorkerConfig config;
     @javax.ws.rs.core.Context
     private ServletContext context;
 
-    public ConnectorsResource(Herder herder, WorkerConfig config) {
-        this.herder = herder;
+    public ConnectorsResource(HerderProvider herder, WorkerConfig config) {
+        this.herderProvider = herder;
         this.config = config;
     }
 
+    private Herder herder() {
+        return herderProvider.get();
+    }
+
     @GET
     @Path("/")
     public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
-        herder.connectors(cb);
+        herder().connectors(cb);
         return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
         }, forward);
     }
@@ -99,7 +104,7 @@ public class ConnectorsResource {
         checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(name, configs, false, cb);
+        herder().putConnectorConfig(name, configs, false, cb);
         Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
                 new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
 
@@ -112,7 +117,7 @@ public class ConnectorsResource {
     public ConnectorInfo getConnector(final @PathParam("connector") String connector,
                                       final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
-        herder.connectorInfo(connector, cb);
+        herder().connectorInfo(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
     }
 
@@ -121,14 +126,14 @@ public class ConnectorsResource {
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
                                                   final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
-        herder.connectorConfig(connector, cb);
+        herder().connectorConfig(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward);
     }
 
     @GET
     @Path("/{connector}/status")
     public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
-        return herder.connectorStatus(connector);
+        return herder().connectorStatus(connector);
     }
 
     @PUT
@@ -139,7 +144,7 @@ public class ConnectorsResource {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         checkAndPutConnectorConfigName(connector, connectorConfig);
 
-        herder.putConnectorConfig(connector, connectorConfig, true, cb);
+        herder().putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
                 "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
@@ -157,21 +162,21 @@ public class ConnectorsResource {
     public void restartConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
-        herder.restartConnector(connector, cb);
+        herder().restartConnector(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward);
     }
 
     @PUT
     @Path("/{connector}/pause")
     public Response pauseConnector(@PathParam("connector") String connector) {
-        herder.pauseConnector(connector);
+        herder().pauseConnector(connector);
         return Response.accepted().build();
     }
 
     @PUT
     @Path("/{connector}/resume")
     public Response resumeConnector(@PathParam("connector") String connector) {
-        herder.resumeConnector(connector);
+        herder().resumeConnector(connector);
         return Response.accepted().build();
     }
 
@@ -180,7 +185,7 @@ public class ConnectorsResource {
     public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
                                          final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
-        herder.taskConfigs(connector, cb);
+        herder().taskConfigs(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
         }, forward);
     }
@@ -191,7 +196,7 @@ public class ConnectorsResource {
                                final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
-        herder.putTaskConfigs(connector, taskConfigs, cb);
+        herder().putTaskConfigs(connector, taskConfigs, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward);
     }
 
@@ -199,7 +204,7 @@ public class ConnectorsResource {
     @Path("/{connector}/tasks/{task}/status")
     public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
                                                       final @PathParam("task") Integer task) throws Throwable {
-        return herder.taskStatus(new ConnectorTaskId(connector, task));
+        return herder().taskStatus(new ConnectorTaskId(connector, task));
     }
 
     @POST
@@ -209,7 +214,7 @@ public class ConnectorsResource {
                             final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
-        herder.restartTask(taskId, cb);
+        herder().restartTask(taskId, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward);
     }
 
@@ -218,7 +223,7 @@ public class ConnectorsResource {
     public void destroyConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.deleteConnectorConfig(connector, cb);
+        herder().deleteConnectorConfig(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 9666bf1..56516cd 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 
 import javax.ws.rs.GET;
@@ -28,15 +28,15 @@ import javax.ws.rs.core.MediaType;
 @Produces(MediaType.APPLICATION_JSON)
 public class RootResource {
 
-    private final Herder herder;
+    private final HerderProvider herder;
 
-    public RootResource(Herder herder) {
+    public RootResource(HerderProvider herder) {
         this.herder = herder;
     }
 
     @GET
     @Path("/")
     public ServerInfo serverInfo() {
-        return new ServerInfo(herder.kafkaClusterId());
+        return new ServerInfo(herder.get().kafkaClusterId());
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
new file mode 100644
index 0000000..e59691b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A handle to a connector executing in a Connect cluster.
+ */
+public class ConnectorHandle {
+
+    private static final Logger log = LoggerFactory.getLogger(ConnectorHandle.class);
+
+    private final String connectorName;
+    private final Map<String, TaskHandle> taskHandles = new ConcurrentHashMap<>();
+
+    private CountDownLatch recordsRemainingLatch;
+    private int expectedRecords = -1;
+
+    public ConnectorHandle(String connectorName) {
+        this.connectorName = connectorName;
+    }
+
+    /**
+     * Get or create a task handle for a given task id. The task need not be created when this method is called. If the
+     * handle is requested before the task is created, the task will bind to the handle once it starts (or restarts).
+     *
+     * @param taskId the task id
+     * @return a non-null {@link TaskHandle}
+     */
+    public TaskHandle taskHandle(String taskId) {
+        return taskHandles.computeIfAbsent(taskId, k -> new TaskHandle(this, taskId));
+    }
+
+    public Collection<TaskHandle> tasks() {
+        return taskHandles.values();
+    }
+
+    /**
+     * Delete the task handle for this task id.
+     *
+     * @param taskId the task id.
+     */
+    public void deleteTask(String taskId) {
+        log.info("Removing handle for {} task in connector {}", taskId, connectorName);
+        taskHandles.remove(taskId);
+    }
+
+    /**
+     * Set the number of expected records for this connector.
+     *
+     * @param expectedRecords number of records
+     */
+    public void expectedRecords(int expectedRecords) {
+        this.expectedRecords = expectedRecords;
+        this.recordsRemainingLatch = new CountDownLatch(expectedRecords);
+    }
+
+    /**
+     * Record a message arrival at the connector.
+     */
+    public void record() {
+        if (recordsRemainingLatch != null) {
+            recordsRemainingLatch.countDown();
+        }
+    }
+
+    /**
+     * Wait for this connector to receive the expected number of records.
+     *
+     * @param consumeMaxDurationMs max duration to wait for records
+     * @throws InterruptedException if another thread interrupts this one while waiting for records
+     */
+    public void awaitRecords(int consumeMaxDurationMs) throws InterruptedException {
+        if (recordsRemainingLatch == null || expectedRecords < 0) {
+            throw new IllegalStateException("expectedRecords() was not set for this task?");
+        }
+        if (!recordsRemainingLatch.await(consumeMaxDurationMs, TimeUnit.MILLISECONDS)) {
+            String msg = String.format("Insufficient records seen by connector %s in %d millis. Records expected=%d, actual=%d",
+                    connectorName,
+                    consumeMaxDurationMs,
+                    expectedRecords,
+                    expectedRecords - recordsRemainingLatch.getCount());
+            throw new DataException(msg);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectorHandle{" +
+                "connectorName='" + connectorName + '\'' +
+                '}';
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
new file mode 100644
index 0000000..af3ab44
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records,
+ * and dead letter queues).
+ */
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
+
+    private static final String DLQ_TOPIC = "my-connector-errors";
+    private static final String CONNECTOR_NAME = "error-conn";
+    private static final String TASK_ID = "error-conn-0";
+    private static final int NUM_RECORDS_PRODUCED = 20;
+    private static final int EXPECTED_CORRECT_RECORDS = 19;
+    private static final int EXPECTED_INCORRECT_RECORDS = 1;
+    private static final int CONNECTOR_SETUP_DURATION_MS = 5000;
+    private static final int CONSUME_MAX_DURATION_MS = 5000;
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() throws IOException {
+        // setup Connect cluster with defaults
+        connect = new EmbeddedConnectCluster.Builder().build();
+
+        // start Connect cluster
+        connect.start();
+
+        // get connector handles before starting test.
+        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+    }
+
+    @After
+    public void close() {
+        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+        connect.stop();
+    }
+
+    @Test
+    public void testSkipRetryAndDLQWithHeaders() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic");
+
+        // setup connector config
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
+        props.put(TASKS_MAX_CONFIG, "1");
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(TRANSFORMS_CONFIG, "failing_transform");
+        props.put("transforms.failing_transform.type", FaultyPassthrough.class.getName());
+
+        // log all errors, along with message metadata
+        props.put(ERRORS_LOG_ENABLE_CONFIG, "true");
+        props.put(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+        // produce bad messages into dead letter queue
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+
+        // tolerate all errors
+        props.put(ERRORS_TOLERANCE_CONFIG, "all");
+
+        // retry for up to one second
+        props.put(ERRORS_RETRY_TIMEOUT_CONFIG, "1000");
+
+        // set expected records to successfully reach the task
+        connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 1
+                        && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1,
+                CONNECTOR_SETUP_DURATION_MS,
+                "Connector task was not assigned a partition.");
+
+        // produce some strings into test topic
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", "key-" + i, "value-" + i);
+        }
+
+        // consume all records from test topic
+        log.info("Consuming records from test topic");
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> rec : connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic")) {
+            String k = new String(rec.key());
+            String v = new String(rec.value());
+            log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic());
+            assertEquals("Unexpected key", k, "key-" + i);
+            assertEquals("Unexpected value", v, "value-" + i);
+            i++;
+        }
+
+        // wait for records to reach the task
+        connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS);
+
+        // consume failed records from dead letter queue topic
+        log.info("Consuming records from test topic");
+        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
+        for (ConsumerRecord<byte[], byte[]> recs : messages) {
+            log.debug("Consumed record (key={}, value={}) from dead letter queue topic {}",
+                    new String(recs.key()), new String(recs.value()), DLQ_TOPIC);
+            assertTrue(recs.headers().toArray().length > 0);
+            assertValue("test-topic", recs.headers(), ERROR_HEADER_ORIG_TOPIC);
+            assertValue(RetriableException.class.getName(), recs.headers(), ERROR_HEADER_EXCEPTION);
+            assertValue("Error when value='value-7'", recs.headers(), ERROR_HEADER_EXCEPTION_MESSAGE);
+        }
+
+        connect.deleteConnector(CONNECTOR_NAME);
+    }
+
+    private void assertValue(String expected, Headers headers, String headerKey) {
+        byte[] actual = headers.lastHeader(headerKey).value();
+        if (expected == null && actual == null) {
+            return;
+        }
+        if (expected == null || actual == null) {
+            fail();
+        }
+        assertEquals(expected, new String(actual));
+    }
+
+    public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
+
+        static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+        /**
+         * An arbitrary id which causes this transformation to fail with a {@link RetriableException}, but succeeds
+         * on a subsequent attempt.
+         */
+        static final int BAD_RECORD_VAL_RETRIABLE = 4;
+
+        /**
+         * An arbitrary id which causes this transformation to fail with a {@link RetriableException}.
+         */
+        static final int BAD_RECORD_VAL = 7;
+
+        private boolean shouldFail = true;
+
+        @Override
+        public R apply(R record) {
+            String badValRetriable = "value-" + BAD_RECORD_VAL_RETRIABLE;
+            if (badValRetriable.equals(record.value()) && shouldFail) {
+                shouldFail = false;
+                throw new RetriableException("Error when value='" + badValRetriable
+                        + "'. A reattempt with this record will succeed.");
+            }
+            String badVal = "value-" + BAD_RECORD_VAL;
+            if (badVal.equals(record.value())) {
+                throw new RetriableException("Error when value='" + badVal + "'");
+            }
+            return record;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
new file mode 100644
index 0000000..5d887cf
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * An example integration test that demonstrates how to setup an integration test for Connect.
+ * <p></p>
+ * The following test configures and starts up a sink connector pipeline in a worker, produces messages into
+ * the source topic-partitions, and demonstrates how to check the overall behavior of the pipeline.
+ */
+@Category(IntegrationTest.class)
+public class ExampleConnectIntegrationTest {
+
+    private static final int NUM_RECORDS_PRODUCED = 2000;
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final int CONSUME_MAX_DURATION_MS = 5000;
+    private static final int CONNECTOR_SETUP_DURATION_MS = 15000;
+    private static final String CONNECTOR_NAME = "simple-conn";
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() throws IOException {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("example-cluster")
+                .numWorkers(3)
+                .numBrokers(1)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();
+
+        // get a handle to the connector
+        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+    }
+
+    @After
+    public void close() {
+        // delete connector handle
+        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    /**
+     * Simple test case to configure and execute an embedded Connect cluster. The test will produce and consume
+     * records, and start up a sink connector which will consume these records.
+     */
+    @Test
+    public void testProduceConsumeConnector() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
+        props.put(TASKS_MAX_CONFIG, "3");
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 3
+                        && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1),
+                CONNECTOR_SETUP_DURATION_MS,
+                "Connector tasks were not assigned a partition each.");
+
+        // produce some messages into source topic partitions
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
+        }
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced.
+        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
+                connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic").count());
+
+        // wait for the connector tasks to consume all records.
+        connectorHandle.awaitRecords(CONSUME_MAX_DURATION_MS);
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME);
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
new file mode 100644
index 0000000..23a8d99
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.TestSinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A connector to be used in integration tests. This class provides methods to find task instances
+ * which are initiated by the embedded connector, and wait for them to consume a desired number of
+ * messages.
+ */
+public class MonitorableSinkConnector extends TestSinkConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(MonitorableSinkConnector.class);
+
+    private String connectorName;
+
+    @Override
+    public void start(Map<String, String> props) {
+        connectorName = props.get("name");
+        log.info("Starting connector {}", props.get("name"));
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MonitorableSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> config = new HashMap<>();
+            config.put("connector.name", connectorName);
+            config.put("task.id", connectorName + "-" + i);
+            configs.add(config);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    public static class MonitorableSinkTask extends SinkTask {
+
+        private String connectorName;
+        private String taskId;
+        private TaskHandle taskHandle;
+
+        @Override
+        public String version() {
+            return "unknown";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            taskId = props.get("task.id");
+            connectorName = props.get("connector.name");
+            taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
+            log.debug("Starting task {}", taskId);
+        }
+
+        @Override
+        public void open(Collection<TopicPartition> partitions) {
+            log.debug("Opening {} partitions", partitions.size());
+            super.open(partitions);
+            taskHandle.partitionsAssigned(partitions.size());
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+            for (SinkRecord rec : records) {
+                taskHandle.record();
+                log.trace("Task {} obtained record (key='{}' value='{}')", taskId, rec.key(), rec.value());
+            }
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java
new file mode 100644
index 0000000..c9900f3
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A singleton class which provides a shared registry for {@link ConnectorHandle}s and {@link TaskHandle}s that are
+ * required for integration tests.
+ */
+public class RuntimeHandles {
+
+    private static final RuntimeHandles INSTANCE = new RuntimeHandles();
+
+    private final Map<String, ConnectorHandle> connectorHandles = new ConcurrentHashMap<>();
+
+    private RuntimeHandles() {
+    }
+
+    /**
+     * @return the shared {@link RuntimeHandles} instance.
+     */
+    public static RuntimeHandles get() {
+        return INSTANCE;
+    }
+
+    /**
+     * Get or create a connector handle for a given connector name. The connector need not be running at the time
+     * this method is called. Once the connector is created, it will bind to this handle. Binding happens with the
+     * connectorName.
+     *
+     * @param connectorName the name of the connector
+     * @return a non-null {@link ConnectorHandle}
+     */
+    public ConnectorHandle connectorHandle(String connectorName) {
+        return connectorHandles.computeIfAbsent(connectorName, k -> new ConnectorHandle(connectorName));
+    }
+
+    /**
+     * Delete the connector handle for this connector name.
+     *
+     * @param connectorName name of the connector
+     */
+    public void deleteConnector(String connectorName) {
+        connectorHandles.remove(connectorName);
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
new file mode 100644
index 0000000..de3d924
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A handle to an executing task in a worker. Use this class to record progress, for example: the number of records
+ * seen by the task so far, or whether partitions have been assigned to the task.
+ */
+public class TaskHandle {
+
+    private static final Logger log = LoggerFactory.getLogger(TaskHandle.class);
+
+    private final String taskId;
+    private final ConnectorHandle connectorHandle;
+    private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
+
+    private volatile CountDownLatch recordsRemainingLatch; // volatile: written by expectedRecords(), read by the thread calling record()
+    private volatile int expectedRecords = -1; // -1 until expectedRecords(int) is called; volatile for the same reason as the latch
+
+    public TaskHandle(ConnectorHandle connectorHandle, String taskId) {
+        log.info("Created task {} for connector {}", taskId, connectorHandle);
+        this.taskId = taskId;
+        this.connectorHandle = connectorHandle;
+    }
+
+    /**
+     * Record a message arrival at the task, and propagate it to the owning connector handle.
+     */
+    public void record() {
+        if (recordsRemainingLatch != null) {
+            recordsRemainingLatch.countDown();
+        }
+        connectorHandle.record();
+    }
+
+    /**
+     * Set the number of expected records for this task. This replaces any previously configured expectation.
+     *
+     * @param expectedRecords number of records
+     */
+    public void expectedRecords(int expectedRecords) {
+        this.expectedRecords = expectedRecords;
+        this.recordsRemainingLatch = new CountDownLatch(expectedRecords);
+    }
+
+    /**
+     * Set the number of partitions assigned to this task.
+     *
+     * @param numPartitions number of partitions
+     */
+    public void partitionsAssigned(int numPartitions) {
+        partitionsAssigned.set(numPartitions);
+    }
+
+    /**
+     * @return the number of topic partitions assigned to this task.
+     */
+    public int partitionsAssigned() {
+        return partitionsAssigned.get();
+    }
+
+    /**
+     * Wait for this task to receive the expected number of records.
+     *
+     * @param consumeMaxDurationMs max duration to wait for records
+     * @throws InterruptedException if another thread interrupts this one while waiting for records
+     */
+    public void awaitRecords(int consumeMaxDurationMs) throws InterruptedException {
+        if (recordsRemainingLatch == null) {
+            throw new IllegalStateException("Illegal state encountered. expectedRecords() was not set for this task?");
+        }
+        if (!recordsRemainingLatch.await(consumeMaxDurationMs, TimeUnit.MILLISECONDS)) {
+            String msg = String.format("Insufficient records seen by task %s in %d millis. Records expected=%d, actual=%d",
+                    taskId,
+                    consumeMaxDurationMs,
+                    expectedRecords,
+                    expectedRecords - recordsRemainingLatch.getCount());
+            throw new DataException(msg);
+        }
+        log.debug("Task {} saw {} records, expected {} records", taskId, expectedRecords - recordsRemainingLatch.getCount(), expectedRecords);
+    }
+
+    @Override
+    public String toString() {
+        return "Handle{" +
+                "taskId='" + taskId + '\'' +
+                '}';
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 8fee4f4..d084e4c 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest;
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -174,7 +175,7 @@ public class RestServerTest {
         PowerMock.replayAll();
 
         server = new RestServer(workerConfig);
-        server.start(herder);
+        server.start(new HerderProvider(herder), herder.plugins());
 
         Response response = request("/connectors")
             .accept(MediaType.WILDCARD)
@@ -211,7 +212,7 @@ public class RestServerTest {
 
 
         server = new RestServer(workerConfig);
-        server.start(herder);
+        server.start(new HerderProvider(herder), herder.plugins());
 
         Response response = request("/connectors")
                 .header("Referer", origin + "/page")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index ad360b6..64de29f 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -30,6 +30,7 @@ import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.TestSinkConnector;
 import org.apache.kafka.connect.runtime.TestSourceConnector;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -183,7 +184,7 @@ public class ConnectorPluginsResourceTest {
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
-        connectorPluginsResource = new ConnectorPluginsResource(herder);
+        connectorPluginsResource = new ConnectorPluginsResource(new HerderProvider(herder));
     }
 
     private void expectPlugins() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd25..5a52074 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,6 +23,7 @@ import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -126,7 +127,7 @@ public class ConnectorsResourceTest {
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
                 RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
-        connectorsResource = new ConnectorsResource(herder, null);
+        connectorsResource = new ConnectorsResource(new HerderProvider(herder), null);
     }
 
     private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
index 4e928a3..be80e28 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -19,6 +19,7 @@ package org.apache.kafka.connect.runtime.rest.resources;
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -39,7 +40,7 @@ public class RootResourceTest extends EasyMockSupport {
 
     @Before
     public void setUp() {
-        rootResource = new RootResource(herder);
+        rootResource = new RootResource(new HerderProvider(herder));
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
new file mode 100644
index 0000000..9ba0e06
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.cli.ConnectDistributed;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.REST_HOST_NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.REST_PORT_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
+
+/**
+ * Start an embedded connect worker. Internally, this class will spin up a Kafka and Zk cluster, set up any tmp
+ * directories, and clean them up when the cluster is stopped.
+ */
+public class EmbeddedConnectCluster {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class);
+
+    private static final int DEFAULT_NUM_BROKERS = 1;
+    private static final int DEFAULT_NUM_WORKERS = 1;
+    private static final Properties DEFAULT_BROKER_CONFIG = new Properties();
+    private static final String REST_HOST_NAME = "localhost";
+
+    private final Connect[] connectCluster;
+    private final EmbeddedKafkaCluster kafkaCluster;
+    private final Map<String, String> workerProps;
+    private final String connectClusterName;
+    private final int numBrokers;
+
+    private EmbeddedConnectCluster(String name, Map<String, String> workerProps, int numWorkers, int numBrokers, Properties brokerProps) {
+        this.workerProps = workerProps;
+        this.connectClusterName = name;
+        this.numBrokers = numBrokers;
+        this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps);
+        this.connectCluster = new Connect[numWorkers];
+    }
+
+    /**
+     * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
+     */
+    public void start() throws IOException {
+        kafkaCluster.before();
+        startConnect();
+    }
+
+    /**
+     * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
+     * Clean up any temp directories created locally.
+     */
+    public void stop() {
+        for (Connect worker : this.connectCluster) {
+            try {
+                worker.stop();
+            } catch (Exception e) {
+                log.error("Could not stop connect", e);
+                throw new RuntimeException("Could not stop worker", e);
+            }
+        }
+
+        try {
+            kafkaCluster.after();
+        } catch (Exception e) {
+            log.error("Could not stop kafka", e);
+            throw new RuntimeException("Could not stop brokers", e);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public void startConnect() {
+        log.info("Starting Connect cluster with {} workers. clusterName {}", connectCluster.length, connectClusterName);
+
+        workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers());
+        workerProps.put(REST_HOST_NAME_CONFIG, REST_HOST_NAME);
+        workerProps.put(REST_PORT_CONFIG, "0"); // use a random available port
+
+        String internalTopicsReplFactor = String.valueOf(numBrokers);
+        putIfAbsent(workerProps, GROUP_ID_CONFIG, "connect-integration-test-" + connectClusterName);
+        putIfAbsent(workerProps, OFFSET_STORAGE_TOPIC_CONFIG, "connect-offset-topic-" + connectClusterName);
+        putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
+        putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName);
+        putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
+        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName);
+        putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
+        putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+
+        for (int i = 0; i < connectCluster.length; i++) {
+            connectCluster[i] = new ConnectDistributed().startConnect(workerProps);
+        }
+    }
+
+    /**
+     * Configure a connector. If the connector does not already exist, a new one will be created and
+     * the given configuration will be applied to it.
+     *
+     * @param connName   the name of the connector
+     * @param connConfig the intended configuration
+     * @throws IOException          if call to the REST api fails.
+     * @throws ConnectRestException if REST api returns error status
+     */
+    public void configureConnector(String connName, Map<String, String> connConfig) throws IOException {
+        String url = endpointForResource(String.format("connectors/%s/config", connName));
+        ObjectMapper mapper = new ObjectMapper();
+        int status;
+        try {
+            String content = mapper.writeValueAsString(connConfig);
+            status = executePut(url, content);
+        } catch (IOException e) {
+            log.error("Could not execute PUT request to " + url, e);
+            throw e;
+        }
+        if (status >= HttpServletResponse.SC_BAD_REQUEST) {
+            throw new ConnectRestException(status, "Could not execute PUT request");
+        }
+    }
+
+    /**
+     * Delete an existing connector.
+     *
+     * @param connName name of the connector to be deleted
+     * @throws IOException if call to the REST api fails.
+     */
+    public void deleteConnector(String connName) throws IOException {
+        String url = endpointForResource(String.format("connectors/%s", connName));
+        int status = executeDelete(url);
+        if (status >= HttpServletResponse.SC_BAD_REQUEST) {
+            throw new ConnectRestException(status, "Could not execute DELETE request.");
+        }
+    }
+
+    public ConnectorStateInfo connectorStatus(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s/status", connectorName));
+        try {
+            return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url));
+        } catch (IOException e) {
+            log.error("Could not read connector state", e);
+            throw new ConnectException("Could not read connector state", e);
+        }
+    }
+
+    private String endpointForResource(String resource) {
+        String url = String.valueOf(connectCluster[0].restUrl());
+        return url + resource;
+    }
+
+    private static void putIfAbsent(Map<String, String> props, String propertyKey, String propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }
+
+    public EmbeddedKafkaCluster kafka() {
+        return kafkaCluster;
+    }
+
+    public int executePut(String url, String body) throws IOException {
+        log.debug("Executing PUT request to URL={}. Payload={}", url, body);
+        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+        httpCon.setDoOutput(true);
+        httpCon.setRequestProperty("Content-Type", "application/json");
+        httpCon.setRequestMethod("PUT");
+        try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
+            out.write(body);
+        }
+        try (InputStream is = httpCon.getInputStream()) {
+            int c;
+            StringBuilder response = new StringBuilder();
+            while ((c = is.read()) != -1) {
+                response.append((char) c);
+            }
+            log.info("Put response for URL={} is {}", url, response);
+        }
+        return httpCon.getResponseCode();
+    }
+
+    public String executeGet(String url) throws IOException {
+        log.debug("Executing GET request to URL={}.", url);
+        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+        httpCon.setDoOutput(true);
+        httpCon.setRequestMethod("GET");
+        try (InputStream is = httpCon.getInputStream()) {
+            int c;
+            StringBuilder response = new StringBuilder();
+            while ((c = is.read()) != -1) {
+                response.append((char) c);
+            }
+            log.debug("Get response for URL={} is {}", url, response);
+            return response.toString();
+        }
+    }
+
+    public int executeDelete(String url) throws IOException {
+        log.debug("Executing DELETE request to URL={}", url);
+        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+        httpCon.setDoOutput(true);
+        httpCon.setRequestMethod("DELETE");
+        httpCon.connect();
+        return httpCon.getResponseCode();
+    }
+
+    public static class Builder { // fluent builder; any option that is not set keeps the default assigned below
+        private String name = UUID.randomUUID().toString();
+        private Map<String, String> workerProps = new HashMap<>();
+        private int numWorkers = DEFAULT_NUM_WORKERS;
+        private int numBrokers = DEFAULT_NUM_BROKERS;
+        private Properties brokerProps = (Properties) DEFAULT_BROKER_CONFIG.clone(); // own copy: EmbeddedKafkaCluster writes into the Properties it is given
+
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public Builder workerProps(Map<String, String> workerProps) {
+            this.workerProps = workerProps;
+            return this;
+        }
+
+        public Builder numWorkers(int numWorkers) {
+            this.numWorkers = numWorkers;
+            return this;
+        }
+
+        public Builder numBrokers(int numBrokers) {
+            this.numBrokers = numBrokers;
+            return this;
+        }
+
+        public Builder brokerProps(Properties brokerProps) {
+            this.brokerProps = brokerProps;
+            return this;
+        }
+
+        public EmbeddedConnectCluster build() {
+            return new EmbeddedConnectCluster(name, workerProps, numWorkers, numBrokers, brokerProps);
+        }
+    }
+
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
new file mode 100644
index 0000000..109ba14
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.CoreUtils;
+import kafka.utils.TestUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Set up an embedded Kafka cluster with the specified number of brokers and the specified broker properties. To be
+ * used for integration tests.
+ */
+public class EmbeddedKafkaCluster extends ExternalResource {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 500;
+
+    // Kafka Config
+    private final KafkaServer[] brokers;
+    private final Properties brokerConfig;
+    private final Time time = new MockTime();
+
+    private EmbeddedZookeeper zookeeper = null;
+    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private KafkaProducer<byte[], byte[]> producer;
+
+    public EmbeddedKafkaCluster(final int numBrokers,
+                                final Properties brokerConfig) {
+        brokers = new KafkaServer[numBrokers];
+        this.brokerConfig = brokerConfig;
+    }
+
+    @Override
+    protected void before() throws IOException {
+        start();
+    }
+
+    @Override
+    protected void after() {
+        stop();
+    }
+
+    private void start() throws IOException {
+        zookeeper = new EmbeddedZookeeper();
+
+        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), 0); // pick a random port
+
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+
+        Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        if (listenerConfig != null) {
+            listenerName = new ListenerName(listenerConfig.toString());
+        }
+
+        for (int i = 0; i < brokers.length; i++) {
+            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), createLogDir());
+            brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time);
+        }
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    private void stop() {
+
+        try {
+            producer.close();
+        } catch (Exception e) {
+            log.error("Could not shutdown producer ", e);
+            throw new RuntimeException("Could not shutdown producer", e);
+        }
+
+        for (KafkaServer broker : brokers) {
+            try {
+                broker.shutdown();
+            } catch (Throwable t) {
+                String msg = String.format("Could not shutdown broker at %s", address(broker));
+                log.error(msg, t);
+                throw new RuntimeException(msg, t);
+            }
+        }
+
+        for (KafkaServer broker : brokers) {
+            try {
+                log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
+                CoreUtils.delete(broker.config().logDirs());
+            } catch (Throwable t) {
+                String msg = String.format("Could not clean up log dirs for broker at %s", address(broker));
+                log.error(msg, t);
+                throw new RuntimeException(msg, t);
+            }
+        }
+
+        try {
+            zookeeper.shutdown();
+        } catch (Throwable t) {
+            String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
+            log.error(msg, t);
+            throw new RuntimeException(msg, t);
+        }
+    }
+
+    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }
+
+    private String createLogDir() throws IOException {
+        TemporaryFolder tmpFolder = new TemporaryFolder();
+        tmpFolder.create();
+        return tmpFolder.newFolder().getAbsolutePath();
+    }
+
+    public String bootstrapServers() {
+        return Arrays.stream(brokers)
+                .map(this::address)
+                .collect(Collectors.joining(","));
+    }
+
+    public String address(KafkaServer server) {
+        return server.config().hostName() + ":" + server.boundPort(listenerName);
+    }
+
+    public String zKConnectString() {
+        return "127.0.0.1:" + zookeeper.port();
+    }
+
+    /**
+     * Create a Kafka topic with 1 partition and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        createTopic(topic, 1);
+    }
+
+    /**
+     * Create a Kafka topic with the given number of partitions and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic, int partitions) {
+        createTopic(topic, partitions, 1, new HashMap<>());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters and block until the creation request completes.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (partitions of) this topic.
+     * @param topicConfig Additional topic-level configuration settings.
+     * @throws InvalidReplicationFactorException if {@code replication} exceeds the number of brokers in this cluster.
+     * @throws RuntimeException if waiting for the creation fails or is interrupted; the original exception
+     *                          is attached as the cause.
+     */
+    public void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig) {
+        if (replication > brokers.length) {
+            throw new InvalidReplicationFactorException("Insufficient brokers ("
+                    + brokers.length + ") for desired replication (" + replication + ")");
+        }
+
+        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
+                topic, partitions, replication, topicConfig);
+        final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
+        newTopic.configs(topicConfig);
+
+        // A short-lived admin client is created per call and closed by try-with-resources.
+        try (final AdminClient adminClient = createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+        } catch (final InterruptedException e) {
+            // Restore the interrupt flag so code further up the stack can still observe the interruption.
+            Thread.currentThread().interrupt();
+            throw new RuntimeException(e);
+        } catch (final ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Produce a record with the given value, passing {@code null} for both partition and key.
+     *
+     * @param topic the topic to produce to.
+     * @param value the record value.
+     */
+    public void produce(String topic, String value) {
+        final Integer noPartition = null;
+        final String noKey = null;
+        produce(topic, noPartition, noKey, value);
+    }
+
+    /**
+     * Produce a record with the given key and value, passing {@code null} for the partition.
+     *
+     * @param topic the topic to produce to.
+     * @param key   the record key.
+     * @param value the record value.
+     */
+    public void produce(String topic, String key, String value) {
+        final Integer noPartition = null;
+        produce(topic, noPartition, key, value);
+    }
+
+    /**
+     * Produce a single record and wait up to {@code DEFAULT_PRODUCE_SEND_DURATION_MS} milliseconds
+     * for the send to complete.
+     *
+     * @param topic     the topic to produce to.
+     * @param partition the partition passed to the record; may be {@code null}.
+     * @param key       the record key, encoded as UTF-8; a {@code null} key is sent as {@code null}.
+     * @param value     the record value, encoded as UTF-8; a {@code null} value is sent as {@code null}.
+     * @throws KafkaException if the send fails, times out or is interrupted; the original exception
+     *                        is attached as the cause.
+     */
+    public void produce(String topic, Integer partition, String key, String value) {
+        // Encode explicitly as UTF-8 so the bytes do not depend on the JVM's platform default charset.
+        final byte[] keyBytes = key == null ? null : key.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        final byte[] valueBytes = value == null ? null : value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, keyBytes, valueBytes);
+        try {
+            producer.send(msg).get(DEFAULT_PRODUCE_SEND_DURATION_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception, so the interruption is not lost.
+            Thread.currentThread().interrupt();
+            throw new KafkaException("Could not produce message to topic=" + topic, e);
+        } catch (Exception e) {
+            throw new KafkaException("Could not produce message to topic=" + topic, e);
+        }
+    }
+
+    /**
+     * Create a new {@link AdminClient} that bootstraps from {@link #bootstrapServers()}.
+     *
+     * When the broker's listeners setting contains "SSL", the truststore location and password are copied
+     * from {@code brokerConfig} and the security protocol is set to "SSL". This method does not close
+     * the client it returns.
+     *
+     * @return a newly created admin client.
+     */
+    public AdminClient createAdminClient() {
+        final Properties clientProps = new Properties();
+        clientProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+
+        final Object listenersValue = brokerConfig.get(KafkaConfig$.MODULE$.ListenersProp());
+        final boolean sslListener = listenersValue != null && listenersValue.toString().contains("SSL");
+        if (!sslListener) {
+            return AdminClient.create(clientProps);
+        }
+
+        final Object truststoreLocation = brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG);
+        clientProps.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, truststoreLocation);
+        final Password truststorePassword = (Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG);
+        clientProps.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, truststorePassword.value());
+        clientProps.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        return AdminClient.create(clientProps);
+    }
+
+    /**
+     * Poll the given topics until at least n records have been gathered or the time budget is spent.
+     *
+     * @param n the minimum number of records to gather across the subscribed topics.
+     * @param maxDuration the total time budget for polling (in milliseconds).
+     * @param topics the topics to subscribe and consume records from.
+     * @return a {@link ConsumerRecords} collection containing at least n records.
+     * @throws RuntimeException if the time budget runs out before n records were gathered.
+     */
+    public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, String... topics) {
+        final Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> recordsByPartition = new HashMap<>();
+        int totalConsumed = 0;
+        try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(Collections.emptyMap(), topics)) {
+            final long startMillis = System.currentTimeMillis();
+            long remainingMillis = maxDuration;
+            while (remainingMillis > 0) {
+                log.debug("Consuming from {} for {} millis.", Arrays.toString(topics), remainingMillis);
+                final ConsumerRecords<byte[], byte[]> batch = consumer.poll(Duration.ofMillis(remainingMillis));
+                if (!batch.isEmpty()) {
+                    for (final TopicPartition partition : batch.partitions()) {
+                        final List<ConsumerRecord<byte[], byte[]>> partitionRecords = batch.records(partition);
+                        recordsByPartition.computeIfAbsent(partition, tp -> new ArrayList<>()).addAll(partitionRecords);
+                        totalConsumed += partitionRecords.size();
+                    }
+                    if (totalConsumed >= n) {
+                        return new ConsumerRecords<>(recordsByPartition);
+                    }
+                }
+                // Whatever the poll returned, shrink the budget by the wall-clock time used so far.
+                remainingMillis = maxDuration - (System.currentTimeMillis() - startMillis);
+            }
+        }
+
+        throw new RuntimeException("Could not find enough records. found " + totalConsumed + ", expected " + n);
+    }
+
+    /**
+     * Create a byte-array consumer for this cluster.
+     *
+     * The supplied map is copied, and the copy is completed with defaults for any key the caller left out:
+     * a random group id, this cluster's bootstrap servers, auto-commit "false", offset reset "earliest"
+     * and {@code ByteArrayDeserializer} for both keys and values.
+     *
+     * @param consumerProps caller-supplied consumer settings; entries here take precedence over the defaults.
+     * @return a newly created consumer that is not yet subscribed to any topic.
+     * @throws ConnectException if constructing the consumer fails for any reason.
+     */
+    public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> consumerProps) {
+        final Map<String, Object> effectiveProps = new HashMap<>(consumerProps);
+
+        final String byteArrayDeserializer = "org.apache.kafka.common.serialization.ByteArrayDeserializer";
+        putIfAbsent(effectiveProps, GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        putIfAbsent(effectiveProps, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        putIfAbsent(effectiveProps, ENABLE_AUTO_COMMIT_CONFIG, "false");
+        putIfAbsent(effectiveProps, AUTO_OFFSET_RESET_CONFIG, "earliest");
+        putIfAbsent(effectiveProps, KEY_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
+        putIfAbsent(effectiveProps, VALUE_DESERIALIZER_CLASS_CONFIG, byteArrayDeserializer);
+
+        try {
+            return new KafkaConsumer<>(effectiveProps);
+        } catch (Throwable t) {
+            throw new ConnectException("Failed to create consumer", t);
+        }
+    }
+
+    /**
+     * Create a consumer through {@link #createConsumer(Map)} and subscribe it to the given topics.
+     *
+     * @param consumerProps consumer settings handed to {@link #createConsumer(Map)}.
+     * @param topics        the topics to subscribe to.
+     * @return the subscribed consumer; this method does not close it.
+     */
+    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> consumerProps, String... topics) {
+        final KafkaConsumer<byte[], byte[]> subscribed = createConsumer(consumerProps);
+        final List<String> topicList = Arrays.asList(topics);
+        subscribed.subscribe(topicList);
+        return subscribed;
+    }
+
+    /**
+     * Store {@code propertyValue} under {@code propertyKey} unless {@code props} already contains that key.
+     *
+     * A key that is present keeps its current mapping, even if that mapping is {@code null}.
+     *
+     * @param props         the map to update in place.
+     * @param propertyKey   the key to look up and, when missing, insert.
+     * @param propertyValue the value stored when the key is missing.
+     */
+    private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
+        if (props.containsKey(propertyKey)) {
+            return;
+        }
+        props.put(propertyKey, propertyValue);
+    }
+}
diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties
index d5e90fe..1feedb8 100644
--- a/connect/runtime/src/test/resources/log4j.properties
+++ b/connect/runtime/src/test/resources/log4j.properties
@@ -18,6 +18,7 @@ log4j.rootLogger=OFF, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+log4j.appender.stdout.layout.ConversionPattern=[%d] (%t) %p %m (%c:%L)%n
 
+log4j.logger.org.reflections=ERROR
 log4j.logger.org.apache.kafka=ERROR


Mime
View raw message