kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] 01/02: MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
Date Fri, 30 Nov 2018 15:27:14 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git

commit 347ecd896fb33d708ad31cd446dd4f9ee5672220
Author: Ismael Juma <ismael@juma.me.uk>
AuthorDate: Sat Nov 10 13:58:18 2018 -0800

    MINOR: Update zstd, easymock, powermock, zkclient and build plugins (#5846)
    
    EasyMock 4.0.x changes the mock creator methods so that their return
    type is inferred from the call site. A number of Scala tests were
    updated with explicit type annotations so that they compile and run.
    
    The versions of EasyMock and PowerMock in this PR include full support
    for Java 11.
    
    Reviewers: Manikumar Reddy <manikumar.reddy@gmail.com>
---
 build.gradle                                       |  13 +-
 .../kafka/server/DelayedFetchTest.scala            |   6 +-
 .../kafka/common/InterBrokerSendThreadTest.scala   |   2 +-
 .../test/scala/unit/kafka/admin/AdminTest.scala    |   2 +-
 .../scala/unit/kafka/admin/ConfigCommandTest.scala |   6 +-
 .../admin/ReassignPartitionsCommandTest.scala      |  12 +-
 .../scala/unit/kafka/cluster/PartitionTest.scala   |  10 +-
 .../kafka/controller/ControllerTestUtils.scala     |   2 +-
 .../controller/PartitionStateMachineTest.scala     |   4 +-
 .../coordinator/group/GroupCoordinatorTest.scala   |   6 +-
 .../group/GroupMetadataManagerTest.scala           |  12 +-
 .../transaction/ProducerIdManagerTest.scala        |   2 +-
 .../TransactionCoordinatorConcurrencyTest.scala    |   8 +-
 .../TransactionMarkerChannelManagerTest.scala      |   6 +-
 ...sactionMarkerRequestCompletionHandlerTest.scala |   5 +-
 .../transaction/TransactionStateManagerTest.scala  |   4 +-
 core/src/test/scala/unit/kafka/log/LogTest.scala   |  10 +-
 .../unit/kafka/log/ProducerStateManagerTest.scala  |   2 +-
 .../kafka/server/AbstractFetcherManagerTest.scala  |   2 +-
 .../unit/kafka/server/ClientQuotaManagerTest.scala |   2 +-
 .../kafka/server/DynamicBrokerConfigTest.scala     |   7 +-
 .../kafka/server/DynamicConfigChangeTest.scala     |   2 +-
 .../server/HighwatermarkPersistenceTest.scala      |   2 +-
 .../unit/kafka/server/ISRExpirationTest.scala      |   8 +-
 .../scala/unit/kafka/server/KafkaApisTest.scala    |  24 ++--
 .../scala/unit/kafka/server/LogOffsetTest.scala    |   8 +-
 .../server/ReplicaAlterLogDirsThreadTest.scala     | 112 ++++++++--------
 .../kafka/server/ReplicaFetcherThreadTest.scala    | 146 ++++++++++-----------
 .../kafka/server/ReplicaManagerQuotasTest.scala    |  16 +--
 .../unit/kafka/server/ReplicaManagerTest.scala     |  10 +-
 .../unit/kafka/server/ServerStartupTest.scala      |   2 +-
 .../scala/unit/kafka/server/SimpleFetchTest.scala  |  10 +-
 .../server/ThrottledChannelExpirationTest.scala    |   4 +-
 .../server/epoch/OffsetsForLeaderEpochTest.scala   |  11 +-
 .../unit/kafka/utils/ReplicationUtilsTest.scala    |   9 +-
 .../scala/unit/kafka/zk/AdminZkClientTest.scala    |   2 +-
 gradle/dependencies.gradle                         |  11 +-
 37 files changed, 251 insertions(+), 249 deletions(-)

diff --git a/build.gradle b/build.gradle
index e78d2ce..bba3d83 100644
--- a/build.gradle
+++ b/build.gradle
@@ -29,11 +29,11 @@ buildscript {
     // For Apache Rat plugin to ignore non-Git files
     classpath "org.ajoberstar:grgit:1.9.3"
     classpath 'com.github.ben-manes:gradle-versions-plugin:0.20.0'
-    classpath 'org.scoverage:gradle-scoverage:2.4.0'
-    classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.0'
-    classpath 'org.owasp:dependency-check-gradle:3.3.2'
-    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.15.0"
-    classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.4"
+    classpath 'org.scoverage:gradle-scoverage:2.5.0'
+    classpath 'com.github.jengelman.gradle.plugins:shadow:4.0.2'
+    classpath 'org.owasp:dependency-check-gradle:3.3.4'
+    classpath "com.diffplug.spotless:spotless-plugin-gradle:3.16.0"
+    classpath "gradle.plugin.com.github.spotbugs:spotbugs-gradle-plugin:1.6.5"
   }
 }
 
