kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ewe...@apache.org
Subject [1/5] kafka git commit: KAFKA-2370: kafka connect pause/resume API
Date Wed, 20 Apr 2016 21:10:20 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 280efe7f7 -> c9485b78a


http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
new file mode 100644
index 0000000..eaad34b
--- /dev/null
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigBackingStoreTest.java
@@ -0,0 +1,537 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ **/
+
+package org.apache.kafka.connect.storage;
+
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.record.TimestampType;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
+import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
+import org.apache.kafka.connect.util.Callback;
+import org.apache.kafka.connect.util.ConnectorTaskId;
+import org.apache.kafka.connect.util.KafkaBasedLog;
+import org.apache.kafka.connect.util.TestFuture;
+import org.easymock.Capture;
+import org.easymock.EasyMock;
+import org.easymock.IAnswer;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.powermock.api.easymock.PowerMock;
+import org.powermock.api.easymock.annotation.Mock;
+import org.powermock.core.classloader.annotations.PowerMockIgnore;
+import org.powermock.core.classloader.annotations.PrepareForTest;
+import org.powermock.modules.junit4.PowerMockRunner;
+import org.powermock.reflect.Whitebox;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Future;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.fail;
+
+@RunWith(PowerMockRunner.class)
+@PrepareForTest(KafkaConfigBackingStore.class)
+@PowerMockIgnore("javax.management.*")
+@SuppressWarnings("unchecked")
+public class KafkaConfigBackingStoreTest {
+    private static final String TOPIC = "connect-configs"; // Value used for the config topic setting; testStartStop asserts the store uses it
+    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
+    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG; // Built from DEFAULT_CONFIG_STORAGE_PROPS in the static block below
+
+    static { // Populate the worker properties once, then build the shared DistributedConfig from them
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
+        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
+    }
+
+    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
+    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2"); // Log keys, index-aligned with CONNECTOR_IDS
+    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2"); // Log keys, index-aligned with CONNECTOR_IDS
+    private static final List<String> TARGET_STATE_KEYS =  Arrays.asList("target-state-connector1", "target-state-connector2"); // Log keys, index-aligned with CONNECTOR_IDS
+
+    // The tests need a) a connector with multiple tasks and b) multiple connectors
+    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
+            new ConnectorTaskId("connector1", 0),
+            new ConnectorTaskId("connector1", 1),
+            new ConnectorTaskId("connector2", 0)
+    );
+    private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0"); // Log keys, index-aligned with TASK_IDS
+
+    // Placeholder configs -- the contents don't matter here, only that they are stored and restored properly
+    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
+            Collections.singletonMap("config-key-one", "config-value-one"),
+            Collections.singletonMap("config-key-two", "config-value-two"),
+            Collections.singletonMap("config-key-three", "config-value-three")
+    );
+    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList( // CONNECTOR_CONFIG_STRUCTS[i] wraps SAMPLE_CONFIGS[i]
+            new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
+            new Struct(KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
+    );
+    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList( // TASK_CONFIG_STRUCTS[i] wraps SAMPLE_CONFIGS[i]
+            new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
+            new Struct(KafkaConfigBackingStore.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
+    );
+
+    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR // Commit record whose "tasks" field is 2
+            = new Struct(KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
+
+    // The exact byte format doesn't matter here since both conversion directions are mocked
+    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
+            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
+            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
+            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
+    );
+
+    @Mock
+    private Converter converter; // Mocked so the tests control both serialization directions
+    @Mock
+    private ConfigBackingStore.UpdateListener configUpdateListener; // Receives the update callbacks the tests expect
+    @Mock
+    KafkaBasedLog<String, byte[]> storeLog; // Returned from the mocked createKafkaBasedLog (see expectConfigure)
+    private KafkaConfigBackingStore configStorage; // Partial mock under test, created in setUp()
+
+    private Capture<String> capturedTopic = EasyMock.newCapture(); // The four captures below are filled by expectConfigure()
+    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
+    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
+    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
+
+    private long logOffset = 0; // Offset given to the next record replayed by expectReadToEnd(); incremented per record
+
+    @Before
+    public void setUp() { // Builds the store as a partial mock: only createKafkaBasedLog is mocked, constructed with the mock converter
+        configStorage = PowerMock.createPartialMock(KafkaConfigBackingStore.class, new String[]{"createKafkaBasedLog"}, converter);
+        configStorage.setUpdateListener(configUpdateListener); // Route update callbacks to the mock listener
+    }
+
+    @Test
+    public void testStartStop() throws Exception { // Checks the arguments passed when creating the log, then a plain start/stop cycle
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP); // Empty log: no records replayed, no deserialization expected
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        assertEquals(TOPIC, capturedTopic.getValue()); // The log must be created for the configured topic
+        assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
+        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
+
+        configStorage.start();
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutConnectorConfig() throws Exception { // Writes two connector configs, then removes the second one
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(0));
+        EasyMock.expectLastCall();
+
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        configUpdateListener.onConnectorConfigUpdate(CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
+        // Config deletion: a null payload is written for the connector key and the listener is told about the removal
+        expectConvertWriteAndRead(
+                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigBackingStore.CONNECTOR_CONFIGURATION_V0, null, null, null);
+        configUpdateListener.onConnectorConfigRemove(CONNECTOR_IDS.get(1));
+        EasyMock.expectLastCall();
+
+        // Target state deletion: a null payload is also expected for the connector's target-state key
+        storeLog.send(TARGET_STATE_KEYS.get(1), null);
+        PowerMock.expectLastCall();
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Writing should block until it is written and read back from Kafka
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        configState = configStorage.snapshot();
+        assertEquals(1, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Second should also block and all configs should still be available
+        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configState = configStorage.snapshot();
+        assertEquals(2, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        // Deletion should remove the second one we added
+        configStorage.removeConnectorConfig(CONNECTOR_IDS.get(1));
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigs() throws Exception { // Writes two task configs plus the commit record for connector1
+        expectConfigure();
+        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
+
+        // Task configs should read to end, write to the log, read to end, write root, then read to end again
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(1), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
+                "properties", SAMPLE_CONFIGS.get(1));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 2); // Starts with 0 tasks, after update has 2
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
+        EasyMock.expectLastCall();
+
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Bootstrap as if we had already added the connector, but no tasks had been added yet
+        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
+
+        // Null before writing
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(-1, configState.offset());
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+
+        // Writing task configs should block until all the writes have been performed and the root record update
+        // has completed
+        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
+        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
+        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
+        configStorage.putTaskConfigs("connector1", taskConfigs);
+
+        // Validate root config by listing all connectors and tasks
+        configState = configStorage.snapshot();
+        assertEquals(3, configState.offset());
+        String connectorName = CONNECTOR_IDS.get(0);
+        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testRestore() throws Exception {
+        // Restoring data should notify only of the latest values after loading is complete. This also validates
+        // that inconsistent state is ignored.
+
+        expectConfigure();
+        // Overwrite each type at least once to ensure we see the latest data after loading
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                // Connector after root update should make it through, task update shouldn't
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
+                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); // Insertion order sets the order of the conversion expectations
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
+        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 7; // Seven records (offsets 0-6) are replayed at startup, so later reads continue from 7
+        expectStart(existingRecords, deserialized);
+
+        // Shouldn't see any callbacks since this is during startup
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+
+        // Should see a single connector and its config should be the last one seen anywhere in the log
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
+        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
+        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
+        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
+        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    @Test
+    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
+        // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
+        // We start out by loading an initial configuration where we started to write a task update and failed before
+        // writing the commit, and then compaction cleaned up the earlier record.
+
+        expectConfigure();
+        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
+                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
+                // This is the record that has been compacted:
+                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
+                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
+                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
+                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
+        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap<>(); // Insertion order sets the order of the conversion expectations
+        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
+        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
+        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
+        logOffset = 6; // The preexisting records end at offset 5, so later reads continue from 6
+        expectStart(existingRecords, deserialized);
+
+        // One failed attempt to write new task configs
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+
+        // Successful attempt to write new task config
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                TASK_CONFIG_KEYS.get(0), KafkaConfigBackingStore.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
+                "properties", SAMPLE_CONFIGS.get(0));
+        expectReadToEnd(new LinkedHashMap<String, byte[]>());
+        expectConvertWriteRead(
+                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigBackingStore.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
+                "tasks", 1); // Updated to just 1 task
+        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
+        configUpdateListener.onTaskConfigUpdate(Arrays.asList(TASK_IDS.get(0)));
+        EasyMock.expectLastCall();
+        // Records to be read by consumer as it reads to the end of the log
+        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
+        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
+        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
+        expectReadToEnd(serializedConfigs);
+
+
+        expectStop();
+
+        PowerMock.replayAll();
+
+        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
+        configStorage.start();
+        // After reading the log, it should have been in an inconsistent state
+        ClusterConfigState configState = configStorage.snapshot();
+        assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
+        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
+        // Neither task config should be visible while the connector is in the inconsistent list
+        assertNull(configState.taskConfig(TASK_IDS.get(0)));
+        assertNull(configState.taskConfig(TASK_IDS.get(1)));
+        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
+
+        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
+        try {
+            configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
+            fail("Should have failed due to incomplete task set.");
+        } catch (KafkaException e) {
+            // expected
+        }
+
+        // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
+        // we are going to shrink the number of tasks to 1
+        configStorage.putTaskConfigs("connector1", Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
+        // Validate updated config
+        configState = configStorage.snapshot();
+        // This is only two more ahead of the last one because the first call fails, and so its configs are not written
+        // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
+        assertEquals(8, configState.offset());
+        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
+        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
+        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
+        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
+
+        configStorage.stop();
+
+        PowerMock.verifyAll();
+    }
+
+    private void expectConfigure() throws Exception { // Expects the private createKafkaBasedLog call, captures its four arguments, returns the mock log
+        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
+                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
+                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
+                .andReturn(storeLog);
+    }
+
+    // Expects storeLog.start() and, when it is called, feeds preexistingRecords to the captured consumed-record
+    // callback. Also expects one converter.toConnectData() call per deserializations entry, in iteration order.
+    // If non-empty, deserializations should be a LinkedHashMap
+    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
+                             final Map<byte[], Struct> deserializations) throws Exception {
+        storeLog.start();
+        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
+            @Override
+            public Object answer() throws Throwable {
+                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
+                    capturedConsumedCallback.getValue().onCompletion(null, rec);
+                return null;
+            }
+        });
+        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
+            // Note null schema because default settings for internal serialization are schema-less
+            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
+                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
+        }
+    }
+
+    private void expectStop() { // Records the expectation that the mock log is stopped
+        storeLog.stop();
+        PowerMock.expectLastCall();
+    }
+
+    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
+    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
+    // (by checking a single field's value). A null serialized value skips the conversion expectation and check.
+    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                        final String dataFieldName, final Object dataFieldValue) {
+        final Capture<Struct> capturedRecord = EasyMock.newCapture();
+        if (serialized != null) // Callers pass null for deletions (see testPutConnectorConfig); no fromConnectData call is expected then
+            EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
+                    .andReturn(serialized);
+        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
+        PowerMock.expectLastCall();
+        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
+                .andAnswer(new IAnswer<SchemaAndValue>() {
+                    @Override
+                    public SchemaAndValue answer() throws Throwable {
+                        if (dataFieldName != null) // The field check runs lazily, when the record is read back
+                            assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
+                        // Note null schema because default settings for internal serialization are schema-less
+                        return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
+                    }
+                });
+    }
+
+    // This map needs to maintain ordering
+    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
+        EasyMock.expect(storeLog.readToEnd())
+                .andAnswer(new IAnswer<Future<Void>>() {
+                    @Override
+                    public Future<Void> answer() throws Throwable {
+                        TestFuture<Void> future = new TestFuture<Void>();
+                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
+                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
+                        future.resolveOnGet((Void) null);
+                        return future;
+                    }
+                });
+    }
+
+
+    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
+                                           final String dataFieldName, final Object dataFieldValue) {
+        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
+        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
+        recordsToRead.put(configKey, serialized);
+        expectReadToEnd(recordsToRead);
+    }
+
+    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
+    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
+        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
+        for (int i = 0; i < taskConfigs.size(); i++)
+            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
+
+        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
+        connectorConfigs.put(connectorName, connectorConfig);
+
+        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
+    }
+
+    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
+    private Map<String, Object> structToMap(Struct struct) {
+        HashMap<String, Object> result = new HashMap<>();
+        for (Field field : struct.schema().fields())
+            result.put(field.name(), struct.get(field));
+        return result;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/c9485b78/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
deleted file mode 100644
index 5e79a8d..0000000
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/storage/KafkaConfigStorageTest.java
+++ /dev/null
@@ -1,534 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- **/
-
-package org.apache.kafka.connect.storage;
-
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.common.KafkaException;
-import org.apache.kafka.common.record.TimestampType;
-import org.apache.kafka.connect.data.Field;
-import org.apache.kafka.connect.data.Schema;
-import org.apache.kafka.connect.data.SchemaAndValue;
-import org.apache.kafka.connect.data.Struct;
-import org.apache.kafka.connect.runtime.distributed.ClusterConfigState;
-import org.apache.kafka.connect.runtime.distributed.DistributedConfig;
-import org.apache.kafka.connect.util.Callback;
-import org.apache.kafka.connect.util.ConnectorTaskId;
-import org.apache.kafka.connect.util.KafkaBasedLog;
-import org.apache.kafka.connect.util.TestFuture;
-import org.easymock.Capture;
-import org.easymock.EasyMock;
-import org.easymock.IAnswer;
-import org.junit.Before;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.powermock.api.easymock.PowerMock;
-import org.powermock.api.easymock.annotation.Mock;
-import org.powermock.core.classloader.annotations.PowerMockIgnore;
-import org.powermock.core.classloader.annotations.PrepareForTest;
-import org.powermock.modules.junit4.PowerMockRunner;
-import org.powermock.reflect.Whitebox;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Future;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.fail;
-
-@RunWith(PowerMockRunner.class)
-@PrepareForTest(KafkaConfigStorage.class)
-@PowerMockIgnore("javax.management.*")
-@SuppressWarnings("unchecked")
-public class KafkaConfigStorageTest {
-    private static final String TOPIC = "connect-configs";
-    private static final Map<String, String> DEFAULT_CONFIG_STORAGE_PROPS = new HashMap<>();
-    private static final DistributedConfig DEFAULT_DISTRIBUTED_CONFIG;
-
-    static {
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.CONFIG_TOPIC_CONFIG, TOPIC);
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.OFFSET_STORAGE_TOPIC_CONFIG, "connect-offsets");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.GROUP_ID_CONFIG, "connect");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.STATUS_STORAGE_TOPIC_CONFIG, "status-topic");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092,broker2:9093");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_KEY_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_CONFIG_STORAGE_PROPS.put(DistributedConfig.INTERNAL_VALUE_CONVERTER_CLASS_CONFIG, "org.apache.kafka.connect.json.JsonConverter");
-        DEFAULT_DISTRIBUTED_CONFIG = new DistributedConfig(DEFAULT_CONFIG_STORAGE_PROPS);
-    }
-
-    private static final List<String> CONNECTOR_IDS = Arrays.asList("connector1", "connector2");
-    private static final List<String> CONNECTOR_CONFIG_KEYS = Arrays.asList("connector-connector1", "connector-connector2");
-    private static final List<String> COMMIT_TASKS_CONFIG_KEYS = Arrays.asList("commit-connector1", "commit-connector2");
-
-    // Need a) connector with multiple tasks and b) multiple connectors
-    private static final List<ConnectorTaskId> TASK_IDS = Arrays.asList(
-            new ConnectorTaskId("connector1", 0),
-            new ConnectorTaskId("connector1", 1),
-            new ConnectorTaskId("connector2", 0)
-    );
-    private static final List<String> TASK_CONFIG_KEYS = Arrays.asList("task-connector1-0", "task-connector1-1", "task-connector2-0");
-
-    // Need some placeholders -- the contents don't matter here, just that they are restored properly
-    private static final List<Map<String, String>> SAMPLE_CONFIGS = Arrays.asList(
-            Collections.singletonMap("config-key-one", "config-value-one"),
-            Collections.singletonMap("config-key-two", "config-value-two"),
-            Collections.singletonMap("config-key-three", "config-value-three")
-    );
-    private static final List<Struct> CONNECTOR_CONFIG_STRUCTS = Arrays.asList(
-            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
-            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1)),
-            new Struct(KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(2))
-    );
-    private static final List<Struct> TASK_CONFIG_STRUCTS = Arrays.asList(
-            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(0)),
-            new Struct(KafkaConfigStorage.TASK_CONFIGURATION_V0).put("properties", SAMPLE_CONFIGS.get(1))
-    );
-
-    private static final Struct TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR
-            = new Struct(KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0).put("tasks", 2);
-
-    // The exact format doesn't matter here since both conversions are mocked
-    private static final List<byte[]> CONFIGS_SERIALIZED = Arrays.asList(
-            "config-bytes-1".getBytes(), "config-bytes-2".getBytes(), "config-bytes-3".getBytes(),
-            "config-bytes-4".getBytes(), "config-bytes-5".getBytes(), "config-bytes-6".getBytes(),
-            "config-bytes-7".getBytes(), "config-bytes-8".getBytes(), "config-bytes-9".getBytes()
-    );
-
-    @Mock
-    private Converter converter;
-    @Mock
-    private Callback<String> connectorReconfiguredCallback;
-    @Mock
-    private Callback<List<ConnectorTaskId>> tasksReconfiguredCallback;
-    @Mock
-    KafkaBasedLog<String, byte[]> storeLog;
-    private KafkaConfigStorage configStorage;
-
-    private Capture<String> capturedTopic = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedProducerProps = EasyMock.newCapture();
-    private Capture<Map<String, Object>> capturedConsumerProps = EasyMock.newCapture();
-    private Capture<Callback<ConsumerRecord<String, byte[]>>> capturedConsumedCallback = EasyMock.newCapture();
-
-    private long logOffset = 0;
-
-    @Before
-    public void setUp() {
-        configStorage = PowerMock.createPartialMock(KafkaConfigStorage.class, new String[]{"createKafkaBasedLog"},
-                converter, connectorReconfiguredCallback, tasksReconfiguredCallback);
-    }
-
-    @Test
-    public void testStartStop() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        assertEquals(TOPIC, capturedTopic.getValue());
-        assertEquals("org.apache.kafka.common.serialization.StringSerializer", capturedProducerProps.getValue().get(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG));
-        assertEquals("org.apache.kafka.common.serialization.ByteArraySerializer", capturedProducerProps.getValue().get(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG));
-        assertEquals("org.apache.kafka.common.serialization.StringDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG));
-        assertEquals("org.apache.kafka.common.serialization.ByteArrayDeserializer", capturedConsumerProps.getValue().get(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG));
-
-        configStorage.start();
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutConnectorConfig() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
-
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(0));
-        EasyMock.expectLastCall();
-
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
-                "properties", SAMPLE_CONFIGS.get(1));
-        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
-        EasyMock.expectLastCall();
-
-        // Config deletion
-        expectConvertWriteAndRead(
-                CONNECTOR_CONFIG_KEYS.get(1), KafkaConfigStorage.CONNECTOR_CONFIGURATION_V0, null, null, null);
-        connectorReconfiguredCallback.onCompletion(null, CONNECTOR_IDS.get(1));
-        EasyMock.expectLastCall();
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-
-        // Null before writing
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(-1, configState.offset());
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        // Writing should block until it is written and read back from Kafka
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0));
-        configState = configStorage.snapshot();
-        assertEquals(1, configState.offset());
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        // Second should also block and all configs should still be available
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), SAMPLE_CONFIGS.get(1));
-        configState = configStorage.snapshot();
-        assertEquals(2, configState.offset());
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(1), configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        // Deletion should remove the second one we added
-        configStorage.putConnectorConfig(CONNECTOR_IDS.get(1), null);
-        configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        assertNull(configState.connectorConfig(CONNECTOR_IDS.get(1)));
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutTaskConfigs() throws Exception {
-        expectConfigure();
-        expectStart(Collections.EMPTY_LIST, Collections.EMPTY_MAP);
-
-        // Task configs should read to end, write to the log, read to end, write root, then read to end again
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(1), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(1),
-                "properties", SAMPLE_CONFIGS.get(1));
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 2); // Starts with 0 tasks, after update has 2
-        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
-        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)));
-        EasyMock.expectLastCall();
-
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(1));
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-
-        // Bootstrap as if we had already added the connector, but no tasks had been added yet
-        whiteboxAddConnector(CONNECTOR_IDS.get(0), SAMPLE_CONFIGS.get(0), Collections.EMPTY_LIST);
-
-        // Null before writing
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(-1, configState.offset());
-        assertNull(configState.taskConfig(TASK_IDS.get(0)));
-        assertNull(configState.taskConfig(TASK_IDS.get(1)));
-
-        // Writing task task configs should block until all the writes have been performed and the root record update
-        // has completed
-        Map<ConnectorTaskId, Map<String, String>> taskConfigs = new HashMap<>();
-        taskConfigs.put(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0));
-        taskConfigs.put(TASK_IDS.get(1), SAMPLE_CONFIGS.get(1));
-        configStorage.putTaskConfigs(taskConfigs);
-
-        // Validate root config by listing all connectors and tasks
-        configState = configStorage.snapshot();
-        assertEquals(3, configState.offset());
-        String connectorName = CONNECTOR_IDS.get(0);
-        assertEquals(Arrays.asList(connectorName), new ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(connectorName));
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(1), configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testRestore() throws Exception {
-        // Restoring data should notify only of the latest values after loading is complete. This also validates
-        // that inconsistent state is ignored.
-
-        expectConfigure();
-        // Overwrite each type at least once to ensure we see the latest data after loading
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                new ConsumerRecord<>(TOPIC, 0, 1, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 3, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(3)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                // Connector after root update should make it through, task update shouldn't
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)),
-                new ConsumerRecord<>(TOPIC, 0, 6, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(6)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(1), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(3), CONNECTOR_CONFIG_STRUCTS.get(1));
-        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        deserialized.put(CONFIGS_SERIALIZED.get(5), CONNECTOR_CONFIG_STRUCTS.get(2));
-        deserialized.put(CONFIGS_SERIALIZED.get(6), TASK_CONFIG_STRUCTS.get(1));
-        logOffset = 7;
-        expectStart(existingRecords, deserialized);
-
-        // Shouldn't see any callbacks since this is during startup
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-
-        // Should see a single connector and its config should be the last one seen anywhere in the log
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(7, configState.offset()); // Should always be next to be read, even if uncommitted
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        // CONNECTOR_CONFIG_STRUCTS[2] -> SAMPLE_CONFIGS[2]
-        assertEquals(SAMPLE_CONFIGS.get(2), configState.connectorConfig(CONNECTOR_IDS.get(0)));
-        // Should see 2 tasks for that connector. Only config updates before the root key update should be reflected
-        assertEquals(Arrays.asList(TASK_IDS.get(0), TASK_IDS.get(1)), configState.tasks(CONNECTOR_IDS.get(0)));
-        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    @Test
-    public void testPutTaskConfigsDoesNotResolveAllInconsistencies() throws Exception {
-        // Test a case where a failure and compaction has left us in an inconsistent state when reading the log.
-        // We start out by loading an initial configuration where we started to write a task update and failed before
-        // writing an the commit, and then compaction cleaned up the earlier record.
-
-        expectConfigure();
-        List<ConsumerRecord<String, byte[]>> existingRecords = Arrays.asList(
-                new ConsumerRecord<>(TOPIC, 0, 0, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, CONNECTOR_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0)),
-                // This is the record that has been compacted:
-                //new ConsumerRecord<>(TOPIC, 0, 1, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(1)),
-                new ConsumerRecord<>(TOPIC, 0, 2, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(1), CONFIGS_SERIALIZED.get(2)),
-                new ConsumerRecord<>(TOPIC, 0, 4, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(4)),
-                new ConsumerRecord<>(TOPIC, 0, 5, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(5)));
-        LinkedHashMap<byte[], Struct> deserialized = new LinkedHashMap();
-        deserialized.put(CONFIGS_SERIALIZED.get(0), CONNECTOR_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(2), TASK_CONFIG_STRUCTS.get(0));
-        deserialized.put(CONFIGS_SERIALIZED.get(4), TASKS_COMMIT_STRUCT_TWO_TASK_CONNECTOR);
-        deserialized.put(CONFIGS_SERIALIZED.get(5), TASK_CONFIG_STRUCTS.get(1));
-        logOffset = 6;
-        expectStart(existingRecords, deserialized);
-
-        // One failed attempt to write new task configs
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-
-        // Successful attempt to write new task config
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                TASK_CONFIG_KEYS.get(0), KafkaConfigStorage.TASK_CONFIGURATION_V0, CONFIGS_SERIALIZED.get(0),
-                "properties", SAMPLE_CONFIGS.get(0));
-        expectReadToEnd(new LinkedHashMap<String, byte[]>());
-        expectConvertWriteRead(
-                COMMIT_TASKS_CONFIG_KEYS.get(0), KafkaConfigStorage.CONNECTOR_TASKS_COMMIT_V0, CONFIGS_SERIALIZED.get(2),
-                "tasks", 1); // Updated to just 1 task
-        // As soon as root is rewritten, we should see a callback notifying us that we reconfigured some tasks
-        tasksReconfiguredCallback.onCompletion(null, Arrays.asList(TASK_IDS.get(0)));
-        EasyMock.expectLastCall();
-        // Records to be read by consumer as it reads to the end of the log
-        LinkedHashMap<String, byte[]> serializedConfigs = new LinkedHashMap<>();
-        serializedConfigs.put(TASK_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(0));
-        serializedConfigs.put(COMMIT_TASKS_CONFIG_KEYS.get(0), CONFIGS_SERIALIZED.get(2));
-        expectReadToEnd(serializedConfigs);
-
-
-        expectStop();
-
-        PowerMock.replayAll();
-
-        configStorage.configure(DEFAULT_DISTRIBUTED_CONFIG);
-        configStorage.start();
-        // After reading the log, it should have been in an inconsistent state
-        ClusterConfigState configState = configStorage.snapshot();
-        assertEquals(6, configState.offset()); // Should always be next to be read, not last committed
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        // Inconsistent data should leave us with no tasks listed for the connector and an entry in the inconsistent list
-        assertEquals(Collections.EMPTY_LIST, configState.tasks(CONNECTOR_IDS.get(0)));
-        // Both TASK_CONFIG_STRUCTS[0] -> SAMPLE_CONFIGS[0]
-        assertNull(configState.taskConfig(TASK_IDS.get(0)));
-        assertNull(configState.taskConfig(TASK_IDS.get(1)));
-        assertEquals(Collections.singleton(CONNECTOR_IDS.get(0)), configState.inconsistentConnectors());
-
-        // First try sending an invalid set of configs (can't possibly represent a valid config set for the tasks)
-        try {
-            configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(1), SAMPLE_CONFIGS.get(2)));
-            fail("Should have failed due to incomplete task set.");
-        } catch (KafkaException e) {
-            // expected
-        }
-
-        // Next, issue a write that has everything that is needed and it should be accepted. Note that in this case
-        // we are going to shrink the number of tasks to 1
-        configStorage.putTaskConfigs(Collections.singletonMap(TASK_IDS.get(0), SAMPLE_CONFIGS.get(0)));
-        // Validate updated config
-        configState = configStorage.snapshot();
-        // This is only two more ahead of the last one because multiple calls fail, and so their configs are not written
-        // to the topic. Only the last call with 1 task config + 1 commit actually gets written.
-        assertEquals(8, configState.offset());
-        assertEquals(Arrays.asList(CONNECTOR_IDS.get(0)), new ArrayList<>(configState.connectors()));
-        assertEquals(Arrays.asList(TASK_IDS.get(0)), configState.tasks(CONNECTOR_IDS.get(0)));
-        assertEquals(SAMPLE_CONFIGS.get(0), configState.taskConfig(TASK_IDS.get(0)));
-        assertEquals(Collections.EMPTY_SET, configState.inconsistentConnectors());
-
-        configStorage.stop();
-
-        PowerMock.verifyAll();
-    }
-
-    private void expectConfigure() throws Exception {
-        PowerMock.expectPrivate(configStorage, "createKafkaBasedLog",
-                EasyMock.capture(capturedTopic), EasyMock.capture(capturedProducerProps),
-                EasyMock.capture(capturedConsumerProps), EasyMock.capture(capturedConsumedCallback))
-                .andReturn(storeLog);
-    }
-
-    // If non-empty, deserializations should be a LinkedHashMap
-    private void expectStart(final List<ConsumerRecord<String, byte[]>> preexistingRecords,
-                             final Map<byte[], Struct> deserializations) throws Exception {
-        storeLog.start();
-        PowerMock.expectLastCall().andAnswer(new IAnswer<Object>() {
-            @Override
-            public Object answer() throws Throwable {
-                for (ConsumerRecord<String, byte[]> rec : preexistingRecords)
-                    capturedConsumedCallback.getValue().onCompletion(null, rec);
-                return null;
-            }
-        });
-        for (Map.Entry<byte[], Struct> deserializationEntry : deserializations.entrySet()) {
-            // Note null schema because default settings for internal serialization are schema-less
-            EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(deserializationEntry.getKey())))
-                    .andReturn(new SchemaAndValue(null, structToMap(deserializationEntry.getValue())));
-        }
-    }
-
-    private void expectStop() {
-        storeLog.stop();
-        PowerMock.expectLastCall();
-    }
-
-    // Expect a conversion & write to the underlying log, followed by a subsequent read when the data is consumed back
-    // from the log. Validate the data that is captured when the conversion is performed matches the specified data
-    // (by checking a single field's value)
-    private void expectConvertWriteRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                        final String dataFieldName, final Object dataFieldValue) {
-        final Capture<Struct> capturedRecord = EasyMock.newCapture();
-        if (serialized != null)
-            EasyMock.expect(converter.fromConnectData(EasyMock.eq(TOPIC), EasyMock.eq(valueSchema), EasyMock.capture(capturedRecord)))
-                    .andReturn(serialized);
-        storeLog.send(EasyMock.eq(configKey), EasyMock.aryEq(serialized));
-        PowerMock.expectLastCall();
-        EasyMock.expect(converter.toConnectData(EasyMock.eq(TOPIC), EasyMock.aryEq(serialized)))
-                .andAnswer(new IAnswer<SchemaAndValue>() {
-                    @Override
-                    public SchemaAndValue answer() throws Throwable {
-                        if (dataFieldName != null)
-                            assertEquals(dataFieldValue, capturedRecord.getValue().get(dataFieldName));
-                        // Note null schema because default settings for internal serialization are schema-less
-                        return new SchemaAndValue(null, serialized == null ? null : structToMap(capturedRecord.getValue()));
-                    }
-                });
-    }
-
-    // This map needs to maintain ordering
-    private void expectReadToEnd(final LinkedHashMap<String, byte[]> serializedConfigs) {
-        EasyMock.expect(storeLog.readToEnd())
-                .andAnswer(new IAnswer<Future<Void>>() {
-                    @Override
-                    public Future<Void> answer() throws Throwable {
-                        TestFuture<Void> future = new TestFuture<Void>();
-                        for (Map.Entry<String, byte[]> entry : serializedConfigs.entrySet())
-                            capturedConsumedCallback.getValue().onCompletion(null, new ConsumerRecord<>(TOPIC, 0, logOffset++, 0L, TimestampType.CREATE_TIME, 0L, 0, 0, entry.getKey(), entry.getValue()));
-                        future.resolveOnGet((Void) null);
-                        return future;
-                    }
-                });
-    }
-
-
-    private void expectConvertWriteAndRead(final String configKey, final Schema valueSchema, final byte[] serialized,
-                                                                 final String dataFieldName, final Object dataFieldValue) {
-        expectConvertWriteRead(configKey, valueSchema, serialized, dataFieldName, dataFieldValue);
-        LinkedHashMap<String, byte[]> recordsToRead = new LinkedHashMap<>();
-        recordsToRead.put(configKey, serialized);
-        expectReadToEnd(recordsToRead);
-    }
-
-    // Manually insert a connector into config storage, updating the task configs, connector config, and root config
-    private void whiteboxAddConnector(String connectorName, Map<String, String> connectorConfig, List<Map<String, String>> taskConfigs) {
-        Map<ConnectorTaskId, Map<String, String>> storageTaskConfigs = Whitebox.getInternalState(configStorage, "taskConfigs");
-        for (int i = 0; i < taskConfigs.size(); i++)
-            storageTaskConfigs.put(new ConnectorTaskId(connectorName, i), taskConfigs.get(i));
-
-        Map<String, Map<String, String>> connectorConfigs = Whitebox.getInternalState(configStorage, "connectorConfigs");
-        connectorConfigs.put(connectorName, connectorConfig);
-
-        Whitebox.<Map<String, Integer>>getInternalState(configStorage, "connectorTaskCounts").put(connectorName, taskConfigs.size());
-    }
-
-    // Generates a Map representation of Struct. Only does shallow traversal, so nested structs are not converted
-    private Map<String, Object> structToMap(Struct struct) {
-        HashMap<String, Object> result = new HashMap<>();
-        for (Field field : struct.schema().fields())
-            result.put(field.name(), struct.get(field));
-        return result;
-    }
-
-}


Mime
View raw message