From: ewencp@apache.org
To: commits@kafka.apache.org
Reply-To: dev@kafka.apache.org
Date: Wed, 20 Apr 2016 21:10:20 -0000
Subject: [1/5] kafka git commit: KAFKA-2370: kafka connect pause/resume API

Repository: kafka
Updated Branches:
  refs/heads/trunk 280efe7f7 -> c9485b78a
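For readers following the patch series: KAFKA-2370 adds pause/resume endpoints to the Connect REST API, and the target-state records exercised by the new test below are how those requests propagate through the config topic. As a rough client-side sketch (not part of this commit's diff; the worker URL and connector name are hypothetical, and 8083 is only Connect's default REST port):

    // Hypothetical sketch: drive PUT /connectors/{name}/pause and /resume,
    // the REST operations this patch series introduces.
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class PauseResumeExample {
        public static void main(String[] args) throws Exception {
            String base = "http://localhost:8083/connectors/my-connector";
            for (String action : new String[]{"pause", "resume"}) {
                HttpURLConnection conn = (HttpURLConnection) new URL(base + "/" + action).openConnection();
                conn.setRequestMethod("PUT");
                // A 2xx response indicates the worker accepted the state change
                System.out.println(action + " -> HTTP " + conn.getResponseCode());
                conn.disconnect();
            }
        }
    }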
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
new file mode 100644
index 0000000..eaad34b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConfigBackingStore.class)
+@PowerMockIgnore("javax.management.*")
+@SuppressWarnings("unchecked")
+public class KafkaConfigBackingStoreTest {
+    private static final String TOPIC = "connect-configs";
+    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
+
+    static {
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
+    }
+
+    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
+    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
+    private static final List<String> TARGET_STATE_KEYS = Arrays.asList("target-state-connector1", "target-state-connector2");
+
+    // Need a) connector with multiple tasks and b) multiple connectors
+    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
+            new ConnectorTaskId("connector1", 0),
+            new ConnectorTaskId("connector1", 1),
+            new ConnectorTaskId("connector2", 0)
+    );
+    private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
+
+    // Need some placeholders -- the contents don't matter here, just that they are restored properly
+    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+            Collections.singletonMap("config-key-one", "config-value-one"),
+            Collections.singletonMap("config-key-two", "config-value-two"),
+            Collections.singletonMap("config-key-three", "config-value-three")
+    );
+    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
+            new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
+    );
+    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
+            new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
+    );
+
+    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
+            = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
+
+    // The exact format doesn't matter here since both conversions are mocked
+    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
+            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
+            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
+            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
+    );
+
+    @Mock
+    private Converter converter;
+    @Mock
+    private ConfigBackingStore.UpdateListener configUpdateListener;
+    @Mock
+    KafkaBasedLog<String, byte[]> storeLog;
+    private KafkaConfigBackingStore configStorage;
+
+    private Capture<String> capturedTopic = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
+    private long logOffset = 0;
+
+    @Before
+    public void setUp() {
+        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter);
+        configStorage.setUpdateListener(configUpdateListener);
+    }
+
+    @Test
+    public void testStartStop() throws Exception {
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        assertEquals(TOPIC, capturedTopic.getValue());
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); + assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); + + configStorage.start(); + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testPutConnectorConfig() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); + + expectConvertWriteAndRead( + CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), + "properties", SAMPLE_CONFIGS.get(0)); + configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(0)); + EasyMock.expectLastCall(); + + expectConvertWriteAndRead( + CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), + "properties", SAMPLE_CONFIGS.get(1)); + configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(1)); + EasyMock.expectLastCall(); + + // Config deletion + expectConvertWriteAndRead( + CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null); + configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1)); + EasyMock.expectLastCall(); + + // Target state deletion + storeLog.send(TARGET_STATE_KEYS.get(1), null); + PowerMock.expectLastCall(); + + expectStop(); + + PowerMock.replayAll(); + + configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); + configStorage.start(); + + // Null before writing + ClusterConfigState configState = configStorage.snapshot(); + assertEquals(-1, configState.offset()); + assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0))); + assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); + + // Writing should block until it is written and read back from Kafka + configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)); + configState = configStorage.snapshot(); + assertEquals(1, configState.offset()); + assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); + assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); + + // Second should also block and all configs should still be available + configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1)); + configState = configStorage.snapshot(); + assertEquals(2, configState.offset()); + assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); + assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1))); + + // Deletion should remove the second one we added + configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1)); + configState = configStorage.snapshot(); + assertEquals(3, configState.offset()); + assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); + assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); + + configStorage.stop(); + + PowerMock.verifyAll(); + } + + @Test + public void testPutTaskConfigs() throws Exception { + expectConfigure(); + expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); + + // Task configs should read to end, write to the log, read to end, write root, then read to end again + expectReadToEnd(new LinkedHashMap()); + expectConvertWriteRead( + TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), + "properties", SAMPLE_CONFIGS.get(0)); + 
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 2); // Starts with 0 tasks, after update has 2
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+        // Writing task configs should block until all the writes have been performed and the root record update
+        // has completed
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                // Connector after root update should make it through, task update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 7;
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
+        // Test a case where a failure and compaction have left us in an inconsistent state when reading the log.
+        // We start out by loading an initial configuration where we started to write a task update and failed before
+        // writing the commit, and then compaction cleaned up the earlier record.
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                // This is the record that has been compacted:
+                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 6;
+        expectStart(existingRecords, deserialized);
+
+        // One failed attempt to write new task configs
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+
+        // Successful attempt to write new task config
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 1); // Updated to just 1 task
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0)));
+        EasyMock.expectLastCall();
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+        // After reading the log, it should have been in an inconsistent state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
+        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
+
+        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
+        try {
+            configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
+            fail("Should have failed due to incomplete task set.");
+        } catch (KafkaException e) {
+            // expected
+        }
+
+        // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
+        // we are going to shrink the number of tasks to 1
+        configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        // Validate updated config
+        configState = configStorage.snapshot();
+        // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
+        // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
+        assertEquals(8, configState.offset());
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigure() throws Exception {
+        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
+                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
+
+    // If non-empty, deserializations should be a LinkedHashMap
+    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
+                             final Map<byte[], Struct> deserializations) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
+        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
+            // Note null schema because default settings for internal serialization are schema-less
+            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
+                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
+        }
+    }
+
+    private void expectStop() {
+        storeLog.stop();
+        PowerMock.expectLastCall();
+    }
+
+    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
+    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
+    // (by checking a single field's value)
+    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                        final String dataFieldName, final Object dataFieldValue) {
+        final Capture<Struct> capturedRecord = EasyMock.newCapture();
+        if (serialized != null)
+            EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                    .andReturn(serialized);
+        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+        PowerMock.expectLastCall();
+        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
+                .andAnswer(new IAnswer<SchemaAndValue>() {
+                    @Override
+                    public SchemaAndValue answer() throws Throwable {
+                        if (dataFieldName != null)
+                            assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+                        // Note null schema because default settings for internal serialization are schema-less
+                        return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
+                    }
+                });
+    }
+
+    // This map needs to maintain ordering
+    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
+        EasyMock.expect(storeLog.readToEnd())
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        TestFuture<Void> future = new TestFuture<>();
+                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
+                        future.resolveOnGet((Void) null);
+                        return future;
+                    }
+                });
+    }
+
+    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                           final String dataFieldName, final Object dataFieldValue) {
+        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, serialized);
+        expectReadToEnd(recordsToRead);
+    }
+
+    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
+    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
+        for (int i = 0; i < taskConfigs.size(); i++)
+            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+
+        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
+        connectorConfigs.put(connectorName, connectorConfig);
+
+        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
+    }
+
+    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
+    private Map<String, Object> structToMap(Struct struct) {
+        HashMap<String, Object> result = new HashMap<>();
+        for (Field field : struct.schema().fields())
+            result.put(field.name(), struct.get(field));
+        return result;
+    }
+}
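The diff below removes the superseded KafkaConfigStorageTest: the old KafkaConfigStorage took separate reconfiguration callbacks in its constructor, while the new KafkaConfigBackingStore pushes all notifications through a single ConfigBackingStore.UpdateListener (see setUp() above). As a minimal sketch of the listener shape, assuming only the three notifications the new test exercises (the real interface may declare additional methods, e.g. for target-state changes):

    // Illustrative only -- the method set is inferred from the expectations
    // in the test above, not copied from the interface definition.
    ConfigBackingStore.UpdateListener listener = new ConfigBackingStore.UpdateListener() {
        @Override
        public void onConnectorConfigUpdate(String connector) {
            // connector added or reconfigured; a herder would react here
        }

        @Override
        public void onConnectorConfigRemove(String connector) {
            // connector config deleted from the log
        }

        @Override
        public void onTaskConfigUpdate(Collection<ConnectorTaskId> tasks) {
            // a commit record made a new set of task configs visible
        }
    };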
http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
deleted file mode 100644
index 5e79a8d..0000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.storage;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.data.Field;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
-import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.TestFuture;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConfigStorage.class)
-@PowerMockIgnore("javax.management.*")
-@SuppressWarnings("unchecked")
-public class KafkaConfigStorageTest {
-    private static final String TOPIC = "connect-configs";
-    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
-    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
-
-    static {
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
-    }
-
-    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
-    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
-    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
Arrays.asList("commit-connector1", "commit-connector2"); - - // Need a) connector with multiple tasks and b) multiple connectors - private static final List TASK_IDS = Arrays.asList( - new ConnectorTaskId("connector1", 0), - new ConnectorTaskId("connector1", 1), - new ConnectorTaskId("connector2", 0) - ); - private static final List TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0"); - - // Need some placeholders -- the contents don't matter here, just that they are restored properly - private static final List> SAMPLE_CONFIGS = Arrays.asList( - Collections.singletonMap("config-key-one", "config-value-one"), - Collections.singletonMap("config-key-two", "config-value-two"), - Collections.singletonMap("config-key-three", "config-value-three") - ); - private static final List CONNECTOR_CONFIG_STRUCTS = Arrays.asList( - new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), - new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)), - new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2)) - ); - private static final List TASK_CONFIG_STRUCTS = Arrays.asList( - new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)), - new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)) - ); - - private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR - = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2); - - // The exact format doesn't matter here since both conversions are mocked - private static final List CONFIGS_SERIALIZED = Arrays.asList( - "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(), - "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(), - "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes() - ); - - @Mock - private Converter converter; - @Mock - private Callback connectorReconfiguredCallback; - @Mock - private Callback> tasksReconfiguredCallback; - @Mock - KafkaBasedLog storeLog; - private KafkaConfigStorage configStorage; - - private Capture capturedTopic = EasyMock.newCapture(); - private Capture> capturedProducerProps = EasyMock.newCapture(); - private Capture> capturedConsumerProps = EasyMock.newCapture(); - private Capture>> capturedConsumedCallback = EasyMock.newCapture(); - - private long logOffset = 0; - - @Before - public void setUp() { - configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"}, - converter, connectorReconfiguredCallback, tasksReconfiguredCallback); - } - - @Test - public void testStartStop() throws Exception { - expectConfigure(); - expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); - expectStop(); - - PowerMock.replayAll(); - - configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); - assertEquals(TOPIC, capturedTopic.getValue()); - assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG)); - assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG)); - assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG)); - 
assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG)); - - configStorage.start(); - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutConnectorConfig() throws Exception { - expectConfigure(); - expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); - - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), - "properties", SAMPLE_CONFIGS.get(0)); - connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0)); - EasyMock.expectLastCall(); - - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), - "properties", SAMPLE_CONFIGS.get(1)); - connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1)); - EasyMock.expectLastCall(); - - // Config deletion - expectConvertWriteAndRead( - CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null); - connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1)); - EasyMock.expectLastCall(); - - expectStop(); - - PowerMock.replayAll(); - - configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG); - configStorage.start(); - - // Null before writing - ClusterConfigState configState = configStorage.snapshot(); - assertEquals(-1, configState.offset()); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); - - // Writing should block until it is written and read back from Kafka - configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0)); - configState = configStorage.snapshot(); - assertEquals(1, configState.offset()); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); - - // Second should also block and all configs should still be available - configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1)); - configState = configStorage.snapshot(); - assertEquals(2, configState.offset()); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1))); - - // Deletion should remove the second one we added - configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null); - configState = configStorage.snapshot(); - assertEquals(3, configState.offset()); - assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0))); - assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1))); - - configStorage.stop(); - - PowerMock.verifyAll(); - } - - @Test - public void testPutTaskConfigs() throws Exception { - expectConfigure(); - expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); - - // Task configs should read to end, write to the log, read to end, write root, then read to end again - expectReadToEnd(new LinkedHashMap()); - expectConvertWriteRead( - TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0), - "properties", SAMPLE_CONFIGS.get(0)); - expectConvertWriteRead( - TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1), - "properties", SAMPLE_CONFIGS.get(1)); - expectReadToEnd(new LinkedHashMap()); - expectConvertWriteRead( - COMMIT_TASKS_CONFIG_KEYS.get(0), 
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 2); // Starts with 0 tasks, after update has 2
-        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
-        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
-        EasyMock.expectLastCall();
-
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-
-        // Bootstrap as if we had already added the connector, but no tasks had been added yet
-        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
-
-        // Null before writing
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(-1, configState.offset());
-        assertNull(configState.taskConfig(TASK_IDS.get(0)));
-        assertNull(configState.taskConfig(TASK_IDS.get(1)));
-
-        // Writing task configs should block until all the writes have been performed and the root record update
-        // has completed
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
-        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
-        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
-        configStorage.putTaskConfigs(taskConfigs);
-
-        // Validate root config by listing all connectors and tasks
-        configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
-        String connectorName = CONNECTOR_IDS.get(0);
-        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testRestore() throws Exception {
-        // Restoring data should notify only of the latest values after loading is complete. This also validates
-        // that inconsistent state is ignored.
-
-        expectConfigure();
-        // Overwrite each type at least once to ensure we see the latest data after loading
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                // Connector after root update should make it through, task update shouldn't
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
-        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
-        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
-        logOffset = 7;
-        expectStart(existingRecords, deserialized);
-
-        // Shouldn't see any callbacks since this is during startup
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-
-        // Should see a single connector and its config should be the last one seen anywhere in the log
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
-        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
-        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
-        // Test a case where a failure and compaction have left us in an inconsistent state when reading the log.
-        // We start out by loading an initial configuration where we started to write a task update and failed before
-        // writing the commit, and then compaction cleaned up the earlier record.
-
-        expectConfigure();
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                // This is the record that has been compacted:
-                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
-        logOffset = 6;
-        expectStart(existingRecords, deserialized);
-
-        // One failed attempt to write new task configs
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-
-        // Successful attempt to write new task config
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 1); // Updated to just 1 task
-        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
-        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0)));
-        EasyMock.expectLastCall();
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-        // After reading the log, it should have been in an inconsistent state
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
-        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
-        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
-        assertNull(configState.taskConfig(TASK_IDS.get(0)));
-        assertNull(configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
-
-        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
-        try {
-            configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
-            fail("Should have failed due to incomplete task set.");
-        } catch (KafkaException e) {
-            // expected
-        }
-
-        // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
-        // we are going to shrink the number of tasks to 1
-        configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
-        // Validate updated config
-        configState = configStorage.snapshot();
-        // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
-        // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
-        assertEquals(8, configState.offset());
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    private void expectConfigure() throws Exception {
-        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
-                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
-                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
-                .andReturn(storeLog);
-    }
-
-    // If non-empty, deserializations should be a LinkedHashMap
-    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
-                             final Map<byte[], Struct> deserializations) throws Exception {
-        storeLog.start();
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
-                    capturedConsumedCallback.getValue().onCompletion(null, rec);
-                return null;
-            }
-        });
-        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
-            // Note null schema because default settings for internal serialization are schema-less
-            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
-                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
-        }
-    }
-
-    private void expectStop() {
-        storeLog.stop();
-        PowerMock.expectLastCall();
-    }
-
-    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
-    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
-    // (by checking a single field's value)
-    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final Object dataFieldValue) {
-        final Capture<Struct> capturedRecord = EasyMock.newCapture();
-        if (serialized != null)
-            EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
-                    .andReturn(serialized);
-        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
-        PowerMock.expectLastCall();
-        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
-                .andAnswer(new IAnswer<SchemaAndValue>() {
-                    @Override
-                    public SchemaAndValue answer() throws Throwable {
-                        if (dataFieldName != null)
-                            assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
-                        // Note null schema because default settings for internal serialization are schema-less
-                        return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
-                    }
-                });
-    }
-
-    // This map needs to maintain ordering
-    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
-        EasyMock.expect(storeLog.readToEnd())
-                .andAnswer(new IAnswer<Future<Void>>() {
-                    @Override
-                    public Future<Void> answer() throws Throwable {
-                        TestFuture<Void> future = new TestFuture<>();
-                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
-                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
-                        future.resolveOnGet((Void) null);
-                        return future;
-                    }
-                });
-    }
-
-    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                           final String dataFieldName, final Object dataFieldValue) {
-        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
-        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
-        recordsToRead.put(configKey, serialized);
-        expectReadToEnd(recordsToRead);
-    }
-
-    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
-    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
-        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
-        for (int i = 0; i < taskConfigs.size(); i++)
-            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
-
-        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
-        connectorConfigs.put(connectorName, connectorConfig);
-
-        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
-    }
-
-    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
-    private Map<String, Object> structToMap(Struct struct) {
-        HashMap<String, Object> result = new HashMap<>();
-        for (Field field : struct.schema().fields())
-            result.put(field.name(), struct.get(field));
-        return result;
-    }
-}
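A note on the storage layout these tests encode: the config topic is compacted, and record keys follow the patterns in the test constants -- "connector-<name>" for connector configs, "task-<name>-<n>" for task configs, "commit-<name>" for the root record carrying the task count, and (new in this patch) "target-state-<name>" for pause/resume state. Task configs only become visible once a commit record for their connector follows them in the log, which is why testPutTaskConfigsDoesNotResolveAllInconsistencies flags the connector as inconsistent when the commit's task count cannot be satisfied. A hedged sketch of the key scheme (helper names here are illustrative, not the store's actual private methods):

    // Key scheme used on the compacted config topic, mirroring the test constants.
    static String connectorKey(String connectorName) {
        return "connector-" + connectorName;                        // e.g. connector-connector1
    }
    static String taskKey(ConnectorTaskId taskId) {
        return "task-" + taskId.connector() + "-" + taskId.task();  // e.g. task-connector1-0
    }
    static String commitKey(String connectorName) {
        return "commit-" + connectorName;                           // root record: {"tasks": <count>}
    }
    static String targetStateKey(String connectorName) {
        return "target-state-" + connectorName;                     // pause/resume marker
    }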