kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From j...@apache.org
Subject [kafka] branch trunk updated: KAFKA-6241; Enable dynamic updates of broker SSL keystore (#4263)
Date Tue, 23 Jan 2018 16:44:35 GMT
This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b814a16  KAFKA-6241; Enable dynamic updates of broker SSL keystore (#4263)
b814a16 is described below

commit b814a16b968d144802d08523b5c359d6706f5632
Author: Rajini Sivaram <rajinisivaram@googlemail.com>
AuthorDate: Tue Jan 23 08:44:31 2018 -0800

    KAFKA-6241; Enable dynamic updates of broker SSL keystore (#4263)
    
    Enable dynamic broker configuration (see KIP-226 for details). Includes
     - Base implementation to allow specific broker configs and custom configs to be dynamically updated
     - Extend DescribeConfigsRequest/Response to return all synonym configs and their sources in the order of precedence
     - Extend AdminClient to alter dynamic broker configs
     - Dynamic update of SSL keystores
    
    Reviewers: Ted Yu <yuzhihong@gmail.com>, Jason Gustafson <jason@confluent.io>
---
 checkstyle/suppressions.xml                        |   2 +-
 .../apache/kafka/clients/admin/ConfigEntry.java    | 138 +++++-
 .../clients/admin/DescribeConfigsOptions.java      |  17 +
 .../kafka/clients/admin/KafkaAdminClient.java      |  87 +++-
 .../org/apache/kafka/common/Reconfigurable.java    |  49 +++
 .../org/apache/kafka/common/config/ConfigDef.java  |  33 ++
 .../apache/kafka/common/config/ConfigResource.java |   8 +
 .../org/apache/kafka/common/config/SslConfigs.java |   7 +
 .../kafka/common/network/ChannelBuilder.java       |   9 +-
 .../kafka/common/network/ChannelBuilders.java      |  21 +-
 .../network/ListenerReconfigurable.java}           |  26 +-
 .../kafka/common/network/SaslChannelBuilder.java   |  34 +-
 .../kafka/common/network/SslChannelBuilder.java    |  37 +-
 .../apache/kafka/common/protocol/types/Struct.java |   6 +-
 .../common/requests/DescribeConfigsRequest.java    |  29 +-
 .../common/requests/DescribeConfigsResponse.java   | 160 ++++++-
 .../kafka/common/security/ssl/SslFactory.java      | 282 ++++++++++--
 .../apache/kafka/common/network/NioEchoServer.java |   2 +-
 .../common/network/SaslChannelBuilderTest.java     |   2 +-
 .../kafka/common/network/SslSelectorTest.java      |   6 +-
 .../common/network/SslTransportLayerTest.java      |  82 +++-
 .../kafka/common/requests/RequestResponseTest.java |  26 +-
 .../authenticator/SaslAuthenticatorTest.java       |   4 +-
 .../kafka/common/security/ssl/SslFactoryTest.java  |  65 ++-
 core/src/main/scala/kafka/log/LogConfig.scala      |  31 +-
 .../main/scala/kafka/network/SocketServer.scala    |  36 +-
 .../src/main/scala/kafka/server/AdminManager.scala | 153 +++++--
 .../main/scala/kafka/server/ConfigHandler.scala    |   8 +-
 .../scala/kafka/server/DynamicBrokerConfig.scala   | 360 ++++++++++++++++
 .../main/scala/kafka/server/DynamicConfig.scala    |  18 +-
 core/src/main/scala/kafka/server/KafkaApis.scala   |   2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  50 ++-
 core/src/main/scala/kafka/server/KafkaServer.scala |  13 +-
 .../scala/kafka/server/KafkaServerStartable.scala  |   6 +-
 core/src/main/scala/kafka/utils/ZkUtils.scala      |   1 +
 core/src/main/scala/kafka/zk/AdminZkClient.scala   |  27 +-
 core/src/main/scala/kafka/zk/ZkData.scala          |   6 +-
 .../server/DynamicBrokerReconfigurationTest.scala  | 471 +++++++++++++++++++++
 .../kafka/server/DynamicBrokerConfigTest.scala     | 153 +++++++
 .../unit/kafka/server/DynamicConfigTest.scala      |   7 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |  28 +-
 41 files changed, 2270 insertions(+), 232 deletions(-)

diff --git a/checkstyle/suppressions.xml b/checkstyle/suppressions.xml
index 05d4a52..664a3f6 100644
--- a/checkstyle/suppressions.xml
+++ b/checkstyle/suppressions.xml
@@ -51,7 +51,7 @@
               files="(Utils|Topic|KafkaLZ4BlockOutputStream|AclData).java"/>
 
     <suppress checks="CyclomaticComplexity"
-              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
+              files="(ConsumerCoordinator|Fetcher|Sender|KafkaProducer|BufferPool|ConfigDef|RecordAccumulator|KerberosLogin|AbstractRequest|AbstractResponse|Selector|SslFactory|SslTransportLayer|SaslClientAuthenticator|SaslClientCallbackHandler).java"/>
 
     <suppress checks="JavaNCSS"
               files="AbstractRequest.java|KerberosLogin.java|WorkerSinkTaskTest.java"/>
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
index 3f24c81..e8da646 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/ConfigEntry.java
@@ -19,6 +19,8 @@ package org.apache.kafka.clients.admin;
 
 import org.apache.kafka.common.annotation.InterfaceStability;
 
+import java.util.Collections;
+import java.util.List;
 import java.util.Objects;
 
 /**
@@ -31,9 +33,10 @@ public class ConfigEntry {
 
     private final String name;
     private final String value;
-    private final boolean isDefault;
+    private final ConfigSource source;
     private final boolean isSensitive;
     private final boolean isReadOnly;
+    private final List<ConfigSynonym> synonyms;
 
     /**
      * Create a configuration entry with the provided values.
@@ -53,14 +56,36 @@ public class ConfigEntry {
      * @param isDefault whether the config value is the default or if it's been explicitly set
      * @param isSensitive whether the config value is sensitive, the broker never returns the value if it is sensitive
      * @param isReadOnly whether the config is read-only and cannot be updated
+     * @deprecated since 1.1.0. This constructor will be removed in a future release.
      */
     public ConfigEntry(String name, String value, boolean isDefault, boolean isSensitive, boolean isReadOnly) {
+        this(name,
+             value,
+             isDefault ? ConfigSource.DEFAULT_CONFIG : ConfigSource.UNKNOWN,
+             isSensitive,
+             isReadOnly,
+             Collections.<ConfigSynonym>emptyList());
+    }
+
+    /**
+     * Create a configuration with the provided values.
+     *
+     * @param name the non-null config name
+     * @param value the config value or null
+     * @param source the source of this config entry
+     * @param isSensitive whether the config value is sensitive, the broker never returns the value if it is sensitive
+     * @param isReadOnly whether the config is read-only and cannot be updated
+     * @param synonyms Synonym configs in order of precedence
+     */
+    ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean isReadOnly,
+                List<ConfigSynonym> synonyms) {
         Objects.requireNonNull(name, "name should not be null");
         this.name = name;
         this.value = value;
-        this.isDefault = isDefault;
+        this.source = source;
         this.isSensitive = isSensitive;
         this.isReadOnly = isReadOnly;
+        this.synonyms = synonyms;
     }
 
     /**
@@ -78,10 +103,17 @@ public class ConfigEntry {
     }
 
     /**
+     * Return the source of this configuration entry.
+     */
+    public ConfigSource source() {
+        return source;
+    }
+
+    /**
      * Return whether the config value is the default or if it's been explicitly set.
      */
     public boolean isDefault() {
-        return isDefault;
+        return source == ConfigSource.DEFAULT_CONFIG;
     }
 
     /**
@@ -99,6 +131,15 @@ public class ConfigEntry {
         return isReadOnly;
     }
 
+    /**
+     * Returns all config values that may be used as the value of this config along with their source,
+     * in the order of precedence. The list starts with the value returned in this ConfigEntry.
+     * The list is empty if synonyms were not requested using {@link DescribeConfigsOptions#includeSynonyms(boolean)}
+     */
+    public List<ConfigSynonym> synonyms() {
+        return  synonyms;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
@@ -110,9 +151,10 @@ public class ConfigEntry {
 
         return this.name.equals(that.name) &&
                 this.value != null ? this.value.equals(that.value) : that.value == null &&
-                this.isDefault == that.isDefault &&
                 this.isSensitive == that.isSensitive &&
-                this.isReadOnly == that.isReadOnly;
+                this.isReadOnly == that.isReadOnly &&
+                this.source == that.source &&
+                Objects.equals(this.synonyms, that.synonyms);
     }
 
     @Override
@@ -121,9 +163,10 @@ public class ConfigEntry {
         int result = 1;
         result = prime * result + name.hashCode();
         result = prime * result + ((value == null) ? 0 : value.hashCode());
-        result = prime * result + (isDefault ? 1 : 0);
         result = prime * result + (isSensitive ? 1 : 0);
         result = prime * result + (isReadOnly ? 1 : 0);
+        result = prime * result + source.hashCode();
+        result = prime * result + synonyms.hashCode();
         return result;
     }
 
@@ -132,9 +175,90 @@ public class ConfigEntry {
         return "ConfigEntry(" +
                 "name=" + name +
                 ", value=" + value +
-                ", isDefault=" + isDefault +
+                ", source=" + source +
                 ", isSensitive=" + isSensitive +
                 ", isReadOnly=" + isReadOnly +
+                ", synonyms=" + synonyms +
                 ")";
     }
+
+
+    /**
+     * Source of configuration entries.
+     */
+    public enum ConfigSource {
+        DYNAMIC_TOPIC_CONFIG,           // dynamic topic config that is configured for a specific topic
+        DYNAMIC_BROKER_CONFIG,          // dynamic broker config that is configured for a specific broker
+        DYNAMIC_DEFAULT_BROKER_CONFIG,  // dynamic broker config that is configured as default for all brokers in the cluster
+        STATIC_BROKER_CONFIG,           // static broker config provided as broker properties at start up (e.g. server.properties file)
+        DEFAULT_CONFIG,                 // built-in default configuration for configs that have a default value
+        UNKNOWN                         // source unknown e.g. in the ConfigEntry used for alter requests where source is not set
+    }
+
+    /**
+     * Class representing a configuration synonym of a {@link ConfigEntry}.
+     */
+    public static class ConfigSynonym {
+
+        private final String name;
+        private final String value;
+        private final ConfigSource source;
+
+        /**
+         * Create a configuration synonym with the provided values.
+         *
+         * @param name Configuration name (this may be different from the name of the associated {@link ConfigEntry}
+         * @param value Configuration value
+         * @param source {@link ConfigSource} of this configuraton
+         */
+        ConfigSynonym(String name, String value, ConfigSource source) {
+            this.name = name;
+            this.value = value;
+            this.source = source;
+        }
+
+        /**
+         * Returns the name of this configuration.
+         */
+        public String name() {
+            return name;
+        }
+
+        /**
+         * Returns the value of this configuration, which may be null if the configuration is sensitive.
+         */
+        public String value() {
+            return value;
+        }
+
+        /**
+         * Returns the source of this configuration.
+         */
+        public ConfigSource source() {
+            return source;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) return true;
+            if (o == null || getClass() != o.getClass()) return false;
+
+            ConfigSynonym that = (ConfigSynonym) o;
+            return Objects.equals(name, that.name) && Objects.equals(value, that.value) && source == that.source;
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(name, value, source);
+        }
+
+        @Override
+        public String toString() {
+            return "ConfigSynonym(" +
+                    "name=" + name +
+                    ", value=" + value +
+                    ", source=" + source +
+                    ")";
+        }
+    }
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
index b0b9b3c..aa667af 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
@@ -29,6 +29,8 @@ import java.util.Collection;
 @InterfaceStability.Evolving
 public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
 
+    private boolean includeSynonyms = false;
+
     /**
      * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
      * AdminClient should be used.
@@ -40,4 +42,19 @@ public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptio
         return this;
     }
 
+    /**
+     * Return true if synonym configs should be returned in the response.
+     */
+    public boolean includeSynonyms() {
+        return includeSynonyms;
+    }
+
+    /**
+     * Set to true if synonym configs should be returned in the response.
+     */
+    public DescribeConfigsOptions includeSynonyms(boolean includeSynonyms) {
+        this.includeSynonyms = includeSynonyms;
+        return this;
+    }
+
 }
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
index cf48846..6d6788d 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
+++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
@@ -1494,7 +1494,7 @@ public class KafkaAdminClient extends AdminClient {
         final Collection<Resource> unifiedRequestResources = new ArrayList<>(configResources.size());
 
         for (ConfigResource resource : configResources) {
-            if (resource.type() == ConfigResource.Type.BROKER) {
+            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
                 brokerFutures.put(resource, new KafkaFutureImpl<Config>());
                 brokerResources.add(configResourceToResource(resource));
             } else {
@@ -1510,7 +1510,8 @@ public class KafkaAdminClient extends AdminClient {
 
                 @Override
                 AbstractRequest.Builder createRequest(int timeoutMs) {
-                    return new DescribeConfigsRequest.Builder(unifiedRequestResources);
+                    return new DescribeConfigsRequest.Builder(unifiedRequestResources)
+                            .includeSynonyms(options.includeSynonyms());
                 }
 
                 @Override
@@ -1531,8 +1532,10 @@ public class KafkaAdminClient extends AdminClient {
                         }
                         List<ConfigEntry> configEntries = new ArrayList<>();
                         for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
-                            configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
-                                configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+                            configEntries.add(new ConfigEntry(configEntry.name(),
+                                    configEntry.value(), configSource(configEntry.source()),
+                                    configEntry.isSensitive(), configEntry.isReadOnly(),
+                                    configSynonyms(configEntry)));
                         }
                         future.complete(new Config(configEntries));
                     }
@@ -1554,7 +1557,8 @@ public class KafkaAdminClient extends AdminClient {
 
                 @Override
                 AbstractRequest.Builder createRequest(int timeoutMs) {
-                    return new DescribeConfigsRequest.Builder(Collections.singleton(resource));
+                    return new DescribeConfigsRequest.Builder(Collections.singleton(resource))
+                            .includeSynonyms(options.includeSynonyms());
                 }
 
                 @Override
@@ -1573,7 +1577,8 @@ public class KafkaAdminClient extends AdminClient {
                         List<ConfigEntry> configEntries = new ArrayList<>();
                         for (DescribeConfigsResponse.ConfigEntry configEntry : config.entries()) {
                             configEntries.add(new ConfigEntry(configEntry.name(), configEntry.value(),
-                                configEntry.isDefault(), configEntry.isSensitive(), configEntry.isReadOnly()));
+                                configSource(configEntry.source()), configEntry.isSensitive(), configEntry.isReadOnly(),
+                                configSynonyms(configEntry)));
                         }
                         brokerFuture.complete(new Config(configEntries));
                     }
@@ -1606,24 +1611,74 @@ public class KafkaAdminClient extends AdminClient {
         return new Resource(resourceType, configResource.name());
     }
 
+    private List<ConfigEntry.ConfigSynonym> configSynonyms(DescribeConfigsResponse.ConfigEntry configEntry) {
+        List<ConfigEntry.ConfigSynonym> synonyms = new ArrayList<>(configEntry.synonyms().size());
+        for (DescribeConfigsResponse.ConfigSynonym synonym : configEntry.synonyms()) {
+            synonyms.add(new ConfigEntry.ConfigSynonym(synonym.name(), synonym.value(), configSource(synonym.source())));
+        }
+        return synonyms;
+    }
+
+    private ConfigEntry.ConfigSource configSource(DescribeConfigsResponse.ConfigSource source) {
+        ConfigEntry.ConfigSource configSource;
+        switch (source) {
+            case TOPIC_CONFIG:
+                configSource = ConfigEntry.ConfigSource.DYNAMIC_TOPIC_CONFIG;
+                break;
+            case DYNAMIC_BROKER_CONFIG:
+                configSource = ConfigEntry.ConfigSource.DYNAMIC_BROKER_CONFIG;
+                break;
+            case DYNAMIC_DEFAULT_BROKER_CONFIG:
+                configSource = ConfigEntry.ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG;
+                break;
+            case STATIC_BROKER_CONFIG:
+                configSource = ConfigEntry.ConfigSource.STATIC_BROKER_CONFIG;
+                break;
+            case DEFAULT_CONFIG:
+                configSource = ConfigEntry.ConfigSource.DEFAULT_CONFIG;
+                break;
+            default:
+                throw new IllegalArgumentException("Unexpected config source " + source);
+        }
+        return configSource;
+    }
+
     @Override
     public AlterConfigsResult alterConfigs(Map<ConfigResource, Config> configs, final AlterConfigsOptions options) {
-        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>(configs.size());
-        for (ConfigResource configResource : configs.keySet()) {
-            futures.put(configResource, new KafkaFutureImpl<Void>());
+        final Map<ConfigResource, KafkaFutureImpl<Void>> allFutures = new HashMap<>();
+        // We must make a separate AlterConfigs request for every BROKER resource we want to alter
+        // and send the request to that specific broker. Other resources are grouped together into
+        // a single request that may be sent to any broker.
+        final Collection<ConfigResource> unifiedRequestResources = new ArrayList<>();
+
+        for (ConfigResource resource : configs.keySet()) {
+            if (resource.type() == ConfigResource.Type.BROKER && !resource.isDefault()) {
+                NodeProvider nodeProvider = new ConstantNodeIdProvider(Integer.parseInt(resource.name()));
+                allFutures.putAll(alterConfigs(configs, options, Collections.singleton(resource), nodeProvider));
+            } else
+                unifiedRequestResources.add(resource);
         }
-        final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(configs.size());
-        for (Map.Entry<ConfigResource, Config> entry : configs.entrySet()) {
+        if (!unifiedRequestResources.isEmpty())
+          allFutures.putAll(alterConfigs(configs, options, unifiedRequestResources, new LeastLoadedNodeProvider()));
+        return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(allFutures));
+    }
+
+    private Map<ConfigResource, KafkaFutureImpl<Void>> alterConfigs(Map<ConfigResource, Config> configs,
+                                                                    final AlterConfigsOptions options,
+                                                                    Collection<ConfigResource> resources,
+                                                                    NodeProvider nodeProvider) {
+        final Map<ConfigResource, KafkaFutureImpl<Void>> futures = new HashMap<>();
+        final Map<Resource, AlterConfigsRequest.Config> requestMap = new HashMap<>(resources.size());
+        for (ConfigResource resource : resources) {
             List<AlterConfigsRequest.ConfigEntry> configEntries = new ArrayList<>();
-            for (ConfigEntry configEntry: entry.getValue().entries())
+            for (ConfigEntry configEntry: configs.get(resource).entries())
                 configEntries.add(new AlterConfigsRequest.ConfigEntry(configEntry.name(), configEntry.value()));
-            ConfigResource resource = entry.getKey();
             requestMap.put(configResourceToResource(resource), new AlterConfigsRequest.Config(configEntries));
+            futures.put(resource, new KafkaFutureImpl<Void>());
         }
 
         final long now = time.milliseconds();
-        runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()),
-                new LeastLoadedNodeProvider()) {
+        runnable.call(new Call("alterConfigs", calcDeadlineMs(now, options.timeoutMs()), nodeProvider) {
 
             @Override
             public AbstractRequest.Builder createRequest(int timeoutMs) {
@@ -1649,7 +1704,7 @@ public class KafkaAdminClient extends AdminClient {
                 completeAllExceptionally(futures.values(), throwable);
             }
         }, now);
-        return new AlterConfigsResult(new HashMap<ConfigResource, KafkaFuture<Void>>(futures));
+        return futures;
     }
 
     @Override
diff --git a/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
new file mode 100644
index 0000000..3339dce
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/Reconfigurable.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common;
+
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Interface for reconfigurable classes that support dynamic configuration.
+ */
+public interface Reconfigurable extends Configurable {
+
+    /**
+     * Returns the names of configs that may be reconfigured.
+     */
+    Set<String> reconfigurableConfigs();
+
+    /**
+     * Validates the provided configuration. The provided map contains
+     * all configs including any reconfigurable configs that may be different
+     * from the initial configuration. Reconfiguration will be not performed
+     * if this method returns false or throws any exception.
+     */
+    boolean validateReconfiguration(Map<String, ?> configs);
+
+    /**
+     * Reconfigures this instance with the given key-value pairs. The provided
+     * map contains all configs including any reconfigurable configs that
+     * may have changed since the object was initially configured using
+     * {@link Configurable#configure(Map)}. This method will only be invoked if
+     * the configs have passed validation using {@link #validateReconfiguration(Map)}.
+     */
+    void reconfigure(Map<String, ?> configs);
+
+}
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
index 7b9881f..3340ab3 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigDef.java
@@ -105,6 +105,15 @@ public class ConfigDef {
         return Collections.unmodifiableSet(configKeys.keySet());
     }
 
+    public Map<String, Object> defaultValues() {
+        Map<String, Object> defaultValues = new HashMap<>();
+        for (ConfigKey key : configKeys.values()) {
+            if (key.defaultValue != NO_DEFAULT_VALUE)
+                defaultValues.put(key.name, key.defaultValue);
+        }
+        return defaultValues;
+    }
+
     public ConfigDef define(ConfigKey key) {
         if (configKeys.containsKey(key.name)) {
             throw new ConfigException("Configuration " + key.name + " is defined twice.");
@@ -746,6 +755,30 @@ public class ConfigDef {
     }
 
     /**
+     * Converts a map of config (key, value) pairs to a map of strings where each value
+     * is converted to a string. This method should be used with care since it stores
+     * actual password values to String. Values from this map should never be used in log entries.
+     */
+    public static  Map<String, String> convertToStringMapWithPasswordValues(Map<String, ?> configs) {
+        Map<String, String> result = new HashMap<>();
+        for (Map.Entry<String, ?> entry : configs.entrySet()) {
+            Object value = entry.getValue();
+            String strValue;
+            if (value instanceof Password)
+                strValue = ((Password) value).value();
+            else if (value instanceof List)
+                strValue = convertToString(value, Type.LIST);
+            else if (value instanceof Class)
+                strValue = convertToString(value, Type.CLASS);
+            else
+                strValue = convertToString(value, null);
+            if (strValue != null)
+                result.put(entry.getKey(), strValue);
+        }
+        return result;
+    }
+
+    /**
      * The config types
      */
     public enum Type {
diff --git a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
index cd397ad..4402c26 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/ConfigResource.java
@@ -61,6 +61,14 @@ public final class ConfigResource {
         return name;
     }
 
+    /**
+     * Returns true if this is the default resource of a resource type.
+     * Resource name is empty for the default resource.
+     */
+    public boolean isDefault() {
+        return name.isEmpty();
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o)
diff --git a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
index 042b051..fd4d39e 100644
--- a/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
+++ b/clients/src/main/java/org/apache/kafka/common/config/SslConfigs.java
@@ -17,9 +17,11 @@
 package org.apache.kafka.common.config;
 
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
+import org.apache.kafka.common.utils.Utils;
 
 import javax.net.ssl.KeyManagerFactory;
 import javax.net.ssl.TrustManagerFactory;
+import java.util.Set;
 
 public class SslConfigs {
     /*
@@ -135,4 +137,9 @@ public class SslConfigs {
                 .define(SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_ENDPOINT_IDENTIFICATION_ALGORITHM_DOC)
                 .define(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, ConfigDef.Type.STRING, null, ConfigDef.Importance.LOW, SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_DOC);
     }
+
+    public static final Set<String> RECONFIGURABLE_CONFIGS = Utils.mkSet(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG,
+            SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG,
+            SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+            SslConfigs.SSL_KEY_PASSWORD_CONFIG);
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 54689f3..e3b2eeb 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -16,22 +16,17 @@
  */
 package org.apache.kafka.common.network;
 
-import java.util.Map;
 import java.nio.channels.SelectionKey;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.memory.MemoryPool;
 
 
 /**
  * A ChannelBuilder interface to build Channel based on configs
  */
-public interface ChannelBuilder extends AutoCloseable {
-
-    /**
-     * Configure this class with the given key-value pairs
-     */
-    void configure(Map<String, ?> configs) throws KafkaException;
+public interface ChannelBuilder extends AutoCloseable, Configurable {
 
     /**
      * returns a Channel with TransportLayer and Authenticator configured.
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
index 8f301ad..e6df78e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
@@ -59,7 +59,7 @@ public class ChannelBuilders {
             if (clientSaslMechanism == null)
                 throw new IllegalArgumentException("`clientSaslMechanism` must be non-null in client mode if `securityProtocol` is `" + securityProtocol + "`");
         }
-        return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, clientSaslMechanism,
+        return create(securityProtocol, Mode.CLIENT, contextType, config, listenerName, false, clientSaslMechanism,
                 saslHandshakeRequestEnable, null, null);
     }
 
@@ -71,12 +71,13 @@ public class ChannelBuilders {
      * @return the configured `ChannelBuilder`
      */
     public static ChannelBuilder serverChannelBuilder(ListenerName listenerName,
+                                                      boolean isInterBrokerListener,
                                                       SecurityProtocol securityProtocol,
                                                       AbstractConfig config,
                                                       CredentialCache credentialCache,
                                                       DelegationTokenCache tokenCache) {
-        return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName, null,
-                true, credentialCache, tokenCache);
+        return create(securityProtocol, Mode.SERVER, JaasContext.Type.SERVER, config, listenerName,
+                isInterBrokerListener, null, true, credentialCache, tokenCache);
     }
 
     private static ChannelBuilder create(SecurityProtocol securityProtocol,
@@ -84,6 +85,7 @@ public class ChannelBuilders {
                                          JaasContext.Type contextType,
                                          AbstractConfig config,
                                          ListenerName listenerName,
+                                         boolean isInterBrokerListener,
                                          String clientSaslMechanism,
                                          boolean saslHandshakeRequestEnable,
                                          CredentialCache credentialCache,
@@ -98,14 +100,21 @@ public class ChannelBuilders {
         switch (securityProtocol) {
             case SSL:
                 requireNonNullMode(mode, securityProtocol);
-                channelBuilder = new SslChannelBuilder(mode);
+                channelBuilder = new SslChannelBuilder(mode, listenerName, isInterBrokerListener);
                 break;
             case SASL_SSL:
             case SASL_PLAINTEXT:
                 requireNonNullMode(mode, securityProtocol);
                 JaasContext jaasContext = JaasContext.load(contextType, listenerName, configs);
-                channelBuilder = new SaslChannelBuilder(mode, jaasContext, securityProtocol, listenerName,
-                        clientSaslMechanism, saslHandshakeRequestEnable, credentialCache, tokenCache);
+                channelBuilder = new SaslChannelBuilder(mode,
+                        jaasContext,
+                        securityProtocol,
+                        listenerName,
+                        isInterBrokerListener,
+                        clientSaslMechanism,
+                        saslHandshakeRequestEnable,
+                        credentialCache,
+                        tokenCache);
                 break;
             case PLAINTEXT:
                 channelBuilder = new PlaintextChannelBuilder();
diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java b/clients/src/main/java/org/apache/kafka/common/network/ListenerReconfigurable.java
similarity index 52%
copy from clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
copy to clients/src/main/java/org/apache/kafka/common/network/ListenerReconfigurable.java
index b0b9b3c..3541212 100644
--- a/clients/src/main/java/org/apache/kafka/clients/admin/DescribeConfigsOptions.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ListenerReconfigurable.java
@@ -14,30 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+package org.apache.kafka.common.network;
 
-package org.apache.kafka.clients.admin;
-
-import org.apache.kafka.common.annotation.InterfaceStability;
-
-import java.util.Collection;
+import org.apache.kafka.common.Reconfigurable;
 
 /**
- * Options for {@link AdminClient#describeConfigs(Collection)}.
- *
- * The API of this class is evolving, see {@link AdminClient} for details.
+ * Interface for reconfigurable entities associated with a listener.
  */
-@InterfaceStability.Evolving
-public class DescribeConfigsOptions extends AbstractOptions<DescribeConfigsOptions> {
+public interface ListenerReconfigurable extends Reconfigurable {
 
     /**
-     * Set the request timeout in milliseconds for this operation or {@code null} if the default request timeout for the
-     * AdminClient should be used.
-     *
+     * Returns the listener name associated with this reconfigurable. Listener-specific
+     * configs corresponding to this listener name are provided for reconfiguration.
      */
-    // This method is retained to keep binary compatibility with 0.11
-    public DescribeConfigsOptions timeoutMs(Integer timeoutMs) {
-        this.timeoutMs = timeoutMs;
-        return this;
-    }
-
+    ListenerName listenerName();
 }
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index d6dd5fc..1716e0e 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -18,6 +18,7 @@ package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.SecurityProtocol;
@@ -39,16 +40,19 @@ import java.lang.reflect.Method;
 import java.net.Socket;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
+import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 
 import javax.security.auth.Subject;
 
-public class SaslChannelBuilder implements ChannelBuilder {
+public class SaslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
     private static final Logger log = LoggerFactory.getLogger(SaslChannelBuilder.class);
 
     private final SecurityProtocol securityProtocol;
     private final ListenerName listenerName;
+    private final boolean isInterBrokerListener;
     private final String clientSaslMechanism;
     private final Mode mode;
     private final JaasContext jaasContext;
@@ -65,6 +69,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
                               JaasContext jaasContext,
                               SecurityProtocol securityProtocol,
                               ListenerName listenerName,
+                              boolean isInterBrokerListener,
                               String clientSaslMechanism,
                               boolean handshakeRequestEnable,
                               CredentialCache credentialCache,
@@ -73,6 +78,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
         this.jaasContext = jaasContext;
         this.securityProtocol = securityProtocol;
         this.listenerName = listenerName;
+        this.isInterBrokerListener = isInterBrokerListener;
         this.handshakeRequestEnable = handshakeRequestEnable;
         this.clientSaslMechanism = clientSaslMechanism;
         this.credentialCache = credentialCache;
@@ -108,7 +114,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
 
             if (this.securityProtocol == SecurityProtocol.SASL_SSL) {
                 // Disable SSL client authentication as we are using SASL authentication
-                this.sslFactory = new SslFactory(mode, "none");
+                this.sslFactory = new SslFactory(mode, "none", isInterBrokerListener);
                 this.sslFactory.configure(configs);
             }
         } catch (Exception e) {
@@ -118,6 +124,30 @@ public class SaslChannelBuilder implements ChannelBuilder {
     }
 
     @Override
+    public Set<String> reconfigurableConfigs() {
+        return securityProtocol == SecurityProtocol.SASL_SSL ? SslConfigs.RECONFIGURABLE_CONFIGS : Collections.<String>emptySet();
+    }
+
+    @Override
+    public boolean validateReconfiguration(Map<String, ?> configs) {
+        if (this.securityProtocol == SecurityProtocol.SASL_SSL)
+            return sslFactory.validateReconfiguration(configs);
+        else
+            return true;
+    }
+
+    @Override
+    public void reconfigure(Map<String, ?> configs) {
+        if (this.securityProtocol == SecurityProtocol.SASL_SSL)
+            sslFactory.reconfigure(configs);
+    }
+
+    @Override
+    public ListenerName listenerName() {
+        return listenerName;
+    }
+
+    @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SocketChannel socketChannel = (SocketChannel) key.channel();
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index 9519e58..e024d32 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.network;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.KafkaPrincipal;
 import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
@@ -33,21 +34,31 @@ import java.net.InetSocketAddress;
 import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
+import java.util.Set;
 
-public class SslChannelBuilder implements ChannelBuilder {
+public class SslChannelBuilder implements ChannelBuilder, ListenerReconfigurable {
     private static final Logger log = LoggerFactory.getLogger(SslChannelBuilder.class);
+
+    private final ListenerName listenerName;
+    private final boolean isInterBrokerListener;
     private SslFactory sslFactory;
     private Mode mode;
     private Map<String, ?> configs;
 
-    public SslChannelBuilder(Mode mode) {
+    /**
+     * Constructs a SSL channel builder. ListenerName is provided only
+     * for server channel builder and will be null for client channel builder.
+     */
+    public SslChannelBuilder(Mode mode, ListenerName listenerName, boolean isInterBrokerListener) {
         this.mode = mode;
+        this.listenerName = listenerName;
+        this.isInterBrokerListener = isInterBrokerListener;
     }
 
     public void configure(Map<String, ?> configs) throws KafkaException {
         try {
             this.configs = configs;
-            this.sslFactory = new SslFactory(mode);
+            this.sslFactory = new SslFactory(mode, null, isInterBrokerListener);
             this.sslFactory.configure(this.configs);
         } catch (Exception e) {
             throw new KafkaException(e);
@@ -55,6 +66,26 @@ public class SslChannelBuilder implements ChannelBuilder {
     }
 
     @Override
+    public Set<String> reconfigurableConfigs() {
+        return SslConfigs.RECONFIGURABLE_CONFIGS;
+    }
+
+    @Override
+    public boolean validateReconfiguration(Map<String, ?> configs) {
+        return sslFactory.validateReconfiguration(configs);
+    }
+
+    @Override
+    public void reconfigure(Map<String, ?> configs) {
+        sslFactory.reconfigure(configs);
+    }
+
+    @Override
+    public ListenerName listenerName() {
+        return listenerName;
+    }
+
+    @Override
     public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
index b825201..6fb6b20 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Struct.java
@@ -289,7 +289,11 @@ public class Struct {
     }
 
     public Struct setIfExists(Field def, Object value) {
-        BoundField field = this.schema.get(def.name);
+        return setIfExists(def.name, value);
+    }
+
+    public Struct setIfExists(String fieldName, Object value) {
+        BoundField field = this.schema.get(fieldName);
         if (field != null)
             this.values[field.index] = value;
         return this;
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
index 74e25f4..4156338 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsRequest.java
@@ -30,12 +30,14 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 
 public class DescribeConfigsRequest extends AbstractRequest {
 
     private static final String RESOURCES_KEY_NAME = "resources";
+    private static final String INCLUDE_SYNONYMS = "include_synonyms";
     private static final String RESOURCE_TYPE_KEY_NAME = "resource_type";
     private static final String RESOURCE_NAME_KEY_NAME = "resource_name";
     private static final String CONFIG_NAMES_KEY_NAME = "config_names";
@@ -48,18 +50,29 @@ public class DescribeConfigsRequest extends AbstractRequest {
     private static final Schema DESCRIBE_CONFIGS_REQUEST_V0 = new Schema(
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."));
 
+    private static final Schema DESCRIBE_CONFIGS_REQUEST_V1 = new Schema(
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_REQUEST_RESOURCE_V0), "An array of config resources to be returned."),
+            new Field(INCLUDE_SYNONYMS, BOOLEAN));
+
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0};
+        return new Schema[]{DESCRIBE_CONFIGS_REQUEST_V0, DESCRIBE_CONFIGS_REQUEST_V1};
     }
 
     public static class Builder extends AbstractRequest.Builder {
         private final Map<Resource, Collection<String>> resourceToConfigNames;
+        private boolean includeSynonyms;
 
         public Builder(Map<Resource, Collection<String>> resourceToConfigNames) {
             super(ApiKeys.DESCRIBE_CONFIGS);
             this.resourceToConfigNames = resourceToConfigNames;
         }
 
+        public Builder includeSynonyms(boolean includeSynonyms) {
+            this.includeSynonyms = includeSynonyms;
+            return this;
+        }
+
         public Builder(Collection<Resource> resources) {
             this(toResourceToConfigNames(resources));
         }
@@ -73,16 +86,17 @@ public class DescribeConfigsRequest extends AbstractRequest {
 
         @Override
         public DescribeConfigsRequest build(short version) {
-            return new DescribeConfigsRequest(version, resourceToConfigNames);
+            return new DescribeConfigsRequest(version, resourceToConfigNames, includeSynonyms);
         }
     }
 
     private final Map<Resource, Collection<String>> resourceToConfigNames;
+    private final boolean includeSynonyms;
 
-    public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames) {
+    public DescribeConfigsRequest(short version, Map<Resource, Collection<String>> resourceToConfigNames, boolean includeSynonyms) {
         super(version);
         this.resourceToConfigNames = resourceToConfigNames;
-
+        this.includeSynonyms = includeSynonyms;
     }
 
     public DescribeConfigsRequest(Struct struct, short version) {
@@ -104,6 +118,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
 
             resourceToConfigNames.put(new Resource(resourceType, resourceName), configNames);
         }
+        this.includeSynonyms = struct.hasField(INCLUDE_SYNONYMS) ? struct.getBoolean(INCLUDE_SYNONYMS) : false;
     }
 
     public Collection<Resource> resources() {
@@ -117,6 +132,10 @@ public class DescribeConfigsRequest extends AbstractRequest {
         return resourceToConfigNames.get(resource);
     }
 
+    public boolean includeSynonyms() {
+        return includeSynonyms;
+    }
+
     @Override
     protected Struct toStruct() {
         Struct struct = new Struct(ApiKeys.DESCRIBE_CONFIGS.requestSchema(version()));
@@ -133,6 +152,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
             resourceStructs.add(resourceStruct);
         }
         struct.set(RESOURCES_KEY_NAME, resourceStructs.toArray(new Struct[0]));
+        struct.setIfExists(INCLUDE_SYNONYMS, includeSynonyms);
         return struct;
     }
 
@@ -141,6 +161,7 @@ public class DescribeConfigsRequest extends AbstractRequest {
         short version = version();
         switch (version) {
             case 0:
+            case 1:
                 ApiError error = ApiError.fromThrowable(e);
                 Map<Resource, DescribeConfigsResponse.Config> errors = new HashMap<>(resources().size());
                 DescribeConfigsResponse.Config config = new DescribeConfigsResponse.Config(error,
diff --git a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
index 91bf30e..62012f4 100644
--- a/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
+++ b/clients/src/main/java/org/apache/kafka/common/requests/DescribeConfigsResponse.java
@@ -27,6 +27,7 @@ import org.apache.kafka.common.protocol.types.Struct;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -54,24 +55,53 @@ public class DescribeConfigsResponse extends AbstractResponse {
     private static final String IS_DEFAULT_KEY_NAME = "is_default";
     private static final String READ_ONLY_KEY_NAME = "read_only";
 
+    private static final String CONFIG_SYNONYMS_KEY_NAME = "config_synonyms";
+    private static final String CONFIG_SOURCE_KEY_NAME = "config_source";
+
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0 = new Schema(
+                    new Field(CONFIG_NAME_KEY_NAME, STRING),
+                    new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+                    new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+                    new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+                    new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN));
+
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1 = new Schema(
+            new Field(CONFIG_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+            new Field(CONFIG_SOURCE_KEY_NAME, INT8));
+
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1 = new Schema(
+            new Field(CONFIG_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
+            new Field(READ_ONLY_KEY_NAME, BOOLEAN),
+            new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
+            new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN),
+            new Field(CONFIG_SYNONYMS_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_SYNONYM_V1)));
+
     private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0 = new Schema(
             ERROR_CODE,
             ERROR_MESSAGE,
             new Field(RESOURCE_TYPE_KEY_NAME, INT8),
             new Field(RESOURCE_NAME_KEY_NAME, STRING),
-            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(new Schema(
-                    new Field(CONFIG_NAME_KEY_NAME, STRING),
-                    new Field(CONFIG_VALUE_KEY_NAME, NULLABLE_STRING),
-                    new Field(READ_ONLY_KEY_NAME, BOOLEAN),
-                    new Field(IS_DEFAULT_KEY_NAME, BOOLEAN),
-                    new Field(IS_SENSITIVE_KEY_NAME, BOOLEAN)))));
+            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V0)));
+
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1 = new Schema(
+            ERROR_CODE,
+            ERROR_MESSAGE,
+            new Field(RESOURCE_TYPE_KEY_NAME, INT8),
+            new Field(RESOURCE_NAME_KEY_NAME, STRING),
+            new Field(CONFIG_ENTRIES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTRY_V1)));
 
     private static final Schema DESCRIBE_CONFIGS_RESPONSE_V0 = new Schema(
             THROTTLE_TIME_MS,
             new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V0)));
 
+    private static final Schema DESCRIBE_CONFIGS_RESPONSE_V1 = new Schema(
+            THROTTLE_TIME_MS,
+            new Field(RESOURCES_KEY_NAME, new ArrayOf(DESCRIBE_CONFIGS_RESPONSE_ENTITY_V1)));
+
     public static Schema[] schemaVersions() {
-        return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0};
+        return new Schema[]{DESCRIBE_CONFIGS_RESPONSE_V0, DESCRIBE_CONFIGS_RESPONSE_V1};
     }
 
     public static class Config {
@@ -96,15 +126,19 @@ public class DescribeConfigsResponse extends AbstractResponse {
         private final String name;
         private final String value;
         private final boolean isSensitive;
-        private final boolean isDefault;
+        private final ConfigSource source;
         private final boolean readOnly;
+        private final Collection<ConfigSynonym> synonyms;
+
+        public ConfigEntry(String name, String value, ConfigSource source, boolean isSensitive, boolean readOnly,
+                           Collection<ConfigSynonym> synonyms) {
 
-        public ConfigEntry(String name, String value, boolean isSensitive, boolean isDefault, boolean readOnly) {
             this.name = name;
             this.value = value;
+            this.source = source;
             this.isSensitive = isSensitive;
-            this.isDefault = isDefault;
             this.readOnly = readOnly;
+            this.synonyms = synonyms;
         }
 
         public String name() {
@@ -119,15 +153,66 @@ public class DescribeConfigsResponse extends AbstractResponse {
             return isSensitive;
         }
 
-        public boolean isDefault() {
-            return isDefault;
+        public ConfigSource source() {
+            return source;
         }
 
         public boolean isReadOnly() {
             return readOnly;
         }
+
+        public Collection<ConfigSynonym> synonyms() {
+            return synonyms;
+        }
+    }
+
+    public enum ConfigSource {
+        UNKNOWN_CONFIG((byte) 0),
+        TOPIC_CONFIG((byte) 1),
+        DYNAMIC_BROKER_CONFIG((byte) 2),
+        DYNAMIC_DEFAULT_BROKER_CONFIG((byte) 3),
+        STATIC_BROKER_CONFIG((byte) 4),
+        DEFAULT_CONFIG((byte) 5);
+
+        final byte id;
+        private static final ConfigSource[] VALUES = values();
+
+        ConfigSource(byte id) {
+            this.id = id;
+        }
+
+        public static ConfigSource forId(byte id) {
+            if (id < 0)
+                throw new IllegalArgumentException("id should be positive, id: " + id);
+            if (id >= VALUES.length)
+                return UNKNOWN_CONFIG;
+            return VALUES[id];
+        }
     }
 
+    public static class ConfigSynonym {
+        private final String name;
+        private final String value;
+        private final ConfigSource source;
+
+        public ConfigSynonym(String name, String value, ConfigSource source) {
+            this.name = name;
+            this.value = value;
+            this.source = source;
+        }
+
+        public String name() {
+            return name;
+        }
+        public String value() {
+            return value;
+        }
+        public ConfigSource source() {
+            return source;
+        }
+    }
+
+
     private final int throttleTimeMs;
     private final Map<Resource, Config> configs;
 
@@ -155,9 +240,42 @@ public class DescribeConfigsResponse extends AbstractResponse {
                 String configName = configEntriesStruct.getString(CONFIG_NAME_KEY_NAME);
                 String configValue = configEntriesStruct.getString(CONFIG_VALUE_KEY_NAME);
                 boolean isSensitive = configEntriesStruct.getBoolean(IS_SENSITIVE_KEY_NAME);
-                boolean isDefault = configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME);
+                ConfigSource configSource;
+                if (configEntriesStruct.hasField(CONFIG_SOURCE_KEY_NAME))
+                    configSource = ConfigSource.forId(configEntriesStruct.getByte(CONFIG_SOURCE_KEY_NAME));
+                else if (configEntriesStruct.hasField(IS_DEFAULT_KEY_NAME)) {
+                    if (configEntriesStruct.getBoolean(IS_DEFAULT_KEY_NAME))
+                        configSource = ConfigSource.DEFAULT_CONFIG;
+                    else {
+                        switch (resourceType) {
+                            case BROKER:
+                                configSource = ConfigSource.STATIC_BROKER_CONFIG;
+                                break;
+                            case TOPIC:
+                                configSource = ConfigSource.TOPIC_CONFIG;
+                                break;
+                            default:
+                                configSource = ConfigSource.UNKNOWN_CONFIG;
+                                break;
+                        }
+                    }
+                } else
+                    throw new IllegalStateException("Config entry should contain either is_default or config_source");
                 boolean readOnly = configEntriesStruct.getBoolean(READ_ONLY_KEY_NAME);
-                configEntries.add(new ConfigEntry(configName, configValue, isSensitive, isDefault, readOnly));
+                Collection<ConfigSynonym> synonyms;
+                if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) {
+                    Object[] synonymsArray = configEntriesStruct.getArray(CONFIG_SYNONYMS_KEY_NAME);
+                    synonyms = new ArrayList<>(synonymsArray.length);
+                    for (Object synonymObj: synonymsArray) {
+                        Struct synonymStruct = (Struct) synonymObj;
+                        String synonymConfigName = synonymStruct.getString(CONFIG_NAME_KEY_NAME);
+                        String synonymConfigValue = synonymStruct.getString(CONFIG_VALUE_KEY_NAME);
+                        ConfigSource source = ConfigSource.forId(synonymStruct.getByte(CONFIG_SOURCE_KEY_NAME));
+                        synonyms.add(new ConfigSynonym(synonymConfigName, synonymConfigValue, source));
+                    }
+                } else
+                    synonyms = Collections.emptyList();
+                configEntries.add(new ConfigEntry(configName, configValue, configSource, isSensitive, readOnly, synonyms));
             }
             Config config = new Config(error, configEntries);
             configs.put(resource, config);
@@ -205,9 +323,21 @@ public class DescribeConfigsResponse extends AbstractResponse {
                 configEntriesStruct.set(CONFIG_NAME_KEY_NAME, configEntry.name);
                 configEntriesStruct.set(CONFIG_VALUE_KEY_NAME, configEntry.value);
                 configEntriesStruct.set(IS_SENSITIVE_KEY_NAME, configEntry.isSensitive);
-                configEntriesStruct.set(IS_DEFAULT_KEY_NAME, configEntry.isDefault);
+                configEntriesStruct.setIfExists(CONFIG_SOURCE_KEY_NAME, configEntry.source.id);
+                configEntriesStruct.setIfExists(IS_DEFAULT_KEY_NAME, configEntry.source == ConfigSource.DEFAULT_CONFIG);
                 configEntriesStruct.set(READ_ONLY_KEY_NAME, configEntry.readOnly);
                 configEntryStructs.add(configEntriesStruct);
+                if (configEntriesStruct.hasField(CONFIG_SYNONYMS_KEY_NAME)) {
+                    List<Struct> configSynonymStructs = new ArrayList<>(configEntry.synonyms.size());
+                    for (ConfigSynonym synonym : configEntry.synonyms) {
+                        Struct configSynonymStruct = configEntriesStruct.instance(CONFIG_SYNONYMS_KEY_NAME);
+                        configSynonymStruct.set(CONFIG_NAME_KEY_NAME, synonym.name);
+                        configSynonymStruct.set(CONFIG_VALUE_KEY_NAME, synonym.value);
+                        configSynonymStruct.set(CONFIG_SOURCE_KEY_NAME, synonym.source.id);
+                        configSynonymStructs.add(configSynonymStruct);
+                    }
+                    configEntriesStruct.set(CONFIG_SYNONYMS_KEY_NAME, configSynonymStructs.toArray(new Struct[0]));
+                }
             }
             resourceStruct.set(CONFIG_ENTRIES_KEY_NAME, configEntryStructs.toArray(new Struct[0]));
             
diff --git a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
index 6b2fe74..285582c 100644
--- a/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
+++ b/clients/src/main/java/org/apache/kafka/common/security/ssl/SslFactory.java
@@ -16,40 +16,53 @@
  */
 package org.apache.kafka.common.security.ssl;
 
-import org.apache.kafka.common.Configurable;
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.Reconfigurable;
+import org.apache.kafka.common.config.ConfigException;
 import org.apache.kafka.common.config.SslConfigs;
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs;
 import org.apache.kafka.common.network.Mode;
 import org.apache.kafka.common.config.types.Password;
+import org.apache.kafka.common.utils.Utils;
 
+import javax.net.ssl.KeyManager;
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLEngineResult;
+import javax.net.ssl.SSLException;
+import javax.net.ssl.SSLParameters;
+import javax.net.ssl.TrustManagerFactory;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.security.GeneralSecurityException;
 import java.security.KeyStore;
+import java.security.Principal;
 import java.security.SecureRandom;
+import java.security.cert.Certificate;
+import java.security.cert.X509Certificate;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Enumeration;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+import java.util.HashSet;
 
-import javax.net.ssl.KeyManager;
-import javax.net.ssl.KeyManagerFactory;
-import javax.net.ssl.SSLContext;
-import javax.net.ssl.SSLEngine;
-import javax.net.ssl.SSLParameters;
-import javax.net.ssl.TrustManagerFactory;
-
-public class SslFactory implements Configurable {
 
+public class SslFactory implements Reconfigurable {
     private final Mode mode;
     private final String clientAuthConfigOverride;
+    private final boolean keystoreVerifiableUsingTruststore;
 
     private String protocol;
     private String provider;
     private String kmfAlgorithm;
     private String tmfAlgorithm;
     private SecurityStore keystore = null;
-    private Password keyPassword;
     private SecurityStore truststore;
     private String[] cipherSuites;
     private String[] enabledProtocols;
@@ -60,12 +73,13 @@ public class SslFactory implements Configurable {
     private boolean wantClientAuth;
 
     public SslFactory(Mode mode) {
-        this(mode, null);
+        this(mode, null, false);
     }
 
-    public SslFactory(Mode mode, String clientAuthConfigOverride) {
+    public SslFactory(Mode mode, String clientAuthConfigOverride, boolean keystoreVerifiableUsingTruststore) {
         this.mode = mode;
         this.clientAuthConfigOverride = clientAuthConfigOverride;
+        this.keystoreVerifiableUsingTruststore = keystoreVerifiableUsingTruststore;
     }
 
     @Override
@@ -109,23 +123,68 @@ public class SslFactory implements Configurable {
         this.kmfAlgorithm = (String) configs.get(SslConfigs.SSL_KEYMANAGER_ALGORITHM_CONFIG);
         this.tmfAlgorithm = (String) configs.get(SslConfigs.SSL_TRUSTMANAGER_ALGORITHM_CONFIG);
 
-        createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+        this.keystore = createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
                        (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
                        (Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
                        (Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
 
-        createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
+        this.truststore = createTruststore((String) configs.get(SslConfigs.SSL_TRUSTSTORE_TYPE_CONFIG),
                          (String) configs.get(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG),
                          (Password) configs.get(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG));
         try {
-            this.sslContext = createSSLContext();
+            this.sslContext = createSSLContext(keystore);
         } catch (Exception e) {
             throw new KafkaException(e);
         }
     }
 
+    @Override
+    public Set<String> reconfigurableConfigs() {
+        return SslConfigs.RECONFIGURABLE_CONFIGS;
+    }
+
+    @Override
+    public boolean validateReconfiguration(Map<String, ?> configs) {
+        try {
+            SecurityStore newKeystore = maybeCreateNewKeystore(configs);
+            if (newKeystore != null)
+                createSSLContext(newKeystore);
+            return true;
+        } catch (Exception e) {
+            throw new KafkaException("Validation of dynamic config update failed", e);
+        }
+    }
+
+    @Override
+    public void reconfigure(Map<String, ?> configs) throws KafkaException {
+        SecurityStore newKeystore = maybeCreateNewKeystore(configs);
+        if (newKeystore != null) {
+            try {
+                this.sslContext = createSSLContext(newKeystore);
+                this.keystore = newKeystore;
+            } catch (Exception e) {
+                throw new KafkaException("Reconfiguration of SSL keystore failed", e);
+            }
+        }
+    }
+
+    private SecurityStore maybeCreateNewKeystore(Map<String, ?> configs) {
+        boolean keystoreChanged = Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG), keystore.type) ||
+                Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG), keystore.path) ||
+                Objects.equals(configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG), keystore.password) ||
+                Objects.equals(configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG), keystore.keyPassword);
+
+        if (keystoreChanged) {
+            return createKeystore((String) configs.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+                    (String) configs.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+                    (Password) configs.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+                    (Password) configs.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG));
+        } else
+            return null;
+    }
 
-    private SSLContext createSSLContext() throws GeneralSecurityException, IOException  {
+    // package access for testing
+    SSLContext createSSLContext(SecurityStore keystore) throws GeneralSecurityException, IOException  {
         SSLContext sslContext;
         if (provider != null)
             sslContext = SSLContext.getInstance(protocol, provider);
@@ -137,7 +196,7 @@ public class SslFactory implements Configurable {
             String kmfAlgorithm = this.kmfAlgorithm != null ? this.kmfAlgorithm : KeyManagerFactory.getDefaultAlgorithm();
             KeyManagerFactory kmf = KeyManagerFactory.getInstance(kmfAlgorithm);
             KeyStore ks = keystore.load();
-            Password keyPassword = this.keyPassword != null ? this.keyPassword : keystore.password;
+            Password keyPassword = keystore.keyPassword != null ? keystore.keyPassword : keystore.password;
             kmf.init(ks, keyPassword.value().toCharArray());
             keyManagers = kmf.getKeyManagers();
         }
@@ -148,10 +207,23 @@ public class SslFactory implements Configurable {
         tmf.init(ts);
 
         sslContext.init(keyManagers, tmf.getTrustManagers(), this.secureRandomImplementation);
+        if (keystore != null && keystore != this.keystore) {
+            if (this.keystore == null)
+                throw new ConfigException("Cannot add SSL keystore to an existing listener for which no keystore was configured.");
+            if (keystoreVerifiableUsingTruststore)
+                SSLConfigValidatorEngine.validate(this, sslContext);
+            if (!CertificateEntries.create(this.keystore.load()).equals(CertificateEntries.create(keystore.load()))) {
+                throw new ConfigException("Keystore DistinguishedName or SubjectAltNames do not match");
+            }
+        }
         return sslContext;
     }
 
     public SSLEngine createSslEngine(String peerHost, int peerPort) {
+        return createSslEngine(sslContext, peerHost, peerPort);
+    }
+
+    private SSLEngine createSslEngine(SSLContext sslContext, String peerHost, int peerPort) {
         SSLEngine sslEngine = sslContext.createSSLEngine(peerHost, peerPort);
         if (cipherSuites != null) sslEngine.setEnabledCipherSuites(cipherSuites);
         if (enabledProtocols != null) sslEngine.setEnabledProtocols(enabledProtocols);
@@ -181,38 +253,42 @@ public class SslFactory implements Configurable {
         return sslContext;
     }
 
-    private void createKeystore(String type, String path, Password password, Password keyPassword) {
+    private SecurityStore createKeystore(String type, String path, Password password, Password keyPassword) {
         if (path == null && password != null) {
             throw new KafkaException("SSL key store is not specified, but key store password is specified.");
         } else if (path != null && password == null) {
             throw new KafkaException("SSL key store is specified, but key store password is not specified.");
         } else if (path != null && password != null) {
-            this.keystore = new SecurityStore(type, path, password);
-            this.keyPassword = keyPassword;
-        }
+            return new SecurityStore(type, path, password, keyPassword);
+        } else
+            return null; // path == null, clients may use this path with brokers that don't require client auth
     }
 
-    private void createTruststore(String type, String path, Password password) {
+    private SecurityStore createTruststore(String type, String path, Password password) {
         if (path == null && password != null) {
             throw new KafkaException("SSL trust store is not specified, but trust store password is specified.");
         } else if (path != null) {
-            this.truststore = new SecurityStore(type, path, password);
-        }
+            return new SecurityStore(type, path, password, null);
+        } else
+            return null;
     }
 
-    private static class SecurityStore {
+    // package access for testing
+    static class SecurityStore {
         private final String type;
         private final String path;
         private final Password password;
+        private final Password keyPassword;
 
-        private SecurityStore(String type, String path, Password password) {
+        SecurityStore(String type, String path, Password password, Password keyPassword) {
             Objects.requireNonNull(type, "type must not be null");
             this.type = type;
             this.path = path;
             this.password = password;
+            this.keyPassword = keyPassword;
         }
 
-        private KeyStore load() throws GeneralSecurityException, IOException {
+        KeyStore load() throws GeneralSecurityException, IOException {
             FileInputStream in = null;
             try {
                 KeyStore ks = KeyStore.getInstance(type);
@@ -227,4 +303,156 @@ public class SslFactory implements Configurable {
         }
     }
 
+    /**
+     * Validator used to verify dynamic update of keystore used in inter-broker communication.
+     * The validator checks that a successful handshake can be performed using the keystore and
+     * truststore configured on this SslFactory.
+     */
+    static class SSLConfigValidatorEngine {
+        private static final ByteBuffer EMPTY_BUF = ByteBuffer.allocate(0);
+        private final SSLEngine sslEngine;
+        private SSLEngineResult handshakeResult;
+        private ByteBuffer appBuffer;
+        private ByteBuffer netBuffer;
+
+        static void validate(SslFactory sslFactory, SSLContext sslContext) throws SSLException {
+            SSLConfigValidatorEngine clientEngine = new SSLConfigValidatorEngine(sslFactory, sslContext, Mode.CLIENT);
+            SSLConfigValidatorEngine serverEngine = new SSLConfigValidatorEngine(sslFactory, sslContext, Mode.SERVER);
+            try {
+                clientEngine.beginHandshake();
+                serverEngine.beginHandshake();
+                while (!serverEngine.complete() || !clientEngine.complete()) {
+                    clientEngine.handshake(serverEngine);
+                    serverEngine.handshake(clientEngine);
+                }
+            } finally {
+                clientEngine.close();
+                serverEngine.close();
+            }
+        }
+
+        private SSLConfigValidatorEngine(SslFactory sslFactory, SSLContext sslContext, Mode mode) {
+            this.sslEngine = sslFactory.createSslEngine(sslContext, "localhost", 0); // these hints are not used for validation
+            sslEngine.setUseClientMode(mode == Mode.CLIENT);
+            appBuffer = ByteBuffer.allocate(sslEngine.getSession().getApplicationBufferSize());
+            netBuffer = ByteBuffer.allocate(sslEngine.getSession().getPacketBufferSize());
+        }
+
+        void beginHandshake() throws SSLException {
+            sslEngine.beginHandshake();
+        }
+
+        void handshake(SSLConfigValidatorEngine peerEngine) throws SSLException {
+            SSLEngineResult.HandshakeStatus handshakeStatus = sslEngine.getHandshakeStatus();
+            while (true) {
+                switch (handshakeStatus) {
+                    case NEED_WRAP:
+                        handshakeResult = sslEngine.wrap(EMPTY_BUF, netBuffer);
+                        switch (handshakeResult.getStatus()) {
+                            case OK: break;
+                            case BUFFER_OVERFLOW:
+                                netBuffer.compact();
+                                netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+                                netBuffer.flip();
+                                break;
+                            case BUFFER_UNDERFLOW:
+                            case CLOSED:
+                            default:
+                                throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+                        }
+                        return;
+                    case NEED_UNWRAP:
+                        if (peerEngine.netBuffer.position() == 0) // no data to unwrap, return to process peer
+                            return;
+                        peerEngine.netBuffer.flip(); // unwrap the data from peer
+                        handshakeResult = sslEngine.unwrap(peerEngine.netBuffer, appBuffer);
+                        peerEngine.netBuffer.compact();
+                        handshakeStatus = handshakeResult.getHandshakeStatus();
+                        switch (handshakeResult.getStatus()) {
+                            case OK: break;
+                            case BUFFER_OVERFLOW:
+                                appBuffer = Utils.ensureCapacity(appBuffer, sslEngine.getSession().getApplicationBufferSize());
+                                break;
+                            case BUFFER_UNDERFLOW:
+                                netBuffer = Utils.ensureCapacity(netBuffer, sslEngine.getSession().getPacketBufferSize());
+                                break;
+                            case CLOSED:
+                            default:
+                                throw new SSLException("Unexpected handshake status: " + handshakeResult.getStatus());
+                        }
+                        break;
+                    case NEED_TASK:
+                        sslEngine.getDelegatedTask().run();
+                        handshakeStatus = sslEngine.getHandshakeStatus();
+                        break;
+                    case FINISHED:
+                        return;
+                    case NOT_HANDSHAKING:
+                        if (handshakeResult.getHandshakeStatus() != SSLEngineResult.HandshakeStatus.FINISHED)
+                            throw new SSLException("Did not finish handshake");
+                        return;
+                    default:
+                        throw new IllegalStateException("Unexpected handshake status " + handshakeStatus);
+                }
+            }
+        }
+
+        boolean complete() {
+            return sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.FINISHED ||
+                    sslEngine.getHandshakeStatus() == SSLEngineResult.HandshakeStatus.NOT_HANDSHAKING;
+        }
+
+        void close() {
+            sslEngine.closeOutbound();
+            try {
+                sslEngine.closeInbound();
+            } catch (Exception e) {
+                // ignore
+            }
+        }
+    }
+
+    static class CertificateEntries {
+        private final Principal subjectPrincipal;
+        private final Set<List<?>> subjectAltNames;
+
+        static List<CertificateEntries> create(KeyStore keystore) throws GeneralSecurityException, IOException {
+            Enumeration<String> aliases = keystore.aliases();
+            List<CertificateEntries> entries = new ArrayList<>();
+            while (aliases.hasMoreElements()) {
+                String alias = aliases.nextElement();
+                Certificate cert  = keystore.getCertificate(alias);
+                if (cert instanceof X509Certificate)
+                    entries.add(new CertificateEntries((X509Certificate) cert));
+            }
+            return entries;
+        }
+
+        CertificateEntries(X509Certificate cert) throws GeneralSecurityException {
+            this.subjectPrincipal = cert.getSubjectX500Principal();
+            Collection<List<?>> altNames = cert.getSubjectAlternativeNames();
+            // use a set for comparison
+            this.subjectAltNames = altNames != null ? new HashSet<>(altNames) : Collections.<List<?>>emptySet();
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(subjectPrincipal, subjectAltNames);
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (!(obj instanceof CertificateEntries))
+                return false;
+            CertificateEntries other = (CertificateEntries) obj;
+            return Objects.equals(subjectPrincipal, other.subjectPrincipal) &&
+                    Objects.equals(subjectAltNames, other.subjectAltNames);
+        }
+
+        @Override
+        public String toString() {
+            return "subjectPrincipal=" + subjectPrincipal +
+                    ", subjectAltNames=" + subjectAltNames;
+        }
+    }
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index ff8929a..0352ade 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -82,7 +82,7 @@ public class NioEchoServer extends Thread {
         if (securityProtocol == SecurityProtocol.SASL_PLAINTEXT || securityProtocol == SecurityProtocol.SASL_SSL)
             ScramCredentialUtils.createCache(credentialCache, ScramMechanism.mechanismNames());
         if (channelBuilder == null)
-            channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialCache, tokenCache);
+            channelBuilder = ChannelBuilders.serverChannelBuilder(listenerName, false, securityProtocol, config, credentialCache, tokenCache);
         this.metrics = new Metrics();
         this.selector = new Selector(5000, metrics, new MockTime(), "MetricGroup", channelBuilder, new LogContext());
         acceptorThread = new AcceptorThread();
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
index a64cb45..86d26d4 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SaslChannelBuilderTest.java
@@ -72,7 +72,7 @@ public class SaslChannelBuilderTest {
         jaasConfig.addEntry("jaasContext", PlainLoginModule.class.getName(), new HashMap<String, Object>());
         JaasContext jaasContext = new JaasContext("jaasContext", JaasContext.Type.SERVER, jaasConfig);
         return new SaslChannelBuilder(Mode.CLIENT, jaasContext, securityProtocol, new ListenerName("PLAIN"),
-                "PLAIN", true, null, null);
+                false, "PLAIN", true, null, null);
     }
 
 }
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
index 96c7bc2..1d78e5a 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslSelectorTest.java
@@ -64,7 +64,7 @@ public class SslSelectorTest extends SelectorTest {
         this.server.start();
         this.time = new MockTime();
         sslClientConfigs = TestSslUtils.createSslConfig(false, false, Mode.CLIENT, trustStoreFile, "client");
-        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
         this.channelBuilder.configure(sslClientConfigs);
         this.metrics = new Metrics();
         this.selector = new Selector(5000, metrics, time, "MetricGroup", channelBuilder, new LogContext());
@@ -149,7 +149,7 @@ public class SslSelectorTest extends SelectorTest {
         //the initial channel builder is for clients, we need a server one
         File trustStoreFile = File.createTempFile("truststore", ".jks");
         Map<String, Object> sslServerConfigs = TestSslUtils.createSslConfig(false, true, Mode.SERVER, trustStoreFile, "server");
-        channelBuilder = new SslChannelBuilder(Mode.SERVER);
+        channelBuilder = new SslChannelBuilder(Mode.SERVER, null, false);
         channelBuilder.configure(sslServerConfigs);
         selector = new Selector(NetworkReceive.UNLIMITED, 5000, metrics, time, "MetricGroup",
                 new HashMap<String, String>(), true, false, channelBuilder, pool, new LogContext());
@@ -236,7 +236,7 @@ public class SslSelectorTest extends SelectorTest {
     private static class TestSslChannelBuilder extends SslChannelBuilder {
 
         public TestSslChannelBuilder(Mode mode) {
-            super(mode);
+            super(mode, null, false);
         }
 
         @Override
diff --git a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
index 1f55246..5cc66e5 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/SslTransportLayerTest.java
@@ -80,7 +80,7 @@ public class SslTransportLayerTest {
         clientCertStores = new CertStores(false, "client", "localhost");
         sslServerConfigs = serverCertStores.getTrustingConfig(clientCertStores);
         sslClientConfigs = clientCertStores.getTrustingConfig(serverCertStores);
-        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        this.channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
         this.channelBuilder.configure(sslClientConfigs);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
     }
@@ -429,7 +429,7 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testInvalidSecureRandomImplementation() throws Exception {
-        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
         try {
             sslClientConfigs.put(SslConfigs.SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
@@ -444,7 +444,7 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testInvalidTruststorePassword() throws Exception {
-        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
         try {
             sslClientConfigs.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
@@ -459,7 +459,7 @@ public class SslTransportLayerTest {
      */
     @Test
     public void testInvalidKeystorePassword() throws Exception {
-        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT);
+        SslChannelBuilder channelBuilder = new SslChannelBuilder(Mode.CLIENT, null, false);
         try {
             sslClientConfigs.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "invalid");
             channelBuilder.configure(sslClientConfigs);
@@ -752,7 +752,7 @@ public class SslTransportLayerTest {
 
     @Test
     public void testCloseSsl() throws Exception {
-        testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT));
+        testClose(SecurityProtocol.SSL, new SslChannelBuilder(Mode.CLIENT, null, false));
     }
 
     @Test
@@ -792,17 +792,81 @@ public class SslTransportLayerTest {
         }, 5000, "All requests sent were not processed");
     }
 
-    private void createSelector(Map<String, Object> sslClientConfigs) {
-        createSelector(sslClientConfigs, null, null, null);
+    /**
+     * Tests reconfiguration of server keystore. Verifies that existing connections continue
+     * to work with old keystore and new connections work with new keystore.
+     */
+    @Test
+    public void testServerKeystoreDynamicUpdate() throws Exception {
+        SecurityProtocol securityProtocol = SecurityProtocol.SSL;
+        TestSecurityConfig config = new TestSecurityConfig(sslServerConfigs);
+        ListenerName listenerName = ListenerName.forSecurityProtocol(securityProtocol);
+        ChannelBuilder serverChannelBuilder = ChannelBuilders.serverChannelBuilder(listenerName,
+                false, securityProtocol, config, null, null);
+        server = new NioEchoServer(listenerName, securityProtocol, config,
+                "localhost", serverChannelBuilder, null);
+        server.start();
+        InetSocketAddress addr = new InetSocketAddress("localhost", server.port());
+
+        // Verify that client with matching truststore can authenticate, send and receive
+        String oldNode = "0";
+        Selector oldClientSelector = createSelector(sslClientConfigs);
+        oldClientSelector.connect(oldNode, addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(selector, oldNode, 100, 10);
+
+        CertStores newServerCertStores = new CertStores(true, "server", "localhost");
+        sslServerConfigs = newServerCertStores.getTrustingConfig(clientCertStores);
+        assertTrue("SslChannelBuilder not reconfigurable", serverChannelBuilder instanceof ListenerReconfigurable);
+        ListenerReconfigurable reconfigurableBuilder = (ListenerReconfigurable) serverChannelBuilder;
+        assertEquals(listenerName, reconfigurableBuilder.listenerName());
+        reconfigurableBuilder.validateReconfiguration(sslServerConfigs);
+        reconfigurableBuilder.reconfigure(sslServerConfigs);
+
+        // Verify that new client with old truststore fails
+        oldClientSelector.connect("1", addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.waitForChannelClose(oldClientSelector, "1", ChannelState.State.AUTHENTICATION_FAILED);
+
+        // Verify that new client with new truststore can authenticate, send and receive
+        sslClientConfigs = clientCertStores.getTrustingConfig(newServerCertStores);
+        Selector newClientSelector = createSelector(sslClientConfigs);
+        newClientSelector.connect("2", addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(newClientSelector, "2", 100, 10);
+
+        // Verify that old client continues to work
+        NetworkTestUtils.checkClientConnection(oldClientSelector, oldNode, 100, 10);
+
+        CertStores invalidCertStores = new CertStores(true, "server", "127.0.0.1");
+        Map<String, Object>  invalidConfigs = invalidCertStores.getTrustingConfig(clientCertStores);
+        try {
+            reconfigurableBuilder.validateReconfiguration(invalidConfigs);
+            fail("Should have failed validation with an exception with different SubjectAltName");
+        } catch (KafkaException e) {
+            // expected exception
+        }
+        try {
+            reconfigurableBuilder.reconfigure(invalidConfigs);
+            fail("Should have failed to reconfigure with different SubjectAltName");
+        } catch (KafkaException e) {
+            // expected exception
+        }
+
+        // Verify that new connections continue to work with the server with previously configured keystore after failed reconfiguration
+        newClientSelector.connect("3", addr, BUFFER_SIZE, BUFFER_SIZE);
+        NetworkTestUtils.checkClientConnection(newClientSelector, "3", 100, 10);
+    }
+
+    private Selector createSelector(Map<String, Object> sslClientConfigs) {
+        return createSelector(sslClientConfigs, null, null, null);
     }
 
-    private void createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize,
+    private Selector createSelector(Map<String, Object> sslClientConfigs, final Integer netReadBufSize,
                                 final Integer netWriteBufSize, final Integer appBufSize) {
         TestSslChannelBuilder channelBuilder = new TestSslChannelBuilder(Mode.CLIENT);
         channelBuilder.configureBufferSizes(netReadBufSize, netWriteBufSize, appBufSize);
         this.channelBuilder = channelBuilder;
         this.channelBuilder.configure(sslClientConfigs);
         this.selector = new Selector(5000, new Metrics(), new MockTime(), "MetricGroup", channelBuilder, new LogContext());
+        return selector;
     }
 
     private NioEchoServer createEchoServer(ListenerName listenerName, SecurityProtocol securityProtocol) throws Exception {
@@ -823,7 +887,7 @@ public class SslTransportLayerTest {
         int flushDelayCount = 0;
 
         public TestSslChannelBuilder(Mode mode) {
-            super(mode);
+            super(mode, null, false);
         }
 
         public void configureBufferSizes(Integer netReadBufSize, Integer netWriteBufSize, Integer appBufSize) {
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 2771f2f..c18f5c2 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -242,10 +242,14 @@ public class RequestResponseTest {
         checkRequest(createAlterConfigsRequest());
         checkErrorResponse(createAlterConfigsRequest(), new UnknownServerException());
         checkResponse(createAlterConfigsResponse(), 0);
-        checkRequest(createDescribeConfigsRequest());
-        checkRequest(createDescribeConfigsRequestWithConfigEntries());
-        checkErrorResponse(createDescribeConfigsRequest(), new UnknownServerException());
+        checkRequest(createDescribeConfigsRequest(0));
+        checkRequest(createDescribeConfigsRequestWithConfigEntries(0));
+        checkErrorResponse(createDescribeConfigsRequest(0), new UnknownServerException());
         checkResponse(createDescribeConfigsResponse(), 0);
+        checkRequest(createDescribeConfigsRequest(1));
+        checkRequest(createDescribeConfigsRequestWithConfigEntries(1));
+        checkErrorResponse(createDescribeConfigsRequest(1), new UnknownServerException());
+        checkResponse(createDescribeConfigsResponse(), 1);
         checkRequest(createCreatePartitionsRequest());
         checkRequest(createCreatePartitionsRequestWithAssignments());
         checkErrorResponse(createCreatePartitionsRequest(), new InvalidTopicException());
@@ -1031,25 +1035,29 @@ public class RequestResponseTest {
         return new DeleteAclsResponse(0, responses);
     }
 
-    private DescribeConfigsRequest createDescribeConfigsRequest() {
+    private DescribeConfigsRequest createDescribeConfigsRequest(int version) {
         return new DescribeConfigsRequest.Builder(asList(
                 new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"),
-                new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"))).build((short) 0);
+                new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic")))
+                .build((short) version);
     }
 
-    private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries() {
+    private DescribeConfigsRequest createDescribeConfigsRequestWithConfigEntries(int version) {
         Map<org.apache.kafka.common.requests.Resource, Collection<String>> resources = new HashMap<>();
         resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), asList("foo", "bar"));
         resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic"), null);
         resources.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.TOPIC, "topic a"), Collections.<String>emptyList());
-        return new DescribeConfigsRequest.Builder(resources).build((short) 0);
+        return new DescribeConfigsRequest.Builder(resources).build((short) version);
     }
 
     private DescribeConfigsResponse createDescribeConfigsResponse() {
         Map<org.apache.kafka.common.requests.Resource, DescribeConfigsResponse.Config> configs = new HashMap<>();
+        List<DescribeConfigsResponse.ConfigSynonym> synonyms = Collections.emptyList();
         List<DescribeConfigsResponse.ConfigEntry> configEntries = asList(
-                new DescribeConfigsResponse.ConfigEntry("config_name", "config_value", false, true, false),
-                new DescribeConfigsResponse.ConfigEntry("another_name", "another value", true, false, true)
+                new DescribeConfigsResponse.ConfigEntry("config_name", "config_value",
+                        DescribeConfigsResponse.ConfigSource.DYNAMIC_BROKER_CONFIG, true, false, synonyms),
+                new DescribeConfigsResponse.ConfigEntry("another_name", "another value",
+                        DescribeConfigsResponse.ConfigSource.DEFAULT_CONFIG, false, true, synonyms)
         );
         configs.put(new org.apache.kafka.common.requests.Resource(org.apache.kafka.common.requests.ResourceType.BROKER, "0"), new DescribeConfigsResponse.Config(
                 ApiError.NONE, configEntries));
diff --git a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
index bd7fee3..e3d6b7a 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorTest.java
@@ -992,7 +992,7 @@ public class SaslAuthenticatorTest {
         if (isScram)
             ScramCredentialUtils.createCache(credentialCache, Arrays.asList(saslMechanism));
         SaslChannelBuilder serverChannelBuilder = new SaslChannelBuilder(Mode.SERVER, jaasContext,
-                securityProtocol, listenerName, saslMechanism, true, credentialCache, null) {
+                securityProtocol, listenerName, false, saslMechanism, true, credentialCache, null) {
 
             @Override
             protected SaslServerAuthenticator buildServerAuthenticator(Map<String, ?> configs, String id,
@@ -1034,7 +1034,7 @@ public class SaslAuthenticatorTest {
         final Map<String, ?> configs = Collections.emptyMap();
         final JaasContext jaasContext = JaasContext.load(JaasContext.Type.CLIENT, null, configs);
         SaslChannelBuilder clientChannelBuilder = new SaslChannelBuilder(Mode.CLIENT, jaasContext,
-                securityProtocol, listenerName, saslMechanism, true, null, null) {
+                securityProtocol, listenerName, false, saslMechanism, true, null, null) {
 
             @Override
             protected SaslClientAuthenticator buildClientAuthenticator(Map<String, ?> configs, String id,
diff --git a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
index 5546a55..3a89260 100644
--- a/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/security/ssl/SslFactoryTest.java
@@ -17,18 +17,23 @@
 package org.apache.kafka.common.security.ssl;
 
 import java.io.File;
+import java.security.KeyStore;
 import java.util.Map;
 
+import javax.net.ssl.SSLContext;
 import javax.net.ssl.SSLEngine;
+import javax.net.ssl.SSLHandshakeException;
 
 import org.apache.kafka.common.config.SslConfigs;
+import org.apache.kafka.common.config.types.Password;
 import org.apache.kafka.test.TestSslUtils;
 import org.apache.kafka.common.network.Mode;
 import org.junit.Test;
 
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -76,4 +81,60 @@ public class SslFactoryTest {
         assertTrue(engine.getUseClientMode());
     }
 
+    @Test
+    public void testKeyStoreTrustStoreValidation() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
+                Mode.SERVER, trustStoreFile, "server");
+        SslFactory sslFactory = new SslFactory(Mode.SERVER);
+        sslFactory.configure(serverSslConfig);
+        SSLContext sslContext = sslFactory.createSSLContext(securityStore(serverSslConfig));
+        assertNotNull("SSL context not created", sslContext);
+    }
+
+    @Test
+    public void testUntrustedKeyStoreValidation() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
+                Mode.SERVER, trustStoreFile, "server");
+        Map<String, Object> untrustedConfig = TestSslUtils.createSslConfig(false, true,
+                Mode.SERVER, File.createTempFile("truststore", ".jks"), "server");
+        SslFactory sslFactory = new SslFactory(Mode.SERVER, null, true);
+        sslFactory.configure(serverSslConfig);
+        try {
+            sslFactory.createSSLContext(securityStore(untrustedConfig));
+            fail("Validation did not fail with untrusted keystore");
+        } catch (SSLHandshakeException e) {
+            // Expected exception
+        }
+    }
+
+    @Test
+    public void testCertificateEntriesValidation() throws Exception {
+        File trustStoreFile = File.createTempFile("truststore", ".jks");
+        Map<String, Object> serverSslConfig = TestSslUtils.createSslConfig(false, true,
+                Mode.SERVER, trustStoreFile, "server");
+        Map<String, Object> newCnConfig = TestSslUtils.createSslConfig(false, true,
+                Mode.SERVER, File.createTempFile("truststore", ".jks"), "server", "Another CN");
+        KeyStore ks1 = securityStore(serverSslConfig).load();
+        KeyStore ks2 = securityStore(serverSslConfig).load();
+        assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2));
+
+        // Use different alias name, validation should succeed
+        ks2.setCertificateEntry("another", ks1.getCertificate("localhost"));
+        assertEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks2));
+
+        KeyStore ks3 = securityStore(newCnConfig).load();
+        assertNotEquals(SslFactory.CertificateEntries.create(ks1), SslFactory.CertificateEntries.create(ks3));
+    }
+
+    private SslFactory.SecurityStore securityStore(Map<String, Object> sslConfig) {
+        return new SslFactory.SecurityStore(
+                (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG),
+                (String) sslConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG),
+                (Password) sslConfig.get(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG),
+                (Password) sslConfig.get(SslConfigs.SSL_KEY_PASSWORD_CONFIG)
+        );
+    }
+
 }
diff --git a/core/src/main/scala/kafka/log/LogConfig.scala b/core/src/main/scala/kafka/log/LogConfig.scala
index 57b4112..26cc504 100755
--- a/core/src/main/scala/kafka/log/LogConfig.scala
+++ b/core/src/main/scala/kafka/log/LogConfig.scala
@@ -29,7 +29,7 @@ import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, TopicConfig}
 import org.apache.kafka.common.record.TimestampType
 import org.apache.kafka.common.utils.Utils
 
-import scala.collection.mutable
+import scala.collection.{Map, mutable}
 import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList, Validator}
 
 object Defaults {
@@ -297,4 +297,33 @@ object LogConfig {
     configDef.parse(props)
   }
 
+  /**
+   * Map topic config to the broker config with highest priority. Some of these have additional synonyms
+   * that can be obtained using [[kafka.server.DynamicBrokerConfig#brokerConfigSynonyms]]
+   */
+  val TopicConfigSynonyms = Map(
+    SegmentBytesProp -> KafkaConfig.LogSegmentBytesProp,
+    SegmentMsProp -> KafkaConfig.LogRollTimeMillisProp,
+    SegmentJitterMsProp -> KafkaConfig.LogRollTimeJitterMillisProp,
+    SegmentIndexBytesProp -> KafkaConfig.LogIndexSizeMaxBytesProp,
+    FlushMessagesProp -> KafkaConfig.LogFlushIntervalMessagesProp,
+    FlushMsProp -> KafkaConfig.LogFlushIntervalMsProp,
+    RetentionBytesProp -> KafkaConfig.LogRetentionBytesProp,
+    RetentionMsProp -> KafkaConfig.LogRetentionTimeMillisProp,
+    MaxMessageBytesProp -> KafkaConfig.MessageMaxBytesProp,
+    IndexIntervalBytesProp -> KafkaConfig.LogIndexIntervalBytesProp,
+    DeleteRetentionMsProp -> KafkaConfig.LogCleanerDeleteRetentionMsProp,
+    MinCompactionLagMsProp -> KafkaConfig.LogCleanerMinCompactionLagMsProp,
+    FileDeleteDelayMsProp -> KafkaConfig.LogDeleteDelayMsProp,
+    MinCleanableDirtyRatioProp -> KafkaConfig.LogCleanerMinCleanRatioProp,
+    CleanupPolicyProp -> KafkaConfig.LogCleanupPolicyProp,
+    UncleanLeaderElectionEnableProp -> KafkaConfig.UncleanLeaderElectionEnableProp,
+    MinInSyncReplicasProp -> KafkaConfig.MinInSyncReplicasProp,
+    CompressionTypeProp -> KafkaConfig.CompressionTypeProp,
+    PreAllocateEnableProp -> KafkaConfig.LogPreAllocateProp,
+    MessageFormatVersionProp -> KafkaConfig.LogMessageFormatVersionProp,
+    MessageTimestampTypeProp -> KafkaConfig.LogMessageTimestampTypeProp,
+    MessageTimestampDifferenceMaxMsProp -> KafkaConfig.LogMessageTimestampDifferenceMaxMsProp
+  )
+
 }
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index d5e49a5..b40bf84 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -31,6 +31,7 @@ import kafka.metrics.KafkaMetricsGroup
 import kafka.security.CredentialProvider
 import kafka.server.KafkaConfig
 import kafka.utils._
+import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.memory.{MemoryPool, SimpleMemoryPool}
 import org.apache.kafka.common.metrics._
 import org.apache.kafka.common.metrics.stats.Meter
@@ -437,20 +438,29 @@ private[kafka] class Processor(val id: Int,
   )
 
   private val selector = createSelector(
-      ChannelBuilders.serverChannelBuilder(listenerName, securityProtocol, config, credentialProvider.credentialCache, credentialProvider.tokenCache))
+    ChannelBuilders.serverChannelBuilder(listenerName,
+      listenerName == config.interBrokerListenerName,
+      securityProtocol,
+      config,
+      credentialProvider.credentialCache,
+      credentialProvider.tokenCache))
   // Visible to override for testing
-  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = new KSelector(
-    maxRequestSize,
-    connectionsMaxIdleMs,
-    metrics,
-    time,
-    "socket-server",
-    metricTags,
-    false,
-    true,
-    channelBuilder,
-    memoryPool,
-    logContext)
+  protected[network] def createSelector(channelBuilder: ChannelBuilder): KSelector = {
+    if (channelBuilder.isInstanceOf[Reconfigurable])
+      config.addReconfigurable(channelBuilder.asInstanceOf[Reconfigurable])
+    new KSelector(
+      maxRequestSize,
+      connectionsMaxIdleMs,
+      metrics,
+      time,
+      "socket-server",
+      metricTags,
+      false,
+      true,
+      channelBuilder,
+      memoryPool,
+      logContext)
+  }
 
   // Connection ids have the format `localAddr:localPort-remoteAddr:remotePort-index`. The index is a
   // non-negative incrementing value that ensures that even if remotePort is reused after a connection is
diff --git a/core/src/main/scala/kafka/server/AdminManager.scala b/core/src/main/scala/kafka/server/AdminManager.scala
index 8f69000..596dde0 100644
--- a/core/src/main/scala/kafka/server/AdminManager.scala
+++ b/core/src/main/scala/kafka/server/AdminManager.scala
@@ -32,6 +32,7 @@ import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.requests.CreateTopicsRequest._
+import org.apache.kafka.common.requests.DescribeConfigsResponse.ConfigSource
 import org.apache.kafka.common.requests.{AlterConfigsRequest, ApiError, DescribeConfigsResponse, Resource, ResourceType}
 import org.apache.kafka.server.policy.{AlterConfigPolicy, CreateTopicPolicy}
 import org.apache.kafka.server.policy.CreateTopicPolicy.RequestMetadata
@@ -280,24 +281,17 @@ class AdminManager(val config: KafkaConfig,
     }
   }
 
-  def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]]): Map[Resource, DescribeConfigsResponse.Config] = {
+  def describeConfigs(resourceToConfigNames: Map[Resource, Option[Set[String]]], includeSynonyms: Boolean): Map[Resource, DescribeConfigsResponse.Config] = {
     resourceToConfigNames.map { case (resource, configNames) =>
 
-      def createResponseConfig(config: AbstractConfig, isReadOnly: Boolean, isDefault: String => Boolean): DescribeConfigsResponse.Config = {
-        val filteredConfigPairs = config.values.asScala.filter { case (configName, _) =>
+      def createResponseConfig(config: AbstractConfig, createConfigEntry: (String, Any) => DescribeConfigsResponse.ConfigEntry): DescribeConfigsResponse.Config = {
+        val allConfigs = config.originals.asScala.filter(_._2 != null) ++ config.values.asScala
+        val filteredConfigPairs = allConfigs.filter { case (configName, _) =>
           /* Always returns true if configNames is None */
-          configNames.map(_.contains(configName)).getOrElse(true)
+          configNames.forall(_.contains(configName))
         }.toIndexedSeq
 
-        val configEntries = filteredConfigPairs.map { case (name, value) =>
-          val configEntryType = config.typeOf(name)
-          val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
-          val valueAsString =
-            if (isSensitive) null
-            else ConfigDef.convertToString(value, configEntryType)
-          new DescribeConfigsResponse.ConfigEntry(name, valueAsString, isSensitive, isDefault(name), isReadOnly)
-        }
-
+        val configEntries = filteredConfigPairs.map { case (name, value) => createConfigEntry(name, value) }
         new DescribeConfigsResponse.Config(ApiError.NONE, configEntries.asJava)
       }
 
@@ -310,15 +304,12 @@ class AdminManager(val config: KafkaConfig,
             // Consider optimizing this by caching the configs or retrieving them from the `Log` when possible
             val topicProps = adminZkClient.fetchEntityConfig(ConfigType.Topic, topic)
             val logConfig = LogConfig.fromProps(KafkaServer.copyKafkaConfigToLog(config), topicProps)
-            createResponseConfig(logConfig, isReadOnly = false, name => !topicProps.containsKey(name))
+            createResponseConfig(logConfig, createTopicConfigEntry(logConfig, topicProps, includeSynonyms))
 
           case ResourceType.BROKER =>
-            val brokerId = try resource.name.toInt catch {
-              case _: NumberFormatException =>
-                throw new InvalidRequestException(s"Broker id must be an integer, but it is: ${resource.name}")
-            }
+            val brokerId = resourceNameToBrokerId(resource.name)
             if (brokerId == config.brokerId)
-              createResponseConfig(config, isReadOnly = true, name => !config.originals.containsKey(name))
+              createResponseConfig(config, createBrokerConfigEntry(includeSynonyms))
             else
               throw new InvalidRequestException(s"Unexpected broker id, expected ${config.brokerId}, but received $brokerId")
 
@@ -340,6 +331,16 @@ class AdminManager(val config: KafkaConfig,
 
   def alterConfigs(configs: Map[Resource, AlterConfigsRequest.Config], validateOnly: Boolean): Map[Resource, ApiError] = {
     configs.map { case (resource, config) =>
+
+      def validateConfigPolicy(resourceType: ConfigResource.Type): Unit = {
+        alterConfigPolicy match {
+          case Some(policy) =>
+            val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
+            policy.validate(new AlterConfigPolicy.RequestMetadata(
+              new ConfigResource(resourceType, resource.name), configEntriesMap.asJava))
+          case None =>
+        }
+      }
       try {
         resource.`type` match {
           case ResourceType.TOPIC =>
@@ -347,31 +348,40 @@ class AdminManager(val config: KafkaConfig,
 
             val properties = new Properties
             config.entries.asScala.foreach { configEntry =>
-              properties.setProperty(configEntry.name(), configEntry.value())
+              properties.setProperty(configEntry.name, configEntry.value)
+            }
+
+            adminZkClient.validateTopicConfig(topic, properties)
+            validateConfigPolicy(ConfigResource.Type.TOPIC)
+            if (!validateOnly)
+              adminZkClient.changeTopicConfig(topic, properties)
+
+            resource -> ApiError.NONE
+
+          case ResourceType.BROKER =>
+            val brokerId = if (resource.name == null || resource.name.isEmpty)
+              None
+            else
+              Some(resourceNameToBrokerId(resource.name))
+            val configProps = new Properties
+            config.entries.asScala.foreach { configEntry =>
+              configProps.setProperty(configEntry.name, configEntry.value)
             }
 
-            alterConfigPolicy match {
-              case Some(policy) =>
-                adminZkClient.validateTopicConfig(topic, properties)
-
-                val configEntriesMap = config.entries.asScala.map(entry => (entry.name, entry.value)).toMap
-                policy.validate(new AlterConfigPolicy.RequestMetadata(
-                  new ConfigResource(ConfigResource.Type.TOPIC, resource.name), configEntriesMap.asJava))
-
-                if (!validateOnly)
-                  adminZkClient.changeTopicConfig(topic, properties)
-              case None =>
-                if (validateOnly)
-                  adminZkClient.validateTopicConfig(topic, properties)
-                else
-                  adminZkClient.changeTopicConfig(topic, properties)
+            val perBrokerConfig = brokerId.nonEmpty
+            this.config.dynamicConfig.validate(configProps, perBrokerConfig)
+            validateConfigPolicy(ConfigResource.Type.BROKER)
+            if (!validateOnly) {
+              adminZkClient.changeBrokerConfig(brokerId,
+                this.config.dynamicConfig.toPersistentProps(configProps, perBrokerConfig))
             }
+
             resource -> ApiError.NONE
           case resourceType =>
-            throw new InvalidRequestException(s"AlterConfigs is only supported for topics, but resource type is $resourceType")
+            throw new InvalidRequestException(s"AlterConfigs is only supported for topics and brokers, but resource type is $resourceType")
         }
       } catch {
-        case e: ConfigException =>
+        case e @ (_: ConfigException | _: IllegalArgumentException) =>
           val message = s"Invalid config value for resource $resource: ${e.getMessage}"
           info(message)
           resource -> ApiError.fromThrowable(new InvalidRequestException(message, e))
@@ -392,4 +402,73 @@ class AdminManager(val config: KafkaConfig,
     CoreUtils.swallow(createTopicPolicy.foreach(_.close()), this)
     CoreUtils.swallow(alterConfigPolicy.foreach(_.close()), this)
   }
+
+  private def resourceNameToBrokerId(resourceName: String): Int = {
+    try resourceName.toInt catch {
+      case _: NumberFormatException =>
+        throw new InvalidRequestException(s"Broker id must be an integer, but it is: $resourceName")
+    }
+  }
+
+  private def brokerSynonyms(name: String): List[String] = {
+    DynamicBrokerConfig.brokerConfigSynonyms(name, matchListenerOverride = true)
+  }
+
+  private def configType(name: String, synonyms: List[String]): ConfigDef.Type = {
+    val configType = config.typeOf(name)
+    if (configType != null)
+      configType
+    else
+      synonyms.iterator.map(config.typeOf).find(_ != null).orNull
+  }
+
+  private def configSynonyms(name: String, synonyms: List[String], isSensitive: Boolean): List[DescribeConfigsResponse.ConfigSynonym] = {
+    val dynamicConfig = config.dynamicConfig
+    val allSynonyms = mutable.Buffer[DescribeConfigsResponse.ConfigSynonym]()
+
+    def maybeAddSynonym(map: Map[String, String], source: ConfigSource)(name: String): Unit = {
+      map.get(name).map { value =>
+        val configValue = if (isSensitive) null else value
+        allSynonyms += new DescribeConfigsResponse.ConfigSynonym(name, configValue, source)
+      }
+    }
+
+    synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicBrokerConfigs, ConfigSource.DYNAMIC_BROKER_CONFIG))
+    synonyms.foreach(maybeAddSynonym(dynamicConfig.currentDynamicDefaultConfigs, ConfigSource.DYNAMIC_DEFAULT_BROKER_CONFIG))
+    synonyms.foreach(maybeAddSynonym(dynamicConfig.staticBrokerConfigs, ConfigSource.STATIC_BROKER_CONFIG))
+    synonyms.foreach(maybeAddSynonym(dynamicConfig.staticDefaultConfigs, ConfigSource.DEFAULT_CONFIG))
+    allSynonyms.dropWhile(s => s.name != name).toList // e.g. drop listener overrides when describing base config
+  }
+
+  private def createTopicConfigEntry(logConfig: LogConfig, topicProps: Properties, includeSynonyms: Boolean)
+                                    (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
+    val configEntryType = logConfig.typeOf(name)
+    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
+    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+    val allSynonyms = {
+      val list = LogConfig.TopicConfigSynonyms.get(name)
+        .map(s => configSynonyms(s, brokerSynonyms(s), isSensitive))
+        .getOrElse(List.empty)
+      if (!topicProps.containsKey(name))
+        list
+      else
+        new DescribeConfigsResponse.ConfigSynonym(name, valueAsString, ConfigSource.TOPIC_CONFIG) +: list
+    }
+    val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
+    val synonyms = if (!includeSynonyms) List.empty else allSynonyms
+    new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, false, synonyms.asJava)
+  }
+
+  private def createBrokerConfigEntry(includeSynonyms: Boolean)
+                                     (name: String, value: Any): DescribeConfigsResponse.ConfigEntry = {
+    val allNames = brokerSynonyms(name)
+    val configEntryType = configType(name, allNames)
+    val isSensitive = configEntryType == ConfigDef.Type.PASSWORD
+    val valueAsString = if (isSensitive) null else ConfigDef.convertToString(value, configEntryType)
+    val allSynonyms = configSynonyms(name, allNames, isSensitive)
+    val synonyms = if (!includeSynonyms) List.empty else allSynonyms
+    val source = if (allSynonyms.isEmpty) ConfigSource.DEFAULT_CONFIG else allSynonyms.head.source
+    val readOnly = !allNames.exists(DynamicBrokerConfig.AllDynamicConfigs.contains)
+    new DescribeConfigsResponse.ConfigEntry(name, valueAsString, source, isSensitive, readOnly, synonyms.asJava)
+  }
 }
diff --git a/core/src/main/scala/kafka/server/ConfigHandler.scala b/core/src/main/scala/kafka/server/ConfigHandler.scala
index 390222d..23322c1 100644
--- a/core/src/main/scala/kafka/server/ConfigHandler.scala
+++ b/core/src/main/scala/kafka/server/ConfigHandler.scala
@@ -177,7 +177,8 @@ class UserConfigHandler(private val quotaManagers: QuotaManagers, val credential
   * The callback provides the brokerId and the full properties set read from ZK.
   * This implementation reports the overrides to the respective ReplicationQuotaManager objects
   */
-class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
+class BrokerConfigHandler(private val brokerConfig: KafkaConfig,
+                          private val quotaManagers: QuotaManagers) extends ConfigHandler with Logging {
 
   def processConfigChanges(brokerId: String, properties: Properties) {
     def getOrDefault(prop: String): Long = {
@@ -186,7 +187,10 @@ class BrokerConfigHandler(private val brokerConfig: KafkaConfig, private val quo
       else
         DefaultReplicationThrottledRate
     }
-    if (brokerConfig.brokerId == brokerId.trim.toInt) {
+    if (brokerId == ConfigEntityName.Default)
+      brokerConfig.dynamicConfig.updateDefaultConfig(properties)
+    else if (brokerConfig.brokerId == brokerId.trim.toInt) {
+      brokerConfig.dynamicConfig.updateBrokerConfig(brokerConfig.brokerId, properties)
       quotaManagers.leader.updateQuota(upperBound(getOrDefault(LeaderReplicationThrottledRateProp)))
       quotaManagers.follower.updateQuota(upperBound(getOrDefault(FollowerReplicationThrottledRateProp)))
       quotaManagers.alterLogDirs.updateQuota(upperBound(getOrDefault(ReplicaAlterLogDirsIoMaxBytesPerSecondProp)))
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
new file mode 100755
index 0000000..f307b8d
--- /dev/null
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -0,0 +1,360 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+import java.util.concurrent.locks.ReentrantReadWriteLock
+
+import kafka.server.DynamicBrokerConfig._
+import kafka.utils.{CoreUtils, Logging}
+import kafka.zk.{AdminZkClient, KafkaZkClient}
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.{ConfigDef, ConfigException, SslConfigs}
+import org.apache.kafka.common.network.ListenerReconfigurable
+import org.apache.kafka.common.utils.Base64
+
+import scala.collection._
+import scala.collection.JavaConverters._
+
+/**
+  * Dynamic broker configurations are stored in ZooKeeper and may be defined at two levels:
+  * <ul>
+  *   <li>Per-broker configs persisted at <tt>/configs/brokers/{brokerId}</tt>: These can be described/altered
+  *       using AdminClient using the resource name brokerId.</li>
+  *   <li>Cluster-wide defaults persisted at <tt>/configs/brokers/&lt;default&gt;</tt>: These can be described/altered
+  *       using AdminClient using an empty resource name.</li>
+  * </ul>
+  * The order of precedence for broker configs is:
+  * <ol>
+  *   <li>DYNAMIC_BROKER_CONFIG: stored in ZK at /configs/brokers/{brokerId}</li>
+  *   <li>DYNAMIC_DEFAULT_BROKER_CONFIG: stored in ZK at /configs/brokers/&lt;default&gt;</li>
+  *   <li>STATIC_BROKER_CONFIG: properties that broker is started up with, typically from server.properties file</li>
+  *   <li>DEFAULT_CONFIG: Default configs defined in KafkaConfig</li>
+  * </ol>
+  * Log configs use topic config overrides if defined and fallback to broker defaults using the order of precedence above.
+  * Topic config overrides may use a different config name from the default broker config.
+  * See [[kafka.log.LogConfig#TopicConfigSynonyms]] for the mapping.
+  * <p>
+  * AdminClient returns all config synonyms in the order of precedence when configs are described with
+  * <code>includeSynonyms</code>. In addition to configs that may be defined with the same name at different levels,
+  * some configs have additional synonyms.
+  * </p>
+  * <ul>
+  *   <li>Listener configs may be defined using the prefix <tt>listener.name.{listenerName}.{configName}</tt>. These may be
+  *       configured as dynamic or static broker configs. Listener configs have higher precedence than the base configs
+  *       that don't specify the listener name. Listeners without a listener config use the base config. Base configs
+  *       may be defined only as STATIC_BROKER_CONFIG or DEFAULT_CONFIG and cannot be updated dynamically.<li>
+  *   <li>Some configs may be defined using multiple properties. For example, <tt>log.roll.ms</tt> and
+  *       <tt>log.roll.hours</tt> refer to the same config that may be defined in milliseconds or hours. The order of
+  *       precedence of these synonyms is described in the docs of these configs in [[kafka.server.KafkaConfig]].</li>
+  * </ul>
+  *
+  */
+object DynamicBrokerConfig {
+
+  private val DynamicPasswordConfigs = Set(
+    SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG,
+    SslConfigs.SSL_KEY_PASSWORD_CONFIG
+  )
+  private val DynamicSecurityConfigs = SslConfigs.RECONFIGURABLE_CONFIGS.asScala
+
+  val AllDynamicConfigs = mutable.Set[String]()
+  AllDynamicConfigs ++= DynamicSecurityConfigs
+
+  private val PerBrokerConfigs = DynamicSecurityConfigs
+
+  val ListenerConfigRegex = """listener\.name\.[^.]*\.(.*)""".r
+
+
+  def brokerConfigSynonyms(name: String, matchListenerOverride: Boolean): List[String] = {
+    name match {
+      case KafkaConfig.LogRollTimeMillisProp | KafkaConfig.LogRollTimeHoursProp =>
+        List(KafkaConfig.LogRollTimeMillisProp, KafkaConfig.LogRollTimeHoursProp)
+      case KafkaConfig.LogRollTimeJitterMillisProp | KafkaConfig.LogRollTimeJitterHoursProp =>
+        List(KafkaConfig.LogRollTimeJitterMillisProp, KafkaConfig.LogRollTimeJitterHoursProp)
+      case KafkaConfig.LogFlushIntervalMsProp => // LogFlushSchedulerIntervalMsProp is used as default
+        List(KafkaConfig.LogFlushIntervalMsProp, KafkaConfig.LogFlushSchedulerIntervalMsProp)
+      case KafkaConfig.LogRetentionTimeMillisProp | KafkaConfig.LogRetentionTimeMinutesProp | KafkaConfig.LogRetentionTimeHoursProp =>
+        List(KafkaConfig.LogRetentionTimeMillisProp, KafkaConfig.LogRetentionTimeMinutesProp, KafkaConfig.LogRetentionTimeHoursProp)
+      case ListenerConfigRegex(baseName) if matchListenerOverride => List(name, baseName)
+      case _ => List(name)
+    }
+  }
+
+  private[server] def addDynamicConfigs(configDef: ConfigDef): Unit = {
+    KafkaConfig.configKeys.filterKeys(AllDynamicConfigs.contains).values.foreach { config =>
+      configDef.define(config.name, config.`type`, config.defaultValue, config.validator,
+        config.importance, config.documentation, config.group, config.orderInGroup, config.width,
+        config.displayName, config.dependents, config.recommender)
+    }
+  }
+}
+
+class DynamicBrokerConfig(private val kafkaConfig: KafkaConfig) extends Logging {
+
+  private[server] val staticBrokerConfigs = ConfigDef.convertToStringMapWithPasswordValues(kafkaConfig.originalsFromThisConfig).asScala
+  private[server] val staticDefaultConfigs = ConfigDef.convertToStringMapWithPasswordValues(KafkaConfig.defaultValues.asJava).asScala
+  private val dynamicBrokerConfigs = mutable.Map[String, String]()
+  private val dynamicDefaultConfigs = mutable.Map[String, String]()
+  private val brokerId = kafkaConfig.brokerId
+  private val reconfigurables = mutable.Buffer[Reconfigurable]()
+  private val lock = new ReentrantReadWriteLock
+  private var currentConfig = kafkaConfig
+
+  private[server] def initialize(zkClient: KafkaZkClient): Unit = {
+    val adminZkClient = new AdminZkClient(zkClient)
+    updateDefaultConfig(adminZkClient.fetchEntityConfig(ConfigType.Broker, ConfigEntityName.Default))
+    updateBrokerConfig(brokerId, adminZkClient.fetchEntityConfig(ConfigType.Broker, brokerId.toString))
+  }
+
+  def addReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
+    require(reconfigurable.reconfigurableConfigs.asScala.forall(AllDynamicConfigs.contains))
+    reconfigurables += reconfigurable
+  }
+
+  def removeReconfigurable(reconfigurable: Reconfigurable): Unit = CoreUtils.inWriteLock(lock) {
+    reconfigurables -= reconfigurable
+  }
+
+  // Visibility for testing
+  private[server] def currentKafkaConfig: KafkaConfig = CoreUtils.inReadLock(lock) {
+    currentConfig
+  }
+
+  private[server] def currentDynamicBrokerConfigs: Map[String, String] = CoreUtils.inReadLock(lock) {
+    dynamicBrokerConfigs.clone()
+  }
+
+  private[server] def currentDynamicDefaultConfigs: Map[String, String] = CoreUtils.inReadLock(lock) {
+    dynamicDefaultConfigs.clone()
+  }
+
+  private[server] def updateBrokerConfig(brokerId: Int, persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
+    try {
+      val props = fromPersistentProps(persistentProps, perBrokerConfig = true)
+      dynamicBrokerConfigs.clear()
+      dynamicBrokerConfigs ++= props.asScala
+      updateCurrentConfig()
+    } catch {
+      case e: Exception => error(s"Per-broker configs of $brokerId could not be applied: $persistentProps", e)
+    }
+  }
+
+  private[server] def updateDefaultConfig(persistentProps: Properties): Unit = CoreUtils.inWriteLock(lock) {
+    try {
+      val props = fromPersistentProps(persistentProps, perBrokerConfig = false)
+      dynamicDefaultConfigs.clear()
+      dynamicDefaultConfigs ++= props.asScala
+      updateCurrentConfig()
+    } catch {
+      case e: Exception => error(s"Cluster default configs could not be applied: $persistentProps", e)
+    }
+  }
+
+  private[server] def toPersistentProps(configProps: Properties, perBrokerConfig: Boolean): Properties = {
+    val props = configProps.clone().asInstanceOf[Properties]
+    // TODO (KAFKA-6246): encrypt passwords
+    def encodePassword(configName: String): Unit = {
+      val value = props.getProperty(configName)
+      if (value != null) {
+        if (!perBrokerConfig)
+          throw new ConfigException("Password config can be defined only at broker level")
+        props.setProperty(configName, Base64.encoder.encodeToString(value.getBytes(StandardCharsets.UTF_8)))
+      }
+    }
+    DynamicPasswordConfigs.foreach(encodePassword)
+    props
+  }
+
+  private[server] def fromPersistentProps(persistentProps: Properties, perBrokerConfig: Boolean): Properties = {
+    val props = persistentProps.clone().asInstanceOf[Properties]
+
+    // Remove all invalid configs from `props`
+    removeInvalidConfigs(props, perBrokerConfig)
+    def removeInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
+      if (invalidPropNames.nonEmpty) {
+        invalidPropNames.foreach(props.remove)
+        error(s"$errorMessage: $invalidPropNames")
+      }
+    }
+    removeInvalidProps(nonDynamicConfigs(props), "Non-dynamic configs configured in ZooKeeper will be ignored")
+    removeInvalidProps(securityConfigsWithoutListenerPrefix(props),
+      "Security configs can be dynamically updated only using listener prefix, base configs will be ignored")
+    if (!perBrokerConfig)
+      removeInvalidProps(perBrokerConfigs(props), "Per-broker configs defined at default cluster level will be ignored")
+
+    // TODO (KAFKA-6246): encrypt passwords
+    def decodePassword(configName: String): Unit = {
+      val value = props.getProperty(configName)
+      if (value != null) {
+        props.setProperty(configName, new String(Base64.decoder.decode(value), StandardCharsets.UTF_8))
+      }
+    }
+    DynamicPasswordConfigs.foreach(decodePassword)
+    props
+  }
+
+  private[server] def validate(props: Properties, perBrokerConfig: Boolean): Unit = CoreUtils.inReadLock(lock) {
+    def checkInvalidProps(invalidPropNames: Set[String], errorMessage: String): Unit = {
+      if (invalidPropNames.nonEmpty)
+        throw new ConfigException(s"$errorMessage: $invalidPropNames")
+    }
+    checkInvalidProps(nonDynamicConfigs(props), "Cannot update these configs dynamically")
+    checkInvalidProps(securityConfigsWithoutListenerPrefix(props),
+      "These security configs can be dynamically updated only per-listener using the listener prefix")
+    validateConfigTypes(props)
+    val newProps = mutable.Map[String, String]()
+    newProps ++= staticBrokerConfigs
+    if (perBrokerConfig) {
+      overrideProps(newProps, dynamicDefaultConfigs)
+      overrideProps(newProps, props.asScala)
+    } else {
+      checkInvalidProps(perBrokerConfigs(props),
+        "Cannot update these configs at default cluster level, broker id must be specified")
+      overrideProps(newProps, props.asScala)
+      overrideProps(newProps, dynamicBrokerConfigs)
+    }
+    processReconfiguration(newProps, validateOnly = true)
+  }
+
+  private def perBrokerConfigs(props: Properties): Set[String] = {
+    val configNames = props.asScala.keySet
+    configNames.intersect(PerBrokerConfigs) ++ configNames.filter(ListenerConfigRegex.findFirstIn(_).nonEmpty)
+  }
+
+  private def nonDynamicConfigs(props: Properties): Set[String] = {
+    props.asScala.keySet.intersect(DynamicConfig.Broker.nonDynamicProps)
+  }
+
+  private def securityConfigsWithoutListenerPrefix(props: Properties): Set[String] = {
+    DynamicSecurityConfigs.filter(props.containsKey)
+  }
+
+  private def validateConfigTypes(props: Properties): Unit = {
+    val baseProps = new Properties
+    props.asScala.foreach {
+      case (ListenerConfigRegex(baseName), v) => baseProps.put(baseName, v)
+      case (k, v) => baseProps.put(k, v)
+    }
+    DynamicConfig.Broker.validate(baseProps)
+  }
+
+  private def removeInvalidConfigs(props: Properties, perBrokerConfig: Boolean): Unit = {
+    try {
+      validateConfigTypes(props)
+      props.asScala
+    } catch {
+      case e: Exception =>
+        val invalidProps = props.asScala.filter { case (k, v) =>
+          val props1 = new Properties
+          props1.put(k, v)
+          try {
+            validateConfigTypes(props1)
+            false
+          } catch {
+            case _: Exception => true
+          }
+        }
+        invalidProps.foreach(props.remove)
+        val configSource = if (perBrokerConfig) "broker" else "default cluster"
+        error(s"Dynamic $configSource config contains invalid values: $invalidProps, these configs will be ignored", e)
+    }
+  }
+
+  private def updatedConfigs(newProps: java.util.Map[String, _], currentProps: java.util.Map[_, _]): mutable.Map[String, _] = {
+    newProps.asScala.filter {
+      case (k, v) => v != currentProps.get(k)
+    }
+  }
+
+  /**
+    * Updates values in `props` with the new values from `propsOverride`. Synonyms of updated configs
+    * are removed from `props` to ensure that the config with the higher precedence is applied. For example,
+    * if `log.roll.ms` was defined in server.properties and `log.roll.hours` is configured dynamically,
+    * `log.roll.hours` from the dynamic configuration will be used and `log.roll.ms` will be removed from
+    * `props` (even though `log.roll.hours` is secondary to `log.roll.ms`).
+    */
+  private def overrideProps(props: mutable.Map[String, String], propsOverride: mutable.Map[String, String]): Unit = {
+    propsOverride.foreach { case (k, v) =>
+      // Remove synonyms of `k` to ensure the right precedence is applied. But disable `matchListenerOverride`
+      // so that base configs corresponding to listener configs are not removed. Base configs should not be removed
+      // since they may be used by other listeners. It is ok to retain them in `props` since base configs cannot be
+      // dynamically updated and listener-specific configs have the higher precedence.
+      brokerConfigSynonyms(k, matchListenerOverride = false).foreach(props.remove)
+      props.put(k, v)
+    }
+  }
+
+  private def updateCurrentConfig(): Unit = {
+    val newProps = mutable.Map[String, String]()
+    newProps ++= staticBrokerConfigs
+    overrideProps(newProps, dynamicDefaultConfigs)
+    overrideProps(newProps, dynamicBrokerConfigs)
+    val newConfig = processReconfiguration(newProps, validateOnly = false)
+    if (newConfig ne currentConfig) {
+      currentConfig = newConfig
+      kafkaConfig.updateCurrentConfig(currentConfig)
+    }
+  }
+
+  private def processReconfiguration(newProps: Map[String, String], validateOnly: Boolean): KafkaConfig = {
+    val newConfig = new KafkaConfig(newProps.asJava, !validateOnly, None)
+    val updatedMap = updatedConfigs(newConfig.originalsFromThisConfig, currentConfig.originals)
+    if (updatedMap.nonEmpty) {
+      try {
+        val customConfigs = new util.HashMap[String, Object](newConfig.originalsFromThisConfig) // non-Kafka configs
+        newConfig.valuesFromThisConfig.keySet.asScala.foreach(customConfigs.remove)
+        reconfigurables.foreach {
+          case listenerReconfigurable: ListenerReconfigurable =>
+            val listenerName = listenerReconfigurable.listenerName
+            val oldValues = currentConfig.valuesWithPrefixOverride(listenerName.configPrefix)
+            val newValues = newConfig.valuesFromThisConfigWithPrefixOverride(listenerName.configPrefix)
+            val updatedKeys = updatedConfigs(newValues, oldValues).keySet
+            processReconfigurable(listenerReconfigurable, updatedKeys, newValues, customConfigs, validateOnly)
+          case reconfigurable =>
+            processReconfigurable(reconfigurable, updatedMap.keySet, newConfig.valuesFromThisConfig, customConfigs, validateOnly)
+        }
+        newConfig
+      } catch {
+        case e: Exception =>
+          if (!validateOnly)
+            error(s"Failed to update broker configuration with configs : ${newConfig.originalsFromThisConfig}", e)
+          throw new ConfigException("Invalid dynamic configuration", e)
+      }
+    }
+    else
+      currentConfig
+  }
+
+  private def processReconfigurable(reconfigurable: Reconfigurable, updatedKeys: Set[String],
+                                    allNewConfigs: util.Map[String, _], newCustomConfigs: util.Map[String, Object],
+                                    validateOnly: Boolean): Unit = {
+    if (reconfigurable.reconfigurableConfigs.asScala.intersect(updatedKeys).nonEmpty) {
+      val newConfigs = new util.HashMap[String, Object]
+      allNewConfigs.asScala.foreach { case (k, v) => newConfigs.put(k, v.asInstanceOf[AnyRef]) }
+      newConfigs.putAll(newCustomConfigs)
+      if (validateOnly) {
+        if (!reconfigurable.validateReconfiguration(newConfigs))
+          throw new ConfigException("Validation of dynamic config update failed")
+      } else
+        reconfigurable.reconfigure(newConfigs)
+    }
+  }
+}
diff --git a/core/src/main/scala/kafka/server/DynamicConfig.scala b/core/src/main/scala/kafka/server/DynamicConfig.scala
index ddfdff8..1a401d2 100644
--- a/core/src/main/scala/kafka/server/DynamicConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicConfig.scala
@@ -18,12 +18,14 @@
 package kafka.server
 
 import java.util.Properties
+
 import kafka.log.LogConfig
 import kafka.security.CredentialProvider
 import org.apache.kafka.common.config.ConfigDef
 import org.apache.kafka.common.config.ConfigDef.Importance._
 import org.apache.kafka.common.config.ConfigDef.Range._
 import org.apache.kafka.common.config.ConfigDef.Type._
+
 import scala.collection.JavaConverters._
 
 /**
@@ -57,10 +59,12 @@ object DynamicConfig {
       .define(LeaderReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, LeaderReplicationThrottledRateDoc)
       .define(FollowerReplicationThrottledRateProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, FollowerReplicationThrottledRateDoc)
       .define(ReplicaAlterLogDirsIoMaxBytesPerSecondProp, LONG, DefaultReplicationThrottledRate, atLeast(0), MEDIUM, ReplicaAlterLogDirsIoMaxBytesPerSecondDoc)
+    DynamicBrokerConfig.addDynamicConfigs(brokerConfigDef)
+    val nonDynamicProps = KafkaConfig.configNames.toSet -- brokerConfigDef.names.asScala
 
     def names = brokerConfigDef.names
 
-    def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props)
+    def validate(props: Properties) = DynamicConfig.validate(brokerConfigDef, props, customPropsAllowed = true)
   }
 
   object Client {
@@ -87,7 +91,7 @@ object DynamicConfig {
 
     def names = clientConfigs.names
 
-    def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props)
+    def validate(props: Properties) = DynamicConfig.validate(clientConfigs, props, customPropsAllowed = false)
   }
 
   object User {
@@ -100,14 +104,16 @@ object DynamicConfig {
 
     def names = userConfigs.names
 
-    def validate(props: Properties) = DynamicConfig.validate(userConfigs, props)
+    def validate(props: Properties) = DynamicConfig.validate(userConfigs, props, customPropsAllowed = false)
   }
 
-  private def validate(configDef: ConfigDef, props: Properties) = {
+  private def validate(configDef: ConfigDef, props: Properties, customPropsAllowed: Boolean) = {
     //Validate Names
     val names = configDef.names()
-    props.keys.asScala.foreach { name =>
-      require(names.contains(name), s"Unknown Dynamic Configuration '$name'.")
+    val propKeys = props.keySet.asScala.map(_.asInstanceOf[String])
+    if (!customPropsAllowed) {
+      val unknownKeys = propKeys.filter(!names.contains(_))
+      require(unknownKeys.isEmpty, s"Unknown Dynamic Configuration: $unknownKeys.")
     }
     //ValidateValues
     configDef.parse(props)
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index b54a637..1ff75c0 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -1959,7 +1959,7 @@ class KafkaApis(val requestChannel: RequestChannel,
     }
     val authorizedConfigs = adminManager.describeConfigs(authorizedResources.map { resource =>
       resource -> Option(describeConfigsRequest.configNames(resource)).map(_.asScala.toSet)
-    }.toMap)
+    }.toMap, describeConfigsRequest.includeSynonyms)
     val unauthorizedConfigs = unauthorizedResources.map { resource =>
       val error = configsAuthorizationApiError(request.session, resource)
       resource -> new DescribeConfigsResponse.Config(error, Collections.emptyList[DescribeConfigsResponse.ConfigEntry])
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6e8249f..47f13f6 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -17,6 +17,7 @@
 
 package kafka.server
 
+import java.util
 import java.util.Properties
 
 import kafka.api.{ApiVersion, KAFKA_0_10_0_IV1}
@@ -28,7 +29,8 @@ import kafka.message.{BrokerCompressionCodec, CompressionCodec, Message, Message
 import kafka.utils.CoreUtils
 import kafka.utils.Implicits._
 import org.apache.kafka.clients.CommonClientConfigs
-import org.apache.kafka.common.config.ConfigDef.ValidList
+import org.apache.kafka.common.Reconfigurable
+import org.apache.kafka.common.config.ConfigDef.{ConfigKey, ValidList}
 import org.apache.kafka.common.config.internals.BrokerSecurityConfigs
 import org.apache.kafka.common.config.{AbstractConfig, ConfigDef, ConfigException, SaslConfigs, SslConfigs, TopicConfig}
 import org.apache.kafka.common.metrics.Sensor
@@ -912,6 +914,8 @@ object KafkaConfig {
   }
 
   def configNames() = configDef.names().asScala.toList.sorted
+  private[server] def defaultValues: Map[String, _] = configDef.defaultValues.asScala
+  private[server] def configKeys: Map[String, ConfigKey] = configDef.configKeys.asScala
 
   def fromProps(props: Properties): KafkaConfig =
     fromProps(props, true)
@@ -933,9 +937,37 @@ object KafkaConfig {
 
 }
 
-class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends AbstractConfig(KafkaConfig.configDef, props, doLog) {
+class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigOverride: Option[DynamicBrokerConfig])
+  extends AbstractConfig(KafkaConfig.configDef, props, doLog) {
 
-  def this(props: java.util.Map[_, _]) = this(props, true)
+  def this(props: java.util.Map[_, _]) = this(props, true, None)
+  def this(props: java.util.Map[_, _], doLog: Boolean) = this(props, doLog, None)
+  private[server] val dynamicConfig = dynamicConfigOverride.getOrElse(new DynamicBrokerConfig(this))
+  // Cache the current config to avoid acquiring read lock to access from dynamicConfig
+  @volatile private var currentConfig = this
+
+  private[server] def updateCurrentConfig(newConfig: KafkaConfig): Unit = {
+    this.currentConfig = newConfig
+  }
+
+  override def originals: util.Map[String, AnyRef] =
+    if (this eq currentConfig) super.originals else currentConfig.originals
+  override def values: util.Map[String, _] =
+    if (this eq currentConfig) super.values else currentConfig.values
+  override def originalsStrings: util.Map[String, String] =
+    if (this eq currentConfig) super.originalsStrings else currentConfig.originalsStrings
+  override def originalsWithPrefix(prefix: String): util.Map[String, AnyRef] =
+    if (this eq currentConfig) super.originalsWithPrefix(prefix) else currentConfig.originalsWithPrefix(prefix)
+  override def valuesWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
+    if (this eq currentConfig) super.valuesWithPrefixOverride(prefix) else currentConfig.valuesWithPrefixOverride(prefix)
+  override def get(key: String): AnyRef =
+    if (this eq currentConfig) super.get(key) else currentConfig.get(key)
+
+  //  During dynamic update, we use the values from this config, these are only used in DynamicBrokerConfig
+  private[server] def originalsFromThisConfig: util.Map[String, AnyRef] = super.originals
+  private[server] def valuesFromThisConfig: util.Map[String, _] = super.values
+  private[server] def valuesFromThisConfigWithPrefixOverride(prefix: String): util.Map[String, AnyRef] =
+    super.valuesWithPrefixOverride(prefix)
 
   /** ********* Zookeeper Configuration ***********/
   val zkConnect: String = getString(KafkaConfig.ZkConnectProp)
@@ -1094,10 +1126,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val sslProtocol = getString(KafkaConfig.SslProtocolProp)
   val sslProvider = getString(KafkaConfig.SslProviderProp)
   val sslEnabledProtocols = getList(KafkaConfig.SslEnabledProtocolsProp)
-  val sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
-  val sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
-  val sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
-  val sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
+  def sslKeystoreType = getString(KafkaConfig.SslKeystoreTypeProp)
+  def sslKeystoreLocation = getString(KafkaConfig.SslKeystoreLocationProp)
+  def sslKeystorePassword = getPassword(KafkaConfig.SslKeystorePasswordProp)
+  def sslKeyPassword = getPassword(KafkaConfig.SslKeyPasswordProp)
   val sslTruststoreType = getString(KafkaConfig.SslTruststoreTypeProp)
   val sslTruststoreLocation = getString(KafkaConfig.SslTruststoreLocationProp)
   val sslTruststorePassword = getPassword(KafkaConfig.SslTruststorePasswordProp)
@@ -1143,6 +1175,10 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean) extends Abstra
   val advertisedListeners: Seq[EndPoint] = getAdvertisedListeners
   private[kafka] lazy val listenerSecurityProtocolMap = getListenerSecurityProtocolMap
 
+  def addReconfigurable(reconfigurable: Reconfigurable): Unit = {
+    dynamicConfig.addReconfigurable(reconfigurable)
+  }
+
   private def getLogRetentionTimeMillis: Long = {
     val millisInMinute = 60L * 1000L
     val millisInHour = 60L * millisInMinute
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 355b741..80b0eb7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -134,7 +134,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   var kafkaController: KafkaController = null
 
-  val kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+  var kafkaScheduler: KafkaScheduler = null
 
   var metadataCache: MetadataCache = null
   var quotaManagers: QuotaFactory.QuotaManagers = null
@@ -196,12 +196,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
       if (canStartup) {
         brokerState.newState(Starting)
 
-        /* start scheduler */
-        kafkaScheduler.startup()
-
         /* setup zookeeper */
         initZkClient(time)
 
+        // initialize dynamic broker configs from ZooKeeper. Any updates made after this will be
+        // applied after DynamicConfigManager starts.
+        config.dynamicConfig.initialize(zkClient)
+
+        /* start scheduler */
+        kafkaScheduler = new KafkaScheduler(config.backgroundThreads)
+        kafkaScheduler.startup()
+
         /* Get or create cluster_id */
         _clusterId = getOrGenerateClusterId(zkClient)
         info(s"Cluster ID = $clusterId")
diff --git a/core/src/main/scala/kafka/server/KafkaServerStartable.scala b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
index eb290d2..bbe9aba 100644
--- a/core/src/main/scala/kafka/server/KafkaServerStartable.scala
+++ b/core/src/main/scala/kafka/server/KafkaServerStartable.scala
@@ -25,12 +25,12 @@ import kafka.utils.{Exit, Logging, VerifiableProperties}
 object KafkaServerStartable {
   def fromProps(serverProps: Properties) = {
     val reporters = KafkaMetricsReporter.startReporters(new VerifiableProperties(serverProps))
-    new KafkaServerStartable(KafkaConfig.fromProps(serverProps), reporters)
+    new KafkaServerStartable(KafkaConfig.fromProps(serverProps, false), reporters)
   }
 }
 
-class KafkaServerStartable(val serverConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
-  private val server = new KafkaServer(serverConfig, kafkaMetricsReporters = reporters)
+class KafkaServerStartable(val staticServerConfig: KafkaConfig, reporters: Seq[KafkaMetricsReporter]) extends Logging {
+  private val server = new KafkaServer(staticServerConfig, kafkaMetricsReporters = reporters)
 
   def this(serverConfig: KafkaConfig) = this(serverConfig, Seq.empty)
 
diff --git a/core/src/main/scala/kafka/utils/ZkUtils.scala b/core/src/main/scala/kafka/utils/ZkUtils.scala
index 098670c..e04bce0 100644
--- a/core/src/main/scala/kafka/utils/ZkUtils.scala
+++ b/core/src/main/scala/kafka/utils/ZkUtils.scala
@@ -66,6 +66,7 @@ object ZkUtils {
   val BrokerSequenceIdPath = s"$BrokersPath/seqid"
   val ConfigChangesPath = s"$ConfigPath/changes"
   val ConfigUsersPath = s"$ConfigPath/users"
+  val ConfigBrokersPath = s"$ConfigPath/brokers"
   val ProducerIdBlockPath = "/latest_producer_id_block"
 
   val SecureZkRootPaths = ZkData.SecureRootPaths
diff --git a/core/src/main/scala/kafka/zk/AdminZkClient.scala b/core/src/main/scala/kafka/zk/AdminZkClient.scala
index fe64414..6d7df3f 100644
--- a/core/src/main/scala/kafka/zk/AdminZkClient.scala
+++ b/core/src/main/scala/kafka/zk/AdminZkClient.scala
@@ -357,11 +357,34 @@ class AdminZkClient(zkClient: KafkaZkClient) extends Logging {
     * @param configs: The config to change, as properties
     */
   def changeBrokerConfig(brokers: Seq[Int], configs: Properties): Unit = {
-    DynamicConfig.Broker.validate(configs)
-    brokers.foreach { broker => changeEntityConfig(ConfigType.Broker, broker.toString, configs)
+    validateBrokerConfig(configs)
+    brokers.foreach {
+      broker => changeEntityConfig(ConfigType.Broker, broker.toString, configs)
     }
   }
 
+  /**
+    * Override a broker override or broker default config. These overrides will be persisted between sessions, and will
+    * override any defaults entered in the broker's config files
+    *
+    * @param broker: The broker to apply config changes to or None to update dynamic default configs
+    * @param configs: The config to change, as properties
+    */
+  def changeBrokerConfig(broker: Option[Int], configs: Properties): Unit = {
+    validateBrokerConfig(configs)
+    val entityName = broker.map(_.toString).getOrElse(ConfigEntityName.Default)
+    changeEntityConfig(ConfigType.Broker, broker.map(String.valueOf).getOrElse(ConfigEntityName.Default), configs)
+  }
+
+  /**
+    * Validate dynamic broker configs. Since broker configs may contain custom configs, the validation
+    * only verifies that the provided config does not contain any static configs.
+    * @param configs configs to validate
+    */
+  def validateBrokerConfig(configs: Properties): Unit = {
+    DynamicConfig.Broker.validate(configs)
+  }
+
   private def changeEntityConfig(rootEntityType: String, fullSanitizedEntityName: String, configs: Properties) {
     val sanitizedEntityPath = rootEntityType + '/' + fullSanitizedEntityName
     zkClient.setOrCreateEntityConfigs(rootEntityType, fullSanitizedEntityName, configs)
diff --git a/core/src/main/scala/kafka/zk/ZkData.scala b/core/src/main/scala/kafka/zk/ZkData.scala
index 99fe591..b6352fa 100644
--- a/core/src/main/scala/kafka/zk/ZkData.scala
+++ b/core/src/main/scala/kafka/zk/ZkData.scala
@@ -546,7 +546,11 @@ object ZkData {
     LogDirEventNotificationZNode.path
   ) ++ ConfigType.all.map(ConfigEntityTypeZNode.path)
 
-  val SensitiveRootPaths = Seq(ConfigEntityTypeZNode.path(ConfigType.User), DelegationTokensZNode.path)
+  val SensitiveRootPaths = Seq(
+    ConfigEntityTypeZNode.path(ConfigType.User),
+    ConfigEntityTypeZNode.path(ConfigType.Broker),
+    DelegationTokensZNode.path
+  )
 
   def sensitivePath(path: String): Boolean = {
     path != null && SensitiveRootPaths.exists(path.startsWith)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
new file mode 100644
index 0000000..fc06b73
--- /dev/null
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -0,0 +1,471 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one
+  * or more contributor license agreements.  See the NOTICE file
+  * distributed with this work for additional information
+  * regarding copyright ownership.  The ASF licenses this file
+  * to you under the Apache License, Version 2.0 (the
+  * "License"); you may not use this file except in compliance
+  * with the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.io.File
+import java.nio.file.{Files, StandardCopyOption}
+import java.util
+import java.util.{Collections, Properties}
+import java.util.concurrent.{ExecutionException, TimeUnit}
+
+import kafka.api.SaslSetup
+import kafka.coordinator.group.OffsetConfig
+import kafka.utils.{ShutdownableThread, TestUtils}
+import kafka.utils.Implicits._
+import kafka.zk.{ConfigEntityChangeNotificationZNode, ZooKeeperTestHarness}
+import org.apache.kafka.clients.admin.ConfigEntry.{ConfigSource, ConfigSynonym}
+import org.apache.kafka.clients.admin._
+import org.apache.kafka.clients.consumer.{ConsumerRecord, KafkaConsumer}
+import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.config.ConfigResource
+import org.apache.kafka.common.config.SslConfigs._
+import org.apache.kafka.common.config.types.Password
+import org.apache.kafka.common.errors.{AuthenticationException, InvalidRequestException}
+import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.network.{ListenerName, Mode}
+import org.apache.kafka.common.security.auth.SecurityProtocol
+import org.apache.kafka.common.serialization.{StringDeserializer, StringSerializer}
+import org.junit.Assert._
+import org.junit.{After, Before, Test}
+
+import scala.collection.mutable.ArrayBuffer
+import scala.collection.JavaConverters._
+
+object DynamicBrokerReconfigurationTest {
+  val SecureInternal = "INTERNAL"
+  val SecureExternal = "EXTERNAL"
+}
+
+class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSetup {
+
+  import DynamicBrokerReconfigurationTest._
+
+  private var servers = new ArrayBuffer[KafkaServer]
+  private val numServers = 3
+  private val producers = new ArrayBuffer[KafkaProducer[String, String]]
+  private val consumers = new ArrayBuffer[KafkaConsumer[String, String]]
+  private val adminClients = new ArrayBuffer[AdminClient]()
+  private val clientThreads = new ArrayBuffer[ShutdownableThread]()
+  private val topic = "testtopic"
+
+  private val kafkaClientSaslMechanism = "PLAIN"
+  private val kafkaServerSaslMechanisms = List("PLAIN")
+  private val clientSaslProps = kafkaClientSaslProperties(kafkaClientSaslMechanism, dynamicJaasConfig = true)
+
+  private val trustStoreFile1 = File.createTempFile("truststore", ".jks")
+  private val trustStoreFile2 = File.createTempFile("truststore", ".jks")
+  private val sslProperties1 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile1), "kafka")
+  private val sslProperties2 = TestUtils.sslConfigs(Mode.SERVER, clientCert = false, Some(trustStoreFile2), "kafka")
+  private val invalidSslProperties = invalidSslConfigs
+
+  @Before
+  override def setUp(): Unit = {
+    startSasl(jaasSections(kafkaServerSaslMechanisms, Some(kafkaClientSaslMechanism)))
+    super.setUp()
+
+    (0 until numServers).foreach { brokerId =>
+
+      val props = TestUtils.createBrokerConfig(brokerId, zkConnect, trustStoreFile = Some(trustStoreFile1))
+      // Ensure that we can support multiple listeners per security protocol and multiple security protocols
+      props.put(KafkaConfig.ListenersProp, s"$SecureInternal://localhost:0, $SecureExternal://localhost:0")
+      props.put(KafkaConfig.ListenerSecurityProtocolMapProp, s"$SecureInternal:SSL, $SecureExternal:SASL_SSL")
+      props.put(KafkaConfig.InterBrokerListenerNameProp, SecureInternal)
+      props.put(KafkaConfig.ZkEnableSecureAclsProp, "true")
+      props.put(KafkaConfig.SaslEnabledMechanismsProp, kafkaServerSaslMechanisms.mkString(","))
+
+      props ++= sslProperties1
+      addKeystoreWithListenerPrefix(sslProperties1, props, SecureInternal)
+
+      // Set invalid static properties to ensure that dynamic config is used
+      props ++= invalidSslProperties
+      addKeystoreWithListenerPrefix(invalidSslProperties, props, SecureExternal)
+
+      val kafkaConfig = KafkaConfig.fromProps(props)
+      configureDynamicKeystoreInZooKeeper(kafkaConfig, Seq(brokerId), sslProperties1)
+
+      servers += TestUtils.createServer(kafkaConfig)
+    }
+
+    TestUtils.createTopic(zkClient, Topic.GROUP_METADATA_TOPIC_NAME, OffsetConfig.DefaultOffsetsTopicNumPartitions,
+      replicationFactor = numServers, servers, servers.head.groupCoordinator.offsetsTopicConfigs)
+
+    TestUtils.createTopic(zkClient, topic, numPartitions = 10, replicationFactor = numServers, servers)
+    createAdminClient(SecurityProtocol.SSL, SecureInternal)
+  }
+
+  @After
+  override def tearDown() {
+    clientThreads.foreach(_.interrupt())
+    clientThreads.foreach(_.initiateShutdown())
+    clientThreads.foreach(_.join(5 * 1000))
+    producers.foreach(_.close())
+    consumers.foreach(_.close())
+    adminClients.foreach(_.close())
+    TestUtils.shutdownServers(servers)
+    super.tearDown()
+    closeSasl()
+  }
+
+  @Test
+  def testKeystoreUpdate(): Unit = {
+    val producer = createProducer(trustStoreFile1, retries = 0)
+    val consumer = createConsumer("group1", trustStoreFile1)
+    verifyProduceConsume(producer, consumer, 10)
+
+    // Producer with new truststore should fail to connect before keystore update
+    val producer2 = createProducer(trustStoreFile2, retries = 0)
+    verifyAuthenticationFailure(producer2)
+
+    // Update broker keystore
+    configureDynamicKeystoreInZooKeeper(servers.head.config, servers.map(_.config.brokerId), sslProperties2)
+    waitForKeystore(sslProperties2)
+
+    // New producer with old truststore should fail to connect
+    val producer1 = createProducer(trustStoreFile1, retries = 0)
+    verifyAuthenticationFailure(producer1)
+
+    // New producer with new truststore should work
+    val producer3 = createProducer(trustStoreFile2, retries = 0)
+    verifyProduceConsume(producer3, consumer, 10)
+
+    // Old producer with old truststore should continue to work (with their old connections)
+    verifyProduceConsume(producer, consumer, 10)
+  }
+
+  @Test
+  def testKeyStoreDescribeUsingAdminClient(): Unit = {
+
+    def verifyConfig(configName: String, configEntry: ConfigEntry, isSensitive: Boolean, expectedProps: Properties): Unit = {
+      if (isSensitive) {
+        assertTrue(s"Value is sensitive: $configName", configEntry.isSensitive)
+        assertNull(s"Sensitive value returned for $configName", configEntry.value)
+      } else {
+        assertFalse(s"Config is not sensitive: $configName", configEntry.isSensitive)
+        assertEquals(expectedProps.getProperty(configName), configEntry.value)
+      }
+    }
+
+    def verifySynonym(configName: String, synonym: ConfigSynonym, isSensitive: Boolean,
+                      expectedPrefix: String, expectedSource: ConfigSource, expectedProps: Properties): Unit = {
+      if (isSensitive)
+        assertNull(s"Sensitive value returned for $configName", synonym.value)
+      else
+        assertEquals(expectedProps.getProperty(configName), synonym.value)
+      assertTrue(s"Expected listener config, got $synonym", synonym.name.startsWith(expectedPrefix))
+      assertEquals(expectedSource, synonym.source)
+    }
+
+    def verifySynonyms(configName: String, synonyms: util.List[ConfigSynonym], isSensitive: Boolean,
+                       prefix: String, defaultValue: Option[String]): Unit = {
+
+      val overrideCount = if (prefix.isEmpty) 0 else 2
+      assertEquals(s"Wrong synonyms for $configName: $synonyms", 1 + overrideCount + defaultValue.size, synonyms.size)
+      if (overrideCount > 0) {
+        val listenerPrefix = "listener.name.external.ssl."
+        verifySynonym(configName, synonyms.get(0), isSensitive, listenerPrefix, ConfigSource.DYNAMIC_BROKER_CONFIG, sslProperties1)
+        verifySynonym(configName, synonyms.get(1), isSensitive, listenerPrefix, ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
+      }
+      verifySynonym(configName, synonyms.get(overrideCount), isSensitive, "ssl.", ConfigSource.STATIC_BROKER_CONFIG, invalidSslProperties)
+      defaultValue.foreach { value =>
+        val defaultProps = new Properties
+        defaultProps.setProperty(configName, value)
+        verifySynonym(configName, synonyms.get(overrideCount + 1), isSensitive, "ssl.", ConfigSource.DEFAULT_CONFIG, defaultProps)
+      }
+    }
+
+    def verifySslConfig(prefix: String, expectedProps: Properties, configDesc: Config): Unit = {
+      Seq(SSL_KEYSTORE_LOCATION_CONFIG, SSL_KEYSTORE_TYPE_CONFIG, SSL_KEYSTORE_PASSWORD_CONFIG, SSL_KEY_PASSWORD_CONFIG).foreach { configName =>
+        val desc = configEntry(configDesc, s"$prefix$configName")
+        val isSensitive = configName.contains("password")
+        verifyConfig(configName, desc, isSensitive, if (prefix.isEmpty) invalidSslProperties else sslProperties1)
+        val defaultValue = if (configName == SSL_KEYSTORE_TYPE_CONFIG) Some("JKS") else None
+        verifySynonyms(configName, desc.synonyms, isSensitive, prefix, defaultValue)
+      }
+    }
+
+    val adminClient = adminClients.head
+
+    val configDesc = describeConfig(adminClient)
+    verifySslConfig("listener.name.external.", sslProperties1, configDesc)
+    verifySslConfig("", invalidSslProperties, configDesc)
+  }
+
+  @Test
+  def testKeyStoreAlterUsingAdminClient(): Unit = {
+    val topic2 = "testtopic2"
+    TestUtils.createTopic(zkClient, topic2, numPartitions = 10, replicationFactor = numServers, servers)
+
+    // Start a producer and consumer that work with the current truststore.
+    // This should continue working while changes are made
+    val (producerThread, consumerThread) = startProduceConsume(retries = 0)
+    TestUtils.waitUntilTrue(() => consumerThread.received >= 10, "Messages not received")
+
+    // Update broker keystore for external listener
+    val adminClient = adminClients.head
+    alterSslKeystore(adminClient, sslProperties2, SecureExternal)
+
+    // Produce/consume should work with new truststore
+    val producer = createProducer(trustStoreFile2, retries = 0)
+    val consumer = createConsumer("group1", trustStoreFile2, topic2)
+    verifyProduceConsume(producer, consumer, 10, topic2)
+
+    // Broker keystore update for internal listener with incompatible keystore should fail without update
+    alterSslKeystore(adminClient, sslProperties2, SecureInternal, expectFailure = true)
+    verifyProduceConsume(producer, consumer, 10, topic2)
+
+    // Broker keystore update for internal listener with incompatible keystore should succeed
+    val sslPropertiesCopy = sslProperties1.clone().asInstanceOf[Properties]
+    val oldFile = new File(sslProperties1.getProperty(SSL_KEYSTORE_LOCATION_CONFIG))
+    val newFile = File.createTempFile("keystore", ".jks")
+    Files.copy(oldFile.toPath, newFile.toPath, StandardCopyOption.REPLACE_EXISTING)
+    sslPropertiesCopy.setProperty(SSL_KEYSTORE_LOCATION_CONFIG, newFile.getPath)
+    alterSslKeystore(adminClient, sslPropertiesCopy, SecureInternal)
+    verifyProduceConsume(producer, consumer, 10, topic2)
+
+    // Verify that all messages sent with retries=0 while keystores were being altered were consumed
+    stopAndVerifyProduceConsume(producerThread, consumerThread, mayFailRequests = false)
+  }
+
+  private def createProducer(trustStore: File, retries: Int,
+                             clientId: String = "test-producer"): KafkaProducer[String, String] = {
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+    val propsOverride = new Properties
+    propsOverride.put(ProducerConfig.CLIENT_ID_CONFIG, clientId)
+    val producer = TestUtils.createNewProducer(
+      bootstrapServers,
+      acks = -1,
+      retries = retries,
+      securityProtocol = SecurityProtocol.SASL_SSL,
+      trustStoreFile = Some(trustStore),
+      saslProperties = Some(clientSaslProps),
+      keySerializer = new StringSerializer,
+      valueSerializer = new StringSerializer,
+      props = Some(propsOverride))
+    producers += producer
+    producer
+  }
+
+  private def createConsumer(groupId: String, trustStore: File, topic: String = topic):KafkaConsumer[String, String] = {
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(SecureExternal))
+    val consumer = TestUtils.createNewConsumer(
+      bootstrapServers,
+      groupId,
+      securityProtocol = SecurityProtocol.SASL_SSL,
+      trustStoreFile = Some(trustStore),
+      saslProperties = Some(clientSaslProps),
+      keyDeserializer = new StringDeserializer,
+      valueDeserializer = new StringDeserializer)
+    consumer.subscribe(Collections.singleton(topic))
+    consumers += consumer
+    consumer
+  }
+
+  private def createAdminClient(securityProtocol: SecurityProtocol, listenerName: String): AdminClient = {
+    val config = new util.HashMap[String, Object]
+    val bootstrapServers = TestUtils.bootstrapServers(servers, new ListenerName(listenerName))
+    config.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
+    val securityProps: util.Map[Object, Object] =
+      TestUtils.adminClientSecurityConfigs(securityProtocol, Some(trustStoreFile1), Some(clientSaslProps))
+    securityProps.asScala.foreach { case (key, value) => config.put(key.asInstanceOf[String], value) }
+    val adminClient = AdminClient.create(config)
+    adminClients += adminClient
+    adminClient
+  }
+
+  private def verifyProduceConsume(producer: KafkaProducer[String, String],
+                                   consumer: KafkaConsumer[String, String],
+                                   numRecords: Int,
+                                   topic: String = topic): Unit = {
+    val producerRecords = (1 to numRecords).map(i => new ProducerRecord(topic, s"key$i", s"value$i"))
+    producerRecords.map(producer.send).map(_.get(10, TimeUnit.SECONDS))
+
+    val records = new ArrayBuffer[ConsumerRecord[String, String]]
+    TestUtils.waitUntilTrue(() => {
+      records ++= consumer.poll(50).asScala
+      records.size == numRecords
+    }, s"Consumed ${records.size} records until timeout instead of the expected $numRecords records")
+  }
+
+  private def verifyAuthenticationFailure(producer: KafkaProducer[_, _]): Unit = {
+    try {
+      producer.partitionsFor(topic)
+      fail("Producer connection did not fail with invalid keystore")
+    } catch {
+      case _:AuthenticationException => // expected exception
+    }
+  }
+
+  private def describeConfig(adminClient: AdminClient): Config = {
+    val configResources = servers.map { server =>
+      new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+    }
+    val describeOptions = new DescribeConfigsOptions().includeSynonyms(true)
+    val describeResult = adminClient.describeConfigs(configResources.asJava, describeOptions).all.get
+    assertEquals(servers.size, describeResult.values.size)
+    val configDescription = describeResult.values.iterator.next
+    assertFalse("Configs are empty", configDescription.entries.isEmpty)
+    configDescription
+  }
+
+  private def alterSslKeystore(adminClient: AdminClient, props: Properties, listener: String, expectFailure: Boolean  = false): Unit = {
+    val newProps = new Properties
+    val configPrefix = new ListenerName(listener).configPrefix
+    val keystoreLocation = props.getProperty(SSL_KEYSTORE_LOCATION_CONFIG)
+    newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation)
+    newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_TYPE_CONFIG", props.getProperty(SSL_KEYSTORE_TYPE_CONFIG))
+    newProps.setProperty(s"$configPrefix$SSL_KEYSTORE_PASSWORD_CONFIG", props.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
+    newProps.setProperty(s"$configPrefix$SSL_KEY_PASSWORD_CONFIG", props.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
+    reconfigureServers(newProps, perBrokerConfig = true, (s"$configPrefix$SSL_KEYSTORE_LOCATION_CONFIG", keystoreLocation), expectFailure)
+  }
+
+  private def alterConfigs(adminClient: AdminClient, props: Properties, perBrokerConfig: Boolean): AlterConfigsResult = {
+    val configEntries = props.asScala.map { case (k, v) => new ConfigEntry(k, v) }.toList.asJava
+    val newConfig = new Config(configEntries)
+    val configs = if (perBrokerConfig) {
+      servers.map { server =>
+        val resource = new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString)
+        (resource, newConfig)
+      }.toMap.asJava
+    } else {
+      Map(new ConfigResource(ConfigResource.Type.BROKER, "") -> newConfig).asJava
+    }
+    adminClient.alterConfigs(configs)
+  }
+
+  private def reconfigureServers(newProps: Properties, perBrokerConfig: Boolean, aPropToVerify: (String, String), expectFailure: Boolean = false): Unit = {
+    val alterResult = alterConfigs(adminClients.head, newProps, perBrokerConfig)
+    if (expectFailure) {
+      val oldProps = servers.head.config.values.asScala.filterKeys(newProps.containsKey)
+      val brokerResources = if (perBrokerConfig)
+        servers.map(server => new ConfigResource(ConfigResource.Type.BROKER, server.config.brokerId.toString))
+      else
+        Seq(new ConfigResource(ConfigResource.Type.BROKER, ""))
+      brokerResources.foreach { brokerResource =>
+        val exception = intercept[ExecutionException](alterResult.values.get(brokerResource).get)
+        assertTrue(exception.getCause.isInstanceOf[InvalidRequestException])
+      }
+      assertEquals(oldProps, servers.head.config.values.asScala.filterKeys(newProps.containsKey))
+    } else {
+      alterResult.all.get
+      waitForConfig(aPropToVerify._1, aPropToVerify._2)
+    }
+  }
+
+  private def configEntry(configDesc: Config, configName: String): ConfigEntry = {
+    configDesc.entries.asScala.find(cfg => cfg.name == configName)
+      .getOrElse(throw new IllegalStateException(s"Config not found $configName"))
+  }
+
+  private def addKeystoreWithListenerPrefix(srcProps: Properties, destProps: Properties, listener: String): Unit = {
+    val listenerPrefix = new ListenerName(listener).configPrefix
+    destProps.put(listenerPrefix + SSL_KEYSTORE_TYPE_CONFIG, srcProps.get(SSL_KEYSTORE_TYPE_CONFIG))
+    destProps.put(listenerPrefix + SSL_KEYSTORE_LOCATION_CONFIG, srcProps.get(SSL_KEYSTORE_LOCATION_CONFIG))
+    destProps.put(listenerPrefix + SSL_KEYSTORE_PASSWORD_CONFIG, srcProps.get(SSL_KEYSTORE_PASSWORD_CONFIG).asInstanceOf[Password].value)
+    destProps.put(listenerPrefix + SSL_KEY_PASSWORD_CONFIG, srcProps.get(SSL_KEY_PASSWORD_CONFIG).asInstanceOf[Password].value)
+  }
+
+  private def configureDynamicKeystoreInZooKeeper(kafkaConfig: KafkaConfig, brokers: Seq[Int], sslProperties: Properties): Unit = {
+    val keystoreProps = new Properties
+    addKeystoreWithListenerPrefix(sslProperties, keystoreProps, SecureExternal)
+    kafkaConfig.dynamicConfig.toPersistentProps(keystoreProps, perBrokerConfig = true)
+    zkClient.makeSurePersistentPathExists(ConfigEntityChangeNotificationZNode.path)
+    adminZkClient.changeBrokerConfig(brokers, keystoreProps)
+  }
+
+  private def waitForKeystore(sslProperties: Properties, maxWaitMs: Long = 10000): Unit = {
+    waitForConfig(new ListenerName(SecureExternal).configPrefix + SSL_KEYSTORE_LOCATION_CONFIG,
+      sslProperties.getProperty(SSL_KEYSTORE_LOCATION_CONFIG), maxWaitMs)
+
+  }
+
+  private def waitForConfig(propName: String, propValue: String, maxWaitMs: Long = 10000): Unit = {
+    servers.foreach { server =>
+      TestUtils.retry(maxWaitMs) {
+        assertEquals(propValue, server.config.originals.get(propName))
+      }
+    }
+  }
+
+  private def invalidSslConfigs: Properties = {
+    val props = new Properties
+    props.put(SSL_KEYSTORE_LOCATION_CONFIG, "invalid/file/path")
+    props.put(SSL_KEYSTORE_PASSWORD_CONFIG, new Password("invalid"))
+    props.put(SSL_KEY_PASSWORD_CONFIG, new Password("invalid"))
+    props.put(SSL_KEYSTORE_TYPE_CONFIG, "PKCS12")
+    props
+  }
+
+  private def startProduceConsume(retries: Int): (ProducerThread, ConsumerThread) = {
+    val producerThread = new ProducerThread(retries)
+    clientThreads += producerThread
+    val consumerThread = new ConsumerThread(producerThread)
+    clientThreads += consumerThread
+    consumerThread.start()
+    producerThread.start()
+    (producerThread, consumerThread)
+  }
+
+  private def stopAndVerifyProduceConsume(producerThread: ProducerThread, consumerThread: ConsumerThread,
+                                                                                   mayFailRequests: Boolean): Unit = {
+    producerThread.shutdown()
+    consumerThread.initiateShutdown()
+    consumerThread.awaitShutdown()
+    if (!mayFailRequests)
+      assertEquals(producerThread.sent, consumerThread.received)
+    else {
+      assertTrue(s"Some messages not received, sent=${producerThread.sent} received=${consumerThread.received}",
+        consumerThread.received >= producerThread.sent)
+    }
+  }
+
+  private class ProducerThread(retries: Int) extends ShutdownableThread("test-producer", isInterruptible = false) {
+    private val producer = createProducer(trustStoreFile1, retries)
+    @volatile var sent = 0
+    override def doWork(): Unit = {
+        try {
+            while (isRunning.get) {
+                sent += 1
+                val record = new ProducerRecord(topic, s"key$sent", s"value$sent")
+                producer.send(record).get(10, TimeUnit.SECONDS)
+              }
+          } finally {
+            producer.close()
+          }
+      }
+  }
+
+  private class ConsumerThread(producerThread: ProducerThread) extends ShutdownableThread("test-consumer", isInterruptible = false) {
+    private val consumer = createConsumer("group1", trustStoreFile1)
+    @volatile private var endTimeMs = Long.MaxValue
+    var received = 0
+    override def doWork(): Unit = {
+      try {
+        while (isRunning.get || (received < producerThread.sent && System.currentTimeMillis < endTimeMs)) {
+          received += consumer.poll(50).count
+        }
+      } finally {
+        consumer.close()
+      }
+    }
+    override def initiateShutdown(): Boolean = {
+      endTimeMs = System.currentTimeMillis + 10 * 1000
+      super.initiateShutdown()
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
new file mode 100755
index 0000000..2032011
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.Properties
+
+import kafka.utils.TestUtils
+import org.apache.kafka.common.config.{ConfigException, SslConfigs}
+import org.junit.Assert._
+import org.junit.Test
+
+class DynamicBrokerConfigTest {
+
+  @Test
+  def testConfigUpdate(): Unit = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    val oldKeystore = "oldKs.jks"
+    props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, oldKeystore)
+    val config = KafkaConfig(props)
+    val dynamicConfig = config.dynamicConfig
+    assertSame(config, dynamicConfig.currentKafkaConfig)
+    assertEquals(oldKeystore, config.sslKeystoreLocation)
+    assertEquals(oldKeystore,
+      config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+    assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
+    (1 to 2).foreach { i =>
+      val props1 = new Properties
+      val newKeystore = s"ks$i.jks"
+      props1.put(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}", newKeystore)
+      dynamicConfig.updateBrokerConfig(0, props1)
+      assertNotSame(config, dynamicConfig.currentKafkaConfig)
+
+      assertEquals(newKeystore,
+        config.valuesWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(newKeystore,
+        config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(newKeystore,
+        config.valuesWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(newKeystore,
+        config.originalsWithPrefix("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
+      assertEquals(oldKeystore, config.sslKeystoreLocation)
+      assertEquals(oldKeystore, config.originals.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(oldKeystore, config.values.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(oldKeystore, config.originalsStrings.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+
+      assertEquals(oldKeystore,
+        config.valuesFromThisConfigWithPrefixOverride("listener.name.external.").get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(oldKeystore, config.valuesFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(oldKeystore, config.originalsFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+      assertEquals(oldKeystore, config.valuesFromThisConfig.get(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG))
+    }
+  }
+
+  @Test
+  def testConfigUpdateWithSomeInvalidConfigs(): Unit = {
+    val origProps = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181)
+    origProps.put(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS")
+    val config = KafkaConfig(origProps)
+
+    def verifyConfigUpdateWithInvalidConfig(validProps: Map[String, String], invalidProps: Map[String, String]): Unit = {
+      val props = new Properties
+      validProps.foreach { case (k, v) => props.put(k, v) }
+      invalidProps.foreach { case (k, v) => props.put(k, v) }
+
+      // DynamicBrokerConfig#validate is used by AdminClient to validate the configs provided in
+      // in an AlterConfigs request. Validation should fail with an exception if any of the configs are invalid.
+      try {
+        config.dynamicConfig.validate(props, perBrokerConfig = true)
+        fail("Invalid config did not fail validation")
+      } catch {
+        case e: ConfigException => // expected exception
+      }
+
+      // DynamicBrokerConfig#updateBrokerConfig is used to update configs from ZooKeeper during
+      // startup and when configs are updated in ZK. Update should apply valid configs and ignore
+      // invalid ones.
+      config.dynamicConfig.updateBrokerConfig(0, props)
+      validProps.foreach { case (name, value) => assertEquals(value, config.originals.get(name)) }
+      invalidProps.keySet.foreach { name =>
+        assertEquals(origProps.get(name), config.originals.get(name))
+      }
+    }
+
+    val validProps = Map(s"listener.name.external.${SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG}" ->"ks.p12")
+    val securityPropsWithoutListenerPrefix = Map(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG -> "PKCS12")
+    verifyConfigUpdateWithInvalidConfig(validProps, securityPropsWithoutListenerPrefix)
+    val nonDynamicProps = Map(KafkaConfig.ZkConnectProp -> "somehost:2181")
+    verifyConfigUpdateWithInvalidConfig(validProps, nonDynamicProps)
+  }
+
+  @Test
+  def testSecurityConfigs(): Unit = {
+    def verifyUpdate(name: String, value: Object, invalidValue: Boolean): Unit = {
+      verifyConfigUpdate(name, value, perBrokerConfig = true, expectFailure = true)
+      verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = true, expectFailure = invalidValue)
+      verifyConfigUpdate(name, value, perBrokerConfig = false, expectFailure = true)
+      verifyConfigUpdate(s"listener.name.external.$name", value, perBrokerConfig = false, expectFailure = true)
+    }
+
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "ks.jks", invalidValue = false)
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_TYPE_CONFIG, "JKS", invalidValue = false)
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "password", invalidValue = false)
+    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "password", invalidValue = false)
+    verifyUpdate(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true)
+    verifyUpdate(SslConfigs.SSL_KEY_PASSWORD_CONFIG, 1.asInstanceOf[Integer], invalidValue = true)
+  }
+
+  private def verifyConfigUpdate(name: String, value: Object, perBrokerConfig: Boolean, expectFailure: Boolean) {
+    val config = KafkaConfig(TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 8181))
+    val props = new Properties
+    props.put(name, value)
+    val oldValue = config.originals.get(name)
+
+    def updateConfig() = {
+      if (perBrokerConfig)
+        config.dynamicConfig.updateBrokerConfig(0, props)
+      else
+        config.dynamicConfig.updateDefaultConfig(props)
+    }
+    if (!expectFailure) {
+      config.dynamicConfig.validate(props, perBrokerConfig)
+      updateConfig()
+      assertEquals(value, config.originals.get(name))
+    } else {
+      try {
+        config.dynamicConfig.validate(props, perBrokerConfig)
+        fail("Invalid config did not fail validation")
+      } catch {
+        case e: Exception => // expected exception
+      }
+      updateConfig()
+      assertEquals(oldValue, config.originals.get(name))
+    }
+  }
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
index b2378cf..cf09849 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigTest.scala
@@ -26,11 +26,6 @@ class DynamicConfigTest  extends ZooKeeperTestHarness {
   private final val someValue: String = "some interesting value"
 
   @Test(expected = classOf[IllegalArgumentException])
-  def shouldFailWhenChangingBrokerUnknownConfig() {
-    adminZkClient.changeBrokerConfig(Seq(0), propsWith(nonExistentConfig, someValue))
-  }
-
-  @Test(expected = classOf[IllegalArgumentException])
   def shouldFailWhenChangingClientIdUnknownConfig() {
     adminZkClient.changeClientIdConfig("ClientId", propsWith(nonExistentConfig, someValue))
   }
@@ -51,4 +46,4 @@ class DynamicConfigTest  extends ZooKeeperTestHarness {
     adminZkClient.changeBrokerConfig(Seq(0),
       propsWith(DynamicConfig.Broker.FollowerReplicationThrottledRateProp, "-100"))
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index 09e4c94..30a10c7 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -49,7 +49,7 @@ import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.network.{ListenerName, Mode}
 import org.apache.kafka.common.record._
 import org.apache.kafka.common.security.auth.SecurityProtocol
-import org.apache.kafka.common.serialization.{ByteArraySerializer, Serializer}
+import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer, Deserializer, Serializer}
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.common.utils.Utils._
 import org.apache.kafka.test.{TestSslUtils, TestUtils => JTestUtils}
@@ -613,16 +613,18 @@ object TestUtils extends Logging {
   /**
    * Create a new consumer with a few pre-configured properties.
    */
-  def createNewConsumer(brokerList: String,
-                        groupId: String = "group",
-                        autoOffsetReset: String = "earliest",
-                        partitionFetchSize: Long = 4096L,
-                        partitionAssignmentStrategy: String = classOf[RangeAssignor].getName,
-                        sessionTimeout: Int = 30000,
-                        securityProtocol: SecurityProtocol,
-                        trustStoreFile: Option[File] = None,
-                        saslProperties: Option[Properties] = None,
-                        props: Option[Properties] = None) : KafkaConsumer[Array[Byte],Array[Byte]] = {
+  def createNewConsumer[K, V](brokerList: String,
+                              groupId: String = "group",
+                              autoOffsetReset: String = "earliest",
+                              partitionFetchSize: Long = 4096L,
+                              partitionAssignmentStrategy: String = classOf[RangeAssignor].getName,
+                              sessionTimeout: Int = 30000,
+                              securityProtocol: SecurityProtocol,
+                              trustStoreFile: Option[File] = None,
+                              saslProperties: Option[Properties] = None,
+                              keyDeserializer: Deserializer[K] = new ByteArrayDeserializer,
+                              valueDeserializer: Deserializer[V] =new ByteArrayDeserializer,
+                              props: Option[Properties] = None) : KafkaConsumer[K, V] = {
     import org.apache.kafka.clients.consumer.ConsumerConfig
 
     val consumerProps = props.getOrElse(new Properties())
@@ -633,8 +635,6 @@ object TestUtils extends Logging {
     val defaultProps = Map(
       ConsumerConfig.RETRY_BACKOFF_MS_CONFIG -> "100",
       ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG -> "200",
-      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
-      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.ByteArrayDeserializer",
       ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG -> partitionAssignmentStrategy,
       ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG -> sessionTimeout.toString,
       ConsumerConfig.GROUP_ID_CONFIG -> groupId)
@@ -652,7 +652,7 @@ object TestUtils extends Logging {
     if(!consumerProps.containsKey(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG))
       consumerProps ++= consumerSecurityConfigs(securityProtocol, trustStoreFile, saslProperties)
 
-    new KafkaConsumer[Array[Byte],Array[Byte]](consumerProps)
+    new KafkaConsumer[K, V](consumerProps, keyDeserializer, valueDeserializer)
   }
 
   /**

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.

Mime
View raw message