From: gwenshap@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Tue, 13 Oct 2015 17:23:44 -0000
Subject: [3/3] kafka git commit: KAFKA-2372: Add Kafka-backed storage of Copycat configs.

KAFKA-2372: Add Kafka-backed storage of Copycat configs.

This also adds some other needed infrastructure for distributed Copycat, most importantly the DistributedHerder, and refactors some code for handling Kafka-backed logs into KafkaBasedLog since this is shared between offset and config storage.
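For orientation, the net effect of the CLI change is small: the distributed command now builds a DistributedHerder instead of a StandaloneHerder, hands it the worker's original config map, and converts connector properties to a plain String map before submission. The snippet below is a condensed, illustrative sketch of the CopycatDistributed changes in this patch; the callback (cb) and connectorProps come from the surrounding CLI code, and error handling is omitted.

    WorkerConfig workerConfig = new WorkerConfig(workerProps);
    Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore());

    // New in this patch: a herder backed by Kafka-based config storage.
    DistributedHerder herder = new DistributedHerder(worker);
    herder.configure(workerConfig.originals());

    final Copycat copycat = new Copycat(worker, herder);
    copycat.start();

    // Connector properties are now converted to a String map before submission.
    herder.addConnector(Utils.propsToStringMap(connectorProps), cb);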
Author: Ewen Cheslack-Postava Reviewers: Gwen Shapira, James Cheng Closes #241 from ewencp/kafka-2372-copycat-distributed-config Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/36d44693 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/36d44693 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/36d44693 Branch: refs/heads/trunk Commit: 36d4469326fe20c3f0657315321e6ad515530a3e Parents: e2ec02e Author: Ewen Cheslack-Postava Authored: Tue Oct 13 10:23:21 2015 -0700 Committer: Gwen Shapira Committed: Tue Oct 13 10:23:21 2015 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/utils/Utils.java | 13 + config/copycat-distributed.properties | 13 +- config/copycat-standalone.properties | 12 +- .../kafka/copycat/cli/CopycatDistributed.java | 8 +- .../kafka/copycat/cli/CopycatStandalone.java | 2 +- .../apache/kafka/copycat/cli/WorkerConfig.java | 20 +- .../kafka/copycat/runtime/ConnectorConfig.java | 7 +- .../apache/kafka/copycat/runtime/Herder.java | 19 +- .../copycat/runtime/HerderConnectorContext.java | 42 ++ .../apache/kafka/copycat/runtime/Worker.java | 23 +- .../runtime/distributed/ClusterConfigState.java | 122 +++++ .../runtime/distributed/DistributedHerder.java | 320 +++++++++++ .../standalone/StandaloneConnectorContext.java | 42 -- .../runtime/standalone/StandaloneHerder.java | 34 +- .../copycat/storage/KafkaConfigStorage.java | 546 +++++++++++++++++++ .../storage/KafkaOffsetBackingStore.java | 258 ++------- .../storage/OffsetStorageReaderImpl.java | 2 +- .../kafka/copycat/storage/OffsetUtils.java | 3 + .../kafka/copycat/util/KafkaBasedLog.java | 331 +++++++++++ .../copycat/runtime/WorkerSinkTaskTest.java | 8 +- .../copycat/runtime/WorkerSourceTaskTest.java | 8 +- .../kafka/copycat/runtime/WorkerTest.java | 8 +- .../distributed/DistributedHerderTest.java | 289 ++++++++++ .../standalone/StandaloneHerderTest.java | 62 ++- .../copycat/storage/KafkaConfigStorageTest.java | 508 +++++++++++++++++ .../storage/KafkaOffsetBackingStoreTest.java | 399 +++++--------- .../kafka/copycat/util/KafkaBasedLogTest.java | 463 ++++++++++++++++ .../apache/kafka/copycat/util/TestFuture.java | 80 ++- tests/kafkatest/services/copycat.py | 3 +- .../kafkatest/tests/copycat_distributed_test.py | 14 +- .../templates/copycat-distributed.properties | 9 +- .../templates/copycat-standalone.properties | 8 +- 32 files changed, 3043 insertions(+), 633 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/clients/src/main/java/org/apache/kafka/common/utils/Utils.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java index fa7c92f..aee379a 100755 --- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java +++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java @@ -26,9 +26,11 @@ import java.nio.MappedByteBuffer; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; +import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.Properties; @@ -420,6 +422,17 @@ public class Utils { } /** + * Converts a Properties object to a Map, calling {@link #toString} to 
ensure all keys and values + * are Strings. + */ + public static Map propsToStringMap(Properties props) { + Map result = new HashMap<>(); + for (Map.Entry entry : props.entrySet()) + result.put(entry.getKey().toString(), entry.getValue().toString()); + return result; + } + + /** * Get the stack trace from an exception as a string */ public static String stackTrace(Throwable e) { http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/config/copycat-distributed.properties ---------------------------------------------------------------------- diff --git a/config/copycat-distributed.properties b/config/copycat-distributed.properties index 654ed24..b122413 100644 --- a/config/copycat-distributed.properties +++ b/config/copycat-distributed.properties @@ -27,13 +27,14 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true -# The offset converter is configurable and must be specified, but most users will always want to use the built-in default. -# Offset data is never visible outside of Copcyat. -offset.key.converter=org.apache.kafka.copycat.json.JsonConverter -offset.value.converter=org.apache.kafka.copycat.json.JsonConverter -offset.key.converter.schemas.enable=false -offset.value.converter.schemas.enable=false +# The internal converter used for offsets and config data is configurable and must be specified, but most users will +# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format. +internal.key.converter=org.apache.kafka.copycat.json.JsonConverter +internal.value.converter=org.apache.kafka.copycat.json.JsonConverter +internal.key.converter.schemas.enable=false +internal.value.converter.schemas.enable=false offset.storage.topic=copycat-offsets # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 +config.storage.topic=copycat-configs \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/config/copycat-standalone.properties ---------------------------------------------------------------------- diff --git a/config/copycat-standalone.properties b/config/copycat-standalone.properties index fd264b5..ebf689f 100644 --- a/config/copycat-standalone.properties +++ b/config/copycat-standalone.properties @@ -25,12 +25,12 @@ value.converter=org.apache.kafka.copycat.json.JsonConverter key.converter.schemas.enable=true value.converter.schemas.enable=true -# The offset converter is configurable and must be specified, but most users will always want to use the built-in default. -# Offset data is never visible outside of Copcyat. -offset.key.converter=org.apache.kafka.copycat.json.JsonConverter -offset.value.converter=org.apache.kafka.copycat.json.JsonConverter -offset.key.converter.schemas.enable=false -offset.value.converter.schemas.enable=false +# The internal converter used for offsets and config data is configurable and must be specified, but most users will +# always want to use the built-in default. Offset and config data is never visible outside of Copcyat in this format. 
+internal.key.converter=org.apache.kafka.copycat.json.JsonConverter +internal.value.converter=org.apache.kafka.copycat.json.JsonConverter +internal.key.converter.schemas.enable=false +internal.value.converter.schemas.enable=false offset.storage.file.filename=/tmp/copycat.offsets # Flush much faster than normal, which is useful for testing/debugging http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java index b5e8896..b0230b2 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatDistributed.java @@ -20,9 +20,8 @@ package org.apache.kafka.copycat.cli; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.utils.Utils; import org.apache.kafka.copycat.runtime.Copycat; -import org.apache.kafka.copycat.runtime.Herder; import org.apache.kafka.copycat.runtime.Worker; -import org.apache.kafka.copycat.runtime.standalone.StandaloneHerder; +import org.apache.kafka.copycat.runtime.distributed.DistributedHerder; import org.apache.kafka.copycat.storage.KafkaOffsetBackingStore; import org.apache.kafka.copycat.util.Callback; import org.apache.kafka.copycat.util.FutureCallback; @@ -59,7 +58,8 @@ public class CopycatDistributed { WorkerConfig workerConfig = new WorkerConfig(workerProps); Worker worker = new Worker(workerConfig, new KafkaOffsetBackingStore()); - Herder herder = new StandaloneHerder(worker); + DistributedHerder herder = new DistributedHerder(worker); + herder.configure(workerConfig.originals()); final Copycat copycat = new Copycat(worker, herder); copycat.start(); @@ -73,7 +73,7 @@ public class CopycatDistributed { log.error("Failed to create job for {}", connectorPropsFile); } }); - herder.addConnector(connectorProps, cb); + herder.addConnector(Utils.propsToStringMap(connectorProps), cb); cb.get(); } } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java index 12ec154..65a15e4 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/CopycatStandalone.java @@ -75,7 +75,7 @@ public class CopycatStandalone { log.error("Failed to create job for {}", connectorPropsFile); } }); - herder.addConnector(connectorProps, cb); + herder.addConnector(Utils.propsToStringMap(connectorProps), cb); cb.get(); } } catch (Throwable t) { http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java index a976d90..2a3f539 100644 --- 
a/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/cli/WorkerConfig.java @@ -57,13 +57,13 @@ public class WorkerConfig extends AbstractConfig { public static final String VALUE_CONVERTER_CLASS_DOC = "Converter class for value Copycat data that implements the Converter interface."; - public static final String OFFSET_KEY_CONVERTER_CLASS_CONFIG = "offset.key.converter"; - public static final String OFFSET_KEY_CONVERTER_CLASS_DOC = - "Converter class for offset key Copycat data that implements the Converter interface."; + public static final String INTERNAL_KEY_CONVERTER_CLASS_CONFIG = "internal.key.converter"; + public static final String INTERNAL_KEY_CONVERTER_CLASS_DOC = + "Converter class for internal key Copycat data that implements the Converter interface. Used for converting data like offsets and configs."; - public static final String OFFSET_VALUE_CONVERTER_CLASS_CONFIG = "offset.value.converter"; - public static final String OFFSET_VALUE_CONVERTER_CLASS_DOC = - "Converter class for offset value Copycat data that implements the Converter interface."; + public static final String INTERNAL_VALUE_CONVERTER_CLASS_CONFIG = "internal.value.converter"; + public static final String INTERNAL_VALUE_CONVERTER_CLASS_DOC = + "Converter class for offset value Copycat data that implements the Converter interface. Used for converting data like offsets and configs."; public static final String TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG = "task.shutdown.graceful.timeout.ms"; @@ -95,10 +95,10 @@ public class WorkerConfig extends AbstractConfig { Importance.HIGH, KEY_CONVERTER_CLASS_DOC) .define(VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, Importance.HIGH, VALUE_CONVERTER_CLASS_DOC) - .define(OFFSET_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, OFFSET_KEY_CONVERTER_CLASS_DOC) - .define(OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, - Importance.HIGH, OFFSET_VALUE_CONVERTER_CLASS_DOC) + .define(INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, INTERNAL_KEY_CONVERTER_CLASS_DOC) + .define(INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Type.CLASS, + Importance.HIGH, INTERNAL_VALUE_CONVERTER_CLASS_DOC) .define(TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_CONFIG, Type.LONG, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DEFAULT, Importance.LOW, TASK_SHUTDOWN_GRACEFUL_TIMEOUT_MS_DOC) http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java index 336597e..767c88b 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/ConnectorConfig.java @@ -22,7 +22,8 @@ import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigDef.Importance; import org.apache.kafka.common.config.ConfigDef.Type; -import java.util.Properties; +import java.util.HashMap; +import java.util.Map; /** *

@@ -64,10 +65,10 @@ public class ConnectorConfig extends AbstractConfig { } public ConnectorConfig() { - this(new Properties()); + this(new HashMap()); } - public ConnectorConfig(Properties props) { + public ConnectorConfig(Map props) { super(config, props); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java index 7f8b7c2..31e68ef 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Herder.java @@ -19,7 +19,7 @@ package org.apache.kafka.copycat.runtime; import org.apache.kafka.copycat.util.Callback; -import java.util.Properties; +import java.util.Map; /** *

@@ -53,15 +53,24 @@ public interface Herder { * the leader herder if necessary. * * @param connectorProps user-specified properties for this job - * @param callback callback to invoke when the request completes + * @param callback callback to invoke when the request completes */ - void addConnector(Properties connectorProps, Callback callback); + void addConnector(Map connectorProps, Callback callback); /** * Delete a connector job by name. * - * @param name name of the connector job to shutdown and delete + * @param name name of the connector job to shutdown and delete * @param callback callback to invoke when the request completes */ void deleteConnector(String name, Callback callback); -} + + /** + * Requests reconfiguration of the task. This should only be triggered by + * {@link HerderConnectorContext}. + * + * @param connName name of the connector that should be reconfigured + */ + void requestTaskReconfiguration(String connName); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java new file mode 100644 index 0000000..7a64bd5 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/HerderConnectorContext.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime; + +import org.apache.kafka.copycat.connector.ConnectorContext; + +/** + * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks + * in a single process. + */ +public class HerderConnectorContext implements ConnectorContext { + + private Herder herder; + private String connectorName; + + public HerderConnectorContext(Herder herder, String connectorName) { + this.herder = herder; + this.connectorName = connectorName; + } + + @Override + public void requestTaskReconfiguration() { + // This is trivial to forward since there is only one herder and it's in memory in this + // process + herder.requestTaskReconfiguration(connectorName); + } +} http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java index a34a014..0fdab4c 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/Worker.java @@ -52,8 +52,8 @@ public class Worker { private WorkerConfig config; private Converter keyConverter; private Converter valueConverter; - private Converter offsetKeyConverter; - private Converter offsetValueConverter; + private Converter internalKeyConverter; + private Converter internalValueConverter; private OffsetBackingStore offsetBackingStore; private HashMap tasks = new HashMap<>(); private KafkaProducer producer; @@ -71,10 +71,10 @@ public class Worker { this.keyConverter.configure(config.originalsWithPrefix("key.converter."), true); this.valueConverter = config.getConfiguredInstance(WorkerConfig.VALUE_CONVERTER_CLASS_CONFIG, Converter.class); this.valueConverter.configure(config.originalsWithPrefix("value.converter."), false); - this.offsetKeyConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_KEY_CONVERTER_CLASS_CONFIG, Converter.class); - this.offsetKeyConverter.configure(config.originalsWithPrefix("offset.key.converter."), true); - this.offsetValueConverter = config.getConfiguredInstance(WorkerConfig.OFFSET_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); - this.offsetValueConverter.configure(config.originalsWithPrefix("offset.value.converter."), false); + this.internalKeyConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, Converter.class); + this.internalKeyConverter.configure(config.originalsWithPrefix("internal.key.converter."), true); + this.internalValueConverter = config.getConfiguredInstance(WorkerConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, Converter.class); + this.internalValueConverter.configure(config.originalsWithPrefix("internal.value.converter."), false); this.offsetBackingStore = offsetBackingStore; this.offsetBackingStore.configure(config.originals()); @@ -157,9 +157,9 @@ public class Worker { if (task instanceof SourceTask) { SourceTask sourceTask = (SourceTask) task; OffsetStorageReader offsetReader = new 
OffsetStorageReaderImpl(offsetBackingStore, id.connector(), - offsetKeyConverter, offsetValueConverter); + internalKeyConverter, internalValueConverter); OffsetStorageWriter offsetWriter = new OffsetStorageWriter(offsetBackingStore, id.connector(), - offsetKeyConverter, offsetValueConverter); + internalKeyConverter, internalValueConverter); workerTask = new WorkerSourceTask(id, sourceTask, keyConverter, valueConverter, producer, offsetReader, offsetWriter, config, time); } else if (task instanceof SinkTask) { @@ -201,4 +201,11 @@ public class Worker { return task; } + public Converter getInternalKeyConverter() { + return internalKeyConverter; + } + + public Converter getInternalValueConverter() { + return internalValueConverter; + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java new file mode 100644 index 0000000..719dd09 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/ClusterConfigState.java @@ -0,0 +1,122 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime.distributed; + +import org.apache.kafka.copycat.util.ConnectorTaskId; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * An immutable snapshot of the configuration state of connectors and tasks in a Copycat cluster. + */ +public class ClusterConfigState { + private final long offset; + private final Map connectorTaskCounts; + private final Map> connectorConfigs; + private final Map> taskConfigs; + private final Set inconsistentConnectors; + + public ClusterConfigState(long offset, + Map connectorTaskCounts, + Map> connectorConfigs, + Map> taskConfigs, + Set inconsistentConnectors) { + this.offset = offset; + this.connectorTaskCounts = connectorTaskCounts; + this.connectorConfigs = connectorConfigs; + this.taskConfigs = taskConfigs; + this.inconsistentConnectors = inconsistentConnectors; + } + + /** + * Get the last offset read to generate this config state. This offset is not guaranteed to be perfectly consistent + * with the recorded state because some partial updates to task configs may have been read. 
+ * @return the latest config offset + */ + public long offset() { + return offset; + } + + /** + * Get a list of the connectors in this configuration + */ + public Collection connectors() { + return connectorTaskCounts.keySet(); + } + + /** + * Get the configuration for a connector. + * @param connector name of the connector + * @return a map containing configuration parameters + */ + public Map connectorConfig(String connector) { + return connectorConfigs.get(connector); + } + + /** + * Get the configuration for a task. + * @param task id of the task + * @return a map containing configuration parameters + */ + public Map taskConfig(ConnectorTaskId task) { + return taskConfigs.get(task); + } + + /** + * Get the current set of task IDs for the specified connector. + * @param connectorName the name of the connector to look up task configs for + * @return the current set of connector task IDs + */ + public Collection tasks(String connectorName) { + if (inconsistentConnectors.contains(connectorName)) + return Collections.EMPTY_LIST; + + Integer numTasks = connectorTaskCounts.get(connectorName); + if (numTasks == null) + throw new IllegalArgumentException("Connector does not exist in current configuration."); + + List taskIds = new ArrayList<>(); + for (int taskIndex = 0; taskIndex < numTasks; taskIndex++) { + ConnectorTaskId taskId = new ConnectorTaskId(connectorName, taskIndex); + taskIds.add(taskId); + } + return taskIds; + } + + /** + * Get the set of connectors which have inconsistent data in this snapshot. These inconsistencies can occur due to + * partially completed writes combined with log compaction. + * + * Connectors in this set will appear in the output of {@link #connectors()} since their connector configuration is + * available, but not in the output of {@link #taskConfig(ConnectorTaskId)} since the task configs are incomplete. + * + * When a worker detects a connector in this state, it should request that the connector regenerate its task + * configurations. + * + * @return the set of inconsistent connectors + */ + public Set inconsistentConnectors() { + return inconsistentConnectors; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java new file mode 100644 index 0000000..5273658 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/distributed/DistributedHerder.java @@ -0,0 +1,320 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.runtime.distributed; + +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.copycat.connector.Connector; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.runtime.ConnectorConfig; +import org.apache.kafka.copycat.runtime.Herder; +import org.apache.kafka.copycat.runtime.HerderConnectorContext; +import org.apache.kafka.copycat.runtime.Worker; +import org.apache.kafka.copycat.sink.SinkConnector; +import org.apache.kafka.copycat.sink.SinkTask; +import org.apache.kafka.copycat.storage.KafkaConfigStorage; +import org.apache.kafka.copycat.util.Callback; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.*; + +/** + * Distributed "herder" that coordinates with other workers to spread work across multiple processes. + */ +public class DistributedHerder implements Herder { + private static final Logger log = LoggerFactory.getLogger(DistributedHerder.class); + + private Worker worker; + private KafkaConfigStorage configStorage; + private ClusterConfigState configState; + private HashMap connectors = new HashMap<>(); + + public DistributedHerder(Worker worker) { + this.worker = worker; + this.configStorage = new KafkaConfigStorage(worker.getInternalValueConverter(), + new ConnectorConfigCallback(), new TaskConfigCallback()); + } + + // Public for testing (mock KafkaConfigStorage) + public DistributedHerder(Worker worker, KafkaConfigStorage configStorage) { + this.worker = worker; + this.configStorage = configStorage; + } + + public synchronized void configure(Map configs) { + configStorage.configure(configs); + } + + public synchronized void start() { + log.info("Herder starting"); + + configStorage.start(); + + log.info("Restoring connectors from stored configs"); + restoreConnectors(); + + log.info("Herder started"); + } + + public synchronized void stop() { + log.info("Herder stopping"); + + // There's no coordination/hand-off to do here since this is all standalone. Instead, we + // should just clean up the stuff we normally would, i.e. cleanly checkpoint and shutdown all + // the tasks. 
+ for (Map.Entry entry : connectors.entrySet()) { + ConnectorState state = entry.getValue(); + stopConnector(state); + } + connectors.clear(); + + if (configStorage != null) { + configStorage.stop(); + configStorage = null; + } + + log.info("Herder stopped"); + } + + @Override + public synchronized void addConnector(Map connectorProps, + Callback callback) { + try { + // Ensure the config is written to storage first + ConnectorConfig connConfig = new ConnectorConfig(connectorProps); + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + configStorage.putConnectorConfig(connName, connectorProps); + + ConnectorState connState = createConnector(connConfig); + if (callback != null) + callback.onCompletion(null, connState.name); + // This should always be a new job, create jobs from scratch + createConnectorTasks(connState); + } catch (CopycatException e) { + if (callback != null) + callback.onCompletion(e, null); + } + } + + @Override + public synchronized void deleteConnector(String name, Callback callback) { + try { + destroyConnector(name); + if (callback != null) + callback.onCompletion(null, null); + } catch (CopycatException e) { + if (callback != null) + callback.onCompletion(e, null); + } + } + + @Override + public synchronized void requestTaskReconfiguration(String connName) { + ConnectorState state = connectors.get(connName); + if (state == null) { + log.error("Task that requested reconfiguration does not exist: {}", connName); + return; + } + updateConnectorTasks(state); + } + + // Creates and configures the connector. Does not setup any tasks + private ConnectorState createConnector(ConnectorConfig connConfig) { + String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); + String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); + log.info("Creating connector {} of type {}", connName, className); + int maxTasks = connConfig.getInt(ConnectorConfig.TASKS_MAX_CONFIG); + List topics = connConfig.getList(ConnectorConfig.TOPICS_CONFIG); // Sinks only + Properties configs = connConfig.unusedProperties(); + + if (connectors.containsKey(connName)) { + log.error("Ignoring request to create connector due to conflicting connector name"); + throw new CopycatException("Connector with name " + connName + " already exists"); + } + + final Connector connector; + try { + connector = instantiateConnector(className); + } catch (Throwable t) { + // Catches normal exceptions due to instantiation errors as well as any runtime errors that + // may be caused by user code + throw new CopycatException("Failed to create connector instance", t); + } + connector.initialize(new HerderConnectorContext(this, connName)); + try { + connector.start(configs); + } catch (CopycatException e) { + throw new CopycatException("Connector threw an exception while starting", e); + } + ConnectorState state = new ConnectorState(connName, connector, maxTasks, topics); + connectors.put(connName, state); + + log.info("Finished creating connector {}", connName); + + return state; + } + + private static Connector instantiateConnector(String className) { + try { + return Utils.newInstance(className, Connector.class); + } catch (ClassNotFoundException e) { + throw new CopycatException("Couldn't instantiate connector class", e); + } + } + + private void destroyConnector(String connName) { + log.info("Destroying connector {}", connName); + ConnectorState state = connectors.get(connName); + if (state == null) { + log.error("Failed to destroy connector {} because it does not exist", 
connName); + throw new CopycatException("Connector does not exist"); + } + + stopConnector(state); + configStorage.putConnectorConfig(state.name, null); + connectors.remove(state.name); + + log.info("Finished destroying connector {}", connName); + } + + // Stops a connectors tasks, then the connector + private void stopConnector(ConnectorState state) { + removeConnectorTasks(state); + try { + state.connector.stop(); + } catch (CopycatException e) { + log.error("Error shutting down connector {}: ", state.connector, e); + } + } + + private void createConnectorTasks(ConnectorState state) { + String taskClassName = state.connector.taskClass().getName(); + + log.info("Creating tasks for connector {} of type {}", state.name, taskClassName); + + List taskConfigs = state.connector.taskConfigs(state.maxTasks); + + // Generate the final configs, including framework provided settings + Map taskProps = new HashMap<>(); + for (int i = 0; i < taskConfigs.size(); i++) { + ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); + Properties config = taskConfigs.get(i); + // TODO: This probably shouldn't be in the Herder. It's nice to have Copycat ensure the list of topics + // is automatically provided to tasks since it is required by the framework, but this + String subscriptionTopics = Utils.join(state.inputTopics, ","); + if (state.connector instanceof SinkConnector) { + // Make sure we don't modify the original since the connector may reuse it internally + Properties configForSink = new Properties(); + configForSink.putAll(config); + configForSink.setProperty(SinkTask.TOPICS_CONFIG, subscriptionTopics); + config = configForSink; + } + taskProps.put(taskId, config); + } + + // And initiate the tasks + for (int i = 0; i < taskConfigs.size(); i++) { + ConnectorTaskId taskId = new ConnectorTaskId(state.name, i); + Properties config = taskProps.get(taskId); + try { + worker.addTask(taskId, taskClassName, config); + // We only need to store the task IDs so we can clean up. + state.tasks.add(taskId); + } catch (Throwable e) { + log.error("Failed to add task {}: ", taskId, e); + // Swallow this so we can continue updating the rest of the tasks + // FIXME what's the proper response? Kill all the tasks? Consider this the same as a task + // that died after starting successfully. + } + } + } + + private void removeConnectorTasks(ConnectorState state) { + Iterator taskIter = state.tasks.iterator(); + while (taskIter.hasNext()) { + ConnectorTaskId taskId = taskIter.next(); + try { + worker.stopTask(taskId); + taskIter.remove(); + } catch (CopycatException e) { + log.error("Failed to stop task {}: ", taskId, e); + // Swallow this so we can continue stopping the rest of the tasks + // FIXME: Forcibly kill the task? + } + } + } + + private void updateConnectorTasks(ConnectorState state) { + removeConnectorTasks(state); + createConnectorTasks(state); + } + + private void restoreConnectors() { + configState = configStorage.snapshot(); + Collection connNames = configState.connectors(); + for (String connName : connNames) { + log.info("Restoring connector {}", connName); + Map connProps = configState.connectorConfig(connName); + ConnectorConfig connConfig = new ConnectorConfig(connProps); + ConnectorState connState = createConnector(connConfig); + // Because this coordinator is standalone, connectors are only restored when this process + // starts and we know there can't be any existing tasks. 
So in this special case we're able + // to just create the tasks rather than having to check for existing tasks and sort out + // whether they need to be reconfigured. + createConnectorTasks(connState); + } + } + + + + private static class ConnectorState { + public String name; + public Connector connector; + public int maxTasks; + public List inputTopics; + Set tasks; + + public ConnectorState(String name, Connector connector, int maxTasks, + List inputTopics) { + this.name = name; + this.connector = connector; + this.maxTasks = maxTasks; + this.inputTopics = inputTopics; + this.tasks = new HashSet<>(); + } + } + + private class ConnectorConfigCallback implements Callback { + @Override + public void onCompletion(Throwable error, String result) { + configState = configStorage.snapshot(); + // FIXME + } + } + + private class TaskConfigCallback implements Callback> { + @Override + public void onCompletion(Throwable error, List result) { + configState = configStorage.snapshot(); + // FIXME + } + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java deleted file mode 100644 index 0e14015..0000000 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneConnectorContext.java +++ /dev/null @@ -1,42 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - *

- * http://www.apache.org/licenses/LICENSE-2.0 - *

- * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - **/ - -package org.apache.kafka.copycat.runtime.standalone; - -import org.apache.kafka.copycat.connector.ConnectorContext; - -/** - * ConnectorContext for use with the StandaloneHerder, which maintains all connectors and tasks - * in a single process. - */ -class StandaloneConnectorContext implements ConnectorContext { - - private StandaloneHerder herder; - private String connectorName; - - public StandaloneConnectorContext(StandaloneHerder herder, String connectorName) { - this.herder = herder; - this.connectorName = connectorName; - } - - @Override - public void requestTaskReconfiguration() { - // This is trivial to forward since there is only one herder and it's in memory in this - // process - herder.requestTaskReconfiguration(connectorName); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java index 45d428d..d5670fd 100644 --- a/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/runtime/standalone/StandaloneHerder.java @@ -22,6 +22,7 @@ import org.apache.kafka.copycat.connector.Connector; import org.apache.kafka.copycat.errors.CopycatException; import org.apache.kafka.copycat.runtime.ConnectorConfig; import org.apache.kafka.copycat.runtime.Herder; +import org.apache.kafka.copycat.runtime.HerderConnectorContext; import org.apache.kafka.copycat.runtime.Worker; import org.apache.kafka.copycat.sink.SinkConnector; import org.apache.kafka.copycat.sink.SinkTask; @@ -65,7 +66,7 @@ public class StandaloneHerder implements Herder { } @Override - public synchronized void addConnector(Properties connectorProps, + public synchronized void addConnector(Map connectorProps, Callback callback) { try { ConnectorState connState = createConnector(connectorProps); @@ -91,8 +92,18 @@ public class StandaloneHerder implements Herder { } } - // Creates the and configures the connector. Does not setup any tasks - private ConnectorState createConnector(Properties connectorProps) { + @Override + public synchronized void requestTaskReconfiguration(String connName) { + ConnectorState state = connectors.get(connName); + if (state == null) { + log.error("Task that requested reconfiguration does not exist: {}", connName); + return; + } + updateConnectorTasks(state); + } + + // Creates and configures the connector. 
Does not setup any tasks + private ConnectorState createConnector(Map connectorProps) { ConnectorConfig connConfig = new ConnectorConfig(connectorProps); String connName = connConfig.getString(ConnectorConfig.NAME_CONFIG); String className = connConfig.getString(ConnectorConfig.CONNECTOR_CLASS_CONFIG); @@ -114,7 +125,7 @@ public class StandaloneHerder implements Herder { // may be caused by user code throw new CopycatException("Failed to create connector instance", t); } - connector.initialize(new StandaloneConnectorContext(this, connName)); + connector.initialize(new HerderConnectorContext(this, connName)); try { connector.start(configs); } catch (CopycatException e) { @@ -222,21 +233,6 @@ public class StandaloneHerder implements Herder { createConnectorTasks(state); } - /** - * Requests reconfiguration of the task. This should only be triggered by - * {@link StandaloneConnectorContext}. - * - * @param connName name of the connector that should be reconfigured - */ - public synchronized void requestTaskReconfiguration(String connName) { - ConnectorState state = connectors.get(connName); - if (state == null) { - log.error("Task that requested reconfiguration does not exist: {}", connName); - return; - } - updateConnectorTasks(state); - } - private static class ConnectorState { public String name; http://git-wip-us.apache.org/repos/asf/kafka/blob/36d44693/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java ---------------------------------------------------------------------- diff --git a/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java new file mode 100644 index 0000000..366bf13 --- /dev/null +++ b/copycat/runtime/src/main/java/org/apache/kafka/copycat/storage/KafkaConfigStorage.java @@ -0,0 +1,546 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + **/ + +package org.apache.kafka.copycat.storage; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.common.utils.SystemTime; +import org.apache.kafka.common.utils.Utils; +import org.apache.kafka.copycat.data.Schema; +import org.apache.kafka.copycat.data.SchemaAndValue; +import org.apache.kafka.copycat.data.SchemaBuilder; +import org.apache.kafka.copycat.data.Struct; +import org.apache.kafka.copycat.errors.CopycatException; +import org.apache.kafka.copycat.errors.DataException; +import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState; +import org.apache.kafka.copycat.util.Callback; +import org.apache.kafka.copycat.util.ConnectorTaskId; +import org.apache.kafka.copycat.util.KafkaBasedLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +/** + *

+ * Provides persistent storage of Copycat connector configurations in a Kafka topic. + *

+ *

+ * This class manages both connector and task configurations. It tracks three types of configuration entries: + *

+ * 1. Connector config: map of string -> string configurations passed to the Connector class, with support for + * expanding this format if necessary. (Kafka key: connector-[connector-id]). + * These configs are *not* ephemeral. They represent the source of truth. If the entire Copycat + * cluster goes down, this is all that is really needed to recover. + * 2. Task configs: map of string -> string configurations passed to the Task class, with support for expanding + * this format if necessary. (Kafka key: task-[connector-id]-[task-id]). + * These configs are ephemeral; they are stored here to a) disseminate them to all workers while + * ensuring agreement and b) to allow faster cluster/worker recovery since the common case + * of recovery (restoring a connector) will simply result in the same configuration as before + * the failure. + * 3. Task commit "configs": records indicating that previous task config entries should be committed and all task + * configs for a connector can be applied. (Kafka key: commit-[connector-id]. + * This config has two effects. First, it records the number of tasks the connector is currently + * running (and can therefore increase/decrease parallelism). Second, because each task config + * is stored separately but they need to be applied together to ensure each partition is assigned + * to a single task, this record also indicates that task configs for the specified connector + * can be "applied" or "committed". + *
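To make the key scheme above concrete, here is an illustrative (not part of the patch) sequence of record keys for a connector named "foo" running two tasks, built with the key helpers defined further down in this class:

    // Keys produced for connector "foo" with two tasks, using the helpers below.
    String connectorKey = KafkaConfigStorage.CONNECTOR_KEY("foo");                 // "connector-foo"
    String task0Key = KafkaConfigStorage.TASK_KEY(new ConnectorTaskId("foo", 0));  // "task-foo-0"
    String task1Key = KafkaConfigStorage.TASK_KEY(new ConnectorTaskId("foo", 1));  // "task-foo-1"
    String commitKey = KafkaConfigStorage.COMMIT_TASKS_KEY("foo");                 // "commit-foo"
    // A consistent update appears in the log as: connector-foo, task-foo-0, task-foo-1, commit-foo (tasks = 2).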

+ *

+ * This configuration is expected to be stored in a *single partition* and *compacted* topic. Using a single partition + * ensures we can enforce ordering on messages, allowing Kafka to be used as a write ahead log. Compaction allows + * us to clean up outdated configurations over time. However, this combination has some important implications for + * the implementation of this class and the configuration state that it may expose. + *

+ *

+ * Connector configurations are independent of all other configs, so they are handled easily. Writing a single record + * is already atomic, so these can be applied as soon as they are read. One connector's config does not affect any + * others, and they do not need to coordinate with the connector's task configuration at all. + *

+ *

+ * The most obvious implication for task configs is the need for the commit messages. Because Kafka does not + * currently have multi-record transactions or support atomic batch record writes, task commit messages are required + * to ensure that readers do not end up using inconsistent configs. For example, consider if a connector wrote configs + * for its tasks, then was reconfigured and only managed to write updated configs for half its tasks. If task configs + * were applied immediately you could be using half the old configs and half the new configs. In that condition, some + * partitions may be double-assigned because the old config and new config may use completely different assignments. + * Therefore, when reading the log, we must buffer config updates for a connector's tasks and only apply them atomically + * once a commit message has been read. + *
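A minimal sketch of the buffer-until-commit rule described above, using hypothetical onTaskConfig/onCommitTasks handlers; the real logic lives in the consumer callback further down and additionally validates that the buffered task IDs form a complete 0..N-1 range:

    // Sketch only: defer task configs per connector, apply them atomically on commit.
    Map<String, Map<ConnectorTaskId, Map<String, String>>> deferred = new HashMap<>();
    Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
    Set<String> inconsistent = new HashSet<>();

    void onTaskConfig(String connector, ConnectorTaskId taskId, Map<String, String> config) {
        Map<ConnectorTaskId, Map<String, String>> pending = deferred.get(connector);
        if (pending == null) {
            pending = new HashMap<>();
            deferred.put(connector, pending);
        }
        pending.put(taskId, config);
    }

    void onCommitTasks(String connector, int taskCount) {
        Map<ConnectorTaskId, Map<String, String>> pending = deferred.remove(connector);
        if (pending != null && pending.size() == taskCount) {
            taskConfigs.putAll(pending);     // complete set: safe to apply atomically
            inconsistent.remove(connector);
        } else {
            inconsistent.add(connector);     // partial set: flag until configs are regenerated
        }
    }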

+ *

+ * However, there are also further challenges. This simple buffering approach would work fine as long as the entire log was + * always available, but we would like to be able to enable compaction so our configuration topic does not grow + * indefinitely. Compaction may break a normal log because old entries will suddenly go missing. A new worker reading + * from the beginning of the log in order to build up the full current configuration will see task commits, but some + * records required for those commits will have been removed because the same keys have subsequently been rewritten. + * For example, if you have a sequence of record keys [connector-foo-config, task-foo-1-config, task-foo-2-config, + * commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)], we can end up with a compacted log containing + * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config, commit-foo (1 task)]. When read + * back, the first commit will see an invalid state because the first task-foo-1-config has been cleaned up. + *

+ *

+ * Compaction can further complicate things if writing new task configs fails mid-write. Consider a similar scenario + * as the previous one, but in this case both the first and second update will write 2 task configs. However, the + * second write fails half of the way through: + * [connector-foo-config, task-foo-1-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. Now compaction + * occurs and we're left with + * [connector-foo-config, task-foo-2-config, commit-foo (2 tasks), task-foo-1-config]. At the first commit, we don't + * have a complete set of configs. And because of the failure, there is no second commit. We are left in an inconsistent + * state with no obvious way to resolve the issue -- we can try to keep on reading, but the failed node may never + * recover and write the updated config. Meanwhile, other workers may have seen the entire log; they will see the second + * task-foo-1-config waiting to be applied, but will otherwise think everything is ok -- they have a valid set of task + * configs for connector "foo". + *

+ *

+ * Because we can encounter these inconsistencies and addressing them requires support from the rest of the system + * (resolving the task configuration inconsistencies requires support from the connector instance to regenerate updated + * configs), this class exposes not only the current set of configs, but also which connectors have inconsistent data. + * This allows users of this class (i.e., Herder implementations) to take action to resolve any inconsistencies. These + * inconsistencies should be rare (as described above, due to compaction combined with leader failures in the middle + * of updating task configurations). + *
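The DistributedHerder in this patch does not yet act on this signal (its config storage callbacks are still marked FIXME), but the intended reaction is the one described in ClusterConfigState: ask the connector to regenerate its task configurations. A hedged sketch of what that could look like from a herder's point of view:

    // Hedged sketch, not implemented in this patch: resolve inconsistent connectors
    // by asking them to regenerate their task configurations.
    ClusterConfigState state = configStorage.snapshot();
    for (String connName : state.inconsistentConnectors()) {
        requestTaskReconfiguration(connName);   // Herder method added in this patch
    }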

+ *

+ * Note that the expectation is that this config storage system has only a single writer at a time. + * The caller (Herder) must ensure this is the case. In distributed mode this will require forwarding config change + * requests to the leader in the cluster (i.e. the worker group coordinated by the Kafka broker). + *

+ *

+ * Since processing of the config log occurs in a background thread, callers must take care when using accessors. + * To simplify handling this correctly, this class only exposes a mechanism to snapshot the current state of the cluster. + * Updates may continue to be applied (and callbacks invoked) in the background. Callers must take care that they are + * using a consistent snapshot and only update when it is safe. In particular, if task configs are updated which require + * synchronization across workers to commit offsets and update the configuration, callbacks and updates during the + * rebalance must be deferred. + *
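A hedged usage sketch based on the public methods of this class and on ClusterConfigState; the converter, callbacks, connector name/properties, and broker address are placeholders supplied by the caller:

    // Illustrative caller-side usage; placeholder values throughout.
    Map<String, Object> workerProps = new HashMap<>();
    workerProps.put("bootstrap.servers", "localhost:9092");   // reused for the internal producer/consumer
    workerProps.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "copycat-configs");

    KafkaConfigStorage storage = new KafkaConfigStorage(converter, connectorConfigCallback, taskConfigsCallback);
    storage.configure(workerProps);
    storage.start();

    // Writes block until the record has been read back from the config topic.
    storage.putConnectorConfig("local-file-source", connectorProps);

    // Reads go through immutable snapshots; updates keep arriving in the background.
    ClusterConfigState state = storage.snapshot();
    for (String connName : state.connectors()) {
        Map<String, String> connConfig = state.connectorConfig(connName);
        for (ConnectorTaskId taskId : state.tasks(connName)) {
            Map<String, String> taskConfig = state.taskConfig(taskId);
            // hand these to the worker/herder as needed
        }
    }

    storage.stop();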

+ */ +public class KafkaConfigStorage { + private static final Logger log = LoggerFactory.getLogger(KafkaConfigStorage.class); + + public static final String CONFIG_TOPIC_CONFIG = "config.storage.topic"; + + public static final String CONNECTOR_PREFIX = "connector-"; + + public static String CONNECTOR_KEY(String connectorName) { + return CONNECTOR_PREFIX + connectorName; + } + + public static final String TASK_PREFIX = "task-"; + + public static String TASK_KEY(ConnectorTaskId taskId) { + return TASK_PREFIX + taskId.connector() + "-" + taskId.task(); + } + + public static final String COMMIT_TASKS_PREFIX = "commit-"; + + public static String COMMIT_TASKS_KEY(String connectorName) { + return COMMIT_TASKS_PREFIX + connectorName; + } + + // Note that while using real serialization for values as we have here, but ad hoc string serialization for keys, + // isn't ideal, we use this approach because it avoids any potential problems with schema evolution or + // converter/serializer changes causing keys to change. We need to absolutely ensure that the keys remain precisely + // the same. + public static final Schema CONNECTOR_CONFIGURATION_V0 = SchemaBuilder.struct() + .field("properties", SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.OPTIONAL_STRING_SCHEMA)) + .build(); + public static final Schema TASK_CONFIGURATION_V0 = CONNECTOR_CONFIGURATION_V0; + public static final Schema CONNECTOR_TASKS_COMMIT_V0 = SchemaBuilder.struct() + .field("tasks", Schema.INT32_SCHEMA) + .build(); + + private static final long READ_TO_END_TIMEOUT_MS = 30000; + + private final Object lock; + private boolean starting; + private final Converter converter; + private final Callback connectorConfigCallback; + private final Callback> tasksConfigCallback; + private String topic; + // Data is passed to the log already serialized. We use a converter to handle translating to/from generic Copycat + // format to serialized form + private KafkaBasedLog configLog; + // Connector -> # of tasks + private Map connectorTaskCounts = new HashMap<>(); + // Connector and task configs: name or id -> config map + private Map> connectorConfigs = new HashMap<>(); + private Map> taskConfigs = new HashMap<>(); + // Set of connectors where we saw a task commit with an incomplete set of task config updates, indicating the data + // is in an inconsistent state and we cannot safely use them until they have been refreshed. + private Set inconsistent = new HashSet<>(); + // The most recently read offset. This does not take into account deferred task updates/commits, so we may have + // outstanding data to be applied. 
+ private long offset; + + // Connector -> Map[ConnectorTaskId -> Configs] + private Map>> deferredTaskUpdates = new HashMap<>(); + + + public KafkaConfigStorage(Converter converter, Callback connectorConfigCallback, Callback> tasksConfigCallback) { + this.lock = new Object(); + this.starting = false; + this.converter = converter; + this.connectorConfigCallback = connectorConfigCallback; + this.tasksConfigCallback = tasksConfigCallback; + + offset = -1; + } + + public void configure(Map configs) { + if (configs.get(CONFIG_TOPIC_CONFIG) == null) + throw new CopycatException("Must specify topic for Copycat connector configuration."); + topic = (String) configs.get(CONFIG_TOPIC_CONFIG); + + Map producerProps = new HashMap<>(); + producerProps.putAll(configs); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer"); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer"); + producerProps.put(ProducerConfig.ACKS_CONFIG, "all"); + + Map consumerProps = new HashMap<>(); + consumerProps.putAll(configs); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer"); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArrayDeserializer"); + + configLog = createKafkaBasedLog(topic, producerProps, consumerProps, consumedCallback); + } + + public void start() { + log.info("Starting KafkaConfigStorage"); + // During startup, callbacks are *not* invoked. You can grab a snapshot after starting -- just take care that + // updates can continue to occur in the background + starting = true; + configLog.start(); + starting = false; + log.info("Started KafkaConfigStorage"); + } + + public void stop() { + log.info("Closing KafkaConfigStorage"); + configLog.stop(); + log.info("Closed KafkaConfigStorage"); + } + + /** + * Get a snapshot of the current state of the cluster. + */ + public ClusterConfigState snapshot() { + synchronized (lock) { + // Doing a shallow copy of the data is safe here because the complex nested data that is copied should all be + // immutable configs + return new ClusterConfigState( + offset, + new HashMap<>(connectorTaskCounts), + new HashMap<>(connectorConfigs), + new HashMap<>(taskConfigs), + new HashSet<>(inconsistent) + ); + } + } + + /** + * Write this connector configuration to persistent storage and wait until it has been acknowledge and read back by + * tailing the Kafka log with a consumer. 
+     *
+     * @param connector  name of the connector to write data for
+     * @param properties the configuration to write
+     */
+    public void putConnectorConfig(String connector, Map<String, String> properties) {
+        Struct copycatConfig = new Struct(CONNECTOR_CONFIGURATION_V0);
+        copycatConfig.put("properties", properties);
+        byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_CONFIGURATION_V0, copycatConfig);
+
+        try {
+            configLog.send(CONNECTOR_KEY(connector), serializedConfig);
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write connector configuration to Kafka: ", e);
+            throw new CopycatException("Error writing connector configuration to Kafka", e);
+        }
+    }
+
+    /**
+     * Write these task configurations and associated commit messages, unless an inconsistency is found that indicates
+     * that we would be leaving one of the referenced connectors with an inconsistent state.
+     *
+     * @param configs map containing task configurations
+     * @throws CopycatException if the task configurations do not resolve inconsistencies found in the existing root
+     *                          and task configurations.
+     */
+    public void putTaskConfigs(Map<ConnectorTaskId, Map<String, String>> configs) {
+        // Make sure we're at the end of the log. We should be the only writer, but we want to make sure we don't have
+        // any outstanding lagging data to consume.
+        try {
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new CopycatException("Error writing root configuration to Kafka", e);
+        }
+
+        // In theory, there is only a single writer and we shouldn't need this lock since the background thread should
+        // not invoke any callbacks that would conflict, but in practice this guards against inconsistencies due to
+        // the root config being updated.
+        Map<String, Integer> newTaskCounts = new HashMap<>();
+        synchronized (lock) {
+            // Validate tasks in this assignment.
+            // Any task configuration updates should include updates for *all* tasks in the connector -- we should
+            // have all task IDs 0 - N-1 within a connector if any task is included here
+            Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(configs);
+            for (Map.Entry<String, Set<Integer>> taskConfigSetEntry : updatedConfigIdsByConnector.entrySet()) {
+                if (!completeTaskIdSet(taskConfigSetEntry.getValue(), taskConfigSetEntry.getValue().size())) {
+                    log.error("Submitted task configurations contain an invalid range of task IDs, ignoring this submission");
+                    throw new CopycatException("Error writing task configurations: found some connectors with an invalid set of task IDs");
+                }
+                newTaskCounts.put(taskConfigSetEntry.getKey(), taskConfigSetEntry.getValue().size());
+            }
+        }
+
+        // Start sending all the individual updates
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+            Struct copycatConfig = new Struct(TASK_CONFIGURATION_V0);
+            copycatConfig.put("properties", taskConfigEntry.getValue());
+            byte[] serializedConfig = converter.fromCopycatData(topic, TASK_CONFIGURATION_V0, copycatConfig);
+            configLog.send(TASK_KEY(taskConfigEntry.getKey()), serializedConfig);
+        }
+
+        // Finally, send the commit to update the number of tasks and apply the new configs, then wait until we read to
+        // the end of the log
+        try {
+            // Read to end to ensure all the task configs have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+
+            // Write all the commit messages
+            for (Map.Entry<String, Integer> taskCountEntry : newTaskCounts.entrySet()) {
+                Struct copycatConfig = new Struct(CONNECTOR_TASKS_COMMIT_V0);
+                copycatConfig.put("tasks", taskCountEntry.getValue());
+                byte[] serializedConfig = converter.fromCopycatData(topic, CONNECTOR_TASKS_COMMIT_V0, copycatConfig);
+                configLog.send(COMMIT_TASKS_KEY(taskCountEntry.getKey()), serializedConfig);
+            }
+
+            // Read to end to ensure all the commit messages have been written
+            configLog.readToEnd().get(READ_TO_END_TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException | ExecutionException | TimeoutException e) {
+            log.error("Failed to write root configuration to Kafka: ", e);
+            throw new CopycatException("Error writing root configuration to Kafka", e);
+        }
+    }
+
+    private KafkaBasedLog<String, byte[]> createKafkaBasedLog(String topic, Map<String, Object> producerProps,
+                                                              Map<String, Object> consumerProps,
+                                                              Callback<ConsumerRecord<String, byte[]>> consumedCallback) {
+        return new KafkaBasedLog<>(topic, producerProps, consumerProps, consumedCallback, new SystemTime());
+    }
+
+    private final Callback<ConsumerRecord<String, byte[]>> consumedCallback = new Callback<ConsumerRecord<String, byte[]>>() {
+        @Override
+        public void onCompletion(Throwable error, ConsumerRecord<String, byte[]> record) {
+            if (error != null) {
+                log.error("Unexpected error in consumer callback for KafkaConfigStorage: ", error);
+                return;
+            }
+
+            final SchemaAndValue value;
+            try {
+                value = converter.toCopycatData(topic, record.value());
+            } catch (DataException e) {
+                log.error("Failed to convert config data to Copycat format: ", e);
+                return;
+            }
+            offset = record.offset();
+
+            if (record.key().startsWith(CONNECTOR_PREFIX)) {
+                String connectorName = record.key().substring(CONNECTOR_PREFIX.length());
+                synchronized (lock) {
+                    // Connector configs can be applied and callbacks invoked immediately
+                    if (!(value.value() instanceof Map)) {
+                        log.error("Found connector configuration (" + record.key() + ") in wrong format: " + value.value().getClass());
+                        return;
+                    }
+                    Object newConnectorConfig = ((Map<String, Object>) value.value()).get("properties");
+                    if (!(newConnectorConfig instanceof Map)) {
+                        log.error("Invalid data for connector config: properties field should be a Map but is " + newConnectorConfig.getClass());
is " + newConnectorConfig.getClass()); + return; + } + connectorConfigs.put(connectorName, (Map) newConnectorConfig); + } + if (!starting) + connectorConfigCallback.onCompletion(null, connectorName); + } else if (record.key().startsWith(TASK_PREFIX)) { + synchronized (lock) { + ConnectorTaskId taskId = parseTaskId(record.key()); + if (taskId == null) { + log.error("Ignoring task configuration because " + record.key() + " couldn't be parsed as a task config key"); + return; + } + if (!(value.value() instanceof Map)) { + log.error("Ignoring task configuration because it is in the wrong format: " + value.value()); + return; + } + + Object newTaskConfig = ((Map) value.value()).get("properties"); + if (!(newTaskConfig instanceof Map)) { + log.error("Invalid data for task config: properties filed should be a Map but is " + newTaskConfig.getClass()); + return; + } + + Map> deferred = deferredTaskUpdates.get(taskId.connector()); + if (deferred == null) { + deferred = new HashMap<>(); + deferredTaskUpdates.put(taskId.connector(), deferred); + } + deferred.put(taskId, (Map) newTaskConfig); + } + } else if (record.key().startsWith(COMMIT_TASKS_PREFIX)) { + String connectorName = record.key().substring(COMMIT_TASKS_PREFIX.length()); + List updatedTasks = new ArrayList<>(); + synchronized (lock) { + // Apply any outstanding deferred task updates for the given connector. Note that just because we + // encounter a commit message does not mean it will result in consistent output. In particular due to + // compaction, there may be cases where . For example if we have the following sequence of writes: + // + // 1. Write connector "foo"'s config + // 2. Write connector "foo", task 1's config <-- compacted + // 3. Write connector "foo", task 2's config + // 4. Write connector "foo" task commit message + // 5. Write connector "foo", task 1's config + // 6. Write connector "foo", task 2's config + // 7. Write connector "foo" task commit message + // + // then when a new worker starts up, if message 2 had been compacted, then when message 4 is applied + // "foo" will not have a complete set of configs. Only when message 7 is applied will the complete + // configuration be available. Worse, if the leader died while writing messages 5, 6, and 7 such that + // only 5 was written, then there may be nothing that will finish writing the configs and get the + // log back into a consistent state. + // + // It is expected that the user of this class (i.e., the Herder) will take the necessary action to + // resolve this (i.e., get the connector to recommit its configuration). This inconsistent state is + // exposed in the snapshots provided via ClusterConfigState so they are easy to handle. + if (!(value.value() instanceof Map)) { // Schema-less, so we get maps instead of structs + log.error("Ignoring connector tasks configuration commit because it is in the wrong format: " + value.value()); + return; + } + + Map> deferred = deferredTaskUpdates.get(connectorName); + + Object newTaskCountObj = ((Map) value.value()).get("tasks"); + Integer newTaskCount = (Integer) newTaskCountObj; + + // Validate the configs we're supposed to update to ensure we're getting a complete configuration + // update of all tasks that are expected based on the number of tasks in the commit message. 
+                    Map<String, Set<Integer>> updatedConfigIdsByConnector = taskIdsByConnector(deferred);
+                    Set<Integer> taskIdSet = updatedConfigIdsByConnector.get(connectorName);
+                    if (!completeTaskIdSet(taskIdSet, newTaskCount)) {
+                        // Given the logic for writing commit messages, we should only hit this condition due to compacted
+                        // historical data, in which case we would not have applied any updates yet and there will be no
+                        // task config data already committed for the connector, so we shouldn't have to clear any data
+                        // out. All we need to do is add the flag marking it inconsistent.
+                        inconsistent.add(connectorName);
+                    } else {
+                        if (deferred != null) {
+                            taskConfigs.putAll(deferred);
+                            updatedTasks.addAll(taskConfigs.keySet());
+                        }
+                        inconsistent.remove(connectorName);
+                    }
+                    // Always clear the deferred entries, even if we didn't apply them. If they represented an inconsistent
+                    // update, then we need to see a completely fresh set of configs after this commit message, so we don't
+                    // want any of these outdated configs
+                    if (deferred != null)
+                        deferred.clear();
+
+                    connectorTaskCounts.put(connectorName, newTaskCount);
+                }
+
+                if (!starting)
+                    tasksConfigCallback.onCompletion(null, updatedTasks);
+            } else {
+                log.error("Discarding config update record with invalid key: " + record.key());
+            }
+        }
+    };
+
+    private ConnectorTaskId parseTaskId(String key) {
+        String[] parts = key.split("-");
+        if (parts.length < 3) return null;
+
+        try {
+            int taskNum = Integer.parseInt(parts[parts.length - 1]);
+            String connectorName = Utils.join(Arrays.copyOfRange(parts, 1, parts.length - 1), "-");
+            return new ConnectorTaskId(connectorName, taskNum);
+        } catch (NumberFormatException e) {
+            return null;
+        }
+    }
+
+    /**
+     * Given task configurations, get a set of integer task IDs organized by connector name.
+     */
+    private Map<String, Set<Integer>> taskIdsByConnector(Map<ConnectorTaskId, Map<String, String>> configs) {
+        Map<String, Set<Integer>> connectorTaskIds = new HashMap<>();
+        if (configs == null)
+            return connectorTaskIds;
+        for (Map.Entry<ConnectorTaskId, Map<String, String>> taskConfigEntry : configs.entrySet()) {
+            ConnectorTaskId taskId = taskConfigEntry.getKey();
+            if (!connectorTaskIds.containsKey(taskId.connector()))
+                connectorTaskIds.put(taskId.connector(), new TreeSet<Integer>());
+            connectorTaskIds.get(taskId.connector()).add(taskId.task());
+        }
+        return connectorTaskIds;
+    }
+
+    private boolean completeTaskIdSet(Set<Integer> idSet, int expectedSize) {
+        // Note that we do *not* check for the exact set. This is an important implication of compaction. If we start out
+        // with 2 tasks, then reduce to 1, we'll end up with log entries like:
+        //
+        // 1. Connector "foo" config
+        // 2. Connector "foo", task 1 config
+        // 3. Connector "foo", task 2 config
+        // 4. Connector "foo", commit 2 tasks
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // However, due to compaction we could end up with a log that looks like this:
+        //
+        // 1. Connector "foo" config
+        // 3. Connector "foo", task 2 config
+        // 5. Connector "foo", task 1 config
+        // 6. Connector "foo", commit 1 tasks
+        //
+        // which isn't incorrect, but would appear in this code to have an extra task configuration. Instead, we just
+        // validate that all the configs specified by the commit message are present. This should be fine because the
+        // logic for writing configs ensures all the task configs are written (and reads them back) before writing the
+        // commit message.
+
+        // idSet may be null if no task configs for this connector were seen before the commit message
+        if (idSet == null || idSet.size() < expectedSize)
+            return false;
+
+        for (int i = 0; i < expectedSize; i++)
+            if (!idSet.contains(i))
+                return false;
+        return true;
+    }
+}
+
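For reviewers who want to see the new storage layer end to end, here is a minimal, illustrative usage sketch (not part of this patch). It assumes the copycat JsonConverter configured schema-less via a "schemas.enable" setting, a broker reachable at localhost:9092, and package locations inferred from the file paths above; the example class, topic, and connector names are hypothetical.

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    import org.apache.kafka.copycat.json.JsonConverter;
    import org.apache.kafka.copycat.runtime.distributed.ClusterConfigState;
    import org.apache.kafka.copycat.storage.KafkaConfigStorage;
    import org.apache.kafka.copycat.util.Callback;
    import org.apache.kafka.copycat.util.ConnectorTaskId;

    public class ConfigStorageExample {
        public static void main(String[] args) {
            // Value converter for the config topic; configured schema-less so structs round-trip as Maps,
            // matching the "instanceof Map" checks in KafkaConfigStorage's consumer callback.
            // ("schemas.enable" is assumed to be the JsonConverter config key.)
            Map<String, Object> converterProps = new HashMap<>();
            converterProps.put("schemas.enable", false);
            JsonConverter converter = new JsonConverter();
            converter.configure(converterProps, false);

            // Callbacks are invoked when connector/task config updates are read back from the topic
            // (they are suppressed while start() replays existing data).
            KafkaConfigStorage configStorage = new KafkaConfigStorage(
                    converter,
                    new Callback<String>() {
                        @Override
                        public void onCompletion(Throwable error, String connector) {
                            System.out.println("Connector config updated: " + connector);
                        }
                    },
                    new Callback<List<ConnectorTaskId>>() {
                        @Override
                        public void onCompletion(Throwable error, List<ConnectorTaskId> tasks) {
                            System.out.println("Task configs updated: " + tasks);
                        }
                    });

            Map<String, Object> workerProps = new HashMap<>();
            workerProps.put("bootstrap.servers", "localhost:9092");                     // assumed local broker
            workerProps.put(KafkaConfigStorage.CONFIG_TOPIC_CONFIG, "copycat-configs"); // compacted config topic
            configStorage.configure(workerProps);
            configStorage.start();

            // Write a connector config; this blocks until the record has been read back from the log.
            Map<String, String> connectorProps = new HashMap<>();
            connectorProps.put("connector.class", "FileStreamSource");  // illustrative values only
            connectorProps.put("tasks.max", "1");
            configStorage.putConnectorConfig("local-file-source", connectorProps);

            // Snapshot of everything read so far (offset, connector/task configs, inconsistent connectors).
            ClusterConfigState state = configStorage.snapshot();
            System.out.println("Snapshot taken: " + state);

            configStorage.stop();
        }
    }

Because putConnectorConfig() waits on readToEnd() after sending, the consumer callback has already applied the new connector config by the time it returns, so the subsequent snapshot() reflects the write.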