kafka-commits mailing list archives

From jun...@apache.org
Subject [1/2] kafka-1430; Purgatory redesign; patched by Guozhang Wang; reviewed by Jun Rao
Date Wed, 06 Aug 2014 04:28:11 GMT
Repository: kafka
Updated Branches:
  refs/heads/trunk 7a67a7226 -> 0dc243b92


http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/LogOffsetMetadata.scala b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
new file mode 100644
index 0000000..a868334
--- /dev/null
+++ b/core/src/main/scala/kafka/server/LogOffsetMetadata.scala
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import org.apache.kafka.common.KafkaException
+
+object LogOffsetMetadata {
+  val UnknownOffsetMetadata = new LogOffsetMetadata(-1, 0, 0)
+  val UnknownSegBaseOffset = -1L
+  val UnknownFilePosition = -1
+
+  class OffsetOrdering extends Ordering[LogOffsetMetadata] {
+    override def compare(x: LogOffsetMetadata , y: LogOffsetMetadata ): Int = {
+      return x.offsetDiff(y).toInt
+    }
+  }
+
+}
+
+/*
+ * A log offset structure, including:
+ *  1. the message offset
+ *  2. the base message offset of the located segment
+ *  3. the physical position on the located segment
+ */
+case class LogOffsetMetadata(messageOffset: Long,
+                             segmentBaseOffset: Long = LogOffsetMetadata.UnknownSegBaseOffset,
+                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition) {
+
+  // check if this offset is already on an older segment compared with the given offset
+  def offsetOnOlderSegment(that: LogOffsetMetadata): Boolean = {
+    if (messageOffsetOnly())
+      throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+
+    this.segmentBaseOffset < that.segmentBaseOffset
+  }
+
+  // check if this offset is on the same segment as the given offset
+  def offsetOnSameSegment(that: LogOffsetMetadata): Boolean = {
+    if (messageOffsetOnly())
+      throw new KafkaException("%s cannot compare its segment info with %s since it only has message offset info".format(this, that))
+
+    this.segmentBaseOffset == that.segmentBaseOffset
+  }
+
+  // check if this offset is before the given offset
+  def precedes(that: LogOffsetMetadata): Boolean = this.messageOffset < that.messageOffset
+
+  // compute the number of messages from this offset to the given offset
+  def offsetDiff(that: LogOffsetMetadata): Long = {
+    this.messageOffset - that.messageOffset
+  }
+
+  // compute the number of bytes from this offset to the given offset
+  // if they are on the same segment and this offset precedes the given offset
+  def positionDiff(that: LogOffsetMetadata): Int = {
+    if(!offsetOnSameSegment(that))
+      throw new KafkaException("%s cannot compare its segment position with %s since they are not on the same segment".format(this, that))
+    if(messageOffsetOnly())
+      throw new KafkaException("%s cannot compare its segment position with %s since it only has message offset info".format(this, that))
+
+    this.relativePositionInSegment - that.relativePositionInSegment
+  }
+
+  // decide if the offset metadata only contains message offset info
+  def messageOffsetOnly(): Boolean = {
+    segmentBaseOffset == LogOffsetMetadata.UnknownSegBaseOffset && relativePositionInSegment == LogOffsetMetadata.UnknownFilePosition
+  }
+
+  override def toString = messageOffset.toString + " [" + segmentBaseOffset + " : " + relativePositionInSegment + "]"
+
+}
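
For reference, the comparison semantics defined by the new LogOffsetMetadata type can be exercised with a small standalone sketch (not part of the patch; it assumes kafka core is on the classpath, and the offsets and positions are invented):

    import kafka.server.LogOffsetMetadata

    object LogOffsetMetadataSketch {
      def main(args: Array[String]) {
        // two positions within the segment whose base offset is 0
        val hw  = new LogOffsetMetadata(messageOffset = 100L, segmentBaseOffset = 0L, relativePositionInSegment = 4096)
        val leo = new LogOffsetMetadata(messageOffset = 150L, segmentBaseOffset = 0L, relativePositionInSegment = 8192)

        assert(hw.precedes(leo))             // 100 < 150
        assert(leo.offsetDiff(hw) == 50L)    // number of messages between the two offsets
        assert(hw.offsetOnSameSegment(leo))  // same segment base offset
        assert(leo.positionDiff(hw) == 4096) // byte distance within the segment
        assert(!hw.messageOffsetOnly())      // full segment info is present

        // with only a message offset, the segment-level comparisons above would throw KafkaException
        val bare = new LogOffsetMetadata(42L)
        assert(bare.messageOffsetOnly())
      }
    }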

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/OffsetManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/OffsetManager.scala b/core/src/main/scala/kafka/server/OffsetManager.scala
index 0e22897..43eb2a3 100644
--- a/core/src/main/scala/kafka/server/OffsetManager.scala
+++ b/core/src/main/scala/kafka/server/OffsetManager.scala
@@ -17,26 +17,29 @@
 
 package kafka.server
 
+import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
+import org.apache.kafka.common.protocol.types.Type.STRING
+import org.apache.kafka.common.protocol.types.Type.INT32
+import org.apache.kafka.common.protocol.types.Type.INT64
+
 import kafka.utils._
 import kafka.common._
-import java.nio.ByteBuffer
-import java.util.Properties
 import kafka.log.{FileMessageSet, LogConfig}
-import org.I0Itec.zkclient.ZkClient
-import scala.collection._
 import kafka.message._
-import java.util.concurrent.TimeUnit
 import kafka.metrics.KafkaMetricsGroup
-import com.yammer.metrics.core.Gauge
-import scala.Some
 import kafka.common.TopicAndPartition
 import kafka.tools.MessageFormatter
+
+import scala.Some
+import scala.collection._
 import java.io.PrintStream
-import org.apache.kafka.common.protocol.types.{Struct, Schema, Field}
-import org.apache.kafka.common.protocol.types.Type.STRING
-import org.apache.kafka.common.protocol.types.Type.INT32
-import org.apache.kafka.common.protocol.types.Type.INT64
 import java.util.concurrent.atomic.AtomicBoolean
+import java.nio.ByteBuffer
+import java.util.Properties
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Gauge
+import org.I0Itec.zkclient.ZkClient
 
 
 /**
@@ -271,7 +274,7 @@ class OffsetManager(val config: OffsetManagerConfig,
             // loop breaks if leader changes at any time during the load, since getHighWatermark is -1
             while (currOffset < getHighWatermark(offsetsPartition) && !shuttingDown.get()) {
               buffer.clear()
-              val messages = log.read(currOffset, config.loadBufferSize).asInstanceOf[FileMessageSet]
+              val messages = log.read(currOffset, config.loadBufferSize).messageSet.asInstanceOf[FileMessageSet]
               messages.readInto(buffer, 0)
               val messageSet = new ByteBufferMessageSet(buffer)
               messageSet.foreach { msgAndOffset =>
@@ -312,7 +315,7 @@ class OffsetManager(val config: OffsetManagerConfig,
     val partitionOpt = replicaManager.getPartition(OffsetManager.OffsetsTopicName, partitionId)
 
     val hw = partitionOpt.map { partition =>
-      partition.leaderReplicaIfLocal().map(_.highWatermark).getOrElse(-1L)
+      partition.leaderReplicaIfLocal().map(_.highWatermark.messageOffset).getOrElse(-1L)
     }.getOrElse(-1L)
 
     hw

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
new file mode 100644
index 0000000..d4a7d4a
--- /dev/null
+++ b/core/src/main/scala/kafka/server/ProducerRequestPurgatory.scala
@@ -0,0 +1,69 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
+import kafka.network.{BoundedByteBufferSend, RequestChannel}
+
+import java.util.concurrent.TimeUnit
+
+/**
+ * The purgatory holding delayed producer requests
+ */
+class ProducerRequestPurgatory(replicaManager: ReplicaManager, offsetManager: OffsetManager, requestChannel: RequestChannel)
+  extends RequestPurgatory[DelayedProduce](replicaManager.config.brokerId, replicaManager.config.producerPurgatoryPurgeIntervalRequests) {
+  this.logIdent = "[ProducerRequestPurgatory-%d] ".format(replicaManager.config.brokerId)
+
+  private class DelayedProducerRequestMetrics(keyLabel: String = DelayedRequestKey.globalLabel) extends KafkaMetricsGroup {
+    val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
+  }
+
+  private val producerRequestMetricsForKey = {
+    val valueFactory = (k: DelayedRequestKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
+    new Pool[DelayedRequestKey, DelayedProducerRequestMetrics](Some(valueFactory))
+  }
+
+  private val aggregateProduceRequestMetrics = new DelayedProducerRequestMetrics
+
+  private def recordDelayedProducerKeyExpired(key: DelayedRequestKey) {
+    val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
+    List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
+  }
+
+  /**
+   * Check if a specified delayed produce request is satisfied
+   */
+  def checkSatisfied(delayedProduce: DelayedProduce) = delayedProduce.isSatisfied(replicaManager)
+
+  /**
+   * When a delayed produce request expires, answer it with possible timeout error codes
+   */
+  def expire(delayedProduce: DelayedProduce) {
+    debug("Expiring produce request %s.".format(delayedProduce.produce))
+    for ((topicPartition, responseStatus) <- delayedProduce.partitionStatus if responseStatus.acksPending)
+      recordDelayedProducerKeyExpired(new TopicPartitionRequestKey(topicPartition))
+    respond(delayedProduce)
+  }
+
+  // TODO: purgatory should not be responsible for sending back the responses
+  def respond(delayedProduce: DelayedProduce) {
+    val response = delayedProduce.respond(offsetManager)
+    requestChannel.sendResponse(new RequestChannel.Response(delayedProduce.request, new BoundedByteBufferSend(response)))
+  }
+}

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
index 75ae1e1..6879e73 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
@@ -47,16 +47,19 @@ class ReplicaFetcherThread(name:String,
       val replica = replicaMgr.getReplica(topic, partitionId).get
       val messageSet = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
 
-      if (fetchOffset != replica.logEndOffset)
-        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset))
+      if (fetchOffset != replica.logEndOffset.messageOffset)
+        throw new RuntimeException("Offset mismatch: fetched offset = %d, log end offset = %d.".format(fetchOffset, replica.logEndOffset.messageOffset))
       trace("Follower %d has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
-            .format(replica.brokerId, replica.logEndOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
+            .format(replica.brokerId, replica.logEndOffset.messageOffset, topicAndPartition, messageSet.sizeInBytes, partitionData.hw))
       replica.log.get.append(messageSet, assignOffsets = false)
       trace("Follower %d has replica log end offset %d after appending %d bytes of messages for partition %s"
-            .format(replica.brokerId, replica.logEndOffset, messageSet.sizeInBytes, topicAndPartition))
-      val followerHighWatermark = replica.logEndOffset.min(partitionData.hw)
-      replica.highWatermark = followerHighWatermark
-      trace("Follower %d set replica highwatermark for partition [%s,%d] to %d"
+            .format(replica.brokerId, replica.logEndOffset.messageOffset, messageSet.sizeInBytes, topicAndPartition))
+      val followerHighWatermark = replica.logEndOffset.messageOffset.min(partitionData.hw)
+      // for the follower replica, we do not need to keep its
+      // segment base offset and physical position; these values
+      // will be recomputed when it becomes the leader
+      replica.highWatermark = new LogOffsetMetadata(followerHighWatermark)
+      trace("Follower %d set replica high watermark for partition [%s,%d] to %s"
             .format(replica.brokerId, topic, partitionId, followerHighWatermark))
     } catch {
       case e: KafkaStorageException =>
@@ -82,7 +85,7 @@ class ReplicaFetcherThread(name:String,
      * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
      */
     val leaderEndOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.LatestTime, brokerConfig.brokerId)
-    if (leaderEndOffset < replica.logEndOffset) {
+    if (leaderEndOffset < replica.logEndOffset.messageOffset) {
       // Prior to truncating the follower's log, ensure that doing so is not disallowed by the configuration for unclean leader election.
       // This situation could only happen if the unclean election configuration for a topic changes while a replica is down. Otherwise,
       // we should never encounter this situation since a non-ISR leader cannot be elected if disallowed by the broker configuration.
@@ -91,13 +94,13 @@ class ReplicaFetcherThread(name:String,
         // Log a fatal error and shutdown the broker to ensure that data loss does not unexpectedly occur.
         fatal("Halting because log truncation is not allowed for topic %s,".format(topicAndPartition.topic) +
           " Current leader %d's latest offset %d is less than replica %d's latest offset %d"
-          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset))
+          .format(sourceBroker.id, leaderEndOffset, brokerConfig.brokerId, replica.logEndOffset.messageOffset))
         Runtime.getRuntime.halt(1)
       }
 
       replicaMgr.logManager.truncateTo(Map(topicAndPartition -> leaderEndOffset))
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's latest offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderEndOffset))
+        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderEndOffset))
       leaderEndOffset
     } else {
       /**
@@ -109,7 +112,7 @@ class ReplicaFetcherThread(name:String,
       val leaderStartOffset = simpleConsumer.earliestOrLatestOffset(topicAndPartition, OffsetRequest.EarliestTime, brokerConfig.brokerId)
       replicaMgr.logManager.truncateFullyAndStartAt(topicAndPartition, leaderStartOffset)
       warn("Replica %d for partition %s reset its fetch offset from %d to current leader %d's start offset %d"
-        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset, sourceBroker.id, leaderStartOffset))
+        .format(brokerConfig.brokerId, topicAndPartition, replica.logEndOffset.messageOffset, sourceBroker.id, leaderStartOffset))
       leaderStartOffset
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 897783c..68758e3 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -16,29 +16,39 @@
  */
 package kafka.server
 
-import collection._
-import mutable.HashMap
-import kafka.cluster.{Broker, Partition, Replica}
+import kafka.api._
+import kafka.common._
 import kafka.utils._
+import kafka.cluster.{Broker, Partition, Replica}
 import kafka.log.LogManager
 import kafka.metrics.KafkaMetricsGroup
-import kafka.common._
-import kafka.api.{UpdateMetadataRequest, StopReplicaRequest, PartitionStateInfo, LeaderAndIsrRequest}
 import kafka.controller.KafkaController
-import org.I0Itec.zkclient.ZkClient
-import com.yammer.metrics.core.Gauge
+import kafka.common.TopicAndPartition
+import kafka.message.MessageSet
+
 import java.util.concurrent.atomic.AtomicBoolean
 import java.io.{IOException, File}
 import java.util.concurrent.TimeUnit
+import scala.Predef._
+import scala.collection._
+import scala.collection.mutable.HashMap
+import scala.collection.Map
+import scala.collection.Set
+import scala.Some
+
+import org.I0Itec.zkclient.ZkClient
+import com.yammer.metrics.core.Gauge
 
 object ReplicaManager {
-  val UnknownLogEndOffset = -1L
   val HighWatermarkFilename = "replication-offset-checkpoint"
 }
 
-class ReplicaManager(val config: KafkaConfig,
-                     time: Time,
-                     val zkClient: ZkClient,
+case class PartitionDataAndOffset(data: FetchResponsePartitionData, offset: LogOffsetMetadata)
+
+
+class ReplicaManager(val config: KafkaConfig, 
+                     time: Time, 
+                     val zkClient: ZkClient, 
                      scheduler: Scheduler,
                      val logManager: LogManager,
                      val isShuttingDown: AtomicBoolean ) extends Logging with KafkaMetricsGroup {
@@ -54,6 +64,9 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
 
+  var producerRequestPurgatory: ProducerRequestPurgatory = null
+  var fetchRequestPurgatory: FetchRequestPurgatory = null
+
   newGauge(
     "LeaderCount",
     new Gauge[Int] {
@@ -87,17 +100,37 @@ class ReplicaManager(val config: KafkaConfig,
   }
 
   /**
-   * This function is only used in two places: in Partition.updateISR() and KafkaApis.handleProducerRequest().
-   * In the former case, the partition should have been created, in the latter case, return -1 will put the request into purgatory
+   * Initialize the replica manager with the request purgatories
+   *
+   * TODO: will be removed in 0.9 where we refactor server structure
    */
-  def getReplicationFactorForPartition(topic: String, partitionId: Int) = {
-    val partitionOpt = getPartition(topic, partitionId)
-    partitionOpt match {
-      case Some(partition) =>
-        partition.replicationFactor
-      case None =>
-        -1
-    }
+
+  def initWithRequestPurgatory(producerRequestPurgatory: ProducerRequestPurgatory, fetchRequestPurgatory: FetchRequestPurgatory) {
+    this.producerRequestPurgatory = producerRequestPurgatory
+    this.fetchRequestPurgatory = fetchRequestPurgatory
+  }
+
+  /**
+   * Unblock some delayed produce requests with the request key
+   */
+  def unblockDelayedProduceRequests(key: DelayedRequestKey) {
+    val satisfied = producerRequestPurgatory.update(key)
+    debug("Request key %s unblocked %d producer requests."
+      .format(key.keyLabel, satisfied.size))
+
+    // send any newly unblocked responses
+    satisfied.foreach(producerRequestPurgatory.respond(_))
+  }
+
+  /**
+   * Unblock some delayed fetch requests with the request key
+   */
+  def unblockDelayedFetchRequests(key: DelayedRequestKey) {
+    val satisfied = fetchRequestPurgatory.update(key)
+    debug("Request key %s unblocked %d fetch requests.".format(key.keyLabel, satisfied.size))
+
+    // send any newly unblocked responses
+    satisfied.foreach(fetchRequestPurgatory.respond(_))
   }
 
   def startup() {
@@ -155,10 +188,10 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def getOrCreatePartition(topic: String, partitionId: Int, replicationFactor: Int): Partition = {
+  def getOrCreatePartition(topic: String, partitionId: Int): Partition = {
     var partition = allPartitions.get((topic, partitionId))
     if (partition == null) {
-      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, replicationFactor, time, this))
+      allPartitions.putIfNotExists((topic, partitionId), new Partition(topic, partitionId, time, this))
       partition = allPartitions.get((topic, partitionId))
     }
     partition
@@ -203,6 +236,77 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
+  /**
+   * Read from all the offset details given and return a map of
+   * (topic, partition) -> PartitionData
+   */
+  def readMessageSets(fetchRequest: FetchRequest) = {
+    val isFetchFromFollower = fetchRequest.isFromFollower
+    fetchRequest.requestInfo.map
+    {
+      case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+        val partitionDataAndOffsetInfo =
+          try {
+            val (fetchInfo, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, fetchRequest.replicaId)
+            BrokerTopicStats.getBrokerTopicStats(topic).bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
+            BrokerTopicStats.getBrokerAllTopicsStats.bytesOutRate.mark(fetchInfo.messageSet.sizeInBytes)
+            if (isFetchFromFollower) {
+              debug("Partition [%s,%d] received fetch request from follower %d"
+                .format(topic, partition, fetchRequest.replicaId))
+            }
+            new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.NoError, highWatermark, fetchInfo.messageSet), fetchInfo.fetchOffset)
+          } catch {
+            // NOTE: the failed-fetch-request metric is not incremented for UnknownTopicOrPartitionException and NotLeaderForPartitionException,
+            // since it is supposed to indicate a failure of the broker in handling a fetch request
+            // for a partition that it leads
+            case utpe: UnknownTopicOrPartitionException =>
+              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
+                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, utpe.getMessage))
+              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(utpe.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+            case nle: NotLeaderForPartitionException =>
+              warn("Fetch request with correlation id %d from client %s on partition [%s,%d] failed due to %s".format(
+                fetchRequest.correlationId, fetchRequest.clientId, topic, partition, nle.getMessage))
+              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(nle.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+            case t: Throwable =>
+              BrokerTopicStats.getBrokerTopicStats(topic).failedFetchRequestRate.mark()
+              BrokerTopicStats.getBrokerAllTopicsStats.failedFetchRequestRate.mark()
+              error("Error when processing fetch request for partition [%s,%d] offset %d from %s with correlation id %d. Possible cause: %s"
+                .format(topic, partition, offset, if (isFetchFromFollower) "follower" else "consumer", fetchRequest.correlationId, t.getMessage))
+              new PartitionDataAndOffset(new FetchResponsePartitionData(ErrorMapping.codeFor(t.getClass.asInstanceOf[Class[Throwable]]), -1L, MessageSet.Empty), LogOffsetMetadata.UnknownOffsetMetadata)
+          }
+        (TopicAndPartition(topic, partition), partitionDataAndOffsetInfo)
+    }
+  }
+
+  /**
+   * Read from a single topic/partition at the given offset up to maxSize bytes
+   */
+  private def readMessageSet(topic: String,
+                             partition: Int,
+                             offset: Long,
+                             maxSize: Int,
+                             fromReplicaId: Int): (FetchDataInfo, Long) = {
+    // check if the current broker is the leader for the partition
+    val localReplica = if(fromReplicaId == Request.DebuggingConsumerId)
+      getReplicaOrException(topic, partition)
+    else
+      getLeaderReplicaIfLocal(topic, partition)
+    trace("Fetching log segment for topic, partition, offset, size = " + (topic, partition, offset, maxSize))
+    val maxOffsetOpt =
+      if (Request.isValidBrokerId(fromReplicaId))
+        None
+      else
+        Some(localReplica.highWatermark.messageOffset)
+    val fetchInfo = localReplica.log match {
+      case Some(log) =>
+        log.read(offset, maxSize, maxOffsetOpt)
+      case None =>
+        error("Leader for partition [%s,%d] does not have a local log".format(topic, partition))
+        FetchDataInfo(LogOffsetMetadata.UnknownOffsetMetadata, MessageSet.Empty)
+    }
+    (fetchInfo, localReplica.highWatermark.messageOffset)
+  }
+
   def maybeUpdateMetadataCache(updateMetadataRequest: UpdateMetadataRequest, metadataCache: MetadataCache) {
     replicaStateChangeLock synchronized {
       if(updateMetadataRequest.controllerEpoch < controllerEpoch) {
@@ -243,7 +347,7 @@ class ReplicaManager(val config: KafkaConfig,
         // First check partition's leader epoch
         val partitionState = new HashMap[Partition, PartitionStateInfo]()
         leaderAndISRRequest.partitionStateInfos.foreach{ case ((topic, partitionId), partitionStateInfo) =>
-          val partition = getOrCreatePartition(topic, partitionId, partitionStateInfo.replicationFactor)
+          val partition = getOrCreatePartition(topic, partitionId)
           val partitionLeaderEpoch = partition.getLeaderEpoch()
           // If the leader epoch is valid record the epoch of the controller that made the leadership decision.
           // This is useful while updating the isr to maintain the decision maker controller's epoch in the zookeeper path
@@ -403,7 +507,7 @@ class ReplicaManager(val config: KafkaConfig,
           .format(localBrokerId, controllerId, epoch, correlationId, TopicAndPartition(partition.topic, partition.partitionId)))
       }
 
-      logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark)).toMap)
+      logManager.truncateTo(partitionsToMakeFollower.map(partition => (new TopicAndPartition(partition), partition.getOrCreateReplica().highWatermark.messageOffset)).toMap)
 
       partitionsToMakeFollower.foreach { partition =>
         stateChangeLogger.trace(("Broker %d truncated logs and checkpointed recovery boundaries for partition [%s,%d] as part of " +
@@ -421,7 +525,9 @@ class ReplicaManager(val config: KafkaConfig,
       else {
         // we do not need to check if the leader exists again since this has been done at the beginning of this process
         val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map(partition =>
-          new TopicAndPartition(partition) -> BrokerAndInitialOffset(leaders.find(_.id == partition.leaderReplicaIdOpt.get).get, partition.getReplica().get.logEndOffset)).toMap
+          new TopicAndPartition(partition) -> BrokerAndInitialOffset(
+            leaders.find(_.id == partition.leaderReplicaIdOpt.get).get,
+            partition.getReplica().get.logEndOffset.messageOffset)).toMap
         replicaFetcherManager.addFetcherForPartitions(partitionsToMakeFollowerWithLeaderAndOffset)
 
         partitionsToMakeFollower.foreach { partition =>
@@ -451,12 +557,23 @@ class ReplicaManager(val config: KafkaConfig,
     allPartitions.values.foreach(partition => partition.maybeShrinkIsr(config.replicaLagTimeMaxMs, config.replicaLagMaxMessages))
   }
 
-  def recordFollowerPosition(topic: String, partitionId: Int, replicaId: Int, offset: Long) = {
-    val partitionOpt = getPartition(topic, partitionId)
-    if(partitionOpt.isDefined) {
-      partitionOpt.get.updateLeaderHWAndMaybeExpandIsr(replicaId, offset)
-    } else {
-      warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
+  def updateReplicaLEOAndPartitionHW(topic: String, partitionId: Int, replicaId: Int, offset: LogOffsetMetadata) = {
+    getPartition(topic, partitionId) match {
+      case Some(partition) =>
+        partition.getReplica(replicaId) match {
+          case Some(replica) =>
+            replica.logEndOffset = offset
+            // check if we need to update HW and expand Isr
+            partition.updateLeaderHWAndMaybeExpandIsr(replicaId)
+            debug("Recorded follower %d position %d for partition [%s,%d].".format(replicaId, offset.messageOffset, topic, partitionId))
+          case None =>
+            throw new NotAssignedReplicaException(("Leader %d failed to record follower %d's position %d since the replica" +
+              " is not recognized to be one of the assigned replicas %s for partition [%s,%d]").format(localBrokerId, replicaId,
+              offset.messageOffset, partition.assignedReplicas().map(_.brokerId).mkString(","), topic, partitionId))
+
+        }
+      case None =>
+        warn("While recording the follower position, the partition [%s,%d] hasn't been created, skip updating leader HW".format(topic, partitionId))
     }
   }
 
@@ -470,7 +587,7 @@ class ReplicaManager(val config: KafkaConfig,
     val replicas = allPartitions.values.map(_.getReplica(config.brokerId)).collect{case Some(replica) => replica}
     val replicasByDir = replicas.filter(_.log.isDefined).groupBy(_.log.get.dir.getParentFile.getAbsolutePath)
     for((dir, reps) <- replicasByDir) {
-      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark)).toMap
+      val hwms = reps.map(r => (new TopicAndPartition(r) -> r.highWatermark.messageOffset)).toMap
       try {
         highWatermarkCheckpoints(dir).write(hwms)
       } catch {

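As a rough sketch of how the new read path above is meant to be consumed (not part of the patch; the surrounding request-handling code is outside this excerpt and the names in the snippet are assumptions):

    // readMessageSets pairs, per partition, the data to send back with the exact
    // LogOffsetMetadata at which the read happened:
    //
    //   val results: Map[TopicAndPartition, PartitionDataAndOffset] =
    //     replicaManager.readMessageSets(fetchRequest)
    //
    //   results.foreach { case (topicAndPartition, dataAndOffset) =>
    //     // dataAndOffset.data   -> the FetchResponsePartitionData for the response
    //     // dataAndOffset.offset -> the LogOffsetMetadata kept for delayed-fetch bookkeeping
    //   }
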
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/server/RequestPurgatory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/RequestPurgatory.scala b/core/src/main/scala/kafka/server/RequestPurgatory.scala
index 3d0ff1e..ce06d2c 100644
--- a/core/src/main/scala/kafka/server/RequestPurgatory.scala
+++ b/core/src/main/scala/kafka/server/RequestPurgatory.scala
@@ -17,13 +17,15 @@
 
 package kafka.server
 
-import scala.collection._
-import java.util.concurrent._
-import java.util.concurrent.atomic._
 import kafka.network._
 import kafka.utils._
 import kafka.metrics.KafkaMetricsGroup
+
 import java.util
+import java.util.concurrent._
+import java.util.concurrent.atomic._
+import scala.collection._
+
 import com.yammer.metrics.core.Gauge
 
 
@@ -45,8 +47,10 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  *
  * For us the key is generally a (topic, partition) pair.
  * By calling 
- *   watch(delayedRequest) 
- * we will add triggers for each of the given keys. It is up to the user to then call
+ *   val isSatisfiedByMe = checkAndMaybeWatch(delayedRequest)
+ * we check whether the request is already satisfied and, if not, add it to the watch lists for all of its keys.
+ *
+ * It is up to the user to then call
 *   val satisfied = update(key)
 * when a request relevant to the given key occurs. This triggers bookkeeping logic and returns any requests satisfied by this
  * new request.
@@ -61,18 +65,23 @@ class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, de
  * this function handles delayed requests that have hit their time limit without being satisfied.
  *
  */
-abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purgeInterval: Int = 1000)
+abstract class RequestPurgatory[T <: DelayedRequest](brokerId: Int = 0, purgeInterval: Int = 1000)
         extends Logging with KafkaMetricsGroup {
 
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
-  private val requestCounter = new AtomicInteger(0)
+  /* the number of requests being watched; a request added to multiple watchers is counted once per watcher */
+  private val watched = new AtomicInteger(0)
+
+  /* background thread expiring requests that have been waiting too long */
+  private val expiredRequestReaper = new ExpiredRequestReaper
+  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
 
   newGauge(
     "PurgatorySize",
     new Gauge[Int] {
-      def value = watchersForKey.values.map(_.numRequests).sum + expiredRequestReaper.numRequests
+      def value = watched.get() + expiredRequestReaper.numRequests
     }
   )
 
@@ -83,41 +92,50 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
     }
   )
 
-  /* background thread expiring requests that have been waiting too long */
-  private val expiredRequestReaper = new ExpiredRequestReaper
-  private val expirationThread = Utils.newThread(name="request-expiration-task", runnable=expiredRequestReaper, daemon=false)
   expirationThread.start()
 
   /**
-   * Add a new delayed request watching the contained keys
+   * Try to add the request to the watch lists of all its keys. Return true iff the request
+   * is satisfied and the satisfaction was done by the caller.
+   *
+   * A request may end up watched on only some of its keys if it is found to be satisfied while
+   * being added to them. In that case the request is still treated as satisfied and hence is no
+   * longer watched; the entries already added to watch lists will be purged later by the expiration reaper.
    */
-  def watch(delayedRequest: T) {
-    requestCounter.getAndIncrement()
-
+  def checkAndMaybeWatch(delayedRequest: T): Boolean = {
     for(key <- delayedRequest.keys) {
-      var lst = watchersFor(key)
-      lst.add(delayedRequest)
+      val lst = watchersFor(key)
+      if(!lst.checkAndMaybeAdd(delayedRequest)) {
+        if(delayedRequest.satisfied.compareAndSet(false, true))
+          return true
+        else
+          return false
+      }
     }
+
+    // if it is indeed watched, add to the expire queue also
     expiredRequestReaper.enqueue(delayedRequest)
+
+    false
   }
 
   /**
    * Update any watchers and return a list of newly satisfied requests.
    */
-  def update(key: Any, request: R): Seq[T] = {
+  def update(key: Any): Seq[T] = {
     val w = watchersForKey.get(key)
     if(w == null)
       Seq.empty
     else
-      w.collectSatisfiedRequests(request)
+      w.collectSatisfiedRequests()
   }
 
   private def watchersFor(key: Any) = watchersForKey.getAndMaybePut(key)
   
   /**
-   * Check if this request satisfied this delayed request
+   * Check if this delayed request is already satisfied
    */
-  protected def checkSatisfied(request: R, delayed: T): Boolean
+  protected def checkSatisfied(request: T): Boolean
 
   /**
    * Handle an expired delayed request
@@ -125,7 +143,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
   protected def expire(delayed: T)
 
   /**
-   * Shutdown the expirey thread
+   * Shutdown the expire reaper thread
    */
   def shutdown() {
     expiredRequestReaper.shutdown()
@@ -136,17 +154,26 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
    * bookkeeping logic.
    */
   private class Watchers {
+    private val requests = new util.ArrayList[T]
 
-    private val requests = new util.LinkedList[T]
-
-    def numRequests = requests.size
-
-    def add(t: T) {
+    // potentially add the element to watch if it is not satisfied yet
+    def checkAndMaybeAdd(t: T): Boolean = {
       synchronized {
+        // if it is already satisfied, do not add to the watch list
+        if (t.satisfied.get)
+          return false
+        // synchronize on the delayed request to avoid any race condition
+        // with expire and update threads on client-side.
+        if(t synchronized checkSatisfied(t)) {
+          return false
+        }
         requests.add(t)
+        watched.getAndIncrement()
+        return true
       }
     }
 
+    // traverse the list and purge satisfied elements
     def purgeSatisfied(): Int = {
       synchronized {
         val iter = requests.iterator()
@@ -155,6 +182,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
           val curr = iter.next
           if(curr.satisfied.get()) {
             iter.remove()
+            watched.getAndDecrement()
             purged += 1
           }
         }
@@ -162,7 +190,8 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
       }
     }
 
-    def collectSatisfiedRequests(request: R): Seq[T] = {
+    // traverse the list and try to satisfy watched elements
+    def collectSatisfiedRequests(): Seq[T] = {
       val response = new mutable.ArrayBuffer[T]
       synchronized {
         val iter = requests.iterator()
@@ -174,9 +203,10 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
           } else {
             // synchronize on curr to avoid any race condition with expire
             // on client-side.
-            val satisfied = curr synchronized checkSatisfied(request, curr)
+            val satisfied = curr synchronized checkSatisfied(curr)
             if(satisfied) {
               iter.remove()
+              watched.getAndDecrement()
               val updated = curr.satisfied.compareAndSet(false, true)
               if(updated == true) {
                 response += curr
@@ -215,13 +245,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
               expire(curr)
             }
           }
-          if (requestCounter.get >= purgeInterval) { // see if we need to force a full purge
+          if (watched.get + numRequests >= purgeInterval) { // see if we need to force a full purge
             debug("Beginning purgatory purge")
-            requestCounter.set(0)
             val purged = purgeSatisfied()
             debug("Purged %d requests from delay queue.".format(purged))
             val numPurgedFromWatchers = watchersForKey.values.map(_.purgeSatisfied()).sum
-            debug("Purged %d (watcher) requests.".format(numPurgedFromWatchers))
+            debug("Purged %d requests from watch lists.".format(numPurgedFromWatchers))
           }
         } catch {
           case e: Exception =>
@@ -266,10 +295,12 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
     }
 
     /**
-     * Delete all expired events from the delay queue
+     * Delete all satisfied events from the delay queue and the watcher lists
      */
     private def purgeSatisfied(): Int = {
       var purged = 0
+
+      // purge the delayed queue
       val iter = delayed.iterator()
       while(iter.hasNext) {
         val curr = iter.next()
@@ -278,6 +309,7 @@ abstract class RequestPurgatory[T <: DelayedRequest, R](brokerId: Int = 0, purge
           purged += 1
         }
       }
+
       purged
     }
   }
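
The caller-side contract described in the class comment can be sketched against the ProducerRequestPurgatory added earlier in this commit (not part of the patch; the surrounding handler code is assumed, so the sketch is kept in comment form):

    //   val delayedProduce: DelayedProduce = ...   // built by the request handler when acks require waiting
    //
    //   if (producerRequestPurgatory.checkAndMaybeWatch(delayedProduce)) {
    //     // the caller itself completed the satisfaction: respond right away, nothing stays watched
    //     producerRequestPurgatory.respond(delayedProduce)
    //   }
    //   // otherwise the request sits in the watch lists until replication progresses for one of its
    //   // keys, at which point the replica manager unblocks it:
    //   replicaManager.unblockDelayedProduceRequests(new TopicPartitionRequestKey(topicAndPartition))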

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
index 5f8f6bc..67196f3 100644
--- a/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
+++ b/core/src/main/scala/kafka/tools/TestEndToEndLatency.scala
@@ -17,15 +17,17 @@
 
 package kafka.tools
 
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
+
+import kafka.consumer._
+
 import java.util.Properties
 import java.util.Arrays
-import kafka.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord, KafkaProducer}
 
 object TestEndToEndLatency {
   def main(args: Array[String]) {
-    if (args.length != 4) {
-      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages")
+    if (args.length != 6) {
+      System.err.println("USAGE: java " + getClass().getName + " broker_list zookeeper_connect topic num_messages consumer_fetch_max_wait producer_acks")
       System.exit(1)
     }
 
@@ -33,31 +35,38 @@ object TestEndToEndLatency {
     val zkConnect = args(1)
     val topic = args(2)
     val numMessages = args(3).toInt
+    val consumerFetchMaxWait = args(4).toInt
+    val producerAcks = args(5).toInt
 
     val consumerProps = new Properties()
     consumerProps.put("group.id", topic)
     consumerProps.put("auto.commit.enable", "false")
     consumerProps.put("auto.offset.reset", "largest")
     consumerProps.put("zookeeper.connect", zkConnect)
-    consumerProps.put("fetch.wait.max.ms", "1")
+    consumerProps.put("fetch.wait.max.ms", consumerFetchMaxWait.toString)
     consumerProps.put("socket.timeout.ms", 1201000.toString)
 
     val config = new ConsumerConfig(consumerProps)
     val connector = Consumer.create(config)
-    var stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
+    val stream = connector.createMessageStreams(Map(topic -> 1)).get(topic).head.head
     val iter = stream.iterator
 
     val producerProps = new Properties()
     producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList)
     producerProps.put(ProducerConfig.LINGER_MS_CONFIG, "0")
     producerProps.put(ProducerConfig.BLOCK_ON_BUFFER_FULL_CONFIG, "true")
+    producerProps.put(ProducerConfig.ACKS_CONFIG, producerAcks.toString)
     val producer = new KafkaProducer(producerProps)
 
+    // make sure the consumer fetcher has started before sending data since otherwise
+    // the consumption from the tail will skip the first message and hence be blocked
+    Thread.sleep(5000)
+
     val message = "hello there beautiful".getBytes
     var totalTime = 0.0
     val latencies = new Array[Long](numMessages)
     for (i <- 0 until numMessages) {
-      var begin = System.nanoTime
+      val begin = System.nanoTime
       producer.send(new ProducerRecord(topic, message))
       val received = iter.next
       val elapsed = System.nanoTime - begin

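With the two extra arguments, an invocation of the tool now looks like the following (a hypothetical example; the broker/ZooKeeper addresses and values are invented):

    // broker_list       zookeeper_connect  topic   num_messages  consumer_fetch_max_wait  producer_acks
    kafka.tools.TestEndToEndLatency.main(Array("localhost:9092", "localhost:2181", "test", "10000", "100", "1"))
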
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/other/kafka/StressTestLog.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/other/kafka/StressTestLog.scala b/core/src/test/scala/other/kafka/StressTestLog.scala
index 8fcd068..e19b8b2 100644
--- a/core/src/test/scala/other/kafka/StressTestLog.scala
+++ b/core/src/test/scala/other/kafka/StressTestLog.scala
@@ -91,7 +91,7 @@ object StressTestLog {
     @volatile var offset = 0
     override def work() {
       try {
-        log.read(offset, 1024, Some(offset+1)) match {
+        log.read(offset, 1024, Some(offset+1)).messageSet match {
           case read: FileMessageSet if read.sizeInBytes > 0 => {
             val first = read.head
             require(first.offset == offset, "We should either read nothing or the message we asked for.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
index 9f04bd3..a5386a0 100644
--- a/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
+++ b/core/src/test/scala/unit/kafka/integration/PrimitiveApiTest.scala
@@ -74,7 +74,7 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
 
     val replica = servers.head.replicaManager.getReplica(topic, 0).get
     assertTrue("HighWatermark should equal logEndOffset with just 1 replica",
-               replica.logEndOffset > 0 && replica.logEndOffset == replica.highWatermark)
+               replica.logEndOffset.messageOffset > 0 && replica.logEndOffset.equals(replica.highWatermark))
 
     val request = new FetchRequestBuilder()
       .clientId("test-client")
@@ -248,13 +248,13 @@ class PrimitiveApiTest extends JUnit3Suite with ProducerConsumerTestHarness with
                             "Published messages should be in the log")
 
     val replicaId = servers.head.config.brokerId
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test1", 0, replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test2", 0, replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test3", 0, replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
-    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark == 2 },
+    TestUtils.waitUntilTrue(() => { servers.head.replicaManager.getReplica("test4", 0, replicaId).get.highWatermark.messageOffset == 2 },
                             "High watermark should equal to log end offset")
 
     // test if the consumer received the messages in the correct order when producer has enabled request pipelining

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 7d4c70c..59bd8a9 100644
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -94,7 +94,7 @@ class LogManagerTest extends JUnit3Suite {
     assertEquals("Now there should only be only one segment in the index.", 1, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
     assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
-    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset+1, 1024).messageSet.sizeInBytes)
 
     try {
       log.read(0, 1024)
@@ -137,7 +137,7 @@ class LogManagerTest extends JUnit3Suite {
     assertEquals("Now there should be exactly 6 segments", 6, log.numberOfSegments)
     time.sleep(log.config.fileDeleteDelayMs + 1)
     assertEquals("Files should have been deleted", log.numberOfSegments * 2, log.dir.list.length)
-    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).sizeInBytes)
+    assertEquals("Should get empty fetch off new log.", 0, log.read(offset + 1, 1024).messageSet.sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Should get exception from fetching earlier.")

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 6b76037..7b97e6a 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -78,7 +78,7 @@ class LogSegmentTest extends JUnit3Suite {
     val seg = createSegment(40)
     val ms = messages(50, "hello", "there", "little", "bee")
     seg.append(50, ms)
-    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None)
+    val read = seg.read(startOffset = 41, maxSize = 300, maxOffset = None).messageSet
     assertEquals(ms.toList, read.toList)
   }
   
@@ -94,7 +94,7 @@ class LogSegmentTest extends JUnit3Suite {
     seg.append(baseOffset, ms)
     def validate(offset: Long) = 
       assertEquals(ms.filter(_.offset == offset).toList, 
-                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).toList)
+                   seg.read(startOffset = offset, maxSize = 1024, maxOffset = Some(offset+1)).messageSet.toList)
     validate(50)
     validate(51)
     validate(52)
@@ -109,7 +109,7 @@ class LogSegmentTest extends JUnit3Suite {
     val ms = messages(50, "hello", "there")
     seg.append(50, ms)
     val read = seg.read(startOffset = 52, maxSize = 200, maxOffset = None)
-    assertNull("Read beyond the last offset in the segment should give null", null)
+    assertNull("Read beyond the last offset in the segment should give null", read)
   }
   
   /**
@@ -124,7 +124,7 @@ class LogSegmentTest extends JUnit3Suite {
     val ms2 = messages(60, "alpha", "beta")
     seg.append(60, ms2)
     val read = seg.read(startOffset = 55, maxSize = 200, maxOffset = None)
-    assertEquals(ms2.toList, read.toList)
+    assertEquals(ms2.toList, read.messageSet.toList)
   }
   
   /**
@@ -142,12 +142,12 @@ class LogSegmentTest extends JUnit3Suite {
       seg.append(offset+1, ms2)
       // check that we can read back both messages
       val read = seg.read(offset, None, 10000)
-      assertEquals(List(ms1.head, ms2.head), read.toList)
+      assertEquals(List(ms1.head, ms2.head), read.messageSet.toList)
       // now truncate off the last message
       seg.truncateTo(offset + 1)
       val read2 = seg.read(offset, None, 10000)
-      assertEquals(1, read2.size)
-      assertEquals(ms1.head, read2.head)
+      assertEquals(1, read2.messageSet.size)
+      assertEquals(ms1.head, read2.messageSet.head)
       offset += 1
     }
   }
@@ -204,7 +204,7 @@ class LogSegmentTest extends JUnit3Suite {
     TestUtils.writeNonsenseToFile(indexFile, 5, indexFile.length.toInt)
     seg.recover(64*1024)
     for(i <- 0 until 100)
-      assertEquals(i, seg.read(i, Some(i+1), 1024).head.offset)
+      assertEquals(i, seg.read(i, Some(i+1), 1024).messageSet.head.offset)
   }
   
   /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 1da1393..577d102 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -131,11 +131,11 @@ class LogTest extends JUnitSuite {
     for(i <- 0 until messages.length)
       log.append(new ByteBufferMessageSet(NoCompressionCodec, messages = messages(i)))
     for(i <- 0 until messages.length) {
-      val read = log.read(i, 100, Some(i+1)).head
+      val read = log.read(i, 100, Some(i+1)).messageSet.head
       assertEquals("Offset read should match order appended.", i, read.offset)
       assertEquals("Message should match appended.", messages(i), read.message)
     }
-    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).size)
+    assertEquals("Reading beyond the last message returns nothing.", 0, log.read(messages.length, 100, None).messageSet.size)
   }
   
   /**
@@ -153,7 +153,7 @@ class LogTest extends JUnitSuite {
       log.append(new ByteBufferMessageSet(NoCompressionCodec, new AtomicLong(messageIds(i)), messages = messages(i)), assignOffsets = false)
     for(i <- 50 until messageIds.max) {
       val idx = messageIds.indexWhere(_ >= i)
-      val read = log.read(i, 100, None).head
+      val read = log.read(i, 100, None).messageSet.head
       assertEquals("Offset read should match message id.", messageIds(idx), read.offset)
       assertEquals("Message should match appended.", messages(idx), read.message)
     }
@@ -176,7 +176,7 @@ class LogTest extends JUnitSuite {
     // now manually truncate off all but one message from the first segment to create a gap in the messages
     log.logSegments.head.truncateTo(1)
     
-    assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).head.offset)
+    assertEquals("A read should now return the last message in the log", log.logEndOffset-1, log.read(1, 200, None).messageSet.head.offset)
   }
   
   /**
@@ -188,7 +188,7 @@ class LogTest extends JUnitSuite {
   def testReadOutOfRange() {
     createEmptyLogs(logDir, 1024)
     val log = new Log(logDir, logConfig.copy(segmentSize = 1024), recoveryPoint = 0L, time.scheduler, time = time)
-    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).sizeInBytes)
+    assertEquals("Reading just beyond end of log should produce 0 byte read.", 0, log.read(1024, 1000).messageSet.sizeInBytes)
     try {
       log.read(0, 1024)
       fail("Expected exception on invalid read.")
@@ -219,12 +219,12 @@ class LogTest extends JUnitSuite {
     /* do successive reads to ensure all our messages are there */
     var offset = 0L
     for(i <- 0 until numMessages) {
-      val messages = log.read(offset, 1024*1024)
+      val messages = log.read(offset, 1024*1024).messageSet
       assertEquals("Offsets not equal", offset, messages.head.offset)
       assertEquals("Messages not equal at offset " + offset, messageSets(i).head.message, messages.head.message)
       offset = messages.head.offset + 1
     }
-    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1))
+    val lastRead = log.read(startOffset = numMessages, maxLength = 1024*1024, maxOffset = Some(numMessages + 1)).messageSet
     assertEquals("Should be no more messages", 0, lastRead.size)
     
     // check that rolling the log forced a flushed the log--the flush is asyn so retry in case of failure
@@ -245,7 +245,7 @@ class LogTest extends JUnitSuite {
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("hello".getBytes), new Message("there".getBytes)))
     log.append(new ByteBufferMessageSet(DefaultCompressionCodec, new Message("alpha".getBytes), new Message("beta".getBytes)))
     
-    def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).head.message)
+    def read(offset: Int) = ByteBufferMessageSet.decompress(log.read(offset, 4096).messageSet.head.message)
     
     /* we should always get the first message in the compressed set when reading any offset in the set */
     assertEquals("Read at offset 0 should produce 0", 0, read(0).head.offset)
@@ -363,7 +363,7 @@ class LogTest extends JUnitSuite {
     log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)    
     assertEquals("Should have %d messages when log is reopened".format(numMessages), numMessages, log.logEndOffset)
     for(i <- 0 until numMessages)
-      assertEquals(i, log.read(i, 100, None).head.offset)
+      assertEquals(i, log.read(i, 100, None).messageSet.head.offset)
     log.close()
   }
 
@@ -575,15 +575,15 @@ class LogTest extends JUnitSuite {
   
   @Test
   def testAppendMessageWithNullPayload() {
-    var log = new Log(logDir,
+    val log = new Log(logDir,
                       LogConfig(),
                       recoveryPoint = 0L,
                       time.scheduler,
                       time)
     log.append(new ByteBufferMessageSet(new Message(bytes = null)))
-    val ms = log.read(0, 4096, None)
-    assertEquals(0, ms.head.offset)
-    assertTrue("Message payload should be null.", ms.head.message.isNull)
+    val messageSet = log.read(0, 4096, None).messageSet
+    assertEquals(0, messageSet.head.offset)
+    assertTrue("Message payload should be null.", messageSet.head.message.isNull)
   }
   
   @Test

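The LogTest updates above all follow from one API change in this patch: Log.read no longer returns a message set directly but a FetchDataInfo pairing the fetch offset metadata with the messages. A minimal sketch of the new call pattern, assuming the FetchDataInfo and LogOffsetMetadata classes introduced by this change set (package placement inferred from the diff):

// Sketch only -- how a caller reads from a Log after this patch.
// Assumes a FetchDataInfo(fetchOffset: LogOffsetMetadata, messageSet: MessageSet)
// as introduced in this change set.
import kafka.log.Log

def firstOffsetAtOrAfter(log: Log, startOffset: Long): Long = {
  // read() now returns FetchDataInfo rather than a bare MessageSet
  val fetchInfo = log.read(startOffset, maxLength = 4096, maxOffset = None)
  // the message data is reached through .messageSet
  fetchInfo.messageSet.head.offset
}
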
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
index 6db245c..dd8847f 100644
--- a/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
+++ b/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
@@ -31,7 +31,7 @@ trait BaseMessageSetTestCases extends JUnitSuite {
   def createMessageSet(messages: Seq[Message]): MessageSet
 
   @Test
-  def testWrittenEqualsRead {
+  def testWrittenEqualsRead() {
     val messageSet = createMessageSet(messages)
     checkEquals(messages.iterator, messageSet.map(m => m.message).iterator)
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
index e532c28..03a424d 100644
--- a/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
+++ b/core/src/test/scala/unit/kafka/server/HighwatermarkPersistenceTest.scala
@@ -58,7 +58,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     replicaManager.checkpointHighWatermarks()
     var fooPartition0Hw = hwmFor(replicaManager, topic, 0)
     assertEquals(0L, fooPartition0Hw)
-    val partition0 = replicaManager.getOrCreatePartition(topic, 0, 1)
+    val partition0 = replicaManager.getOrCreatePartition(topic, 0)
     // create leader and follower replicas
     val log0 = logManagers(0).createLog(TopicAndPartition(topic, 0), LogConfig())
     val leaderReplicaPartition0 = new Replica(configs.head.brokerId, partition0, SystemTime, 0, Some(log0))
@@ -67,18 +67,12 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     partition0.addReplicaIfNotExists(followerReplicaPartition0)
     replicaManager.checkpointHighWatermarks()
     fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-    assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
-    try {
-      followerReplicaPartition0.highWatermark
-      fail("Should fail with KafkaException")
-    }catch {
-      case e: KafkaException => // this is ok
-    }
-    // set the highwatermark for local replica
-    partition0.getReplica().get.highWatermark = 5L
+    assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
+    // set the high watermark for local replica
+    partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
     replicaManager.checkpointHighWatermarks()
     fooPartition0Hw = hwmFor(replicaManager, topic, 0)
-    assertEquals(leaderReplicaPartition0.highWatermark, fooPartition0Hw)
+    assertEquals(leaderReplicaPartition0.highWatermark.messageOffset, fooPartition0Hw)
     EasyMock.verify(zkClient)
   }
 
@@ -97,7 +91,7 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     replicaManager.checkpointHighWatermarks()
     var topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
     assertEquals(0L, topic1Partition0Hw)
-    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0, 1)
+    val topic1Partition0 = replicaManager.getOrCreatePartition(topic1, 0)
     // create leader log
     val topic1Log0 = logManagers(0).createLog(TopicAndPartition(topic1, 0), LogConfig())
     // create a local replica for topic1
@@ -105,15 +99,15 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     topic1Partition0.addReplicaIfNotExists(leaderReplicaTopic1Partition0)
     replicaManager.checkpointHighWatermarks()
     topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-    assertEquals(leaderReplicaTopic1Partition0.highWatermark, topic1Partition0Hw)
-    // set the highwatermark for local replica
-    topic1Partition0.getReplica().get.highWatermark = 5L
+    assertEquals(leaderReplicaTopic1Partition0.highWatermark.messageOffset, topic1Partition0Hw)
+    // set the high watermark for local replica
+    topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(5L)
     replicaManager.checkpointHighWatermarks()
     topic1Partition0Hw = hwmFor(replicaManager, topic1, 0)
-    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark)
+    assertEquals(5L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
     assertEquals(5L, topic1Partition0Hw)
     // add another partition and set highwatermark
-    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0, 1)
+    val topic2Partition0 = replicaManager.getOrCreatePartition(topic2, 0)
     // create leader log
     val topic2Log0 = logManagers(0).createLog(TopicAndPartition(topic2, 0), LogConfig())
     // create a local replica for topic2
@@ -121,13 +115,13 @@ class HighwatermarkPersistenceTest extends JUnit3Suite {
     topic2Partition0.addReplicaIfNotExists(leaderReplicaTopic2Partition0)
     replicaManager.checkpointHighWatermarks()
     var topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)
-    assertEquals(leaderReplicaTopic2Partition0.highWatermark, topic2Partition0Hw)
+    assertEquals(leaderReplicaTopic2Partition0.highWatermark.messageOffset, topic2Partition0Hw)
     // set the highwatermark for local replica
-    topic2Partition0.getReplica().get.highWatermark = 15L
-    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark)
+    topic2Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(15L)
+    assertEquals(15L, leaderReplicaTopic2Partition0.highWatermark.messageOffset)
     // change the highwatermark for topic1
-    topic1Partition0.getReplica().get.highWatermark = 10L
-    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark)
+    topic1Partition0.getReplica().get.highWatermark = new LogOffsetMetadata(10L)
+    assertEquals(10L, leaderReplicaTopic1Partition0.highWatermark.messageOffset)
     replicaManager.checkpointHighWatermarks()
     // verify checkpointed hw for topic 2
     topic2Partition0Hw = hwmFor(replicaManager, topic2, 0)

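The HighwatermarkPersistenceTest changes track the switch of Replica.highWatermark from a plain Long to a LogOffsetMetadata: assignments wrap the offset, and numeric comparisons read .messageOffset. A short sketch under that assumption:

// Sketch: the high watermark is now a LogOffsetMetadata (message offset plus
// optional segment/position info), not a Long.
import kafka.cluster.Replica
import kafka.server.LogOffsetMetadata

def advanceHighWatermark(replica: Replica, newOffset: Long): Long = {
  // only the message offset is known here; segment info stays at its defaults
  replica.highWatermark = new LogOffsetMetadata(newOffset)
  // plain-Long checks now go through messageOffset
  replica.highWatermark.messageOffset
}
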
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
index 2cd3a3f..cd302aa 100644
--- a/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ISRExpirationTest.scala
@@ -46,7 +46,7 @@ class IsrExpirationTest extends JUnit3Suite {
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
 
     // let the follower catch up to 10
-    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 10)
+    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(10L))
     var partition0OSR = partition0.getOutOfSyncReplicas(leaderReplica, configs.head.replicaLagTimeMaxMs, configs.head.replicaLagMaxMessages)
     assertEquals("No replica should be out of sync", Set.empty[Int], partition0OSR.map(_.brokerId))
 
@@ -69,7 +69,7 @@ class IsrExpirationTest extends JUnit3Suite {
     assertEquals("All replicas should be in ISR", configs.map(_.brokerId).toSet, partition0.inSyncReplicas.map(_.brokerId))
     val leaderReplica = partition0.getReplica(configs.head.brokerId).get
     // set remote replicas leo to something low, like 4
-    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = 4L)
+    (partition0.assignedReplicas() - leaderReplica).foreach(r => r.logEndOffset = new LogOffsetMetadata(4L))
 
     // now follower (broker id 1) has caught up to only 4, while the leader is at 15. Since the gap is larger than
     // replicaLagMaxMessages, the follower is out of sync.
@@ -83,7 +83,7 @@ class IsrExpirationTest extends JUnit3Suite {
                                                localLog: Log): Partition = {
     val leaderId=config.brokerId
     val replicaManager = new ReplicaManager(config, time, null, null, null, new AtomicBoolean(false))
-    val partition = replicaManager.getOrCreatePartition(topic, partitionId, 1)
+    val partition = replicaManager.getOrCreatePartition(topic, partitionId)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
@@ -97,7 +97,7 @@ class IsrExpirationTest extends JUnit3Suite {
 
   private def getLogWithLogEndOffset(logEndOffset: Long, expectedCalls: Int): Log = {
     val log1 = EasyMock.createMock(classOf[kafka.log.Log])
-    EasyMock.expect(log1.logEndOffset).andReturn(logEndOffset).times(expectedCalls)
+    EasyMock.expect(log1.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(logEndOffset)).times(expectedCalls)
     EasyMock.replay(log1)
 
     log1

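IsrExpirationTest follows the same pattern for the log end offset: Replica.logEndOffset is assigned a LogOffsetMetadata, and the mocked Log now exposes logEndOffsetMetadata. A sketch of the mock helper, mirroring the diff:

// Sketch: stubbing a Log whose end offset is consumed as LogOffsetMetadata.
import org.easymock.EasyMock
import kafka.log.Log
import kafka.server.LogOffsetMetadata

def logEndingAt(offset: Long): Log = {
  val log = EasyMock.createMock(classOf[Log])
  // callers read logEndOffsetMetadata instead of a bare Long end offset
  EasyMock.expect(log.logEndOffsetMetadata).andReturn(new LogOffsetMetadata(offset)).anyTimes()
  EasyMock.replay(log)
  log
}
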
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
index 0ec120a..d5d351c 100644
--- a/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
@@ -85,7 +85,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // give some time for follower 1 to record leader HW
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == numMessages,
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == numMessages,
       "Failed to update high watermark for follower after timeout")
 
     servers.foreach(server => server.replicaManager.checkpointHighWatermarks())
@@ -134,7 +134,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // give some time for follower 1 to record leader HW of 60
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
@@ -147,7 +147,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
     val hw = 20L
     // give some time for follower 1 to record leader HW of 20
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())
@@ -165,7 +165,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
-      server2.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      server2.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // kill the server hosting the preferred replica
     server1.shutdown()
@@ -191,7 +191,7 @@ class LogRecoveryTest extends JUnit3Suite with ZooKeeperTestHarness {
 
     // allow some time for the follower to get the leader HW
     TestUtils.waitUntilTrue(() =>
-      server1.replicaManager.getReplica(topic, 0).get.highWatermark == hw,
+      server1.replicaManager.getReplica(topic, 0).get.highWatermark.messageOffset == hw,
       "Failed to update high watermark for follower after timeout")
     // shutdown the servers to allow the hw to be checkpointed
     servers.foreach(server => server.shutdown())

http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
index 9abf219..a9c4ddc 100644
--- a/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
@@ -39,7 +39,7 @@ class ReplicaManagerTest extends JUnit3Suite {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition(topic, 1, 1)
+    val partition = rm.getOrCreatePartition(topic, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
   }
@@ -53,7 +53,7 @@ class ReplicaManagerTest extends JUnit3Suite {
     val mockLogMgr = TestUtils.createLogManager(config.logDirs.map(new File(_)).toArray)
     val time: MockTime = new MockTime()
     val rm = new ReplicaManager(config, time, zkClient, new MockScheduler(time), mockLogMgr, new AtomicBoolean(false))
-    val partition = rm.getOrCreatePartition(topic, 1, 1)
+    val partition = rm.getOrCreatePartition(topic, 1)
     partition.getOrCreateReplica(1)
     rm.checkpointHighWatermarks()
   }

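Both ReplicaManagerTest cases reflect the simplified ReplicaManager.getOrCreatePartition signature, which no longer takes a replication factor. A minimal sketch, assuming a fully constructed ReplicaManager as in the tests above:

// Sketch: partition/replica creation after the signature change.
import kafka.server.ReplicaManager

def createLocalReplica(rm: ReplicaManager, topic: String, partitionId: Int, brokerId: Int): Unit = {
  val partition = rm.getOrCreatePartition(topic, partitionId) // replication factor argument removed
  partition.getOrCreateReplica(brokerId)
  rm.checkpointHighWatermarks()
}
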
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
index 4f61f84..168712d 100644
--- a/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
+++ b/core/src/test/scala/unit/kafka/server/RequestPurgatoryTest.scala
@@ -46,17 +46,17 @@ class RequestPurgatoryTest extends JUnit3Suite {
   def testRequestSatisfaction() {
     val r1 = new DelayedRequest(Array("test1"), null, 100000L)
     val r2 = new DelayedRequest(Array("test2"), null, 100000L)
-    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1", producerRequest1).size)
-    purgatory.watch(r1)
-    assertEquals("Still nothing satisfied", 0, purgatory.update("test1", producerRequest1).size)
-    purgatory.watch(r2)
-    assertEquals("Still nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+    assertEquals("With no waiting requests, nothing should be satisfied", 0, purgatory.update("test1").size)
+    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+    assertEquals("Still nothing satisfied", 0, purgatory.update("test1").size)
+    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
+    assertEquals("Still nothing satisfied", 0, purgatory.update("test2").size)
     purgatory.satisfied += r1
-    assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1", producerRequest1))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test1", producerRequest2).size)
+    assertEquals("r1 satisfied", mutable.ArrayBuffer(r1), purgatory.update("test1"))
+    assertEquals("Nothing satisfied", 0, purgatory.update("test1").size)
     purgatory.satisfied += r2
-    assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2", producerRequest2))
-    assertEquals("Nothing satisfied", 0, purgatory.update("test2", producerRequest2).size)
+    assertEquals("r2 satisfied", mutable.ArrayBuffer(r2), purgatory.update("test2"))
+    assertEquals("Nothing satisfied", 0, purgatory.update("test2").size)
   }
 
   @Test
@@ -65,8 +65,8 @@ class RequestPurgatoryTest extends JUnit3Suite {
     val r1 = new DelayedRequest(Array("test1"), null, expiration)
     val r2 = new DelayedRequest(Array("test1"), null, 200000L)
     val start = System.currentTimeMillis
-    purgatory.watch(r1)
-    purgatory.watch(r2)
+    assertFalse("r1 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r1))
+    assertFalse("r2 not satisfied and hence watched", purgatory.checkAndMaybeWatch(r2))
     purgatory.awaitExpiration(r1)
     val elapsed = System.currentTimeMillis - start
     assertTrue("r1 expired", purgatory.expired.contains(r1))
@@ -74,7 +74,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
     assertTrue("Time for expiration %d should at least %d".format(elapsed, expiration), elapsed >= expiration)
   }
   
-  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest, ProducerRequest] {
+  class MockRequestPurgatory extends RequestPurgatory[DelayedRequest] {
     val satisfied = mutable.Set[DelayedRequest]()
     val expired = mutable.Set[DelayedRequest]()
     def awaitExpiration(delayed: DelayedRequest) = {
@@ -82,7 +82,7 @@ class RequestPurgatoryTest extends JUnit3Suite {
         delayed.wait()
       }
     }
-    def checkSatisfied(request: ProducerRequest, delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
+    def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)
     def expire(delayed: DelayedRequest) {
       expired += delayed
       delayed synchronized {

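The purgatory test now exercises the redesigned RequestPurgatory: a single type parameter, a checkSatisfied that inspects only the delayed request, and checkAndMaybeWatch, which returns true when the request can be satisfied immediately and otherwise adds it to the watch lists. A sketch of a minimal subclass modeled on the MockRequestPurgatory above (package placement of DelayedRequest is assumed):

// Sketch: a trivial purgatory against the redesigned single-parameter API.
import scala.collection.mutable
import kafka.server.{DelayedRequest, RequestPurgatory}

class SetBackedPurgatory extends RequestPurgatory[DelayedRequest] {
  val satisfied = mutable.Set[DelayedRequest]()

  // satisfaction is decided from the delayed request alone -- no separate request argument
  def checkSatisfied(delayed: DelayedRequest): Boolean = satisfied.contains(delayed)

  // invoked when a watched request reaches its delay without being satisfied
  def expire(delayed: DelayedRequest): Unit = ()
}

// usage: returns true if already satisfiable, false if it was added to the watchers
// val satisfiedNow = purgatory.checkAndMaybeWatch(request)
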
http://git-wip-us.apache.org/repos/asf/kafka/blob/0dc243b9/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index b1c4ce9..09ed8f5 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -16,17 +16,19 @@
  */
 package kafka.server
 
+import kafka.api._
 import kafka.cluster.{Partition, Replica}
+import kafka.common.{ErrorMapping, TopicAndPartition}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
 import kafka.network.RequestChannel
 import kafka.utils.{ZkUtils, Time, TestUtils, MockTime}
+
+import scala.Some
+
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
-import kafka.api._
-import scala.Some
-import kafka.common.TopicAndPartition
 
 class SimpleFetchTest extends JUnit3Suite {
 
@@ -45,13 +47,13 @@ class SimpleFetchTest extends JUnit3Suite {
    * with a HW matching the leader's ("5") and LEO of "15", meaning it's not in-sync
    * but is still in ISR (hasn't yet expired from ISR).
    *
-   * When a normal consumer fetches data, it only should only see data upto the HW of the leader,
+   * When a normal consumer fetches data, it should only see data up to the HW of the leader,
    * in this case up to an offset of "5".
    */
   def testNonReplicaSeesHwWhenFetching() {
     /* setup */
     val time = new MockTime
-    val leo = 20
+    val leo = 20L
     val hw = 5
     val fetchSize = 100
     val messages = new Message("test-message".getBytes())
@@ -64,7 +66,11 @@ class SimpleFetchTest extends JUnit3Suite {
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
     EasyMock.expect(log)
-    EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.expect(log.read(0, fetchSize, Some(hw))).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(0L, 0L, leo.toInt),
+        new ByteBufferMessageSet(messages)
+      )).anyTimes()
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -76,14 +82,26 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(0, fetchSize, Some(hw))
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+    }).anyTimes()
     EasyMock.replay(replicaManager)
 
     val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
-    partition.getReplica(configs(1).brokerId).get.logEndOffset = leo - 5L
+    partition.getReplica(configs(1).brokerId).get.logEndOffset = new LogOffsetMetadata(leo - 5L, 0L, leo.toInt - 5)
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(0, fetchSize, Some(hw))
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+    }).anyTimes()
+
     EasyMock.replay(replicaManager)
 
     val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
@@ -138,7 +156,11 @@ class SimpleFetchTest extends JUnit3Suite {
 
     val log = EasyMock.createMock(classOf[kafka.log.Log])
     EasyMock.expect(log.logEndOffset).andReturn(leo).anyTimes()
-    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(new ByteBufferMessageSet(messages))
+    EasyMock.expect(log.read(followerLEO, Integer.MAX_VALUE, None)).andReturn(
+      new FetchDataInfo(
+        new LogOffsetMetadata(followerLEO, 0L, followerLEO),
+        new ByteBufferMessageSet(messages)
+      )).anyTimes()
     EasyMock.replay(log)
 
     val logManager = EasyMock.createMock(classOf[kafka.log.LogManager])
@@ -150,16 +172,28 @@ class SimpleFetchTest extends JUnit3Suite {
     EasyMock.expect(replicaManager.logManager).andReturn(logManager)
     EasyMock.expect(replicaManager.replicaFetcherManager).andReturn(EasyMock.createMock(classOf[ReplicaFetcherManager]))
     EasyMock.expect(replicaManager.zkClient).andReturn(zkClient)
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+    }).anyTimes()
     EasyMock.replay(replicaManager)
 
     val partition = getPartitionWithAllReplicasInISR(topic, partitionId, time, configs.head.brokerId, log, hw, replicaManager)
-    partition.getReplica(followerReplicaId).get.logEndOffset = followerLEO.asInstanceOf[Long]
+    partition.getReplica(followerReplicaId).get.logEndOffset = new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)
 
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.config).andReturn(configs.head).anyTimes()
-    EasyMock.expect(replicaManager.recordFollowerPosition(topic, partitionId, followerReplicaId, followerLEO))
+    EasyMock.expect(replicaManager.updateReplicaLEOAndPartitionHW(topic, partitionId, followerReplicaId, new LogOffsetMetadata(followerLEO.asInstanceOf[Long], 0L, followerLEO)))
     EasyMock.expect(replicaManager.getReplica(topic, partitionId, followerReplicaId)).andReturn(partition.inSyncReplicas.find(_.brokerId == configs(1).brokerId))
     EasyMock.expect(replicaManager.getLeaderReplicaIfLocal(topic, partitionId)).andReturn(partition.leaderReplicaIfLocal().get).anyTimes()
+    EasyMock.expect(replicaManager.initWithRequestPurgatory(EasyMock.anyObject(), EasyMock.anyObject()))
+    EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
+      val fetchInfo = log.read(followerLEO, Integer.MAX_VALUE, None)
+      val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw.toLong, fetchInfo.messageSet)
+      Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
+    }).anyTimes()
+    EasyMock.expect(replicaManager.unblockDelayedProduceRequests(EasyMock.anyObject())).anyTimes()
     EasyMock.replay(replicaManager)
 
     val offsetManager = EasyMock.createMock(classOf[kafka.server.OffsetManager])
@@ -195,7 +229,7 @@ class SimpleFetchTest extends JUnit3Suite {
 
   private def getPartitionWithAllReplicasInISR(topic: String, partitionId: Int, time: Time, leaderId: Int,
                                                localLog: Log, leaderHW: Long, replicaManager: ReplicaManager): Partition = {
-    val partition = new Partition(topic, partitionId, 2, time, replicaManager)
+    val partition = new Partition(topic, partitionId, time, replicaManager)
     val leaderReplica = new Replica(leaderId, partition, time, 0, Some(localLog))
 
     val allReplicas = getFollowerReplicas(partition, leaderId, time) :+ leaderReplica
@@ -204,7 +238,7 @@ class SimpleFetchTest extends JUnit3Suite {
     partition.inSyncReplicas = allReplicas.toSet
     // set the leader and its hw and the hw update time
     partition.leaderReplicaIdOpt = Some(leaderId)
-    leaderReplica.highWatermark = leaderHW
+    leaderReplica.highWatermark = new LogOffsetMetadata(leaderHW)
     partition
   }
 

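SimpleFetchTest has to mirror the same shift end to end: the mocked Log.read returns a FetchDataInfo, and ReplicaManager.readMessageSets hands back the partition data together with the fetch offset. A condensed sketch of the mock wiring, with identifiers taken from the diff above (the package of PartitionDataAndOffset is assumed, and topic, partitionId, hw and fetchSize stand in for the test fixtures):

// Sketch: stubbing the read path used by the fetch purgatory (names from the diff above).
import org.easymock.EasyMock
import kafka.api.FetchResponsePartitionData
import kafka.common.{ErrorMapping, TopicAndPartition}
import kafka.log.Log
import kafka.server.{PartitionDataAndOffset, ReplicaManager}

def stubReadPath(log: Log, replicaManager: ReplicaManager,
                 topic: String, partitionId: Int, hw: Long, fetchSize: Int): Unit = {
  // log is expected to be a replayed mock whose read() already returns a FetchDataInfo
  EasyMock.expect(replicaManager.readMessageSets(EasyMock.anyObject())).andReturn({
    val fetchInfo = log.read(0, fetchSize, Some(hw))
    val partitionData = new FetchResponsePartitionData(ErrorMapping.NoError, hw, fetchInfo.messageSet)
    Map(TopicAndPartition(topic, partitionId) -> new PartitionDataAndOffset(partitionData, fetchInfo.fetchOffset))
  }).anyTimes()
}
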