@@ -369,8 +369,7 @@ subprojects {
 
   if (!JavaVersion.current().isJava11Compatible()) {
     spotbugs {
-      // 3.1.6 has a regression that breaks our build, seems to be https://github.com/spotbugs/spotbugs/pull/688
-      toolVersion = '3.1.5'
+      toolVersion = '3.1.8'
       excludeFilter = file("$rootDir/gradle/spotbugs-exclude.xml")
       ignoreFailures = false
     }
diff --git a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
index 890ea3b..f7c51f7 100644
--- a/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
+++ b/core/src/test/scala/integration/kafka/server/DelayedFetchTest.scala
@@ -30,8 +30,8 @@ import org.junit.Assert._
 
 class DelayedFetchTest extends EasyMockSupport {
   private val maxBytes = 1024
-  private val replicaManager = mock(classOf[ReplicaManager])
-  private val replicaQuota = mock(classOf[ReplicaQuota])
+  private val replicaManager: ReplicaManager = mock(classOf[ReplicaManager])
+  private val replicaQuota: ReplicaQuota = mock(classOf[ReplicaQuota])
 
   @Test
   def testFetchWithFencedEpoch(): Unit = {
@@ -58,7 +58,7 @@ class DelayedFetchTest extends EasyMockSupport {
       quota = replicaQuota,
       responseCallback = callback)
 
-    val partition = mock(classOf[Partition])
+    val partition: Partition = mock(classOf[Partition])
 
     EasyMock.expect(replicaManager.getPartitionOrException(topicPartition, expectLeader = true))
         .andReturn(partition)
diff --git a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
index 6838653..5c0ea2d 100644
--- a/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
+++ b/core/src/test/scala/kafka/common/InterBrokerSendThreadTest.scala
@@ -30,7 +30,7 @@ import scala.collection.mutable
 
 class InterBrokerSendThreadTest {
   private val time = new MockTime()
-  private val networkClient = EasyMock.createMock(classOf[NetworkClient])
+  private val networkClient: NetworkClient = EasyMock.createMock(classOf[NetworkClient])
   private val completionHandler = new StubCompletionHandler
   private val requestTimeoutMs = 1000
 
diff --git a/core/src/test/scala/unit/kafka/admin/AdminTest.scala b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
index a1c317e..a82657e 100755
--- a/core/src/test/scala/unit/kafka/admin/AdminTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/AdminTest.scala
@@ -142,7 +142,7 @@ class AdminTest extends ZooKeeperTestHarness with Logging with RackAwareTest {
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock = EasyMock.createNiceMock(classOf[ZkUtils])
+    val zkMock: ZkUtils = EasyMock.createNiceMock(classOf[ZkUtils])
     EasyMock.expect(zkMock.pathExists(s"/brokers/topics/$topic")).andReturn(false)
     EasyMock.expect(zkMock.getAllTopics).andReturn(Seq("some.topic", topic, "some.other.topic"))
     EasyMock.replay(zkMock)
diff --git a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
index cb261f6..ee4a6ef 100644
--- a/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConfigCommandTest.scala
@@ -245,12 +245,12 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
     val configEntries = util.Collections.singletonList(new ConfigEntry("num.io.threads", "5"))
     val future = new KafkaFutureImpl[util.Map[ConfigResource, Config]]
     future.complete(util.Collections.singletonMap(resource, new Config(configEntries)))
-    val describeResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
+    val describeResult: DescribeConfigsResult = EasyMock.createNiceMock(classOf[DescribeConfigsResult])
     EasyMock.expect(describeResult.all()).andReturn(future).once()
 
     val alterFuture = new KafkaFutureImpl[Void]
     alterFuture.complete(null)
-    val alterResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
+    val alterResult: AlterConfigsResult = EasyMock.createNiceMock(classOf[AlterConfigsResult])
     EasyMock.expect(alterResult.all()).andReturn(alterFuture)
 
     val mockAdminClient = new MockAdminClient(util.Collections.singletonList(node), node) {
@@ -622,7 +622,7 @@ class ConfigCommandTest extends ZooKeeperTestHarness with Logging {
 
   @Test
   def testQuotaDescribeEntities() {
-    val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
     def checkEntities(opts: Array[String], expectedFetches: Map[String, Seq[String]], expectedEntityNames: Seq[String]) {
       val entity = ConfigCommand.parseEntity(new ConfigCommandOptions(opts :+ "--describe"))
diff --git a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
index 213c23a..0d89430 100644
--- a/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ReassignPartitionsCommandTest.scala
@@ -268,7 +268,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.fetchEntityConfig(anyString(), anyString())).andStubReturn(new Properties)
@@ -294,7 +294,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
@@ -328,7 +328,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(existing)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     val assigner = new ReassignPartitionsCommand(zk, None, proposed, Map.empty, admin)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[List[Int]], capture(propsCapture))).anyTimes()
@@ -364,7 +364,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(brokers = brokers)
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     expect(admin.fetchEntityConfig(is(ConfigType.Topic), anyString())).andStubReturn(new Properties)
     expect(admin.changeBrokerConfig(anyObject().asInstanceOf[Seq[Int]], capture(propsCapture))).anyTimes()
@@ -399,7 +399,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
     //Setup
     val zk = stubZKClient(brokers = Seq(100, 101))
-    val admin = createMock(classOf[AdminZkClient])
+    val admin: AdminZkClient = createMock(classOf[AdminZkClient])
     val propsCapture: Capture[Properties] = newCapture(CaptureType.ALL)
     expect(admin.fetchEntityConfig(is(ConfigType.Broker), anyString())).andStubReturn(new Properties)
     expect(admin.fetchEntityConfig(is(ConfigType.Topic), is("topic1"))).andStubReturn(copyOf(existingConfigs))
@@ -567,7 +567,7 @@ class ReassignPartitionsCommandTest extends ZooKeeperTestHarness with Logging {
 
   def stubZKClient(existingAssignment: Map[TopicPartition, Seq[Int]] = Map[TopicPartition, Seq[Int]](),
                    brokers: Seq[Int] = Seq[Int]()): KafkaZkClient = {
-    val zkClient = createMock(classOf[KafkaZkClient])
+    val zkClient: KafkaZkClient = createMock(classOf[KafkaZkClient])
     expect(zkClient.getReplicaAssignmentForTopics(anyObject().asInstanceOf[Set[String]])).andStubReturn(existingAssignment)
     expect(zkClient.getAllBrokersInCluster).andStubReturn(brokers.map(TestUtils.createBroker(_, "", 1)))
     replay(zkClient)
diff --git a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
index b5b271e..e05f148 100644
--- a/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
+++ b/core/src/test/scala/unit/kafka/cluster/PartitionTest.scala
@@ -75,7 +75,7 @@ class PartitionTest {
     val brokerProps = TestUtils.createBrokerConfig(brokerId, TestUtils.MockZkConnect)
     brokerProps.put(KafkaConfig.LogDirsProp, Seq(logDir1, logDir2).map(_.getAbsolutePath).mkString(","))
     val brokerConfig = KafkaConfig.fromProps(brokerProps)
-    val kafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+    val kafkaZkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     replicaManager = new ReplicaManager(
       config = brokerConfig, metrics, time, zkClient = kafkaZkClient, new MockScheduler(time),
       logManager, new AtomicBoolean(false), QuotaFactory.instantiate(brokerConfig, metrics, time, ""),
@@ -370,8 +370,8 @@ class PartitionTest {
                                       isLeader: Boolean,
                                       log: Log = logManager.getOrCreateLog(topicPartition, logConfig)): Partition = {
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
 
     val partition = new Partition(topicPartition,
       isOffline = false,
@@ -465,8 +465,8 @@ class PartitionTest {
   def testListOffsetIsolationLevels(): Unit = {
     val log = logManager.getOrCreateLog(topicPartition, logConfig)
     val replica = new Replica(brokerId, topicPartition, time, log = Some(log))
-    val replicaManager = EasyMock.mock(classOf[ReplicaManager])
-    val zkClient = EasyMock.mock(classOf[KafkaZkClient])
+    val replicaManager: ReplicaManager = EasyMock.mock(classOf[ReplicaManager])
+    val zkClient: KafkaZkClient = EasyMock.mock(classOf[KafkaZkClient])
 
     val partition = new Partition(topicPartition,
       isOffline = false,
diff --git a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
index b0413a7..84b956d 100644
--- a/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
+++ b/core/src/test/scala/unit/kafka/controller/ControllerTestUtils.scala
@@ -22,7 +22,7 @@ object ControllerTestUtils {
 
   /** Since ControllerEvent is sealed, return a subclass of ControllerEvent created with EasyMock */
   def createMockControllerEvent(controllerState: ControllerState, process: () => Unit): ControllerEvent = {
-    val mockEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
+    val mockEvent: ControllerEvent = EasyMock.createNiceMock(classOf[ControllerEvent])
     EasyMock.expect(mockEvent.state).andReturn(controllerState)
     EasyMock.expect(mockEvent.process()).andAnswer(new IAnswer[Unit]() {
       def answer(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
index 3370b54..0e8f98e 100644
--- a/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
+++ b/core/src/test/scala/unit/kafka/controller/PartitionStateMachineTest.scala
@@ -433,7 +433,7 @@ class PartitionStateMachineTest extends JUnitSuite {
       mockZkClient, partitionState, mockControllerBrokerRequestBatch)
 
     def createMockController() = {
-      val mockController = EasyMock.createMock(classOf[KafkaController])
+      val mockController: KafkaController = EasyMock.createMock(classOf[KafkaController])
       EasyMock.expect(mockController.controllerContext).andReturn(controllerContext).anyTimes()
       EasyMock.expect(mockController.config).andReturn(customConfig).anyTimes()
       EasyMock.expect(mockController.partitionStateMachine).andReturn(partitionStateMachine).anyTimes()
@@ -444,7 +444,7 @@ class PartitionStateMachineTest extends JUnitSuite {
     }
 
     val mockController = createMockController()
-    val mockEventManager = EasyMock.createMock(classOf[ControllerEventManager])
+    val mockEventManager: ControllerEventManager = EasyMock.createMock(classOf[ControllerEventManager])
     EasyMock.replay(mockController, replicaStateMachine, mockEventManager)
 
     val topicDeletionManager = new TopicDeletionManager(mockController, mockEventManager, mockZkClient)
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index c2c0841..c162342 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -928,7 +928,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Empty.toString, summary.state)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1425,7 +1425,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, leaveGroupResult)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
@@ -1466,7 +1466,7 @@ class GroupCoordinatorTest extends JUnitSuite {
     assertEquals(Errors.NONE, leaveGroupResult)
 
     val groupTopicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, groupPartitionId)
-    val partition = EasyMock.niceMock(classOf[Partition])
+    val partition: Partition = EasyMock.niceMock(classOf[Partition])
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getMagic(EasyMock.anyObject())).andStubReturn(Some(RecordBatch.CURRENT_MAGIC_VALUE))
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 9ab9705..3ab4a13 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -729,7 +729,7 @@ class GroupMetadataManagerTest {
     val tp2 = new TopicPartition("bar", 0)
     val tp3 = new TopicPartition("xxx", 0)
 
-    val logMock =  EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(replicaManager.getLog(groupTopicPartition)).andStubReturn(Some(logMock))
 
     val segment1MemberId = "a"
@@ -1785,19 +1785,19 @@ class GroupMetadataManagerTest {
       offsetCommitRecords ++ Seq(groupMetadataRecord): _*)
 
     // Prepend empty control batch to valid records
-    val mockBatch = EasyMock.createMock(classOf[MutableRecordBatch])
+    val mockBatch: MutableRecordBatch = EasyMock.createMock(classOf[MutableRecordBatch])
     EasyMock.expect(mockBatch.iterator).andReturn(Collections.emptyIterator[Record])
     EasyMock.expect(mockBatch.isControlBatch).andReturn(true)
     EasyMock.expect(mockBatch.isTransactional).andReturn(true)
     EasyMock.expect(mockBatch.nextOffset).andReturn(16L)
     EasyMock.replay(mockBatch)
-    val mockRecords = EasyMock.createMock(classOf[MemoryRecords])
+    val mockRecords: MemoryRecords = EasyMock.createMock(classOf[MemoryRecords])
     EasyMock.expect(mockRecords.batches).andReturn((Iterable[MutableRecordBatch](mockBatch) ++ records.batches.asScala).asJava).anyTimes()
     EasyMock.expect(mockRecords.records).andReturn(records.records()).anyTimes()
     EasyMock.expect(mockRecords.sizeInBytes()).andReturn(DefaultRecordBatch.RECORD_BATCH_OVERHEAD + records.sizeInBytes()).anyTimes()
     EasyMock.replay(mockRecords)
 
-    val logMock = EasyMock.mock(classOf[Log])
+    val logMock: Log = EasyMock.mock(classOf[Log])
     EasyMock.expect(logMock.logStartOffset).andReturn(startOffset).anyTimes()
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
       maxLength = EasyMock.anyInt(),
@@ -1886,7 +1886,7 @@ class GroupMetadataManagerTest {
   private def expectGroupMetadataLoad(groupMetadataTopicPartition: TopicPartition,
                                       startOffset: Long,
                                       records: MemoryRecords): Unit = {
-    val logMock =  EasyMock.mock(classOf[Log])
+    val logMock: Log =  EasyMock.mock(classOf[Log])
     EasyMock.expect(replicaManager.getLog(groupMetadataTopicPartition)).andStubReturn(Some(logMock))
     val endOffset = expectGroupMetadataLoad(logMock, startOffset, records)
     EasyMock.expect(replicaManager.getLogEndOffset(groupMetadataTopicPartition)).andStubReturn(Some(endOffset))
@@ -1902,7 +1902,7 @@ class GroupMetadataManagerTest {
                                       startOffset: Long,
                                       records: MemoryRecords): Long = {
     val endOffset = startOffset + records.records.asScala.size
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     EasyMock.expect(logMock.logStartOffset).andStubReturn(startOffset)
     EasyMock.expect(logMock.read(EasyMock.eq(startOffset),
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
index 660e623..b2cc4a5 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/ProducerIdManagerTest.scala
@@ -24,7 +24,7 @@ import org.junit.Assert._
 
 class ProducerIdManagerTest {
 
-  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
 
   @After
   def tearDown(): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index b2a6733..3cf9566 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -81,13 +81,13 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
       new MockTimer,
       reaperEnabled = false)
     val brokerNode = new Node(0, "host", 10)
-    val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getPartitionLeaderEndpoint(
       EasyMock.anyString(),
       EasyMock.anyInt(),
       EasyMock.anyObject())
     ).andReturn(Some(brokerNode)).anyTimes()
-    val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+    val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
     txnMarkerChannelManager = new TransactionMarkerChannelManager(
       KafkaConfig.fromProps(serverProps),
       metadataCache,
@@ -246,8 +246,8 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
 
   private def prepareTxnLog(partitionId: Int): Unit = {
 
-    val logMock =  EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val logMock: Log =  EasyMock.mock(classOf[Log])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     val topicPartition = new TopicPartition(TRANSACTION_STATE_TOPIC_NAME, partitionId)
     val startOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(20L)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
index 454b361..44d5c5f 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerChannelManagerTest.scala
@@ -36,9 +36,9 @@ import scala.collection.JavaConverters._
 import scala.collection.mutable
 
 class TransactionMarkerChannelManagerTest {
-  private val metadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
-  private val networkClient = EasyMock.createNiceMock(classOf[NetworkClient])
-  private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val metadataCache: MetadataCache = EasyMock.createNiceMock(classOf[MetadataCache])
+  private val networkClient: NetworkClient = EasyMock.createNiceMock(classOf[NetworkClient])
+  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
 
   private val partition1 = new TopicPartition("topic1", 0)
   private val partition2 = new TopicPartition("topic1", 1)
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
index 3ca6c1b..85159c3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionMarkerRequestCompletionHandlerTest.scala
@@ -47,9 +47,10 @@ class TransactionMarkerRequestCompletionHandlerTest {
   private val txnMetadata = new TransactionMetadata(transactionalId, producerId, producerEpoch, txnTimeoutMs,
     PrepareCommit, mutable.Set[TopicPartition](topicPartition), 0L, 0L)
 
-  private val markerChannelManager = EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
+  private val markerChannelManager: TransactionMarkerChannelManager =
+    EasyMock.createNiceMock(classOf[TransactionMarkerChannelManager])
 
-  private val txnStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
+  private val txnStateManager: TransactionStateManager = EasyMock.createNiceMock(classOf[TransactionStateManager])
 
   private val handler = new TransactionMarkerRequestCompletionHandler(brokerId, txnStateManager, markerChannelManager, txnIdAndMarkers)
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index d2fe7ea..ba8fe1d 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -574,8 +574,8 @@ class TransactionStateManagerTest {
                             records: MemoryRecords): Unit = {
     EasyMock.reset(replicaManager)
 
-    val logMock =  EasyMock.mock(classOf[Log])
-    val fileRecordsMock = EasyMock.mock(classOf[FileRecords])
+    val logMock: Log = EasyMock.mock(classOf[Log])
+    val fileRecordsMock: FileRecords = EasyMock.mock(classOf[FileRecords])
 
     val endOffset = startOffset + records.records.asScala.size
 
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 7728998..9b39567 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -325,7 +325,7 @@ class LogTest {
   @Test
   def testSizeForLargeLogs(): Unit = {
     val largeSize = Int.MaxValue.toLong * 2
-    val logSegment = EasyMock.createMock(classOf[LogSegment])
+    val logSegment: LogSegment = EasyMock.createMock(classOf[LogSegment])
 
     EasyMock.expect(logSegment.size).andReturn(Int.MaxValue).anyTimes
     EasyMock.replay(logSegment)
@@ -347,7 +347,7 @@ class LogTest {
 
   @Test
   def testSkipLoadingIfEmptyProducerStateBeforeTruncation(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     // Load the log
     EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
@@ -426,7 +426,7 @@ class LogTest {
 
   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndNoCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()
@@ -463,7 +463,7 @@ class LogTest {
 
   @Test
   def testSkipTruncateAndReloadIfOldMessageFormatAndCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     stateManager.updateMapEndOffset(0L)
     EasyMock.expectLastCall().anyTimes()
@@ -503,7 +503,7 @@ class LogTest {
 
   @Test
   def testSkipTruncateAndReloadIfNewMessageFormatAndCleanShutdown(): Unit = {
-    val stateManager = EasyMock.mock(classOf[ProducerStateManager])
+    val stateManager: ProducerStateManager = EasyMock.mock(classOf[ProducerStateManager])
 
     EasyMock.expect(stateManager.latestSnapshotOffset).andReturn(None)
 
diff --git a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
index 9afb145..b49b5e1 100644
--- a/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/ProducerStateManagerTest.scala
@@ -754,7 +754,7 @@ class ProducerStateManagerTest extends JUnitSuite {
     val producerEpoch = 145.toShort
     val baseOffset = 15
 
-    val batch = EasyMock.createMock(classOf[RecordBatch])
+    val batch: RecordBatch = EasyMock.createMock(classOf[RecordBatch])
     EasyMock.expect(batch.isControlBatch).andReturn(true).once
     EasyMock.expect(batch.iterator).andReturn(Collections.emptyIterator[Record]).once
     EasyMock.replay(batch)
diff --git a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
index cd00ff1..0a4d7c1 100644
--- a/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/AbstractFetcherManagerTest.scala
@@ -26,7 +26,7 @@ class AbstractFetcherManagerTest {
 
   @Test
   def testAddAndRemovePartition(): Unit = {
-    val fetcher = EasyMock.mock(classOf[AbstractFetcherThread])
+    val fetcher: AbstractFetcherThread = EasyMock.mock(classOf[AbstractFetcherThread])
     val fetcherManager = new AbstractFetcherManager[AbstractFetcherThread]("fetcher-manager", "fetcher-manager", 2) {
       override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): AbstractFetcherThread = {
         fetcher
diff --git a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
index 6f75174..e10d4b2 100644
--- a/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ClientQuotaManagerTest.scala
@@ -59,7 +59,7 @@ class ClientQuotaManagerTest {
 
     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
-    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+    val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
 
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
diff --git a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
index 41b9055..45ef18f 100755
--- a/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicBrokerConfigTest.scala
@@ -21,6 +21,7 @@ import java.util
 import java.util.Properties
 
 import kafka.utils.TestUtils
+import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.Reconfigurable
 import org.apache.kafka.common.config.types.Password
 import org.apache.kafka.common.config.{ConfigException, SslConfigs}
@@ -303,7 +304,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
   def testDynamicListenerConfig(): Unit = {
     val props = TestUtils.createBrokerConfig(0, TestUtils.MockZkConnect, port = 9092)
     val oldConfig =  KafkaConfig.fromProps(props)
-    val kafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
+    val kafkaServer: KafkaServer = EasyMock.createMock(classOf[kafka.server.KafkaServer])
     EasyMock.expect(kafkaServer.config).andReturn(oldConfig).anyTimes()
     EasyMock.replay(kafkaServer)
 
@@ -328,7 +329,7 @@ class DynamicBrokerConfigTest extends JUnitSuite {
 
   @Test
   def testDynamicConfigInitializationWithoutConfigsInZK(): Unit = {
-    val zkClient = EasyMock.createMock(classOf[kafka.zk.KafkaZkClient])
+    val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
     EasyMock.expect(zkClient.getEntityConfigs(EasyMock.anyString(), EasyMock.anyString())).andReturn(new java.util.Properties()).anyTimes()
     EasyMock.replay(zkClient)
 
@@ -362,4 +363,4 @@ class TestDynamicThreadPool() extends BrokerReconfigurable {
     assertEquals(10, newConfig.numIoThreads)
     assertEquals(100, newConfig.backgroundThreads)
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index cabe0a9..789dbae 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -210,7 +210,7 @@ class DynamicConfigChangeTest extends KafkaServerTestHarness {
     // Create a mock ConfigHandler to record config changes it is asked to process
     val entityArgument = EasyMock.newCapture[String]
     val propertiesArgument = EasyMock.newCapture[Properties]
-    val handler = EasyMock.createNiceMock(classOf[ConfigHandler])
+    val handler: ConfigHandler = EasyMock.createNiceMock(classOf[ConfigHandler])
     handler.processConfigChanges(
       EasyMock.and(EasyMock.capture(entityArgument), EasyMock.isA(classOf[String])),
       EasyMock.and(EasyMock.capture(propertiesArgument), EasyMock.isA(classOf[Properties])))
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index 7451234..61cbd2c 100755
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -35,7 +35,7 @@ class HighwatermarkPersistenceTest {
 
   val configs = TestUtils.createBrokerConfigs(2, TestUtils.MockZkConnect).map(KafkaConfig.fromProps)
   val topic = "foo"
-  val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
+  val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
   val logManagers = configs map { config =>
     TestUtils.createLogManager(
       logDirs = config.logDirs.map(new File(_)),
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 3dff709..d5bdf14 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -21,7 +21,7 @@ import java.util.Properties
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
 import kafka.server.epoch.LeaderEpochFileCache
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
@@ -55,7 +55,7 @@ class IsrExpirationTest {
 
   @Before
   def setUp() {
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     EasyMock.replay(logManager)
 
@@ -252,8 +252,8 @@ class IsrExpirationTest {
   }
 
   private def logMock: Log = {
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
-    val cache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
+    val log: Log = EasyMock.createMock(classOf[Log])
+    val cache: LeaderEpochFileCache = EasyMock.createNiceMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
     EasyMock.expect(log.leaderEpochCache).andReturn(cache).anyTimes()
     EasyMock.expect(log.onHighWatermarkIncremented(0L))
diff --git a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
index b903c4a..be30f8a 100644
--- a/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
+++ b/core/src/test/scala/unit/kafka/server/KafkaApisTest.scala
@@ -55,24 +55,24 @@ import scala.collection.Map
 
 class KafkaApisTest {
 
-  private val requestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
-  private val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
-  private val replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
-  private val groupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
-  private val adminManager = EasyMock.createNiceMock(classOf[AdminManager])
-  private val txnCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
-  private val controller = EasyMock.createNiceMock(classOf[KafkaController])
-  private val zkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+  private val requestChannel: RequestChannel = EasyMock.createNiceMock(classOf[RequestChannel])
+  private val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+  private val replicaManager: ReplicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
+  private val groupCoordinator: GroupCoordinator = EasyMock.createNiceMock(classOf[GroupCoordinator])
+  private val adminManager: AdminManager = EasyMock.createNiceMock(classOf[AdminManager])
+  private val txnCoordinator: TransactionCoordinator = EasyMock.createNiceMock(classOf[TransactionCoordinator])
+  private val controller: KafkaController = EasyMock.createNiceMock(classOf[KafkaController])
+  private val zkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
   private val metrics = new Metrics()
   private val brokerId = 1
   private val metadataCache = new MetadataCache(brokerId)
   private val authorizer: Option[Authorizer] = None
-  private val clientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
-  private val clientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
-  private val replicaQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
+  private val clientQuotaManager: ClientQuotaManager = EasyMock.createNiceMock(classOf[ClientQuotaManager])
+  private val clientRequestQuotaManager: ClientRequestQuotaManager = EasyMock.createNiceMock(classOf[ClientRequestQuotaManager])
+  private val replicaQuotaManager: ReplicationQuotaManager = EasyMock.createNiceMock(classOf[ReplicationQuotaManager])
   private val quotas = QuotaManagers(clientQuotaManager, clientQuotaManager, clientRequestQuotaManager,
     replicaQuotaManager, replicaQuotaManager, replicaQuotaManager, None)
-  private val fetchManager = EasyMock.createNiceMock(classOf[FetchManager])
+  private val fetchManager: FetchManager = EasyMock.createNiceMock(classOf[FetchManager])
   private val brokerTopicStats = new BrokerTopicStats
   private val clusterId = "clusterId"
   private val time = new MockTime
diff --git a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
index 740b28e..50449dc 100755
--- a/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogOffsetTest.scala
@@ -211,8 +211,8 @@ class LogOffsetTest extends BaseRequestTest {
    * a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegmentSize() {
-    val log = EasyMock.niceMock(classOf[Log])
-    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    val log: Log = EasyMock.niceMock(classOf[Log])
+    val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(logSegment.size).andStubAnswer(new IAnswer[Int] {
       private val value = new AtomicInteger(0)
       def answer: Int = value.getAndIncrement()
@@ -228,8 +228,8 @@ class LogOffsetTest extends BaseRequestTest {
    * different (simulating a race condition) */
   @Test
   def testFetchOffsetsBeforeWithChangingSegments() {
-    val log = EasyMock.niceMock(classOf[Log])
-    val logSegment = EasyMock.niceMock(classOf[LogSegment])
+    val log: Log = EasyMock.niceMock(classOf[Log])
+    val logSegment: LogSegment = EasyMock.niceMock(classOf[LogSegment])
     EasyMock.expect(log.logSegments).andStubAnswer {
       new IAnswer[Iterable[LogSegment]] {
         def answer = new Iterable[LogSegment] {
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
index 5d92b61..a4fbaf2 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaAlterLogDirsThreadTest.scala
@@ -50,9 +50,9 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val partitionT1p0 = createMock(classOf[Partition])
-    val partitionT1p1 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val partitionT1p1: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpochT1p0 = 2
     val leaderEpochT1p1 = 5
@@ -101,8 +101,8 @@ class ReplicaAlterLogDirsThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val partitionT1p0 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 2
     val leo = 13
@@ -149,18 +149,18 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochsT1p0 = createMock(classOf[LeaderEpochFileCache])
-    val futureReplicaLeaderEpochsT1p1 = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaT1p0 = createNiceMock(classOf[Replica])
-    val replicaT1p1 = createNiceMock(classOf[Replica])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochsT1p0: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val futureReplicaLeaderEpochsT1p1: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaT1p0: Replica = createNiceMock(classOf[Replica])
+    val replicaT1p1: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplicaT1p0 = createNiceMock(classOf[Replica])
-    val futureReplicaT1p1 = createNiceMock(classOf[Replica])
-    val partitionT1p0 = createMock(classOf[Partition])
-    val partitionT1p1 = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val futureReplicaT1p0: Replica = createNiceMock(classOf[Replica])
+    val futureReplicaT1p1: Replica = createNiceMock(classOf[Replica])
+    val partitionT1p0: Partition = createMock(classOf[Partition])
+    val partitionT1p1: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val leaderEpoch = 2
@@ -228,14 +228,14 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
     // one future replica mock because our mocking methods return same values for both future replicas
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val leaderEpoch = 5
@@ -301,13 +301,13 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val initialFetchOffset = 100
@@ -356,13 +356,13 @@ class ReplicaAlterLogDirsThreadTest {
 
     // Setup all the dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[kafka.log.LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val futureReplicaLeaderEpoch = 1
@@ -439,13 +439,13 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val futureReplicaLeaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val futureReplicaLeaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
     val responseCallback: Capture[Seq[(TopicPartition, FetchPartitionData)] => Unit]  = EasyMock.newCapture()
 
     val leaderEpoch = 5
@@ -494,12 +494,12 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     //Stubs
     expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
@@ -543,12 +543,12 @@ class ReplicaAlterLogDirsThreadTest {
 
     //Setup all dependencies
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
-    val quotaManager = createNiceMock(classOf[ReplicationQuotaManager])
-    val logManager = createMock(classOf[LogManager])
-    val replica = createNiceMock(classOf[Replica])
-    val futureReplica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quotaManager: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val futureReplica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     //Stubs
     expect(futureReplica.logStartOffset).andReturn(123).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
index a370101..c65c254 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaFetcherThreadTest.scala
@@ -79,7 +79,7 @@ class ReplicaFetcherThreadTest {
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "0.10.2")
     props.put(KafkaConfig.LogMessageFormatVersionProp, "0.10.2")
     val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint = createMock(classOf[BlockingSend])
+    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
     expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
       override def answer(): ClientResponse = {
         toFail = true // assert no leader request is sent
@@ -123,7 +123,7 @@ class ReplicaFetcherThreadTest {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     props.put(KafkaConfig.InterBrokerProtocolVersionProp, "1.0.0")
     val config = KafkaConfig.fromProps(props)
-    val leaderEndpoint = createMock(classOf[BlockingSend])
+    val leaderEndpoint: BlockingSend = createMock(classOf[BlockingSend])
 
     expect(leaderEndpoint.sendRequest(anyObject())).andAnswer(new IAnswer[ClientResponse] {
       override def answer(): ClientResponse = {
@@ -162,13 +162,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
 
@@ -261,7 +261,7 @@ class ReplicaFetcherThreadTest {
   def shouldHandleExceptionFromBlockingSend(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
-    val mockBlockingSend = createMock(classOf[BlockingSend])
+    val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
 
     expect(mockBlockingSend.sendRequest(anyObject())).andThrow(new NullPointerException).once()
     replay(mockBlockingSend)
@@ -295,12 +295,12 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all dependencies
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
 
@@ -355,13 +355,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
     val initialLEO = 200
@@ -404,13 +404,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpochAtFollower = 5
     val leaderEpochAtLeader = 4
@@ -458,13 +458,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     // Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialLEO = 200
 
@@ -529,13 +529,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(props)
 
     // Setup all dependencies
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialLEO = 200
 
@@ -590,13 +590,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val initialFetchOffset = 100
     val initialLeo = 300
@@ -635,13 +635,13 @@ class ReplicaFetcherThreadTest {
 
     // Setup all the dependencies
     val configs = TestUtils.createBrokerConfigs(1, "localhost:1234").map(KafkaConfig.fromProps)
-    val quota = createNiceMock(classOf[kafka.server.ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createMock(classOf[kafka.log.LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createMock(classOf[kafka.server.ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createMock(classOf[ReplicaManager])
 
     val leaderEpoch = 5
     val highWaterMark = 100
@@ -693,13 +693,13 @@ class ReplicaFetcherThreadTest {
     val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, "localhost:1234"))
 
     //Setup all stubs
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createNiceMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
     val leaderEpoch = 4
 
@@ -746,13 +746,13 @@ class ReplicaFetcherThreadTest {
     val initialLEO = 100
 
     //Setup all stubs
-    val quota = createNiceMock(classOf[ReplicationQuotaManager])
-    val leaderEpochs = createNiceMock(classOf[LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[LogManager])
-    val replicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
-    val replica = createNiceMock(classOf[Replica])
-    val partition = createMock(classOf[Partition])
-    val replicaManager = createNiceMock(classOf[ReplicaManager])
+    val quota: ReplicationQuotaManager = createNiceMock(classOf[ReplicationQuotaManager])
+    val leaderEpochs: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
+    val replicaAlterLogDirsManager: ReplicaAlterLogDirsManager = createMock(classOf[ReplicaAlterLogDirsManager])
+    val replica: Replica = createNiceMock(classOf[Replica])
+    val partition: Partition = createMock(classOf[Partition])
+    val replicaManager: ReplicaManager = createNiceMock(classOf[ReplicaManager])
 
     //Stub return values
     expect(partition.truncateTo(capture(truncateToCapture), anyBoolean())).once
@@ -796,7 +796,7 @@ class ReplicaFetcherThreadTest {
   def shouldCatchExceptionFromBlockingSendWhenShuttingDownReplicaFetcherThread(): Unit = {
     val props = TestUtils.createBrokerConfig(1, "localhost:1234")
     val config = KafkaConfig.fromProps(props)
-    val mockBlockingSend = createMock(classOf[BlockingSend])
+    val mockBlockingSend: BlockingSend = createMock(classOf[BlockingSend])
 
     expect(mockBlockingSend.initiateClose()).andThrow(new IllegalArgumentException()).once()
     expect(mockBlockingSend.close()).andThrow(new IllegalStateException()).once()
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
index 9b59e71..50b5fa7 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerQuotasTest.scala
@@ -21,7 +21,7 @@ import java.util.{Optional, Properties}
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.{Partition, Replica}
-import kafka.log.{Log, LogOffsetSnapshot}
+import kafka.log.{Log, LogManager, LogOffsetSnapshot}
 import kafka.utils._
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.TopicPartition
@@ -151,7 +151,7 @@ class ReplicaManagerQuotasTest {
     // Set up DelayedFetch where there is data to return to a follower replica, either in-sync or out of sync
     def setupDelayedFetch(isReplicaInSync: Boolean): DelayedFetch = {
       val endOffsetMetadata = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 500)
-      val partition = EasyMock.createMock(classOf[Partition])
+      val partition: Partition = EasyMock.createMock(classOf[Partition])
 
       val offsetSnapshot = LogOffsetSnapshot(
         logStartOffset = 0L,
@@ -161,7 +161,7 @@ class ReplicaManagerQuotasTest {
       EasyMock.expect(partition.fetchOffsetSnapshot(Optional.empty(), fetchOnlyFromLeader = true))
           .andReturn(offsetSnapshot)
 
-      val replicaManager = EasyMock.createMock(classOf[ReplicaManager])
+      val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
       EasyMock.expect(replicaManager.getPartitionOrException(
         EasyMock.anyObject[TopicPartition], EasyMock.anyBoolean()))
         .andReturn(partition).anyTimes()
@@ -192,11 +192,11 @@ class ReplicaManagerQuotasTest {
   }
 
   def setUpMocks(fetchInfo: Seq[(TopicPartition, PartitionData)], record: SimpleRecord = this.record, bothReplicasInSync: Boolean = false) {
-    val zkClient = EasyMock.createMock(classOf[KafkaZkClient])
-    val scheduler = createNiceMock(classOf[KafkaScheduler])
+    val zkClient: KafkaZkClient = EasyMock.createMock(classOf[KafkaZkClient])
+    val scheduler: KafkaScheduler = createNiceMock(classOf[KafkaScheduler])
 
     //Create log which handles both a regular read and a 0 bytes read
-    val log = createNiceMock(classOf[Log])
+    val log: Log = createNiceMock(classOf[Log])
     expect(log.logStartOffset).andReturn(0L).anyTimes()
     expect(log.logEndOffset).andReturn(20L).anyTimes()
     expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(20L)).anyTimes()
@@ -225,7 +225,7 @@ class ReplicaManagerQuotasTest {
     replay(log)
 
     //Create log manager
-    val logManager = createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = createMock(classOf[LogManager])
 
     //Return the same log for each partition as it doesn't matter
     expect(logManager.getLog(anyObject(), anyBoolean())).andReturn(Some(log)).anyTimes()
@@ -263,7 +263,7 @@ class ReplicaManagerQuotasTest {
   }
 
   def mockQuota(bound: Long): ReplicaQuota = {
-    val quota = createMock(classOf[ReplicaQuota])
+    val quota: ReplicaQuota = createMock(classOf[ReplicaQuota])
     expect(quota.isThrottled(anyObject())).andReturn(true).anyTimes()
     quota
   }
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 4401748..c4ca2cb 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -148,7 +148,7 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = Seq(createBroker(0, "host0", 0), createBroker(1, "host1", 1))
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     EasyMock.replay(metadataCache)
     val rm = new ReplicaManager(config, metrics, time, kafkaZkClient, new MockScheduler(time), mockLogMgr,
@@ -594,7 +594,7 @@ class ReplicaManagerTest {
     val mockScheduler = new MockScheduler(time)
     val mockBrokerTopicStats = new BrokerTopicStats
     val mockLogDirFailureChannel = new LogDirFailureChannel(config.logDirs.size)
-    val mockLeaderEpochCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
+    val mockLeaderEpochCache: LeaderEpochFileCache = EasyMock.createMock(classOf[LeaderEpochFileCache])
     EasyMock.expect(mockLeaderEpochCache.latestEpoch).andReturn(leaderEpochFromLeader)
     EasyMock.expect(mockLeaderEpochCache.endOffsetFor(leaderEpochFromLeader))
       .andReturn((leaderEpochFromLeader, localLogOffset))
@@ -620,7 +620,7 @@ class ReplicaManagerTest {
     }
 
     // Expect to call LogManager.truncateTo exactly once
-    val mockLogMgr = EasyMock.createMock(classOf[LogManager])
+    val mockLogMgr: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(mockLogMgr.liveLogDirs).andReturn(config.logDirs.map(new File(_).getAbsoluteFile)).anyTimes
     EasyMock.expect(mockLogMgr.currentDefaultConfig).andReturn(LogConfig())
     EasyMock.expect(mockLogMgr.getOrCreateLog(new TopicPartition(topic, topicPartition),
@@ -634,7 +634,7 @@ class ReplicaManagerTest {
     val aliveBrokerIds = Seq[Integer](followerBrokerId, leaderBrokerId)
     val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
 
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes
     aliveBrokerIds.foreach { brokerId =>
       EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes
@@ -793,7 +793,7 @@ class ReplicaManagerTest {
     val logProps = new Properties()
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)), LogConfig(logProps))
     val aliveBrokers = aliveBrokerIds.map(brokerId => createBroker(brokerId, s"host$brokerId", brokerId))
-    val metadataCache = EasyMock.createMock(classOf[MetadataCache])
+    val metadataCache: MetadataCache = EasyMock.createMock(classOf[MetadataCache])
     EasyMock.expect(metadataCache.getAliveBrokers).andReturn(aliveBrokers).anyTimes()
     aliveBrokerIds.foreach { brokerId =>
       EasyMock.expect(metadataCache.isBrokerAlive(EasyMock.eq(brokerId))).andReturn(true).anyTimes()
diff --git a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
index 67d083c..1bc0257 100755
--- a/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerStartupTest.scala
@@ -105,7 +105,7 @@ class ServerStartupTest extends ZooKeeperTestHarness {
   @Test
   def testBrokerStateRunningAfterZK(): Unit = {
     val brokerId = 0
-    val mockBrokerState = EasyMock.niceMock(classOf[kafka.server.BrokerState])
+    val mockBrokerState: BrokerState = EasyMock.niceMock(classOf[BrokerState])
 
     class BrokerStateInterceptor() extends BrokerState {
       override def newState(newState: BrokerStates): Unit = {
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ecfbd73..d0ff46b 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -21,7 +21,7 @@ import java.io.File
 import kafka.api._
 import kafka.utils._
 import kafka.cluster.Replica
-import kafka.log.Log
+import kafka.log.{Log, LogManager}
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.zk.KafkaZkClient
 import org.apache.kafka.common.metrics.Metrics
@@ -71,15 +71,15 @@ class SimpleFetchTest {
   @Before
   def setUp() {
     // create nice mock since we don't particularly care about zkclient calls
-    val kafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val kafkaZkClient: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.replay(kafkaZkClient)
 
     // create nice mock since we don't particularly care about scheduler calls
-    val scheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
+    val scheduler: KafkaScheduler = EasyMock.createNiceMock(classOf[KafkaScheduler])
     EasyMock.replay(scheduler)
 
     // create the log which takes read with either HW max offset or none max offset
-    val log = EasyMock.createNiceMock(classOf[Log])
+    val log: Log = EasyMock.createNiceMock(classOf[Log])
     EasyMock.expect(log.logStartOffset).andReturn(0).anyTimes()
     EasyMock.expect(log.logEndOffset).andReturn(leaderLEO).anyTimes()
     EasyMock.expect(log.dir).andReturn(TestUtils.tempDir()).anyTimes()
@@ -107,7 +107,7 @@ class SimpleFetchTest {
     EasyMock.replay(log)
 
     // create the log manager that is aware of this mock log
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.getLog(topicPartition, false)).andReturn(Some(log)).anyTimes()
     EasyMock.expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     EasyMock.replay(logManager)
diff --git a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
index ff781a2..c46404a 100644
--- a/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ThrottledChannelExpirationTest.scala
@@ -50,7 +50,7 @@ class ThrottledChannelExpirationTest {
 
     val request = builder.build()
     val buffer = request.serialize(new RequestHeader(builder.apiKey, request.version, "", 0))
-    val requestChannelMetrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
+    val requestChannelMetrics: RequestChannel.Metrics = EasyMock.createNiceMock(classOf[RequestChannel.Metrics])
 
     // read the header from the buffer first so that the body can be read next from the Request constructor
     val header = RequestHeader.parse(buffer)
@@ -122,4 +122,4 @@ class ThrottledChannelExpirationTest {
       time.sleep(10)
     }
   }
-}
\ No newline at end of file
+}
diff --git a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
index 86a087b..7adc204 100644
--- a/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
+++ b/core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
@@ -21,6 +21,7 @@ import java.util.Optional
 import java.util.concurrent.atomic.AtomicBoolean
 
 import kafka.cluster.Replica
+import kafka.log.{Log, LogManager}
 import kafka.server._
 import kafka.utils.{MockTime, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -46,9 +47,9 @@ class OffsetsForLeaderEpochTest {
     val request = Map(tp -> new OffsetsForLeaderEpochRequest.PartitionData(Optional.empty(), epochRequested))
 
     //Stubs
-    val mockLog = createNiceMock(classOf[kafka.log.Log])
-    val mockCache = createNiceMock(classOf[kafka.server.epoch.LeaderEpochFileCache])
-    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    val mockLog: Log = createNiceMock(classOf[Log])
+    val mockCache: LeaderEpochFileCache = createNiceMock(classOf[LeaderEpochFileCache])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(mockCache.endOffsetFor(epochRequested)).andReturn(epochAndOffset)
     expect(mockLog.leaderEpochCache).andReturn(mockCache).anyTimes()
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
@@ -72,7 +73,7 @@ class OffsetsForLeaderEpochTest {
 
   @Test
   def shouldReturnNoLeaderForPartitionIfThrown(): Unit = {
-    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
@@ -95,7 +96,7 @@ class OffsetsForLeaderEpochTest {
 
   @Test
   def shouldReturnUnknownTopicOrPartitionIfThrown(): Unit = {
-    val logManager = createNiceMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = createNiceMock(classOf[LogManager])
     expect(logManager.liveLogDirs).andReturn(Array.empty[File]).anyTimes()
     replay(logManager)
 
diff --git a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
index 65273eb..4bf7471 100644
--- a/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/ReplicationUtilsTest.scala
@@ -17,9 +17,10 @@
 
 package kafka.utils
 
-import kafka.server.{KafkaConfig, ReplicaFetcherManager}
+import kafka.server.{KafkaConfig, ReplicaFetcherManager, ReplicaManager}
 import kafka.api.LeaderAndIsr
 import kafka.controller.LeaderIsrAndControllerEpoch
+import kafka.log.{Log, LogManager}
 import kafka.zk._
 import org.apache.kafka.common.TopicPartition
 import org.junit.Assert._
@@ -48,16 +49,16 @@ class ReplicationUtilsTest extends ZooKeeperTestHarness {
   @Test
   def testUpdateLeaderAndIsr() {
     val configs = TestUtils.createBrokerConfigs(1, zkConnect).map(KafkaConfig.fromProps)
-    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    val log: Log = EasyMock.createMock(classOf[Log])
     EasyMock.expect(log.logEndOffset).andReturn(20).anyTimes()
     EasyMock.expect(log)
     EasyMock.replay(log)
 
-    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    val logManager: LogManager = EasyMock.createMock(classOf[LogManager])
     EasyMock.expect(logManager.getLog(new TopicPartition(topic, partition), false)).andReturn(Some(log)).anyTimes()
     EasyMock.replay(logManager)
 
-    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    val replicaManager: ReplicaManager = EasyMock.createMock(classOf[ReplicaManager])
     EasyMock.expect(replicaManager.config).andReturn(configs.head)
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
diff --git a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
index 39745e5..aca538b 100644
--- a/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
+++ b/core/src/test/scala/unit/kafka/zk/AdminZkClientTest.scala
@@ -138,7 +138,7 @@ class AdminZkClientTest extends ZooKeeperTestHarness with Logging with RackAware
     val topic = "test.topic"
 
     // simulate the ZK interactions that can happen when a topic is concurrently created by multiple processes
-    val zkMock = EasyMock.createNiceMock(classOf[KafkaZkClient])
+    val zkMock: KafkaZkClient = EasyMock.createNiceMock(classOf[KafkaZkClient])
     EasyMock.expect(zkMock.topicExists(topic)).andReturn(false)
     EasyMock.expect(zkMock.getAllTopicsInCluster).andReturn(Seq("some.topic", topic, "some.other.topic"))
     EasyMock.replay(zkMock)
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index e11ded1..7dd3604 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -52,7 +52,7 @@ versions += [
   apacheds: "2.0.0-M24",
   argparse4j: "0.7.0",
   bcpkix: "1.60",
-  easymock: "3.6",
+  easymock: "4.0.1",
   jackson: "2.9.7",
   jetty: "9.4.12.v20180830",
   jersey: "2.27",
@@ -73,20 +73,19 @@ versions += [
   kafka_11: "1.1.1",
   kafka_20: "2.0.0",
   lz4: "1.5.0",
-  mavenArtifact: "3.5.4",
+  mavenArtifact: "3.6.0",
   metrics: "2.2.0",
   mockito: "2.23.0",
-  // PowerMock 1.x doesn't support Java 9, so use PowerMock 2.0.0 beta
-  powermock: "2.0.0-beta.5",
+  powermock: "2.0.0-RC.3",
   reflections: "0.9.11",
   rocksDB: "5.14.2",
   scalatest: "3.0.5",
   scoverage: "1.3.1",
   slf4j: "1.7.25",
   snappy: "1.1.7.2",
-  zkclient: "0.10",
+  zkclient: "0.11",
   zookeeper: "3.4.13",
-  zstd: "1.3.5-4"
+  zstd: "1.3.7-1"
 ]
 
 libs += [


Mime
View raw message