kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ij...@apache.org
Subject [kafka] branch 2.0 updated: MINOR: Change "no such session ID" log to debug (#5316)
Date Sat, 21 Jul 2018 10:33:52 GMT
This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.0
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.0 by this push:
     new dca4304  MINOR: Change "no such session ID" log to debug (#5316)
dca4304 is described below

commit dca4304156373ee5ea5ec5385149425aa7bbc13c
Author: Colin Patrick McCabe <colin@cmccabe.xyz>
AuthorDate: Sat Jul 21 03:32:56 2018 -0700

    MINOR: Change "no such session ID" log to debug (#5316)
    
    Improve the log messages while at it and fix some code style issues.
    
    Reviewers: Ismael Juma <ismael@juma.me.uk>
---
 .../src/main/scala/kafka/server/FetchSession.scala | 147 ++++++++++-----------
 .../scala/unit/kafka/server/FetchSessionTest.scala |  64 ++++-----
 2 files changed, 105 insertions(+), 106 deletions(-)

diff --git a/core/src/main/scala/kafka/server/FetchSession.scala b/core/src/main/scala/kafka/server/FetchSession.scala
index 68f79ca..64bc773 100644
--- a/core/src/main/scala/kafka/server/FetchSession.scala
+++ b/core/src/main/scala/kafka/server/FetchSession.scala
@@ -92,22 +92,22 @@ class CachedPartition(val topic: String,
     this(topic, partition, -1, -1, -1, -1, -1)
 
   def this(part: TopicPartition) =
-    this(part.topic(), part.partition())
+    this(part.topic, part.partition)
 
   def this(part: TopicPartition, reqData: FetchRequest.PartitionData) =
-    this(part.topic(), part.partition(),
+    this(part.topic, part.partition,
       reqData.maxBytes, reqData.fetchOffset, -1,
       reqData.logStartOffset, -1)
 
   def this(part: TopicPartition, reqData: FetchRequest.PartitionData,
            respData: FetchResponse.PartitionData[Records]) =
-    this(part.topic(), part.partition(),
+    this(part.topic, part.partition,
       reqData.maxBytes, reqData.fetchOffset, respData.highWatermark,
       reqData.logStartOffset, respData.logStartOffset)
 
-  def topicPartition() = new TopicPartition(topic, partition)
+  def topicPartition = new TopicPartition(topic, partition)
 
-  def reqData() = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes)
+  def reqData = new FetchRequest.PartitionData(fetchOffset, fetcherLogStartOffset, maxBytes)
 
   def updateRequestParams(reqData: FetchRequest.PartitionData): Unit = {
     // Update our cached request parameters.
@@ -129,7 +129,7 @@ class CachedPartition(val topic: String,
   def maybeUpdateResponseData(respData: FetchResponse.PartitionData[Records], updateResponseData: Boolean): Boolean = {
     // Check the response data.
     var mustRespond = false
-    if ((respData.records != null) && (respData.records.sizeInBytes() > 0)) {
+    if ((respData.records != null) && (respData.records.sizeInBytes > 0)) {
       // Partitions with new data are always included in the response.
       mustRespond = true
     }
@@ -143,7 +143,7 @@ class CachedPartition(val topic: String,
       if (updateResponseData)
         localLogStartOffset = respData.logStartOffset
     }
-    if (respData.error.code() != 0) {
+    if (respData.error.code != 0) {
       // Partitions with errors are always included in the response.
       // We also set the cached highWatermark to an invalid offset, -1.
       // This ensures that when the error goes away, we re-send the partition.
@@ -154,7 +154,7 @@ class CachedPartition(val topic: String,
     mustRespond
   }
 
-  override def hashCode() = (31 * partition) + topic.hashCode
+  override def hashCode = (31 * partition) + topic.hashCode
 
   def canEqual(that: Any) = that.isInstanceOf[CachedPartition]
 
@@ -166,7 +166,7 @@ class CachedPartition(val topic: String,
       case _ => false
     }
 
-  override def toString() = synchronized {
+  override def toString = synchronized {
     "CachedPartition(topic=" + topic +
       ", partition=" + partition +
       ", maxBytes=" + maxBytes +
@@ -203,23 +203,23 @@ case class FetchSession(val id: Int,
   // If this is -1, the Session is not in the cache.
   var cachedSize = -1
 
-  def size(): Int = synchronized {
-    partitionMap.size()
+  def size: Int = synchronized {
+    partitionMap.size
   }
 
-  def isEmpty(): Boolean = synchronized {
+  def isEmpty: Boolean = synchronized {
     partitionMap.isEmpty
   }
 
-  def lastUsedKey(): LastUsedKey = synchronized {
+  def lastUsedKey: LastUsedKey = synchronized {
     LastUsedKey(lastUsedMs, id)
   }
 
-  def evictableKey(): EvictableKey = synchronized {
+  def evictableKey: EvictableKey = synchronized {
     EvictableKey(privileged, cachedSize, id)
   }
 
-  def metadata(): JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
+  def metadata: JFetchMetadata = synchronized { new JFetchMetadata(id, epoch) }
 
   def getFetchOffset(topicPartition: TopicPartition): Option[Long] = synchronized {
     Option(partitionMap.find(new CachedPartition(topicPartition))).map(_.fetchOffset)
@@ -234,7 +234,7 @@ case class FetchSession(val id: Int,
     val added = new TL
     val updated = new TL
     val removed = new TL
-    fetchData.entrySet().iterator().asScala.foreach(entry => {
+    fetchData.entrySet.iterator.asScala.foreach(entry => {
       val topicPart = entry.getKey
       val reqData = entry.getValue
       val newCachedPart = new CachedPartition(topicPart, reqData)
@@ -247,18 +247,18 @@ case class FetchSession(val id: Int,
         updated.add(topicPart)
       }
     })
-    toForget.iterator().asScala.foreach(p => {
-      if (partitionMap.remove(new CachedPartition(p.topic(), p.partition()))) {
+    toForget.iterator.asScala.foreach(p => {
+      if (partitionMap.remove(new CachedPartition(p.topic, p.partition))) {
         removed.add(p)
       }
     })
     (added, updated, removed)
   }
 
-  override def toString(): String = synchronized {
+  override def toString: String = synchronized {
     "FetchSession(id=" + id +
       ", privileged=" + privileged +
-      ", partitionMap.size=" + partitionMap.size() +
+      ", partitionMap.size=" + partitionMap.size +
       ", creationMs=" + creationMs +
       ", creationMs=" + lastUsedMs +
       ", epoch=" + epoch + ")"
@@ -308,7 +308,7 @@ class SessionErrorContext(val error: Errors,
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {}
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet().iterator())
+    FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator)
   }
 
   // Because of the fetch session error, we don't know what partitions were supposed to be in this request.
@@ -328,15 +328,15 @@ class SessionlessFetchContext(val fetchData: util.Map[TopicPartition, FetchReque
     Option(fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.entrySet().asScala.foreach(entry => fun(entry.getKey, entry.getValue))
+    fetchData.entrySet.asScala.foreach(entry => fun(entry.getKey, entry.getValue))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
-    debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet())}")
+    debug(s"Sessionless fetch context returning ${partitionsToLogString(updates.keySet)}")
     new FetchResponse(Errors.NONE, updates, 0, INVALID_SESSION_ID)
   }
 }
@@ -359,17 +359,17 @@ class FullFetchContext(private val time: Time,
     Option(fetchData.get(part)).map(_.fetchOffset)
 
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
-    fetchData.entrySet().asScala.foreach(entry => fun(entry.getKey, entry.getValue))
+    fetchData.entrySet.asScala.foreach(entry => fun(entry.getKey, entry.getValue))
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
-    FetchResponse.sizeOf(versionId, updates.entrySet().iterator())
+    FetchResponse.sizeOf(versionId, updates.entrySet.iterator)
   }
 
   override def updateAndGenerateResponseData(updates: FetchSession.RESP_MAP): FetchResponse[Records] = {
-    def createNewSession(): FetchSession.CACHE_MAP = {
-      val cachedPartitions = new FetchSession.CACHE_MAP(updates.size())
-      updates.entrySet().asScala.foreach(entry => {
+    def createNewSession: FetchSession.CACHE_MAP = {
+      val cachedPartitions = new FetchSession.CACHE_MAP(updates.size)
+      updates.entrySet.asScala.foreach(entry => {
         val part = entry.getKey
         val respData = entry.getValue
         val reqData = fetchData.get(part)
@@ -378,9 +378,9 @@ class FullFetchContext(private val time: Time,
       cachedPartitions
     }
     val responseSessionId = cache.maybeCreateSession(time.milliseconds(), isFromFollower,
-        updates.size(), createNewSession)
+        updates.size, () => createNewSession)
     debug(s"Full fetch context with session id $responseSessionId returning " +
-      s"${partitionsToLogString(updates.keySet())}")
+      s"${partitionsToLogString(updates.keySet)}")
     new FetchResponse(Errors.NONE, updates, 0, responseSessionId)
   }
 }
@@ -401,8 +401,8 @@ class IncrementalFetchContext(private val time: Time,
   override def foreachPartition(fun: (TopicPartition, FetchRequest.PartitionData) => Unit): Unit = {
     // Take the session lock and iterate over all the cached partitions.
     session.synchronized {
-      session.partitionMap.iterator().asScala.foreach(part => {
-        fun(new TopicPartition(part.topic, part.partition), part.reqData())
+      session.partitionMap.iterator.asScala.foreach(part => {
+        fun(new TopicPartition(part.topic, part.partition), part.reqData)
       })
     }
   }
@@ -416,7 +416,7 @@ class IncrementalFetchContext(private val time: Time,
     var nextElement: util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = null
 
     override def hasNext: Boolean = {
-      while ((nextElement == null) && iter.hasNext()) {
+      while ((nextElement == null) && iter.hasNext) {
         val element = iter.next()
         val topicPart = element.getKey
         val respData = element.getValue
@@ -438,23 +438,23 @@ class IncrementalFetchContext(private val time: Time,
     }
 
     override def next(): util.Map.Entry[TopicPartition, FetchResponse.PartitionData[Records]] = {
-      if (!hasNext()) throw new NoSuchElementException()
+      if (!hasNext) throw new NoSuchElementException
       val element = nextElement
       nextElement = null
       element
     }
 
-    override def remove() = throw new UnsupportedOperationException()
+    override def remove() = throw new UnsupportedOperationException
   }
 
   override def getResponseSize(updates: FetchSession.RESP_MAP, versionId: Short): Int = {
     session.synchronized {
-      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch)
       if (session.epoch != expectedEpoch) {
-        FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet().iterator())
+        FetchResponse.sizeOf(versionId, (new FetchSession.RESP_MAP).entrySet.iterator)
       } else {
         // Pass the partition iterator which updates neither the fetch context nor the partition map.
-        FetchResponse.sizeOf(versionId, new PartitionIterator(updates.entrySet().iterator(), false))
+        FetchResponse.sizeOf(versionId, new PartitionIterator(updates.entrySet.iterator, false))
       }
     }
   }
@@ -463,19 +463,19 @@ class IncrementalFetchContext(private val time: Time,
     session.synchronized {
       // Check to make sure that the session epoch didn't change in between
       // creating this fetch context and generating this response.
-      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch)
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
         new FetchResponse(Errors.INVALID_FETCH_SESSION_EPOCH, new FetchSession.RESP_MAP, 0, session.id)
       } else {
         // Iterate over the update list using PartitionIterator. This will prune updates which don't need to be sent
-        val partitionIter = new PartitionIterator(updates.entrySet().iterator(), true)
-        while (partitionIter.hasNext()) {
+        val partitionIter = new PartitionIterator(updates.entrySet.iterator, true)
+        while (partitionIter.hasNext) {
           partitionIter.next()
         }
         debug(s"Incremental fetch context with session id ${session.id} returning " +
-          s"${partitionsToLogString(updates.keySet())}")
+          s"${partitionsToLogString(updates.keySet)}")
         new FetchResponse(Errors.NONE, updates, 0, session.id)
       }
     }
@@ -485,7 +485,7 @@ class IncrementalFetchContext(private val time: Time,
     session.synchronized {
       // Check to make sure that the session epoch didn't change in between
       // creating this fetch context and generating this response.
-      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch())
+      val expectedEpoch = JFetchMetadata.nextEpoch(reqMetadata.epoch)
       if (session.epoch != expectedEpoch) {
         info(s"Incremental fetch session ${session.id} expected epoch $expectedEpoch, but " +
           s"got ${session.epoch}.  Possible duplicate request.")
@@ -570,14 +570,14 @@ class FetchSessionCache(private val maxEntries: Int,
   /**
     * Get the number of entries currently in the fetch session cache.
     */
-  def size(): Int = synchronized {
+  def size: Int = synchronized {
     sessions.size
   }
 
   /**
     * Get the total number of cached partitions.
     */
-  def totalPartitions(): Long = synchronized {
+  def totalPartitions: Long = synchronized {
     numPartitions
   }
 
@@ -614,7 +614,7 @@ class FetchSessionCache(private val maxEntries: Int,
       val partitionMap = createPartitions()
       val session = new FetchSession(newSessionId(), privileged, partitionMap,
           now, now, JFetchMetadata.nextEpoch(INITIAL_EPOCH))
-      debug(s"Created fetch session ${session.toString()}")
+      debug(s"Created fetch session ${session.toString}")
       sessions.put(session.id, session)
       touch(session, now)
       session.id
@@ -639,12 +639,12 @@ class FetchSessionCache(private val maxEntries: Int,
     */
   def tryEvict(privileged: Boolean, key: EvictableKey, now: Long): Boolean = synchronized {
     // Try to evict an entry which is stale.
-    val lastUsedEntry = lastUsed.firstEntry()
+    val lastUsedEntry = lastUsed.firstEntry
     if (lastUsedEntry == null) {
       trace("There are no cache entries to evict.")
       false
-    } else if (now - lastUsedEntry.getKey().lastUsedMs > evictionMs) {
-      val session = lastUsedEntry.getValue()
+    } else if (now - lastUsedEntry.getKey.lastUsedMs > evictionMs) {
+      val session = lastUsedEntry.getValue
       trace(s"Evicting stale FetchSession ${session.id}.")
       remove(session)
       evictionsMeter.mark()
@@ -653,16 +653,16 @@ class FetchSessionCache(private val maxEntries: Int,
       // If there are no stale entries, check the first evictable entry.
       // If it is less valuable than our proposed entry, evict it.
       val map = if (privileged) evictableByPrivileged else evictableByAll
-      val evictableEntry = map.firstEntry()
+      val evictableEntry = map.firstEntry
       if (evictableEntry == null) {
         trace("No evictable entries found.")
         false
-      } else if (key.compareTo(evictableEntry.getKey()) < 0) {
-        trace(s"Can't evict ${evictableEntry.getKey()} with ${key.toString}")
+      } else if (key.compareTo(evictableEntry.getKey) < 0) {
+        trace(s"Can't evict ${evictableEntry.getKey} with ${key.toString}")
         false
       } else {
-        trace(s"Evicting ${evictableEntry.getKey()} with ${key.toString}.")
-        remove(evictableEntry.getValue())
+        trace(s"Evicting ${evictableEntry.getKey} with ${key.toString}.")
+        remove(evictableEntry.getValue)
         evictionsMeter.mark()
         true
       }
@@ -685,8 +685,8 @@ class FetchSessionCache(private val maxEntries: Int,
     */
   def remove(session: FetchSession): Option[FetchSession] = synchronized {
     val evictableKey = session.synchronized {
-      lastUsed.remove(session.lastUsedKey())
-      session.evictableKey()
+      lastUsed.remove(session.lastUsedKey)
+      session.evictableKey
     }
     evictableByAll.remove(evictableKey)
     evictableByPrivileged.remove(evictableKey)
@@ -706,19 +706,19 @@ class FetchSessionCache(private val maxEntries: Int,
   def touch(session: FetchSession, now: Long): Unit = synchronized {
     session.synchronized {
       // Update the lastUsed map.
-      lastUsed.remove(session.lastUsedKey())
+      lastUsed.remove(session.lastUsedKey)
       session.lastUsedMs = now
-      lastUsed.put(session.lastUsedKey(), session)
+      lastUsed.put(session.lastUsedKey, session)
 
       val oldSize = session.cachedSize
       if (oldSize != -1) {
-        val oldEvictableKey = session.evictableKey()
+        val oldEvictableKey = session.evictableKey
         evictableByPrivileged.remove(oldEvictableKey)
         evictableByAll.remove(oldEvictableKey)
         numPartitions = numPartitions - oldSize
       }
-      session.cachedSize = session.size()
-      val newEvictableKey = session.evictableKey()
+      session.cachedSize = session.size
+      val newEvictableKey = session.evictableKey
       if ((!session.privileged) || (now - session.creationMs > evictionMs)) {
         evictableByPrivileged.put(newEvictableKey, session)
       }
@@ -738,35 +738,34 @@ class FetchManager(private val time: Time,
                  isFollower: Boolean): FetchContext = {
     val context = if (reqMetadata.isFull) {
       var removedFetchSessionStr = ""
-      if (reqMetadata.sessionId() != INVALID_SESSION_ID) {
+      if (reqMetadata.sessionId != INVALID_SESSION_ID) {
         // Any session specified in a FULL fetch request will be closed.
-        if (cache.remove(reqMetadata.sessionId()).isDefined) {
-          removedFetchSessionStr = s" Removed fetch session ${reqMetadata.sessionId()}."
+        if (cache.remove(reqMetadata.sessionId).isDefined) {
+          removedFetchSessionStr = s" Removed fetch session ${reqMetadata.sessionId}."
         }
       }
       var suffix = ""
-      val context = if (reqMetadata.epoch() == FINAL_EPOCH) {
+      val context = if (reqMetadata.epoch == FINAL_EPOCH) {
         // If the epoch is FINAL_EPOCH, don't try to create a new session.
         suffix = " Will not try to create a new session."
         new SessionlessFetchContext(fetchData)
       } else {
         new FullFetchContext(time, cache, reqMetadata, fetchData, isFollower)
       }
-      debug(s"Created a new full FetchContext with ${partitionsToLogString(fetchData.keySet())}."+
+      debug(s"Created a new full FetchContext with ${partitionsToLogString(fetchData.keySet)}."+
         s"${removedFetchSessionStr}${suffix}")
       context
     } else {
       cache.synchronized {
-        cache.get(reqMetadata.sessionId()) match {
+        cache.get(reqMetadata.sessionId) match {
           case None => {
-            info(s"Created a new error FetchContext for session id ${reqMetadata.sessionId()}: " +
-              "no such session ID found.")
+            debug(s"Session error for ${reqMetadata.sessionId}: no such session ID found.")
             new SessionErrorContext(Errors.FETCH_SESSION_ID_NOT_FOUND, reqMetadata)
           }
           case Some(session) => session.synchronized {
-            if (session.epoch != reqMetadata.epoch()) {
-              debug(s"Created a new error FetchContext for session id ${session.id}: expected " +
-                s"epoch ${session.epoch}, but got epoch ${reqMetadata.epoch()}.")
+            if (session.epoch != reqMetadata.epoch) {
+              debug(s"Session error for ${reqMetadata.sessionId}: expected epoch " +
+                s"${session.epoch}, but got ${reqMetadata.epoch} instead.");
               new SessionErrorContext(Errors.INVALID_FETCH_SESSION_EPOCH, reqMetadata)
             } else {
               val (added, updated, removed) = session.update(fetchData, toForget, reqMetadata)
@@ -777,7 +776,7 @@ class FetchManager(private val time: Time,
                 cache.remove(session)
                 new SessionlessFetchContext(fetchData)
               } else {
-                if (session.size() != session.cachedSize) {
+                if (session.size != session.cachedSize) {
                   // If the number of partitions in the session changed, update the session's
                   // position in the cache.
                   cache.touch(session, session.lastUsedMs)
diff --git a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
index ae001a3..c4a9625 100755
--- a/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchSessionTest.scala
@@ -49,7 +49,7 @@ class FetchSessionTest {
       assertTrue("Missing session " + i + " out of " + sessionIds.size + "(" + sessionId + ")",
         cache.get(sessionId).isDefined)
     }
-    assertEquals(sessionIds.size, cache.size())
+    assertEquals(sessionIds.size, cache.size)
   }
 
   private def dummyCreate(size: Int)() = {
@@ -63,7 +63,7 @@ class FetchSessionTest {
   @Test
   def testSessionCache(): Unit = {
     val cache = new FetchSessionCache(3, 100)
-    assertEquals(0, cache.size())
+    assertEquals(0, cache.size)
     val id1 = cache.maybeCreateSession(0, false, 10, dummyCreate(10))
     val id2 = cache.maybeCreateSession(10, false, 20, dummyCreate(20))
     val id3 = cache.maybeCreateSession(20, false, 30, dummyCreate(30))
@@ -86,44 +86,44 @@ class FetchSessionTest {
   @Test
   def testResizeCachedSessions(): Unit = {
     val cache = new FetchSessionCache(2, 100)
-    assertEquals(0, cache.totalPartitions())
-    assertEquals(0, cache.size())
-    assertEquals(0, cache.evictionsMeter.count())
+    assertEquals(0, cache.totalPartitions)
+    assertEquals(0, cache.size)
+    assertEquals(0, cache.evictionsMeter.count)
     val id1 = cache.maybeCreateSession(0, false, 2, dummyCreate(2))
     assertTrue(id1 > 0)
     assertCacheContains(cache, id1)
     val session1 = cache.get(id1).get
-    assertEquals(2, session1.size())
-    assertEquals(2, cache.totalPartitions())
-    assertEquals(1, cache.size())
-    assertEquals(0, cache.evictionsMeter.count())
+    assertEquals(2, session1.size)
+    assertEquals(2, cache.totalPartitions)
+    assertEquals(1, cache.size)
+    assertEquals(0, cache.evictionsMeter.count)
     val id2 = cache.maybeCreateSession(0, false, 4, dummyCreate(4))
     val session2 = cache.get(id2).get
     assertTrue(id2 > 0)
     assertCacheContains(cache, id1, id2)
-    assertEquals(6, cache.totalPartitions())
-    assertEquals(2, cache.size())
-    assertEquals(0, cache.evictionsMeter.count())
+    assertEquals(6, cache.totalPartitions)
+    assertEquals(2, cache.size)
+    assertEquals(0, cache.evictionsMeter.count)
     cache.touch(session1, 200)
     cache.touch(session2, 200)
     val id3 = cache.maybeCreateSession(200, false, 5, dummyCreate(5))
     assertTrue(id3 > 0)
     assertCacheContains(cache, id2, id3)
-    assertEquals(9, cache.totalPartitions())
-    assertEquals(2, cache.size())
-    assertEquals(1, cache.evictionsMeter.count())
+    assertEquals(9, cache.totalPartitions)
+    assertEquals(2, cache.size)
+    assertEquals(1, cache.evictionsMeter.count)
     cache.remove(id3)
     assertCacheContains(cache, id2)
-    assertEquals(1, cache.size())
-    assertEquals(1, cache.evictionsMeter.count())
-    assertEquals(4, cache.totalPartitions())
-    val iter = session2.partitionMap.iterator()
+    assertEquals(1, cache.size)
+    assertEquals(1, cache.evictionsMeter.count)
+    assertEquals(4, cache.totalPartitions)
+    val iter = session2.partitionMap.iterator
     iter.next()
     iter.remove()
-    assertEquals(3, session2.size())
+    assertEquals(3, session2.size)
     assertEquals(4, session2.cachedSize)
     cache.touch(session2, session2.lastUsedMs)
-    assertEquals(3, cache.totalPartitions())
+    assertEquals(3, cache.totalPartitions)
   }
 
   val EMPTY_PART_LIST = Collections.unmodifiableList(new util.ArrayList[TopicPartition]())
@@ -220,15 +220,15 @@ class FetchSessionTest {
       val context8 = fetchManager.newContext(
         new JFetchMetadata(prevSessionId, FINAL_EPOCH), reqData8, EMPTY_PART_LIST, false)
       assertEquals(classOf[SessionlessFetchContext], context8.getClass)
-      assertEquals(0, cache.size())
+      assertEquals(0, cache.size)
       val respData8 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
       respData8.put(new TopicPartition("bar", 0),
         new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
       respData8.put(new TopicPartition("bar", 1),
         new FetchResponse.PartitionData(Errors.NONE, 100, 100, 100, null, null))
       val resp8 = context8.updateAndGenerateResponseData(respData8)
-      assertEquals(Errors.NONE, resp8.error())
-      nextSessionId = resp8.sessionId()
+      assertEquals(Errors.NONE, resp8.error)
+      nextSessionId = resp8.sessionId
     } while (nextSessionId == prevSessionId)
   }
 
@@ -277,9 +277,9 @@ class FetchSessionTest {
     respData2.put(new TopicPartition("bar", 0), new FetchResponse.PartitionData(
       Errors.NONE, 10, 10, 10, null, null))
     val resp2 = context2.updateAndGenerateResponseData(respData2)
-    assertEquals(Errors.NONE, resp2.error())
-    assertEquals(1, resp2.responseData().size())
-    assertTrue(resp2.sessionId() > 0)
+    assertEquals(Errors.NONE, resp2.error)
+    assertEquals(1, resp2.responseData.size)
+    assertTrue(resp2.sessionId > 0)
   }
 
   @Test
@@ -300,9 +300,9 @@ class FetchSessionTest {
     respData1.put(new TopicPartition("foo", 1), new FetchResponse.PartitionData(
       Errors.NONE, 10, 10, 10, null, null))
     val resp1 = context1.updateAndGenerateResponseData(respData1)
-    assertEquals(Errors.NONE, resp1.error())
+    assertEquals(Errors.NONE, resp1.error)
     assertTrue(resp1.sessionId() != INVALID_SESSION_ID)
-    assertEquals(2, resp1.responseData().size())
+    assertEquals(2, resp1.responseData.size)
 
     // Create an incremental fetch request that removes foo-0 and foo-1
     // Verify that the previous fetch session was closed.
@@ -311,12 +311,12 @@ class FetchSessionTest {
     removed2.add(new TopicPartition("foo", 0))
     removed2.add(new TopicPartition("foo", 1))
     val context2 = fetchManager.newContext(
-      new JFetchMetadata(resp1.sessionId(), 1), reqData2, removed2, false)
+      new JFetchMetadata(resp1.sessionId, 1), reqData2, removed2, false)
     assertEquals(classOf[SessionlessFetchContext], context2.getClass)
     val respData2 = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData[Records]]
     val resp2 = context2.updateAndGenerateResponseData(respData2)
-    assertEquals(INVALID_SESSION_ID, resp2.sessionId())
+    assertEquals(INVALID_SESSION_ID, resp2.sessionId)
     assertTrue(resp2.responseData().isEmpty)
-    assertEquals(0, cache.size())
+    assertEquals(0, cache.size)
   }
 }


Mime
View raw message