kafka-commits mailing list archives

From jun...@apache.org
Subject [kafka] branch trunk updated: KAFKA-4453 : Added code to separate controller connections and requests from the data plane (#5921)
Date Sun, 13 Jan 2019 18:18:03 GMT
This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 8afce0e  KAFKA-4453 : Added code to separate controller connections and requests from the data plane (#5921)
8afce0e is described below

commit 8afce0e338e3715824e309f6289af6bc607ce020
Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
AuthorDate: Sun Jan 13 10:17:52 2019 -0800

    KAFKA-4453 : Added code to separate controller connections and requests from the data plane (#5921)
    
    KIP-291 Implementation : Added code to separate controller connections and requests from the data plane.
    
    Tested with a local deployment that controller requests are handled by the control plane and other requests by the data plane.
    Also added unit tests to verify the functionality.
    
    Author: Lucas Wang <luwang@linkedin.com>,
    Author: Mayuresh Gharat <gharatmayuresh15@gmail.com>
    
    Reviewers: Joel Koshy <jjkoshy@gmail.com>, Jun Rao <junrao@gmail.com>
---
 .../controller/ControllerChannelManager.scala      |   8 +-
 .../main/scala/kafka/network/RequestChannel.scala  |  13 +-
 .../main/scala/kafka/network/SocketServer.scala    | 235 ++++++++++++++-------
 .../scala/kafka/server/DynamicBrokerConfig.scala   |   2 +-
 core/src/main/scala/kafka/server/KafkaConfig.scala |  63 +++++-
 .../scala/kafka/server/KafkaRequestHandler.scala   |  10 +-
 core/src/main/scala/kafka/server/KafkaServer.scala |  38 +++-
 .../kafka/api/AdminClientIntegrationTest.scala     |   4 +-
 .../kafka/api/AuthorizerIntegrationTest.scala      |  10 +-
 .../integration/kafka/api/BaseQuotaTest.scala      |   2 +-
 .../kafka/api/EndToEndAuthorizationTest.scala      |  46 ++--
 .../api/SaslSslAdminClientIntegrationTest.scala    |   4 +-
 .../server/DynamicBrokerReconfigurationTest.scala  |   6 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   4 +-
 .../controller/ControllerIntegrationTest.scala     |  45 +++-
 .../unit/kafka/network/SocketServerTest.scala      | 218 ++++++++++---------
 .../kafka/server/DynamicConfigChangeTest.scala     |   4 +-
 .../scala/unit/kafka/server/KafkaConfigTest.scala  |  25 +++
 .../unit/kafka/server/MetadataRequestTest.scala    |   8 +-
 .../scala/unit/kafka/server/RequestQuotaTest.scala |   2 +-
 .../test/scala/unit/kafka/utils/TestUtils.scala    |   4 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   4 +-
 .../integration/utils/IntegrationTestUtils.java    |   2 +-
 23 files changed, 500 insertions(+), 257 deletions(-)
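
As an illustrative sketch (not part of this patch), the broker configuration that activates the new control plane could look like the following Scala snippet, mirroring the example given in the KafkaConfig documentation further below; the host names, ports and zookeeper address are placeholders:

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")   // placeholder minimal broker settings
    props.put(KafkaConfig.BrokerIdProp, "0")
    props.put(KafkaConfig.ListenersProp,
      "INTERNAL://192.1.1.8:9092,EXTERNAL://10.1.1.5:9093,CONTROLLER://192.1.1.8:9094")
    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "INTERNAL:PLAINTEXT,EXTERNAL:SSL,CONTROLLER:SSL")
    props.put(KafkaConfig.InterBrokerListenerNameProp, "INTERNAL")
    props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")

    val config = KafkaConfig.fromProps(props)
    // The CONTROLLER endpoint gets its own acceptor, processor and request handler thread;
    // all remaining listeners stay on the data plane.
    assert(config.controlPlaneListener.isDefined)
    assert(config.dataPlaneListeners.map(_.listenerName.value).toSet == Set("INTERNAL", "EXTERNAL"))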

diff --git a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
index b5c6a91..083e952 100755
--- a/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
+++ b/core/src/main/scala/kafka/controller/ControllerChannelManager.scala
@@ -107,14 +107,16 @@ class ControllerChannelManager(controllerContext: ControllerContext, config: Kaf
   private def addNewBroker(broker: Broker) {
     val messageQueue = new LinkedBlockingQueue[QueueItem]
     debug(s"Controller ${config.brokerId} trying to connect to broker ${broker.id}")
-    val brokerNode = broker.node(config.interBrokerListenerName)
+    val controllerToBrokerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+    val controllerToBrokerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+    val brokerNode = broker.node(controllerToBrokerListenerName)
     val logContext = new LogContext(s"[Controller id=${config.brokerId}, targetBrokerId=${brokerNode.idString}] ")
     val networkClient = {
       val channelBuilder = ChannelBuilders.clientChannelBuilder(
-        config.interBrokerSecurityProtocol,
+        controllerToBrokerSecurityProtocol,
         JaasContext.Type.SERVER,
         config,
-        config.interBrokerListenerName,
+        controllerToBrokerListenerName,
         config.saslMechanismInterBrokerProtocol,
         time,
         config.saslInterBrokerHandshakeRequestEnable
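
The listener-selection rule introduced here can be read as a one-liner; a sketch with a hypothetical helper (not part of this patch):

    import org.apache.kafka.common.network.ListenerName
    import kafka.server.KafkaConfig

    // With control.plane.listener.name unset this falls back to the old behaviour (the
    // inter-broker listener); when set, controller-to-broker connections use the dedicated listener.
    def controllerToBrokerListener(config: KafkaConfig): ListenerName =
      config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)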
diff --git a/core/src/main/scala/kafka/network/RequestChannel.scala b/core/src/main/scala/kafka/network/RequestChannel.scala
index 00b0968..988c14f 100644
--- a/core/src/main/scala/kafka/network/RequestChannel.scala
+++ b/core/src/main/scala/kafka/network/RequestChannel.scala
@@ -41,6 +41,7 @@ object RequestChannel extends Logging {
 
   val RequestQueueSizeMetric = "RequestQueueSize"
   val ResponseQueueSizeMetric = "ResponseQueueSize"
+  val ControlPlaneMetricPrefix = "ControlPlane"
   val ProcessorMetricTag = "processor"
 
   def isRequestLoggingEnabled: Boolean = requestLogger.underlying.isDebugEnabled
@@ -272,17 +273,19 @@ object RequestChannel extends Logging {
   }
 }
 
-class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
+class RequestChannel(val queueSize: Int, val metricNamePrefix : String = "") extends KafkaMetricsGroup {
   import RequestChannel._
   val metrics = new RequestChannel.Metrics
   private val requestQueue = new ArrayBlockingQueue[BaseRequest](queueSize)
   private val processors = new ConcurrentHashMap[Int, Processor]()
+  val requestQueueSizeMetricName = metricNamePrefix.concat(RequestQueueSizeMetric)
+  val responseQueueSizeMetricName = metricNamePrefix.concat(ResponseQueueSizeMetric)
 
-  newGauge(RequestQueueSizeMetric, new Gauge[Int] {
+  newGauge(requestQueueSizeMetricName, new Gauge[Int] {
       def value = requestQueue.size
   })
 
-  newGauge(ResponseQueueSizeMetric, new Gauge[Int]{
+  newGauge(responseQueueSizeMetricName, new Gauge[Int]{
     def value = processors.values.asScala.foldLeft(0) {(total, processor) =>
       total + processor.responseQueueSize
     }
@@ -292,7 +295,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
     if (processors.putIfAbsent(processor.id, processor) != null)
       warn(s"Unexpected processor with processorId ${processor.id}")
 
-    newGauge(ResponseQueueSizeMetric,
+    newGauge(responseQueueSizeMetricName,
       new Gauge[Int] {
         def value = processor.responseQueueSize
       },
@@ -302,7 +305,7 @@ class RequestChannel(val queueSize: Int) extends KafkaMetricsGroup {
 
   def removeProcessor(processorId: Int): Unit = {
     processors.remove(processorId)
-    removeMetric(ResponseQueueSizeMetric, Map(ProcessorMetricTag -> processorId.toString))
+    removeMetric(responseQueueSizeMetricName, Map(ProcessorMetricTag -> processorId.toString))
   }
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
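
The only effect of the new metricNamePrefix parameter is to rename the queue-size gauges of the channel; a small sketch (not part of this patch):

    import kafka.network.RequestChannel

    // Data-plane channel keeps the existing gauge names:
    //   RequestQueueSize, ResponseQueueSize
    val dataPlaneChannel = new RequestChannel(500)

    // Control-plane channel (created in SocketServer below with a fixed queue size of 20)
    // registers them as ControlPlaneRequestQueueSize and ControlPlaneResponseQueueSize.
    val controlPlaneChannel = new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix)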
diff --git a/core/src/main/scala/kafka/network/SocketServer.scala b/core/src/main/scala/kafka/network/SocketServer.scala
index ae09a03..125efdc 100644
--- a/core/src/main/scala/kafka/network/SocketServer.scala
+++ b/core/src/main/scala/kafka/network/SocketServer.scala
@@ -51,13 +51,28 @@ import scala.collection.mutable.{ArrayBuffer, Buffer}
 import scala.util.control.ControlThrowable
 
 /**
- * An NIO socket server. The threading model is
- *   1 Acceptor thread that handles new connections
- *   Acceptor has N Processor threads that each have their own selector and read requests from sockets
- *   M Handler threads that handle requests and produce responses back to the processor threads for writing.
+ * Handles new connections, requests and responses to and from broker.
+ * Kafka supports two types of request planes :
+ *  - data-plane :
+ *    - Handles requests from clients and other brokers in the cluster.
+ *    - The threading model is
+ *      1 Acceptor thread per listener, that handles new connections.
+ *      It is possible to configure multiple data-planes by specifying multiple "," separated endpoints for "listeners" in KafkaConfig.
+ *      Acceptor has N Processor threads that each have their own selector and read requests from sockets
+ *      M Handler threads that handle requests and produce responses back to the processor threads for writing.
+ *  - control-plane :
+ *    - Handles requests from controller. This is optional and can be configured by specifying "control.plane.listener.name".
+ *      If not configured, the controller requests are handled by the data-plane.
+ *    - The threading model is
+ *      1 Acceptor thread that handles new connections
+ *      Acceptor has 1 Processor thread that has its own selector and read requests from the socket.
+ *      1 Handler thread that handles requests and produce responses back to the processor thread for writing.
  */
 class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time, val credentialProvider: CredentialProvider) extends Logging with KafkaMetricsGroup {
 
+  val DataPlanePrefix = "data-plane"
+  val ControlPlanePrefix = "control-plane"
+
   private val maxQueuedRequests = config.queuedMaxRequests
 
   private val logContext = new LogContext(s"[SocketServer brokerId=${config.brokerId}] ")
@@ -68,11 +83,16 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   private val memoryPoolDepletedTimeMetricName = metrics.metricName("MemoryPoolDepletedTimeTotal", "socket-server-metrics")
   memoryPoolSensor.add(new Meter(TimeUnit.MILLISECONDS, memoryPoolDepletedPercentMetricName, memoryPoolDepletedTimeMetricName))
   private val memoryPool = if (config.queuedMaxBytes > 0) new SimpleMemoryPool(config.queuedMaxBytes, config.socketRequestMaxBytes, false, memoryPoolSensor) else MemoryPool.NONE
-  val requestChannel = new RequestChannel(maxQueuedRequests)
-  private val processors = new ConcurrentHashMap[Int, Processor]()
-  private var nextProcessorId = 0
+  // data-plane
+  private val dataPlaneProcessors = new ConcurrentHashMap[Int, Processor]()
+  private[network] val dataPlaneAcceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  val dataPlaneRequestChannel = new RequestChannel(maxQueuedRequests)
+  // control-plane
+  private var controlPlaneProcessorOpt : Option[Processor] = None
+  private[network] var controlPlaneAcceptorOpt : Option[Acceptor] = None
+  val controlPlaneRequestChannelOpt: Option[RequestChannel] = config.controlPlaneListenerName.map(_ => new RequestChannel(20, RequestChannel.ControlPlaneMetricPrefix))
 
-  private[network] val acceptors = new ConcurrentHashMap[EndPoint, Acceptor]()
+  private var nextProcessorId = 0
   private var connectionQuotas: ConnectionQuotas = _
   private var stoppedProcessingRequests = false
 
@@ -91,9 +111,11 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   def startup(startupProcessors: Boolean = true) {
     this.synchronized {
       connectionQuotas = new ConnectionQuotas(config.maxConnectionsPerIp, config.maxConnectionsPerIpOverrides)
-      createAcceptorAndProcessors(config.numNetworkThreads, config.listeners)
+      createControlPlaneAcceptorAndProcessor(config.controlPlaneListener)
+      createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, config.dataPlaneListeners)
       if (startupProcessors) {
-        startProcessors()
+        startControlPlaneProcessor()
+        startDataPlaneProcessors()
       }
     }
 
@@ -101,12 +123,25 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
-          val ioWaitRatioMetricNames = processors.values.asScala.map { p =>
+          val ioWaitRatioMetricNames = dataPlaneProcessors.values.asScala.map { p =>
             metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
           }
           ioWaitRatioMetricNames.map { metricName =>
             Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
-          }.sum / processors.size
+          }.sum / dataPlaneProcessors.size
+        }
+      }
+    )
+    newGauge("ControlPlaneNetworkProcessorAvgIdlePercent",
+      new Gauge[Double] {
+
+        def value = SocketServer.this.synchronized {
+          val ioWaitRatioMetricName = controlPlaneProcessorOpt.map { p =>
+            metrics.metricName("io-wait-ratio", "socket-server-metrics", p.metricTags)
+          }
+          ioWaitRatioMetricName.map { metricName =>
+            Option(metrics.metric(metricName)).fold(0.0)(m => Math.min(m.metricValue.asInstanceOf[Double], 1.0))
+          }.getOrElse(Double.NaN)
         }
       }
     )
@@ -124,7 +159,7 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
       new Gauge[Double] {
 
         def value = SocketServer.this.synchronized {
-          val expiredConnectionsKilledCountMetricNames = processors.values.asScala.map { p =>
+          val expiredConnectionsKilledCountMetricNames = dataPlaneProcessors.values.asScala.map { p =>
             metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
           }
           expiredConnectionsKilledCountMetricNames.map { metricName =>
@@ -133,96 +168,148 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
         }
       }
     )
-    info(s"Started ${acceptors.size} acceptor threads")
+    newGauge("ControlPlaneExpiredConnectionsKilledCount",
+      new Gauge[Double] {
+
+        def value = SocketServer.this.synchronized {
+          val expiredConnectionsKilledCountMetricNames = controlPlaneProcessorOpt.map { p =>
+            metrics.metricName("expired-connections-killed-count", "socket-server-metrics", p.metricTags)
+          }
+          expiredConnectionsKilledCountMetricNames.map { metricName =>
+            Option(metrics.metric(metricName)).fold(0.0)(m => m.metricValue.asInstanceOf[Double])
+          }.getOrElse(0.0)
+        }
+      }
+    )
+    info(s"Started ${dataPlaneAcceptors.size} acceptor threads for data-plane")
+    if (controlPlaneAcceptorOpt.isDefined)
+      info("Started control-plane acceptor thread")
   }
 
   /**
-   * Starts processors of all the acceptors of this server if they have not already been started.
-   * This method is used for delayed starting of processors if [[kafka.network.SocketServer#startup]]
+   * Starts processors of all the data-plane acceptors of this server if they have not already been started.
+   * This method is used for delayed starting of data-plane processors if [[kafka.network.SocketServer#startup]]
    * was invoked with `startupProcessors=false`.
    */
-  def startProcessors(): Unit = synchronized {
-    acceptors.values.asScala.foreach { _.startProcessors() }
-    info(s"Started processors for ${acceptors.size} acceptors")
+  def startDataPlaneProcessors(): Unit = synchronized {
+    dataPlaneAcceptors.values.asScala.foreach { _.startProcessors(DataPlanePrefix) }
+    info(s"Started data-plane processors for ${dataPlaneAcceptors.size} acceptors")
+  }
+
+  /**
+   * Start the processor of control-plane acceptor of this server if it has not already been started.
+   * This method is used for delayed starting of control-plane processor if [[kafka.network.SocketServer#startup]]
+   * was invoked with `startupProcessors=false`.
+   */
+  def startControlPlaneProcessor(): Unit = synchronized {
+    if (controlPlaneAcceptorOpt.isDefined) {
+      controlPlaneAcceptorOpt.get.startProcessors(ControlPlanePrefix)
+      info(s"Started control-plane processor for the control-plane acceptor")
+    }
   }
 
   private def endpoints = config.listeners.map(l => l.listenerName -> l).toMap
 
-  private def createAcceptorAndProcessors(processorsPerListener: Int,
-                                          endpoints: Seq[EndPoint]): Unit = synchronized {
+  private def createDataPlaneAcceptorsAndProcessors(dataProcessorsPerListener: Int,
+                                                    endpoints: Seq[EndPoint]): Unit = synchronized {
+    endpoints.foreach { endpoint =>
+      val dataPlaneAcceptor = createAcceptor(endpoint)
+      addDataPlaneProcessors(dataPlaneAcceptor, endpoint, dataProcessorsPerListener)
+      KafkaThread.nonDaemon(s"data-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", dataPlaneAcceptor).start()
+      dataPlaneAcceptor.awaitStartup()
+      dataPlaneAcceptors.put(endpoint, dataPlaneAcceptor)
+      info(s"Created data-plane acceptor and processors for endpoint : $endpoint")
+    }
+  }
+
+  private def createControlPlaneAcceptorAndProcessor(endpointOpt: Option[EndPoint]): Unit = synchronized {
+    endpointOpt.foreach { endpoint =>
+      val controlPlaneAcceptor = createAcceptor(endpoint)
+      val controlPlaneProcessor = newProcessor(nextProcessorId, controlPlaneRequestChannelOpt.get, connectionQuotas, endpoint.listenerName, endpoint.securityProtocol, memoryPool)
+      controlPlaneAcceptorOpt = Some(controlPlaneAcceptor)
+      controlPlaneProcessorOpt = Some(controlPlaneProcessor)
+      val listenerProcessors = new ArrayBuffer[Processor]()
+      listenerProcessors += controlPlaneProcessor
+      controlPlaneRequestChannelOpt.foreach(_.addProcessor(controlPlaneProcessor))
+      nextProcessorId += 1
+      controlPlaneAcceptor.addProcessors(listenerProcessors, ControlPlanePrefix)
+      KafkaThread.nonDaemon(s"control-plane-kafka-socket-acceptor-${endpoint.listenerName}-${endpoint.securityProtocol}-${endpoint.port}", controlPlaneAcceptor).start()
+      controlPlaneAcceptor.awaitStartup()
+      info(s"Created control-plane acceptor and processor for endpoint : $endpoint")
+    }
+  }
 
+  private def createAcceptor(endPoint: EndPoint) : Acceptor = synchronized {
     val sendBufferSize = config.socketSendBufferBytes
     val recvBufferSize = config.socketReceiveBufferBytes
     val brokerId = config.brokerId
-
-    endpoints.foreach { endpoint =>
-      val listenerName = endpoint.listenerName
-      val securityProtocol = endpoint.securityProtocol
-
-      val acceptor = new Acceptor(endpoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
-      addProcessors(acceptor, endpoint, processorsPerListener)
-      KafkaThread.nonDaemon(s"kafka-socket-acceptor-$listenerName-$securityProtocol-${endpoint.port}", acceptor).start()
-      acceptor.awaitStartup()
-      acceptors.put(endpoint, acceptor)
-    }
+    new Acceptor(endPoint, sendBufferSize, recvBufferSize, brokerId, connectionQuotas)
   }
 
-  private def addProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
+  private def addDataPlaneProcessors(acceptor: Acceptor, endpoint: EndPoint, newProcessorsPerListener: Int): Unit = synchronized {
     val listenerName = endpoint.listenerName
     val securityProtocol = endpoint.securityProtocol
     val listenerProcessors = new ArrayBuffer[Processor]()
-
     for (_ <- 0 until newProcessorsPerListener) {
-      val processor = newProcessor(nextProcessorId, connectionQuotas, listenerName, securityProtocol, memoryPool)
+      val processor = newProcessor(nextProcessorId, dataPlaneRequestChannel, connectionQuotas, listenerName, securityProtocol, memoryPool)
       listenerProcessors += processor
-      requestChannel.addProcessor(processor)
+      dataPlaneRequestChannel.addProcessor(processor)
       nextProcessorId += 1
     }
-    listenerProcessors.foreach(p => processors.put(p.id, p))
-    acceptor.addProcessors(listenerProcessors)
+    listenerProcessors.foreach(p => dataPlaneProcessors.put(p.id, p))
+    acceptor.addProcessors(listenerProcessors, DataPlanePrefix)
   }
 
   /**
-    * Stop processing requests and new connections.
-    */
+   * Stop processing requests and new connections.
+   */
   def stopProcessingRequests() = {
     info("Stopping socket server request processors")
     this.synchronized {
-      acceptors.asScala.values.foreach(_.shutdown())
-      processors.asScala.values.foreach(_.shutdown())
-      requestChannel.clear()
+      dataPlaneAcceptors.asScala.values.foreach(_.shutdown())
+      controlPlaneAcceptorOpt.foreach(_.shutdown())
+      dataPlaneProcessors.asScala.values.foreach(_.shutdown())
+      controlPlaneProcessorOpt.foreach(_.shutdown())
+      dataPlaneRequestChannel.clear()
+      controlPlaneRequestChannelOpt.foreach(_.clear())
       stoppedProcessingRequests = true
     }
     info("Stopped socket server request processors")
   }
 
   def resizeThreadPool(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): Unit = synchronized {
-    info(s"Resizing network thread pool size for each listener from $oldNumNetworkThreads to $newNumNetworkThreads")
+    info(s"Resizing network thread pool size for each data-plane listener from $oldNumNetworkThreads to $newNumNetworkThreads")
     if (newNumNetworkThreads > oldNumNetworkThreads) {
-      acceptors.asScala.foreach { case (endpoint, acceptor) =>
-        addProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
+      dataPlaneAcceptors.asScala.foreach { case (endpoint, acceptor) =>
+        addDataPlaneProcessors(acceptor, endpoint, newNumNetworkThreads - oldNumNetworkThreads)
       }
     } else if (newNumNetworkThreads < oldNumNetworkThreads)
-      acceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, requestChannel))
+      dataPlaneAcceptors.asScala.values.foreach(_.removeProcessors(oldNumNetworkThreads - newNumNetworkThreads, dataPlaneRequestChannel))
   }
 
   /**
-    * Shutdown the socket server. If still processing requests, shutdown
-    * acceptors and processors first.
-    */
+   * Shutdown the socket server. If still processing requests, shutdown
+   * acceptors and processors first.
+   */
   def shutdown() = {
     info("Shutting down socket server")
     this.synchronized {
       if (!stoppedProcessingRequests)
         stopProcessingRequests()
-      requestChannel.shutdown()
+      dataPlaneRequestChannel.shutdown()
+      controlPlaneRequestChannelOpt.foreach(_.shutdown())
     }
     info("Shutdown completed")
   }
 
   def boundPort(listenerName: ListenerName): Int = {
     try {
-      acceptors.get(endpoints(listenerName)).serverChannel.socket.getLocalPort
+      val acceptor = dataPlaneAcceptors.get(endpoints(listenerName))
+      if (acceptor != null) {
+        acceptor.serverChannel.socket.getLocalPort
+      } else {
+        controlPlaneAcceptorOpt.map (_.serverChannel.socket().getLocalPort).getOrElse(throw new KafkaException("Could not find listenerName : " + listenerName + " in data-plane or control-plane"))
+      }
     } catch {
       case e: Exception =>
         throw new KafkaException("Tried to check server's port before server was started or checked for port of non-existing protocol", e)
@@ -230,15 +317,15 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
   }
 
   def addListeners(listenersAdded: Seq[EndPoint]): Unit = synchronized {
-    info(s"Adding listeners for endpoints $listenersAdded")
-    createAcceptorAndProcessors(config.numNetworkThreads, listenersAdded)
-    startProcessors()
+    info(s"Adding data-plane listeners for endpoints $listenersAdded")
+    createDataPlaneAcceptorsAndProcessors(config.numNetworkThreads, listenersAdded)
+    startDataPlaneProcessors()
   }
 
   def removeListeners(listenersRemoved: Seq[EndPoint]): Unit = synchronized {
-    info(s"Removing listeners for endpoints $listenersRemoved")
+    info(s"Removing data-plane listeners for endpoints $listenersRemoved")
     listenersRemoved.foreach { endpoint =>
-      acceptors.asScala.remove(endpoint).foreach(_.shutdown())
+      dataPlaneAcceptors.asScala.remove(endpoint).foreach(_.shutdown())
     }
   }
 
@@ -252,8 +339,8 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     connectionQuotas.updateMaxConnectionsPerIpOverride(maxConnectionsPerIpOverrides)
   }
 
-  /* `protected` for test usage */
-  protected[network] def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+  // `protected` for test usage
+  protected[network] def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                       securityProtocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
     new Processor(id,
       time,
@@ -272,12 +359,12 @@ class SocketServer(val config: KafkaConfig, val metrics: Metrics, val time: Time
     )
   }
 
-  /* For test usage */
+  // For test usage
   private[network] def connectionCount(address: InetAddress): Int =
     Option(connectionQuotas).fold(0)(_.get(address))
 
-  /* For test usage */
-  private[network] def processor(index: Int): Processor = processors.get(index)
+  // For test usage
+  private[network] def dataPlaneProcessor(index: Int): Processor = dataPlaneProcessors.get(index)
 
 }
 
@@ -357,21 +444,21 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
   private val processors = new ArrayBuffer[Processor]()
   private val processorsStarted = new AtomicBoolean
 
-  private[network] def addProcessors(newProcessors: Buffer[Processor]): Unit = synchronized {
+  private[network] def addProcessors(newProcessors: Buffer[Processor], processorThreadPrefix: String): Unit = synchronized {
     processors ++= newProcessors
     if (processorsStarted.get)
-      startProcessors(newProcessors)
+      startProcessors(newProcessors, processorThreadPrefix)
   }
 
-  private[network] def startProcessors(): Unit = synchronized {
+  private[network] def startProcessors(processorThreadPrefix: String): Unit = synchronized {
     if (!processorsStarted.getAndSet(true)) {
-      startProcessors(processors)
+      startProcessors(processors, processorThreadPrefix)
     }
   }
 
-  private def startProcessors(processors: Seq[Processor]): Unit = synchronized {
+  private def startProcessors(processors: Seq[Processor], processorThreadPrefix: String): Unit = synchronized {
     processors.foreach { processor =>
-      KafkaThread.nonDaemon(s"kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
+      KafkaThread.nonDaemon(processorThreadPrefix + s"-kafka-network-thread-$brokerId-${endPoint.listenerName}-${endPoint.securityProtocol}-${processor.id}",
         processor).start()
     }
   }
@@ -444,9 +531,9 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     }
   }
 
-  /*
-   * Create a server socket to listen for connections on.
-   */
+  /**
+  * Create a server socket to listen for connections on.
+  */
   private def openServerSocket(host: String, port: Int): ServerSocketChannel = {
     val socketAddress =
       if (host == null || host.trim.isEmpty)
@@ -468,7 +555,7 @@ private[kafka] class Acceptor(val endPoint: EndPoint,
     serverChannel
   }
 
-  /*
+  /**
    * Accept a new connection
    */
   def accept(key: SelectionKey, processor: Processor) {
@@ -564,7 +651,7 @@ private[kafka] class Processor(val id: Int,
     // also includes the listener name)
     Map(NetworkProcessorMetricTag -> id.toString)
   )
-  
+
   val expiredConnectionsKilledCount = new Total()
   private val expiredConnectionsKilledCountMetricName = metrics.metricName("expired-connections-killed-count", "socket-server-metrics", metricTags)
   metrics.addMetric(expiredConnectionsKilledCountMetricName, expiredConnectionsKilledCount)
@@ -688,7 +775,7 @@ private[kafka] class Processor(val id: Int,
     }
   }
 
-  /* `protected` for test usage */
+  // `protected` for test usage
   protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
     val connectionId = response.request.context.connectionId
     trace(s"Socket server received response to send to $connectionId, registering for write and sending data: $response")
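
Taken together, the new startup hooks give the broker the sequencing sketched below (simplified from the KafkaServer changes further down; startPlanes is a hypothetical helper, not part of this patch):

    import kafka.network.SocketServer

    def startPlanes(socketServer: SocketServer): Unit = {
      // Bind all acceptors but defer starting the processors.
      socketServer.startup(startupProcessors = false)
      // ... create the KafkaApis instances and KafkaRequestHandlerPools for
      //     socketServer.dataPlaneRequestChannel and socketServer.controlPlaneRequestChannelOpt ...
      socketServer.startDataPlaneProcessors()
      socketServer.startControlPlaneProcessor() // no-op when control.plane.listener.name is unset
    }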
diff --git a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
index 2c0f6c1..1c56572 100755
--- a/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
+++ b/core/src/main/scala/kafka/server/DynamicBrokerConfig.scala
@@ -640,7 +640,7 @@ class DynamicThreadPool(server: KafkaServer) extends BrokerReconfigurable {
 
   override def reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit = {
     if (newConfig.numIoThreads != oldConfig.numIoThreads)
-      server.requestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
+      server.dataPlaneRequestHandlerPool.resizeThreadPool(newConfig.numIoThreads)
     if (newConfig.numNetworkThreads != oldConfig.numNetworkThreads)
       server.socketServer.resizeThreadPool(oldConfig.numNetworkThreads, newConfig.numNetworkThreads)
     if (newConfig.numReplicaFetchers != oldConfig.numReplicaFetchers)
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 13f555a..c3c3295 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -286,6 +286,7 @@ object KafkaConfig {
   val AdvertisedPortProp = "advertised.port"
   val AdvertisedListenersProp = "advertised.listeners"
   val ListenerSecurityProtocolMapProp = "listener.security.protocol.map"
+  val ControlPlaneListenerNameProp = "control.plane.listener.name"
   val SocketSendBufferBytesProp = "socket.send.buffer.bytes"
   val SocketReceiveBufferBytesProp = "socket.receive.buffer.bytes"
   val SocketRequestMaxBytesProp = "socket.request.max.bytes"
@@ -503,7 +504,7 @@ object KafkaConfig {
   val NumIoThreadsDoc = "The number of threads that the server uses for processing requests, which may include disk I/O"
   val NumReplicaAlterLogDirsThreadsDoc = "The number of threads that can move replicas between log directories, which may include disk I/O"
   val BackgroundThreadsDoc = "The number of threads to use for various background processing tasks"
-  val QueuedMaxRequestsDoc = "The number of queued requests allowed before blocking the network threads"
+  val QueuedMaxRequestsDoc = "The number of queued requests allowed for data-plane, before blocking the network threads"
   val QueuedMaxRequestBytesDoc = "The number of queued bytes allowed before no more requests are read"
   val RequestTimeoutMsDoc = CommonClientConfigs.REQUEST_TIMEOUT_MS_DOC
   /************* Authorizer Configuration ***********/
@@ -546,6 +547,22 @@ object KafkaConfig {
     "prefix (the listener name is lowercased) to the config name. For example, to set a different keystore for the " +
     "INTERNAL listener, a config with name <code>listener.name.internal.ssl.keystore.location</code> would be set. " +
     "If the config for the listener name is not set, the config will fallback to the generic config (i.e. <code>ssl.keystore.location</code>). "
+  val controlPlaneListenerNameDoc = "Name of listener used for communication between controller and brokers. " +
+    s"Broker will use the $ControlPlaneListenerNameProp to locate the endpoint in $ListenersProp list, to listen for connections from the controller. " +
+    "For example, if a broker's config is :\n" +
+    "listeners = INTERNAL://192.1.1.8:9092, EXTERNAL://10.1.1.5:9093, CONTROLLER://192.1.1.8:9094\n" +
+    "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
+    "control.plane.listener.name = CONTROLLER\n" +
+    "On startup, the broker will start listening on \"192.1.1.8:9094\" with security protocol \"SSL\".\n" +
+    s"On controller side, when it discovers a broker's published endpoints through zookeeper, it will use the $ControlPlaneListenerNameProp " +
+    "to find the endpoint, which it will use to establish connection to the broker.\n" +
+    "For example, if the broker's published endpoints on zookeeper are :\n" +
+    "\"endpoints\" : [\"INTERNAL://broker1.example.com:9092\",\"EXTERNAL://broker1.example.com:9093\",\"CONTROLLER://broker1.example.com:9094\"]\n" +
+    " and the controller's config is :\n" +
+    "listener.security.protocol.map = INTERNAL:PLAINTEXT, EXTERNAL:SSL, CONTROLLER:SSL\n" +
+    "control.plane.listener.name = CONTROLLER\n" +
+    "then controller will use \"broker1.example.com:9094\" with security protocol \"SSL\" to connect to the broker.\n" +
+    "If not explicitly configured, the default value will be null and there will be no dedicated endpoints for controller connections."
 
   val SocketSendBufferBytesDoc = "The SO_SNDBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
   val SocketReceiveBufferBytesDoc = "The SO_RCVBUF buffer of the socket sever sockets. If the value is -1, the OS default will be used."
@@ -846,6 +863,7 @@ object KafkaConfig {
       .define(AdvertisedPortProp, INT, null, HIGH, AdvertisedPortDoc)
       .define(AdvertisedListenersProp, STRING, null, HIGH, AdvertisedListenersDoc)
       .define(ListenerSecurityProtocolMapProp, STRING, Defaults.ListenerSecurityProtocolMap, LOW, ListenerSecurityProtocolMapDoc)
+      .define(ControlPlaneListenerNameProp, STRING, null, HIGH, controlPlaneListenerNameDoc)
       .define(SocketSendBufferBytesProp, INT, Defaults.SocketSendBufferBytes, HIGH, SocketSendBufferBytesDoc)
       .define(SocketReceiveBufferBytesProp, INT, Defaults.SocketReceiveBufferBytes, HIGH, SocketReceiveBufferBytesDoc)
       .define(SocketRequestMaxBytesProp, INT, Defaults.SocketRequestMaxBytes, atLeast(1), HIGH, SocketRequestMaxBytesDoc)
@@ -1265,6 +1283,8 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
 
   def interBrokerListenerName = getInterBrokerListenerNameAndSecurityProtocol._1
   def interBrokerSecurityProtocol = getInterBrokerListenerNameAndSecurityProtocol._2
+  def controlPlaneListenerName = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => listenerName }
+  def controlPlaneSecurityProtocol = getControlPlaneListenerNameAndSecurityProtocol.map { case (listenerName, securityProtocol) => securityProtocol }
   def saslMechanismInterBrokerProtocol = getString(KafkaConfig.SaslMechanismInterBrokerProtocolProp)
   val saslInterBrokerHandshakeRequestEnable = interBrokerProtocolVersion >= KAFKA_0_10_0_IV1
 
@@ -1337,6 +1357,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }.getOrElse(CoreUtils.listenerListToEndPoints("PLAINTEXT://" + hostName + ":" + port, listenerSecurityProtocolMap))
   }
 
+  def controlPlaneListener: Option[EndPoint] = {
+    controlPlaneListenerName.map { listenerName =>
+      listeners.filter(endpoint => endpoint.listenerName.value() == listenerName.value()).head
+    }
+  }
+
+  def dataPlaneListeners: Seq[EndPoint] = {
+    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
+      case Some(controlPlaneListenerName) => listeners.filterNot(_.listenerName.value() == controlPlaneListenerName)
+      case None => listeners
+    }
+  }
+
   // If the user defined advertised listeners, we use those
   // If he didn't but did define advertised host or port, we'll use those and fill in the missing value from regular host / port or defaults
   // If none of these are defined, we'll use the listeners
@@ -1368,6 +1401,19 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     }
   }
 
+  private def getControlPlaneListenerNameAndSecurityProtocol: Option[(ListenerName, SecurityProtocol)] = {
+    Option(getString(KafkaConfig.ControlPlaneListenerNameProp)) match {
+      case Some(name) =>
+        val listenerName = ListenerName.normalised(name)
+        val securityProtocol = listenerSecurityProtocolMap.getOrElse(listenerName,
+          throw new ConfigException(s"Listener with ${listenerName.value} defined in " +
+            s"${KafkaConfig.ControlPlaneListenerNameProp} not found in ${KafkaConfig.ListenerSecurityProtocolMapProp}."))
+        Some(listenerName, securityProtocol)
+
+      case None => None
+   }
+  }
+
   private def getSecurityProtocol(protocolName: String, configName: String): SecurityProtocol = {
     try SecurityProtocol.forName(protocolName)
     catch {
@@ -1419,6 +1465,17 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
       s"${KafkaConfig.AdvertisedListenersProp} cannot use the nonroutable meta-address 0.0.0.0. "+
       s"Use a routable IP address.")
 
+    // validate controller.listener.name config
+    if (controlPlaneListenerName.isDefined) {
+      require(advertisedListenerNames.contains(controlPlaneListenerName.get),
+        s"${KafkaConfig.ControlPlaneListenerNameProp} must be a listener name defined in ${KafkaConfig.AdvertisedListenersProp}. " +
+        s"The valid options based on currently configured listeners are ${advertisedListenerNames.map(_.value).mkString(",")}")
+      // controlPlaneListenerName should be different from interBrokerListenerName
+      require(!controlPlaneListenerName.get.value().equals(interBrokerListenerName.value()),
+        s"${KafkaConfig.ControlPlaneListenerNameProp}, when defined, should have a different value from the inter broker listener name. " +
+        s"Currently they both have the value ${controlPlaneListenerName.get}")
+    }
+
     val recordVersion = logMessageFormatVersion.recordVersion
     require(interBrokerProtocolVersion.recordVersion.value >= recordVersion.value,
       s"log.message.format.version $logMessageFormatVersionString can only be used when inter.broker.protocol.version " +
@@ -1443,7 +1500,7 @@ class KafkaConfig(val props: java.util.Map[_, _], doLog: Boolean, dynamicConfigO
     if (connectionsMaxIdleMs >= 0)
       require(failedAuthenticationDelayMs < connectionsMaxIdleMs,
         s"${KafkaConfig.FailedAuthenticationDelayMsProp}=$failedAuthenticationDelayMs should always be less than" +
-          s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
-          " authentication responses from timing out")
+        s" ${KafkaConfig.ConnectionsMaxIdleMsProp}=$connectionsMaxIdleMs to prevent failed" +
+        s" authentication responses from timing out")
   }
 }
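
A quick sketch of the new validation (not part of this patch): reusing the inter-broker listener as the control plane listener is expected to be rejected when the config is parsed.

    import java.util.Properties
    import kafka.server.KafkaConfig

    val props = new Properties()
    props.put(KafkaConfig.ZkConnectProp, "localhost:2181")
    props.put(KafkaConfig.ListenersProp, "CONTROLLER://localhost:9094,INTERNAL://localhost:9092")
    props.put(KafkaConfig.ListenerSecurityProtocolMapProp, "CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT")
    props.put(KafkaConfig.InterBrokerListenerNameProp, "CONTROLLER")
    props.put(KafkaConfig.ControlPlaneListenerNameProp, "CONTROLLER")

    // Fails the require(...) added above: control.plane.listener.name must differ from the
    // inter-broker listener name.
    try KafkaConfig.fromProps(props)
    catch { case e: IllegalArgumentException => println(e.getMessage) }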
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index d0d4121..e0ad1b6 100755
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -96,13 +96,15 @@ class KafkaRequestHandlerPool(val brokerId: Int,
                               val requestChannel: RequestChannel,
                               val apis: KafkaApis,
                               time: Time,
-                              numThreads: Int) extends Logging with KafkaMetricsGroup {
+                              numThreads: Int,
+                              requestHandlerAvgIdleMetricName: String,
+                              logAndThreadNamePrefix : String) extends Logging with KafkaMetricsGroup {
 
   private val threadPoolSize: AtomicInteger = new AtomicInteger(numThreads)
   /* a meter to track the average free capacity of the request handlers */
-  private val aggregateIdleMeter = newMeter("RequestHandlerAvgIdlePercent", "percent", TimeUnit.NANOSECONDS)
+  private val aggregateIdleMeter = newMeter(requestHandlerAvgIdleMetricName, "percent", TimeUnit.NANOSECONDS)
 
-  this.logIdent = "[Kafka Request Handler on Broker " + brokerId + "], "
+  this.logIdent = "[" + logAndThreadNamePrefix + " Kafka Request Handler on Broker " + brokerId + "], "
   val runnables = new mutable.ArrayBuffer[KafkaRequestHandler](numThreads)
   for (i <- 0 until numThreads) {
     createHandler(i)
@@ -110,7 +112,7 @@ class KafkaRequestHandlerPool(val brokerId: Int,
 
   def createHandler(id: Int): Unit = synchronized {
     runnables += new KafkaRequestHandler(id, brokerId, aggregateIdleMeter, threadPoolSize, requestChannel, apis, time)
-    KafkaThread.daemon("kafka-request-handler-" + id, runnables(id)).start()
+    KafkaThread.daemon(logAndThreadNamePrefix + "-kafka-request-handler-" + id, runnables(id)).start()
   }
 
   def resizeThreadPool(newSize: Int): Unit = synchronized {
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 36bc0f1..26e1447 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -113,10 +113,13 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
 
   val brokerState: BrokerState = new BrokerState
 
-  var apis: KafkaApis = null
+  var dataPlaneRequestProcessor: KafkaApis = null
+  var controlPlaneRequestProcessor: KafkaApis = null
+
   var authorizer: Option[Authorizer] = None
   var socketServer: SocketServer = null
-  var requestHandlerPool: KafkaRequestHandlerPool = null
+  var dataPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
+  var controlPlaneRequestHandlerPool: KafkaRequestHandlerPool = null
 
   var logDirFailureChannel: LogDirFailureChannel = null
   var logManager: LogManager = null
@@ -291,12 +294,21 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
             KafkaServer.MIN_INCREMENTAL_FETCH_SESSION_EVICTION_MS))
 
         /* start processing requests */
-        apis = new KafkaApis(socketServer.requestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+        dataPlaneRequestProcessor = new KafkaApis(socketServer.dataPlaneRequestChannel, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
           kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
           fetchManager, brokerTopicStats, clusterId, time, tokenManager)
 
-        requestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.requestChannel, apis, time,
-          config.numIoThreads)
+        dataPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.dataPlaneRequestChannel, dataPlaneRequestProcessor, time,
+          config.numIoThreads, "RequestHandlerAvgIdlePercent", socketServer.DataPlanePrefix)
+
+        config.controlPlaneListener.foreach { _ =>
+          controlPlaneRequestProcessor = new KafkaApis(socketServer.controlPlaneRequestChannelOpt.get, replicaManager, adminManager, groupCoordinator, transactionCoordinator,
+            kafkaController, zkClient, config.brokerId, config, metadataCache, metrics, authorizer, quotaManagers,
+            fetchManager, brokerTopicStats, clusterId, time, tokenManager)
+
+          controlPlaneRequestHandlerPool = new KafkaRequestHandlerPool(config.brokerId, socketServer.controlPlaneRequestChannelOpt.get, controlPlaneRequestProcessor, time,
+            1, "ControlPlaneRequestHandlerAvgIdlePercent", socketServer.ControlPlanePrefix)
+        }
 
         Mx4jLoader.maybeLoad()
 
@@ -313,7 +325,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         dynamicConfigManager = new DynamicConfigManager(zkClient, dynamicConfigHandlers)
         dynamicConfigManager.startup()
 
-        socketServer.startProcessors()
+        socketServer.startDataPlaneProcessors()
+        socketServer.startControlPlaneProcessor()
         brokerState.newState(RunningAsBroker)
         shutdownLatch = new CountDownLatch(1)
         startupComplete.set(true)
@@ -579,14 +592,17 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
         // Socket server will be shutdown towards the end of the sequence.
         if (socketServer != null)
           CoreUtils.swallow(socketServer.stopProcessingRequests(), this)
-        if (requestHandlerPool != null)
-          CoreUtils.swallow(requestHandlerPool.shutdown(), this)
-
+        if (dataPlaneRequestHandlerPool != null)
+          CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this)
+        if (controlPlaneRequestHandlerPool != null)
+          CoreUtils.swallow(controlPlaneRequestHandlerPool.shutdown(), this)
         if (kafkaScheduler != null)
           CoreUtils.swallow(kafkaScheduler.shutdown(), this)
 
-        if (apis != null)
-          CoreUtils.swallow(apis.close(), this)
+        if (dataPlaneRequestProcessor != null)
+          CoreUtils.swallow(dataPlaneRequestProcessor.close(), this)
+        if (controlPlaneRequestProcessor != null)
+          CoreUtils.swallow(controlPlaneRequestProcessor.close(), this)
         CoreUtils.swallow(authorizer.foreach(_.close()), this)
         if (adminManager != null)
           CoreUtils.swallow(adminManager.shutdown(), this)
diff --git a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
index ee69ae4..92d1758 100644
--- a/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AdminClientIntegrationTest.scala
@@ -244,9 +244,9 @@ class AdminClientIntegrationTest extends IntegrationTestHarness with Logging {
     client = AdminClient.create(createConfig())
     val nodes = client.describeCluster.nodes.get()
     val clusterId = client.describeCluster().clusterId().get()
-    assertEquals(servers.head.apis.clusterId, clusterId)
+    assertEquals(servers.head.dataPlaneRequestProcessor.clusterId, clusterId)
     val controller = client.describeCluster().controller().get()
-    assertEquals(servers.head.apis.metadataCache.getControllerId.
+    assertEquals(servers.head.dataPlaneRequestProcessor.metadataCache.getControllerId.
       getOrElse(MetadataResponse.NO_CONTROLLER_ID), controller.id())
     val brokers = brokerList.split(",")
     assertEquals(brokers.size, nodes.size)
diff --git a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
index 1b51101..4c0459e 100644
--- a/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/AuthorizerIntegrationTest.scala
@@ -1452,9 +1452,9 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   def removeAllAcls() = {
-    servers.head.apis.authorizer.get.getAcls().keys.foreach { resource =>
-      servers.head.apis.authorizer.get.removeAcls(resource)
-      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.apis.authorizer.get, resource)
+    servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls().keys.foreach { resource =>
+      servers.head.dataPlaneRequestProcessor.authorizer.get.removeAcls(resource)
+      TestUtils.waitAndVerifyAcls(Set.empty[Acl], servers.head.dataPlaneRequestProcessor.authorizer.get, resource)
     }
   }
 
@@ -1507,8 +1507,8 @@ class AuthorizerIntegrationTest extends BaseRequestTest {
   }
 
   private def addAndVerifyAcls(acls: Set[Acl], resource: Resource) = {
-    servers.head.apis.authorizer.get.addAcls(acls, resource)
-    TestUtils.waitAndVerifyAcls(servers.head.apis.authorizer.get.getAcls(resource) ++ acls, servers.head.apis.authorizer.get, resource)
+    servers.head.dataPlaneRequestProcessor.authorizer.get.addAcls(acls, resource)
+    TestUtils.waitAndVerifyAcls(servers.head.dataPlaneRequestProcessor.authorizer.get.getAcls(resource) ++ acls, servers.head.dataPlaneRequestProcessor.authorizer.get, resource)
   }
 
   private def consumeRecords(consumer: Consumer[Array[Byte], Array[Byte]],
diff --git a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
index 9b02d80..b28a40f 100644
--- a/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseQuotaTest.scala
@@ -285,7 +285,7 @@ abstract class QuotaTestClients(topic: String,
 
   def waitForQuotaUpdate(producerQuota: Long, consumerQuota: Long, requestQuota: Double, server: KafkaServer = leaderNode) {
     TestUtils.retry(10000) {
-      val quotaManagers = server.apis.quotas
+      val quotaManagers = server.dataPlaneRequestProcessor.quotas
       val overrideProducerQuota = quota(quotaManagers.produce, userPrincipal, producerClientId)
       val overrideConsumerQuota = quota(quotaManagers.fetch, userPrincipal, consumerClientId)
       val overrideProducerRequestQuota = quota(quotaManagers.request, userPrincipal, producerClientId)
diff --git a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
index 854e338..0587a6d 100644
--- a/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/EndToEndAuthorizationTest.scala
@@ -5,7 +5,7 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * 
+ *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
@@ -182,8 +182,8 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   override def setUp() {
     super.setUp()
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.apis.authorizer.get, Resource.ClusterResource)
-      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.apis.authorizer.get, Resource(Topic, "*", LITERAL))
+      TestUtils.waitAndVerifyAcls(ClusterActionAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource.ClusterResource)
+      TestUtils.waitAndVerifyAcls(TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, Resource(Topic, "*", LITERAL))
     }
     // create the test topic with all the brokers as replicas
     createTopic(topic, 1, 3)
@@ -221,7 +221,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
       assertTrue("failed re-authentications not 0", TestUtils.totalMetricValue(s, "failed-reauthentication-total") == 0)
     }
   }
-  
+
   private def getGauge(metricName: String) = {
     Metrics.defaultRegistry.allMetrics.asScala
            .filterKeys(k => k.getName == metricName)
@@ -275,16 +275,16 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   private def setWildcardResourceAcls() {
     AclCommand.main(produceConsumeWildcardAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.apis.authorizer.get, wildcardTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, wildcardGroupResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl ++ TopicBrokerReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, wildcardGroupResource)
     }
   }
 
   private def setPrefixedResourceAcls() {
     AclCommand.main(produceConsumePrefixedAclsArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, prefixedTopicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, prefixedGroupResource)
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedTopicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, prefixedGroupResource)
     }
   }
 
@@ -292,9 +292,9 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(consumeAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get,
+      TestUtils.waitAndVerifyAcls(TopicReadAcl ++ TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get,
         new Resource(Topic, tp.topic, PatternType.LITERAL))
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -315,7 +315,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def testNoProduceWithDescribeAcl(): Unit = {
     AclCommand.main(describeAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicDescribeAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
     try{
       val producer = createProducer()
@@ -327,7 +327,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
     confirmReauthenticationMetrics
   }
-  
+
    /**
     * Tests that a consumer fails to consume messages without the appropriate
     * ACL set.
@@ -341,7 +341,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     consumeRecords(consumer)
     confirmReauthenticationMetrics
   }
-  
+
   @Test(expected = classOf[TopicAuthorizationException])
   def testNoConsumeWithoutDescribeAclViaSubscribe(): Unit = {
     noConsumeWithoutDescribeAclSetup()
@@ -350,13 +350,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     // this should timeout since the consumer will not be able to fetch any metadata for the topic
     consumeRecords(consumer, timeout = 3000)
   }
-  
+
   private def noConsumeWithoutDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
 
     val producer = createProducer()
@@ -365,10 +365,10 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     AclCommand.main(deleteDescribeAclArgs)
     AclCommand.main(deleteWriteAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
   }
-  
+
   @Test
   def testNoConsumeWithDescribeAclViaAssign(): Unit = {
     noConsumeWithDescribeAclSetup()
@@ -384,7 +384,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
     confirmReauthenticationMetrics
   }
-  
+
   @Test
   def testNoConsumeWithDescribeAclViaSubscribe(): Unit = {
     noConsumeWithDescribeAclSetup()
@@ -400,13 +400,13 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
     }
     confirmReauthenticationMetrics
   }
-  
+
   private def noConsumeWithDescribeAclSetup(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     AclCommand.main(groupAclArgs)
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
-      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.apis.authorizer.get, groupResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(GroupReadAcl, s.dataPlaneRequestProcessor.authorizer.get, groupResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
@@ -420,7 +420,7 @@ abstract class EndToEndAuthorizationTest extends IntegrationTestHarness with Sas
   def testNoGroupAcl(): Unit = {
     AclCommand.main(produceAclArgs(tp.topic))
     servers.foreach { s =>
-      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.apis.authorizer.get, topicResource)
+      TestUtils.waitAndVerifyAcls(TopicWriteAcl ++ TopicDescribeAcl ++ TopicCreateAcl, s.dataPlaneRequestProcessor.authorizer.get, topicResource)
     }
     val producer = createProducer()
     sendRecords(producer, numRecords, tp)
diff --git a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
index 55e1529..cb2186c 100644
--- a/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
+++ b/core/src/test/scala/integration/kafka/api/SaslSslAdminClientIntegrationTest.scala
@@ -71,7 +71,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def addClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
     val acls = Set(clusterAcl(permissionType, operation))
-    val authorizer = servers.head.apis.authorizer.get
+    val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
     val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
     authorizer.addAcls(acls, AuthResource.ClusterResource)
     TestUtils.waitAndVerifyAcls(prevAcls ++ acls, authorizer, AuthResource.ClusterResource)
@@ -79,7 +79,7 @@ class SaslSslAdminClientIntegrationTest extends AdminClientIntegrationTest with
 
   private def removeClusterAcl(permissionType: PermissionType, operation: Operation): Unit = {
     val acls = Set(clusterAcl(permissionType, operation))
-    val authorizer = servers.head.apis.authorizer.get
+    val authorizer = servers.head.dataPlaneRequestProcessor.authorizer.get
     val prevAcls = authorizer.getAcls(AuthResource.ClusterResource)
     Assert.assertTrue(authorizer.removeAcls(acls, AuthResource.ClusterResource))
     TestUtils.waitAndVerifyAcls(prevAcls -- acls, authorizer, AuthResource.ClusterResource)
diff --git a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
index 853f999..c13b0a3 100644
--- a/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
@@ -501,8 +501,8 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
 
   @Test
   def testThreadPoolResize(): Unit = {
-    val requestHandlerPrefix = "kafka-request-handler-"
-    val networkThreadPrefix = "kafka-network-thread-"
+    val requestHandlerPrefix = "data-plane-kafka-request-handler-"
+    val networkThreadPrefix = "data-plane-kafka-network-thread-"
     val fetcherThreadPrefix = "ReplicaFetcherThread-"
     // Executor threads and recovery threads are not verified since threads may not be running
     // For others, thread count should be configuredCount * threadMultiplier * numBrokers
@@ -577,7 +577,7 @@ class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with SaslSet
       "", mayReceiveDuplicates = false)
     verifyThreadPoolResize(KafkaConfig.NumNetworkThreadsProp, config.numNetworkThreads,
       networkThreadPrefix, mayReceiveDuplicates = true)
-    verifyThreads("kafka-socket-acceptor-", config.listeners.size)
+    verifyThreads("data-plane-kafka-socket-acceptor-", config.listeners.size)
 
     verifyProcessorMetrics()
     verifyMarkPartitionsForTruncation()
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index 2306a92..a64f6e7 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -175,8 +175,8 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
index 23f6225..08747a8 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerIntegrationTest.scala
@@ -32,8 +32,10 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.errors.{ControllerMovedException, StaleBrokerEpochException}
 import org.apache.log4j.Level
 import kafka.utils.LogCaptureAppender
+import org.apache.kafka.common.metrics.KafkaMetric
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.util.Try
 
 class ControllerIntegrationTest extends ZooKeeperTestHarness {
@@ -83,6 +85,31 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     waitUntilControllerEpoch(firstControllerEpoch + 1, "controller epoch was not incremented after controller move")
   }
 
+  @Test
+  def testMetadataPropagationOnControlPlane(): Unit = {
+    servers = makeServers(1, listeners = Some("PLAINTEXT://localhost:0,CONTROLLER://localhost:5000"), listenerSecurityProtocolMap = Some("PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT"),
+      controlPlaneListenerName = Some("CONTROLLER"))
+    TestUtils.waitUntilBrokerMetadataIsPropagated(servers)
+    val controlPlaneMetricMap = mutable.Map[String, KafkaMetric]()
+    val dataPlaneMetricMap = mutable.Map[String, KafkaMetric]()
+    servers.head.metrics.metrics().values().asScala.foreach { kafkaMetric =>
+      if (kafkaMetric.metricName().tags().values().contains("CONTROLLER")) {
+        controlPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
+      }
+      if (kafkaMetric.metricName().tags().values().contains("PLAINTEXT")) {
+        dataPlaneMetricMap.put(kafkaMetric.metricName().name(), kafkaMetric)
+      }
+    }
+    assertEquals(1e-0, controlPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double], 0)
+    assertEquals(0e-0, dataPlaneMetricMap.get("response-total").get.metricValue().asInstanceOf[Double], 0)
+    assertEquals(1e-0, controlPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 0)
+    assertEquals(0e-0, dataPlaneMetricMap.get("request-total").get.metricValue().asInstanceOf[Double], 0)
+    assertTrue(controlPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] > 1.0)
+    assertTrue(dataPlaneMetricMap.get("incoming-byte-total").get.metricValue().asInstanceOf[Double] == 0.0)
+    assertTrue(controlPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 2.0)
+    assertTrue(dataPlaneMetricMap.get("network-io-total").get.metricValue().asInstanceOf[Double] == 0.0)
+  }
+
   // This test case is used to ensure that there will be no correctness issue after we avoid sending out full
   // UpdateMetadataRequest to all brokers in the cluster
   @Test
@@ -376,10 +403,10 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     var activeServers = servers.filter(s => s.config.brokerId != 2)
     // wait for the update metadata request to trickle to the brokers
     TestUtils.waitUntilTrue(() =>
-      activeServers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
+      activeServers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.isr.size != 3),
       "Topic test not created after timeout")
     assertEquals(0, partitionsRemaining.size)
-    var partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    var partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
     var leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
     assertEquals(2, partitionStateInfo.basePartitionState.isr.size)
@@ -388,16 +415,16 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
     partitionsRemaining = resultQueue.take().get
     assertEquals(0, partitionsRemaining.size)
     activeServers = servers.filter(s => s.config.brokerId == 0)
-    partitionStateInfo = activeServers.head.apis.metadataCache.getPartitionInfo(topic,partition).get
+    partitionStateInfo = activeServers.head.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get
     leaderAfterShutdown = partitionStateInfo.basePartitionState.leader
     assertEquals(0, leaderAfterShutdown)
 
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
     controller.controlledShutdown(0, servers.find(_.config.brokerId == 0).get.kafkaController.brokerEpoch, controlledShutdownCallback)
     partitionsRemaining = resultQueue.take().get
     assertEquals(1, partitionsRemaining.size)
     // leader doesn't change since all the replicas are shut down
-    assertTrue(servers.forall(_.apis.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
+    assertTrue(servers.forall(_.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic,partition).get.basePartitionState.leader == 0))
   }
 
   @Test
@@ -585,12 +612,18 @@ class ControllerIntegrationTest extends ZooKeeperTestHarness {
   private def makeServers(numConfigs: Int,
                           autoLeaderRebalanceEnable: Boolean = false,
                           uncleanLeaderElectionEnable: Boolean = false,
-                          enableControlledShutdown: Boolean = true) = {
+                          enableControlledShutdown: Boolean = true,
+                          listeners : Option[String] = None,
+                          listenerSecurityProtocolMap : Option[String] = None,
+                          controlPlaneListenerName : Option[String] = None) = {
     val configs = TestUtils.createBrokerConfigs(numConfigs, zkConnect, enableControlledShutdown = enableControlledShutdown)
     configs.foreach { config =>
       config.setProperty(KafkaConfig.AutoLeaderRebalanceEnableProp, autoLeaderRebalanceEnable.toString)
       config.setProperty(KafkaConfig.UncleanLeaderElectionEnableProp, uncleanLeaderElectionEnable.toString)
       config.setProperty(KafkaConfig.LeaderImbalanceCheckIntervalSecondsProp, "1")
+      listeners.foreach(listener => config.setProperty(KafkaConfig.ListenersProp, listener))
+      listenerSecurityProtocolMap.foreach(listenerMap => config.setProperty(KafkaConfig.ListenerSecurityProtocolMapProp, listenerMap))
+      controlPlaneListenerName.foreach(controlPlaneListener => config.setProperty(KafkaConfig.ControlPlaneListenerNameProp, controlPlaneListener))
     }
     configs.map(config => TestUtils.createServer(KafkaConfig.fromProps(config)))
   }
diff --git a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
index b598337..2609d9e 100644
--- a/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
+++ b/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
@@ -21,7 +21,7 @@ import java.io._
 import java.net._
 import java.nio.ByteBuffer
 import java.nio.channels.SocketChannel
-import java.util.{HashMap, Random}
+import java.util.{HashMap, Properties, Random}
 
 import com.yammer.metrics.core.{Gauge, Meter}
 import com.yammer.metrics.{Metrics => YammerMetrics}
@@ -134,8 +134,8 @@ class SocketServerTest extends JUnitSuite {
     channel.sendResponse(new RequestChannel.SendResponse(request, send, Some(request.header.toString), None))
   }
 
-  def connect(s: SocketServer = server, protocol: SecurityProtocol = SecurityProtocol.PLAINTEXT, localAddr: InetAddress = null) = {
-    val socket = new Socket("localhost", s.boundPort(ListenerName.forSecurityProtocol(protocol)), localAddr, 0)
+  def connect(s: SocketServer = server, listenerName: ListenerName = ListenerName.forSecurityProtocol(SecurityProtocol.PLAINTEXT), localAddr: InetAddress = null, port: Int = 0) = {
+    val socket = new Socket("localhost", s.boundPort(listenerName), localAddr, port)
     sockets += socket
     socket
   }
@@ -144,13 +144,13 @@ class SocketServerTest extends JUnitSuite {
   def connectAndProcessRequest(s: SocketServer): (Socket, String) = {
     val socket = connect(s)
     val request = sendAndReceiveRequest(socket, s)
-    processRequest(s.requestChannel, request)
+    processRequest(s.dataPlaneRequestChannel, request)
     (socket, request.context.connectionId)
   }
 
   def sendAndReceiveRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
     sendRequest(socket, producerRequestBytes())
-    receiveRequest(server.requestChannel)
+    receiveRequest(server.dataPlaneRequestChannel)
   }
 
   def shutdownServerAndMetrics(server: SocketServer): Unit = {
@@ -176,16 +176,30 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def simpleRequest() {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     val serializedBytes = producerRequestBytes()
 
     // Test PLAINTEXT socket
     sendRequest(plainSocket, serializedBytes)
-    processRequest(server.requestChannel)
+    processRequest(server.dataPlaneRequestChannel)
     assertEquals(serializedBytes.toSeq, receiveResponse(plainSocket).toSeq)
   }
 
   @Test
+  def testControlPlaneRequest(): Unit = {
+    val testProps = new Properties
+    testProps.putAll(props)
+    testProps.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    testProps.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT")
+    testProps.put("control.plane.listener.name", "CONTROLLER")
+    val config = KafkaConfig.fromProps(testProps)
+    withTestableServer(config, { testableServer =>
+      val socket = connect(testableServer, config.controlPlaneListenerName.get, localAddr = InetAddress.getLocalHost, port = 5000)
+      sendAndReceiveControllerRequest(socket, testableServer)
+    })
+  }
+
+  @Test
   def tooBigRequestIsRejected() {
     val tooManyBytes = new Array[Byte](server.config.socketRequestMaxBytes + 1)
     new Random().nextBytes(tooManyBytes)
@@ -205,41 +219,41 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testGracefulClose() {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     val serializedBytes = producerRequestBytes()
 
     for (_ <- 0 until 10)
       sendRequest(plainSocket, serializedBytes)
     plainSocket.close()
     for (_ <- 0 until 10) {
-      val request = receiveRequest(server.requestChannel)
+      val request = receiveRequest(server.dataPlaneRequestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
+      server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
     }
   }
 
   @Test
   def testNoOpAction(): Unit = {
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     val serializedBytes = producerRequestBytes()
 
     for (_ <- 0 until 3)
       sendRequest(plainSocket, serializedBytes)
     for (_ <- 0 until 3) {
-      val request = receiveRequest(server.requestChannel)
+      val request = receiveRequest(server.dataPlaneRequestChannel)
       assertNotNull("receiveRequest timed out", request)
-      server.requestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
+      server.dataPlaneRequestChannel.sendResponse(new RequestChannel.NoOpResponse(request))
     }
   }
 
   @Test
   def testConnectionId() {
-    val sockets = (1 to 5).map(_ => connect(protocol = SecurityProtocol.PLAINTEXT))
+    val sockets = (1 to 5).map(_ => connect())
     val serializedBytes = producerRequestBytes()
 
     val requests = sockets.map{socket =>
       sendRequest(socket, serializedBytes)
-      receiveRequest(server.requestChannel)
+      receiveRequest(server.dataPlaneRequestChannel)
     }
     requests.zipWithIndex.foreach { case (request, i) =>
       val index = request.context.connectionId.split("-").last
@@ -258,36 +272,36 @@ class SocketServerTest extends JUnitSuite {
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider)
 
     def openChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-      overrideServer.processor(request.processor).channel(request.context.connectionId)
+      overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId)
     def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-      overrideServer.processor(request.processor).openOrClosingChannel(request.context.connectionId)
+      overrideServer.dataPlaneProcessor(request.processor).openOrClosingChannel(request.context.connectionId)
 
     try {
       overrideServer.startup()
       val serializedBytes = producerRequestBytes()
 
       // Connection with no staged receives
-      val socket1 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+      val socket1 = connect(overrideServer)
       sendRequest(socket1, serializedBytes)
-      val request1 = receiveRequest(overrideServer.requestChannel)
+      val request1 = receiveRequest(overrideServer.dataPlaneRequestChannel)
       assertTrue("Channel not open", openChannel(request1).nonEmpty)
       assertEquals(openChannel(request1), openOrClosingChannel(request1))
 
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request1).isEmpty, "Failed to close idle channel")
       assertTrue("Channel not removed", openChannel(request1).isEmpty)
-      processRequest(overrideServer.requestChannel, request1)
+      processRequest(overrideServer.dataPlaneRequestChannel, request1)
 
       // Connection with staged receives
-      val socket2 = connect(overrideServer, protocol = SecurityProtocol.PLAINTEXT)
+      val socket2 = connect(overrideServer)
       val request2 = sendRequestsUntilStagedReceive(overrideServer, socket2, serializedBytes)
 
       time.sleep(idleTimeMs + 1)
       TestUtils.waitUntilTrue(() => openChannel(request2).isEmpty, "Failed to close idle channel")
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).nonEmpty, "Channel removed without processing staged receives")
-      processRequest(overrideServer.requestChannel, request2) // this triggers a failed send since channel has been closed
+      processRequest(overrideServer.dataPlaneRequestChannel, request2) // this triggers a failed send since channel has been closed
       TestUtils.waitUntilTrue(() => openOrClosingChannel(request2).isEmpty, "Failed to remove channel with failed sends")
-      assertNull("Received request after failed send", overrideServer.requestChannel.receiveRequest(200))
+      assertNull("Received request after failed send", overrideServer.dataPlaneRequestChannel.receiveRequest(200))
 
     } finally {
       shutdownServerAndMetrics(overrideServer)
@@ -304,9 +318,9 @@ class SocketServerTest extends JUnitSuite {
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, time, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+        new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
           credentialProvider, memoryPool, new LogContext()) {
             override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
@@ -319,8 +333,8 @@ class SocketServerTest extends JUnitSuite {
       }
     }
 
-    def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
-    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+    def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId)
+    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId)
     def connectionCount = overrideServer.connectionCount(InetAddress.getByName("127.0.0.1"))
 
     // Create a client connection and wait for server to register the connection with the selector. For
@@ -361,7 +375,7 @@ class SocketServerTest extends JUnitSuite {
       assertSame(channel1, openOrClosingChannel.getOrElse(throw new RuntimeException("Channel not found")))
 
       // Complete request with failed send so that `channel1` is removed from Selector.closingChannels
-      processRequest(overrideServer.requestChannel, request)
+      processRequest(overrideServer.dataPlaneRequestChannel, request)
       TestUtils.waitUntilTrue(() => connectionCount == 0 && openOrClosingChannel.isEmpty, "Failed to remove channel with failed send")
 
       // Check that new connections can be created with the same id since `channel1` is no longer in Selector
@@ -380,14 +394,14 @@ class SocketServerTest extends JUnitSuite {
     def sendTwoRequestsReceiveOne(): RequestChannel.Request = {
       sendRequest(socket, requestBytes, flush = false)
       sendRequest(socket, requestBytes, flush = true)
-      receiveRequest(server.requestChannel)
+      receiveRequest(server.dataPlaneRequestChannel)
     }
     val (request, hasStagedReceives) = TestUtils.computeUntilTrue(sendTwoRequestsReceiveOne()) { req =>
       val connectionId = req.context.connectionId
-      val hasStagedReceives = server.processor(0).numStagedReceives(connectionId) > 0
+      val hasStagedReceives = server.dataPlaneProcessor(0).numStagedReceives(connectionId) > 0
       if (!hasStagedReceives) {
-        processRequest(server.requestChannel, req)
-        processRequest(server.requestChannel)
+        processRequest(server.dataPlaneRequestChannel, req)
+        processRequest(server.dataPlaneRequestChannel)
       }
       hasStagedReceives
     }
@@ -403,11 +417,11 @@ class SocketServerTest extends JUnitSuite {
 
     // Mimic a primitive request handler that fetches the request from RequestChannel and place a response with a
     // throttled channel.
-    val request = receiveRequest(server.requestChannel)
+    val request = receiveRequest(server.dataPlaneRequestChannel)
     val byteBuffer = request.body[AbstractRequest].serialize(request.header)
     val send = new NetworkSend(request.context.connectionId, byteBuffer)
     def channelThrottlingCallback(response: RequestChannel.Response): Unit = {
-      server.requestChannel.sendResponse(response)
+      server.dataPlaneRequestChannel.sendResponse(response)
     }
     val throttledChannel = new ThrottledChannel(request, new MockTime(), 100, channelThrottlingCallback)
     val response =
@@ -415,7 +429,7 @@ class SocketServerTest extends JUnitSuite {
         new RequestChannel.SendResponse(request, send, Some(request.header.toString), None)
       else
         new RequestChannel.NoOpResponse(request)
-    server.requestChannel.sendResponse(response)
+    server.dataPlaneRequestChannel.sendResponse(response)
 
     // Quota manager would call notifyThrottlingDone() on throttling completion. Simulate it if throttleingInProgress is
     // false.
@@ -426,11 +440,11 @@ class SocketServerTest extends JUnitSuite {
   }
 
   def openOrClosingChannel(request: RequestChannel.Request): Option[KafkaChannel] =
-    server.processor(0).openOrClosingChannel(request.context.connectionId)
+    server.dataPlaneProcessor(0).openOrClosingChannel(request.context.connectionId)
 
   @Test
   def testSendActionResponseWithThrottledChannelWhereThrottlingInProgress() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, false, true)
@@ -444,7 +458,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testSendActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, false, false)
@@ -459,7 +473,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testNoOpActionResponseWithThrottledChannelWhereThrottlingInProgress() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, true, true)
@@ -471,7 +485,7 @@ class SocketServerTest extends JUnitSuite {
 
   @Test
   def testNoOpActionResponseWithThrottledChannelWhereThrottlingAlreadyDone() {
-    val socket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val socket = connect()
     val serializedBytes = producerRequestBytes()
     // SendAction with throttling in progress
     val request = throttledChannelTestSetUp(socket, serializedBytes, true, false)
@@ -485,16 +499,16 @@ class SocketServerTest extends JUnitSuite {
   @Test
   def testSocketsCloseOnShutdown() {
     // open a connection
-    val plainSocket = connect(protocol = SecurityProtocol.PLAINTEXT)
+    val plainSocket = connect()
     plainSocket.setTcpNoDelay(true)
     val bytes = new Array[Byte](40)
     // send a request first to make sure the connection has been picked up by the socket server
     sendRequest(plainSocket, bytes, Some(0))
-    processRequest(server.requestChannel)
+    processRequest(server.dataPlaneRequestChannel)
     // the following sleep is necessary to reliably detect the connection close when we send data below
     Thread.sleep(200L)
-    // make sure the sockets are open
-    server.acceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
+    // make sure the sockets are open
+    server.dataPlaneAcceptors.asScala.values.foreach(acceptor => assertFalse(acceptor.serverChannel.socket.isClosed))
     // then shutdown the server
     shutdownServerAndMetrics(server)
 
@@ -527,7 +541,7 @@ class SocketServerTest extends JUnitSuite {
     val conn2 = connect()
     val serializedBytes = producerRequestBytes()
     sendRequest(conn2, serializedBytes)
-    val request = server.requestChannel.receiveRequest(2000)
+    val request = server.dataPlaneRequestChannel.receiveRequest(2000)
     assertNotNull(request)
   }
 
@@ -555,7 +569,7 @@ class SocketServerTest extends JUnitSuite {
       val conn2 = connect(server)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn2, serializedBytes)
-      val request = server.requestChannel.receiveRequest(2000)
+      val request = server.dataPlaneRequestChannel.receiveRequest(2000)
       assertNotNull(request)
 
       // now try to connect from the external facing interface, which should fail
@@ -583,7 +597,7 @@ class SocketServerTest extends JUnitSuite {
       // it should succeed
       val serializedBytes = producerRequestBytes()
       sendRequest(conns.last, serializedBytes)
-      val request = overrideServer.requestChannel.receiveRequest(2000)
+      val request = overrideServer.dataPlaneRequestChannel.receiveRequest(2000)
       assertNotNull(request)
 
       // now try one more (should fail)
@@ -627,7 +641,7 @@ class SocketServerTest extends JUnitSuite {
       byteBuffer.get(serializedBytes)
 
       sendRequest(sslSocket, serializedBytes)
-      processRequest(overrideServer.requestChannel)
+      processRequest(overrideServer.dataPlaneRequestChannel)
       assertEquals(serializedBytes.toSeq, receiveResponse(sslSocket).toSeq)
       sslSocket.close()
     } finally {
@@ -640,7 +654,7 @@ class SocketServerTest extends JUnitSuite {
     val socket = connect()
     val bytes = new Array[Byte](40)
     sendRequest(socket, bytes, Some(0))
-    assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.requestChannel).session.principal)
+    assertEquals(KafkaPrincipal.ANONYMOUS, receiveRequest(server.dataPlaneRequestChannel).session.principal)
   }
 
   /* Test that we update request metrics if the client closes the connection while the broker response is in flight. */
@@ -650,9 +664,9 @@ class SocketServerTest extends JUnitSuite {
     val serverMetrics = new Metrics
     var conn: Socket = null
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+        new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
           credentialProvider, MemoryPool.NONE, new LogContext()) {
           override protected[network] def sendResponse(response: RequestChannel.Response, responseSend: Send) {
@@ -668,7 +682,7 @@ class SocketServerTest extends JUnitSuite {
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
 
-      val channel = overrideServer.requestChannel
+      val channel = overrideServer.dataPlaneRequestChannel
       val request = receiveRequest(channel)
 
       val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -696,9 +710,9 @@ class SocketServerTest extends JUnitSuite {
     @volatile var selector: TestableSelector = null
     val overrideConnectionId = "127.0.0.1:1-127.0.0.1:2-0"
     val overrideServer = new SocketServer(KafkaConfig.fromProps(props), serverMetrics, Time.SYSTEM, credentialProvider) {
-      override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+      override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
-        new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas,
+        new Processor(id, time, config.socketRequestMaxBytes, dataPlaneRequestChannel, connectionQuotas,
           config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics,
           credentialProvider, memoryPool, new LogContext()) {
           override protected[network] def connectionId(socket: Socket): String = overrideConnectionId
@@ -711,8 +725,8 @@ class SocketServerTest extends JUnitSuite {
       }
     }
 
-    def openChannel: Option[KafkaChannel] = overrideServer.processor(0).channel(overrideConnectionId)
-    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.processor(0).openOrClosingChannel(overrideConnectionId)
+    def openChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).channel(overrideConnectionId)
+    def openOrClosingChannel: Option[KafkaChannel] = overrideServer.dataPlaneProcessor(0).openOrClosingChannel(overrideConnectionId)
 
     try {
       overrideServer.startup()
@@ -730,7 +744,7 @@ class SocketServerTest extends JUnitSuite {
       socket.close()
 
       // Complete request with socket exception so that the channel is removed from Selector.closingChannels
-      processRequest(overrideServer.requestChannel, request)
+      processRequest(overrideServer.dataPlaneRequestChannel, request)
       TestUtils.waitUntilTrue(() => openOrClosingChannel.isEmpty, "Channel not closed after failed send")
       assertTrue("Unexpected completed send", selector.completedSends.isEmpty)
     } finally {
@@ -755,10 +769,10 @@ class SocketServerTest extends JUnitSuite {
       conn = connect(overrideServer)
       val serializedBytes = producerRequestBytes()
       sendRequest(conn, serializedBytes)
-      val channel = overrideServer.requestChannel
+      val channel = overrideServer.dataPlaneRequestChannel
       val request = receiveRequest(channel)
 
-      TestUtils.waitUntilTrue(() => overrideServer.processor(request.processor).channel(request.context.connectionId).isEmpty,
+      TestUtils.waitUntilTrue(() => overrideServer.dataPlaneProcessor(request.processor).channel(request.context.connectionId).isEmpty,
         s"Idle connection `${request.context.connectionId}` was not closed by selector")
 
       val requestMetrics = channel.metrics(request.header.apiKey.name)
@@ -780,10 +794,10 @@ class SocketServerTest extends JUnitSuite {
     server.stopProcessingRequests()
     val version = ApiKeys.PRODUCE.latestVersion
     val version2 = (version - 1).toShort
-    for (_ <- 0 to 1) server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
-    server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
-    assertEquals(2, server.requestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
-    server.requestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
+    for (_ <- 0 to 1) server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).mark()
+    server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version2).mark()
+    assertEquals(2, server.dataPlaneRequestChannel.metrics(ApiKeys.PRODUCE.name).requestRate(version).count())
+    server.dataPlaneRequestChannel.updateErrorMetrics(ApiKeys.PRODUCE, Map(Errors.NONE -> 1))
     val nonZeroMeters = Map(s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version" -> 2,
         s"kafka.network:type=RequestMetrics,name=RequestsPerSec,request=Produce,version=$version2" -> 1,
         "kafka.network:type=RequestMetrics,name=ErrorsPerSec,request=Produce,error=NONE" -> 1)
@@ -808,7 +822,7 @@ class SocketServerTest extends JUnitSuite {
       .allMetrics.asScala
       .filterKeys(k => k.getName.endsWith("IdlePercent") || k.getName.endsWith("NetworkProcessorAvgIdlePercent"))
       .collect { case (k, metric: Gauge[_]) => (k, metric.value().asInstanceOf[Double]) }
-      .filter { case (_, value) => value != 0.0 }
+      .filter { case (_, value) => value != 0.0 && !value.equals(Double.NaN) }
 
     assertEquals(Map.empty, nonZeroMetricNamesAndValues)
   }
@@ -818,7 +832,7 @@ class SocketServerTest extends JUnitSuite {
     val kafkaMetricNames = metrics.metrics.keySet.asScala.filter(_.tags.asScala.get("listener").nonEmpty)
     assertFalse(kafkaMetricNames.isEmpty)
 
-    val expectedListeners = Set("PLAINTEXT", "TRACE")
+    val expectedListeners = Set("PLAINTEXT")
     kafkaMetricNames.foreach { kafkaMetricName =>
       assertTrue(expectedListeners.contains(kafkaMetricName.tags.get("listener")))
     }
@@ -846,7 +860,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def configureNewConnectionException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
 
       testableSelector.updateMinWakeup(2)
@@ -856,7 +870,7 @@ class SocketServerTest extends JUnitSuite {
       TestUtils.waitUntilTrue(() => testableServer.connectionCount(localAddress) == 1, "Failed channel not removed")
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -871,7 +885,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processNewResponseException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       testableSelector.updateMinWakeup(2)
 
@@ -879,12 +893,12 @@ class SocketServerTest extends JUnitSuite {
       sockets.foreach(sendRequest(_, producerRequestBytes()))
 
       testableServer.testableSelector.addFailure(SelectorOperation.Send)
-      sockets.foreach(_ => processRequest(testableServer.requestChannel))
+      sockets.foreach(_ => processRequest(testableServer.dataPlaneRequestChannel))
       testableSelector.waitForOperations(SelectorOperation.Send, 2)
       testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true)
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -894,13 +908,13 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def sendCancelledKeyException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       testableSelector.updateMinWakeup(2)
 
       val sockets = (1 to 2).map(_ => connect(testableServer))
       sockets.foreach(sendRequest(_, producerRequestBytes()))
-      val requestChannel = testableServer.requestChannel
+      val requestChannel = testableServer.dataPlaneRequestChannel
 
       val requests = sockets.map(_ => receiveRequest(requestChannel))
       val failedConnectionId = requests(0).context.connectionId
@@ -912,7 +926,7 @@ class SocketServerTest extends JUnitSuite {
 
       val successfulSocket = if (isSocketConnectionId(failedConnectionId, sockets(0))) sockets(1) else sockets(0)
       assertProcessorHealthy(testableServer, Seq(successfulSocket))
-    }
+    })
   }
 
   /**
@@ -922,7 +936,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def closingChannelException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       testableSelector.updateMinWakeup(2)
 
@@ -933,13 +947,13 @@ class SocketServerTest extends JUnitSuite {
 
       testableSelector.addFailure(SelectorOperation.Send)
       sockets(0).close()
-      processRequest(testableServer.requestChannel, request)
-      processRequest(testableServer.requestChannel) // Also process request from other channel
+      processRequest(testableServer.dataPlaneRequestChannel, request)
+      processRequest(testableServer.dataPlaneRequestChannel) // Also process request from other channel
       testableSelector.waitForOperations(SelectorOperation.Send, 2)
       testableServer.waitForChannelClose(request.context.connectionId, locallyClosed = true)
 
       assertProcessorHealthy(testableServer, Seq(sockets(1)))
-    }
+    })
   }
 
   /**
@@ -954,10 +968,10 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processCompletedReceiveException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val sockets = (1 to 2).map(_ => connect(testableServer))
       val testableSelector = testableServer.testableSelector
-      val requestChannel = testableServer.requestChannel
+      val requestChannel = testableServer.dataPlaneRequestChannel
 
       testableSelector.cachedCompletedReceives.minPerPoll = 2
       testableSelector.addFailure(SelectorOperation.Mute)
@@ -968,7 +982,7 @@ class SocketServerTest extends JUnitSuite {
       requests.foreach(processRequest(requestChannel, _))
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -983,18 +997,18 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processCompletedSendException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val testableSelector = testableServer.testableSelector
       val sockets = (1 to 2).map(_ => connect(testableServer))
       val requests = sockets.map(sendAndReceiveRequest(_, testableServer))
 
       testableSelector.addFailure(SelectorOperation.Unmute)
-      requests.foreach(processRequest(testableServer.requestChannel, _))
+      requests.foreach(processRequest(testableServer.dataPlaneRequestChannel, _))
       testableSelector.waitForOperations(SelectorOperation.Unmute, 2)
       testableServer.waitForChannelClose(testableSelector.allFailedChannels.head, locallyClosed = true)
 
       assertProcessorHealthy(testableServer, testableSelector.notFailed(sockets))
-    }
+    })
   }
 
   /**
@@ -1007,7 +1021,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def processDisconnectedException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val (socket, connectionId) = connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
@@ -1021,7 +1035,7 @@ class SocketServerTest extends JUnitSuite {
       testableServer.waitForChannelClose(connectionId, locallyClosed = false)
 
       assertProcessorHealthy(testableServer)
-    }
+    })
   }
 
   /**
@@ -1029,7 +1043,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def pollException(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       val (socket, _) = connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
@@ -1038,7 +1052,7 @@ class SocketServerTest extends JUnitSuite {
       testableSelector.waitForOperations(SelectorOperation.Poll, 2)
 
       assertProcessorHealthy(testableServer, Seq(socket))
-    }
+    })
   }
 
   /**
@@ -1046,7 +1060,7 @@ class SocketServerTest extends JUnitSuite {
    */
   @Test
   def controlThrowable(): Unit = {
-    withTestableServer { testableServer =>
+    withTestableServer (testWithServer = { testableServer =>
       connectAndProcessRequest(testableServer)
       val testableSelector = testableServer.testableSelector
 
@@ -1056,12 +1070,12 @@ class SocketServerTest extends JUnitSuite {
       testableSelector.waitForOperations(SelectorOperation.Poll, 1)
 
       testableSelector.waitForOperations(SelectorOperation.CloseSelector, 1)
-    }
+    })
   }
 
-  private def withTestableServer(testWithServer: TestableSocketServer => Unit): Unit = {
+  private def withTestableServer(config : KafkaConfig = config, testWithServer: TestableSocketServer => Unit): Unit = {
     props.put("listeners", "PLAINTEXT://localhost:0")
-    val testableServer = new TestableSocketServer
+    val testableServer = new TestableSocketServer(config)
     testableServer.startup()
     try {
         testWithServer(testableServer)
@@ -1070,10 +1084,15 @@ class SocketServerTest extends JUnitSuite {
     }
   }
 
+  def sendAndReceiveControllerRequest(socket: Socket, server: SocketServer): RequestChannel.Request = {
+    sendRequest(socket, producerRequestBytes())
+    receiveRequest(server.controlPlaneRequestChannelOpt.get)
+  }
+
   private def assertProcessorHealthy(testableServer: TestableSocketServer, healthySockets: Seq[Socket] = Seq.empty): Unit = {
     val selector = testableServer.testableSelector
     selector.reset()
-    val requestChannel = testableServer.requestChannel
+    val requestChannel = testableServer.dataPlaneRequestChannel
 
     // Check that existing channels behave as expected
     healthySockets.foreach { socket =>
@@ -1096,18 +1115,17 @@ class SocketServerTest extends JUnitSuite {
   def isSocketConnectionId(connectionId: String, socket: Socket): Boolean =
     connectionId.contains(s":${socket.getLocalPort}-")
 
-  class TestableSocketServer extends SocketServer(KafkaConfig.fromProps(props),
+  class TestableSocketServer(config : KafkaConfig = config) extends SocketServer(config,
       new Metrics, Time.SYSTEM, credentialProvider) {
 
     @volatile var selector: Option[TestableSelector] = None
 
-    override def newProcessor(id: Int, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
+    override def newProcessor(id: Int, requestChannel: RequestChannel, connectionQuotas: ConnectionQuotas, listenerName: ListenerName,
                                 protocol: SecurityProtocol, memoryPool: MemoryPool): Processor = {
       new Processor(id, time, config.socketRequestMaxBytes, requestChannel, connectionQuotas, config.connectionsMaxIdleMs,
         config.failedAuthenticationDelayMs, listenerName, protocol, config, metrics, credentialProvider, memoryPool, new LogContext()) {
         override protected[network] def createSelector(channelBuilder: ChannelBuilder): Selector = {
-           val testableSelector = new TestableSelector(config, channelBuilder, time, metrics)
-           assertEquals(None, selector)
+           val testableSelector = new TestableSelector(config, channelBuilder, time, metrics, metricTags.asScala)
            selector = Some(testableSelector)
            testableSelector
         }
@@ -1131,7 +1149,7 @@ class SocketServerTest extends JUnitSuite {
       val openCount = selector.allChannels.size - 1 // minus one for the channel just closed above
       TestUtils.waitUntilTrue(() => connectionCount(localAddress) == openCount, "Connection count not decremented")
       TestUtils.waitUntilTrue(() =>
-        processor(0).inflightResponseCount == 0, "Inflight responses not cleared")
+        dataPlaneProcessor(0).inflightResponseCount == 0, "Inflight responses not cleared")
       assertNull("Channel not removed", selector.channel(connectionId))
       assertNull("Closing channel not removed", selector.closingChannel(connectionId))
     }
@@ -1149,9 +1167,9 @@ class SocketServerTest extends JUnitSuite {
     case object CloseSelector extends SelectorOperation
   }
 
-  class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics)
+  class TestableSelector(config: KafkaConfig, channelBuilder: ChannelBuilder, time: Time, metrics: Metrics, metricTags: mutable.Map[String, String] = mutable.Map.empty)
         extends Selector(config.socketRequestMaxBytes, config.connectionsMaxIdleMs, config.failedAuthenticationDelayMs,
-            metrics, time, "socket-server", new HashMap, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
+            metrics, time, "socket-server", metricTags.asJava, false, true, channelBuilder, MemoryPool.NONE, new LogContext()) {
 
     val failures = mutable.Map[SelectorOperation, Exception]()
     val operationCounts = mutable.Map[SelectorOperation, Int]().withDefaultValue(0)
@@ -1302,4 +1320,4 @@ class SocketServerTest extends JUnitSuite {
       sockets.filterNot(socket => isSocketConnectionId(failedConnectionId, socket))
     }
   }
-}
+}
\ No newline at end of file
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index 789dbae..f5cc134 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -93,7 +93,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     props.put(DynamicConfig.Client.ProducerByteRateOverrideProp, "1000")
     props.put(DynamicConfig.Client.ConsumerByteRateOverrideProp, "2000")
 
-    val quotaManagers = servers.head.apis.quotas
+    val quotaManagers = servers.head.dataPlaneRequestProcessor.quotas
     rootEntityType match {
       case ConfigType.Client => adminZkClient.changeClientIdConfig(configEntityName, props)
       case _ => adminZkClient.changeUserOrUserClientIdConfig(configEntityName, props)
@@ -179,7 +179,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Remove config change znodes to force quota initialization only through loading of user/client quotas
     zkClient.getChildren(ConfigEntityChangeNotificationZNode.path).foreach { p => zkClient.deletePath(ConfigEntityChangeNotificationZNode.path + "/" + p) }
     server.startup()
-    val quotaManagers = server.apis.quotas
+    val quotaManagers = server.dataPlaneRequestProcessor.quotas
 
     assertEquals(Quota.upperBound(1000),  quotaManagers.produce.quota("someuser", "overriddenClientId"))
     assertEquals(Quota.upperBound(2000),  quotaManagers.fetch.quota("someuser", "overriddenClientId"))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
index 10dba1a..f21f384 100755
--- a/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaConfigTest.scala
@@ -230,6 +230,31 @@ class KafkaConfigTest {
   }
 
   @Test
+  def testControlPlaneListenerName() = {
+    val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect)
+    props.put("listeners", "PLAINTEXT://localhost:0,CONTROLLER://localhost:5000")
+    props.put("listener.security.protocol.map", "PLAINTEXT:PLAINTEXT,CONTROLLER:SSL")
+    props.put("control.plane.listener.name", "CONTROLLER")
+    assertTrue(isValidKafkaConfig(props))
+
+    val serverConfig = KafkaConfig.fromProps(props)
+    val controlEndpoint = serverConfig.controlPlaneListener.get
+    assertEquals("localhost", controlEndpoint.host)
+    assertEquals(5000, controlEndpoint.port)
+    assertEquals(SecurityProtocol.SSL, controlEndpoint.securityProtocol)
+
+    //advertised listener should contain control-plane listener
+    val advertisedEndpoints = serverConfig.advertisedListeners
+    assertFalse(advertisedEndpoints.filter { endpoint =>
+      endpoint.securityProtocol == controlEndpoint.securityProtocol && endpoint.listenerName.value().equals(controlEndpoint.listenerName.value())
+    }.isEmpty)
+
+    // interBrokerListener name should be different from control-plane listener name
+    val interBrokerListenerName = serverConfig.interBrokerListenerName
+    assertFalse(interBrokerListenerName.value().equals(controlEndpoint.listenerName.value()))
+  }
+
+  @Test
   def testBadListenerProtocol() {
     val props = new Properties()
     props.put(KafkaConfig.BrokerIdProp, "1")
diff --git a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
index 4e60a8f..ef3dece 100644
--- a/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/MetadataRequestTest.scala
@@ -74,7 +74,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertNotEquals("Controller id should switch to a new broker", controllerId, controllerId2)
     TestUtils.waitUntilTrue(() => {
       val metadataResponse2 = sendMetadataRequest(MetadataRequest.Builder.allTopics.build(1.toShort))
-      metadataResponse2.controller != null && controllerServer2.apis.brokerId == metadataResponse2.controller.id
+      metadataResponse2.controller != null && controllerServer2.dataPlaneRequestProcessor.brokerId == metadataResponse2.controller.id
     }, "Controller id should match the active controller after failover", 5000)
   }
 
@@ -178,7 +178,7 @@ class MetadataRequestTest extends BaseRequestTest {
     assertEquals(topic1, topicMetadata1.topic)
     assertEquals(Errors.INVALID_TOPIC_EXCEPTION, topicMetadata2.error)
     assertEquals(topic2, topicMetadata2.topic)
-    
+
     TestUtils.waitUntilLeaderIsElectedOrChanged(zkClient, topic1, 0)
     TestUtils.waitUntilMetadataIsPropagated(servers, topic1, 0)
 
@@ -250,7 +250,7 @@ class MetadataRequestTest extends BaseRequestTest {
     val metadataResponse = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
     val partitionMetadata = metadataResponse.topicMetadata.asScala.head.partitionMetadata.asScala.head
     val downNode = servers.find { server =>
-      val serverId = server.apis.brokerId
+      val serverId = server.dataPlaneRequestProcessor.brokerId
       val leaderId = partitionMetadata.leader.id
       val replicaIds = partitionMetadata.replicas.asScala.map(_.id)
       serverId != leaderId && replicaIds.contains(serverId)
@@ -260,7 +260,7 @@ class MetadataRequestTest extends BaseRequestTest {
     TestUtils.waitUntilTrue(() => {
       val response = sendMetadataRequest(new MetadataRequest(List(replicaDownTopic).asJava, true, 1.toShort))
       val metadata = response.topicMetadata.asScala.head.partitionMetadata.asScala.head
-      val replica = metadata.replicas.asScala.find(_.id == downNode.apis.brokerId).get
+      val replica = metadata.replicas.asScala.find(_.id == downNode.dataPlaneRequestProcessor.brokerId).get
       replica.host == "" & replica.port == -1
     }, "Replica was not found down", 5000)
 
diff --git a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
index ae83536..c9e4e78 100644
--- a/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestQuotaTest.scala
@@ -99,7 +99,7 @@ class RequestQuotaTest extends BaseRequestTest {
     adminZkClient.changeClientIdConfig(Sanitizer.sanitize(smallQuotaConsumerClientId), quotaProps)
 
     TestUtils.retry(10000) {
-      val quotaManager = servers.head.apis.quotas.request
+      val quotaManager = servers.head.dataPlaneRequestProcessor.quotas.request
       assertEquals(s"Default request quota not set", Quota.upperBound(0.01), quotaManager.quota("some-user", "some-client"))
       assertEquals(s"Request quota override not set", Quota.upperBound(2000), quotaManager.quota("some-user", unthrottledClientId))
     }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index e5ea6a4..ecdb8c8 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -835,7 +835,7 @@ object TestUtils extends Logging {
                                           timeout: Long = JTestUtils.DEFAULT_MAX_WAIT_MS): Unit = {
     val expectedBrokerIds = servers.map(_.config.brokerId).toSet
     TestUtils.waitUntilTrue(() => servers.forall(server =>
-      expectedBrokerIds == server.apis.metadataCache.getAliveBrokers.map(_.id).toSet
+      expectedBrokerIds == server.dataPlaneRequestProcessor.metadataCache.getAliveBrokers.map(_.id).toSet
     ), "Timed out waiting for broker metadata to propagate to all servers", timeout)
   }
 
@@ -855,7 +855,7 @@ object TestUtils extends Logging {
     TestUtils.waitUntilTrue(() =>
       servers.foldLeft(true) {
         (result, server) =>
-          val partitionStateOpt = server.apis.metadataCache.getPartitionInfo(topic, partition)
+          val partitionStateOpt = server.dataPlaneRequestProcessor.metadataCache.getPartitionInfo(topic, partition)
           partitionStateOpt match {
             case None => false
             case Some(partitionState) =>
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index b15f8ac..c120caa 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -313,8 +313,8 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     // Test that the existing clientId overrides are read
     val server = TestUtils.createServer(KafkaConfig.fromProps(TestUtils.createBrokerConfig(0, zkConnect)))
     servers = Seq(server)
-    assertEquals(new Quota(1000, true), server.apis.quotas.produce.quota("ANONYMOUS", clientId))
-    assertEquals(new Quota(2000, true), server.apis.quotas.fetch.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(1000, true), server.dataPlaneRequestProcessor.quotas.produce.quota("ANONYMOUS", clientId))
+    assertEquals(new Quota(2000, true), server.dataPlaneRequestProcessor.quotas.fetch.quota("ANONYMOUS", clientId))
   }
 
   @Test
diff --git a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
index 1097285..a99fd5a 100644
--- a/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/streams/integration/utils/IntegrationTestUtils.java
@@ -517,7 +517,7 @@ public class IntegrationTestUtils {
                                                      final long timeout) throws InterruptedException {
         TestUtils.waitForCondition(() -> {
             for (final KafkaServer server : servers) {
-                final MetadataCache metadataCache = server.apis().metadataCache();
+                final MetadataCache metadataCache = server.dataPlaneRequestProcessor().metadataCache();
                 final Option<UpdateMetadataRequest.PartitionState> partitionInfo =
                         metadataCache.getPartitionInfo(topic, partition);
                 if (partitionInfo.isEmpty()) {

