kafka-commits mailing list archives

From GitBox <...@apache.org>
Subject [kafka] Diff for: [GitHub] ewencp closed pull request #5516: KAFKA-7503: Connect integration test harness
Date Mon, 14 Jan 2019 21:57:48 GMT
diff --git a/build.gradle b/build.gradle
index 75a4354cc94..4dbe7d7907c 100644
--- a/build.gradle
+++ b/build.gradle
@@ -1453,6 +1453,8 @@ project(':connect:runtime') {
     testCompile libs.powermockEasymock
 
     testCompile project(':clients').sourceSets.test.output
+    testCompile project(':core')
+    testCompile project(':core').sourceSets.test.output
 
     testRuntime libs.slf4jlog4j
   }
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index c69f94d25bb..8c98f8d4478 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -347,8 +347,6 @@
       </subpackage>
     </subpackage>
 
-
-
     <subpackage name="cli">
       <allow pkg="org.apache.kafka.connect.runtime" />
       <allow pkg="org.apache.kafka.connect.storage" />
@@ -366,6 +364,18 @@
       <allow pkg="org.reflections.vfs" />
       <!-- for annotations to avoid code duplication -->
       <allow pkg="com.fasterxml.jackson.annotation" />
+      <allow pkg="com.fasterxml.jackson.databind" />
+      <subpackage name="clusters">
+        <allow pkg="kafka.server" />
+        <allow pkg="kafka.zk" />
+        <allow pkg="kafka.utils" />
+        <allow class="javax.servlet.http.HttpServletResponse" />
+      </subpackage>
+    </subpackage>
+
+    <subpackage name="integration">
+      <allow pkg="org.apache.kafka.connect.util.clusters" />
+      <allow pkg="org.apache.kafka.connect" />
     </subpackage>
 
     <subpackage name="json">
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index dd43c37cadd..a6c6d98faca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -20,6 +20,7 @@
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.WorkerConfigTransformer;
 import org.apache.kafka.connect.runtime.WorkerInfo;
@@ -54,62 +55,26 @@
 public class ConnectDistributed {
     private static final Logger log = LoggerFactory.getLogger(ConnectDistributed.class);
 
+    private final Time time = Time.SYSTEM;
+    private final long initStart = time.hiResClockMs();
+
     public static void main(String[] args) {
+
         if (args.length < 1 || Arrays.asList(args).contains("--help")) {
             log.info("Usage: ConnectDistributed worker.properties");
             Exit.exit(1);
         }
 
         try {
-            Time time = Time.SYSTEM;
-            log.info("Kafka Connect distributed worker initializing ...");
-            long initStart = time.hiResClockMs();
             WorkerInfo initInfo = new WorkerInfo();
             initInfo.logAll();
 
             String workerPropsFile = args[0];
             Map<String, String> workerProps = !workerPropsFile.isEmpty() ?
-                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.<String, String>emptyMap();
-
-            log.info("Scanning for plugin classes. This might take a moment ...");
-            Plugins plugins = new Plugins(workerProps);
-            plugins.compareAndSwapWithDelegatingLoader();
-            DistributedConfig config = new DistributedConfig(workerProps);
-
-            String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
-            log.debug("Kafka cluster ID: {}", kafkaClusterId);
-
-            RestServer rest = new RestServer(config);
-            URI advertisedUrl = rest.advertisedUrl();
-            String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
-
-            KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
-            offsetBackingStore.configure(config);
-
-            Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
-            WorkerConfigTransformer configTransformer = worker.configTransformer();
-
-            Converter internalValueConverter = worker.getInternalValueConverter();
-            StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
-            statusBackingStore.configure(config);
-
-            ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
-                    internalValueConverter,
-                    config,
-                    configTransformer);
-
-            DistributedHerder herder = new DistributedHerder(config, time, worker,
-                    kafkaClusterId, statusBackingStore, configBackingStore,
-                    advertisedUrl.toString());
-            final Connect connect = new Connect(herder, rest);
-            log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
-            try {
-                connect.start();
-            } catch (Exception e) {
-                log.error("Failed to start Connect", e);
-                connect.stop();
-                Exit.exit(3);
-            }
+                    Utils.propsToStringMap(Utils.loadProps(workerPropsFile)) : Collections.emptyMap();
+
+            ConnectDistributed connectDistributed = new ConnectDistributed();
+            Connect connect = connectDistributed.startConnect(workerProps);
 
             // Shutdown will be triggered by Ctrl-C or via HTTP shutdown request
             connect.awaitStop();
@@ -119,4 +84,55 @@ public static void main(String[] args) {
             Exit.exit(2);
         }
     }
+
+    public Connect startConnect(Map<String, String> workerProps) {
+        log.info("Scanning for plugin classes. This might take a moment ...");
+        Plugins plugins = new Plugins(workerProps);
+        plugins.compareAndSwapWithDelegatingLoader();
+        DistributedConfig config = new DistributedConfig(workerProps);
+
+        String kafkaClusterId = ConnectUtils.lookupKafkaClusterId(config);
+        log.debug("Kafka cluster ID: {}", kafkaClusterId);
+
+        RestServer rest = new RestServer(config);
+        HerderProvider provider = new HerderProvider();
+        rest.start(provider, plugins);
+
+        URI advertisedUrl = rest.advertisedUrl();
+        String workerId = advertisedUrl.getHost() + ":" + advertisedUrl.getPort();
+
+        KafkaOffsetBackingStore offsetBackingStore = new KafkaOffsetBackingStore();
+        offsetBackingStore.configure(config);
+
+        Worker worker = new Worker(workerId, time, plugins, config, offsetBackingStore);
+        WorkerConfigTransformer configTransformer = worker.configTransformer();
+
+        Converter internalValueConverter = worker.getInternalValueConverter();
+        StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, internalValueConverter);
+        statusBackingStore.configure(config);
+
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(
+                internalValueConverter,
+                config,
+                configTransformer);
+
+        DistributedHerder herder = new DistributedHerder(config, time, worker,
+                kafkaClusterId, statusBackingStore, configBackingStore,
+                advertisedUrl.toString());
+
+        final Connect connect = new Connect(herder, rest);
+        log.info("Kafka Connect distributed worker initialization took {}ms", time.hiResClockMs() - initStart);
+        try {
+            connect.start();
+            // the herder is initialized now and ready to be used by the RestServer.
+            provider.setHerder(herder);
+        } catch (Exception e) {
+            log.error("Failed to start Connect", e);
+            connect.stop();
+            Exit.exit(3);
+        }
+
+        return connect;
+    }
+
 }
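
The refactoring above extracts worker startup into startConnect(Map), which is what lets an integration harness run a distributed worker in-process instead of going through main(). A minimal sketch of how a test might drive it, assuming an already-running broker; the property values and surrounding setup here are illustrative, not part of this patch:

    Map<String, String> workerProps = new HashMap<>();
    workerProps.put("bootstrap.servers", "localhost:9092");
    workerProps.put("group.id", "connect-it-worker-group");
    workerProps.put("key.converter", "org.apache.kafka.connect.json.JsonConverter");
    workerProps.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
    workerProps.put("offset.storage.topic", "connect-offsets");
    workerProps.put("config.storage.topic", "connect-configs");
    workerProps.put("status.storage.topic", "connect-status");

    // start an in-process distributed worker and talk to it over its REST API
    Connect connect = new ConnectDistributed().startConnect(workerProps);
    URI restUrl = connect.restUrl();   // exposed above for tests
    // ... issue REST calls against restUrl ...
    connect.stop();
    connect.awaitStop();
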
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 846ed1a8835..965046cccf0 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -20,6 +20,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.net.URI;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -50,7 +51,6 @@ public void start() {
             Runtime.getRuntime().addShutdownHook(shutdownHook);
 
             herder.start();
-            rest.start(herder);
 
             log.info("Kafka Connect started");
         } finally {
@@ -82,6 +82,11 @@ public void awaitStop() {
         }
     }
 
+    // Visible for testing
+    public URI restUrl() {
+        return rest.serverUrl();
+    }
+
     private class ShutdownHook extends Thread {
         @Override
         public void run() {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
new file mode 100644
index 00000000000..42c0925a704
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/HerderProvider.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A supplier for {@link Herder}s.
+ */
+public class HerderProvider {
+
+    private final CountDownLatch initialized = new CountDownLatch(1);
+    volatile Herder herder = null;
+
+    public HerderProvider() {
+    }
+
+    /**
+     * Create a herder provider with a herder.
+     * @param herder the herder that will be supplied to threads waiting on this provider
+     */
+    public HerderProvider(Herder herder) {
+        this.herder = herder;
+        initialized.countDown();
+    }
+
+    /**
+     * @return the contained herder.
+     * @throws ConnectException if a herder does not become available within one minute of calling this method
+     */
+    public Herder get() {
+        try {
+            // wait for herder to be initialized
+            if (!initialized.await(1, TimeUnit.MINUTES)) {
+                throw new ConnectException("Timed out waiting for herder to be initialized.");
+            }
+        } catch (InterruptedException e) {
+            throw new ConnectException("Interrupted while waiting for herder to be initialized.", e);
+        }
+        return herder;
+    }
+
+    /**
+     * @param herder the herder to set; setting it signals all threads waiting on {@link #get()}.
+     */
+    public void setHerder(Herder herder) {
+        this.herder = herder;
+        initialized.countDown();
+    }
+
+}
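
HerderProvider decouples REST server startup from herder initialization: resources are registered while no herder exists yet, and any request that arrives early simply blocks in get() (for up to one minute) until setHerder() is called. A condensed sketch of the hand-off as wired up in startConnect() above (config, plugins and herder assumed in scope):

    HerderProvider provider = new HerderProvider();      // no herder yet
    RestServer rest = new RestServer(config);
    rest.start(provider, plugins);                       // REST server is up before the herder exists

    // ... build the worker, backing stores and herder, then start Connect ...
    provider.setHerder(herder);                          // unblocks any thread waiting in get()

    // inside a resource, e.g. RootResource.serverInfo():
    String clusterId = provider.get().kafkaClusterId();  // waits up to 1 minute, else ConnectException
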
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
index a0f7fdeea9f..ea93a72d500 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/health/ConnectClusterStateImpl.java
@@ -22,7 +22,7 @@
 import org.apache.kafka.connect.health.ConnectorState;
 import org.apache.kafka.connect.health.ConnectorType;
 import org.apache.kafka.connect.health.TaskState;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
 import org.apache.kafka.connect.util.Callback;
 
@@ -34,16 +34,16 @@
 
 public class ConnectClusterStateImpl implements ConnectClusterState {
 
-    private Herder herder;
+    private HerderProvider herderProvider;
 
-    public ConnectClusterStateImpl(Herder herder) {
-        this.herder = herder;
+    public ConnectClusterStateImpl(HerderProvider herderProvider) {
+        this.herderProvider = herderProvider;
     }
 
     @Override
     public Collection<String> connectors() {
         final Collection<String> connectors = new ArrayList<>();
-        herder.connectors(new Callback<java.util.Collection<String>>() {
+        herderProvider.get().connectors(new Callback<java.util.Collection<String>>() {
             @Override
             public void onCompletion(Throwable error, Collection<String> result) {
                 connectors.addAll(result);
@@ -55,7 +55,7 @@ public void onCompletion(Throwable error, Collection<String> result) {
     @Override
     public ConnectorHealth connectorHealth(String connName) {
 
-        ConnectorStateInfo state = herder.connectorStatus(connName);
+        ConnectorStateInfo state = herderProvider.get().connectorStatus(connName);
         ConnectorState connectorState = new ConnectorState(
             state.connector().state(),
             state.connector().workerId(),
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
index c0d83f29103..5cc31cd5cca 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
@@ -17,14 +17,14 @@
 package org.apache.kafka.connect.runtime.rest;
 
 import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider;
-
 import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.rest.ConnectRestExtensionContext;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.health.ConnectClusterStateImpl;
+import org.apache.kafka.connect.runtime.isolation.Plugins;
 import org.apache.kafka.connect.runtime.rest.errors.ConnectExceptionMapper;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource;
 import org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource;
@@ -50,6 +50,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.servlet.DispatcherType;
+import javax.ws.rs.core.UriBuilder;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -60,9 +62,6 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
-import javax.servlet.DispatcherType;
-import javax.ws.rs.core.UriBuilder;
-
 /**
  * Embedded server for the REST API that provides the control plane for Kafka Connect workers.
  */
@@ -161,20 +160,20 @@ public Connector createConnector(String listener) {
         return connector;
     }
 
-    public void start(Herder herder) {
+    public void start(HerderProvider herderProvider, Plugins plugins) {
         log.info("Starting REST server");
 
         ResourceConfig resourceConfig = new ResourceConfig();
         resourceConfig.register(new JacksonJsonProvider());
 
-        resourceConfig.register(new RootResource(herder));
-        resourceConfig.register(new ConnectorsResource(herder, config));
-        resourceConfig.register(new ConnectorPluginsResource(herder));
+        resourceConfig.register(new RootResource(herderProvider));
+        resourceConfig.register(new ConnectorsResource(herderProvider, config));
+        resourceConfig.register(new ConnectorPluginsResource(herderProvider));
 
         resourceConfig.register(ConnectExceptionMapper.class);
         resourceConfig.property(ServerProperties.WADL_FEATURE_DISABLE, true);
 
-        registerRestExtensions(herder, resourceConfig);
+        registerRestExtensions(herderProvider, plugins, resourceConfig);
 
         ServletContainer servletContainer = new ServletContainer(resourceConfig);
         ServletHolder servletHolder = new ServletHolder(servletContainer);
@@ -220,7 +219,9 @@ public void start(Herder herder) {
         log.info("REST server listening at " + jettyServer.getURI() + ", advertising URL " + advertisedUrl());
     }
 
-
+    public URI serverUrl() {
+        return jettyServer.getURI();
+    }
 
     public void stop() {
         log.info("Stopping REST server");
@@ -264,7 +265,7 @@ else if (serverConnector != null && serverConnector.getHost() != null && serverC
         Integer advertisedPort = config.getInt(WorkerConfig.REST_ADVERTISED_PORT_CONFIG);
         if (advertisedPort != null)
             builder.port(advertisedPort);
-        else if (serverConnector != null)
+        else if (serverConnector != null && serverConnector.getPort() > 0)
             builder.port(serverConnector.getPort());
 
         log.info("Advertised URI: {}", builder.build());
@@ -302,15 +303,15 @@ ServerConnector findConnector(String protocol) {
         return null;
     }
 
-    void registerRestExtensions(Herder herder, ResourceConfig resourceConfig) {
-        connectRestExtensions = herder.plugins().newPlugins(
+    void registerRestExtensions(HerderProvider provider, Plugins plugins, ResourceConfig resourceConfig) {
+        connectRestExtensions = plugins.newPlugins(
             config.getList(WorkerConfig.REST_EXTENSION_CLASSES_CONFIG),
             config, ConnectRestExtension.class);
 
         ConnectRestExtensionContext connectRestExtensionContext =
             new ConnectRestExtensionContextImpl(
                 new ConnectRestConfigurable(resourceConfig),
-                new ConnectClusterStateImpl(herder)
+                new ConnectClusterStateImpl(provider)
             );
         for (ConnectRestExtension connectRestExtension : connectRestExtensions) {
             connectRestExtension.register(connectRestExtensionContext);
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
index 80192ca0644..6280473af96 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ConnectorStateInfo.java
@@ -90,7 +90,10 @@ public String trace() {
     }
 
     public static class ConnectorState extends AbstractState {
-        public ConnectorState(String state, String worker, String msg) {
+        @JsonCreator
+        public ConnectorState(@JsonProperty("state") String state,
+                              @JsonProperty("worker_id") String worker,
+                              @JsonProperty("msg") String msg) {
             super(state, worker, msg);
         }
     }
@@ -98,7 +101,11 @@ public ConnectorState(String state, String worker, String msg) {
     public static class TaskState extends AbstractState implements Comparable<TaskState> {
         private final int id;
 
-        public TaskState(int id, String state, String worker, String msg) {
+        @JsonCreator
+        public TaskState(@JsonProperty("id") int id,
+                         @JsonProperty("state") String state,
+                         @JsonProperty("worker_id") String worker,
+                         @JsonProperty("msg") String msg) {
             super(state, worker, msg);
             this.id = id;
         }
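
The @JsonCreator constructors added here make these status entities deserializable as well as serializable, which an embedded test client needs when it reads /connectors/{name}/status responses back into ConnectorStateInfo (as connect.connectorStatus(...) does in the tests below). A minimal sketch of that round trip, with an illustrative payload:

    // illustrative payload; field names follow the @JsonProperty annotations above
    String json = "{\"state\":\"RUNNING\",\"worker_id\":\"localhost:8083\",\"msg\":null}";
    ConnectorStateInfo.ConnectorState state = new ObjectMapper()
            .readValue(json, ConnectorStateInfo.ConnectorState.class);   // readValue throws IOException
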
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
index 24eb93b8c0d..87f25b29cb5 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResource.java
@@ -18,7 +18,7 @@
 
 import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.isolation.PluginDesc;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigInfos;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
@@ -49,7 +49,7 @@
 public class ConnectorPluginsResource {
 
     private static final String ALIAS_SUFFIX = "Connector";
-    private final Herder herder;
+    private final HerderProvider herderProvider;
     private final List<ConnectorPluginInfo> connectorPlugins;
 
     private static final List<Class<? extends Connector>> CONNECTOR_EXCLUDES = Arrays.asList(
@@ -58,8 +58,8 @@
             SchemaSourceConnector.class
     );
 
-    public ConnectorPluginsResource(Herder herder) {
-        this.herder = herder;
+    public ConnectorPluginsResource(HerderProvider herderProvider) {
+        this.herderProvider = herderProvider;
         this.connectorPlugins = new ArrayList<>();
     }
 
@@ -78,7 +78,7 @@ public ConfigInfos validateConfigs(
             );
         }
 
-        return herder.validateConnectorConfig(connectorConfig);
+        return herderProvider.get().validateConnectorConfig(connectorConfig);
     }
 
     @GET
@@ -90,7 +90,7 @@ public ConfigInfos validateConfigs(
     // TODO: improve once plugins are allowed to be added/removed during runtime.
     private synchronized List<ConnectorPluginInfo> getConnectorPlugins() {
         if (connectorPlugins.isEmpty()) {
-            for (PluginDesc<Connector> plugin : herder.plugins().connectors()) {
+            for (PluginDesc<Connector> plugin : herderProvider.get().plugins().connectors()) {
                 if (!CONNECTOR_EXCLUDES.contains(plugin.pluginClass())) {
                     connectorPlugins.add(new ConnectorPluginInfo(plugin));
                 }
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
index e9661046d31..29a8c39028e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -19,6 +19,7 @@
 import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.RebalanceNeededException;
 import org.apache.kafka.connect.runtime.distributed.RequestTargetException;
@@ -67,21 +68,25 @@
     // but currently a worker simply leaving the group can take this long as well.
     private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
 
-    private final Herder herder;
+    private final HerderProvider herderProvider;
     private final WorkerConfig config;
     @javax.ws.rs.core.Context
     private ServletContext context;
 
-    public ConnectorsResource(Herder herder, WorkerConfig config) {
-        this.herder = herder;
+    public ConnectorsResource(HerderProvider herder, WorkerConfig config) {
+        this.herderProvider = herder;
         this.config = config;
     }
 
+    private Herder herder() {
+        return herderProvider.get();
+    }
+
     @GET
     @Path("/")
     public Collection<String> listConnectors(final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Collection<String>> cb = new FutureCallback<>();
-        herder.connectors(cb);
+        herder().connectors(cb);
         return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
         }, forward);
     }
@@ -99,7 +104,7 @@ public Response createConnector(final @QueryParam("forward") Boolean forward,
         checkAndPutConnectorConfigName(name, configs);
 
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.putConnectorConfig(name, configs, false, cb);
+        herder().putConnectorConfig(name, configs, false, cb);
         Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
                 new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
 
@@ -112,7 +117,7 @@ public Response createConnector(final @QueryParam("forward") Boolean forward,
     public ConnectorInfo getConnector(final @PathParam("connector") String connector,
                                       final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
-        herder.connectorInfo(connector, cb);
+        herder().connectorInfo(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, forward);
     }
 
@@ -121,14 +126,14 @@ public ConnectorInfo getConnector(final @PathParam("connector") String connector
     public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector,
                                                   final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Map<String, String>> cb = new FutureCallback<>();
-        herder.connectorConfig(connector, cb);
+        herder().connectorConfig(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, forward);
     }
 
     @GET
     @Path("/{connector}/status")
     public ConnectorStateInfo getConnectorStatus(final @PathParam("connector") String connector) throws Throwable {
-        return herder.connectorStatus(connector);
+        return herder().connectorStatus(connector);
     }
 
     @PUT
@@ -139,7 +144,7 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
         checkAndPutConnectorConfigName(connector, connectorConfig);
 
-        herder.putConnectorConfig(connector, connectorConfig, true, cb);
+        herder().putConnectorConfig(connector, connectorConfig, true, cb);
         Herder.Created<ConnectorInfo> createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config",
                 "PUT", connectorConfig, new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator(), forward);
         Response.ResponseBuilder response;
@@ -157,21 +162,21 @@ public Response putConnectorConfig(final @PathParam("connector") String connecto
     public void restartConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
-        herder.restartConnector(connector, cb);
+        herder().restartConnector(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/restart", "POST", null, forward);
     }
 
     @PUT
     @Path("/{connector}/pause")
     public Response pauseConnector(@PathParam("connector") String connector) {
-        herder.pauseConnector(connector);
+        herder().pauseConnector(connector);
         return Response.accepted().build();
     }
 
     @PUT
     @Path("/{connector}/resume")
     public Response resumeConnector(@PathParam("connector") String connector) {
-        herder.resumeConnector(connector);
+        herder().resumeConnector(connector);
         return Response.accepted().build();
     }
 
@@ -180,7 +185,7 @@ public Response resumeConnector(@PathParam("connector") String connector) {
     public List<TaskInfo> getTaskConfigs(final @PathParam("connector") String connector,
                                          final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<List<TaskInfo>> cb = new FutureCallback<>();
-        herder.taskConfigs(connector, cb);
+        herder().taskConfigs(connector, cb);
         return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference<List<TaskInfo>>() {
         }, forward);
     }
@@ -191,7 +196,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector,
                                final @QueryParam("forward") Boolean forward,
                                final List<Map<String, String>> taskConfigs) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
-        herder.putTaskConfigs(connector, taskConfigs, cb);
+        herder().putTaskConfigs(connector, taskConfigs, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs, forward);
     }
 
@@ -199,7 +204,7 @@ public void putTaskConfigs(final @PathParam("connector") String connector,
     @Path("/{connector}/tasks/{task}/status")
     public ConnectorStateInfo.TaskState getTaskStatus(final @PathParam("connector") String connector,
                                                       final @PathParam("task") Integer task) throws Throwable {
-        return herder.taskStatus(new ConnectorTaskId(connector, task));
+        return herder().taskStatus(new ConnectorTaskId(connector, task));
     }
 
     @POST
@@ -209,7 +214,7 @@ public void restartTask(final @PathParam("connector") String connector,
                             final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Void> cb = new FutureCallback<>();
         ConnectorTaskId taskId = new ConnectorTaskId(connector, task);
-        herder.restartTask(taskId, cb);
+        herder().restartTask(taskId, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks/" + task + "/restart", "POST", null, forward);
     }
 
@@ -218,7 +223,7 @@ public void restartTask(final @PathParam("connector") String connector,
     public void destroyConnector(final @PathParam("connector") String connector,
                                  final @QueryParam("forward") Boolean forward) throws Throwable {
         FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
-        herder.deleteConnectorConfig(connector, cb);
+        herder().deleteConnectorConfig(connector, cb);
         completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null, forward);
     }
 
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
index 9666bf15954..56516cd4109 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java
@@ -16,7 +16,7 @@
  */
 package org.apache.kafka.connect.runtime.rest.resources;
 
-import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 
 import javax.ws.rs.GET;
@@ -28,15 +28,15 @@
 @Produces(MediaType.APPLICATION_JSON)
 public class RootResource {
 
-    private final Herder herder;
+    private final HerderProvider herder;
 
-    public RootResource(Herder herder) {
+    public RootResource(HerderProvider herder) {
         this.herder = herder;
     }
 
     @GET
     @Path("/")
     public ServerInfo serverInfo() {
-        return new ServerInfo(herder.kafkaClusterId());
+        return new ServerInfo(herder.get().kafkaClusterId());
     }
 }
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
new file mode 100644
index 00000000000..e59691b843d
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ConnectorHandle.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * A handle to a connector executing in a Connect cluster.
+ */
+public class ConnectorHandle {
+
+    private static final Logger log = LoggerFactory.getLogger(ConnectorHandle.class);
+
+    private final String connectorName;
+    private final Map<String, TaskHandle> taskHandles = new ConcurrentHashMap<>();
+
+    private CountDownLatch recordsRemainingLatch;
+    private int expectedRecords = -1;
+
+    public ConnectorHandle(String connectorName) {
+        this.connectorName = connectorName;
+    }
+
+    /**
+     * Get or create a task handle for a given task id. The task need not exist when this method is called. If the
+     * handle is requested before the task is created, the task will bind to the handle once it starts (or restarts).
+     *
+     * @param taskId the task id
+     * @return a non-null {@link TaskHandle}
+     */
+    public TaskHandle taskHandle(String taskId) {
+        return taskHandles.computeIfAbsent(taskId, k -> new TaskHandle(this, taskId));
+    }
+
+    public Collection<TaskHandle> tasks() {
+        return taskHandles.values();
+    }
+
+    /**
+     * Delete the task handle for this task id.
+     *
+     * @param taskId the task id.
+     */
+    public void deleteTask(String taskId) {
+        log.info("Removing handle for {} task in connector {}", taskId, connectorName);
+        taskHandles.remove(taskId);
+    }
+
+    /**
+     * Set the number of expected records for this connector.
+     *
+     * @param expectedRecords number of records
+     */
+    public void expectedRecords(int expectedRecords) {
+        this.expectedRecords = expectedRecords;
+        this.recordsRemainingLatch = new CountDownLatch(expectedRecords);
+    }
+
+    /**
+     * Record a message arrival at the connector.
+     */
+    public void record() {
+        if (recordsRemainingLatch != null) {
+            recordsRemainingLatch.countDown();
+        }
+    }
+
+    /**
+     * Wait for this connector to receive the expected number of records.
+     *
+     * @param consumeMaxDurationMs max duration to wait for records
+     * @throws InterruptedException if another thread interrupts this one while waiting for records
+     */
+    public void awaitRecords(int consumeMaxDurationMs) throws InterruptedException {
+        if (recordsRemainingLatch == null || expectedRecords < 0) {
+            throw new IllegalStateException("expectedRecords() was not set for this task?");
+        }
+        if (!recordsRemainingLatch.await(consumeMaxDurationMs, TimeUnit.MILLISECONDS)) {
+            String msg = String.format("Insufficient records seen by connector %s in %d millis. Records expected=%d, actual=%d",
+                    connectorName,
+                    consumeMaxDurationMs,
+                    expectedRecords,
+                    expectedRecords - recordsRemainingLatch.getCount());
+            throw new DataException(msg);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return "ConnectorHandle{" +
+                "connectorName='" + connectorName + '\'' +
+                '}';
+    }
+}
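
A ConnectorHandle is shared between the test and the tasks it monitors: a task looks itself up through RuntimeHandles and counts records, while the test registers an expectation and blocks until it is met. A condensed sketch of that pattern (the complete version appears in the integration tests below; connectorName and taskId are assumed in scope on the task side):

    // test side
    ConnectorHandle handle = RuntimeHandles.get().connectorHandle("my-connector");
    handle.taskHandle("my-connector-0").expectedRecords(100);
    // ... start the connector and produce 100 records ...
    handle.taskHandle("my-connector-0").awaitRecords(30_000);  // fails if the records do not arrive in time

    // task side, e.g. inside SinkTask.put()
    TaskHandle task = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
    task.record();  // counts one record toward the expectation
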
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
new file mode 100644
index 00000000000..af3ab4421a3
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ErrorHandlingIntegrationTest.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.errors.RetriableException;
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.transforms.Transformation;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_LOG_INCLUDE_MESSAGES_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_RETRY_TIMEOUT_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.ERRORS_TOLERANCE_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TRANSFORMS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_CONTEXT_HEADERS_ENABLE_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.DLQ_TOPIC_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_EXCEPTION_MESSAGE;
+import static org.apache.kafka.connect.runtime.errors.DeadLetterQueueReporter.ERROR_HEADER_ORIG_TOPIC;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * Integration test for the different error handling policies in Connect (namely, retry policies, skipping bad records,
+ * and dead letter queues).
+ */
+@Category(IntegrationTest.class)
+public class ErrorHandlingIntegrationTest {
+
+    private static final Logger log = LoggerFactory.getLogger(ErrorHandlingIntegrationTest.class);
+
+    private static final String DLQ_TOPIC = "my-connector-errors";
+    private static final String CONNECTOR_NAME = "error-conn";
+    private static final String TASK_ID = "error-conn-0";
+    private static final int NUM_RECORDS_PRODUCED = 20;
+    private static final int EXPECTED_CORRECT_RECORDS = 19;
+    private static final int EXPECTED_INCORRECT_RECORDS = 1;
+    private static final int CONNECTOR_SETUP_DURATION_MS = 5000;
+    private static final int CONSUME_MAX_DURATION_MS = 5000;
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() throws IOException {
+        // setup Connect cluster with defaults
+        connect = new EmbeddedConnectCluster.Builder().build();
+
+        // start Connect cluster
+        connect.start();
+
+        // get connector handles before starting test.
+        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+    }
+
+    @After
+    public void close() {
+        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+        connect.stop();
+    }
+
+    @Test
+    public void testSkipRetryAndDLQWithHeaders() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic");
+
+        // setup connector config
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
+        props.put(TASKS_MAX_CONFIG, "1");
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(TRANSFORMS_CONFIG, "failing_transform");
+        props.put("transforms.failing_transform.type", FaultyPassthrough.class.getName());
+
+        // log all errors, along with message metadata
+        props.put(ERRORS_LOG_ENABLE_CONFIG, "true");
+        props.put(ERRORS_LOG_INCLUDE_MESSAGES_CONFIG, "true");
+
+        // produce bad messages into dead letter queue
+        props.put(DLQ_TOPIC_NAME_CONFIG, DLQ_TOPIC);
+        props.put(DLQ_CONTEXT_HEADERS_ENABLE_CONFIG, "true");
+        props.put(DLQ_TOPIC_REPLICATION_FACTOR_CONFIG, "1");
+
+        // tolerate all errors
+        props.put(ERRORS_TOLERANCE_CONFIG, "all");
+
+        // retry for up to one second
+        props.put(ERRORS_RETRY_TIMEOUT_CONFIG, "1000");
+
+        // set the number of records expected to successfully reach the task
+        connectorHandle.taskHandle(TASK_ID).expectedRecords(EXPECTED_CORRECT_RECORDS);
+
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 1
+                        && connectorHandle.taskHandle(TASK_ID).partitionsAssigned() == 1,
+                CONNECTOR_SETUP_DURATION_MS,
+                "Connector task was not assigned a partition.");
+
+        // produce some strings into test topic
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", "key-" + i, "value-" + i);
+        }
+
+        // consume all records from test topic
+        log.info("Consuming records from test topic");
+        int i = 0;
+        for (ConsumerRecord<byte[], byte[]> rec : connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic")) {
+            String k = new String(rec.key());
+            String v = new String(rec.value());
+            log.debug("Consumed record (key='{}', value='{}') from topic {}", k, v, rec.topic());
+            assertEquals("Unexpected key", "key-" + i, k);
+            assertEquals("Unexpected value", "value-" + i, v);
+            i++;
+        }
+
+        // wait for records to reach the task
+        connectorHandle.taskHandle(TASK_ID).awaitRecords(CONSUME_MAX_DURATION_MS);
+
+        // consume failed records from dead letter queue topic
+        log.info("Consuming records from dead letter queue topic");
+        ConsumerRecords<byte[], byte[]> messages = connect.kafka().consume(EXPECTED_INCORRECT_RECORDS, CONSUME_MAX_DURATION_MS, DLQ_TOPIC);
+        for (ConsumerRecord<byte[], byte[]> recs : messages) {
+            log.debug("Consumed record (key={}, value={}) from dead letter queue topic {}",
+                    new String(recs.key()), new String(recs.value()), DLQ_TOPIC);
+            assertTrue(recs.headers().toArray().length > 0);
+            assertValue("test-topic", recs.headers(), ERROR_HEADER_ORIG_TOPIC);
+            assertValue(RetriableException.class.getName(), recs.headers(), ERROR_HEADER_EXCEPTION);
+            assertValue("Error when value='value-7'", recs.headers(), ERROR_HEADER_EXCEPTION_MESSAGE);
+        }
+
+        connect.deleteConnector(CONNECTOR_NAME);
+    }
+
+    private void assertValue(String expected, Headers headers, String headerKey) {
+        byte[] actual = headers.lastHeader(headerKey).value();
+        if (expected == null && actual == null) {
+            return;
+        }
+        if (expected == null || actual == null) {
+            fail();
+        }
+        assertEquals(expected, new String(actual));
+    }
+
+    public static class FaultyPassthrough<R extends ConnectRecord<R>> implements Transformation<R> {
+
+        static final ConfigDef CONFIG_DEF = new ConfigDef();
+
+        /**
+         * An arbitrary record value which causes this transformation to fail with a {@link RetriableException} on the
+         * first attempt, but to succeed on a subsequent one.
+         */
+        static final int BAD_RECORD_VAL_RETRIABLE = 4;
+
+        /**
+         * An arbitrary record value which causes this transformation to always fail with a {@link RetriableException}.
+         */
+        static final int BAD_RECORD_VAL = 7;
+
+        private boolean shouldFail = true;
+
+        @Override
+        public R apply(R record) {
+            String badValRetriable = "value-" + BAD_RECORD_VAL_RETRIABLE;
+            if (badValRetriable.equals(record.value()) && shouldFail) {
+                shouldFail = false;
+                throw new RetriableException("Error when value='" + badValRetriable
+                        + "'. A reattempt with this record will succeed.");
+            }
+            String badVal = "value-" + BAD_RECORD_VAL;
+            if (badVal.equals(record.value())) {
+                throw new RetriableException("Error when value='" + badVal + "'");
+            }
+            return record;
+        }
+
+        @Override
+        public ConfigDef config() {
+            return CONFIG_DEF;
+        }
+
+        @Override
+        public void close() {
+        }
+
+        @Override
+        public void configure(Map<String, ?> configs) {
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
new file mode 100644
index 00000000000..5d887cf4cbf
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/ExampleConnectIntegrationTest.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.storage.StringConverter;
+import org.apache.kafka.connect.util.clusters.EmbeddedConnectCluster;
+import org.apache.kafka.test.IntegrationTest;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.kafka.connect.runtime.ConnectorConfig.CONNECTOR_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.TASKS_MAX_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.SinkConnectorConfig.TOPICS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.OFFSET_COMMIT_INTERVAL_MS_CONFIG;
+import static org.apache.kafka.test.TestUtils.waitForCondition;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * An example integration test that demonstrates how to set up an integration test for Connect.
+ * <p></p>
+ * The following test configures and starts a sink connector pipeline in a worker, produces messages into
+ * the source topic-partitions, and demonstrates how to check the overall behavior of the pipeline.
+ */
+@Category(IntegrationTest.class)
+public class ExampleConnectIntegrationTest {
+
+    private static final int NUM_RECORDS_PRODUCED = 2000;
+    private static final int NUM_TOPIC_PARTITIONS = 3;
+    private static final int CONSUME_MAX_DURATION_MS = 5000;
+    private static final int CONNECTOR_SETUP_DURATION_MS = 15000;
+    private static final String CONNECTOR_NAME = "simple-conn";
+
+    private EmbeddedConnectCluster connect;
+    private ConnectorHandle connectorHandle;
+
+    @Before
+    public void setup() throws IOException {
+        // setup Connect worker properties
+        Map<String, String> exampleWorkerProps = new HashMap<>();
+        exampleWorkerProps.put(OFFSET_COMMIT_INTERVAL_MS_CONFIG, "30000");
+
+        // setup Kafka broker properties
+        Properties exampleBrokerProps = new Properties();
+        exampleBrokerProps.put("auto.create.topics.enable", "false");
+
+        // build a Connect cluster backed by Kafka and Zk
+        connect = new EmbeddedConnectCluster.Builder()
+                .name("example-cluster")
+                .numWorkers(3)
+                .numBrokers(1)
+                .workerProps(exampleWorkerProps)
+                .brokerProps(exampleBrokerProps)
+                .build();
+
+        // start the clusters
+        connect.start();
+
+        // get a handle to the connector
+        connectorHandle = RuntimeHandles.get().connectorHandle(CONNECTOR_NAME);
+    }
+
+    @After
+    public void close() {
+        // delete connector handle
+        RuntimeHandles.get().deleteConnector(CONNECTOR_NAME);
+
+        // stop all Connect, Kafka and Zk threads.
+        connect.stop();
+    }
+
+    /**
+     * Simple test case to configure and execute an embedded Connect cluster. The test will produce and consume
+     * records, and start up a sink connector which will consume these records.
+     */
+    @Test
+    public void testProduceConsumeConnector() throws Exception {
+        // create test topic
+        connect.kafka().createTopic("test-topic", NUM_TOPIC_PARTITIONS);
+
+        // set up props for the sink connector
+        Map<String, String> props = new HashMap<>();
+        props.put(CONNECTOR_CLASS_CONFIG, "MonitorableSink");
+        props.put(TASKS_MAX_CONFIG, "3");
+        props.put(TOPICS_CONFIG, "test-topic");
+        props.put(KEY_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+        props.put(VALUE_CONVERTER_CLASS_CONFIG, StringConverter.class.getName());
+
+        // expect all records to be consumed by the connector
+        connectorHandle.expectedRecords(NUM_RECORDS_PRODUCED);
+
+        // start a sink connector
+        connect.configureConnector(CONNECTOR_NAME, props);
+
+        waitForCondition(() -> connect.connectorStatus(CONNECTOR_NAME).tasks().size() == 3
+                        && connectorHandle.tasks().stream().allMatch(th -> th.partitionsAssigned() == 1),
+                CONNECTOR_SETUP_DURATION_MS,
+                "Connector tasks were not assigned a partition each.");
+
+        // produce some messages into source topic partitions
+        for (int i = 0; i < NUM_RECORDS_PRODUCED; i++) {
+            connect.kafka().produce("test-topic", i % NUM_TOPIC_PARTITIONS, "key", "simple-message-value-" + i);
+        }
+
+        // consume all records from the source topic or fail, to ensure that they were correctly produced.
+        assertEquals("Unexpected number of records consumed", NUM_RECORDS_PRODUCED,
+                connect.kafka().consume(NUM_RECORDS_PRODUCED, CONSUME_MAX_DURATION_MS, "test-topic").count());
+
+        // wait for the connector tasks to consume all records.
+        connectorHandle.awaitRecords(CONSUME_MAX_DURATION_MS);
+
+        // delete connector
+        connect.deleteConnector(CONNECTOR_NAME);
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
new file mode 100644
index 00000000000..23a8d99e84e
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/MonitorableSinkConnector.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.connect.connector.Task;
+import org.apache.kafka.connect.runtime.TestSinkConnector;
+import org.apache.kafka.connect.sink.SinkRecord;
+import org.apache.kafka.connect.sink.SinkTask;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A sink connector to be used in integration tests. This class lets tests find the task instances
+ * started by the connector and wait for them to consume a desired number of messages.
+ * Tasks bind themselves to {@link TaskHandle}s via {@link RuntimeHandles} using the task id.
+ */
+public class MonitorableSinkConnector extends TestSinkConnector {
+
+    private static final Logger log = LoggerFactory.getLogger(MonitorableSinkConnector.class);
+
+    private String connectorName;
+
+    @Override
+    public void start(Map<String, String> props) {
+        connectorName = props.get("name");
+        log.info("Starting connector {}", props.get("name"));
+    }
+
+    @Override
+    public Class<? extends Task> taskClass() {
+        return MonitorableSinkTask.class;
+    }
+
+    @Override
+    public List<Map<String, String>> taskConfigs(int maxTasks) {
+        List<Map<String, String>> configs = new ArrayList<>();
+        for (int i = 0; i < maxTasks; i++) {
+            Map<String, String> config = new HashMap<>();
+            config.put("connector.name", connectorName);
+            config.put("task.id", connectorName + "-" + i);
+            configs.add(config);
+        }
+        return configs;
+    }
+
+    @Override
+    public void stop() {
+    }
+
+    @Override
+    public ConfigDef config() {
+        return new ConfigDef();
+    }
+
+    public static class MonitorableSinkTask extends SinkTask {
+
+        private String connectorName;
+        private String taskId;
+        private TaskHandle taskHandle;
+
+        @Override
+        public String version() {
+            return "unknown";
+        }
+
+        @Override
+        public void start(Map<String, String> props) {
+            taskId = props.get("task.id");
+            connectorName = props.get("connector.name");
+            taskHandle = RuntimeHandles.get().connectorHandle(connectorName).taskHandle(taskId);
+            log.debug("Starting task {}", taskId);
+        }
+
+        @Override
+        public void open(Collection<TopicPartition> partitions) {
+            log.debug("Opening {} partitions", partitions.size());
+            super.open(partitions);
+            taskHandle.partitionsAssigned(partitions.size());
+        }
+
+        @Override
+        public void put(Collection<SinkRecord> records) {
+            for (SinkRecord rec : records) {
+                taskHandle.record();
+                log.trace("Task {} obtained record (key='{}' value='{}')", taskId, rec.key(), rec.value());
+            }
+        }
+
+        @Override
+        public void stop() {
+        }
+    }
+}
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java
new file mode 100644
index 00000000000..c9900f3a7fb
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/RuntimeHandles.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A singleton class which provides shared access to the {@link ConnectorHandle}s and {@link TaskHandle}s that are
+ * required for integration tests.
+ */
+public class RuntimeHandles {
+
+    private static final RuntimeHandles INSTANCE = new RuntimeHandles();
+
+    private final Map<String, ConnectorHandle> connectorHandles = new ConcurrentHashMap<>();
+
+    private RuntimeHandles() {
+    }
+
+    /**
+     * @return the shared {@link RuntimeHandles} instance.
+     */
+    public static RuntimeHandles get() {
+        return INSTANCE;
+    }
+
+    /**
+     * Get or create a connector handle for a given connector name. The connector need not be running at the time
+     * this method is called. Once the connector is created, it will bind to this handle; binding is keyed by the
+     * connector name.
+     *
+     * @param connectorName the name of the connector
+     * @return a non-null {@link ConnectorHandle}
+     */
+    public ConnectorHandle connectorHandle(String connectorName) {
+        return connectorHandles.computeIfAbsent(connectorName, k -> new ConnectorHandle(connectorName));
+    }
+
+    /**
+     * Delete the connector handle for this connector name.
+     *
+     * @param connectorName name of the connector
+     */
+    public void deleteConnector(String connectorName) {
+        connectorHandles.remove(connectorName);
+    }
+
+}
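
For context, here is a minimal sketch of how a test would use RuntimeHandles. It is not part of this patch; the connector name "simple-conn", the record count, and the timeout are illustrative, and the ConnectorHandle methods shown are the ones exercised by the integration test earlier in this diff.

    // Both the test and the connector's tasks look up handles by the same connector name,
    // so expectations set here are observed once the tasks start reporting records.
    ConnectorHandle handle = RuntimeHandles.get().connectorHandle("simple-conn");
    handle.expectedRecords(1000);          // set before the connector starts consuming
    // ... start the connector named "simple-conn" on the embedded Connect cluster ...
    handle.awaitRecords(30_000);           // blocks until the tasks have reported 1000 records
    RuntimeHandles.get().deleteConnector("simple-conn");  // drop the handle once the test is done
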
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
new file mode 100644
index 00000000000..de3d9240d1b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/integration/TaskHandle.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.integration;
+
+import org.apache.kafka.connect.errors.DataException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * A handle to an executing task in a worker. Use this class to record progress, for example: the number of records
+ * seen by the task so far, or to wait for partitions to be assigned to the task.
+ */
+public class TaskHandle {
+
+    private static final Logger log = LoggerFactory.getLogger(TaskHandle.class);
+
+    private final String taskId;
+    private final ConnectorHandle connectorHandle;
+    private final AtomicInteger partitionsAssigned = new AtomicInteger(0);
+
+    private CountDownLatch recordsRemainingLatch;
+    private int expectedRecords = -1;
+
+    public TaskHandle(ConnectorHandle connectorHandle, String taskId) {
+        log.info("Created task {} for connector {}", taskId, connectorHandle);
+        this.taskId = taskId;
+        this.connectorHandle = connectorHandle;
+    }
+
+    /**
+     * Record a message arrival at the task.
+     */
+    public void record() {
+        if (recordsRemainingLatch != null) {
+            recordsRemainingLatch.countDown();
+        }
+        connectorHandle.record();
+    }
+
+    /**
+     * Set the number of expected records for this task.
+     *
+     * @param expectedRecords number of records
+     */
+    public void expectedRecords(int expectedRecords) {
+        this.expectedRecords = expectedRecords;
+        this.recordsRemainingLatch = new CountDownLatch(expectedRecords);
+    }
+
+    /**
+     * Set the number of partitions assigned to this task.
+     *
+     * @param numPartitions number of partitions
+     */
+    public void partitionsAssigned(int numPartitions) {
+        partitionsAssigned.set(numPartitions);
+    }
+
+    /**
+     * @return the number of topic partitions assigned to this task.
+     */
+    public int partitionsAssigned() {
+        return partitionsAssigned.get();
+    }
+
+    /**
+     * Wait for this task to receive the expected number of records.
+     *
+     * @param consumeMaxDurationMs max duration to wait for records
+     * @throws InterruptedException if another thread interrupts this one while waiting for records
+     */
+    public void awaitRecords(int consumeMaxDurationMs) throws InterruptedException {
+        if (recordsRemainingLatch == null) {
+            throw new IllegalStateException("expectedRecords() was not called on this task before awaiting records");
+        }
+        if (!recordsRemainingLatch.await(consumeMaxDurationMs, TimeUnit.MILLISECONDS)) {
+            String msg = String.format("Insufficient records seen by task %s in %d millis. Records expected=%d, actual=%d",
+                    taskId,
+                    consumeMaxDurationMs,
+                    expectedRecords,
+                    expectedRecords - recordsRemainingLatch.getCount());
+            throw new DataException(msg);
+        }
+        log.debug("Task {} saw {} records, expected {} records", taskId, expectedRecords - recordsRemainingLatch.getCount(), expectedRecords);
+    }
+
+    @Override
+    public String toString() {
+        return "Handle{" +
+                "taskId='" + taskId + '\'' +
+                '}';
+    }
+}
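
A task-level variant of the same pattern, sketched for illustration only: the task id follows the "<connector>-<index>" convention used by MonitorableSinkConnector above, and test scaffolding and exception handling are elided.

    TaskHandle task = RuntimeHandles.get().connectorHandle("simple-conn").taskHandle("simple-conn-0");
    task.expectedRecords(100);                 // set before records arrive so every record counts toward the latch
    task.awaitRecords(30_000);                 // throws DataException if fewer than 100 records are seen in time
    int assigned = task.partitionsAssigned();  // populated by the task's open() callback
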
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
index 39e35cb01df..d5802cb5222 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.connect.rest.ConnectRestExtension;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.isolation.Plugins;
@@ -177,7 +178,7 @@ public void testOptionsDoesNotIncludeWadlOutput() {
         PowerMock.replayAll();
 
         server = new RestServer(workerConfig);
-        server.start(herder);
+        server.start(new HerderProvider(herder), herder.plugins());
 
         Response response = request("/connectors")
             .accept(MediaType.WILDCARD)
@@ -214,7 +215,7 @@ public void checkCORSRequest(String corsDomain, String origin, String expectedHe
 
 
         server = new RestServer(workerConfig);
-        server.start(herder);
+        server.start(new HerderProvider(herder), herder.plugins());
 
         Response response = request("/connectors")
                 .header("Referer", origin + "/page")
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
index a3aee6a407d..684064d30df 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorPluginsResourceTest.java
@@ -30,6 +30,7 @@
 import org.apache.kafka.connect.runtime.AbstractHerder;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.TestSinkConnector;
 import org.apache.kafka.connect.runtime.TestSourceConnector;
 import org.apache.kafka.connect.runtime.WorkerConfig;
@@ -186,7 +187,7 @@ public void setUp() throws Exception {
 
         plugins = PowerMock.createMock(Plugins.class);
         herder = PowerMock.createMock(AbstractHerder.class);
-        connectorPluginsResource = new ConnectorPluginsResource(herder);
+        connectorPluginsResource = new ConnectorPluginsResource(new HerderProvider(herder));
     }
 
     private void expectPlugins() {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
index f84cd258fd4..5a520744bcd 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResourceTest.java
@@ -23,6 +23,7 @@
 import org.apache.kafka.connect.errors.NotFoundException;
 import org.apache.kafka.connect.runtime.ConnectorConfig;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.WorkerConfig;
 import org.apache.kafka.connect.runtime.distributed.NotAssignedException;
 import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
@@ -126,7 +127,7 @@
     public void setUp() throws NoSuchMethodException {
         PowerMock.mockStatic(RestClient.class,
                 RestClient.class.getMethod("httpRequest", String.class, String.class, Object.class, TypeReference.class, WorkerConfig.class));
-        connectorsResource = new ConnectorsResource(herder, null);
+        connectorsResource = new ConnectorsResource(new HerderProvider(herder), null);
     }
 
     private static final Map<String, String> getConnectorConfig(Map<String, String> mapToClone) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
index 4e928a37037..be80e28f42d 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/resources/RootResourceTest.java
@@ -19,6 +19,7 @@
 import org.apache.kafka.clients.admin.MockAdminClient;
 import org.apache.kafka.common.utils.AppInfoParser;
 import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.HerderProvider;
 import org.apache.kafka.connect.runtime.rest.entities.ServerInfo;
 import org.easymock.EasyMock;
 import org.easymock.EasyMockRunner;
@@ -39,7 +40,7 @@
 
     @Before
     public void setUp() {
-        rootResource = new RootResource(herder);
+        rootResource = new RootResource(new HerderProvider(herder));
     }
 
     @Test
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
new file mode 100644
index 00000000000..9ba0e06bfd0
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedConnectCluster.java
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.kafka.connect.cli.ConnectDistributed;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.runtime.Connect;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.servlet.http.HttpServletResponse;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStreamWriter;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.KEY_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.ConnectorConfig.VALUE_CONVERTER_CLASS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.REST_HOST_NAME_CONFIG;
+import static org.apache.kafka.connect.runtime.WorkerConfig.REST_PORT_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.CONFIG_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_REPLICATION_FACTOR_CONFIG;
+import static org.apache.kafka.connect.runtime.distributed.DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG;
+
+/**
+ * Start an embedded Connect worker cluster. Internally, this class will spin up a Kafka and Zookeeper cluster, set up
+ * any tmp directories, and clean them up when the cluster is stopped.
+ */
+public class EmbeddedConnectCluster {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedConnectCluster.class);
+
+    private static final int DEFAULT_NUM_BROKERS = 1;
+    private static final int DEFAULT_NUM_WORKERS = 1;
+    private static final Properties DEFAULT_BROKER_CONFIG = new Properties();
+    private static final String REST_HOST_NAME = "localhost";
+
+    private final Connect[] connectCluster;
+    private final EmbeddedKafkaCluster kafkaCluster;
+    private final Map<String, String> workerProps;
+    private final String connectClusterName;
+    private final int numBrokers;
+
+    private EmbeddedConnectCluster(String name, Map<String, String> workerProps, int numWorkers, int numBrokers, Properties brokerProps) {
+        this.workerProps = workerProps;
+        this.connectClusterName = name;
+        this.numBrokers = numBrokers;
+        this.kafkaCluster = new EmbeddedKafkaCluster(numBrokers, brokerProps);
+        this.connectCluster = new Connect[numWorkers];
+    }
+
+    /**
+     * Start the connect cluster and the embedded Kafka and Zookeeper cluster.
+     */
+    public void start() throws IOException {
+        kafkaCluster.before();
+        startConnect();
+    }
+
+    /**
+     * Stop the connect cluster and the embedded Kafka and Zookeeper cluster.
+     * Clean up any temp directories created locally.
+     */
+    public void stop() {
+        for (Connect worker : this.connectCluster) {
+            try {
+                worker.stop();
+            } catch (Exception e) {
+                log.error("Could not stop connect", e);
+                throw new RuntimeException("Could not stop worker", e);
+            }
+        }
+
+        try {
+            kafkaCluster.after();
+        } catch (Exception e) {
+            log.error("Could not stop kafka", e);
+            throw new RuntimeException("Could not stop brokers", e);
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public void startConnect() {
+        log.info("Starting Connect cluster with {} workers. clusterName {}", connectCluster.length, connectClusterName);
+
+        workerProps.put(BOOTSTRAP_SERVERS_CONFIG, kafka().bootstrapServers());
+        workerProps.put(REST_HOST_NAME_CONFIG, REST_HOST_NAME);
+        workerProps.put(REST_PORT_CONFIG, "0"); // use a random available port
+
+        String internalTopicsReplFactor = String.valueOf(numBrokers);
+        putIfAbsent(workerProps, GROUP_ID_CONFIG, "connect-integration-test-" + connectClusterName);
+        putIfAbsent(workerProps, OFFSET_STORAGE_TOPIC_CONFIG, "connect-offset-topic-" + connectClusterName);
+        putIfAbsent(workerProps, OFFSET_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
+        putIfAbsent(workerProps, CONFIG_TOPIC_CONFIG, "connect-config-topic-" + connectClusterName);
+        putIfAbsent(workerProps, CONFIG_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
+        putIfAbsent(workerProps, STATUS_STORAGE_TOPIC_CONFIG, "connect-storage-topic-" + connectClusterName);
+        putIfAbsent(workerProps, STATUS_STORAGE_REPLICATION_FACTOR_CONFIG, internalTopicsReplFactor);
+        putIfAbsent(workerProps, KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+        putIfAbsent(workerProps, VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.storage.StringConverter");
+
+        for (int i = 0; i < connectCluster.length; i++) {
+            connectCluster[i] = new ConnectDistributed().startConnect(workerProps);
+        }
+    }
+
+    /**
+     * Configure a connector. If the connector does not already exist, a new one will be created and
+     * the given configuration will be applied to it.
+     *
+     * @param connName   the name of the connector
+     * @param connConfig the intended configuration
+     * @throws IOException          if the call to the REST API fails.
+     * @throws ConnectRestException if the REST API returns an error status
+     */
+    public void configureConnector(String connName, Map<String, String> connConfig) throws IOException {
+        String url = endpointForResource(String.format("connectors/%s/config", connName));
+        ObjectMapper mapper = new ObjectMapper();
+        int status;
+        try {
+            String content = mapper.writeValueAsString(connConfig);
+            status = executePut(url, content);
+        } catch (IOException e) {
+            log.error("Could not execute PUT request to " + url, e);
+            throw e;
+        }
+        if (status >= HttpServletResponse.SC_BAD_REQUEST) {
+            throw new ConnectRestException(status, "Could not execute PUT request");
+        }
+    }
+
+    /**
+     * Delete an existing connector.
+     *
+     * @param connName name of the connector to be deleted
+     * @throws IOException if the call to the REST API fails.
+     */
+    public void deleteConnector(String connName) throws IOException {
+        String url = endpointForResource(String.format("connectors/%s", connName));
+        int status = executeDelete(url);
+        if (status >= HttpServletResponse.SC_BAD_REQUEST) {
+            throw new ConnectRestException(status, "Could not execute DELETE request.");
+        }
+    }
+
+    public ConnectorStateInfo connectorStatus(String connectorName) {
+        ObjectMapper mapper = new ObjectMapper();
+        String url = endpointForResource(String.format("connectors/%s/status", connectorName));
+        try {
+            return mapper.readerFor(ConnectorStateInfo.class).readValue(executeGet(url));
+        } catch (IOException e) {
+            log.error("Could not read connector state", e);
+            throw new ConnectException("Could not read connector state", e);
+        }
+    }
+
+    private String endpointForResource(String resource) {
+        String url = String.valueOf(connectCluster[0].restUrl());
+        return url + resource;
+    }
+
+    private static void putIfAbsent(Map<String, String> props, String propertyKey, String propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }
+
+    public EmbeddedKafkaCluster kafka() {
+        return kafkaCluster;
+    }
+
+    public int executePut(String url, String body) throws IOException {
+        log.debug("Executing PUT request to URL={}. Payload={}", url, body);
+        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+        httpCon.setDoOutput(true);
+        httpCon.setRequestProperty("Content-Type", "application/json");
+        httpCon.setRequestMethod("PUT");
+        try (OutputStreamWriter out = new OutputStreamWriter(httpCon.getOutputStream())) {
+            out.write(body);
+        }
+        try (InputStream is = httpCon.getInputStream()) {
+            int c;
+            StringBuilder response = new StringBuilder();
+            while ((c = is.read()) != -1) {
+                response.append((char) c);
+            }
+            log.info("Put response for URL={} is {}", url, response);
+        }
+        return httpCon.getResponseCode();
+    }
+
+    public String executeGet(String url) throws IOException {
+        log.debug("Executing GET request to URL={}.", url);
+        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+        httpCon.setDoOutput(true);
+        httpCon.setRequestMethod("GET");
+        try (InputStream is = httpCon.getInputStream()) {
+            int c;
+            StringBuilder response = new StringBuilder();
+            while ((c = is.read()) != -1) {
+                response.append((char) c);
+            }
+            log.debug("Get response for URL={} is {}", url, response);
+            return response.toString();
+        }
+    }
+
+    public int executeDelete(String url) throws IOException {
+        log.debug("Executing DELETE request to URL={}", url);
+        HttpURLConnection httpCon = (HttpURLConnection) new URL(url).openConnection();
+        httpCon.setDoOutput(true);
+        httpCon.setRequestMethod("DELETE");
+        httpCon.connect();
+        return httpCon.getResponseCode();
+    }
+
+    public static class Builder {
+        private String name = UUID.randomUUID().toString();
+        private Map<String, String> workerProps = new HashMap<>();
+        private int numWorkers = DEFAULT_NUM_WORKERS;
+        private int numBrokers = DEFAULT_NUM_BROKERS;
+        private Properties brokerProps = DEFAULT_BROKER_CONFIG;
+
+        public Builder name(String name) {
+            this.name = name;
+            return this;
+        }
+
+        public Builder workerProps(Map<String, String> workerProps) {
+            this.workerProps = workerProps;
+            return this;
+        }
+
+        public Builder numWorkers(int numWorkers) {
+            this.numWorkers = numWorkers;
+            return this;
+        }
+
+        public Builder numBrokers(int numBrokers) {
+            this.numBrokers = numBrokers;
+            return this;
+        }
+
+        public Builder brokerProps(Properties brokerProps) {
+            this.brokerProps = brokerProps;
+            return this;
+        }
+
+        public EmbeddedConnectCluster build() {
+            return new EmbeddedConnectCluster(name, workerProps, numWorkers, numBrokers, brokerProps);
+        }
+    }
+
+}
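
The Builder is not exercised directly in the hunks shown here, so the following is only a sketch of the intended setup in a JUnit test; the cluster name, worker and broker counts, topic, and connectorProps map are illustrative.

    EmbeddedConnectCluster connect = new EmbeddedConnectCluster.Builder()
            .name("connect-integration-test")
            .numWorkers(3)
            .numBrokers(1)
            .build();

    connect.start();                                  // brings up Zookeeper, Kafka and the Connect workers
    connect.kafka().createTopic("test-topic", 3);     // auto topic creation is disabled on the embedded brokers
    connect.configureConnector("simple-conn", connectorProps);
    // ... produce records, wait on connector/task handles, make assertions ...
    connect.deleteConnector("simple-conn");
    connect.stop();                                   // tears down workers, brokers and Zookeeper
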
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
new file mode 100644
index 00000000000..109ba14a0dd
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/util/clusters/EmbeddedKafkaCluster.java
@@ -0,0 +1,339 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.util.clusters;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaConfig$;
+import kafka.server.KafkaServer;
+import kafka.utils.CoreUtils;
+import kafka.utils.TestUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.admin.NewTopic;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.errors.InvalidReplicationFactorException;
+import org.apache.kafka.common.network.ListenerName;
+import org.apache.kafka.common.utils.MockTime;
+import org.apache.kafka.common.utils.Time;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.junit.rules.ExternalResource;
+import org.junit.rules.TemporaryFolder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.kafka.clients.consumer.ConsumerConfig.AUTO_OFFSET_RESET_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
+
+/**
+ * Set up an embedded Kafka cluster with a specified number of brokers and broker properties. To be used for
+ * integration tests.
+ */
+public class EmbeddedKafkaCluster extends ExternalResource {
+
+    private static final Logger log = LoggerFactory.getLogger(EmbeddedKafkaCluster.class);
+
+    private static final long DEFAULT_PRODUCE_SEND_DURATION_MS = 500;
+
+    // Kafka Config
+    private final KafkaServer[] brokers;
+    private final Properties brokerConfig;
+    private final Time time = new MockTime();
+
+    private EmbeddedZookeeper zookeeper = null;
+    private ListenerName listenerName = new ListenerName("PLAINTEXT");
+    private KafkaProducer<byte[], byte[]> producer;
+
+    public EmbeddedKafkaCluster(final int numBrokers,
+                                final Properties brokerConfig) {
+        brokers = new KafkaServer[numBrokers];
+        this.brokerConfig = brokerConfig;
+    }
+
+    @Override
+    protected void before() throws IOException {
+        start();
+    }
+
+    @Override
+    protected void after() {
+        stop();
+    }
+
+    private void start() throws IOException {
+        zookeeper = new EmbeddedZookeeper();
+
+        brokerConfig.put(KafkaConfig$.MODULE$.ZkConnectProp(), zKConnectString());
+        brokerConfig.put(KafkaConfig$.MODULE$.PortProp(), 0); // pick a random port
+
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.HostNameProp(), "localhost");
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.DeleteTopicEnableProp(), true);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.GroupInitialRebalanceDelayMsProp(), 0);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.OffsetsTopicReplicationFactorProp(), (short) brokers.length);
+        putIfAbsent(brokerConfig, KafkaConfig$.MODULE$.AutoCreateTopicsEnableProp(), false);
+
+        Object listenerConfig = brokerConfig.get(KafkaConfig$.MODULE$.InterBrokerListenerNameProp());
+        if (listenerConfig != null) {
+            listenerName = new ListenerName(listenerConfig.toString());
+        }
+
+        for (int i = 0; i < brokers.length; i++) {
+            brokerConfig.put(KafkaConfig$.MODULE$.BrokerIdProp(), i);
+            brokerConfig.put(KafkaConfig$.MODULE$.LogDirProp(), createLogDir());
+            brokers[i] = TestUtils.createServer(new KafkaConfig(brokerConfig, true), time);
+        }
+
+        Map<String, Object> producerProps = new HashMap<>();
+        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    private void stop() {
+
+        try {
+            producer.close();
+        } catch (Exception e) {
+            log.error("Could not shutdown producer ", e);
+            throw new RuntimeException("Could not shutdown producer", e);
+        }
+
+        for (KafkaServer broker : brokers) {
+            try {
+                broker.shutdown();
+            } catch (Throwable t) {
+                String msg = String.format("Could not shutdown broker at %s", address(broker));
+                log.error(msg, t);
+                throw new RuntimeException(msg, t);
+            }
+        }
+
+        for (KafkaServer broker : brokers) {
+            try {
+                log.info("Cleaning up kafka log dirs at {}", broker.config().logDirs());
+                CoreUtils.delete(broker.config().logDirs());
+            } catch (Throwable t) {
+                String msg = String.format("Could not clean up log dirs for broker at %s", address(broker));
+                log.error(msg, t);
+                throw new RuntimeException(msg, t);
+            }
+        }
+
+        try {
+            zookeeper.shutdown();
+        } catch (Throwable t) {
+            String msg = String.format("Could not shutdown zookeeper at %s", zKConnectString());
+            log.error(msg, t);
+            throw new RuntimeException(msg, t);
+        }
+    }
+
+    private static void putIfAbsent(final Properties props, final String propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }
+
+    private String createLogDir() throws IOException {
+        TemporaryFolder tmpFolder = new TemporaryFolder();
+        tmpFolder.create();
+        return tmpFolder.newFolder().getAbsolutePath();
+    }
+
+    public String bootstrapServers() {
+        return Arrays.stream(brokers)
+                .map(this::address)
+                .collect(Collectors.joining(","));
+    }
+
+    public String address(KafkaServer server) {
+        return server.config().hostName() + ":" + server.boundPort(listenerName);
+    }
+
+    public String zKConnectString() {
+        return "127.0.0.1:" + zookeeper.port();
+    }
+
+    /**
+     * Create a Kafka topic with 1 partition and a replication factor of 1.
+     *
+     * @param topic The name of the topic.
+     */
+    public void createTopic(String topic) {
+        createTopic(topic, 1);
+    }
+
+    /**
+     * Create a Kafka topic with the given number of partitions and a replication factor of 1.
+     *
+     * @param topic      The name of the topic.
+     * @param partitions The number of partitions for this topic.
+     */
+    public void createTopic(String topic, int partitions) {
+        createTopic(topic, partitions, 1, new HashMap<>());
+    }
+
+    /**
+     * Create a Kafka topic with the given parameters.
+     *
+     * @param topic       The name of the topic.
+     * @param partitions  The number of partitions for this topic.
+     * @param replication The replication factor for (partitions of) this topic.
+     * @param topicConfig Additional topic-level configuration settings.
+     */
+    public void createTopic(String topic, int partitions, int replication, Map<String, String> topicConfig) {
+        if (replication > brokers.length) {
+            throw new InvalidReplicationFactorException("Insufficient brokers ("
+                    + brokers.length + ") for desired replication (" + replication + ")");
+        }
+
+        log.debug("Creating topic { name: {}, partitions: {}, replication: {}, config: {} }",
+                topic, partitions, replication, topicConfig);
+        final NewTopic newTopic = new NewTopic(topic, partitions, (short) replication);
+        newTopic.configs(topicConfig);
+
+        try (final AdminClient adminClient = createAdminClient()) {
+            adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
+        } catch (final InterruptedException | ExecutionException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void produce(String topic, String value) {
+        produce(topic, null, null, value);
+    }
+
+    public void produce(String topic, String key, String value) {
+        produce(topic, null, key, value);
+    }
+
+    public void produce(String topic, Integer partition, String key, String value) {
+        ProducerRecord<byte[], byte[]> msg = new ProducerRecord<>(topic, partition, key == null ? null : key.getBytes(), value == null ? null : value.getBytes());
+        try {
+            producer.send(msg).get(DEFAULT_PRODUCE_SEND_DURATION_MS, TimeUnit.MILLISECONDS);
+        } catch (Exception e) {
+            throw new KafkaException("Could not produce message to topic=" + topic, e);
+        }
+    }
+
+    public AdminClient createAdminClient() {
+        final Properties adminClientConfig = new Properties();
+        adminClientConfig.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        final Object listeners = brokerConfig.get(KafkaConfig$.MODULE$.ListenersProp());
+        if (listeners != null && listeners.toString().contains("SSL")) {
+            adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG));
+            adminClientConfig.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, ((Password) brokerConfig.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG)).value());
+            adminClientConfig.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
+        }
+        return AdminClient.create(adminClientConfig);
+    }
+
+    /**
+     * Consume at least n records in a given duration or throw an exception.
+     *
+     * @param n the number of expected records in this topic.
+     * @param maxDuration the max duration to wait for these records (in milliseconds).
+     * @param topics the topics to subscribe and consume records from.
+     * @return a {@link ConsumerRecords} collection containing at least n records.
+     */
+    public ConsumerRecords<byte[], byte[]> consume(int n, long maxDuration, String... topics) {
+        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> records = new HashMap<>();
+        int consumedRecords = 0;
+        try (KafkaConsumer<byte[], byte[]> consumer = createConsumerAndSubscribeTo(Collections.emptyMap(), topics)) {
+            final long startMillis = System.currentTimeMillis();
+            long allowedDuration = maxDuration;
+            while (allowedDuration > 0) {
+                log.debug("Consuming from {} for {} millis.", Arrays.toString(topics), allowedDuration);
+                ConsumerRecords<byte[], byte[]> rec = consumer.poll(Duration.ofMillis(allowedDuration));
+                if (rec.isEmpty()) {
+                    allowedDuration = maxDuration - (System.currentTimeMillis() - startMillis);
+                    continue;
+                }
+                for (TopicPartition partition: rec.partitions()) {
+                    final List<ConsumerRecord<byte[], byte[]>> r = rec.records(partition);
+                    records.computeIfAbsent(partition, t -> new ArrayList<>()).addAll(r);
+                    consumedRecords += r.size();
+                }
+                if (consumedRecords >= n) {
+                    return new ConsumerRecords<>(records);
+                }
+                allowedDuration = maxDuration - (System.currentTimeMillis() - startMillis);
+            }
+        }
+
+        throw new RuntimeException("Could not find enough records. found " + consumedRecords + ", expected " + n);
+    }
+
+    public KafkaConsumer<byte[], byte[]> createConsumer(Map<String, Object> consumerProps) {
+        Map<String, Object> props = new HashMap<>(consumerProps);
+
+        putIfAbsent(props, GROUP_ID_CONFIG, UUID.randomUUID().toString());
+        putIfAbsent(props, BOOTSTRAP_SERVERS_CONFIG, bootstrapServers());
+        putIfAbsent(props, ENABLE_AUTO_COMMIT_CONFIG, "false");
+        putIfAbsent(props, AUTO_OFFSET_RESET_CONFIG, "earliest");
+        putIfAbsent(props, KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+        putIfAbsent(props, VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+
+        KafkaConsumer<byte[], byte[]> consumer;
+        try {
+            consumer = new KafkaConsumer<>(props);
+        } catch (Throwable t) {
+            throw new ConnectException("Failed to create consumer", t);
+        }
+        return consumer;
+    }
+
+    public KafkaConsumer<byte[], byte[]> createConsumerAndSubscribeTo(Map<String, Object> consumerProps, String... topics) {
+        KafkaConsumer<byte[], byte[]> consumer = createConsumer(consumerProps);
+        consumer.subscribe(Arrays.asList(topics));
+        return consumer;
+    }
+
+    private static void putIfAbsent(final Map<String, Object> props, final String propertyKey, final Object propertyValue) {
+        if (!props.containsKey(propertyKey)) {
+            props.put(propertyKey, propertyValue);
+        }
+    }
+}
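
Since EmbeddedKafkaCluster extends ExternalResource, it can also be used on its own as a JUnit rule. A self-contained sketch follows; the class name, topic, and record counts are made up for illustration.

    import static org.junit.Assert.assertEquals;

    import java.util.Properties;

    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.connect.util.clusters.EmbeddedKafkaCluster;
    import org.junit.ClassRule;
    import org.junit.Test;

    public class EmbeddedKafkaClusterSmokeTest {

        // before()/after() are invoked by JUnit around the test class
        @ClassRule
        public static final EmbeddedKafkaCluster KAFKA = new EmbeddedKafkaCluster(1, new Properties());

        @Test
        public void shouldRoundTripRecords() {
            KAFKA.createTopic("smoke-test", 1);
            for (int i = 0; i < 10; i++) {
                KAFKA.produce("smoke-test", "key", "value-" + i);
            }
            ConsumerRecords<byte[], byte[]> records = KAFKA.consume(10, 5_000, "smoke-test");
            assertEquals(10, records.count());
        }
    }
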
diff --git a/connect/runtime/src/test/resources/log4j.properties b/connect/runtime/src/test/resources/log4j.properties
index d5e90fe788f..1feedb89721 100644
--- a/connect/runtime/src/test/resources/log4j.properties
+++ b/connect/runtime/src/test/resources/log4j.properties
@@ -18,6 +18,7 @@ log4j.rootLogger=OFF, stdout
 
 log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
+log4j.appender.stdout.layout.ConversionPattern=[%d] (%t) %p %m (%c:%L)%n
 
+log4j.logger.org.reflections=ERROR
 log4j.logger.org.apache.kafka=ERROR


With regards,
Apache Git Services
