kafka-commits mailing list archives

From prashanthme...@apache.org
Subject svn commit: r1382828 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/log/Log.scala main/scala/kafka/server/KafkaApis.scala test/scala/unit/kafka/server/SimpleFetchTest.scala
Date Mon, 10 Sep 2012 13:28:44 GMT
Author: prashanthmenon
Date: Mon Sep 10 13:28:44 2012
New Revision: 1382828

URL: http://svn.apache.org/viewvc?rev=1382828&view=rev
Log:
Expose different data to fetch requests from the follower replicas and consumer clients; patched by Prashanth Menon; reviewed by Jun Rao; KAFKA-376
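
To make the intent concrete, here is a minimal self-contained Scala sketch of the new fetch
bound (LeaderState and upperBound are hypothetical names for illustration; the real state
lives on the leader replica): an ordinary consumer is served only committed data, up to the
leader's high watermark (HW), while a follower replica may read up to the leader's log end
offset (LEO) so it can replicate not-yet-committed messages. NonFollowerId is assumed to be
-1, matching the old "case -1" match arm this patch removes.

    object FetchBoundSketch {
      // Hypothetical stand-in for the leader replica's offset state.
      case class LeaderState(highWatermark: Long, logEndOffset: Long)

      // Mirrors FetchRequest.NonFollowerId: -1 marks an ordinary consumer.
      val NonFollowerId: Int = -1

      def upperBound(replicaId: Int, leader: LeaderState): Long =
        if (replicaId == NonFollowerId) leader.highWatermark // consumer: committed data only
        else leader.logEndOffset                             // follower: everything the leader has

      def main(args: Array[String]): Unit = {
        val leader = LeaderState(highWatermark = 5, logEndOffset = 20)
        println(upperBound(NonFollowerId, leader)) // 5  (consumer)
        println(upperBound(1, leader))             // 20 (follower on broker 1)
      }
    }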

Added:
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1382828&r1=1382827&r2=1382828&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Mon Sep 10 13:28:44 2012
@@ -280,7 +280,11 @@ private[kafka] class Log( val dir: File,
     trace("Reading %d bytes from offset %d in log %s of length %s bytes".format(length, offset,
name, size))
     val view = segments.view
     Log.findRange(view, offset, view.length) match {
-      case Some(segment) => segment.messageSet.read((offset - segment.start), length)
+      case Some(segment) =>
+        if(length <= 0)
+          MessageSet.Empty
+        else
+          segment.messageSet.read((offset - segment.start), length)
       case _ => MessageSet.Empty
     }
   }
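
This guard matters because, with the KafkaApis changes below, the requested read length can
now legitimately be zero or negative: a consumer that has caught up to the HW asks for
min(leader.highWatermark - offset, maxSize) = 0 bytes. A small sketch of the boundary
behavior, using a flat byte array in place of Kafka's segment list (this read is a
hypothetical simplification of Log.read, not the real implementation):

    object ReadGuardSketch {
      // Simplified stand-in for Log.read: slice length bytes starting at
      // offset out of a flat byte array instead of a segment list.
      def read(log: Array[Byte], offset: Int, length: Int): Array[Byte] =
        if (length <= 0) Array.empty[Byte] // mirrors returning MessageSet.Empty
        else log.slice(offset, offset + length)

      def main(args: Array[String]): Unit = {
        val log = "0123456789".getBytes
        println(read(log, 2, 3).length)  // 3
        println(read(log, 5, 0).length)  // 0 (consumer already at the HW)
        println(read(log, 5, -4).length) // 0 (negative lengths now read as empty)
      }
    }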

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1382828&r1=1382827&r2=1382828&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Mon Sep 10 13:28:44 2012
@@ -238,8 +238,7 @@ class KafkaApis(val requestChannel: Requ
           satisfiedProduceRequests ++= producerRequestPurgatory.update(key, key)
         })
       })
-      debug("Replica %d fetch unblocked %d producer requests."
-        .format(fetchRequest.replicaId, satisfiedProduceRequests.size))
+      debug("Replica %d fetch unblocked %d producer requests.".format(fetchRequest.replicaId,
satisfiedProduceRequests.size))
       satisfiedProduceRequests.foreach(_.respond())
     }
 
@@ -269,17 +268,23 @@ class KafkaApis(val requestChannel: Requ
     var totalBytes = 0L
     for(offsetDetail <- fetchRequest.offsetInfo) {
       for(i <- 0 until offsetDetail.partitions.size) {
+        debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
         try {
-          val localReplica = replicaManager.getReplica(offsetDetail.topic, offsetDetail.partitions(i))
-          val available = localReplica match {
-            case Some(replica) => max(0, replica.log.get.logEndOffset - offsetDetail.offsets(i))
-            case None => 0
+          val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
+          val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) {
+            leader.highWatermark
+          } else {
+            leader.logEndOffset
           }
+          val available = max(0, end - offsetDetail.offsets(i))
           totalBytes += math.min(offsetDetail.fetchSizes(i), available)
         } catch {
           case e: UnknownTopicOrPartitionException =>
             info("Invalid partition %d in fetch request from client %d."
               .format(offsetDetail.partitions(i), fetchRequest.clientId))
+          case e =>
+            error("Error determining available fetch bytes for topic %s partition %s on broker
%s for client %s"
+              .format(offsetDetail.topic, offsetDetail.partitions(i), brokerId, fetchRequest.clientId),
e)
         }
       }
     }
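
As a worked example of the accounting above: with a leader at HW = 5 and LEO = 20, a consumer
at offset 3 requesting 100 bytes contributes min(100, max(0, 5 - 3)) = 2 available bytes,
while a follower at offset 15 with the same request contributes min(100, max(0, 20 - 15)) = 5.
A sketch of just that computation (availableBytes is a hypothetical name for illustration):

    object AvailableBytesSketch {
      import scala.math.{max, min}

      // Bound the readable end by HW for consumers and LEO for followers,
      // clamp at zero, then cap by the requested fetch size.
      def availableBytes(isFollower: Boolean, hw: Long, leo: Long,
                         fetchOffset: Long, fetchSize: Int): Long = {
        val end = if (isFollower) leo else hw
        min(fetchSize, max(0L, end - fetchOffset))
      }

      def main(args: Array[String]): Unit = {
        println(availableBytes(isFollower = false, hw = 5, leo = 20, fetchOffset = 3,  fetchSize = 100)) // 2
        println(availableBytes(isFollower = true,  hw = 5, leo = 20, fetchOffset = 15, fetchSize = 100)) // 5
        println(availableBytes(isFollower = false, hw = 5, leo = 20, fetchOffset = 7,  fetchSize = 100)) // 0
      }
    }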
@@ -311,26 +316,24 @@ class KafkaApis(val requestChannel: Requ
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize) match {
+        val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
+        val partitionInfo = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower) match {
           case Left(err) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
             BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
-            fetchRequest.replicaId match {
-              case -1 => new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-              case _ =>
-                new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
-            }
-          case Right(messages) =>
+            new PartitionData(partition, err, offset, -1L, MessageSet.Empty)
+          case Right((messages, highWatermark)) =>
             BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
             BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
-            val leaderReplica = replicaManager.getReplica(topic, partition).get
-            if (fetchRequest.replicaId != FetchRequest.NonFollowerId) {
-              debug("Leader for topic [%s] partition [%d] received fetch request from follower
[%d]"
-                .format(topic, partition, fetchRequest.replicaId))
-              debug("Leader returning %d messages for topic %s partition %d to follower %d"
-                .format(messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+            if(!isFetchFromFollower) {
+              new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
+            } else {
+              debug("Leader %d for topic %s partition %d received fetch request from follower
%d"
+                .format(brokerId, topic, partition, fetchRequest.replicaId))
+              debug("Leader %d returning %d messages for topic %s partition %d to follower
%d"
+                .format(brokerId, messages.sizeInBytes, topic, partition, fetchRequest.replicaId))
+              new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
             }
-            new PartitionData(partition, ErrorMapping.NoError, offset, leaderReplica.highWatermark, messages)
         }
         info.append(partitionInfo)
       }
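
One consequence of the rewrite above: every successful PartitionData now carries the high
watermark observed at read time (both branches construct the same value and differ only in
debug logging). With the HW traveling in the response, a client can, for example, estimate
how far it lags behind committed data. A minimal sketch of that computation, with the
PartitionData fields simplified rather than taken from the wire format:

    object ConsumerLagSketch {
      // Simplified view of the per-partition fields a fetch response exposes.
      case class PartitionData(partition: Int, error: Short,
                               initialOffset: Long, highWatermark: Long)

      // Lag relative to committed data, clamped at zero.
      def lag(pd: PartitionData, nextFetchOffset: Long): Long =
        math.max(0L, pd.highWatermark - nextFetchOffset)

      def main(args: Array[String]): Unit = {
        val pd = PartitionData(partition = 0, error = 0, initialOffset = 3, highWatermark = 5)
        println(lag(pd, nextFetchOffset = 3)) // 2
        println(lag(pd, nextFetchOffset = 5)) // 0 (caught up to the HW)
      }
    }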
@@ -340,16 +343,28 @@ class KafkaApis(val requestChannel: Requ
   }
 
   /**
-   * Read from a single topic/partition at the given offset
+   * Read from a single topic/partition at the given offset, up to maxSize bytes
    */
-  private def readMessageSet(topic: String, partition: Int, offset: Long, maxSize: Int): Either[Short, MessageSet] = {
-    var response: Either[Short, MessageSet] = null
+  private def readMessageSet(topic: String, partition: Int, offset: Long,
+                             maxSize: Int, fromFollower: Boolean): Either[Short, (MessageSet, Long)] = {
+    var response: Either[Short, (MessageSet, Long)] = null
     try {
       // check if the current broker is the leader for the partitions
-      val localReplica = replicaManager.getLeaderReplicaIfLocal(topic, partition)
+      val leader = replicaManager.getLeaderReplicaIfLocal(topic, partition)
       trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition,
offset, maxSize))
-      val log = localReplica.log.get
-      response = Right(log.read(offset, maxSize))
+      val actualSize = if (!fromFollower) {
+        min(leader.highWatermark - offset, maxSize).toInt
+      } else {
+        maxSize
+      }
+      val messages = leader.log match {
+        case Some(log) =>
+          log.read(offset, actualSize)
+        case None =>
+          error("Leader for topic %s partition %d on broker %d does not have a local log".format(topic,
partition, brokerId))
+          MessageSet.Empty
+      }
+      response = Right(messages, leader.highWatermark)
     } catch {
       case e =>
         error("error when processing request " + (topic, partition, offset, maxSize), e)
@@ -373,13 +388,14 @@ class KafkaApis(val requestChannel: Requ
       replicaManager.getLeaderReplicaIfLocal(offsetRequest.topic, offsetRequest.partition)
       val offsets = replicaManager.logManager.getOffsets(offsetRequest)
       response = new OffsetResponse(offsetRequest.versionId, offsets)
-    }catch {
+    } catch {
       case ioe: IOException =>
         fatal("Halting due to unrecoverable I/O error while handling producer request: "
+ ioe.getMessage, ioe)
         System.exit(1)
       case e =>
         warn("Error while responding to offset request", e)
-        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
+        response = new OffsetResponse(offsetRequest.versionId, Array.empty[Long],
+          ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]).toShort)
     }
     requestChannel.sendResponse(new RequestChannel.Response(request, new BoundedByteBufferSend(response)))
   }
@@ -420,7 +436,7 @@ class KafkaApis(val requestChannel: Requ
                             ErrorMapping.exceptionFor(topicAndMetadata._2.errorCode).getCause)
           }
         })
-    }catch {
+    } catch {
       case e => error("Error while retrieving topic metadata", e)
       // convert exception type to error code
       errorCode = ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]])

Added: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1382828&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (added)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Mon Sep 10 13:28:44 2012
@@ -0,0 +1,210 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.server
+
+import java.nio.ByteBuffer
+import kafka.api.{FetchRequest, FetchRequestBuilder}
+import kafka.cluster.{Partition, Replica}
+import kafka.log.Log
+import kafka.message.{ByteBufferMessageSet, Message}
+import kafka.network.{BoundedByteBufferReceive, RequestChannel}
+import kafka.utils.{Time, TestUtils, MockTime}
+import org.easymock.EasyMock
+import org.I0Itec.zkclient.ZkClient
+import org.scalatest.junit.JUnit3Suite
+
+class SimpleFetchTest extends JUnit3Suite {
+
+  val configs = TestUtils.createBrokerConfigs(2).map(new KafkaConfig(_) {
+    override val replicaMaxLagTimeMs = 100L
+    override val replicaMaxLagBytes = 10L
+  })
+  val topic = "foo"
+  val partitionId = 0
+
+  /**
+   * The scenario for this test is that there is one topic, "foo", on broker "0" that has
+   * one partition with one follower replica on broker "1".  The leader replica on "0"
+   * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
+   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
+   * but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When a normal consumer fetches data, it should only see data up to the HW of the leader,
+   * in this case up to an offset of "5".
+   */
+  def testNonReplicaSeesHwWhenFetching() {
+    /* setup */
+    val time = new MockTime
+    val leo = 20
+    val hw = 5
+    val messages = new Message("test-message".getBytes())
+
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log.read(0, hw)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.replay(log)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(topic, partitionId)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
+    EasyMock.replay(logManager)
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+    EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.replay(replicaManager)
+
+    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
+    partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    // start a request channel with 2 processors and a queue size of 5 (this is more or less arbitrary)
+    // don't provide replica or leader callbacks since they will not be tested here
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+
+    // This request (from a non-follower consumer) wants to read up to 2*HW but should only get back up to HW bytes into the log
+    val goodFetch = new FetchRequestBuilder()
+      .replicaId(FetchRequest.NonFollowerId)
+      .addFetch(topic, partitionId, 0, hw*2)
+      .build()
+    val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes)
+    goodFetch.writeTo(goodFetchBB)
+    goodFetchBB.rewind()
+
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB)
+    EasyMock.replay(receivedRequest)
+
+    // send the request
+    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1))
+
+    // make sure the log only reads bytes between 0->HW (5)
+    EasyMock.verify(log)
+  }
+
+  /**
+   * The scenario for this test is that there is one topic, "foo", on broker "0" that has
+   * one partition with one follower replica on broker "1".  The leader replica on "0"
+   * has HW of "5" and LEO of "20".  The follower on broker "1" has a local replica
+   * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
+   * but is still in ISR (hasn't yet expired from ISR).
+   *
+   * When the follower from broker "1" fetches data, it should see data up to the log end offset ("20")
+   */
+  def testReplicaSeesLeoWhenFetching() {
+    /* setup */
+    val time = new MockTime
+    val leo = 20
+    val hw = 5
+
+    val messages = new Message("test-message".getBytes())
+
+    val followerReplicaId = configs(1).brokerId
+    val followerLEO = 15
+
+    val zkClient = EasyMock.createMock(classOf[ZkClient])
+    EasyMock.replay(zkClient)
+
+    val log = EasyMock.createMock(classOf[kafka.log.Log])
+    EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
+    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.replay(log)
+
+    val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
+    EasyMock.expect(logManager.getLog(topic, 0)).andReturn(Some(log)).anyTimes()
+    EasyMock.expect(logManager.config).andReturn(configs.head).anyTimes()
+    EasyMock.replay(logManager)
+
+    val replicaManager = EasyMock.createMock(classOf[kafka.server.ReplicaManager])
+    EasyMock.expect(replicaManager.config).andReturn(configs.head)
+    EasyMock.expect(replicaManager.logManager).andReturn(logManager)
+    EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
+    EasyMock.expect(replicaManager.highWatermarkCheckpoint).andReturn(EasyMock.createMock(classOf[HighwaterMarkCheckpoint]))
+    EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.replay(replicaManager)
+
+    val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
+    partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
+
+    EasyMock.reset(replicaManager)
+    EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
+    EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
+    EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.replay(replicaManager)
+
+    val requestChannel = new RequestChannel(2, 5)
+    val apis = new KafkaApis(requestChannel, replicaManager, zkClient, configs.head.brokerId)
+
+    /**
+     * This fetch, coming from a replica, requests all data at offset "15".  Because the request is coming
+     * from a follower, the leader should oblige and read beyond the HW.
+     */
+    val bigFetch = new FetchRequestBuilder()
+      .replicaId(followerReplicaId)
+      .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
+      .build()
+
+    val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes)
+    bigFetch.writeTo(fetchRequest)
+    fetchRequest.rewind()
+
+    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
+    EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest)
+    EasyMock.replay(receivedRequest)
+
+    // send the request
+    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
+
+    /**
+     * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, namely all bytes after
+     * an offset of 15
+     */
+    EasyMock.verify(log)
+  }
+
+  private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
+                                               localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
+    val partition = new Partition(topic, partitionId, time, replicaManager)
+    val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
+
+    val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
+    allReplicas.foreach(partition.addReplicaIfNotExists(_))
+    // set in sync replicas for this partition to all the assigned replicas
+    partition.inSyncReplicas = allReplicas.toSet
+    // set the leader and its hw and the hw update time
+    partition.leaderReplicaIdOpt = Some(leaderId)
+    leaderReplica.highWatermark = leaderHW
+    partition
+  }
+
+  private def getFollowerReplicas(partition: Partition, leaderId: Int, time: Time): Seq[Replica] = {
+    configs.filter(_.brokerId != leaderId).map { config =>
+      new Replica(config.brokerId, partition, time)
+    }
+  }
+
+}
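
A closing note on the test technique: in both tests the EasyMock expectation on log.read is
itself the assertion. The mock only permits the exact (offset, size) pair the patched code
should compute (read(0, hw) for the consumer, read(followerLEO, Integer.MAX_VALUE) for the
follower), so EasyMock.verify(log) fails if the fetch path reads any other range. A reduced
sketch of the pattern, with a hypothetical one-method trait in place of kafka.log.Log:

    import org.easymock.EasyMock

    object VerifyReadBoundSketch {
      // Hypothetical minimal interface standing in for kafka.log.Log.
      trait SimpleLog { def read(offset: Long, maxSize: Int): Int }

      def main(args: Array[String]): Unit = {
        val log = EasyMock.createMock(classOf[SimpleLog])
        // The expectation doubles as the assertion: only read(0, 5) is legal.
        EasyMock.expect(log.read(0L, 5)).andReturn(5)
        EasyMock.replay(log)

        log.read(0L, 5)      // the code under test would trigger this call
        EasyMock.verify(log) // throws if read was never called, or called differently
      }
    }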


