From: gwenshap@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Mon, 09 Nov 2015 06:11:57 -0000
Message-Id: <297cb3eb326d45d3a41e23eab5837228@git.apache.org>
Subject: [18/26] kafka git commit: KAFKA-2774: Rename Copycat to Kafka Connect

http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java new file mode 100644 index 0000000..c8e0f6f --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/CreateConnectorRequest.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ **/ + +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Map; +import java.util.Objects; + +public class CreateConnectorRequest { + private final String name; + private final Map config; + + @JsonCreator + public CreateConnectorRequest(@JsonProperty("name") String name, @JsonProperty("config") Map config) { + this.name = name; + this.config = config; + } + + @JsonProperty + public String name() { + return name; + } + + @JsonProperty + public Map config() { + return config; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + CreateConnectorRequest that = (CreateConnectorRequest) o; + return Objects.equals(name, that.name) && + Objects.equals(config, that.config); + } + + @Override + public int hashCode() { + return Objects.hash(name, config); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java new file mode 100644 index 0000000..493b00d --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ErrorMessage.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; + +import java.util.Objects; + +/** + * Standard error format for all REST API failures. These are generated automatically by + * {@link ConnectExceptionMapper} in response to uncaught + * {@link ConnectException}s. 
+ */ +public class ErrorMessage { + private final int errorCode; + private final String message; + + @JsonCreator + public ErrorMessage(@JsonProperty("error_code") int errorCode, @JsonProperty("message") String message) { + this.errorCode = errorCode; + this.message = message; + } + + @JsonProperty("error_code") + public int errorCode() { + return errorCode; + } + + @JsonProperty + public String message() { + return message; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ErrorMessage that = (ErrorMessage) o; + return Objects.equals(errorCode, that.errorCode) && + Objects.equals(message, that.message); + } + + @Override + public int hashCode() { + return Objects.hash(errorCode, message); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java new file mode 100644 index 0000000..1d5e8ba --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/ServerInfo.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.common.utils.AppInfoParser; + +public class ServerInfo { + private String version; + private String commit; + + public ServerInfo() { + version = AppInfoParser.getVersion(); + commit = AppInfoParser.getCommitId(); + } + + @JsonProperty + public String version() { + return version; + } + + @JsonProperty + public String commit() { + return commit; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java new file mode 100644 index 0000000..3d443a5 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/entities/TaskInfo.java @@ -0,0 +1,58 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest.entities; + +import com.fasterxml.jackson.annotation.JsonProperty; +import org.apache.kafka.connect.util.ConnectorTaskId; + +import java.util.Map; +import java.util.Objects; + +public class TaskInfo { + private final ConnectorTaskId id; + private final Map config; + + public TaskInfo(ConnectorTaskId id, Map config) { + this.id = id; + this.config = config; + } + + @JsonProperty + public ConnectorTaskId id() { + return id; + } + + @JsonProperty + public Map config() { + return config; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + TaskInfo taskInfo = (TaskInfo) o; + return Objects.equals(id, taskInfo.id) && + Objects.equals(config, taskInfo.config); + } + + @Override + public int hashCode() { + return Objects.hash(id, config); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java new file mode 100644 index 0000000..67c38e7 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectExceptionMapper.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.rest.errors; + +import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.rest.entities.ErrorMessage; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.ws.rs.core.Response; +import javax.ws.rs.ext.ExceptionMapper; + +public class ConnectExceptionMapper implements ExceptionMapper { + private static final Logger log = LoggerFactory.getLogger(ConnectExceptionMapper.class); + + @Override + public Response toResponse(ConnectException exception) { + log.debug("Uncaught exception in REST call: ", exception); + + if (exception instanceof ConnectRestException) { + ConnectRestException restException = (ConnectRestException) exception; + return Response.status(restException.statusCode()) + .entity(new ErrorMessage(restException.errorCode(), restException.getMessage())) + .build(); + } + + if (exception instanceof NotFoundException) { + return Response.status(Response.Status.NOT_FOUND) + .entity(new ErrorMessage(Response.Status.NOT_FOUND.getStatusCode(), exception.getMessage())) + .build(); + } + + if (exception instanceof AlreadyExistsException) { + return Response.status(Response.Status.CONFLICT) + .entity(new ErrorMessage(Response.Status.CONFLICT.getStatusCode(), exception.getMessage())) + .build(); + } + + return Response.status(Response.Status.INTERNAL_SERVER_ERROR) + .entity(new ErrorMessage(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), exception.getMessage())) + .build(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java new file mode 100644 index 0000000..5dcbcf4 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/errors/ConnectRestException.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.rest.errors; + +import org.apache.kafka.connect.errors.ConnectException; + +import javax.ws.rs.core.Response; + +public class ConnectRestException extends ConnectException { + private final int statusCode; + private final int errorCode; + + public ConnectRestException(int statusCode, int errorCode, String message, Throwable t) { + super(message, t); + this.statusCode = statusCode; + this.errorCode = errorCode; + } + + public ConnectRestException(Response.Status status, int errorCode, String message, Throwable t) { + this(status.getStatusCode(), errorCode, message, t); + } + + public ConnectRestException(int statusCode, int errorCode, String message) { + this(statusCode, errorCode, message, null); + } + + public ConnectRestException(Response.Status status, int errorCode, String message) { + this(status, errorCode, message, null); + } + + public ConnectRestException(int statusCode, String message, Throwable t) { + this(statusCode, statusCode, message, t); + } + + public ConnectRestException(Response.Status status, String message, Throwable t) { + this(status, status.getStatusCode(), message, t); + } + + public ConnectRestException(int statusCode, String message) { + this(statusCode, statusCode, message, null); + } + + public ConnectRestException(Response.Status status, String message) { + this(status.getStatusCode(), status.getStatusCode(), message, null); + } + + + public int statusCode() { + return statusCode; + } + + public int errorCode() { + return errorCode; + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java new file mode 100644 index 0000000..cea4360 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/ConnectorsResource.java @@ -0,0 +1,201 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ **/ + +package org.apache.kafka.connect.runtime.rest.resources; + +import com.fasterxml.jackson.core.type.TypeReference; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.distributed.NotLeaderException; +import org.apache.kafka.connect.runtime.rest.RestServer; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.CreateConnectorRequest; +import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.runtime.rest.errors.ConnectRestException; +import org.apache.kafka.connect.util.FutureCallback; + +import javax.servlet.ServletContext; +import javax.ws.rs.Consumes; +import javax.ws.rs.DELETE; +import javax.ws.rs.GET; +import javax.ws.rs.POST; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import java.net.URI; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Path("/connectors") +@Produces(MediaType.APPLICATION_JSON) +@Consumes(MediaType.APPLICATION_JSON) +public class ConnectorsResource { + // TODO: This should not be so long. However, due to potentially long rebalances that may have to wait a full + // session timeout to complete, during which we cannot serve some requests. Ideally we could reduce this, but + // we need to consider all possible scenarios this could fail. It might be ok to fail with a timeout in rare cases, + // but currently a worker simply leaving the group can take this long as well. 
+ private static final long REQUEST_TIMEOUT_MS = 90 * 1000; + + private final Herder herder; + @javax.ws.rs.core.Context + private ServletContext context; + + public ConnectorsResource(Herder herder) { + this.herder = herder; + } + + @GET + @Path("/") + public Collection listConnectors() throws Throwable { + FutureCallback> cb = new FutureCallback<>(); + herder.connectors(cb); + return completeOrForwardRequest(cb, "/connectors", "GET", null, new TypeReference>() { + }); + } + + @POST + @Path("/") + public Response createConnector(final CreateConnectorRequest createRequest) throws Throwable { + String name = createRequest.name(); + Map configs = createRequest.config(); + if (!configs.containsKey(ConnectorConfig.NAME_CONFIG)) + configs.put(ConnectorConfig.NAME_CONFIG, name); + + FutureCallback> cb = new FutureCallback<>(); + herder.putConnectorConfig(name, configs, false, cb); + Herder.Created info = completeOrForwardRequest(cb, "/connectors", "POST", createRequest, + new TypeReference() { }, new CreatedConnectorInfoTranslator()); + return Response.created(URI.create("/connectors/" + name)).entity(info.result()).build(); + } + + @GET + @Path("/{connector}") + public ConnectorInfo getConnector(final @PathParam("connector") String connector) throws Throwable { + FutureCallback cb = new FutureCallback<>(); + herder.connectorInfo(connector, cb); + return completeOrForwardRequest(cb, "/connectors/" + connector, "GET", null, new TypeReference() { + }); + } + + @GET + @Path("/{connector}/config") + public Map getConnectorConfig(final @PathParam("connector") String connector) throws Throwable { + FutureCallback> cb = new FutureCallback<>(); + herder.connectorConfig(connector, cb); + return completeOrForwardRequest(cb, "/connectors/" + connector + "/config", "GET", null, new TypeReference>() { + }); + } + + @PUT + @Path("/{connector}/config") + public Response putConnectorConfig(final @PathParam("connector") String connector, + final Map connectorConfig) throws Throwable { + FutureCallback> cb = new FutureCallback<>(); + herder.putConnectorConfig(connector, connectorConfig, true, cb); + Herder.Created createdInfo = completeOrForwardRequest(cb, "/connectors/" + connector + "/config", + "PUT", connectorConfig, new TypeReference() { }, new CreatedConnectorInfoTranslator()); + Response.ResponseBuilder response; + if (createdInfo.created()) + response = Response.created(URI.create("/connectors/" + connector)); + else + response = Response.ok(); + return response.entity(createdInfo.result()).build(); + } + + @GET + @Path("/{connector}/tasks") + public List getTaskConfigs(final @PathParam("connector") String connector) throws Throwable { + FutureCallback> cb = new FutureCallback<>(); + herder.taskConfigs(connector, cb); + return completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "GET", null, new TypeReference>() { + }); + } + + @POST + @Path("/{connector}/tasks") + public void putTaskConfigs(final @PathParam("connector") String connector, + final List> taskConfigs) throws Throwable { + FutureCallback cb = new FutureCallback<>(); + herder.putTaskConfigs(connector, taskConfigs, cb); + completeOrForwardRequest(cb, "/connectors/" + connector + "/tasks", "POST", taskConfigs); + } + + @DELETE + @Path("/{connector}") + public void destroyConnector(final @PathParam("connector") String connector) throws Throwable { + FutureCallback> cb = new FutureCallback<>(); + herder.putConnectorConfig(connector, null, true, cb); + completeOrForwardRequest(cb, "/connectors/" + connector, "DELETE", null); + } + + 
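[Editor's illustration -- not part of this patch] The JSON body a client would POST to the createConnector() endpoint above is bound by Jackson to CreateConnectorRequest through its @JsonCreator constructor; if the submitted config omits the connector name, createConnector() copies it in under ConnectorConfig.NAME_CONFIG. The config keys and values shown here are assumed examples, not taken from this diff:

    Map<String, String> config = new HashMap<>();
    config.put("connector.class", "org.apache.kafka.connect.file.FileStreamSourceConnector"); // assumed example
    config.put("tasks.max", "1");                                                             // assumed example
    CreateConnectorRequest request = new CreateConnectorRequest("local-file-source", config);
    // Equivalent JSON body:
    // {"name": "local-file-source",
    //  "config": {"connector.class": "...", "tasks.max": "1"}}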
// Wait for a FutureCallback to complete. If it succeeds, return the parsed response. If it fails, try to forward the + // request to the leader. + private T completeOrForwardRequest( + FutureCallback cb, String path, String method, Object body, TypeReference resultType, + Translator translator) throws Throwable { + try { + return cb.get(REQUEST_TIMEOUT_MS, TimeUnit.MILLISECONDS); + } catch (ExecutionException e) { + if (e.getCause() instanceof NotLeaderException) { + NotLeaderException notLeaderError = (NotLeaderException) e.getCause(); + return translator.translate(RestServer.httpRequest(RestServer.urlJoin(notLeaderError.leaderUrl(), path), method, body, resultType)); + } + + throw e.getCause(); + } catch (TimeoutException e) { + // This timeout is for the operation itself. None of the timeout error codes are relevant, so internal server + // error is the best option + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request timed out"); + } catch (InterruptedException e) { + throw new ConnectRestException(Response.Status.INTERNAL_SERVER_ERROR.getStatusCode(), "Request interrupted"); + } + } + + private T completeOrForwardRequest(FutureCallback cb, String path, String method, Object body, TypeReference resultType) throws Throwable { + return completeOrForwardRequest(cb, path, method, body, resultType, new IdentityTranslator()); + } + + private T completeOrForwardRequest(FutureCallback cb, String path, String method, Object body) throws Throwable { + return completeOrForwardRequest(cb, path, method, body, null, new IdentityTranslator()); + } + + private interface Translator { + T translate(RestServer.HttpResponse response); + } + + private class IdentityTranslator implements Translator { + @Override + public T translate(RestServer.HttpResponse response) { + return response.body(); + } + } + + private class CreatedConnectorInfoTranslator implements Translator, ConnectorInfo> { + @Override + public Herder.Created translate(RestServer.HttpResponse response) { + boolean created = response.status() == 201; + return new Herder.Created<>(created, response.body()); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java new file mode 100644 index 0000000..3364ffd --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/resources/RootResource.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.rest.resources; + +import org.apache.kafka.connect.runtime.rest.entities.ServerInfo; + +import javax.ws.rs.GET; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.MediaType; + +@Path("/") +@Produces(MediaType.APPLICATION_JSON) +public class RootResource { + + @GET + @Path("/") + public ServerInfo serverInfo() { + return new ServerInfo(); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java new file mode 100644 index 0000000..7cefe22 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneConfig.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.standalone; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.runtime.WorkerConfig; + +import java.util.Map; + +public class StandaloneConfig extends WorkerConfig { + private static final ConfigDef CONFIG; + + static { + CONFIG = baseConfigDef(); + } + + public StandaloneConfig(Map props) { + super(CONFIG, props); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java new file mode 100644 index 0000000..89847ab --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/standalone/StandaloneHerder.java @@ -0,0 +1,272 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.runtime.standalone; + +import org.apache.kafka.connect.errors.AlreadyExistsException; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.NotFoundException; +import org.apache.kafka.connect.runtime.ConnectorConfig; +import org.apache.kafka.connect.runtime.Herder; +import org.apache.kafka.connect.runtime.HerderConnectorContext; +import org.apache.kafka.connect.runtime.TaskConfig; +import org.apache.kafka.connect.runtime.Worker; +import org.apache.kafka.connect.runtime.rest.entities.ConnectorInfo; +import org.apache.kafka.connect.runtime.rest.entities.TaskInfo; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + + +/** + * Single process, in-memory "herder". Useful for a standalone Kafka Connect process. + */ +public class StandaloneHerder implements Herder { + private static final Logger log = LoggerFactory.getLogger(StandaloneHerder.class); + + private final Worker worker; + private HashMap connectors = new HashMap<>(); + + public StandaloneHerder(Worker worker) { + this.worker = worker; + } + + public synchronized void start() { + log.info("Herder starting"); + log.info("Herder started"); + } + + public synchronized void stop() { + log.info("Herder stopping"); + + // There's no coordination/hand-off to do here since this is all standalone. Instead, we + // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all + // the tasks. 
+ for (String connName : new HashSet<>(connectors.keySet())) { + removeConnectorTasks(connName); + try { + worker.stopConnector(connName); + } catch (ConnectException e) { + log.error("Error shutting down connector {}: ", connName, e); + } + } + connectors.clear(); + + log.info("Herder stopped"); + } + + @Override + public synchronized void connectors(Callback> callback) { + callback.onCompletion(null, new ArrayList<>(connectors.keySet())); + } + + @Override + public synchronized void connectorInfo(String connName, Callback callback) { + ConnectorState state = connectors.get(connName); + if (state == null) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found"), null); + return; + } + callback.onCompletion(null, createConnectorInfo(state)); + } + + private ConnectorInfo createConnectorInfo(ConnectorState state) { + if (state == null) + return null; + + List taskIds = new ArrayList<>(); + for (int i = 0; i < state.taskConfigs.size(); i++) + taskIds.add(new ConnectorTaskId(state.name, i)); + return new ConnectorInfo(state.name, state.configOriginals, taskIds); + } + + @Override + public void connectorConfig(String connName, final Callback> callback) { + // Subset of connectorInfo, so piggy back on that implementation + connectorInfo(connName, new Callback() { + @Override + public void onCompletion(Throwable error, ConnectorInfo result) { + if (error != null) { + callback.onCompletion(error, null); + return; + } + callback.onCompletion(null, result.config()); + } + }); + } + + @Override + public synchronized void putConnectorConfig(String connName, final Map config, + boolean allowReplace, + final Callback> callback) { + try { + boolean created = false; + if (connectors.containsKey(connName)) { + if (!allowReplace) { + callback.onCompletion(new AlreadyExistsException("Connector " + connName + " already exists"), null); + return; + } + if (config == null) // Deletion, kill tasks as well + removeConnectorTasks(connName); + worker.stopConnector(connName); + if (config == null) + connectors.remove(connName); + } else { + if (config == null) { + // Deletion, must already exist + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + return; + } + created = true; + } + if (config != null) { + startConnector(config); + updateConnectorTasks(connName); + } + if (config != null) + callback.onCompletion(null, new Created<>(created, createConnectorInfo(connectors.get(connName)))); + else + callback.onCompletion(null, new Created(false, null)); + } catch (ConnectException e) { + callback.onCompletion(e, null); + } + + } + + @Override + public synchronized void requestTaskReconfiguration(String connName) { + if (!worker.connectorNames().contains(connName)) { + log.error("Task that requested reconfiguration does not exist: {}", connName); + return; + } + updateConnectorTasks(connName); + } + + @Override + public synchronized void taskConfigs(String connName, Callback> callback) { + ConnectorState state = connectors.get(connName); + if (state == null) { + callback.onCompletion(new NotFoundException("Connector " + connName + " not found", null), null); + return; + } + + List result = new ArrayList<>(); + for (int i = 0; i < state.taskConfigs.size(); i++) { + TaskInfo info = new TaskInfo(new ConnectorTaskId(connName, i), state.taskConfigs.get(i)); + result.add(info); + } + callback.onCompletion(null, result); + } + + @Override + public void putTaskConfigs(String connName, List> configs, Callback callback) { + throw new 
UnsupportedOperationException("Kafka Connect in standalone mode does not support externally setting task configurations."); + } + + /** + * Start a connector in the worker and record its state. + * @param connectorProps new connector configuration + * @return the connector name + */ + private String startConnector(Map connectorProps) { + ConnectorConfig connConfig = new ConnectorConfig(connectorProps); + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + ConnectorState state = connectors.get(connName); + worker.addConnector(connConfig, new HerderConnectorContext(this, connName)); + if (state == null) { + connectors.put(connName, new ConnectorState(connectorProps, connConfig)); + } else { + state.configOriginals = connectorProps; + state.config = connConfig; + } + return connName; + } + + + private List> recomputeTaskConfigs(String connName) { + ConnectorState state = connectors.get(connName); + return worker.connectorTaskConfigs(connName, + state.config.getInt(ConnectorConfig.TASKS_MAX_CONFIG), + state.config.getList(ConnectorConfig.TOPICS_CONFIG)); + } + + private void createConnectorTasks(String connName) { + ConnectorState state = connectors.get(connName); + int index = 0; + for (Map taskConfigMap : state.taskConfigs) { + ConnectorTaskId taskId = new ConnectorTaskId(connName, index); + TaskConfig config = new TaskConfig(taskConfigMap); + try { + worker.addTask(taskId, config); + } catch (Throwable e) { + log.error("Failed to add task {}: ", taskId, e); + // Swallow this so we can continue updating the rest of the tasks + // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task + // that died after starting successfully. + } + index++; + } + } + + private void removeConnectorTasks(String connName) { + ConnectorState state = connectors.get(connName); + for (int i = 0; i < state.taskConfigs.size(); i++) { + ConnectorTaskId taskId = new ConnectorTaskId(connName, i); + try { + worker.stopTask(taskId); + } catch (ConnectException e) { + log.error("Failed to stop task {}: ", taskId, e); + // Swallow this so we can continue stopping the rest of the tasks + // FIXME: Forcibly kill the task? 
+ } + } + state.taskConfigs = new ArrayList<>(); + } + + private void updateConnectorTasks(String connName) { + List> newTaskConfigs = recomputeTaskConfigs(connName); + ConnectorState state = connectors.get(connName); + if (!newTaskConfigs.equals(state.taskConfigs)) { + removeConnectorTasks(connName); + state.taskConfigs = newTaskConfigs; + createConnectorTasks(connName); + } + } + + + private static class ConnectorState { + public String name; + public Map configOriginals; + public ConnectorConfig config; + List> taskConfigs; + + public ConnectorState(Map configOriginals, ConnectorConfig config) { + this.name = config.getString(ConnectorConfig.NAME_CONFIG); + this.configOriginals = configOriginals; + this.config = config; + this.taskConfigs = new ArrayList<>(); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java new file mode 100644 index 0000000..1d1f8ef --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/FileOffsetBackingStore.java @@ -0,0 +1,102 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.connect.errors.ConnectException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.*; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; + +/** + * Implementation of OffsetBackingStore that saves data locally to a file. To ensure this behaves + * similarly to a real backing store, operations are executed asynchronously on a background thread. + */ +public class FileOffsetBackingStore extends MemoryOffsetBackingStore { + private static final Logger log = LoggerFactory.getLogger(FileOffsetBackingStore.class); + + public final static String OFFSET_STORAGE_FILE_FILENAME_CONFIG = "offset.storage.file.filename"; + private File file; + + public FileOffsetBackingStore() { + + } + + @Override + public void configure(Map props) { + super.configure(props); + String filename = (String) props.get(OFFSET_STORAGE_FILE_FILENAME_CONFIG); + file = new File(filename); + } + + @Override + public synchronized void start() { + super.start(); + log.info("Starting FileOffsetBackingStore with file {}", file); + load(); + } + + @Override + public synchronized void stop() { + super.stop(); + // Nothing to do since this doesn't maintain any outstanding connections/data + log.info("Stopped FileOffsetBackingStore"); + } + + @SuppressWarnings("unchecked") + private void load() { + try { + ObjectInputStream is = new ObjectInputStream(new FileInputStream(file)); + Object obj = is.readObject(); + if (!(obj instanceof HashMap)) + throw new ConnectException("Expected HashMap but found " + obj.getClass()); + Map raw = (Map) obj; + data = new HashMap<>(); + for (Map.Entry mapEntry : raw.entrySet()) { + ByteBuffer key = (mapEntry.getKey() != null) ? ByteBuffer.wrap(mapEntry.getKey()) : null; + ByteBuffer value = (mapEntry.getValue() != null) ? ByteBuffer.wrap(mapEntry.getValue()) : null; + data.put(key, value); + } + is.close(); + } catch (FileNotFoundException | EOFException e) { + // FileNotFoundException: Ignore, may be new. + // EOFException: Ignore, this means the file was missing or corrupt + } catch (IOException | ClassNotFoundException e) { + throw new ConnectException(e); + } + } + + protected void save() { + try { + ObjectOutputStream os = new ObjectOutputStream(new FileOutputStream(file)); + Map raw = new HashMap<>(); + for (Map.Entry mapEntry : data.entrySet()) { + byte[] key = (mapEntry.getKey() != null) ? mapEntry.getKey().array() : null; + byte[] value = (mapEntry.getValue() != null) ? 
mapEntry.getValue().array() : null; + raw.put(key, value); + } + os.writeObject(raw); + os.close(); + } catch (IOException e) { + throw new ConnectException(e); + } + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/417e283d/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java ---------------------------------------------------------------------- diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java new file mode 100644 index 0000000..4b60131 --- /dev/null +++ b/connect/runtime/src/main/java/org/apache/kafka/connect/storage/KafkaConfigStorage.java @@ -0,0 +1,578 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.connect.storage; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.errors.ConnectException; +import org.apache.kafka.connect.errors.DataException; +import org.apache.kafka.connect.runtime.distributed.ClusterConfigState; +import org.apache.kafka.connect.util.Callback; +import org.apache.kafka.connect.util.ConnectorTaskId; +import org.apache.kafka.connect.util.KafkaBasedLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + *

+ * Provides persistent storage of Kafka Connect connector configurations in a Kafka topic. + *

+ *

+ * This class manages both connector and task configurations. It tracks three types of configuration entries: + *

+ * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for + * expanding this format if necessary. (Kafka key: connector-[connector-id]). + * These configs are *not* ephemeral. They represent the source of truth. If the entire Connect + * cluster goes down, this is all that is really needed to recover. + * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding + * this format if necessary. (Kafka key: task-[connector-id]-[task-id]). + * These configs are ephemeral; they are stored here to a) disseminate them to all workers while + * ensuring agreement and b) to allow faster cluster/worker recovery since the common case + * of recovery (restoring a connector) will simply result in the same configuration as before + * the failure. + * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task + * configs for a connector can be applied. (Kafka key: commit-[connector-id]). + * This config has two effects. First, it records the number of tasks the connector is currently + * running (and can therefore increase/decrease parallelism). Second, because each task config + * is stored separately but they need to be applied together to ensure each partition is assigned + * to a single task, this record also indicates that task configs for the specified connector + * can be "applied" or "committed". +
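[Editor's illustration -- not part of this patch] A sketch of the record keys this scheme produces for a connector named "foo" running two tasks, built with the key helpers defined later in this class (the commit record's value carries the task count in its "tasks" field):

    String connectorKey = KafkaConfigStorage.CONNECTOR_KEY("foo");                    // "connector-foo"
    String task0Key = KafkaConfigStorage.TASK_KEY(new ConnectorTaskId("foo", 0));     // "task-foo-0"
    String task1Key = KafkaConfigStorage.TASK_KEY(new ConnectorTaskId("foo", 1));     // "task-foo-1"
    String commitKey = KafkaConfigStorage.COMMIT_TASKS_KEY("foo");                    // "commit-foo", written last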

+ *

+ * This configuration is expected to be stored in a *single-partition*, *compacted* topic. Using a single partition + * ensures we can enforce ordering on messages, allowing Kafka to be used as a write-ahead log. Compaction allows + * us to clean up outdated configurations over time. However, this combination has some important implications for + * the implementation of this class and the configuration state that it may expose. +

+ *

+ * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record + * is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any + * others, and they do not need to coordinate with the connector's task configuration at all. +

+ *

+ * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not + * currently have multi-record transactions or support atomic batch record writes, task commit messages are required + * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs + * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs + * were applied immediately, you could be using half the old configs and half the new configs. In that condition, some + * partitions may be double-assigned because the old config and new config may use completely different assignments. + * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them atomically + * once a commit message has been read. +
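[Editor's sketch -- not part of this patch] A minimal sketch of the buffering rule described above, with types simplified; the class itself keeps such pending updates in its deferredTaskUpdates field further down:

    // Task configs read from the log are parked per connector and promoted only when that
    // connector's commit record arrives with a matching task count.
    Map<String, Map<Integer, Map<String, String>>> deferred = new HashMap<>();
    Map<String, Map<Integer, Map<String, String>>> applied = new HashMap<>();

    void onTaskConfigRecord(String connector, int task, Map<String, String> config) {
        Map<Integer, Map<String, String>> forConnector = deferred.get(connector);
        if (forConnector == null) {
            forConnector = new HashMap<>();
            deferred.put(connector, forConnector);
        }
        forConnector.put(task, config);
    }

    void onCommitRecord(String connector, int taskCount) {
        Map<Integer, Map<String, String>> pending = deferred.remove(connector);
        if (pending != null && pending.size() == taskCount)
            applied.put(connector, pending);   // the whole task set becomes visible at once
        // else: keep the previously applied configs and flag the connector as inconsistent
    }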

+ *

+ * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was + * always available, but we would like to be able to enable compaction so our configuration topic does not grow + * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading + * from the beginning of the log in order to build up the full current configuration will see task commits, but some + * records required for those commits will have been removed because the same keys have subsequently been rewritten. + * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config, + * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing + * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read + * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up. + *

+ *

+ * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario + * as the previous one, but in this case both the first and second update will write 2 task configs. However, the + * second write fails half of the way through: + * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction + * occurs and we're left with + * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't + * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent + * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never + * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second + * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task + * configs for connector "foo". + *

+ *

+ * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system + * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated + * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data. + * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These + * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle + * of updating task configurations). + *

+ *

+ * Note that the expectation is that this config storage system has only a single writer at a time. + * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change + * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker). + *

+ *

+ * Since processing of the config log occurs in a background thread, callers must take care when using accessors. + * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster. + * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are + * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require + * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the + * rebalance must be deferred. + *

+ */ +public class KafkaConfigStorage { + private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class); + + public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic"; + + public static final String CONNECTOR_PREFIX = "connector-"; + + public static String CONNECTOR_KEY(String connectorName) { + return CONNECTOR_PREFIX + connectorName; + } + + public static final String TASK_PREFIX = "task-"; + + public static String TASK_KEY(ConnectorTaskId taskId) { + return TASK_PREFIX + taskId.connector() + "-" + taskId.task(); + } + + public static final String COMMIT_TASKS_PREFIX = "commit-"; + + public static String COMMIT_TASKS_KEY(String connectorName) { + return COMMIT_TASKS_PREFIX + connectorName; + } + + // Note that while using real serialization for values as we have here, but ad hoc string serialization for keys, + // isn't ideal, we use this approach because it avoids any potential problems with schema evolution or + // converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely + // the same. + public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct() + .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)) + .build(); + public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0; + public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct() + .field("tasks", Schema.INT32_SCHEMA) + .build(); + + private static final long READ_TO_END_TIMEOUT_MS = 30000; + + private final Object lock; + private boolean starting; + private final Converter converter; + private final Callback connectorConfigCallback; + private final Callback> tasksConfigCallback; + private String topic; + // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Connect + // format to serialized form + private KafkaBasedLog configLog; + // Connector -> # of tasks + private Map connectorTaskCounts = new HashMap<>(); + // Connector and task configs: name or id -> config map + private Map> connectorConfigs = new HashMap<>(); + private Map> taskConfigs = new HashMap<>(); + // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data + // is in an inconsistent state and we cannot safely use them until they have been refreshed. + private Set inconsistent = new HashSet<>(); + // The most recently read offset. This does not take into account deferred task updates/commits, so we may have + // outstanding data to be applied. 
+ private long offset; + + // Connector -> Map[ConnectorTaskId -> Configs] + private Map>> deferredTaskUpdates = new HashMap<>(); + + + public KafkaConfigStorage(Converter converter, Callback connectorConfigCallback, Callback> tasksConfigCallback) { + this.lock = new Object(); + this.starting = false; + this.converter = converter; + this.connectorConfigCallback = connectorConfigCallback; + this.tasksConfigCallback = tasksConfigCallback; + + offset = -1; + } + + public void configure(Map configs) { + if (configs.get(CONFIG_TOPIC_CONFIG) == null) + throw new ConnectException("Must specify topic for connector configuration."); + topic = (String) configs.get(CONFIG_TOPIC_CONFIG); + + Map producerProps = new HashMap<>(); + producerProps.putAll(configs); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + + Map consumerProps = new HashMap<>(); + consumerProps.putAll(configs); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); + + configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback); + } + + public void start() { + log.info("Starting KafkaConfigStorage"); + // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that + // updates can continue to occur in the background + starting = true; + configLog.start(); + starting = false; + log.info("Started KafkaConfigStorage"); + } + + public void stop() { + log.info("Closing KafkaConfigStorage"); + configLog.stop(); + log.info("Closed KafkaConfigStorage"); + } + + /** + * Get a snapshot of the current state of the cluster. + */ + public ClusterConfigState snapshot() { + synchronized (lock) { + // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be + // immutable configs + return new ClusterConfigState( + offset, + new HashMap<>(connectorTaskCounts), + new HashMap<>(connectorConfigs), + new HashMap<>(taskConfigs), + new HashSet<>(inconsistent) + ); + } + } + + /** + * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by + * tailing the Kafka log with a consumer. 
+     *
+     * @param connector  name of the connector to write data for
+     * @param properties the configuration to write
+     */
+    public void putConnectorConfig(String connector, Map<String, String> properties) {
+        byte[] serializedConfig;
+        if (properties == null) {
+            serializedConfig = null;
+        } else {
+            Struct connectConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+            connectConfig.put("properties", properties);
+            serializedConfig = converter.fromConnectData(topic, CONNECTOR_CONFIGURATION_V0, connectConfig);
+        }
+
+        try {
+            configLog.send(CONNECTOR_KEY(connector), serializedConfig);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write connector configuration to Kafka: ", e);
+            throw new ConnectException("Error writing connector configuration to Kafka", e);
+        }
+    }
+
+    /**
+     * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
+     * that we would be leaving one of the referenced connectors with an inconsistent state.
+     *
+     * @param configs map containing task configurations
+     * @throws ConnectException if the task configurations do not resolve inconsistencies found in the existing root
+     *                          and task configurations.
+     */
+    public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
+        // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
+        // any outstanding lagging data to consume.
+        try {
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new ConnectException("Error writing root configuration to Kafka", e);
+        }
+
+        // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
+        // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
+        // the root config being updated.
+        Map<String, Integer> newTaskCounts = new HashMap<>();
+        synchronized (lock) {
+            // Validate tasks in this assignment. Any task configuration updates should include updates for *all* tasks
+            // in the connector -- we should have all task IDs 0 - N-1 within a connector if any task is included here.
+            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
+            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
+                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
+                    log.error("Submitted task configurations contain an invalid range of task IDs, ignoring this submission");
+                    throw new ConnectException("Error writing task configurations: found some connectors with invalid task IDs");
+                }
+                newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
+            }
+        }
+
+        // Start sending all the individual updates
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+            Struct connectConfig = new Struct(TASK_CONFIGURATION_V0);
+            connectConfig.put("properties", taskConfigEntry.getValue());
+            byte[] serializedConfig = converter.fromConnectData(topic, TASK_CONFIGURATION_V0, connectConfig);
+            configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+        }
+
+        // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
+        // the end of the log.
+        try {
+            // Read to end to ensure all the task configs have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+            // Write all the commit messages
+            for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
+                Struct connectConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+                connectConfig.put("tasks", taskCountEntry.getValue());
+                byte[] serializedConfig = converter.fromConnectData(topic, CONNECTOR_TASKS_COMMIT_V0, connectConfig);
+                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+            }
+
+            // Read to end to ensure all the commit messages have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new ConnectException("Error writing root configuration to Kafka", e);
+        }
+    }
+
+    public Future<Void> readToEnd() {
+        return configLog.readToEnd();
+    }
+
+    public void readToEnd(Callback<Void> cb) {
+        configLog.readToEnd(cb);
+    }
+
+    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+                                                              Map<String, Object> consumerProps,
+                                                              Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+    }
+
+    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+            if (error != null) {
+                log.error("Unexpected error in consumer callback for KafkaConfigStorage: ", error);
+                return;
+            }
+
+            final SchemaAndValue value;
+            try {
+                value = converter.toConnectData(topic, record.value());
+            } catch (DataException e) {
+                log.error("Failed to convert config data to Kafka Connect format: ", e);
+                return;
+            }
+            // Make the recorded offset match the API used for positions in the consumer -- return the offset of the
+            // *next record*, not the last one consumed.
+            offset = record.offset() + 1;
+
+            if (record.key().startsWith(CONNECTOR_PREFIX)) {
+                String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
+                synchronized (lock) {
+                    if (value.value() == null) {
+                        // Connector deletion will be written as a null value
+                        connectorConfigs.remove(connectorName);
+                    } else {
+                        // Connector configs can be applied and callbacks invoked immediately
+                        if (!(value.value() instanceof Map)) {
+                            log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+                            return;
+                        }
+                        Object newConnectorConfig = ((Map) value.value()).get("properties");
+                        if (!(newConnectorConfig instanceof Map)) {
+                            log.error("Invalid data for connector config: properties field should be a Map but is " + newConnectorConfig.getClass());
+                            return;
+                        }
+                        connectorConfigs.put(connectorName, (Map<String, String>) newConnectorConfig);
+                    }
+                }
+                if (!starting)
+                    connectorConfigCallback.onCompletion(null, connectorName);
+            } else if (record.key().startsWith(TASK_PREFIX)) {
+                synchronized (lock) {
+                    ConnectorTaskId taskId = parseTaskId(record.key());
+                    if (taskId == null) {
+                        log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key");
+                        return;
+                    }
+                    if (!(value.value() instanceof Map)) {
+                        log.error("Ignoring task configuration because it is in the wrong format: " + value.value());
+                        return;
+                    }
+
+                    Object newTaskConfig = ((Map) value.value()).get("properties");
+                    if (!(newTaskConfig instanceof Map)) {
+                        log.error("Invalid data for task config: properties field should be a Map but is " + newTaskConfig.getClass());
+                        return;
+                    }
+
+                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(taskId.connector());
+                    if (deferred == null) {
+                        deferred = new HashMap<>();
+                        deferredTaskUpdates.put(taskId.connector(), deferred);
+                    }
+                    deferred.put(taskId, (Map<String, String>) newTaskConfig);
+                }
+            } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) {
+                String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length());
+                List<ConnectorTaskId> updatedTasks = new ArrayList<>();
+                synchronized (lock) {
+                    // Apply any outstanding deferred task updates for the given connector. Note that just because we
+                    // encounter a commit message does not mean it will result in consistent output. In particular, due
+                    // to compaction, there may be cases where a commit message appears without a complete set of
+                    // preceding task configs. For example, if we have the following sequence of writes:
+                    //
+                    // 1. Write connector "foo"'s config
+                    // 2. Write connector "foo", task 1's config <-- compacted
+                    // 3. Write connector "foo", task 2's config
+                    // 4. Write connector "foo" task commit message
+                    // 5. Write connector "foo", task 1's config
+                    // 6. Write connector "foo", task 2's config
+                    // 7. Write connector "foo" task commit message
+                    //
+                    // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied
+                    // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete
+                    // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that
+                    // only 5 was written, then there may be nothing that will finish writing the configs and get the
+                    // log back into a consistent state.
+                    //
+                    // It is expected that the user of this class (i.e., the Herder) will take the necessary action to
+                    // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is
+                    // exposed in the snapshots provided via ClusterConfigState so it is easy to handle.
+                    if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs
+                        log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value());
+                        return;
+                    }
+
+                    Map<ConnectorTaskId, Map<String, String>> deferred = deferredTaskUpdates.get(connectorName);
+
+                    int newTaskCount = intValue(((Map) value.value()).get("tasks"));
+
+                    // Validate the configs we're supposed to update to ensure we're getting a complete configuration
+                    // update of all tasks that are expected based on the number of tasks in the commit message.
+                    Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
+                    Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
+                    if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
+                        // Given the logic for writing commit messages, we should only hit this condition due to compacted
+                        // historical data, in which case we would not have applied any updates yet and there will be no
+                        // task config data already committed for the connector, so we shouldn't have to clear any data
+                        // out. All we need to do is add the flag marking it inconsistent.
+                        inconsistent.add(connectorName);
+                    } else {
+                        if (deferred != null) {
+                            taskConfigs.putAll(deferred);
+                            updatedTasks.addAll(taskConfigs.keySet());
+                        }
+                        inconsistent.remove(connectorName);
+                    }
+                    // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
+                    // update, then we need to see a completely fresh set of configs after this commit message, so we don't
+                    // want any of these outdated configs.
+                    if (deferred != null)
+                        deferred.clear();
+
+                    connectorTaskCounts.put(connectorName, newTaskCount);
+                }
+
+                if (!starting)
+                    tasksConfigCallback.onCompletion(null, updatedTasks);
+            } else {
+                log.error("Discarding config update record with invalid key: " + record.key());
+            }
+        }
+    };
+
+    private ConnectorTaskId parseTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 3) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Given task configurations, get a set of integer task IDs organized by connector name.
+     */
+    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
+        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
+        if (configs == null)
+            return connectorTaskIds;
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+            ConnectorTaskId taskId = taskConfigEntry.getKey();
+            if (!connectorTaskIds.containsKey(taskId.connector()))
+                connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
+            connectorTaskIds.get(taskId.connector()).add(taskId.task());
+        }
+        return connectorTaskIds;
+    }
+
+    private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
+        // Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
+        // with 2 tasks, then reduce to 1, we'll end up with log entries like:
+        //
+        // 1. Connector "foo" config
+        // 2. Connector "foo", task 1 config
+        // 3. Connector "foo", task 2 config
+        // 4. Connector "foo", commit 2 tasks
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // However, due to compaction we could end up with a log that looks like this:
+        //
+        // 1. Connector "foo" config
+        // 3. Connector "foo", task 2 config
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
Connector "foo", commit 1 tasks + // + // which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just + // validate that all the configs specified by the commit message are present. This should be fine because the + // logic for writing configs ensures all the task configs are written (and reads them back) before writing the + // commit message. + + if (idSet.size() < expectedSize) + return false; + + for (int i = 0; i < expectedSize; i++) + if (!idSet.contains(i)) + return false; + return true; + } + + // Convert an integer value extracted from a schemaless struct to an int. This handles potentially different + // encodings by different Converters. + private static int intValue(Object value) { + if (value instanceof Integer) + return (int) value; + else if (value instanceof Long) + return (int) (long) value; + else + throw new ConnectException("Expected integer value to be either Integer or Long"); + } +} +