kafka-commits mailing list archives

From jun...@apache.org
Subject [2/2] kafka git commit: KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests
Date Wed, 26 Jul 2017 06:20:08 GMT
KAFKA-4602; KIP-72 - Allow putting a bound on memory consumed by Incoming requests

This is the initial implementation.

Author: radai-rosenblatt <radai.rosenblatt@gmail.com>

Reviewers: Ewen Cheslack-Postava <me@ewencp.org>, Ismael Juma <ismael@juma.me.uk>, Rajini Sivaram <rajinisivaram@googlemail.com>, Jun Rao <junrao@gmail.com>

Closes #2330 from radai-rosenblatt/broker-memory-pool-with-muting
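
For context: on the broker side this bound surfaces as a new KafkaConfig entry (KafkaConfig.scala is touched in the diff below). A hedged configuration sketch, assuming the queued.max.request.bytes property name proposed in KIP-72 (whose default of -1 leaves the bound disabled):

    # server.properties (illustrative value; property name per KIP-72, -1 = no bound)
    queued.max.request.bytes=134217728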


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/47ee8e95
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/47ee8e95
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/47ee8e95

Branch: refs/heads/trunk
Commit: 47ee8e954df62b9a79099e944ec4be29afe046f6
Parents: f15cdbc
Author: radai-rosenblatt <radai.rosenblatt@gmail.com>
Authored: Wed Jul 26 08:19:56 2017 +0200
Committer: Jun Rao <junrao@gmail.com>
Committed: Wed Jul 26 08:19:56 2017 +0200

----------------------------------------------------------------------
 checkstyle/import-control.xml                   |   5 +
 .../memory/GarbageCollectedMemoryPool.java      | 168 +++++++++++++++++++
 .../apache/kafka/common/memory/MemoryPool.java  |  95 +++++++++++
 .../kafka/common/memory/SimpleMemoryPool.java   | 137 +++++++++++++++
 .../kafka/common/network/ChannelBuilder.java    |   7 +-
 .../kafka/common/network/KafkaChannel.java      |  55 +++++-
 .../kafka/common/network/NetworkReceive.java    |  50 +++++-
 .../common/network/PlaintextChannelBuilder.java |   6 +-
 .../common/network/PlaintextTransportLayer.java |   5 +
 .../apache/kafka/common/network/Receive.java    |  12 +-
 .../common/network/SaslChannelBuilder.java      |   6 +-
 .../apache/kafka/common/network/Selector.java   | 131 +++++++++++++--
 .../kafka/common/network/SslChannelBuilder.java |   6 +-
 .../kafka/common/network/SslTransportLayer.java |   5 +
 .../kafka/common/network/TransportLayer.java    |   6 +-
 .../kafka/common/protocol/ProtoUtils.java       |  47 ++++++
 .../apache/kafka/common/protocol/Protocol.java  |  47 ++++++
 .../kafka/common/protocol/SchemaVisitor.java    |  27 +++
 .../common/protocol/SchemaVisitorAdapter.java   |  38 +++++
 .../kafka/common/protocol/types/Field.java      |   5 +
 .../kafka/common/protocol/types/Schema.java     |   4 +-
 .../org/apache/kafka/common/utils/Utils.java    |  33 +++-
 .../memory/GarbageCollectedMemoryPoolTest.java  | 163 ++++++++++++++++++
 .../kafka/common/network/NioEchoServer.java     |   5 +-
 .../kafka/common/network/PlaintextSender.java   |  44 +++++
 .../kafka/common/network/SelectorTest.java      |  92 +++++++++-
 .../kafka/common/network/SslSelectorTest.java   |  95 ++++++++++-
 .../apache/kafka/common/network/SslSender.java  |  83 +++++++++
 .../common/network/SslTransportLayerTest.java   |   3 +-
 .../kafka/common/protocol/ProtoUtilsTest.java   |  35 ++++
 .../apache/kafka/common/utils/UtilsTest.java    |  12 ++
 .../scala/kafka/network/RequestChannel.scala    |  25 ++-
 .../main/scala/kafka/network/SocketServer.scala |  33 +++-
 .../main/scala/kafka/server/KafkaConfig.scala   |   7 +
 .../kafka/server/KafkaRequestHandler.scala      |   7 +-
 .../unit/kafka/network/SocketServerTest.scala   |   5 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala |   3 +-
 .../unit/kafka/server/KafkaConfigTest.scala     |   1 +
 38 files changed, 1446 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/checkstyle/import-control.xml
----------------------------------------------------------------------
diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml
index 1579a1c..4bd907b 100644
--- a/checkstyle/import-control.xml
+++ b/checkstyle/import-control.xml
@@ -43,6 +43,7 @@
   <allow pkg="org.apache.kafka.common.security" />
   <allow pkg="org.apache.kafka.common.utils" />
   <allow pkg="org.apache.kafka.common.errors" exact-match="true" />
+  <allow pkg="org.apache.kafka.common.memory" />
 
   <subpackage name="common">
     <disallow pkg="org.apache.kafka.clients" />
@@ -67,6 +68,10 @@
       <allow pkg="org.apache.kafka.common.metrics" />
     </subpackage>
 
+    <subpackage name="memory">
+      <allow pkg="org.apache.kafka.common.metrics" />
+    </subpackage>
+
     <subpackage name="network">
       <allow pkg="org.apache.kafka.common.security.auth" />
       <allow pkg="org.apache.kafka.common.protocol" />

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
new file mode 100644
index 0000000..041d1c2
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPool.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
+
+import java.lang.ref.ReferenceQueue;
+import java.lang.ref.WeakReference;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+
+/**
+ * An extension of SimpleMemoryPool that tracks allocated buffers and logs an error when they "leak"
+ * (when they are garbage-collected without having been release()ed).
+ * THIS IMPLEMENTATION IS A DEVELOPMENT/DEBUGGING AID AND IS NOT MEANT FOR PRODUCTION USE.
+ */
+public class GarbageCollectedMemoryPool extends SimpleMemoryPool implements AutoCloseable {
+
+    private final ReferenceQueue<ByteBuffer> garbageCollectedBuffers = new ReferenceQueue<>();
+    //serves 2 purposes - 1st it keeps the ref objects reachable (which is a requirement for them
+    //to ever be enqueued), 2nd it holds some (small) metadata for every buffer allocated
+    private final Map<BufferReference, BufferMetadata> buffersInFlight = new ConcurrentHashMap<>();
+    private final GarbageCollectionListener gcListener = new GarbageCollectionListener();
+    private final Thread gcListenerThread;
+    private volatile boolean alive = true;
+
+    public GarbageCollectedMemoryPool(long sizeBytes, int maxSingleAllocationSize, boolean strict, Sensor oomPeriodSensor) {
+        super(sizeBytes, maxSingleAllocationSize, strict, oomPeriodSensor);
+        this.alive = true;
+        this.gcListenerThread = new Thread(gcListener, "memory pool GC listener");
+        this.gcListenerThread.setDaemon(true); //so we don't need to worry about shutdown
+        this.gcListenerThread.start();
+    }
+
+    @Override
+    protected void bufferToBeReturned(ByteBuffer justAllocated) {
+        BufferReference ref = new BufferReference(justAllocated, garbageCollectedBuffers);
+        BufferMetadata metadata = new BufferMetadata(justAllocated.capacity());
+        if (buffersInFlight.put(ref, metadata) != null)
+            //this is a bug. it means either 2 different co-existing buffers got
+            //the same identity or we failed to register a released/GC'ed buffer
+            throw new IllegalStateException("allocated buffer identity " + ref.hashCode + " already registered as in use?!");
+
+        log.trace("allocated buffer of size {} and identity {}", sizeBytes, ref.hashCode);
+    }
+
+    @Override
+    protected void bufferToBeReleased(ByteBuffer justReleased) {
+        BufferReference ref = new BufferReference(justReleased); //used for lookup only
+        BufferMetadata metadata = buffersInFlight.remove(ref);
+        if (metadata == null)
+            //it's impossible for the buffer to have already been GC'ed (because we have a hard ref to it
+            //in the function arg) so this means either a double free or not our buffer.
+            throw new IllegalArgumentException("returned buffer " + ref.hashCode + " was never allocated by this pool");
+        if (metadata.sizeBytes != justReleased.capacity()) {
+            //this is a bug
+            throw new IllegalStateException("buffer " + ref.hashCode + " has capacity " + justReleased.capacity() + " but recorded as " + metadata.sizeBytes);
+        }
+        log.trace("released buffer of size {} and identity {}", metadata.sizeBytes, ref.hashCode);
+    }
+
+    @Override
+    public void close() throws Exception {
+        alive = false;
+        gcListenerThread.interrupt();
+    }
+
+    private class GarbageCollectionListener implements Runnable {
+        @Override
+        public void run() {
+            while (alive) {
+                try {
+                    BufferReference ref = (BufferReference) garbageCollectedBuffers.remove(); //blocks
+                    ref.clear();
+                    //this cannot race with a release() call because an object is either reachable or not,
+                    //release() can only happen before it's GC'ed, and enqueue can only happen after.
+                    //if the ref was enqueued it must then not have been released
+                    BufferMetadata metadata = buffersInFlight.remove(ref);
+
+                    if (metadata == null) {
+                        //it can happen rarely that the buffer was release()ed properly (so no metadata) and yet
+                        //the reference object to it remains reachable for a short period of time after release()
+                        //and hence gets enqueued. this is because we keep refs in a ConcurrentHashMap which cleans
+                        //up keys lazily.
+                        continue;
+                    }
+
+                    availableMemory.addAndGet(metadata.sizeBytes);
+                    log.error("Reclaimed buffer of size {} and identity {} that was not properly release()ed. This is a bug.", metadata.sizeBytes, ref.hashCode);
+                } catch (InterruptedException e) {
+                    log.debug("interrupted", e);
+                    //ignore, we're a daemon thread
+                }
+            }
+            log.info("GC listener shutting down");
+        }
+    }
+
+    private static final class BufferMetadata {
+        private final int sizeBytes;
+
+        private BufferMetadata(int sizeBytes) {
+            this.sizeBytes = sizeBytes;
+        }
+    }
+
+    private static final class BufferReference extends WeakReference<ByteBuffer> {
+        private final int hashCode;
+
+        private BufferReference(ByteBuffer referent) { //used for lookup purposes only - no queue required.
+            this(referent, null);
+        }
+
+        private BufferReference(ByteBuffer referent, ReferenceQueue<? super ByteBuffer> q) {
+            super(referent, q);
+            hashCode = System.identityHashCode(referent);
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) { //this is important to find leaked buffers (by ref identity)
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            BufferReference that = (BufferReference) o;
+            if (hashCode != that.hashCode) {
+                return false;
+            }
+            ByteBuffer thisBuf = get();
+            if (thisBuf == null) {
+                //our buffer has already been GC'ed, yet "that" is not us. so not same buffer
+                return false;
+            }
+            ByteBuffer thatBuf = that.get();
+            return thisBuf == thatBuf;
+        }
+
+        @Override
+        public int hashCode() {
+            return hashCode;
+        }
+    }
+
+    @Override
+    public String toString() {
+        long allocated = sizeBytes - availableMemory.get();
+        return "GarbageCollectedMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used in " + buffersInFlight.size() + " buffers}";
+    }
+}
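
A minimal usage sketch for the debugging pool above, relying only on the constructor and AutoCloseable contract shown in this diff; GC timing is inherently best-effort, so the leak report may not appear immediately:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.GarbageCollectedMemoryPool;

    public class LeakDetectionSketch {
        public static void main(String[] args) throws Exception {
            //1KB pool, 512-byte max single allocation, non-strict, no OOM sensor
            try (GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1024, 512, false, null)) {
                ByteBuffer buf = pool.tryAllocate(128);
                buf = null;        //drop the only strong reference without calling release()
                System.gc();       //a hint only - once the weak ref is enqueued, the listener
                                   //thread logs the leak and returns the 128 bytes to the pool
                Thread.sleep(100); //give the daemon listener thread a chance to run
            }
        }
    }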

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
new file mode 100644
index 0000000..5887816
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/MemoryPool.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+
+
+/**
+ * A common memory pool interface for non-blocking pools.
+ * Every buffer returned from {@link #tryAllocate(int)} must always be {@link #release(ByteBuffer) released}.
+ */
+public interface MemoryPool {
+    MemoryPool NONE = new MemoryPool() {
+        @Override
+        public ByteBuffer tryAllocate(int sizeBytes) {
+            return ByteBuffer.allocate(sizeBytes);
+        }
+
+        @Override
+        public void release(ByteBuffer previouslyAllocated) {
+            //nop
+        }
+
+        @Override
+        public long size() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public long availableMemory() {
+            return Long.MAX_VALUE;
+        }
+
+        @Override
+        public boolean isOutOfMemory() {
+            return false;
+        }
+
+        @Override
+        public String toString() {
+            return "NONE";
+        }
+    };
+
+    /**
+     * Tries to acquire a ByteBuffer of the specified size
+     * @param sizeBytes size required
+     * @return a ByteBuffer (which later needs to be release()ed), or null if no memory available.
+     *         the buffer will be of the exact size requested, even if backed by a larger chunk of memory
+     */
+    ByteBuffer tryAllocate(int sizeBytes);
+
+    /**
+     * Returns a previously allocated buffer to the pool.
+     * @param previouslyAllocated a buffer previously returned from tryAllocate()
+     */
+    void release(ByteBuffer previouslyAllocated);
+
+    /**
+     * Returns the total size of this pool
+     * @return total size, in bytes
+     */
+    long size();
+
+    /**
+     * Returns the amount of memory available for allocation by this pool.
+     * NOTE: result may be negative (pools may over-allocate to avoid starvation issues)
+     * @return bytes available
+     */
+    long availableMemory();
+
+    /**
+     * Returns true if the pool cannot currently allocate any more buffers
+     * - meaning total outstanding buffers meets or exceeds pool size and
+     * some would need to be released before further allocations are possible.
+     *
+     * This is equivalent to availableMemory() <= 0
+     * @return true if out of memory
+     */
+    boolean isOutOfMemory();
+}
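
To make the contract above concrete, a sketch of the intended allocate/release discipline, using the SimpleMemoryPool implementation added later in this commit (sizes are arbitrary):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.memory.MemoryPool;
    import org.apache.kafka.common.memory.SimpleMemoryPool;

    public class MemoryPoolContractSketch {
        public static void main(String[] args) {
            MemoryPool pool = new SimpleMemoryPool(1024, 512, true, null);
            ByteBuffer buffer = pool.tryAllocate(256);
            if (buffer == null) {
                //non-blocking by design: null signals memory pressure, not an error
                System.out.println("pool exhausted, available=" + pool.availableMemory());
            } else {
                try {
                    //... read into the buffer ...
                } finally {
                    pool.release(buffer); //every successful tryAllocate() needs a matching release()
                }
            }
        }
    }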

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
new file mode 100644
index 0000000..f1ab8f7
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/memory/SimpleMemoryPool.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * A simple pool implementation that merely provides a limit on the total outstanding memory.
+ * Any buffer allocated must always be release()ed, otherwise that memory is never marked as reclaimed (it "leaks").
+ */
+public class SimpleMemoryPool implements MemoryPool {
+    protected final Logger log = LoggerFactory.getLogger(getClass()); //subclass-friendly
+
+    protected final long sizeBytes;
+    protected final boolean strict;
+    protected final AtomicLong availableMemory;
+    protected final int maxSingleAllocationSize;
+    protected final AtomicLong startOfNoMemPeriod = new AtomicLong(); //nanoseconds
+    protected volatile Sensor oomTimeSensor;
+
+    public SimpleMemoryPool(long sizeInBytes, int maxSingleAllocationBytes, boolean strict, Sensor oomPeriodSensor) {
+        if (sizeInBytes <= 0 || maxSingleAllocationBytes <= 0 || maxSingleAllocationBytes > sizeInBytes)
+            throw new IllegalArgumentException("must provide a positive size and max single allocation size smaller than size."
+                + "provided " + sizeInBytes + " and " + maxSingleAllocationBytes + " respectively");
+        this.sizeBytes = sizeInBytes;
+        this.strict = strict;
+        this.availableMemory = new AtomicLong(sizeInBytes);
+        this.maxSingleAllocationSize = maxSingleAllocationBytes;
+        this.oomTimeSensor = oomPeriodSensor;
+    }
+
+    @Override
+    public ByteBuffer tryAllocate(int sizeBytes) {
+        if (sizeBytes < 1)
+            throw new IllegalArgumentException("requested size " + sizeBytes + "<=0");
+        if (sizeBytes > maxSingleAllocationSize)
+            throw new IllegalArgumentException("requested size " + sizeBytes + " is larger than maxSingleAllocationSize " + maxSingleAllocationSize);
+
+        long available;
+        boolean success = false;
+        //in strict mode we will only allocate memory if we have at least the size required.
+        //in non-strict mode we will allocate memory if we have _any_ memory available (so available memory
+        //can dip into the negative and max allocated memory would be sizeBytes + maxSingleAllocationSize)
+        long threshold = strict ? sizeBytes : 1;
+        while ((available = availableMemory.get()) >= threshold) {
+            success = availableMemory.compareAndSet(available, available - sizeBytes);
+            if (success)
+                break;
+        }
+
+        if (success) {
+            maybeRecordEndOfDrySpell();
+        } else {
+            if (oomTimeSensor != null) {
+                startOfNoMemPeriod.compareAndSet(0, System.nanoTime());
+            }
+            log.trace("refused to allocate buffer of size {}", sizeBytes);
+            return null;
+        }
+
+        ByteBuffer allocated = ByteBuffer.allocate(sizeBytes);
+        bufferToBeReturned(allocated);
+        return allocated;
+    }
+
+    @Override
+    public void release(ByteBuffer previouslyAllocated) {
+        if (previouslyAllocated == null)
+            throw new IllegalArgumentException("provided null buffer");
+
+        bufferToBeReleased(previouslyAllocated);
+        availableMemory.addAndGet(previouslyAllocated.capacity());
+        maybeRecordEndOfDrySpell();
+    }
+
+    @Override
+    public long size() {
+        return sizeBytes;
+    }
+
+    @Override
+    public long availableMemory() {
+        return availableMemory.get();
+    }
+
+    @Override
+    public boolean isOutOfMemory() {
+        return availableMemory.get() <= 0;
+    }
+
+    //allows subclasses to do their own bookkeeping (and validation) _before_ memory is returned to client code.
+    protected void bufferToBeReturned(ByteBuffer justAllocated) {
+        log.trace("allocated buffer of size {} ", justAllocated.capacity());
+    }
+
+    //allows subclasses to do their own bookkeeping (and validation) _before_ memory is marked as reclaimed.
+    protected void bufferToBeReleased(ByteBuffer justReleased) {
+        log.trace("released buffer of size {}", justReleased.capacity());
+    }
+
+    @Override
+    public String toString() {
+        long allocated = sizeBytes - availableMemory.get();
+        return "SimpleMemoryPool{" + Utils.formatBytes(allocated) + "/" + Utils.formatBytes(sizeBytes) + " used}";
+    }
+
+    protected void maybeRecordEndOfDrySpell() {
+        if (oomTimeSensor != null) {
+            long startOfDrySpell = startOfNoMemPeriod.getAndSet(0);
+            if (startOfDrySpell != 0) {
+                //how long were we refusing allocation requests for
+                oomTimeSensor.record((System.nanoTime() - startOfDrySpell) / 1000000.0); //fractional (double) millis
+            }
+        }
+    }
+}
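
A sketch of the strict/non-strict distinction implemented by the threshold in tryAllocate() above: a strict pool only grants a request it can fully cover, while a non-strict pool grants it whenever any memory remains, letting availableMemory() go negative (so at most sizeBytes + maxSingleAllocationSize can be outstanding):

    import org.apache.kafka.common.memory.SimpleMemoryPool;

    public class StrictnessSketch {
        public static void main(String[] args) {
            SimpleMemoryPool strict = new SimpleMemoryPool(100, 100, true, null);
            strict.tryAllocate(60);
            System.out.println(strict.tryAllocate(60)); //null - only 40 of the 60 bytes needed remain

            SimpleMemoryPool lenient = new SimpleMemoryPool(100, 100, false, null);
            lenient.tryAllocate(60);
            lenient.tryAllocate(60); //succeeds - 40 bytes (>= 1) were still available
            System.out.println(lenient.availableMemory()); //prints -20
        }
    }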

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
index 0e5ca78..6c8890a 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/ChannelBuilder.java
@@ -20,6 +20,8 @@ import java.util.Map;
 import java.nio.channels.SelectionKey;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
+
 
 /**
  * A ChannelBuilder interface to build Channel based on configs
@@ -36,10 +38,11 @@ public interface ChannelBuilder extends AutoCloseable {
      * returns a Channel with TransportLayer and Authenticator configured.
      * @param  id  channel id
      * @param  key SelectionKey
-     * @param  maxReceiveSize
+     * @param  maxReceiveSize max size of a single receive buffer to allocate
+     * @param  memoryPool memory pool from which to allocate buffers, or null for none
      * @return KafkaChannel
      */
-    KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException;
+    KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException;
 
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
index b563c4a..1e76c43 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/KafkaChannel.java
@@ -25,6 +25,8 @@ import java.nio.channels.SelectionKey;
 
 import java.security.Principal;
 
+import java.util.Objects;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.utils.Utils;
 
 public class KafkaChannel {
@@ -35,6 +37,7 @@ public class KafkaChannel {
     // The values are read and reset after each response is sent.
     private long networkThreadTimeNanos;
     private final int maxReceiveSize;
+    private final MemoryPool memoryPool;
     private NetworkReceive receive;
     private Send send;
     // Track connection and mute state of channels to enable outstanding requests on channels to be
@@ -43,12 +46,13 @@ public class KafkaChannel {
     private boolean muted;
     private ChannelState state;
 
-    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize) throws IOException {
+    public KafkaChannel(String id, TransportLayer transportLayer, Authenticator authenticator, int maxReceiveSize, MemoryPool memoryPool) throws IOException {
         this.id = id;
         this.transportLayer = transportLayer;
         this.authenticator = authenticator;
         this.networkThreadTimeNanos = 0L;
         this.maxReceiveSize = maxReceiveSize;
+        this.memoryPool = memoryPool;
         this.disconnected = false;
         this.muted = false;
         this.state = ChannelState.NOT_CONNECTED;
@@ -56,7 +60,7 @@ public class KafkaChannel {
 
     public void close() throws IOException {
         this.disconnected = true;
-        Utils.closeAll(transportLayer, authenticator);
+        Utils.closeAll(transportLayer, authenticator, receive);
     }
 
     /**
@@ -106,13 +110,16 @@ public class KafkaChannel {
         return id;
     }
 
-    public void mute() {
+    /**
+     * Externally muting a channel should be done via the selector to ensure proper state handling.
+     */
+    void mute() {
         if (!disconnected)
             transportLayer.removeInterestOps(SelectionKey.OP_READ);
         muted = true;
     }
 
-    public void unmute() {
+    void unmute() {
         if (!disconnected)
             transportLayer.addInterestOps(SelectionKey.OP_READ);
         muted = false;
@@ -125,6 +132,17 @@ public class KafkaChannel {
         return muted;
     }
 
+    public boolean isInMutableState() {
+        //some requests do not require memory, so if we do not know what the current (or future) request is
+        //(receive == null) we don't mute. we also don't mute if whatever memory is required has already been
+        //successfully allocated (if none is required for the currently-being-read request
+        //receive.memoryAllocated() is expected to return true)
+        if (receive == null || receive.memoryAllocated())
+            return false;
+        //also cannot mute if underlying transport is not in the ready state
+        return transportLayer.ready();
+    }
+
     public boolean ready() {
         return transportLayer.ready() && authenticator.complete();
     }
@@ -161,7 +179,7 @@ public class KafkaChannel {
         NetworkReceive result = null;
 
         if (receive == null) {
-            receive = new NetworkReceive(maxReceiveSize, id);
+            receive = new NetworkReceive(maxReceiveSize, id, memoryPool);
         }
 
         receive(receive);
@@ -169,6 +187,9 @@ public class KafkaChannel {
             receive.payload().rewind();
             result = receive;
             receive = null;
+        } else if (receive.requiredMemoryAmountKnown() && !receive.memoryAllocated() && isInMutableState()) {
+            //pool must be out of memory, mute ourselves.
+            mute();
         }
         return result;
     }
@@ -210,4 +231,28 @@ public class KafkaChannel {
 
         return send.completed();
     }
+
+    /**
+     * @return true if underlying transport has bytes remaining to be read from any underlying intermediate buffers.
+     */
+    public boolean hasBytesBuffered() {
+        return transportLayer.hasBytesBuffered();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaChannel that = (KafkaChannel) o;
+        return Objects.equals(id, that.id);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(id);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
index 582f064..564fbcd 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/NetworkReceive.java
@@ -21,6 +21,9 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.ReadableByteChannel;
 import java.nio.channels.ScatteringByteChannel;
+import org.apache.kafka.common.memory.MemoryPool;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A size delimited Receive that consists of a 4 byte network-ordered size N followed by N bytes of content
@@ -29,10 +32,14 @@ public class NetworkReceive implements Receive {
 
     public final static String UNKNOWN_SOURCE = "";
     public final static int UNLIMITED = -1;
+    private static final Logger log = LoggerFactory.getLogger(NetworkReceive.class);
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0);
 
     private final String source;
     private final ByteBuffer size;
     private final int maxSize;
+    private final MemoryPool memoryPool;
+    private int requestedBufferSize = -1;
     private ByteBuffer buffer;
 
 
@@ -41,6 +48,7 @@ public class NetworkReceive implements Receive {
         this.buffer = buffer;
         this.size = null;
         this.maxSize = UNLIMITED;
+        this.memoryPool = MemoryPool.NONE;
     }
 
     public NetworkReceive(String source) {
@@ -48,6 +56,7 @@ public class NetworkReceive implements Receive {
         this.size = ByteBuffer.allocate(4);
         this.buffer = null;
         this.maxSize = UNLIMITED;
+        this.memoryPool = MemoryPool.NONE;
     }
 
     public NetworkReceive(int maxSize, String source) {
@@ -55,6 +64,15 @@ public class NetworkReceive implements Receive {
         this.size = ByteBuffer.allocate(4);
         this.buffer = null;
         this.maxSize = maxSize;
+        this.memoryPool = MemoryPool.NONE;
+    }
+
+    public NetworkReceive(int maxSize, String source, MemoryPool memoryPool) {
+        this.source = source;
+        this.size = ByteBuffer.allocate(4);
+        this.buffer = null;
+        this.maxSize = maxSize;
+        this.memoryPool = memoryPool;
     }
 
     public NetworkReceive() {
@@ -68,13 +86,32 @@ public class NetworkReceive implements Receive {
 
     @Override
     public boolean complete() {
-        return !size.hasRemaining() && !buffer.hasRemaining();
+        return !size.hasRemaining() && buffer != null && !buffer.hasRemaining();
     }
 
     public long readFrom(ScatteringByteChannel channel) throws IOException {
         return readFromReadableChannel(channel);
     }
 
+    @Override
+    public boolean requiredMemoryAmountKnown() {
+        return requestedBufferSize != -1;
+    }
+
+    @Override
+    public boolean memoryAllocated() {
+        return buffer != null;
+    }
+
+
+    @Override
+    public void close() throws IOException {
+        if (buffer != null && buffer != EMPTY_BUFFER) {
+            memoryPool.release(buffer);
+            buffer = null;
+        }
+    }
+
     // Need a method to read from ReadableByteChannel because BlockingChannel requires read with timeout
     // See: http://stackoverflow.com/questions/2866557/timeout-for-socketchannel-doesnt-work
     // This can go away after we get rid of BlockingChannel
@@ -93,10 +130,17 @@ public class NetworkReceive implements Receive {
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + ")");
                 if (maxSize != UNLIMITED && receiveSize > maxSize)
                     throw new InvalidReceiveException("Invalid receive (size = " + receiveSize + " larger than " + maxSize + ")");
-
-                this.buffer = ByteBuffer.allocate(receiveSize);
+                requestedBufferSize = receiveSize; //may be 0 for some payloads (SASL)
+                if (receiveSize == 0) {
+                    buffer = EMPTY_BUFFER;
+                }
             }
         }
+        if (buffer == null && requestedBufferSize != -1) { //we know the size we want but havent been able to allocate it yet
+            buffer = memoryPool.tryAllocate(requestedBufferSize);
+            if (buffer == null)
+                log.trace("Broker low on memory - could not allocate buffer of size {} for source {}", requestedBufferSize, source);
+        }
         if (buffer != null) {
             int bytesRead = channel.read(buffer);
             if (bytesRead < 0)
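
The wire framing parsed above is a 4-byte network-ordered length N followed by N payload bytes; only once the length has been read does the receive know requestedBufferSize and ask the pool, and a null from tryAllocate() simply postpones the body read to a later poll(). A standalone illustration of that framing (names here are illustrative, not part of the diff):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    public class SizeDelimitedFramingSketch {
        public static void main(String[] args) {
            byte[] payload = "hello".getBytes(StandardCharsets.UTF_8);
            ByteBuffer wire = ByteBuffer.allocate(4 + payload.length);
            wire.putInt(payload.length).put(payload);
            wire.flip();

            int receiveSize = wire.getInt(); //the 4-byte "size" buffer reads exactly this much
            //here NetworkReceive would call memoryPool.tryAllocate(receiveSize) instead
            ByteBuffer body = ByteBuffer.allocate(receiveSize);
            body.put(wire);
            body.flip();
            System.out.println(new String(body.array(), StandardCharsets.UTF_8)); //prints "hello"
        }
    }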

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
index fb4f6ba..ad63671 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextChannelBuilder.java
@@ -19,6 +19,7 @@ package org.apache.kafka.common.network;
 import java.nio.channels.SelectionKey;
 import java.util.Map;
 
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.KafkaException;
 
@@ -39,12 +40,13 @@ public class PlaintextChannelBuilder implements ChannelBuilder {
         }
     }
 
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+    @Override
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             PlaintextTransportLayer transportLayer = new PlaintextTransportLayer(key);
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.warn("Failed to create channel due to ", e);
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
index 871b7ac..11c9565 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/PlaintextTransportLayer.java
@@ -214,6 +214,11 @@ public class PlaintextTransportLayer implements TransportLayer {
     }
 
     @Override
+    public boolean hasBytesBuffered() {
+        return false;
+    }
+
+    @Override
     public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
         return fileChannel.transferTo(position, count, socketChannel);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/Receive.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Receive.java b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
index b7bbdb4..3bc2761 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Receive.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Receive.java
@@ -16,13 +16,14 @@
  */
 package org.apache.kafka.common.network;
 
+import java.io.Closeable;
 import java.io.IOException;
 import java.nio.channels.ScatteringByteChannel;
 
 /**
  * This interface models the in-progress reading of data from a channel to a source identified by an integer id
  */
-public interface Receive {
+public interface Receive extends Closeable {
 
     /**
      * The numeric id of the source from which we are receiving data.
@@ -42,4 +43,13 @@ public interface Receive {
      */
     long readFrom(ScatteringByteChannel channel) throws IOException;
 
+    /**
+     * Do we know yet how much memory we require to fully read this receive?
+     */
+    boolean requiredMemoryAmountKnown();
+
+    /**
+     * Has the underlying memory required to complete reading been allocated yet?
+     */
+    boolean memoryAllocated();
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
index 33a9f4d..342592b 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SaslChannelBuilder.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.kafka.common.config.SaslConfigs;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.JaasContext;
 import org.apache.kafka.common.security.kerberos.KerberosShortNamer;
 import org.apache.kafka.common.security.authenticator.CredentialCache;
@@ -100,7 +101,8 @@ public class SaslChannelBuilder implements ChannelBuilder {
         }
     }
 
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+    @Override
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SocketChannel socketChannel = (SocketChannel) key.channel();
             TransportLayer transportLayer = buildTransportLayer(id, key, socketChannel);
@@ -114,7 +116,7 @@ public class SaslChannelBuilder implements ChannelBuilder {
                         socketChannel.socket().getInetAddress().getHostName(), clientSaslMechanism, handshakeRequestEnable);
             // Both authenticators don't use `PrincipalBuilder`, so we pass `null` for now. Reconsider if this changes.
             authenticator.configure(transportLayer, null, this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/Selector.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/Selector.java b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
index da3de80..38226f9 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/Selector.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/Selector.java
@@ -26,6 +26,7 @@ import java.nio.channels.SocketChannel;
 import java.nio.channels.UnresolvedAddressException;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Deque;
 import java.util.HashMap;
@@ -38,6 +39,7 @@ import java.util.Set;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.metrics.Measurable;
 import org.apache.kafka.common.metrics.MetricConfig;
 import org.apache.kafka.common.MetricName;
@@ -87,11 +89,14 @@ public class Selector implements Selectable, AutoCloseable {
 
     private final java.nio.channels.Selector nioSelector;
     private final Map<String, KafkaChannel> channels;
+    private final Set<KafkaChannel> explicitlyMutedChannels;
+    private boolean outOfMemory;
     private final List<Send> completedSends;
     private final List<NetworkReceive> completedReceives;
     private final Map<KafkaChannel, Deque<NetworkReceive>> stagedReceives;
     private final Set<SelectionKey> immediatelyConnectedKeys;
     private final Map<String, KafkaChannel> closingChannels;
+    private Set<SelectionKey> keysWithBufferedRead;
     private final Map<String, ChannelState> disconnected;
     private final List<String> connected;
     private final List<String> failedSends;
@@ -101,6 +106,11 @@ public class Selector implements Selectable, AutoCloseable {
     private final int maxReceiveSize;
     private final boolean recordTimePerConnection;
     private final IdleExpiryManager idleExpiryManager;
+    private final MemoryPool memoryPool;
+    private final long lowMemThreshold;
+    //indicates if the previous call to poll was able to make progress in reading already-buffered data.
+    //this is used to prevent tight loops when memory is not available to read any more data
+    private boolean madeReadProgressLastPoll = true;
 
     /**
      * Create a new nioSelector
@@ -122,7 +132,8 @@ public class Selector implements Selectable, AutoCloseable {
                     Map<String, String> metricTags,
                     boolean metricsPerConnection,
                     boolean recordTimePerConnection,
-                    ChannelBuilder channelBuilder) {
+                    ChannelBuilder channelBuilder,
+                    MemoryPool memoryPool) {
         try {
             this.nioSelector = java.nio.channels.Selector.open();
         } catch (IOException e) {
@@ -131,11 +142,14 @@ public class Selector implements Selectable, AutoCloseable {
         this.maxReceiveSize = maxReceiveSize;
         this.time = time;
         this.channels = new HashMap<>();
+        this.explicitlyMutedChannels = new HashSet<>();
+        this.outOfMemory = false;
         this.completedSends = new ArrayList<>();
         this.completedReceives = new ArrayList<>();
         this.stagedReceives = new HashMap<>();
         this.immediatelyConnectedKeys = new HashSet<>();
         this.closingChannels = new HashMap<>();
+        this.keysWithBufferedRead = new HashSet<>();
         this.connected = new ArrayList<>();
         this.disconnected = new HashMap<>();
         this.failedSends = new ArrayList<>();
@@ -143,6 +157,8 @@ public class Selector implements Selectable, AutoCloseable {
         this.channelBuilder = channelBuilder;
         this.recordTimePerConnection = recordTimePerConnection;
         this.idleExpiryManager = connectionMaxIdleMs < 0 ? null : new IdleExpiryManager(time, connectionMaxIdleMs);
+        this.memoryPool = memoryPool;
+        this.lowMemThreshold = (long) (0.1 * this.memoryPool.size());
     }
 
     public Selector(int maxReceiveSize,
@@ -153,7 +169,7 @@ public class Selector implements Selectable, AutoCloseable {
             Map<String, String> metricTags,
             boolean metricsPerConnection,
             ChannelBuilder channelBuilder) {
-        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder);
+        this(maxReceiveSize, connectionMaxIdleMs, metrics, time, metricGrpPrefix, metricTags, metricsPerConnection, false, channelBuilder, MemoryPool.NONE);
     }
 
     public Selector(long connectionMaxIdleMS, Metrics metrics, Time time, String metricGrpPrefix, ChannelBuilder channelBuilder) {
@@ -200,7 +216,7 @@ public class Selector implements Selectable, AutoCloseable {
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_CONNECT);
         KafkaChannel channel;
         try {
-            channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+            channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
         } catch (Exception e) {
             try {
                 socketChannel.close();
@@ -227,7 +243,7 @@ public class Selector implements Selectable, AutoCloseable {
      */
     public void register(String id, SocketChannel socketChannel) throws ClosedChannelException {
         SelectionKey key = socketChannel.register(nioSelector, SelectionKey.OP_READ);
-        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize);
+        KafkaChannel channel = channelBuilder.buildChannel(id, key, maxReceiveSize, memoryPool);
         key.attach(channel);
         this.channels.put(id, channel);
     }
@@ -311,20 +327,46 @@ public class Selector implements Selectable, AutoCloseable {
         if (timeout < 0)
             throw new IllegalArgumentException("timeout should be >= 0");
 
+        boolean madeReadProgressLastCall = madeReadProgressLastPoll;
         clear();
 
-        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty())
+        boolean dataInBuffers = !keysWithBufferedRead.isEmpty();
+
+        if (hasStagedReceives() || !immediatelyConnectedKeys.isEmpty() || (madeReadProgressLastCall && dataInBuffers))
             timeout = 0;
 
+        if (!memoryPool.isOutOfMemory() && outOfMemory) {
+            //we have recovered from memory pressure. unmute any channel not explicitly muted for other reasons
+            log.trace("Broker no longer low on memory - unmuting incoming sockets");
+            for (KafkaChannel channel : channels.values()) {
+                if (channel.isInMutableState() && !explicitlyMutedChannels.contains(channel)) {
+                    channel.unmute();
+                }
+            }
+            outOfMemory = false;
+        }
+
         /* check ready keys */
         long startSelect = time.nanoseconds();
-        int readyKeys = select(timeout);
+        int numReadyKeys = select(timeout);
         long endSelect = time.nanoseconds();
         this.sensors.selectTime.record(endSelect - startSelect, time.milliseconds());
 
-        if (readyKeys > 0 || !immediatelyConnectedKeys.isEmpty()) {
-            pollSelectionKeys(this.nioSelector.selectedKeys(), false, endSelect);
+        if (numReadyKeys > 0 || !immediatelyConnectedKeys.isEmpty() || dataInBuffers) {
+            Set<SelectionKey> readyKeys = this.nioSelector.selectedKeys();
+            keysWithBufferedRead.removeAll(readyKeys); //so no channel gets polled twice
+
+            //poll from channels that have buffered data (but nothing more from the underlying socket)
+            if (!keysWithBufferedRead.isEmpty()) {
+                Set<SelectionKey> toPoll = keysWithBufferedRead;
+                keysWithBufferedRead = new HashSet<>(); //poll() calls will repopulate if needed
+                pollSelectionKeys(toPoll, false, endSelect);
+            }
+            //poll from channels where the underlying socket has more data
+            pollSelectionKeys(readyKeys, false, endSelect);
             pollSelectionKeys(immediatelyConnectedKeys, true, endSelect);
+        } else {
+            madeReadProgressLastPoll = true; //no work is also "progress"
         }
 
         long endIo = time.nanoseconds();
@@ -339,10 +381,16 @@ public class Selector implements Selectable, AutoCloseable {
         addToCompletedReceives();
     }
 
-    private void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
+    /**
+     * handle any ready I/O on a set of selection keys
+     * @param selectionKeys set of keys to handle
+     * @param isImmediatelyConnected true if running over a set of keys for just-connected sockets
+     * @param currentTimeNanos time at which set of keys was determined
+     */
+    private void pollSelectionKeys(Set<SelectionKey> selectionKeys,
                                    boolean isImmediatelyConnected,
                                    long currentTimeNanos) {
-        Iterator<SelectionKey> iterator = selectionKeys.iterator();
+        Iterator<SelectionKey> iterator = determineHandlingOrder(selectionKeys).iterator();
         while (iterator.hasNext()) {
             SelectionKey key = iterator.next();
             iterator.remove();
@@ -372,14 +420,18 @@ public class Selector implements Selectable, AutoCloseable {
                 }
 
                 /* if channel is not ready finish prepare */
-                if (channel.isConnected() && !channel.ready())
+                if (channel.isConnected() && !channel.ready()) {
                     channel.prepare();
+                }
+
+                attemptRead(key, channel);
 
-                /* if channel is ready read from any connections that have readable data */
-                if (channel.ready() && key.isReadable() && !hasStagedReceive(channel)) {
-                    NetworkReceive networkReceive;
-                    while ((networkReceive = channel.read()) != null)
-                        addToStagedReceives(channel, networkReceive);
+                if (channel.hasBytesBuffered()) {
+                    //this channel has bytes enqueued in intermediate buffers that we could not read
+                    //(possibly because no pool memory was available). it may be the case that the underlying socket will
+                    //not come up in the next poll() and so we need to remember this channel for the
+                    //next poll call otherwise data may be stuck in said buffers forever.
+                    keysWithBufferedRead.add(key);
                 }
 
                 /* if channel is ready write to any sockets that have space in their buffer and for which we have data */
@@ -408,6 +460,39 @@ public class Selector implements Selectable, AutoCloseable {
         }
     }
 
+    private Collection<SelectionKey> determineHandlingOrder(Set<SelectionKey> selectionKeys) {
+        //it is possible that the iteration order over selectionKeys is the same every invocation.
+        //this may cause starvation of reads when memory is low. to address this we shuffle the keys if memory is low.
+        Collection<SelectionKey> inHandlingOrder;
+
+        if (!outOfMemory && memoryPool.availableMemory() < lowMemThreshold) {
+            List<SelectionKey> temp = new ArrayList<>(selectionKeys);
+            Collections.shuffle(temp);
+            inHandlingOrder = temp;
+        } else {
+            inHandlingOrder = selectionKeys;
+        }
+        return inHandlingOrder;
+    }
+
+    private void attemptRead(SelectionKey key, KafkaChannel channel) throws IOException {
+        //if channel is ready and has bytes to read from socket or buffer, and has no
+        //previous receive(s) already staged or otherwise in progress then read from it
+        if (channel.ready() && (key.isReadable() || channel.hasBytesBuffered()) && !hasStagedReceive(channel)
+            && !explicitlyMutedChannels.contains(channel)) {
+            NetworkReceive networkReceive;
+            while ((networkReceive = channel.read()) != null) {
+                madeReadProgressLastPoll = true;
+                addToStagedReceives(channel, networkReceive);
+            }
+            if (channel.isMute()) {
+                outOfMemory = true; //channel has muted itself due to memory pressure.
+            } else {
+                madeReadProgressLastPoll = true;
+            }
+        }
+    }
+
     // Record time spent in pollSelectionKeys for channel (moved into a method to keep checkstyle happy)
     private void maybeRecordTimePerConnection(KafkaChannel channel, long startTimeNanos) {
         if (recordTimePerConnection)
@@ -442,6 +527,7 @@ public class Selector implements Selectable, AutoCloseable {
 
     private void mute(KafkaChannel channel) {
         channel.mute();
+        explicitlyMutedChannels.add(channel);
     }
 
     @Override
@@ -451,6 +537,7 @@ public class Selector implements Selectable, AutoCloseable {
     }
 
     private void unmute(KafkaChannel channel) {
+        explicitlyMutedChannels.remove(channel);
         channel.unmute();
     }
 
@@ -509,6 +596,7 @@ public class Selector implements Selectable, AutoCloseable {
             this.disconnected.put(channel, ChannelState.FAILED_SEND);
         }
         this.failedSends.clear();
+        this.madeReadProgressLastPoll = false;
     }
 
     /**
@@ -674,7 +762,7 @@ public class Selector implements Selectable, AutoCloseable {
             while (iter.hasNext()) {
                 Map.Entry<KafkaChannel, Deque<NetworkReceive>> entry = iter.next();
                 KafkaChannel channel = entry.getKey();
-                if (!channel.isMute()) {
+                if (!explicitlyMutedChannels.contains(channel)) {
                     Deque<NetworkReceive> deque = entry.getValue();
                     addToCompletedReceives(channel, deque);
                     if (deque.isEmpty())
@@ -900,4 +988,13 @@ public class Selector implements Selectable, AutoCloseable {
         }
     }
 
+    //package-private for testing
+    boolean isOutOfMemory() {
+        return outOfMemory;
+    }
+
+    //package-private for testing
+    boolean isMadeReadProgressLastPoll() {
+        return madeReadProgressLastPoll;
+    }
 }
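
A hedged sketch of wiring a bounded pool into the new ten-argument Selector constructor above; the no-argument PlaintextChannelBuilder constructor, its configure() accepting an empty map, and the Metrics/SystemTime defaults are assumptions about surrounding code, not part of this diff:

    import java.util.Collections;
    import org.apache.kafka.common.memory.MemoryPool;
    import org.apache.kafka.common.memory.SimpleMemoryPool;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.network.ChannelBuilder;
    import org.apache.kafka.common.network.PlaintextChannelBuilder;
    import org.apache.kafka.common.network.Selector;
    import org.apache.kafka.common.utils.SystemTime;

    public class BoundedSelectorSketch {
        public static void main(String[] args) throws Exception {
            //64MB pool, 1MB max single receive, non-strict, no OOM-time sensor
            MemoryPool pool = new SimpleMemoryPool(64 * 1024 * 1024, 1024 * 1024, false, null);
            ChannelBuilder channelBuilder = new PlaintextChannelBuilder();
            channelBuilder.configure(Collections.<String, Object>emptyMap());
            Selector selector = new Selector(1024 * 1024, 60000L, new Metrics(), new SystemTime(),
                    "sketch", Collections.<String, String>emptyMap(), false, false, channelBuilder, pool);
            //channels that cannot get a receive buffer mute themselves; poll() unmutes
            //them (unless explicitly muted) once the pool recovers
            selector.close();
        }
    }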

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
index bc34b70..01b72fe 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslChannelBuilder.java
@@ -22,6 +22,7 @@ import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 import java.util.Map;
 
+import org.apache.kafka.common.memory.MemoryPool;
 import org.apache.kafka.common.security.auth.PrincipalBuilder;
 import org.apache.kafka.common.security.ssl.SslFactory;
 import org.apache.kafka.common.KafkaException;
@@ -50,12 +51,13 @@ public class SslChannelBuilder implements ChannelBuilder {
         }
     }
 
-    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize) throws KafkaException {
+    @Override
+    public KafkaChannel buildChannel(String id, SelectionKey key, int maxReceiveSize, MemoryPool memoryPool) throws KafkaException {
         try {
             SslTransportLayer transportLayer = buildTransportLayer(sslFactory, id, key, peerHost(key));
             Authenticator authenticator = new DefaultAuthenticator();
             authenticator.configure(transportLayer, this.principalBuilder, this.configs);
-            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize);
+            return new KafkaChannel(id, transportLayer, authenticator, maxReceiveSize, memoryPool != null ? memoryPool : MemoryPool.NONE);
         } catch (Exception e) {
             log.info("Failed to create channel due to ", e);
             throw new KafkaException(e);
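
The null-to-NONE fallback above keeps pre-existing callers working: MemoryPool.NONE
(added in MemoryPool.java in this commit) never refuses an allocation, so a channel
built without a pool is effectively unbounded, as before. Illustratively (identifiers
are placeholders):

    KafkaChannel bounded   = builder.buildChannel(id, key, maxReceiveSize, pool);
    KafkaChannel unbounded = builder.buildChannel(id, key, maxReceiveSize, null); // MemoryPool.NONE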

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index eb423e3..3cd0114 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -739,6 +739,11 @@ public class SslTransportLayer implements TransportLayer {
     }
 
     @Override
+    public boolean hasBytesBuffered() {
+        return netReadBuffer.position() != 0 || appReadBuffer.position() != 0;
+    }
+
+    @Override
     public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
         return fileChannel.transferTo(position, count, this);
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
index fad0cea..be56ad5 100644
--- a/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
+++ b/clients/src/main/java/org/apache/kafka/common/network/TransportLayer.java
@@ -85,6 +85,11 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
     boolean isMute();
 
     /**
+     * @return true if the channel has bytes to be read in any intermediate buffers
+     */
+    boolean hasBytesBuffered();
+
+    /**
      * Transfers bytes from `fileChannel` to this `TransportLayer`.
      *
      * This method will delegate to {@link FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)},
@@ -99,5 +104,4 @@ public interface TransportLayer extends ScatteringByteChannel, GatheringByteChan
      * @see FileChannel#transferTo(long, long, java.nio.channels.WritableByteChannel)
      */
     long transferFrom(FileChannel fileChannel, long position, long count) throws IOException;
-
 }
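
The new hasBytesBuffered() matters for SSL: a read can leave decrypted data sitting in
the intermediate buffers (netReadBuffer/appReadBuffer above) after the socket itself
is drained, so select() alone would never wake up for that channel again. A poll loop
therefore needs a supplementary check along these lines (method names other than
hasBytesBuffered() are placeholders):

    if (!hasNewSocketData(channel) && channel.hasBytesBuffered()) {
        // no fresh bytes on the wire, but decrypted data is already buffered;
        // attempt another read so that data is not stranded
        attemptRead(channel);
    }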

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
new file mode 100644
index 0000000..5d39dff
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/ProtoUtils.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Field;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public class ProtoUtils {
+    public static void walk(Schema schema, SchemaVisitor visitor) {
+        if (schema == null || visitor == null) {
+            throw new IllegalArgumentException("Both schema and visitor must be provided");
+        }
+        handleNode(schema, visitor);
+    }
+
+    private static void handleNode(Type node, SchemaVisitor visitor) {
+        if (node instanceof Schema) {
+            Schema schema = (Schema) node;
+            visitor.visit(schema);
+            for (Field f : schema.fields()) {
+                handleNode(f.type, visitor);
+            }
+        } else if (node instanceof ArrayOf) {
+            ArrayOf array = (ArrayOf) node;
+            visitor.visit(array);
+            handleNode(array.type(), visitor);
+        } else {
+            visitor.visit(node);
+        }
+    }
+}
\ No newline at end of file
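
An illustrative use of the visitor, mirroring what Protocol.retainsBufferReference
does further down in this commit (someSchema is a placeholder):

    final AtomicReference<Boolean> sawBytes = new AtomicReference<>(Boolean.FALSE);
    ProtoUtils.walk(someSchema, new SchemaVisitorAdapter() {
        @Override
        public void visit(Type field) {
            if (field == Type.BYTES)
                sawBytes.set(Boolean.TRUE);
        }
    });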

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
index 329d99b..ee40133 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/Protocol.java
@@ -21,10 +21,13 @@ import org.apache.kafka.common.protocol.types.Field;
 import org.apache.kafka.common.protocol.types.Schema;
 import org.apache.kafka.common.protocol.types.Type;
 
+import java.util.EnumSet;
+import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Set;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.kafka.common.protocol.types.Type.BOOLEAN;
 import static org.apache.kafka.common.protocol.types.Type.BYTES;
@@ -32,6 +35,7 @@ import static org.apache.kafka.common.protocol.types.Type.INT16;
 import static org.apache.kafka.common.protocol.types.Type.INT32;
 import static org.apache.kafka.common.protocol.types.Type.INT64;
 import static org.apache.kafka.common.protocol.types.Type.INT8;
+import static org.apache.kafka.common.protocol.types.Type.NULLABLE_BYTES;
 import static org.apache.kafka.common.protocol.types.Type.RECORDS;
 import static org.apache.kafka.common.protocol.types.Type.STRING;
 import static org.apache.kafka.common.protocol.types.Type.NULLABLE_STRING;
@@ -1825,6 +1829,7 @@ public class Protocol {
     public static final Schema[][] REQUESTS = new Schema[ApiKeys.MAX_API_KEY + 1][];
     public static final Schema[][] RESPONSES = new Schema[ApiKeys.MAX_API_KEY + 1][];
     static final short[] MIN_VERSIONS = new short[ApiKeys.MAX_API_KEY + 1];
+    static final EnumSet<ApiKeys> DELAYED_DEALLOCATION_REQUESTS; //initialized in static block
 
     /* the latest version of each api */
     static final short[] CURR_VERSION = new short[ApiKeys.MAX_API_KEY + 1];
@@ -1924,6 +1929,43 @@
                     throw new IllegalStateException("Request and response for version " + i + " of API "
                             + api.id + " are defined inconsistently. One is null while the other is not null.");
         }
+
+        /* Go over all request schemata and find those that retain links to the underlying ByteBuffer from
+         * which they were read. Knowing which requests do (and do not) retain a reference to the buffer
+         * makes it possible to release buffers as soon as possible for requests that no longer need them. */
+        Set<ApiKeys> requestsWithBufferRefs = new HashSet<>();
+        for (int reqId = 0; reqId < REQUESTS.length; reqId++) {
+            ApiKeys requestType = ApiKeys.forId(reqId);
+            Schema[] schemata = REQUESTS[reqId];
+            if (schemata == null) {
+                continue;
+            }
+            for (Schema requestVersionSchema : schemata) {
+                if (retainsBufferReference(requestVersionSchema)) {
+                    requestsWithBufferRefs.add(requestType);
+                    break; //Kafka is loose with versions, so if ANY version retains buffers we must assume all do.
+                }
+            }
+        }
+
+        DELAYED_DEALLOCATION_REQUESTS = EnumSet.copyOf(requestsWithBufferRefs);
+    }
+
+    private static boolean retainsBufferReference(Schema schema) {
+        if (schema == null) {
+            return false;
+        }
+        final AtomicReference<Boolean> foundBufferReference = new AtomicReference<>(Boolean.FALSE);
+        SchemaVisitor detector = new SchemaVisitorAdapter() {
+            @Override
+            public void visit(Type field) {
+                if (field == BYTES || field == NULLABLE_BYTES || field == RECORDS) {
+                    foundBufferReference.set(Boolean.TRUE);
+                }
+            }
+        };
+        ProtoUtils.walk(schema, detector);
+        return foundBufferReference.get();
     }
 
     public static boolean apiVersionSupported(short apiKey, short apiVersion) {
@@ -1936,6 +1979,10 @@ public class Protocol {
                 0);
     }
 
+    public static boolean requiresDelayedDeallocation(int apiKey) {
+        return DELAYED_DEALLOCATION_REQUESTS.contains(ApiKeys.forId(apiKey));
+    }
+
     private static String indentString(int size) {
         StringBuilder b = new StringBuilder(size);
         for (int i = 0; i < size; i++)
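
The practical consequence for the socket server: a request whose schema contains
BYTES, NULLABLE_BYTES or RECORDS fields aliases the buffer it was parsed from, so that
buffer may only be returned to the pool once processing completes; any other request
can release it immediately after parsing. Sketched with placeholder names:

    if (Protocol.requiresDelayedDeallocation(apiKey)) {
        // the parsed request still references `buffer`; release only after processing
    } else {
        pool.release(buffer); // memory can go back to the pool right away
    }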

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
new file mode 100644
index 0000000..e61cc77
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitor.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public interface SchemaVisitor {
+    void visit(Schema schema);
+    void visit(ArrayOf array);
+    void visit(Type field);
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
new file mode 100644
index 0000000..62834d0
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/SchemaVisitorAdapter.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.protocol.types.ArrayOf;
+import org.apache.kafka.common.protocol.types.Schema;
+import org.apache.kafka.common.protocol.types.Type;
+
+public abstract class SchemaVisitorAdapter implements SchemaVisitor {
+    @Override
+    public void visit(Schema schema) {
+        //nop
+    }
+
+    @Override
+    public void visit(ArrayOf array) {
+        //nop
+    }
+
+    @Override
+    public void visit(Type field) {
+        //nop
+    }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
index 605174d..29a89d4 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Field.java
@@ -70,4 +70,9 @@ public class Field {
         return schema;
     }
 
+
+    @Override
+    public String toString() {
+        return name + ":" + type;
+    }
 }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
index fbb520c..a9c08aa 100644
--- a/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
+++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/Schema.java
@@ -139,9 +139,7 @@ public class Schema extends Type {
         StringBuilder b = new StringBuilder();
         b.append('{');
         for (int i = 0; i < this.fields.length; i++) {
-            b.append(this.fields[i].name);
-            b.append(':');
-            b.append(this.fields[i].type());
+            b.append(this.fields[i].toString());
             if (i < this.fields.length - 1)
                 b.append(',');
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index e997fef..75f8cf7 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -16,6 +16,7 @@
  */
 package org.apache.kafka.common.utils;
 
+import java.text.DecimalFormat;
 import org.apache.kafka.common.KafkaException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -62,6 +63,11 @@ public class Utils {
     // IPv6 is supported with [ip] pattern
     private static final Pattern HOST_PORT_PATTERN = Pattern.compile(".*?\\[?([0-9a-zA-Z\\-%._:]*)\\]?:([0-9]+)");
 
+    // Prints up to 2 decimal digits. Used for human readable printing
+    private static final DecimalFormat TWO_DIGIT_FORMAT = new DecimalFormat("0.##");
+
+    private static final String[] BYTE_SCALE_SUFFIXES = new String[] {"B", "KB", "MB", "GB", "TB", "PB", "EB", "ZB", "YB"};
+
     public static final String NL = System.getProperty("line.separator");
 
     private static final Logger log = LoggerFactory.getLogger(Utils.class);
@@ -379,6 +385,28 @@ public class Utils {
     }
 
     /**
+     * Formats a byte count as a human-readable String (e.g. "3.2 MB")
+     * @param bytes some size in bytes
+     * @return a human-readable representation of the given byte count
+     */
+    public static String formatBytes(long bytes) {
+        if (bytes < 0) {
+            return "" + bytes; //negative sizes are passed through unformatted
+        }
+        double asDouble = (double) bytes;
+        int ordinal = (int) Math.floor(Math.log(asDouble) / Math.log(1024.0));
+        double scale = Math.pow(1024.0, ordinal);
+        double scaled = asDouble / scale;
+        String formatted = TWO_DIGIT_FORMAT.format(scaled);
+        try {
+            return formatted + " " + BYTE_SCALE_SUFFIXES[ordinal];
+        } catch (IndexOutOfBoundsException e) {
+            //ordinal exceeds the largest defined suffix; fall back to the raw number
+            return "" + asDouble;
+        }
+    }
+
+    /**
      * Create a string representation of an array joined by the given separator
      * @param strs The array of items
      * @param separator The separator
@@ -613,7 +641,7 @@ public class Utils {
         } catch (IOException outer) {
             try {
                 Files.move(source, target, StandardCopyOption.REPLACE_EXISTING);
-                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target, 
+                log.debug("Non-atomic move of {} to {} succeeded after atomic move failed due to {}", source, target,
                         outer.getMessage());
             } catch (IOException inner) {
                 inner.addSuppressed(outer);
@@ -632,7 +660,8 @@ public class Utils {
         IOException exception = null;
         for (Closeable closeable : closeables) {
             try {
-                closeable.close();
+                if (closeable != null)
+                    closeable.close();
             } catch (IOException e) {
                 if (exception != null)
                     exception.addSuppressed(e);
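
Expected output of the new formatBytes helper, following directly from its 1024-based
scaling:

    Utils.formatBytes(500L);     // "500 B"
    Utils.formatBytes(1536L);    // "1.5 KB"
    Utils.formatBytes(3355443L); // "3.2 MB"  (3355443 / 1024^2 = 3.2)
    Utils.formatBytes(-1L);      // "-1"      (negative values pass through unformatted)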

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
new file mode 100644
index 0000000..788d447
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/memory/GarbageCollectedMemoryPoolTest.java
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.memory;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class GarbageCollectedMemoryPoolTest {
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testZeroSize() throws Exception {
+        new GarbageCollectedMemoryPool(0, 7, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeSize() throws Exception {
+        new GarbageCollectedMemoryPool(-1, 7, false, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testZeroMaxAllocation() throws Exception {
+        new GarbageCollectedMemoryPool(100, 0, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testNegativeMaxAllocation() throws Exception {
+        new GarbageCollectedMemoryPool(100, -1, false, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testMaxAllocationLargerThanSize() throws Exception {
+        new GarbageCollectedMemoryPool(100, 101, true, null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationOverMaxAllocation() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        pool.tryAllocate(11);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationZero() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        pool.tryAllocate(0);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testAllocationNegative() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        pool.tryAllocate(-1);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseNull() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        pool.release(null);
+    }
+
+    @Test(expected = IllegalArgumentException.class)
+    public void testReleaseForeignBuffer() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, true, null);
+        ByteBuffer fellOffATruck = ByteBuffer.allocate(1);
+        pool.release(fellOffATruck);
+    }
+
+    @Test
+    public void testDoubleFree() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 10, false, null);
+        ByteBuffer buffer = pool.tryAllocate(5);
+        Assert.assertNotNull(buffer);
+        pool.release(buffer); //so far so good
+        try {
+            pool.release(buffer);
+            Assert.fail("2nd release() should have failed");
+        } catch (IllegalArgumentException e) {
+            //as expected
+        } catch (Throwable t) {
+            Assert.fail("Expected an IllegalArgumentException, instead got " + t);
+        }
+    }
+
+    @Test
+    public void testAllocationBound() throws Exception {
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(21, 10, false, null);
+        ByteBuffer buf1 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf1);
+        Assert.assertEquals(10, buf1.capacity());
+        ByteBuffer buf2 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf2);
+        Assert.assertEquals(10, buf2.capacity());
+        ByteBuffer buf3 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf3);
+        Assert.assertEquals(10, buf3.capacity());
+        //no more allocations
+        Assert.assertNull(pool.tryAllocate(1));
+        //release a buffer
+        pool.release(buf3);
+        //now we can have more
+        ByteBuffer buf4 = pool.tryAllocate(10);
+        Assert.assertNotNull(buf4);
+        Assert.assertEquals(10, buf4.capacity());
+        //no more allocations
+        Assert.assertNull(pool.tryAllocate(1));
+    }
+
+    @Test
+    public void testBuffersGarbageCollected() throws Exception {
+        Runtime runtime = Runtime.getRuntime();
+        long maxHeap = runtime.maxMemory(); //in bytes
+        long maxPool = maxHeap / 2;
+        long maxSingleAllocation = maxPool / 10;
+        Assert.assertTrue(maxSingleAllocation < Integer.MAX_VALUE / 2); //otherwise the test JVM has too much memory for this test's logic
+        GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(maxPool, (int) maxSingleAllocation, false, null);
+
+        //we will allocate 30 buffers from this pool, which is sized such that at most
+        //11 should coexist and 30 would not fit in the JVM heap, proving that:
+        // 1. buffers were reclaimed and
+        // 2. the pool registered the reclamation.
+
+        int timeoutSeconds = 30;
+        long giveUp = System.currentTimeMillis() + TimeUnit.SECONDS.toMillis(timeoutSeconds);
+        boolean success = false;
+
+        int buffersAllocated = 0;
+        while (System.currentTimeMillis() < giveUp) {
+            ByteBuffer buffer = pool.tryAllocate((int) maxSingleAllocation);
+            if (buffer == null) {
+                System.gc();
+                Thread.sleep(10);
+                continue;
+            }
+            buffersAllocated++;
+            if (buffersAllocated >= 30) {
+                success = true;
+                break;
+            }
+        }
+
+        Assert.assertTrue("failed to allocate 30 buffers in " + timeoutSeconds + " seconds."
+                + " buffers allocated: " + buffersAllocated + " heap " + Utils.formatBytes(maxHeap)
+                + " pool " + Utils.formatBytes(maxPool) + " single allocation "
+                + Utils.formatBytes(maxSingleAllocation), success);
+    }
+}
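
The allocate/release contract these tests pin down, as a sketch (constructor arguments
mirror the fixtures above; their exact semantics are defined in
GarbageCollectedMemoryPool.java earlier in this commit):

    GarbageCollectedMemoryPool pool = new GarbageCollectedMemoryPool(1000, 100, false, null);
    ByteBuffer buffer = pool.tryAllocate(64); // returns null when exhausted (testAllocationBound)
    if (buffer != null) {
        try {
            // ... fill and drain the buffer ...
        } finally {
            pool.release(buffer); // a second release throws IllegalArgumentException (testDoubleFree)
        }
    }
    // Buffers that are never released but become unreachable are reclaimed via GC and
    // their capacity is credited back to the pool (testBuffersGarbageCollected).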

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
index 85c5002..fd4ef69 100644
--- a/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
+++ b/clients/src/test/java/org/apache/kafka/common/network/NioEchoServer.java
@@ -94,14 +94,15 @@ public class NioEchoServer extends Thread {
                 List<NetworkReceive> completedReceives = selector.completedReceives();
                 for (NetworkReceive rcv : completedReceives) {
                     KafkaChannel channel = channel(rcv.source());
-                    channel.mute();
+                    String channelId = channel.id();
+                    selector.mute(channelId);
                     NetworkSend send = new NetworkSend(rcv.source(), rcv.payload());
                     if (outputChannel == null)
                         selector.send(send);
                     else {
                         for (ByteBuffer buffer : send.buffers)
                             outputChannel.write(buffer);
-                        channel.unmute();
+                        selector.unmute(channelId);
                     }
                 }
                 for (Send send : selector.completedSends())

http://git-wip-us.apache.org/repos/asf/kafka/blob/47ee8e95/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
new file mode 100644
index 0000000..3338d03
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/network/PlaintextSender.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.network;
+
+import java.io.OutputStream;
+import java.net.InetSocketAddress;
+import java.net.Socket;
+
+/**
+ * Test helper that connects to a given server address, writes out the given payload, and disconnects
+ */
+public class PlaintextSender extends Thread {
+
+    public PlaintextSender(final InetSocketAddress serverAddress, final byte[] payload) {
+        super(new Runnable() {
+            @Override
+            public void run() {
+                try (Socket connection = new Socket(serverAddress.getAddress(), serverAddress.getPort());
+                     OutputStream os = connection.getOutputStream()) {
+                    os.write(payload);
+                    os.flush();
+                } catch (Exception e) {
+                    e.printStackTrace(System.err);
+                }
+            }
+        });
+        setDaemon(true);
+        setName("PlaintextSender - " + payload.length + " bytes @ " + serverAddress);
+    }
+}
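
Typical use in a test (address and payload are placeholders):

    PlaintextSender sender = new PlaintextSender(new InetSocketAddress("localhost", serverPort), payload);
    sender.start(); // daemon thread: connect, write, flush, disconnect
    sender.join();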

