Reply-To: dev@kafka.apache.org
From: gwenshap@apache.org
To: commits@kafka.apache.org
Date: Mon, 09 Nov 2015 06:11:57 -0000
Message-Id: <297cb3eb326d45d3a41e23eab5837228@git.apache.org>
Subject: [18/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
new file mode 100644
index 0000000..c8e0f6f
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java
@@ -0,0 +1,59 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class CreateConnectorRequest {
+ private final String name;
+ private final Map<String, String> config;
+
+ @JsonCreator
+ public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map<String, String> config) {
+ this.name = name;
+ this.config = config;
+ }
+
+ @JsonProperty
+ public String name() {
+ return name;
+ }
+
+ @JsonProperty
+ public Map<String, String> config() {
+ return config;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ CreateConnectorRequest that = (CreateConnectorRequest) o;
+ return Objects.equals(name, that.name) &&
+ Objects.equals(config, that.config);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, config);
+ }
+}
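
For context, the request body this entity models is plain JSON with a top-level name and a config map. A
minimal round-trip sketch, not part of this commit, assuming Jackson 2.x's ObjectMapper and a hypothetical
connector name:

    // requires com.fasterxml.jackson.databind.ObjectMapper
    ObjectMapper mapper = new ObjectMapper();
    String json = "{\"name\":\"local-file-source\",\"config\":{\"connector.class\":\"FileStreamSource\"}}";
    CreateConnectorRequest request = mapper.readValue(json, CreateConnectorRequest.class);
    assert "local-file-source".equals(request.name());
    String serialized = mapper.writeValueAsString(request);  // same shape back out

The TaskInfo entity later in this commit follows the same Jackson pattern.
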
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
new file mode 100644
index 0000000..493b00d
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import java.util.Objects;
+
+/**
+ * Standard error format for all REST API failures. These are generated automatically by
+ * {@link ConnectExceptionMapper} in response to uncaught
+ * {@link ConnectException}s.
+ */
+public class ErrorMessage {
+ private final int errorCode;
+ private final String message;
+
+ @JsonCreator
+ public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) {
+ this.errorCode = errorCode;
+ this.message = message;
+ }
+
+ @JsonProperty("error_code")
+ public int errorCode() {
+ return errorCode;
+ }
+
+ @JsonProperty
+ public String message() {
+ return message;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ ErrorMessage that = (ErrorMessage) o;
+ return Objects.equals(errorCode, that.errorCode) &&
+ Objects.equals(message, that.message);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(errorCode, message);
+ }
+}
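
As wired up by ConnectExceptionMapper later in this commit, instances of this class become the JSON error
body returned to REST clients. A sketch of the serialized form, assuming Jackson 2.x and a made-up message:

    // requires com.fasterxml.jackson.databind.ObjectMapper
    ObjectMapper mapper = new ObjectMapper();
    String body = mapper.writeValueAsString(new ErrorMessage(404, "Connector foo not found"));
    // body: {"error_code":404,"message":"Connector foo not found"}

The snake_case error_code key comes from the explicit @JsonProperty("error_code") annotations above.
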
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
new file mode 100644
index 0000000..1d5e8ba
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.AppInfoParser;
+
+public class ServerInfo {
+ private String version;
+ private String commit;
+
+ public ServerInfo() {
+ version = AppInfoParser.getVersion();
+ commit = AppInfoParser.getCommitId();
+ }
+
+ @JsonProperty
+ public String version() {
+ return version;
+ }
+
+ @JsonProperty
+ public String commit() {
+ return commit;
+ }
+}
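
This entity presumably backs the REST API's server info endpoint; serialized, it yields the worker's version
and commit id as reported by AppInfoParser. A sketch, assuming Jackson 2.x (actual values vary by build):

    // requires com.fasterxml.jackson.databind.ObjectMapper
    String info = new ObjectMapper().writeValueAsString(new ServerInfo());
    // e.g. {"version":"0.9.0.0","commit":"..."}
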
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
new file mode 100644
index 0000000..3d443a5
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.entities;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+
+import java.util.Map;
+import java.util.Objects;
+
+public class TaskInfo {
+ private final ConnectorTaskId id;
+ private final Map<String, String> config;
+
+ public TaskInfo(ConnectorTaskId id, Map<String, String> config) {
+ this.id = id;
+ this.config = config;
+ }
+
+ @JsonProperty
+ public ConnectorTaskId id() {
+ return id;
+ }
+
+ @JsonProperty
+ public Map<String, String> config() {
+ return config;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+ TaskInfo taskInfo = (TaskInfo) o;
+ return Objects.equals(id, taskInfo.id) &&
+ Objects.equals(config, taskInfo.config);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(id, config);
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
new file mode 100644
index 0000000..67c38e7
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.errors;
+
+import org.apache.kafka.connect.errors.AlreadyExistsException;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.NotFoundException;
+import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.ws.rs.core.Response;
+import javax.ws.rs.ext.ExceptionMapper;
+
+public class ConnectExceptionMapper implements ExceptionMapper<ConnectException> {
+ private static final Logger log = LoggerFactory.getLogger(ConnectExceptionMapper.class);
+
+ @Override
+ public Response toResponse(ConnectException exception) {
+ log.debug("Uncaught exception in REST call: ", exception);
+
+ if (exception instanceof ConnectRestException) {
+ ConnectRestException restException = (ConnectRestException) exception;
+ return Response.status(restException.statusCode())
+ .entity(new ErrorMessage(restException.errorCode(), restException.getMessage()))
+ .build();
+ }
+
+ if (exception instanceof NotFoundException) {
+ return Response.status(Response.Status.NOT_FOUND)
+ .entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage()))
+ .build();
+ }
+
+ if (exception instanceof AlreadyExistsException) {
+ return Response.status(Response.Status.CONFLICT)
+ .entity(new ErrorMessage(Response.Status.CONFLICT.getStatusCode(), exception.getMessage()))
+ .build();
+ }
+
+ return Response.status(Response.Status.INTERNAL_SERVER_ERROR)
+ .entity(new ErrorMessage(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.getMessage()))
+ .build();
+ }
+}
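
In effect, the mapper translates exceptions to statuses as follows: ConnectRestException keeps its own status
and error code, NotFoundException becomes 404, AlreadyExistsException becomes 409, and any other
ConnectException becomes 500. A hypothetical direct invocation, for illustration only (assumes
NotFoundException's String constructor):

    ConnectExceptionMapper mapper = new ConnectExceptionMapper();
    Response response = mapper.toResponse(new NotFoundException("Connector foo not found"));
    assert response.getStatus() == 404;
    ErrorMessage body = (ErrorMessage) response.getEntity();  // the entity set above
    assert body.errorCode() == 404;
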
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java
new file mode 100644
index 0000000..5dcbcf4
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.errors;
+
+import org.apache.kafka.connect.errors.ConnectException;
+
+import javax.ws.rs.core.Response;
+
+public class ConnectRestException extends ConnectException {
+ private final int statusCode;
+ private final int errorCode;
+
+ public ConnectRestException(int statusCode, int errorCode, String message, Throwable t) {
+ super(message, t);
+ this.statusCode = statusCode;
+ this.errorCode = errorCode;
+ }
+
+ public ConnectRestException(Response.Status status, int errorCode, String message, Throwable t) {
+ this(status.getStatusCode(), errorCode, message, t);
+ }
+
+ public ConnectRestException(int statusCode, int errorCode, String message) {
+ this(statusCode, errorCode, message, null);
+ }
+
+ public ConnectRestException(Response.Status status, int errorCode, String message) {
+ this(status, errorCode, message, null);
+ }
+
+ public ConnectRestException(int statusCode, String message, Throwable t) {
+ this(statusCode, statusCode, message, t);
+ }
+
+ public ConnectRestException(Response.Status status, String message, Throwable t) {
+ this(status, status.getStatusCode(), message, t);
+ }
+
+ public ConnectRestException(int statusCode, String message) {
+ this(statusCode, statusCode, message, null);
+ }
+
+ public ConnectRestException(Response.Status status, String message) {
+ this(status.getStatusCode(), status.getStatusCode(), message, null);
+ }
+
+
+ public int statusCode() {
+ return statusCode;
+ }
+
+ public int errorCode() {
+ return errorCode;
+ }
+}
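
A hypothetical call site showing the intended usage (the condition, name variable, and message are made up):

    // e.g. inside a REST resource method
    if (connectorAlreadyExists)  // hypothetical check
        throw new ConnectRestException(Response.Status.CONFLICT,
                "Connector " + name + " already exists");

ConnectExceptionMapper will render this as a 409 whose error_code matches the HTTP status, since the
single-argument-status constructors reuse the status code as the error code.
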
http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
new file mode 100644
index 0000000..cea4360
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java
@@ -0,0 +1,201 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.runtime.rest.resources;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import org.apache.kafka.connect.runtime.ConnectorConfig;
+import org.apache.kafka.connect.runtime.Herder;
+import org.apache.kafka.connect.runtime.distributed.NotLeaderException;
+import org.apache.kafka.connect.runtime.rest.RestServer;
+import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo;
+import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest;
+import org.apache.kafka.connect.runtime.rest.entities.TaskInfo;
+import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException;
+import org.apache.kafka.connect.util.FutureCallback;
+
+import javax.servlet.ServletContext;
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.PUT;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import java.net.URI;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+@Path("/connectors")
+@Produces(MediaType.APPLICATION_JSON)
+@Consumes(MediaType.APPLICATION_JSON)
+public class ConnectorsResource {
+ // TODO: This should not be so long. However, rebalances may be long, potentially waiting a full session
+ // timeout to complete, and during that time we cannot serve some requests. Ideally we could reduce this, but
+ // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases,
+ // but currently a worker simply leaving the group can take this long as well.
+ private static final long REQUEST_TIMEOUT_MS = 90 * 1000;
+
+ private final Herder herder;
+ @javax.ws.rs.core.Context
+ private ServletContext context;
+
+ public ConnectorsResource(Herder herder) {
+ this.herder = herder;
+ }
+
+ @GET
+ @Path("/")
+ public Collection<String> listConnectors() throws Throwable {
+ FutureCallback<Collection<String>> cb = new FutureCallback<>();
+ herder.connectors(cb);
+ return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference<Collection<String>>() {
+ });
+ }
+
+ @POST
+ @Path("/")
+ public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable {
+ String name = createRequest.name();
+ Map<String, String> configs = createRequest.config();
+ if (!configs.containsKey(ConnectorConfig.NAME_CONFIG))
+ configs.put(ConnectorConfig.NAME_CONFIG, name);
+
+ FutureCallback<Herder.Created<ConnectorInfo>> cb = new FutureCallback<>();
+ herder.putConnectorConfig(name, configs, false, cb);
+ Herder.Created<ConnectorInfo> info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest,
+ new TypeReference<ConnectorInfo>() { }, new CreatedConnectorInfoTranslator());
+ return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build();
+ }
+
+ @GET
+ @Path("/{connector}")
+ public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable {
+ FutureCallback<ConnectorInfo> cb = new FutureCallback<>();
+ herder.connectorInfo(connector, cb);
+ return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference() {
+ });
+ }
+
+ @GET
+ @Path("/{connector}/config")
+ public Map<String, String> getConnectorConfig(final @PathParam("connector") String connector) throws Throwable {
+ FutureCallback<Map<String, String>> cb = new FutureCallback<>();
+ herder.connectorConfig(connector, cb);
+ return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference<Map<String, String>>() {
+ });
+ }

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
new file mode 100644
--- /dev/null
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.utils.SystemTime;
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.errors.ConnectException;
+import org.apache.kafka.connect.errors.DataException;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Provides persistent storage of Kafka Connect connector and task configurations in a Kafka topic, along with
+ * per-connector commit records that mark a set of task configurations as complete.
+ *
+ * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition
+ * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows
+ * us to clean up outdated configurations over time. However, this combination has some important implications for
+ * the implementation of this class and the configuration state that it may expose.
+ *
+ *
+ * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record
+ * is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any
+ * others, and they do not need to coordinate with the connector's task configuration at all.
+ *
+ *
+ * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not
+ * currently have multi-record transactions or support atomic batch record writes, task commit messages are required
+ * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs
+ * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs
+ * were applied immediately you could be using half the old configs and half the new configs. In that condition, some
+ * partitions may be double-assigned because the old config and new config may use completely different assignments.
+ * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them atomically
+ * once a commit message has been read.
+ *
+ *
+ * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was
+ * always available, but we would like to be able to enable compaction so our configuration topic does not grow
+ * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading
+ * from the beginning of the log in order to build up the full current configuration will see task commits, but some
+ * records required for those commits will have been removed because the same keys have subsequently been rewritten.
+ * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config,
+ * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read
+ * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up.
+ *
+ *
+ * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario
+ * as the previous one, but in this case both the first and second update will write 2 task configs. However, the
+ * second write fails half of the way through:
+ * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction
+ * occurs and we're left with
+ * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't
+ * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent
+ * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never
+ * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second
+ * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task
+ * configs for connector "foo".
+ *
+ *
+ * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system
+ * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated
+ * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data.
+ * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These
+ * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle
+ * of updating task configurations).
+ *
+ *
+ * Note that the expectation is that this config storage system has only a single writer at a time.
+ * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change
+ * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker).
+ *
+ *
+ * Since processing of the config log occurs in a background thread, callers must take care when using accessors.
+ * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster.
+ * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are
+ * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require
+ * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the
+ * rebalance must be deferred.
+ *
+ */
+public class KafkaConfigStorage {
+ private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class);
+
+ public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic";
+
+ public static final String CONNECTOR_PREFIX = "connector-";
+
+ public static String CONNECTOR_KEY(String connectorName) {
+ return CONNECTOR_PREFIX + connectorName;
+ }
+
+ public static final String TASK_PREFIX = "task-";
+
+ public static String TASK_KEY(ConnectorTaskId taskId) {
+ return TASK_PREFIX + taskId.connector() + "-" + taskId.task();
+ }
+
+ public static final String COMMIT_TASKS_PREFIX = "commit-";
+
+ public static String COMMIT_TASKS_KEY(String connectorName) {
+ return COMMIT_TASKS_PREFIX + connectorName;
+ }
+
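+ // For illustration (hypothetical connector "local-file-source" with a single task 0), the helpers above
+ // produce keys like:
+ // CONNECTOR_KEY("local-file-source") -> "connector-local-file-source"
+ // TASK_KEY(new ConnectorTaskId("local-file-source", 0)) -> "task-local-file-source-0"
+ // COMMIT_TASKS_KEY("local-file-source") -> "commit-local-file-source"
+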
+ // Note that using real serialization for values, as we do here, but ad hoc string serialization for keys
+ // isn't ideal. We use this approach because it avoids any potential problems with schema evolution or
+ // converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain
+ // precisely the same.
+ public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct()
+ .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA))
+ .build();
+ public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0;
+ public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct()
+ .field("tasks", Schema.INT32_SCHEMA)
+ .build();
+
+ private static final long READ_TO_END_TIMEOUT_MS = 30000;
+
+ private final Object lock;
+ private boolean starting;
+ private final Converter converter;
+ private final Callback<String> connectorConfigCallback;
+ private final Callback<List<ConnectorTaskId>> tasksConfigCallback;
+ private String topic;
+ // Data is passed to the log already serialized. We use a converter to handle translating between the generic
+ // Connect data format and the serialized form
+ private KafkaBasedLog<String, byte[]> configLog;
+ // Connector -> # of tasks
+ private Map<String, Integer> connectorTaskCounts = new HashMap<>();
+ // Connector and task configs: name or id -> config map
+ private Map<String, Map<String, String>> connectorConfigs = new HashMap<>();
+ private Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+ // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data
+ // is in an inconsistent state and we cannot safely use it until it has been refreshed.
+ private Set<String> inconsistent = new HashSet<>();
+ // The most recently read offset. This does not take into account deferred task updates/commits, so we may have
+ // outstanding data to be applied.
+ private long offset;
+
+ // Connector -> Map[ConnectorTaskId -> Configs]
+ private Map<String, Map<ConnectorTaskId, Map<String, String>>> deferredTaskUpdates = new HashMap<>();
+
+
+ public KafkaConfigStorage(Converter converter, Callback<String> connectorConfigCallback, Callback<List<ConnectorTaskId>> tasksConfigCallback) {
+ this.lock = new Object();
+ this.starting = false;
+ this.converter = converter;
+ this.connectorConfigCallback = connectorConfigCallback;
+ this.tasksConfigCallback = tasksConfigCallback;
+
+ offset = -1;
+ }
+
+ public void configure(Map<String, ?> configs) {
+ if (configs.get(CONFIG_TOPIC_CONFIG) == null)
+ throw new ConnectException("Must specify topic for connector configuration.");
+ topic = (String) configs.get(CONFIG_TOPIC_CONFIG);
+
+ Map<String, Object> producerProps = new HashMap<>();
+ producerProps.putAll(configs);
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
+ producerProps.put(ProducerConfig.ACKS_CONFIG, "all");
+
+ Map<String, Object> consumerProps = new HashMap<>();
+ consumerProps.putAll(configs);
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer");
+ consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+
+ configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback);
+ }
+
+ public void start() {
+ log.info("Starting KafkaConfigStorage");
+ // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that
+ // updates can continue to occur in the background
+ starting = true;
+ configLog.start();
+ starting = false;
+ log.info("Started KafkaConfigStorage");
+ }
+
+ public void stop() {
+ log.info("Closing KafkaConfigStorage");
+ configLog.stop();
+ log.info("Closed KafkaConfigStorage");
+ }
+
+ /**
+ * Get a snapshot of the current state of the cluster.
+ */
+ public ClusterConfigState snapshot() {
+ synchronized (lock) {
+ // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be
+ // immutable configs
+ return new ClusterConfigState(
+ offset,
+ new HashMap<>(connectorTaskCounts),
+ new HashMap<>(connectorConfigs),
+ new HashMap<>(taskConfigs),
+ new HashSet<>(inconsistent)
+ );
+ }
+ }
+
+ /**
+ * Write this connector configuration to persistent storage and wait until it has been acknowledged and read back by
+ * tailing the Kafka log with a consumer.
+ *
+ * @param connector name of the connector to write data for
+ * @param properties the configuration to write
+ */
+ public void putConnectorConfig(String connector, Map<String, String> properties) {
+ byte[] serializedConfig;
+ if (properties == null) {
+ serializedConfig = null;
+ } else {
+ Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+ connectConfig.put("properties", properties);
+ serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
+ }
+
+ try {
+ configLog.send(CONNECTOR_KEY(connector), serializedConfig);
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write connector configuration to Kafka: ", e);
+ throw new ConnectException("Error writing connector configuration to Kafka", e);
+ }
+ }
+
+ /**
+ * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
+ * that we would be leaving one of the referenced connectors with an inconsistent state.
+ *
+ * @param configs map containing task configurations
+ * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
+ * and task configurations.
+ */
+ public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
+ // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
+ // any outstanding lagging data to consume.
+ try {
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write root configuration to Kafka: ", e);
+ throw new ConnectException("Error writing root configuration to Kafka", e);
+ }
+
+ // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
+ // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
+ // the root config being updated.
+ Map<String, Integer> newTaskCounts = new HashMap<>();
+ synchronized (lock) {
+ // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
+ // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here
+ Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
+ for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
+ if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
+ log.error("Submitted task configuration contain invalid range of task IDs, ignoring this submission");
+ throw new ConnectException("Error writing task configurations: found some connectors with invalid connectors");
+ }
+ newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
+ }
+ }
+
+ // Start sending all the individual updates
+ for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+ Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
+ connectConfig.put("properties", taskConfigEntry.getValue());
+ byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
+ configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+ }
+
+ // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
+ // the end of the log
+ try {
+ // Read to end to ensure all the task configs have been written
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+ // Write all the commit messages
+ for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
+ Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+ connectConfig.put("tasks", taskCountEntry.getValue());
+ byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
+ configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+ }
+
+ // Read to end to ensure all the commit messages have been written
+ configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException | ExecutionException | TimeoutException e) {
+ log.error("Failed to write root configuration to Kafka: ", e);
+ throw new ConnectException("Error writing root configuration to Kafka", e);
+ }
+ }
+
+ public Future<Void> readToEnd() {
+ return configLog.readToEnd();
+ }
+
+ public void readToEnd(Callback<Void> cb) {
+ configLog.readToEnd(cb);
+ }
+
+ private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+ Map<String, Object> consumerProps, Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
+ return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+ }
+
+ private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+ @Override
+ public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+ if (error != null) {
+ log.error("Unexpected in consumer callback for KafkaConfigStorage: ", error);
+ return;
+ }
+
+ final SchemaAndValue value;
+ try {
+ value = converter.toConnectData(topic, record.value());
+ } catch (DataException e) {
+ log.error("Failed to convert config data to Kafka Connect format: ", e);
+ return;
+ }
+ // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
+ // *next record*, not the last one consumed.
+ offset = record.offset() + 1;
+
+ if (record.key().startsWith(CONNECTOR_PREFIX)) {
+ String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
+ synchronized (lock) {
+ if (value.value() == null) {
+ // Connector deletion will be written as a null value
+ connectorConfigs.remove(connectorName);
+ } else {
+ // Connector configs can be applied and callbacks invoked immediately
+ if (!(value.value() instanceof Map)) {
+ log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+ return;
+ }
+ Object newConnectorConfig = ((Map) value.value()).get("properties");
+ if (!(newConnectorConfig instanceof Map)) {
+ log.error("Invalid data for connector config: properties filed should be a Map but is " + newConnectorConfig.getClass());
+ return;
+ }
+ connectorConfigs.put(connectorName, (Map) newConnectorConfig);
+ }
+ }
+ if (!starting)
+ connectorConfigCallback.onCompletion(null, connectorName);
+ } else if (record.key().startsWith(TASK_PREFIX)) {
+ synchronized (lock) {
+ ConnectorTaskId taskId = parseTaskId(record.key());
+ if (taskId == null) {
+ log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
+ return;
+ }
+ if (!(value.value() instanceof Map)) {
+ log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
+ return;
+ }
+
+ Object newTaskConfig = ((Map) value.value()).get("properties");
+ if (!(newTaskConfig instanceof Map)) {
+ log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass());
+ return;
+ }
+
+ Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
+ if (deferred == null) {
+ deferred = new HashMap<>();
+ deferredTaskUpdates.put(taskId.connector(), deferred);
+ }
+ deferred.put(taskId, (Map<String, String>) newTaskConfig);
+ }
+ } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
+ String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
+ List<ConnectorTaskId> updatedTasks = new ArrayList<>();
+ synchronized (lock) {
+ // Apply any outstanding deferred task updates for the given connector. Note that just because we
+ // encounter a commit message does not mean it will result in consistent output. In particular due to
+ // compaction, there may be cases where a commit cannot be applied cleanly. For example if we have the following sequence of writes:
+ //
+ // 1. Write connector "foo"'s config
+ // 2. Write connector "foo", task 1's config <-- compacted
+ // 3. Write connector "foo", task 2's config
+ // 4. Write connector "foo" task commit message
+ // 5. Write connector "foo", task 1's config
+ // 6. Write connector "foo", task 2's config
+ // 7. Write connector "foo" task commit message
+ //
+ // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
+ // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
+ // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
+ // only 5 was written, then there may be nothing that will finish writing the configs and get the
+ // log back into a consistent state.
+ //
+ // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
+ // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
+ // exposed in the snapshots provided via ClusterConfigState so they are easy to handle.
+ if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
+ log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
+ return;
+ }
+
+ Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
+
+ int newTaskCount = intValue(((Map<String, Object>) value.value()).get("tasks"));
+
+ // Validate the configs we're supposed to update to ensure we're getting a complete configuration
+ // update of all tasks that are expected based on the number of tasks in the commit message.
+ Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
+ Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
+ if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
+ // Given the logic for writing commit messages, we should only hit this condition due to compacted
+ // historical data, in which case we would not have applied any updates yet and there will be no
+ // task config data already committed for the connector, so we shouldn't have to clear any data
+ // out. All we need to do is add the flag marking it inconsistent.
+ inconsistent.add(connectorName);
+ } else {
+ if (deferred != null) {
+ taskConfigs.putAll(deferred);
+ updatedTasks.addAll(taskConfigs.keySet());
+ }
+ inconsistent.remove(connectorName);
+ }
+ // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
+ // update, then we need to see a completely fresh set of configs after this commit message, so we don't
+ // want any of these outdated configs
+ if (deferred != null)
+ deferred.clear();
+
+ connectorTaskCounts.put(connectorName, newTaskCount);
+ }
+
+ if (!starting)
+ tasksConfigCallback.onCompletion(null, updatedTasks);
+ } else {
+ log.error("Discarding config update record with invalid key: " + record.key());
+ }
+ }
+ };
+
+ private ConnectorTaskId parseTaskId(String key) {
+ String[] parts = key.split("-");
+ if (parts.length < 3) return null;
+
+ try {
+ int taskNum = Integer.parseInt(parts[parts.length - 1]);
+ String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
+ return new ConnectorTaskId(connectorName, taskNum);
+ } catch (NumberFormatException e) {
+ return null;
+ }
+ }
+
+ /**
+ * Given task configurations, get a set of integer task IDs organized by connector name.
+ */
+ private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
+ Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
+ if (configs == null)
+ return connectorTaskIds;
+ for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+ ConnectorTaskId taskId = taskConfigEntry.getKey();
+ if (!connectorTaskIds.containsKey(taskId.connector()))
+ connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
+ connectorTaskIds.get(taskId.connector()).add(taskId.task());
+ }
+ return connectorTaskIds;
+ }
+
+ private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
+ // Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
+ // with 2 tasks, then reduce to 1, we'll end up with log entries like:
+ //
+ // 1. Connector "foo" config
+ // 2. Connector "foo", task 1 config
+ // 3. Connector "foo", task 2 config
+ // 4. Connector "foo", commit 2 tasks
+ // 5. Connector "foo", task 1 config
+ // 6. Connector "foo", commit 1 tasks
+ //
+ // However, due to compaction we could end up with a log that looks like this:
+ //
+ // 1. Connector "foo" config
+ // 3. Connector "foo", task 2 config
+ // 5. Connector "foo", task 1 config
+ // 6. Connector "foo", commit 1 tasks
+ //
+ // which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
+ // validate that all the configs specified by the commit message are present. This should be fine because the
+ // logic for writing configs ensures all the task configs are written (and reads them back) before writing the
+ // commit message.
+
+ if (idSet.size() < expectedSize)
+ return false;
+
+ for (int i = 0; i < expectedSize; i++)
+ if (!idSet.contains(i))
+ return false;
+ return true;
+ }
+
+ // Convert an integer value extracted from a schemaless struct to an int. This handles potentially different
+ // encodings by different Converters.
+ private static int intValue(Object value) {
+ if (value instanceof Integer)
+ return (int) value;
+ else if (value instanceof Long)
+ return (int) (long) value;
+ else
+ throw new ConnectException("Expected integer value to be either Integer or Long");
+ }
+}
+
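
To tie the pieces together, a lifecycle sketch of this class. This is not part of the commit; the converter
choice, callbacks, topic name, and property values are all assumptions:

    // assumes a configured Converter and the two callbacks described above
    Map<String, Object> props = new HashMap<>();
    props.put("bootstrap.servers", "localhost:9092");                      // assumed broker address
    props.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "connect-configs");  // assumed topic name
    KafkaConfigStorage storage = new KafkaConfigStorage(
            converter,            // e.g. a JsonConverter
            connectorCallback,    // Callback<String>, invoked as connector configs are read
            tasksCallback);       // Callback<List<ConnectorTaskId>>, invoked as task commits are read
    storage.configure(props);
    storage.start();
    storage.putConnectorConfig("local-file-source", connectorProps);       // assumed config map
    ClusterConfigState state = storage.snapshot();                         // point-in-time view
    storage.stop();
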