Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BE2B8200BC1 for ; Wed, 16 Nov 2016 18:51:32 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BC9DB160B08; Wed, 16 Nov 2016 17:51:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 92492160B02 for ; Wed, 16 Nov 2016 18:51:31 +0100 (CET) Received: (qmail 57398 invoked by uid 500); 16 Nov 2016 17:51:30 -0000 Mailing-List: contact commits-help@kafka.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@kafka.apache.org Delivered-To: mailing list commits@kafka.apache.org Received: (qmail 57389 invoked by uid 99); 16 Nov 2016 17:51:30 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 16 Nov 2016 17:51:30 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A3EE1E0209; Wed, 16 Nov 2016 17:51:30 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: jgus@apache.org To: commits@kafka.apache.org Message-Id: <618ad68c88ba42069d9f9a699fdd87bf@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: kafka git commit: MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter Date: Wed, 16 Nov 2016 17:51:30 +0000 (UTC) archived-at: Wed, 16 Nov 2016 17:51:32 -0000 Repository: kafka Updated Branches: refs/heads/trunk 31203efcb -> b902ef985 MINOR: Remove unused `ByteBoundedBlockingQueue` class and `zkSessionTimeout` parameter Author: Ismael Juma Reviewers: Jason Gustafson Closes #2136 from ijuma/remove-unused-code Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b902ef98 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b902ef98 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b902ef98 Branch: refs/heads/trunk Commit: b902ef985a8d4ce304950fcb2c02499054fe6d24 Parents: 31203ef Author: Ismael Juma Authored: Wed Nov 16 09:38:42 2016 -0800 Committer: Jason Gustafson Committed: Wed Nov 16 09:38:42 2016 -0800 ---------------------------------------------------------------------- .../kafka/controller/KafkaController.scala | 5 +- .../kafka/utils/ByteBoundedBlockingQueue.scala | 230 ------------------- .../ControlledShutdownLeaderSelectorTest.scala | 2 +- .../unit/kafka/server/LeaderElectionTest.scala | 2 +- .../utils/ByteBoundedBlockingQueueTest.scala | 98 -------- 5 files changed, 4 insertions(+), 333 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/main/scala/kafka/controller/KafkaController.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/controller/KafkaController.scala b/core/src/main/scala/kafka/controller/KafkaController.scala index 730f07c..2a6f61c 100755 --- a/core/src/main/scala/kafka/controller/KafkaController.scala +++ b/core/src/main/scala/kafka/controller/KafkaController.scala @@ -46,8 +46,7 @@ import java.util.concurrent.locks.ReentrantLock import kafka.server._ import kafka.common.TopicAndPartition -class ControllerContext(val zkUtils: ZkUtils, - val zkSessionTimeout: Int) { +class ControllerContext(val zkUtils: ZkUtils) { var controllerChannelManager: ControllerChannelManager = null val controllerLock: ReentrantLock = new ReentrantLock() var shuttingDownBrokerIds: mutable.Set[Int] = mutable.Set.empty @@ -157,7 +156,7 @@ class KafkaController(val config : KafkaConfig, zkUtils: ZkUtils, val brokerStat this.logIdent = "[Controller " + config.brokerId + "]: " private var isRunning = true private val stateChangeLogger = KafkaController.stateChangeLogger - val controllerContext = new ControllerContext(zkUtils, config.zkSessionTimeoutMs) + val controllerContext = new ControllerContext(zkUtils) val partitionStateMachine = new PartitionStateMachine(this) val replicaStateMachine = new ReplicaStateMachine(this) private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover, http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala b/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala deleted file mode 100644 index 26149af..0000000 --- a/core/src/main/scala/kafka/utils/ByteBoundedBlockingQueue.scala +++ /dev/null @@ -1,230 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.util.concurrent.atomic.AtomicInteger -import java.util.concurrent.{TimeUnit, LinkedBlockingQueue} - -/** - * A blocking queue that have size limits on both number of elements and number of bytes. - */ -class ByteBoundedBlockingQueue[E] (val queueNumMessageCapacity: Int, val queueByteCapacity: Int, sizeFunction: Option[(E) => Int]) - extends Iterable[E] { - private val queue = new LinkedBlockingQueue[E] (queueNumMessageCapacity) - private var currentByteSize = new AtomicInteger() - private val putLock = new Object - - /** - * Please refer to [[java.util.concurrent.BlockingQueue#offer]] - * An element can be enqueued provided the current size (in number of elements) is within the configured - * capacity and the current size in bytes of the queue is within the configured byte capacity. i.e., the - * element may be enqueued even if adding it causes the queue's size in bytes to exceed the byte capacity. - * @param e the element to put into the queue - * @param timeout the amount of time to wait before the expire the operation - * @param unit the time unit of timeout parameter, default to millisecond - * @return true if the element is put into queue, false if it is not - * @throws NullPointerException if element is null - * @throws InterruptedException if interrupted during waiting - */ - def offer(e: E, timeout: Long, unit: TimeUnit = TimeUnit.MICROSECONDS): Boolean = { - if (e == null) throw new NullPointerException("Putting null element into queue.") - val startTime = SystemTime.nanoseconds - val expireTime = startTime + unit.toNanos(timeout) - putLock synchronized { - var timeoutNanos = expireTime - SystemTime.nanoseconds - while (currentByteSize.get() >= queueByteCapacity && timeoutNanos > 0) { - // ensure that timeoutNanos > 0, otherwise (per javadoc) we have to wait until the next notify - putLock.wait(timeoutNanos / 1000000, (timeoutNanos % 1000000).toInt) - timeoutNanos = expireTime - SystemTime.nanoseconds - } - // only proceed if queue has capacity and not timeout - timeoutNanos = expireTime - SystemTime.nanoseconds - if (currentByteSize.get() < queueByteCapacity && timeoutNanos > 0) { - val success = queue.offer(e, timeoutNanos, TimeUnit.NANOSECONDS) - // only increase queue byte size if put succeeds - if (success) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - success - } else { - false - } - } - } - - /** - * Please refer to [[java.util.concurrent.BlockingQueue#offer]]. - * Put an element to the tail of the queue, return false immediately if queue is full - * @param e The element to put into queue - * @return true on succeed, false on failure - * @throws NullPointerException if element is null - * @throws InterruptedException if interrupted during waiting - */ - def offer(e: E): Boolean = { - if (e == null) throw new NullPointerException("Putting null element into queue.") - putLock synchronized { - if (currentByteSize.get() >= queueByteCapacity) { - false - } else { - val success = queue.offer(e) - if (success) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - success - } - } - } - - /** - * Please refer to [[java.util.concurrent.BlockingQueue#put]]. - * Put an element to the tail of the queue, block if queue is full - * @param e The element to put into queue - * @return true on succeed, false on failure - * @throws NullPointerException if element is null - * @throws InterruptedException if interrupted during waiting - */ - def put(e: E): Boolean = { - if (e == null) throw new NullPointerException("Putting null element into queue.") - putLock synchronized { - if (currentByteSize.get() >= queueByteCapacity) - putLock.wait() - val success = queue.offer(e) - if (success) - currentByteSize.addAndGet(sizeFunction.get(e)) - // wake up another thread in case multiple threads are waiting - if (currentByteSize.get() < queueByteCapacity) - putLock.notify() - success - } - } - - /** - * Please refer to [[java.util.concurrent.BlockingQueue#poll]] - * Get an element from the head of queue. Wait for some time if the queue is empty. - * @param timeout the amount of time to wait if the queue is empty - * @param unit the unit type - * @return the first element in the queue, null if queue is empty - */ - def poll(timeout: Long, unit: TimeUnit): E = { - val e = queue.poll(timeout, unit) - // only wake up waiting threads if the queue size drop under queueByteCapacity - if (e != null && - currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && - currentByteSize.get() < queueByteCapacity) - putLock.synchronized(putLock.notify()) - e - } - - /** - * Please refer to [[java.util.concurrent.BlockingQueue#poll]] - * Get an element from the head of queue. - * @return the first element in the queue, null if queue is empty - */ - def poll(): E = { - val e = queue.poll() - // only wake up waiting threads if the queue size drop under queueByteCapacity - if (e != null && - currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && - currentByteSize.get() < queueByteCapacity) - putLock.synchronized(putLock.notify()) - e - } - - /** - * Please refer to [[java.util.concurrent.BlockingQueue#take]] - * Get an element from the head of the queue, block if the queue is empty - * @return the first element in the queue, null if queue is empty - */ - def take(): E = { - val e = queue.take() - // only wake up waiting threads if the queue size drop under queueByteCapacity - if (currentByteSize.getAndAdd(-sizeFunction.get(e)) > queueByteCapacity && - currentByteSize.get() < queueByteCapacity) - putLock.synchronized(putLock.notify()) - e - } - - /** - * Iterator for the queue - * @return Iterator for the queue - */ - override def iterator = new Iterator[E] () { - private val iter = queue.iterator() - private var curr: E = null.asInstanceOf[E] - - def hasNext: Boolean = iter.hasNext - - def next(): E = { - curr = iter.next() - curr - } - - def remove() { - if (curr == null) - throw new IllegalStateException("Iterator does not have a current element.") - iter.remove() - if (currentByteSize.addAndGet(-sizeFunction.get(curr)) < queueByteCapacity) - putLock.synchronized(putLock.notify()) - } - } - - /** - * get the number of elements in the queue - * @return number of elements in the queue - */ - override def size() = queue.size() - - /** - * get the current byte size in the queue - * @return current queue size in bytes - */ - def byteSize() = { - val currSize = currentByteSize.get() - // There is a potential race where after an element is put into the queue and before the size is added to - // currentByteSize, it was taken out of the queue and the size was deducted from the currentByteSize, - // in that case, currentByteSize would become negative, in that case, just put the queue size to be 0. - if (currSize > 0) currSize else 0 - } - - /** - * get the number of unused slots in the queue - * @return the number of unused slots in the queue - */ - def remainingSize = queue.remainingCapacity() - - /** - * get the remaining bytes capacity of the queue - * @return the remaining bytes capacity of the queue - */ - def remainingByteSize = math.max(0, queueByteCapacity - currentByteSize.get()) - - /** - * remove all the items in the queue - */ - def clear() { - putLock synchronized { - queue.clear() - currentByteSize.set(0) - putLock.notify() - } - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala index f032eb6..d3dbfe2 100644 --- a/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala +++ b/core/src/test/scala/unit/kafka/server/ControlledShutdownLeaderSelectorTest.scala @@ -39,7 +39,7 @@ class ControlledShutdownLeaderSelectorTest { val firstLeader = 1 val zkUtils = EasyMock.mock(classOf[ZkUtils]) - val controllerContext = new ControllerContext(zkUtils, zkSessionTimeout = 1000) + val controllerContext = new ControllerContext(zkUtils) controllerContext.liveBrokers = assignment.map(Broker(_, Map.empty, None)).toSet controllerContext.shuttingDownBrokerIds = mutable.Set(2, 3) controllerContext.partitionReplicaAssignment = mutable.Map(topicPartition -> assignment) http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala index 5726152..e3f0ad2 100755 --- a/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala +++ b/core/src/test/scala/unit/kafka/server/LeaderElectionTest.scala @@ -131,7 +131,7 @@ class LeaderElectionTest extends ZooKeeperTestHarness { val brokers = servers.map(s => new Broker(s.config.brokerId, "localhost", s.boundPort())) val nodes = brokers.map(_.getNode(SecurityProtocol.PLAINTEXT)) - val controllerContext = new ControllerContext(zkUtils, 6000) + val controllerContext = new ControllerContext(zkUtils) controllerContext.liveBrokers = brokers.toSet val metrics = new Metrics val controllerChannelManager = new ControllerChannelManager(controllerContext, controllerConfig, new SystemTime, metrics) http://git-wip-us.apache.org/repos/asf/kafka/blob/b902ef98/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala b/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala deleted file mode 100644 index 4a070bd..0000000 --- a/core/src/test/scala/unit/kafka/utils/ByteBoundedBlockingQueueTest.scala +++ /dev/null @@ -1,98 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kafka.utils - -import java.util.concurrent.TimeUnit - -import org.junit.Assert._ -import org.junit.Test - -class ByteBoundedBlockingQueueTest { - val sizeFunction = (a: String) => a.length - val queue = new ByteBoundedBlockingQueue[String](5, 15, Some(sizeFunction)) - - @Test - def testByteBoundedBlockingQueue() { - assertEquals(5, queue.remainingSize) - assertEquals(15, queue.remainingByteSize) - - //offer a message whose size is smaller than remaining capacity - val m0 = new String("0123456789") - assertEquals(true, queue.offer(m0)) - assertEquals(1, queue.size()) - assertEquals(10, queue.byteSize()) - assertEquals(4, queue.remainingSize) - assertEquals(5, queue.remainingByteSize) - - // offer a message where remaining capacity < message size < capacity limit - val m1 = new String("1234567890") - assertEquals(true, queue.offer(m1)) - assertEquals(2, queue.size()) - assertEquals(20, queue.byteSize()) - assertEquals(3, queue.remainingSize) - assertEquals(0, queue.remainingByteSize) - - // offer a message using timeout, should fail because no space is left - val m2 = new String("2345678901") - assertEquals(false, queue.offer(m2, 10, TimeUnit.MILLISECONDS)) - assertEquals(2, queue.size()) - assertEquals(20, queue.byteSize()) - assertEquals(3, queue.remainingSize) - assertEquals(0, queue.remainingByteSize) - - // take an element out of the queue - assertEquals("0123456789", queue.take()) - assertEquals(1, queue.size()) - assertEquals(10, queue.byteSize()) - assertEquals(4, queue.remainingSize) - assertEquals(5, queue.remainingByteSize) - - // add 5 small elements into the queue, first 4 should succeed, the 5th one should fail - // test put() - assertEquals(true, queue.put("a")) - assertEquals(true, queue.offer("b")) - assertEquals(true, queue.offer("c")) - assertEquals(4, queue.size()) - assertEquals(13, queue.byteSize()) - assertEquals(1, queue.remainingSize) - assertEquals(2, queue.remainingByteSize) - - assertEquals(true, queue.offer("d")) - assertEquals(5, queue.size()) - assertEquals(14, queue.byteSize()) - assertEquals(0, queue.remainingSize) - assertEquals(1, queue.remainingByteSize) - - assertEquals(false, queue.offer("e")) - assertEquals(5, queue.size()) - assertEquals(14, queue.byteSize()) - assertEquals(0, queue.remainingSize) - assertEquals(1, queue.remainingByteSize) - - // try take 6 elements out of the queue, the last poll() should fail as there is no element anymore - // test take() - assertEquals("1234567890", queue.poll(10, TimeUnit.MILLISECONDS)) - // test poll - assertEquals("a", queue.poll()) - assertEquals("b", queue.poll()) - assertEquals("c", queue.poll()) - assertEquals("d", queue.poll()) - assertEquals(null, queue.poll(10, TimeUnit.MILLISECONDS)) - } - -}