kafka-commits mailing list archives

From ewe...@apache.org
Subject [5/5] kafka git commit: KAFKA-2370: kafka connect pause/resume API
Date Wed, 20 Apr 2016 21:10:24 GMT
KAFKA-2370: kafka connect pause/resume API

Author: Jason Gustafson <jason@confluent.io>

Reviewers: Liquan Pei <liquanpei@gmail.com>, Ewen Cheslack-Postava <ewen@confluent.io>

Closes #1087 from hachikuji/KAFKA-2370
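
For reference, the new pause/resume operations are surfaced through the Connect REST API (see the ConnectorsResource changes in the file list below). Here is a minimal sketch of driving them from Java, assuming a worker listening on http://localhost:8083, a hypothetical connector named "my-connector", and the PUT /connectors/{name}/pause and /connectors/{name}/resume paths; everything here is illustrative rather than part of the patch itself:

import java.net.HttpURLConnection;
import java.net.URL;

public class PauseResumeRestExample {

    // Issue an empty-bodied PUT against the Connect REST API and return the status code.
    static int put(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod("PUT");
        conn.setDoOutput(true);
        conn.setFixedLengthStreamingMode(0);   // no request body
        conn.getOutputStream().close();
        return conn.getResponseCode();         // typically 202 Accepted
    }

    public static void main(String[] args) throws Exception {
        String worker = "http://localhost:8083";   // assumed worker address
        String connector = "my-connector";         // hypothetical connector name

        // Pause: the herder records TargetState.PAUSED in the config backing store;
        // each worker then suspends the connector and its tasks asynchronously.
        System.out.println(put(worker + "/connectors/" + connector + "/pause"));

        // Resume: records TargetState.STARTED, restarting the connector and its tasks.
        System.out.println(put(worker + "/connectors/" + connector + "/resume"));
    }
}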


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c9485b78
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c9485b78
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c9485b78

Branch: refs/heads/trunk
Commit: c9485b78a6e43747daf1314ae9532839fb7bc810
Parents: 280efe7
Author: Jason Gustafson <jason@confluent.io>
Authored: Wed Apr 20 14:09:59 2016 -0700
Committer: Ewen Cheslack-Postava <me@ewencp.org>
Committed: Wed Apr 20 14:09:59 2016 -0700

----------------------------------------------------------------------
 .../kafka/common/config/AbstractConfig.java     |   3 +-
 .../kafka/connect/cli/ConnectDistributed.java   |  10 +-
 .../kafka/connect/cli/ConnectStandalone.java    |   2 +-
 .../kafka/connect/runtime/AbstractHerder.java   |  52 +-
 .../kafka/connect/runtime/AbstractStatus.java   |   1 +
 .../apache/kafka/connect/runtime/Connect.java   |   6 +-
 .../kafka/connect/runtime/ConnectorStatus.java  |  17 +-
 .../apache/kafka/connect/runtime/Herder.java    |  15 +
 .../kafka/connect/runtime/TargetState.java      |  36 +
 .../kafka/connect/runtime/TaskStatus.java       |  12 +
 .../apache/kafka/connect/runtime/Worker.java    | 107 ++-
 .../kafka/connect/runtime/WorkerConnector.java  | 206 ++++++
 .../kafka/connect/runtime/WorkerSinkTask.java   |  76 ++-
 .../kafka/connect/runtime/WorkerSourceTask.java |  19 +-
 .../kafka/connect/runtime/WorkerTask.java       | 128 +++-
 .../runtime/distributed/ClusterConfigState.java |  46 +-
 .../runtime/distributed/DistributedHerder.java  | 449 ++++++------
 .../distributed/RebalanceNeededException.java   |  27 +
 .../distributed/StaleConfigException.java       |  27 -
 .../runtime/distributed/WorkerCoordinator.java  |   6 +-
 .../runtime/distributed/WorkerGroupMember.java  |   4 +-
 .../rest/resources/ConnectorsResource.java      |  18 +-
 .../runtime/standalone/StandaloneHerder.java    | 218 +++---
 .../connect/storage/ConfigBackingStore.java     | 126 ++++
 .../storage/KafkaConfigBackingStore.java        | 683 +++++++++++++++++++
 .../connect/storage/KafkaConfigStorage.java     | 594 ----------------
 .../storage/MemoryConfigBackingStore.java       | 154 +++++
 .../connect/storage/StatusBackingStore.java     |   2 +-
 .../connect/runtime/AbstractHerderTest.java     |  23 +-
 .../connect/runtime/WorkerConnectorTest.java    | 336 +++++++++
 .../connect/runtime/WorkerSinkTaskTest.java     | 115 +++-
 .../runtime/WorkerSinkTaskThreadedTest.java     |  33 +-
 .../connect/runtime/WorkerSourceTaskTest.java   | 134 +++-
 .../kafka/connect/runtime/WorkerTaskTest.java   |  36 +-
 .../kafka/connect/runtime/WorkerTest.java       |  30 +-
 .../distributed/DistributedHerderTest.java      | 143 ++--
 .../distributed/WorkerCoordinatorTest.java      |  12 +-
 .../standalone/StandaloneHerderTest.java        |  35 +-
 .../storage/KafkaConfigBackingStoreTest.java    | 537 +++++++++++++++
 .../connect/storage/KafkaConfigStorageTest.java | 534 ---------------
 40 files changed, 3253 insertions(+), 1759 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
index f833d7e..8e36f40 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
@@ -137,7 +137,8 @@ public class AbstractConfig {
         Map<String, String> copy = new RecordingMap<>();
         for (Map.Entry<String, ?> entry : originals.entrySet()) {
             if (!(entry.getValue() instanceof String))
-                throw new ClassCastException("Non-string value found in original settings");
+                throw new ClassCastException("Non-string value found in original settings for key " + entry.getKey() +
+                        ": " + (entry.getValue() == null ? null : entry.getValue().getClass().getName()));
             copy.put(entry.getKey(), (String) entry.getValue());
         }
         return copy;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
index 849fa2f..e7a0c36 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectDistributed.java
@@ -26,6 +26,8 @@ import org.apache.kafka.connect.runtime.Worker;
 import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
 import org.apache.kafka.connect.runtime.distributed.DistributedHerder;
 import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
+import org.apache.kafka.connect.storage.KafkaConfigBackingStore;
 import org.apache.kafka.connect.storage.KafkaOffsetBackingStore;
 import org.apache.kafka.connect.storage.KafkaStatusBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
@@ -74,8 +76,12 @@ public class ConnectDistributed {
         StatusBackingStore statusBackingStore = new KafkaStatusBackingStore(time, worker.getInternalValueConverter());
         statusBackingStore.configure(config);
 
-        DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, advertisedUrl.toString());
-        final Connect connect = new Connect(worker, herder, rest);
+        ConfigBackingStore configBackingStore = new KafkaConfigBackingStore(worker.getInternalValueConverter());
+        configBackingStore.configure(config);
+
+        DistributedHerder herder = new DistributedHerder(config, time, worker, statusBackingStore, configBackingStore,
+                advertisedUrl.toString());
+        final Connect connect = new Connect(herder, rest);
         try {
             connect.start();
         } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
index 6c4335e..4ade18c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/cli/ConnectStandalone.java
@@ -76,7 +76,7 @@ public class ConnectStandalone {
         Worker worker = new Worker(workerId, time, config, new FileOffsetBackingStore());
 
         Herder herder = new StandaloneHerder(worker);
-        final Connect connect = new Connect(worker, herder, rest);
+        final Connect connect = new Connect(herder, rest);
         connect.start();
 
         try {

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
index 1d87d60..a22f15c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractHerder.java
@@ -29,6 +29,7 @@ import org.apache.kafka.connect.runtime.rest.entities.ConfigKeyInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConfigValueInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorPluginInfo;
 import org.apache.kafka.connect.runtime.rest.entities.ConnectorStateInfo;
+import org.apache.kafka.connect.storage.ConfigBackingStore;
 import org.apache.kafka.connect.storage.StatusBackingStore;
 import org.apache.kafka.connect.tools.VerifiableSinkConnector;
 import org.apache.kafka.connect.tools.VerifiableSourceConnector;
@@ -75,28 +76,37 @@ import java.util.concurrent.ConcurrentHashMap;
  */
 public abstract class AbstractHerder implements Herder, TaskStatus.Listener, ConnectorStatus.Listener {
 
+    private final String workerId;
     protected final Worker worker;
     protected final StatusBackingStore statusBackingStore;
-    private final String workerId;
+    protected final ConfigBackingStore configBackingStore;
 
     private Map<String, Connector> tempConnectors = new ConcurrentHashMap<>();
     private static final List<Class<? extends Connector>> SKIPPED_CONNECTORS = Arrays.<Class<? extends Connector>>asList(VerifiableSourceConnector.class, VerifiableSinkConnector.class);
     private static List<ConnectorPluginInfo> validConnectorPlugins;
 
-    public AbstractHerder(Worker worker, StatusBackingStore statusBackingStore, String workerId) {
+    public AbstractHerder(Worker worker,
+                          String workerId,
+                          StatusBackingStore statusBackingStore,
+                          ConfigBackingStore configBackingStore) {
         this.worker = worker;
-        this.statusBackingStore = statusBackingStore;
         this.workerId = workerId;
+        this.statusBackingStore = statusBackingStore;
+        this.configBackingStore = configBackingStore;
     }
 
     protected abstract int generation();
 
     protected void startServices() {
+        this.worker.start();
         this.statusBackingStore.start();
+        this.configBackingStore.start();
     }
 
     protected void stopServices() {
         this.statusBackingStore.stop();
+        this.configBackingStore.stop();
+        this.worker.stop();
     }
 
     @Override
@@ -106,6 +116,18 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     @Override
+    public void onPause(String connector) {
+        statusBackingStore.put(new ConnectorStatus(connector, ConnectorStatus.State.PAUSED,
+                workerId, generation()));
+    }
+
+    @Override
+    public void onResume(String connector) {
+        statusBackingStore.put(new ConnectorStatus(connector, TaskStatus.State.RUNNING,
+                workerId, generation()));
+    }
+
+    @Override
     public void onShutdown(String connector) {
         statusBackingStore.putSafe(new ConnectorStatus(connector, ConnectorStatus.State.UNASSIGNED,
                 workerId, generation()));
@@ -133,6 +155,16 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     @Override
+    public void onResume(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.RUNNING, workerId, generation()));
+    }
+
+    @Override
+    public void onPause(ConnectorTaskId id) {
+        statusBackingStore.put(new TaskStatus(id, TaskStatus.State.PAUSED, workerId, generation()));
+    }
+
+    @Override
     public void onDeletion(String connector) {
         for (TaskStatus status : statusBackingStore.getAll(connector))
             statusBackingStore.put(new TaskStatus(status.id(), TaskStatus.State.DESTROYED, workerId, generation()));
@@ -140,6 +172,20 @@ public abstract class AbstractHerder implements Herder, TaskStatus.Listener, Con
     }
 
     @Override
+    public void pauseConnector(String connector) {
+        if (!configBackingStore.contains(connector))
+            throw new NotFoundException("Unknown connector " + connector);
+        configBackingStore.putTargetState(connector, TargetState.PAUSED);
+    }
+
+    @Override
+    public void resumeConnector(String connector) {
+        if (!configBackingStore.contains(connector))
+            throw new NotFoundException("Unknown connector " + connector);
+        configBackingStore.putTargetState(connector, TargetState.STARTED);
+    }
+
+    @Override
     public ConnectorStateInfo connectorStatus(String connName) {
         ConnectorStatus connector = statusBackingStore.get(connName);
         if (connector == null)

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
index 4f31be1..d00b81f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/AbstractStatus.java
@@ -21,6 +21,7 @@ public abstract class AbstractStatus<T> {
     public enum State {
         UNASSIGNED,
         RUNNING,
+        PAUSED,
         FAILED,
         DESTROYED,
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
index 49cf4bc..86f7f23 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Connect.java
@@ -33,7 +33,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
 public class Connect {
     private static final Logger log = LoggerFactory.getLogger(Connect.class);
 
-    private final Worker worker;
     private final Herder herder;
     private final RestServer rest;
     private final CountDownLatch startLatch = new CountDownLatch(1);
@@ -41,9 +40,8 @@ public class Connect {
     private final AtomicBoolean shutdown = new AtomicBoolean(false);
     private final ShutdownHook shutdownHook;
 
-    public Connect(Worker worker, Herder herder, RestServer rest) {
+    public Connect(Herder herder, RestServer rest) {
         log.debug("Kafka Connect instance created");
-        this.worker = worker;
         this.herder = herder;
         this.rest = rest;
         shutdownHook = new ShutdownHook();
@@ -54,7 +52,6 @@ public class Connect {
             log.info("Kafka Connect starting");
             Runtime.getRuntime().addShutdownHook(shutdownHook);
 
-            worker.start();
             herder.start();
             rest.start(herder);
 
@@ -72,7 +69,6 @@ public class Connect {
 
                 rest.stop();
                 herder.stop();
-                worker.stop();
 
                 log.info("Kafka Connect stopped");
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
index d9a2eba..de5d393 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/ConnectorStatus.java
@@ -16,6 +16,7 @@
  **/
 package org.apache.kafka.connect.runtime;
 
+
 public class ConnectorStatus extends AbstractStatus<String> {
 
     public ConnectorStatus(String connector, State state, String msg, String workerUrl, int generation) {
@@ -35,7 +36,9 @@ public class ConnectorStatus extends AbstractStatus<String> {
         void onShutdown(String connector);
 
         /**
-         * Invoked from the Connector using {@link org.apache.kafka.connect.connector.ConnectorContext#raiseError(Exception)}.
+         * Invoked from the Connector using {@link org.apache.kafka.connect.connector.ConnectorContext#raiseError(Exception)}
+         * or if either {@link org.apache.kafka.connect.connector.Connector#start(java.util.Map)} or
+         * {@link org.apache.kafka.connect.connector.Connector#stop()} throw an exception.
          * Note that no shutdown event will follow after the task has been failed.
          * @param connector The connector name
          * @param cause Error raised from the connector.
@@ -43,6 +46,18 @@ public class ConnectorStatus extends AbstractStatus<String> {
         void onFailure(String connector, Throwable cause);
 
         /**
+         * Invoked when the connector is paused through the REST API.
+         * @param connector The connector name
+         */
+        void onPause(String connector);
+
+        /**
+         * Invoked after the connector has been resumed.
+         * @param connector The connector name
+         */
+        void onResume(String connector);
+
+        /**
          * Invoked after successful startup of the connector.
          * @param connector The connector name
          */

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
index cce100e..ce8bcf9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Herder.java
@@ -149,6 +149,21 @@ public interface Herder {
      */
     void restartConnector(String connName, Callback<Void> cb);
 
+    /**
+     * Pause the connector. This call will asynchronously suspend processing by the connector and all
+     * of its tasks.
+     * @param connector name of the connector
+     */
+    void pauseConnector(String connector);
+
+    /**
+     * Resume the connector. This call will asynchronously start the connector and its tasks (if
+     * not started already).
+     * @param connector name of the connector
+     */
+    void resumeConnector(String connector);
+
+
     class Created<T> {
         private final boolean created;
         private final T result;

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
new file mode 100644
index 0000000..b59b3bb
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TargetState.java
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+/**
+ * The target state of a connector is its desired state as indicated by the user
+ * through interaction with the REST API. When a connector is first created, its
+ * target state is "STARTED." This does not mean it has actually started, just that
+ * the Connect framework will attempt to start it after its tasks have been assigned.
+ * After the connector has been paused, the target state will change to PAUSED,
+ * and all the tasks will stop doing work.
+ *
+ * Target states are persisted in the config topic, which is read by all of the
+ * workers in the group. When a worker sees a new target state for a connector which
+ * is running, it will transition any tasks which it owns (i.e. which have been
+ * assigned to it by the leader) into the desired target state. Upon completion of
+ * a task rebalance, the worker will start the task in the last known target state.
+ */
+public enum TargetState {
+    STARTED,
+    PAUSED,
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
index 3542eb8..173a694 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/TaskStatus.java
@@ -37,6 +37,18 @@ public class TaskStatus extends AbstractStatus<ConnectorTaskId> {
         void onStartup(ConnectorTaskId id);
 
         /**
+         * Invoked after the task has been paused.
+         * @param id The id of the task
+         */
+        void onPause(ConnectorTaskId id);
+
+        /**
+         * Invoked after the task has been resumed.
+         * @param id The id of the task
+         */
+        void onResume(ConnectorTaskId id);
+
+        /**
          * Invoked if the task raises an error. No shutdown event will follow.
          * @param id The id of the task
          * @param cause The error raised by the task.

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
index 22843d3..a88d0f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/Worker.java
@@ -26,7 +26,6 @@ import org.apache.kafka.connect.connector.Connector;
 import org.apache.kafka.connect.connector.ConnectorContext;
 import org.apache.kafka.connect.connector.Task;
 import org.apache.kafka.connect.errors.ConnectException;
-import org.apache.kafka.connect.sink.SinkConnector;
 import org.apache.kafka.connect.sink.SinkTask;
 import org.apache.kafka.connect.source.SourceTask;
 import org.apache.kafka.connect.storage.Converter;
@@ -132,10 +131,10 @@ public class Worker {
         long limit = started + config.getLong(WorkerConfig.TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG);
 
         for (Map.Entry<String, WorkerConnector> entry : connectors.entrySet()) {
-            WorkerConnector conn = entry.getValue();
+            WorkerConnector workerConnector = entry.getValue();
             log.warn("Shutting down connector {} uncleanly; herder should have shut down connectors before the" +
-                    "Worker is stopped.", conn);
-            conn.stop();
+                    "Worker is stopped.", entry.getKey());
+            workerConnector.shutdown();
         }
 
         Collection<ConnectorTaskId> taskIds = tasks.keySet();
@@ -157,8 +156,12 @@ public class Worker {
      * @param connConfig connector configuration
      * @param ctx context for the connector
      * @param statusListener listener for notifications of connector status changes
+     * @param initialState the initial target state that the connector should be initialized to
      */
-    public void startConnector(ConnectorConfig connConfig, ConnectorContext ctx, ConnectorStatus.Listener statusListener) {
+    public void startConnector(ConnectorConfig connConfig,
+                               ConnectorContext ctx,
+                               ConnectorStatus.Listener statusListener,
+                               TargetState initialState) {
         String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG);
         Class<? extends Connector> connClass = getConnectorClass(connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG));
 
@@ -171,22 +174,17 @@ public class Worker {
         WorkerConnector workerConnector = new WorkerConnector(connName, connector, ctx, statusListener);
 
         log.info("Instantiated connector {} with version {} of type {}", connName, connector.version(), connClass.getName());
-        workerConnector.initialize();
-        try {
-            workerConnector.start(connConfig.originalsStrings());
-        } catch (ConnectException e) {
-            throw new ConnectException("Connector threw an exception while starting", e);
-        }
+        workerConnector.initialize(connConfig);
+        workerConnector.transitionTo(initialState);
 
         connectors.put(connName, workerConnector);
-
         log.info("Finished creating connector {}", connName);
     }
 
     /* Now that the configuration doesn't contain the actual class name, we need to be able to tell the herder whether a connector is a Sink */
     public boolean isSinkConnector(String connName) {
         WorkerConnector workerConnector = connectors.get(connName);
-        return SinkConnector.class.isAssignableFrom(workerConnector.delegate.getClass());
+        return workerConnector.isSinkConnector();
     }
 
     public Connector getConnector(String connType) {
@@ -263,7 +261,7 @@ public class Worker {
         if (workerConnector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
-        Connector connector = workerConnector.delegate;
+        Connector connector = workerConnector.connector();
         List<Map<String, String>> result = new ArrayList<>();
         String taskClassName = connector.taskClass().getName();
         for (Map<String, String> taskProps : connector.taskConfigs(maxTasks)) {
@@ -283,7 +281,7 @@ public class Worker {
         if (connector == null)
             throw new ConnectException("Connector " + connName + " not found in this worker.");
 
-        connector.stop();
+        connector.shutdown();
         connectors.remove(connName);
 
         log.info("Stopped connector {}", connName);
@@ -296,13 +294,24 @@ public class Worker {
         return connectors.keySet();
     }
 
+    public boolean isRunning(String connName) {
+        WorkerConnector connector = connectors.get(connName);
+        if (connector == null)
+            throw new ConnectException("Connector " + connName + " not found in this worker.");
+        return connector.isRunning();
+    }
+
     /**
      * Add a new task.
      * @param id Globally unique ID for this task.
      * @param taskConfig the parsed task configuration
      * @param statusListener listener for notifications of task status changes
+     * @param initialState the initial target state that the task should be initialized to
      */
-    public void startTask(ConnectorTaskId id, TaskConfig taskConfig, TaskStatus.Listener statusListener) {
+    public void startTask(ConnectorTaskId id,
+                          TaskConfig taskConfig,
+                          TaskStatus.Listener statusListener,
+                          TargetState initialState) {
         log.info("Creating task {}", id);
 
         if (tasks.containsKey(id)) {
@@ -316,11 +325,11 @@ public class Worker {
         final Task task = instantiateTask(taskClass);
         log.info("Instantiated task {} with version {} of type {}", id, task.version(), taskClass.getName());
 
-        final WorkerTask workerTask = buildWorkerTask(id, task, statusListener);
+        final WorkerTask workerTask = buildWorkerTask(id, task, statusListener, initialState);
 
         // Start the task before adding modifying any state, any exceptions are caught higher up the
         // call chain and there's no cleanup to do here
-        workerTask.initialize(taskConfig.originalsStrings());
+        workerTask.initialize(taskConfig);
         executor.submit(workerTask);
 
         if (task instanceof SourceTask) {
@@ -330,17 +339,21 @@ public class Worker {
         tasks.put(id, workerTask);
     }
 
-    private WorkerTask buildWorkerTask(ConnectorTaskId id, Task task, TaskStatus.Listener lifecycleListener) {
+    private WorkerTask buildWorkerTask(ConnectorTaskId id,
+                                       Task task,
+                                       TaskStatus.Listener statusListener,
+                                       TargetState initialState) {
         // Decide which type of worker task we need based on the type of task.
         if (task instanceof SourceTask) {
             OffsetStorageReader offsetReader = new OffsetStorageReaderImpl(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
             OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(),
                     internalKeyConverter, internalValueConverter);
-            return new WorkerSourceTask(id, (SourceTask) task, lifecycleListener, keyConverter, valueConverter, producer,
-                    offsetReader, offsetWriter, config, time);
+            return new WorkerSourceTask(id, (SourceTask) task, statusListener, initialState, keyConverter,
+                     valueConverter, producer, offsetReader, offsetWriter, config, time);
         } else if (task instanceof SinkTask) {
-            return new WorkerSinkTask(id, (SinkTask) task, lifecycleListener, config, keyConverter, valueConverter, time);
+            return new WorkerSinkTask(id, (SinkTask) task, statusListener, initialState, config, keyConverter,
+                    valueConverter, time);
         } else {
             log.error("Tasks must be a subclass of either SourceTask or SinkTask", task);
             throw new ConnectException("Tasks must be a subclass of either SourceTask or SinkTask");
@@ -422,51 +435,17 @@ public class Worker {
         return this.connectors.containsKey(connName);
     }
 
-    private static class WorkerConnector  {
-        private final String connName;
-        private final ConnectorStatus.Listener lifecycleListener;
-        private final ConnectorContext ctx;
-        private final Connector delegate;
+    public void setTargetState(String connName, TargetState state) {
+        log.info("Setting connector {} state to {}", connName, state);
 
-        public WorkerConnector(String connName,
-                               Connector delegate,
-                               ConnectorContext ctx,
-                               ConnectorStatus.Listener lifecycleListener) {
-            this.connName = connName;
-            this.ctx = ctx;
-            this.delegate = delegate;
-            this.lifecycleListener = lifecycleListener;
-        }
-
-        public void initialize() {
-            delegate.initialize(ctx);
-        }
-
-        public void start(Map<String, String> props) {
-            try {
-                delegate.start(props);
-                lifecycleListener.onStartup(connName);
-            } catch (Throwable t) {
-                log.error("Error while starting connector {}", connName, t);
-                lifecycleListener.onFailure(connName, t);
-            }
-        }
-
-        public void stop() {
-            try {
-                delegate.stop();
-                lifecycleListener.onShutdown(connName);
-            } catch (Throwable t) {
-                log.error("Error while shutting down connector {}", connName, t);
-                lifecycleListener.onFailure(connName, t);
-            }
-        }
+        WorkerConnector connector = connectors.get(connName);
+        if (connector != null)
+            connector.transitionTo(state);
 
-        @Override
-        public String toString() {
-            return delegate.toString();
+        for (Map.Entry<ConnectorTaskId, WorkerTask> taskEntry : tasks.entrySet()) {
+            if (taskEntry.getKey().connector().equals(connName))
+                taskEntry.getValue().transitionTo(state);
         }
-
     }
 
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
new file mode 100644
index 0000000..7880095
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConnector.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+package org.apache.kafka.connect.runtime;
+
+import org.apache.kafka.connect.connector.Connector;
+import org.apache.kafka.connect.connector.ConnectorContext;
+import org.apache.kafka.connect.sink.SinkConnector;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+/**
+ * Container for connectors which is responsible for managing their lifecycle (e.g. handling startup,
+ * shutdown, pausing, etc.). Internally, we manage the runtime state of the connector and transition according
+ * to target state changes. Note that unlike connector tasks, the connector does not really have a "pause"
+ * state which is distinct from being stopped. We therefore treat pause operations as requests to momentarily
+ * stop the connector, and resume operations as requests to restart it (without reinitialization). Connector
+ * failures, whether in initialization or after startup, are treated as fatal, which means that we will not attempt
+ * to restart this connector instance after failure. What this means from a user perspective is that you must
+ * use the /restart REST API to restart a failed connector. This behavior is consistent with task failures.
+ *
+ * Note that this class is NOT thread-safe.
+ */
+public class WorkerConnector {
+    private static final Logger log = LoggerFactory.getLogger(WorkerConnector.class);
+
+    private enum State {
+        INIT,    // initial state before startup
+        STOPPED, // the connector has been stopped/paused.
+        STARTED, // the connector has been started/resumed.
+        FAILED,  // the connector has failed (no further transitions are possible after this state)
+    }
+
+    private final String connName;
+    private final ConnectorStatus.Listener statusListener;
+    private final ConnectorContext ctx;
+    private final Connector connector;
+
+    private Map<String, String> config;
+    private State state;
+
+    public WorkerConnector(String connName,
+                           Connector connector,
+                           ConnectorContext ctx,
+                           ConnectorStatus.Listener statusListener) {
+        this.connName = connName;
+        this.ctx = ctx;
+        this.connector = connector;
+        this.statusListener = statusListener;
+        this.state = State.INIT;
+    }
+
+    public void initialize(ConnectorConfig connectorConfig) {
+        try {
+            this.config = connectorConfig.originalsStrings();
+
+            log.debug("Initializing connector {} with config {}", connName, config);
+
+            connector.initialize(new ConnectorContext() {
+                @Override
+                public void requestTaskReconfiguration() {
+                    ctx.requestTaskReconfiguration();
+                }
+
+                @Override
+                public void raiseError(Exception e) {
+                    log.error("Connector raised an error {}", connName, e);
+                    onFailure(e);
+                    ctx.raiseError(e);
+                }
+            });
+        } catch (Throwable t) {
+            log.error("Error initializing connector {}", connName, t);
+            onFailure(t);
+        }
+    }
+
+    private boolean doStart() {
+        try {
+            switch (state) {
+                case STARTED:
+                    return false;
+
+                case INIT:
+                case STOPPED:
+                    connector.start(config);
+                    this.state = State.STARTED;
+                    return true;
+
+                default:
+                    throw new IllegalArgumentException("Cannot start connector in state " + state);
+            }
+        } catch (Throwable t) {
+            log.error("Error while starting connector {}", connName, t);
+            onFailure(t);
+            return false;
+        }
+    }
+
+    private void onFailure(Throwable t) {
+        statusListener.onFailure(connName, t);
+        this.state = State.FAILED;
+    }
+
+    private void resume() {
+        if (doStart())
+            statusListener.onResume(connName);
+    }
+
+    private void start() {
+        if (doStart())
+            statusListener.onStartup(connName);
+    }
+
+    public boolean isRunning() {
+        return state == State.STARTED;
+    }
+
+    private void pause() {
+        try {
+            switch (state) {
+                case STOPPED:
+                    return;
+
+                case STARTED:
+                    connector.stop();
+                    // fall through
+
+                case INIT:
+                    statusListener.onPause(connName);
+                    this.state = State.STOPPED;
+                    break;
+
+                default:
+                    throw new IllegalArgumentException("Cannot pause connector in state " + state);
+            }
+        } catch (Throwable t) {
+            log.error("Error while shutting down connector {}", connName, t);
+            statusListener.onFailure(connName, t);
+            this.state = State.FAILED;
+        }
+    }
+
+    public void shutdown() {
+        try {
+            if (state == State.STARTED)
+                connector.stop();
+            this.state = State.STOPPED;
+        } catch (Throwable t) {
+            log.error("Error while shutting down connector {}", connName, t);
+            this.state = State.FAILED;
+        } finally {
+            statusListener.onShutdown(connName);
+        }
+    }
+
+    public void transitionTo(TargetState targetState) {
+        if (state == State.FAILED) {
+            log.warn("Cannot transition connector {} to {} since it has failed", connName, targetState);
+            return;
+        }
+
+        log.debug("Transition connector {} to {}", connName, targetState);
+        if (targetState == TargetState.PAUSED) {
+            pause();
+        } else if (targetState == TargetState.STARTED) {
+            if (state == State.INIT)
+                start();
+            else
+                resume();
+        } else {
+            throw new IllegalArgumentException("Unhandled target state " + targetState);
+        }
+    }
+
+    public boolean isSinkConnector() {
+        return SinkConnector.class.isAssignableFrom(connector.getClass());
+    }
+
+    public Connector connector() {
+        return connector;
+    }
+
+    @Override
+    public String toString() {
+        return "WorkerConnector{" +
+                "connName='" + connName + '\'' +
+                ", connector=" + connector +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 6293455..f5eaac4 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -43,11 +43,8 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import static java.util.Collections.singleton;
 
@@ -78,12 +75,13 @@ class WorkerSinkTask extends WorkerTask {
 
     public WorkerSinkTask(ConnectorTaskId id,
                           SinkTask task,
-                          TaskStatus.Listener lifecycleListener,
+                          TaskStatus.Listener statusListener,
+                          TargetState initialState,
                           WorkerConfig workerConfig,
                           Converter keyConverter,
                           Converter valueConverter,
                           Time time) {
-        super(id, lifecycleListener);
+        super(id, statusListener, initialState);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -103,10 +101,15 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
-    public void initialize(Map<String, String> taskConfig) {
-        this.taskConfig = taskConfig;
-        this.consumer = createConsumer();
-        this.context = new WorkerSinkTaskContext(consumer);
+    public void initialize(TaskConfig taskConfig) {
+        try {
+            this.taskConfig = taskConfig.originalsStrings();
+            this.consumer = createConsumer();
+            this.context = new WorkerSinkTaskContext(consumer);
+        } catch (Throwable t) {
+            log.error("Task {} failed initialization and will not be started.", t);
+            onFailure(t);
+        }
     }
 
     @Override
@@ -126,6 +129,12 @@ class WorkerSinkTask extends WorkerTask {
     }
 
     @Override
+    public void transitionTo(TargetState state) {
+        super.transitionTo(state);
+        consumer.wakeup();
+    }
+
+    @Override
     public void execute() {
         initializeAndStart();
         try {
@@ -218,6 +227,12 @@ class WorkerSinkTask extends WorkerTask {
             deliverMessages();
         } catch (WakeupException we) {
             log.trace("{} consumer woken up", id);
+
+            if (shouldPause()) {
+                pauseAll();
+            } else if (!pausedForRedelivery) {
+                resumeAll();
+            }
         }
     }
 
@@ -338,6 +353,16 @@ class WorkerSinkTask extends WorkerTask {
         }
     }
 
+    private void resumeAll() {
+        for (TopicPartition tp : consumer.assignment())
+            if (!context.pausedPartitions().contains(tp))
+                consumer.resume(singleton(tp));
+    }
+
+    private void pauseAll() {
+        consumer.pause(consumer.assignment());
+    }
+
     private void deliverMessages() {
         // Finally, deliver this batch to the sink
         try {
@@ -350,9 +375,8 @@ class WorkerSinkTask extends WorkerTask {
             // If we had paused all consumer topic partitions to try to redeliver data, then we should resume any that
             // the task had not explicitly paused
             if (pausedForRedelivery) {
-                for (TopicPartition tp : consumer.assignment())
-                    if (!context.pausedPartitions().contains(tp))
-                        consumer.resume(singleton(tp));
+                if (!shouldPause())
+                    resumeAll();
                 pausedForRedelivery = false;
             }
         } catch (RetriableException e) {
@@ -360,7 +384,7 @@ class WorkerSinkTask extends WorkerTask {
             // If we're retrying a previous batch, make sure we've paused all topic partitions so we don't get new data,
             // but will still be able to poll in order to handle user-requested timeouts, keep group membership, etc.
             pausedForRedelivery = true;
-            consumer.pause(consumer.assignment());
+            pauseAll();
             // Let this exit normally, the batch will be reprocessed on the next loop.
         } catch (Throwable t) {
             log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
@@ -412,24 +436,14 @@ class WorkerSinkTask extends WorkerTask {
             // If we paused everything for redelivery (which is no longer relevant since we discarded the data), make
             // sure anything we paused that the task didn't request to be paused *and* which we still own is resumed.
             // Also make sure our tracking of paused partitions is updated to remove any partitions we no longer own.
-            if (pausedForRedelivery) {
-                pausedForRedelivery = false;
-
-                Set<TopicPartition> assigned = new HashSet<>(partitions);
-                Set<TopicPartition> taskPaused = context.pausedPartitions();
-
-                for (TopicPartition tp : partitions) {
-                    if (!taskPaused.contains(tp))
-                        consumer.resume(singleton(tp));
-                }
-
-                Iterator<TopicPartition> tpIter = taskPaused.iterator();
-                while (tpIter.hasNext()) {
-                    TopicPartition tp = tpIter.next();
-                    if (assigned.contains(tp))
-                        tpIter.remove();
-                }
-            }
+            pausedForRedelivery = false;
+
+            // Ensure that the paused partitions contains only assigned partitions and repause as necessary
+            context.pausedPartitions().retainAll(partitions);
+            if (shouldPause())
+                pauseAll();
+            else if (!context.pausedPartitions().isEmpty())
+                consumer.pause(context.pausedPartitions());
 
             // Instead of invoking the assignment callback on initialization, we guarantee the consumer is ready upon
             // task start. Since this callback gets invoked during that initial setup before we've started the task, we
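
The sink task above implements pause/resume at the consumer level: wakeup() knocks the consumer out of a blocking poll(), after which all assigned partitions are paused or resumed to match the target state. A small standalone sketch of that consumer pattern (plain kafka-clients code, not part of this patch), assuming a broker at localhost:9092 and a topic named "test":

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.util.Collections;
import java.util.Properties;

public class SinkConsumerPauseSketch {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");  // assumed local broker
        props.put("group.id", "pause-sketch");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        final KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Collections.singletonList("test"));  // assumed topic

        // Simulate the herder applying a PAUSED target state from another thread:
        // wakeup() is the consumer's only thread-safe method and forces the blocking
        // poll() below to raise WakeupException.
        Thread pauser = new Thread(new Runnable() {
            public void run() {
                consumer.wakeup();
            }
        });
        pauser.start();

        try {
            consumer.poll(5000);
        } catch (WakeupException e) {
            // Pause whatever partitions are currently assigned (possibly none before the
            // first rebalance completes): the consumer keeps polling, and so keeps its
            // group membership, but fetches no further records until resume().
            consumer.pause(consumer.assignment());
        }

        consumer.poll(0);                          // returns no records while paused
        consumer.resume(consumer.assignment());    // picks up where it left off

        pauser.join();
        consumer.close();
    }
}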

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
index 3a43f96..602af4a 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
@@ -76,7 +76,8 @@ class WorkerSourceTask extends WorkerTask {
 
     public WorkerSourceTask(ConnectorTaskId id,
                             SourceTask task,
-                            TaskStatus.Listener lifecycleListener,
+                            TaskStatus.Listener statusListener,
+                            TargetState initialState,
                             Converter keyConverter,
                             Converter valueConverter,
                             KafkaProducer<byte[], byte[]> producer,
@@ -84,7 +85,7 @@ class WorkerSourceTask extends WorkerTask {
                             OffsetStorageWriter offsetWriter,
                             WorkerConfig workerConfig,
                             Time time) {
-        super(id, lifecycleListener);
+        super(id, statusListener, initialState);
 
         this.workerConfig = workerConfig;
         this.task = task;
@@ -104,8 +105,13 @@ class WorkerSourceTask extends WorkerTask {
     }
 
     @Override
-    public void initialize(Map<String, String> config) {
-        this.taskConfig = config;
+    public void initialize(TaskConfig taskConfig) {
+        try {
+            this.taskConfig = taskConfig.originalsStrings();
+        } catch (Throwable t) {
+            log.error("Task {} failed initialization and will not be started.", t);
+            onFailure(t);
+        }
     }
 
     protected void close() {
@@ -139,6 +145,11 @@ class WorkerSourceTask extends WorkerTask {
             }
 
             while (!isStopping()) {
+                if (shouldPause()) {
+                    awaitUnpause();
+                    continue;
+                }
+
                 if (toSend == null) {
                     log.debug("Nothing to send to Kafka. Polling source for additional records");
                     toSend = task.poll();

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
index 7979fb0..846ca95 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerTask.java
@@ -21,33 +21,41 @@ import org.apache.kafka.connect.util.ConnectorTaskId;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Map;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 /**
  * Handles processing for an individual task. This interface only provides the basic methods
  * used by {@link Worker} to manage the tasks. Implementations combine a user-specified Task with
  * Kafka to create a data flow.
+ *
+ * Note on locking: since the task runs in its own thread, special care must be taken to ensure
+ * that state transitions are reported correctly, in particular since some state transitions are
+ * asynchronous (e.g. pause/resume). For example, changing the state to paused could cause a race
+ * if the task fails at the same time. To protect from these cases, we synchronize status updates
+ * using the WorkerTask's monitor.
  */
 abstract class WorkerTask implements Runnable {
     private static final Logger log = LoggerFactory.getLogger(WorkerTask.class);
 
     protected final ConnectorTaskId id;
-    private final AtomicBoolean stopping;
-    private final AtomicBoolean running;
-    private final AtomicBoolean cancelled;
+    private final AtomicBoolean stopping;   // indicates whether the Worker has asked the task to stop
+    private final AtomicBoolean cancelled;  // indicates whether the Worker has cancelled the task (e.g. because of slow shutdown)
     private final CountDownLatch shutdownLatch;
-    private final TaskStatus.Listener lifecycleListener;
+    private final TaskStatus.Listener statusListener;
+    private final AtomicReference<TargetState> targetState;
 
-    public WorkerTask(ConnectorTaskId id, TaskStatus.Listener lifecycleListener) {
+    public WorkerTask(ConnectorTaskId id,
+                      TaskStatus.Listener statusListener,
+                      TargetState initialState) {
         this.id = id;
         this.stopping = new AtomicBoolean(false);
-        this.running = new AtomicBoolean(false);
         this.cancelled = new AtomicBoolean(false);
         this.shutdownLatch = new CountDownLatch(1);
-        this.lifecycleListener = lifecycleListener;
+        this.statusListener = statusListener;
+        this.targetState = new AtomicReference<>(initialState);
     }
 
     public ConnectorTaskId id() {
@@ -58,14 +66,24 @@ abstract class WorkerTask implements Runnable {
      * Initialize the task for execution.
      * @param props initial configuration
      */
-    public abstract void initialize(Map<String, String> props);
+    public abstract void initialize(TaskConfig taskConfig);
+
+
+    private void triggerStop() {
+        synchronized (this) {
+            this.stopping.set(true);
+
+            // wakeup any threads that are waiting for unpause
+            this.notifyAll();
+        }
+    }
 
     /**
      * Stop this task from processing messages. This method does not block, it only triggers
      * shutdown. Use #{@link #awaitStop} to block until completion.
      */
     public void stop() {
-        this.stopping.set(true);
+        triggerStop();
     }
 
     /**
@@ -83,9 +101,6 @@ abstract class WorkerTask implements Runnable {
      * @return true if successful, false if the timeout was reached
      */
     public boolean awaitStop(long timeoutMs) {
-        if (!running.get())
-            return true;
-
         try {
             return shutdownLatch.await(timeoutMs, TimeUnit.MILLISECONDS);
         } catch (InterruptedException e) {
@@ -101,10 +116,6 @@ abstract class WorkerTask implements Runnable {
         return stopping.get();
     }
 
-    protected boolean isStopped() {
-        return !running.get();
-    }
-
     private void doClose() {
         try {
             close();
@@ -115,14 +126,17 @@ abstract class WorkerTask implements Runnable {
     }
 
     private void doRun() {
-        if (!this.running.compareAndSet(false, true))
-            throw new IllegalStateException("The task cannot be started while still running");
-
         try {
-            if (stopping.get())
-                return;
+            synchronized (this) {
+                if (stopping.get())
+                    return;
+
+                if (targetState.get() == TargetState.PAUSED)
+                    statusListener.onPause(id);
+                else
+                    statusListener.onStartup(id);
+            }
 
-            lifecycleListener.onStartup(id);
             execute();
         } catch (Throwable t) {
             log.error("Task {} threw an uncaught and unrecoverable exception", id, t);
@@ -133,22 +147,80 @@ abstract class WorkerTask implements Runnable {
         }
     }
 
+    private void onShutdown() {
+        synchronized (this) {
+            triggerStop();
+
+            // if we were cancelled, skip the status update since the task may have already been
+            // started somewhere else
+            if (!cancelled.get())
+                statusListener.onShutdown(id);
+        }
+    }
+
+    protected void onFailure(Throwable t) {
+        synchronized (this) {
+            triggerStop();
+
+            // if we were cancelled, skip the status update since the task may have already been
+            // started somewhere else
+            if (!cancelled.get())
+                statusListener.onFailure(id, t);
+        }
+    }
+
     @Override
     public void run() {
         try {
             doRun();
-            if (!cancelled.get())
-                lifecycleListener.onShutdown(id);
+            onShutdown();
         } catch (Throwable t) {
-            if (!cancelled.get())
-                lifecycleListener.onFailure(id, t);
+            onFailure(t);
 
             if (t instanceof Error)
                 throw t;
         } finally {
-            running.set(false);
             shutdownLatch.countDown();
         }
     }
 
+    public boolean shouldPause() {
+        return this.targetState.get() == TargetState.PAUSED;
+    }
+
+    /**
+     * Await task resumption.
+     * @return true if the task's target state is not paused, false if the task is shut down before resumption
+     * @throws InterruptedException
+     */
+    protected boolean awaitUnpause() throws InterruptedException {
+        synchronized (this) {
+            while (targetState.get() == TargetState.PAUSED) {
+                if (stopping.get())
+                    return false;
+                this.wait();
+            }
+            return true;
+        }
+    }
+
+    public void transitionTo(TargetState state) {
+        synchronized (this) {
+            // ignore the state change if we are stopping
+            if (stopping.get())
+                return;
+
+            TargetState oldState = this.targetState.getAndSet(state);
+            if (state != oldState) {
+                if (state == TargetState.PAUSED) {
+                    statusListener.onPause(id);
+                } else if (state == TargetState.STARTED) {
+                    statusListener.onResume(id);
+                    this.notifyAll();
+                } else
+                    throw new IllegalArgumentException("Unhandled target state " + state);
+            }
+        }
+    }
+
 }
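
The synchronization described in the WorkerTask class comment above (an AtomicReference holding the target state plus the task's own monitor for wake-ups) can be exercised in isolation. The following is a standalone, illustrative sketch of the same handshake; none of these names are Kafka APIs:

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

public class TaskPauseHandshakeSketch {

    enum TargetState { STARTED, PAUSED }

    static class Task implements Runnable {
        private final AtomicBoolean stopping = new AtomicBoolean(false);
        private final AtomicReference<TargetState> targetState =
                new AtomicReference<>(TargetState.STARTED);

        // Called from the "herder" thread: update the target state and wake any
        // thread parked in awaitUnpause().
        synchronized void transitionTo(TargetState state) {
            if (stopping.get())
                return;                 // ignore transitions once we are stopping
            targetState.set(state);
            notifyAll();
        }

        synchronized void stop() {
            stopping.set(true);
            notifyAll();                // also wakes a paused task so it can exit
        }

        // Called from the task thread: block while paused, return false if stopped.
        private synchronized boolean awaitUnpause() throws InterruptedException {
            while (targetState.get() == TargetState.PAUSED) {
                if (stopping.get())
                    return false;
                wait();
            }
            return true;
        }

        @Override
        public void run() {
            try {
                while (!stopping.get()) {
                    if (targetState.get() == TargetState.PAUSED) {
                        if (!awaitUnpause())
                            return;     // shut down while paused
                        continue;
                    }
                    Thread.sleep(100);  // stand-in for real work (poll/put records)
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Task task = new Task();
        Thread thread = new Thread(task);
        thread.start();

        task.transitionTo(TargetState.PAUSED);   // task parks in awaitUnpause()
        Thread.sleep(300);
        task.transitionTo(TargetState.STARTED);  // task resumes its loop
        Thread.sleep(300);
        task.stop();                             // wakes the task if it is paused
        thread.join();
    }
}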

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
index cc4a3c1..c5c217e 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/ClusterConfigState.java
@@ -17,10 +17,12 @@
 
 package org.apache.kafka.connect.runtime.distributed;
 
+import org.apache.kafka.connect.runtime.TargetState;
 import org.apache.kafka.connect.util.ConnectorTaskId;
 
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -29,24 +31,32 @@ import java.util.Set;
  * An immutable snapshot of the configuration state of connectors and tasks in a Kafka Connect cluster.
  */
 public class ClusterConfigState {
-    public static final ClusterConfigState EMPTY = new ClusterConfigState(-1, Collections.<String, Integer>emptyMap(),
-            Collections.<String, Map<String, String>>emptyMap(), Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
+    public static final long NO_OFFSET = -1;
+    public static final ClusterConfigState EMPTY = new ClusterConfigState(
+            NO_OFFSET,
+            Collections.<String, Integer>emptyMap(),
+            Collections.<String, Map<String, String>>emptyMap(),
+            Collections.<String, TargetState>emptyMap(),
+            Collections.<ConnectorTaskId, Map<String, String>>emptyMap(),
             Collections.<String>emptySet());
 
     private final long offset;
     private final Map<String, Integer> connectorTaskCounts;
     private final Map<String, Map<String, String>> connectorConfigs;
+    private final Map<String, TargetState> connectorTargetStates;
     private final Map<ConnectorTaskId, Map<String, String>> taskConfigs;
     private final Set<String> inconsistentConnectors;
 
     public ClusterConfigState(long offset,
                               Map<String, Integer> connectorTaskCounts,
                               Map<String, Map<String, String>> connectorConfigs,
+                              Map<String, TargetState> connectorTargetStates,
                               Map<ConnectorTaskId, Map<String, String>> taskConfigs,
                               Set<String> inconsistentConnectors) {
         this.offset = offset;
         this.connectorTaskCounts = connectorTaskCounts;
         this.connectorConfigs = connectorConfigs;
+        this.connectorTargetStates = connectorTargetStates;
         this.taskConfigs = taskConfigs;
         this.inconsistentConnectors = inconsistentConnectors;
     }
@@ -61,6 +71,15 @@ public class ClusterConfigState {
     }
 
     /**
+     * Check whether this snapshot contains configuration for a connector.
+     * @param connector name of the connector
+     * @return true if this state contains configuration for the connector, false otherwise
+     */
+    public boolean contains(String connector) {
+        return connectorConfigs.containsKey(connector);
+    }
+
+    /**
      * Get a list of the connectors in this configuration
      */
     public Set<String> connectors() {
@@ -77,6 +96,15 @@ public class ClusterConfigState {
     }
 
     /**
+     * Get the target state of the connector
+     * @param connector name of the connector
+     * @return the target state
+     */
+    public TargetState targetState(String connector) {
+        return connectorTargetStates.get(connector);
+    }
+
+    /**
      * Get the configuration for a task.
      * @param task id of the task
      * @return a map containing configuration parameters
@@ -86,6 +114,20 @@ public class ClusterConfigState {
     }
 
     /**
+     * Get all task configs for a connector.
+     * @param connector name of the connector
+     * @return a map from the task id to its configuration
+     */
+    public Map<ConnectorTaskId, Map<String, String>> allTaskConfigs(String connector) {
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : this.taskConfigs.entrySet()) {
+            if (taskConfigEntry.getKey().connector().equals(connector))
+                taskConfigs.put(taskConfigEntry.getKey(), taskConfigEntry.getValue());
+        }
+        return taskConfigs;
+    }
+
+    /**
      * Get the number of tasks assigned for the given connector.
      * @param connectorName name of the connector to look up tasks for
      * @return the number of tasks

