kafka-commits mailing list archives

From: jkr...@apache.org
Subject: svn commit: r1395729 [1/4] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/ core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/common/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/...
Date: Mon, 08 Oct 2012 19:13:27 GMT
Author: jkreps
Date: Mon Oct  8 19:13:24 2012
New Revision: 1395729

URL: http://svn.apache.org/viewvc?rev=1395729&view=rev
Log:
KAFKA-506 Move to logical offsets. Reviewed by Jun and Neha.

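For readers skimming the diffs below: "logical offsets" means every message in a partition is assigned the next sequential number at append time, and consumers address messages by that number rather than by a byte position in the segment file. The toy sketch below is illustrative only -- none of these names exist in Kafka -- but it shows the bookkeeping the rest of this patch implements for real.

import java.util.concurrent.atomic.AtomicLong

// Toy model, not Kafka code: with logical offsets every appended message gets the next
// sequential per-partition number, independent of its size in bytes.
object LogicalOffsetSketch {
  final case class Entry(offset: Long, payload: Array[Byte])

  class ToyLog {
    private val nextOffset = new AtomicLong(0L)
    private var entries = Vector.empty[Entry]

    /** Append a batch, returning (firstOffset, lastOffset), or (-1, -1) for an empty batch. */
    def append(payloads: Seq[Array[Byte]]): (Long, Long) = synchronized {
      if (payloads.isEmpty) {
        (-1L, -1L)
      } else {
        val first = nextOffset.get
        payloads.foreach(p => entries :+= Entry(nextOffset.getAndIncrement(), p))
        (first, nextOffset.get - 1)
      }
    }

    /** Consumers resume from a logical offset, not from a byte position in a file. */
    def readFrom(offset: Long): Seq[Entry] = synchronized {
      entries.dropWhile(_.offset < offset)
    }
  }

  def main(args: Array[String]): Unit = {
    val log = new ToyLog
    val (first, last) = log.append(Seq("a", "b", "c").map(_.getBytes("UTF-8")))
    println("appended offsets " + first + ".." + last)    // 0..2
    println(log.readFrom(1L).map(_.offset).mkString(",")) // 1,2
  }
}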

Added:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/InvalidTopicException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetIndex.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/OffsetPosition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionFactory.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/api/RequestResponseSerializationTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/FileMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/OffsetIndexTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/storage/
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/CompressionUtils.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LogCorruptionTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/CompressionUtilsTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/FileMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/RpcDataSerializationTest.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/SegmentList.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/Message.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageAndOffset.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/ConsoleProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaConfig.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/DumpLogSegments.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/tools/SimpleConsumerShell.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/ZkUtils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/other/kafka/TestLogPerformance.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/consumer/ZookeeperConsumerConnectorTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/FetcherTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/LazyInitProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/javaapi/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogOffsetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/log/LogTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/BaseMessageSetTestCases.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/MessageTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/metrics/KafkaTimerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/ProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/LogRecoveryTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ReplicaFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/perf/src/main/scala/kafka/perf/ConsumerPerformance.scala

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/Kafka.scala Mon Oct  8 19:13:24 2012
@@ -24,7 +24,6 @@ import utils.{Utils, Logging}
 object Kafka extends Logging {
 
   def main(args: Array[String]): Unit = {
-
     if (args.length != 1) {
       println("USAGE: java [options] %s server.properties".format(classOf[KafkaServer].getSimpleName()))
       System.exit(1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Mon Oct  8 19:13:24 2012
@@ -34,7 +34,7 @@ object FetchResponsePartitionData {
     val messageSetBuffer = buffer.slice()
     messageSetBuffer.limit(messageSetSize)
     buffer.position(buffer.position + messageSetSize)
-    new FetchResponsePartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer, initialOffset))
+    new FetchResponsePartitionData(partition, error, initialOffset, hw, new ByteBufferMessageSet(messageSetBuffer))
   }
 
   val headerSize =
@@ -50,6 +50,7 @@ case class FetchResponsePartitionData(pa
   val sizeInBytes = FetchResponsePartitionData.headerSize + messages.sizeInBytes.intValue()
 
   def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, -1L, messages)
+  
 }
 
 // SENDS

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Mon Oct  8 19:13:24 2012
@@ -72,12 +72,12 @@ object ProducerRequest {
   }
 }
 
-case class ProducerRequest( versionId: Short = ProducerRequest.CurrentVersion,
-                            correlationId: Int,
-                            clientId: String,
-                            requiredAcks: Short,
-                            ackTimeoutMs: Int,
-                            data: Map[TopicAndPartition, ProducerRequestPartitionData])
+case class ProducerRequest(versionId: Short = ProducerRequest.CurrentVersion,
+                           correlationId: Int,
+                           clientId: String,
+                           requiredAcks: Short,
+                           ackTimeoutMs: Int,
+                           data: Map[TopicAndPartition, ProducerRequestPartitionData])
     extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   /**
@@ -107,10 +107,11 @@ case class ProducerRequest( versionId: S
         buffer.putInt(topicAndPartitionData.size) //the number of partitions
         topicAndPartitionData.foreach(partitionAndData => {
           val partitionData = partitionAndData._2
+          val bytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer
           buffer.putInt(partitionData.partition)
-          buffer.putInt(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.limit)
-          buffer.put(partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer)
-          partitionData.messages.asInstanceOf[ByteBufferMessageSet].buffer.rewind
+          buffer.putInt(bytes.limit)
+          buffer.put(bytes)
+          bytes.rewind
         })
     }
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerResponse.scala Mon Oct  8 19:13:24 2012
@@ -43,7 +43,7 @@ object ProducerResponse {
   }
 }
 
-case class ProducerResponseStatus(error: Short, nextOffset: Long)
+case class ProducerResponseStatus(error: Short, offset: Long)
 
 
 case class ProducerResponse(versionId: Short,

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Mon Oct  8 19:13:24 2012
@@ -43,7 +43,7 @@ class Partition(val topic: String,
   private val leaderISRUpdateLock = new Object
   private var zkVersion: Int = LeaderAndIsr.initialZKVersion
   private var leaderEpoch: Int = LeaderAndIsr.initialLeaderEpoch - 1
-  this.logIdent = "Partition [%s, %d] on broker %d, ".format(topic, partitionId, localBrokerId)
+  this.logIdent = "Partition [%s, %d] on broker %d: ".format(topic, partitionId, localBrokerId)
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
@@ -116,7 +116,7 @@ class Partition(val topic: String,
   def makeLeaderOrFollower(topic: String, partitionId: Int, leaderAndISR: LeaderAndIsr, isMakingLeader: Boolean): Boolean = {
     leaderISRUpdateLock synchronized {
       if (leaderEpoch >= leaderAndISR.leaderEpoch){
-        info("Current leaderEpoch [%d] is larger or equal to the requested leaderEpoch [%d], discard the become %s request"
+        info("Current leader epoch [%d] is larger or equal to the requested leader epoch [%d], discard the become %s request"
           .format(leaderEpoch, leaderAndISR.leaderEpoch, if(isMakingLeader) "leader" else "follower"))
         return false
       }
@@ -183,7 +183,7 @@ class Partition(val topic: String,
 
   def updateLeaderHWAndMaybeExpandISR(replicaId: Int, offset: Long) {
     leaderISRUpdateLock synchronized {
-      debug("Recording follower %d position %d for topic %s partition %d".format(replicaId, offset, topic, partitionId))
+      debug("Recording follower %d position %d for topic %s partition %d.".format(replicaId, offset, topic, partitionId))
       val replica = getOrCreateReplica(replicaId)
       replica.logEndOffset = offset
 
@@ -195,7 +195,7 @@ class Partition(val topic: String,
           if (!inSyncReplicas.contains(replica) && replica.logEndOffset >= leaderHW) {
             // expand ISR
             val newInSyncReplicas = inSyncReplicas + replica
-            info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
+            info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(", ")))
             // update ISR in ZK and cache
             updateISR(newInSyncReplicas)
             replicaManager.isrExpandRate.mark()
@@ -289,7 +289,7 @@ class Partition(val topic: String,
   }
 
   private def updateISR(newISR: Set[Replica]) {
-    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(",")))
+    info("Updated ISR for topic %s partition %d to %s".format(topic, partitionId, newISR.mkString(", ")))
     val newLeaderAndISR = new LeaderAndIsr(localBrokerId, leaderEpoch, newISR.map(r => r.brokerId).toList, zkVersion)
     val (updateSucceeded, newVersion) = ZkUtils.conditionalUpdatePersistentPath(zkClient,
       ZkUtils.getTopicPartitionLeaderAndIsrPath(topic, partitionId), newLeaderAndISR.toString, zkVersion)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/ErrorMapping.scala Mon Oct  8 19:13:24 2012
@@ -67,11 +67,3 @@ object ErrorMapping {
 
   def exceptionFor(code: Short) : Throwable = codeToException(code).newInstance()
 }
-
-class InvalidTopicException(message: String) extends RuntimeException(message) {
-  def this() = this(null)  
-}
-
-class MessageSizeTooLargeException(message: String) extends RuntimeException(message) {
-  def this() = this(null)
-}

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/InvalidTopicException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/InvalidTopicException.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/InvalidTopicException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/InvalidTopicException.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class InvalidTopicException(message: String) extends RuntimeException(message) {
+  def this() = this(null) 
+}
\ No newline at end of file

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/common/MessageSizeTooLargeException.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.common
+
+class MessageSizeTooLargeException(message: String) extends RuntimeException(message) {
+  def this() = this(null)
+}
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerFetcherThread.scala Mon Oct  8 19:13:24 2012
@@ -28,16 +28,20 @@ class ConsumerFetcherThread(name: String
                             val config: ConsumerConfig,
                             sourceBroker: Broker,
                             val consumerFetcherManager: ConsumerFetcherManager)
-        extends AbstractFetcherThread(name = name, sourceBroker = sourceBroker, socketTimeout = config.socketTimeoutMs,
-          socketBufferSize = config.socketBufferSize, fetchSize = config.fetchSize,
-          fetcherBrokerId = Request.NonFollowerId, maxWait = config.maxFetchWaitMs,
-          minBytes = config.minFetchBytes) {
+        extends AbstractFetcherThread(name = name, 
+                                      sourceBroker = sourceBroker, 
+                                      socketTimeout = config.socketTimeoutMs,
+                                      socketBufferSize = config.socketBufferSize, 
+                                      fetchSize = config.fetchSize,
+                                      fetcherBrokerId = Request.NonFollowerId, 
+                                      maxWait = config.maxFetchWaitMs,
+                                      minBytes = config.minFetchBytes) {
 
   // process fetched data
   def processPartitionData(topic: String, fetchOffset: Long, partitionData: FetchResponsePartitionData) {
     val pti = consumerFetcherManager.getPartitionTopicInfo((topic, partitionData.partition))
     if (pti.getFetchOffset != fetchOffset)
-      throw new RuntimeException("offset doesn't match for topic %s partition: %d pti offset: %d fetch ofset: %d"
+      throw new RuntimeException("Offset doesn't match for topic %s partition: %d pti offset: %d fetch offset: %d"
                                 .format(topic, partitionData.partition, pti.getFetchOffset, fetchOffset))
     pti.enqueue(partitionData.messages.asInstanceOf[ByteBufferMessageSet])
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Mon Oct  8 19:13:24 2012
@@ -22,7 +22,7 @@ import java.util.concurrent.{TimeUnit, B
 import kafka.serializer.Decoder
 import java.util.concurrent.atomic.AtomicReference
 import kafka.message.{MessageAndOffset, MessageAndMetadata}
-import kafka.common.KafkaException
+import kafka.common.{KafkaException, MessageSizeTooLargeException}
 
 
 /**
@@ -82,9 +82,16 @@ class ConsumerIterator[T](private val ch
                        else currentDataChunk.messages.iterator
         current.set(localCurrent)
       }
+      // if we just updated the current chunk and it is empty that means the fetch size is too small!
+      if(currentDataChunk.messages.validBytes == 0)
+        throw new MessageSizeTooLargeException("Found a message larger than the maximum fetch size of this consumer on topic " +
+                                               "%s partition %d at fetch offset %d. Increase the fetch size, or decrease the maximum message size the broker will allow."
+                                               .format(currentDataChunk.topicInfo.topic, currentDataChunk.topicInfo.partitionId, currentDataChunk.fetchOffset))
     }
     val item = localCurrent.next()
-    consumedOffset = item.offset
+    consumedOffset = item.nextOffset
+    
+    item.message.ensureValid() // validate checksum of message to ensure it is valid
 
     new MessageAndMetadata(decoder.toEvent(item.message), currentTopicInfo.topic)
   }

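The new empty-chunk check above rests on a simple size argument: with the record layout used elsewhere in this patch, each entry on the wire is an 8-byte offset plus a 4-byte size plus the message bytes, so a fetch can only make progress if at least one whole entry fits in the fetch size. The sketch below is illustrative only; the names are assumptions, not Kafka API.

// Illustrative only; names here are assumptions, not Kafka API.
object FetchSizeSketch {
  // Per-entry overhead in this patch's record layout: 8-byte offset + 4-byte size.
  val LogOverhead = 12

  /** A consumer can advance only if one whole entry fits in a single fetch. */
  def canMakeProgress(fetchSize: Int, largestMessageSize: Int): Boolean =
    fetchSize >= LogOverhead + largestMessageSize

  def main(args: Array[String]): Unit = {
    println(canMakeProgress(fetchSize = 1024 * 1024, largestMessageSize = 1000))      // true
    println(canMakeProgress(fetchSize = 1024, largestMessageSize = 10 * 1024))        // false: zero valid bytes in the chunk
  }
}
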
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/FetchedDataChunk.scala Mon Oct  8 19:13:24 2012
@@ -19,6 +19,6 @@ package kafka.consumer
 
 import kafka.message.ByteBufferMessageSet
 
-private[consumer] class FetchedDataChunk(val messages: ByteBufferMessageSet,
-                                         val topicInfo: PartitionTopicInfo,
-                                         val fetchOffset: Long)
+case class FetchedDataChunk(messages: ByteBufferMessageSet,
+                            topicInfo: PartitionTopicInfo,
+                            fetchOffset: Long)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Mon Oct  8 19:13:24 2012
@@ -22,13 +22,13 @@ import java.util.concurrent.atomic._
 import kafka.message._
 import kafka.utils.Logging
 
-private[consumer] class PartitionTopicInfo(val topic: String,
-                                           val brokerId: Int,
-                                           val partitionId: Int,
-                                           private val chunkQueue: BlockingQueue[FetchedDataChunk],
-                                           private val consumedOffset: AtomicLong,
-                                           private val fetchedOffset: AtomicLong,
-                                           private val fetchSize: AtomicInteger) extends Logging {
+class PartitionTopicInfo(val topic: String,
+                         val brokerId: Int,
+                         val partitionId: Int,
+                         private val chunkQueue: BlockingQueue[FetchedDataChunk],
+                         private val consumedOffset: AtomicLong,
+                         private val fetchedOffset: AtomicLong,
+                         private val fetchSize: AtomicInteger) extends Logging {
 
   debug("initial consumer offset of " + this + " is " + consumedOffset.get)
   debug("initial fetch offset of " + this + " is " + fetchedOffset.get)
@@ -51,17 +51,28 @@ private[consumer] class PartitionTopicIn
    * Enqueue a message set for processing
    */
   def enqueue(messages: ByteBufferMessageSet) {
-    val size = messages.validBytes
+    val size = messages.sizeInBytes
     if(size > 0) {
-      // update fetched offset to the compressed data chunk size, not the decompressed message set size
-      trace("Updating fetch offset = " + fetchedOffset.get + " with size = " + size)
+      val next = nextOffset(messages)
+      trace("Updating fetch offset = " + fetchedOffset.get + " to " + next)
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
-      val newOffset = fetchedOffset.addAndGet(size)
-      debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
+      fetchedOffset.set(next)
+      debug("updated fetch offset of (%s) to %d".format(this, next))
       ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
       ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
     }
   }
+  
+  /**
+   * Get the next fetch offset after this message set
+   */
+  private def nextOffset(messages: ByteBufferMessageSet): Long = {
+    var nextOffset = -1L
+    val iter = messages.shallowIterator
+    while(iter.hasNext)
+      nextOffset = iter.next.nextOffset
+    nextOffset
+  }
 
   override def toString(): String = topic + ":" + partitionId.toString + ": fetched offset = " + fetchedOffset.get +
     ": consumed offset = " + consumedOffset.get

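The nextOffset helper above walks the shallow (possibly compressed) entries of the fetched set and takes the offset that follows the last one, replacing the old byte-size arithmetic on fetchedOffset. Below is a standalone sketch of the same idea, with made-up types standing in for the patch's MessageAndOffset.

// Standalone sketch with made-up types; treat it as illustration, not Kafka code.
object NextFetchOffsetSketch {
  final case class Entry(offset: Long) { def nextOffset: Long = offset + 1 }

  /** The next fetch offset is the offset following the last shallow entry, or -1 for an empty batch. */
  def nextFetchOffset(shallowEntries: Iterator[Entry]): Long = {
    var next = -1L
    while (shallowEntries.hasNext)
      next = shallowEntries.next().nextOffset
    next
  }

  def main(args: Array[String]): Unit = {
    println(nextFetchOffset(Iterator(Entry(10), Entry(11), Entry(12)))) // 13
    println(nextFetchOffset(Iterator.empty))                            // -1
  }
}
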
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Mon Oct  8 19:13:24 2012
@@ -28,10 +28,10 @@ import kafka.metrics.{KafkaTimer, KafkaM
  * A consumer of kafka messages
  */
 @threadsafe
-class SimpleConsumer( val host: String,
-                      val port: Int,
-                      val soTimeout: Int,
-                      val bufferSize: Int ) extends Logging {
+class SimpleConsumer(val host: String,
+                     val port: Int,
+                     val soTimeout: Int,
+                     val bufferSize: Int) extends Logging {
 
   private val lock = new Object()
   private val blockingChannel = new BlockingChannel(host, port, bufferSize, BlockingChannel.UseDefaultBufferSize, soTimeout)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/Implicits.scala Mon Oct  8 19:13:24 2012
@@ -21,8 +21,9 @@ import kafka.utils.Logging
 private[javaapi] object Implicits extends Logging {
 
   implicit def scalaMessageSetToJavaMessageSet(messageSet: kafka.message.ByteBufferMessageSet):
-      kafka.javaapi.message.ByteBufferMessageSet = 
-    new kafka.javaapi.message.ByteBufferMessageSet(messageSet)
+     kafka.javaapi.message.ByteBufferMessageSet = {
+    new kafka.javaapi.message.ByteBufferMessageSet(messageSet.buffer)
+  }
 
   implicit def toJavaFetchResponse(response: kafka.api.FetchResponse): kafka.javaapi.FetchResponse =
     new kafka.javaapi.FetchResponse(response)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadata.scala Mon Oct  8 19:13:24 2012
@@ -22,11 +22,11 @@ import scala.collection.JavaConversions.
 private[javaapi] object MetadataListImplicits {
   implicit def toJavaTopicMetadataList(topicMetadataSeq: Seq[kafka.api.TopicMetadata]):
   java.util.List[kafka.javaapi.TopicMetadata] =
-    topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_))
+    asList(topicMetadataSeq.map(new kafka.javaapi.TopicMetadata(_)))
 
   implicit def toPartitionMetadataList(partitionMetadataSeq: Seq[kafka.api.PartitionMetadata]):
   java.util.List[kafka.javaapi.PartitionMetadata] =
-    partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_))
+    asList(partitionMetadataSeq.map(new kafka.javaapi.PartitionMetadata(_)))
 }
 
 class TopicMetadata(private val underlying: kafka.api.TopicMetadata) {
@@ -51,9 +51,9 @@ class PartitionMetadata(private val unde
     underlying.leader
   }
 
-  def replicas: java.util.List[Broker] = underlying.replicas
+  def replicas: java.util.List[Broker] = asList(underlying.replicas)
 
-  def isr: java.util.List[Broker] = underlying.isr
+  def isr: java.util.List[Broker] = asList(underlying.isr)
 
   def errorCode: Short = underlying.errorCode
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/TopicMetadataRequest.scala Mon Oct  8 19:13:24 2012
@@ -18,13 +18,13 @@ package kafka.javaapi
 
 import kafka.api._
 import java.nio.ByteBuffer
-import scala.collection.JavaConversions.asBuffer
+import scala.collection.JavaConversions
 
 class TopicMetadataRequest(val versionId: Short,
                            val clientId: String,
                            val topics: java.util.List[String]) extends RequestOrResponse(Some(kafka.api.RequestKeys.MetadataKey)) {
   val underlying: kafka.api.TopicMetadataRequest =
-    new kafka.api.TopicMetadataRequest(versionId, clientId, topics)
+    new kafka.api.TopicMetadataRequest(versionId, clientId, JavaConversions.asBuffer(topics))
 
   def this(topics: java.util.List[String]) =
     this(kafka.api.TopicMetadataRequest.CurrentVersion, kafka.api.TopicMetadataRequest.DefaultClientId, topics)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Mon Oct  8 19:13:24 2012
@@ -16,9 +16,21 @@
 */
 package kafka.javaapi.message
 
+import java.util.concurrent.atomic.AtomicLong
+import scala.reflect.BeanProperty
+import java.nio.ByteBuffer
 import kafka.message._
 
-class ByteBufferMessageSet(private val underlying: kafka.message.ByteBufferMessageSet) extends MessageSet {
+class ByteBufferMessageSet(@BeanProperty val buffer: ByteBuffer) extends MessageSet {
+  private val underlying: kafka.message.ByteBufferMessageSet = new kafka.message.ByteBufferMessageSet(buffer)
+  
+  def this(compressionCodec: CompressionCodec, messages: java.util.List[Message]) {
+    this(new kafka.message.ByteBufferMessageSet(compressionCodec, new AtomicLong(0), scala.collection.JavaConversions.asBuffer(messages): _*).buffer)
+  }
+
+  def this(messages: java.util.List[Message]) {
+    this(NoCompressionCodec, messages)
+  }
 
   def validBytes: Long = underlying.validBytes
 
@@ -41,12 +53,11 @@ class ByteBufferMessageSet(private val u
 
   override def equals(other: Any): Boolean = {
     other match {
-      case that: ByteBufferMessageSet =>
-        underlying.equals(that.underlying)
+      case that: ByteBufferMessageSet => buffer.equals(that.buffer)
       case _ => false
     }
   }
 
-  override def hashCode: Int = underlying.hashCode
 
+  override def hashCode: Int = buffer.hashCode
 }

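A possible usage sketch for the constructors added above. It assumes kafka.message.Message accepts a payload byte array and that the 0.8 classes are on the classpath; treat it as a sketch, not shipped example code.

// Illustrative usage of the new javaapi constructors; classpath and Message(Array[Byte]) are assumed.
import java.util.Arrays
import kafka.message.Message

object JavaApiMessageSetSketch {
  def main(args: Array[String]): Unit = {
    val messages = Arrays.asList(new Message("hello".getBytes("UTF-8")),
                                 new Message("world".getBytes("UTF-8")))

    // Build an uncompressed set straight from a java.util.List[Message]; the two-argument
    // constructor above takes a CompressionCodec first for compressed sets.
    val set = new kafka.javaapi.message.ByteBufferMessageSet(messages)
    println(set.validBytes)
  }
}
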
Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/FileMessageSet.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * 
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.log
+
+import java.io._
+import java.nio._
+import java.nio.channels._
+import java.util.concurrent.atomic._
+
+import kafka.utils._
+import kafka.message._
+import kafka.common.KafkaException
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
+
+/**
+ * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
+ * will fail on an immutable message set. An optional limit and start position can be applied to the message set
+ * which will control the position in the file at which the set begins
+ */
+@nonthreadsafe
+class FileMessageSet private[kafka](val file: File,
+                                    private[log] val channel: FileChannel,
+                                    private[log] val start: Long, // the starting position in the file
+                                    private[log] val limit: Long, // the length (may be less than the file length)
+                                    val mutable: Boolean) extends MessageSet with Logging {
+  
+  private val setSize = new AtomicLong()
+
+  if(mutable) {
+    if(limit < Long.MaxValue || start > 0)
+      throw new KafkaException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
+
+    setSize.set(channel.size())
+    channel.position(channel.size)
+  } else {
+    setSize.set(scala.math.min(channel.size(), limit) - start)
+  }
+  
+  /**
+   * Create a file message set with no limit or offset
+   */
+  def this(file: File, channel: FileChannel, mutable: Boolean) = 
+    this(file, channel, 0, Long.MaxValue, mutable)
+  
+  /**
+   * Create a file message set with no limit or offset
+   */
+  def this(file: File, mutable: Boolean) = 
+    this(file, Utils.openChannel(file, mutable), mutable)
+  
+  /**
+   * Return a message set which is a view into this set starting from the given position and with the given size limit.
+   */
+  def read(position: Long, size: Long): FileMessageSet = {
+    new FileMessageSet(file, channel, this.start + position, scala.math.min(this.start + position + size, sizeInBytes()),
+      false)
+  }
+  
+  /**
+   * Search forward for the file position of the first offset that is greater than or equal to the target offset 
+   * and return its physical position. If no such offsets are found, return null.
+   */
+  private[log] def searchFor(targetOffset: Long, startingPosition: Int): OffsetPosition = {
+    var position = startingPosition
+    val buffer = ByteBuffer.allocate(12)
+    val size = setSize.get()
+    while(position + 12 < size) {
+      buffer.rewind()
+      channel.read(buffer, position)
+      if(buffer.hasRemaining)
+        throw new IllegalStateException("Failed to read complete buffer.")
+      buffer.rewind()
+      val offset = buffer.getLong()
+      if(offset >= targetOffset)
+        return OffsetPosition(offset, position)
+      val messageSize = buffer.getInt()
+      position += MessageSet.LogOverhead + messageSize
+    }
+    null
+  }
+  
+  /**
+   * Write some of this set to the given channel, return the amount written
+   */
+  def writeTo(destChannel: GatheringByteChannel, writePosition: Long, size: Long): Long = 
+    channel.transferTo(start + writePosition, scala.math.min(size, sizeInBytes), destChannel)
+  
+  /**
+   * Get an iterator over the messages in the set
+   */
+  override def iterator: Iterator[MessageAndOffset] = {
+    new IteratorTemplate[MessageAndOffset] {
+      var location = start
+      
+      override def makeNext(): MessageAndOffset = {
+        // read the size of the item
+        val sizeOffsetBuffer = ByteBuffer.allocate(12)
+        channel.read(sizeOffsetBuffer, location)
+        if(sizeOffsetBuffer.hasRemaining)
+          return allDone()
+        
+        sizeOffsetBuffer.rewind()
+        val offset = sizeOffsetBuffer.getLong()
+        val size = sizeOffsetBuffer.getInt()
+        if (size < Message.MinHeaderSize)
+          return allDone()
+        
+        // read the item itself
+        val buffer = ByteBuffer.allocate(size)
+        channel.read(buffer, location + 12)
+        if(buffer.hasRemaining)
+          return allDone()
+        buffer.rewind()
+        
+        // increment the location and return the item
+        location += size + 12
+        new MessageAndOffset(new Message(buffer), offset)
+      }
+    }
+  }
+  
+  /**
+   * The number of bytes taken up by this file set
+   */
+  def sizeInBytes(): Long = setSize.get()
+
+  def checkMutable(): Unit = {
+    if(!mutable)
+      throw new KafkaException("Attempt to invoke mutation on immutable message set.")
+  }
+  
+  /**
+   * Append this message to the message set
+   */
+  def append(messages: MessageSet): Unit = {
+    checkMutable()
+    var written = 0L
+    while(written < messages.sizeInBytes)
+      written += messages.writeTo(channel, 0, messages.sizeInBytes)
+    setSize.getAndAdd(written)
+  }
+ 
+  /**
+   * Commit all written data to the physical disk
+   */
+  def flush() = {
+    checkMutable()
+    LogFlushStats.logFlushTimer.time {
+      channel.force(true)
+    }
+  }
+  
+  /**
+   * Close this message set
+   */
+  def close() {
+    if(mutable)
+      flush()
+    channel.close()
+  }
+  
+  /**
+   * Delete this message set from the filesystem
+   */
+  def delete(): Boolean = {
+    Utils.swallow(channel.close())
+    file.delete()
+  }
+
+  /**
+   * Truncate this file message set to the given size. Note that this API does no checking that the 
+   * given size falls on a valid byte offset.
+   */
+  def truncateTo(targetSize: Long) = {
+    checkMutable()
+    if(targetSize > sizeInBytes())
+      throw new KafkaException("Attempt to truncate log segment to %d bytes failed since the current ".format(targetSize) +
+        " size of this log segment is only %d bytes".format(sizeInBytes()))
+    channel.truncate(targetSize)
+    channel.position(targetSize)
+    setSize.set(targetSize)
+  }
+  
+}
+
+object LogFlushStats extends KafkaMetricsGroup {
+  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+}

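FileMessageSet above stores each entry as an 8-byte offset, a 4-byte size, and the message bytes, and searchFor locates a logical offset by walking those headers forward. The standalone sketch below mirrors that walk over an in-memory buffer; all names here are illustrative, not the class above.

import java.nio.ByteBuffer

// Illustrative sketch of the record layout scan; not the FileMessageSet class itself.
object OffsetScanSketch {
  val LogOverhead = 12 // 8-byte offset + 4-byte size

  /** Build a buffer holding (offset, payload) entries in the patch's record format. */
  def buildLog(entries: Seq[(Long, Array[Byte])]): ByteBuffer = {
    val size = entries.map(LogOverhead + _._2.length).sum
    val buf = ByteBuffer.allocate(size)
    for ((offset, payload) <- entries)
      buf.putLong(offset).putInt(payload.length).put(payload)
    buf.flip()
    buf
  }

  /** Return the byte position of the first entry with offset >= target, or -1 if none is found. */
  def searchFor(log: ByteBuffer, target: Long): Int = {
    var pos = 0
    while (pos + LogOverhead <= log.limit) {
      val offset = log.getLong(pos)
      if (offset >= target) return pos
      pos += LogOverhead + log.getInt(pos + 8) // skip header plus message bytes
    }
    -1
  }

  def main(args: Array[String]): Unit = {
    val log = buildLog(Seq(5L -> "aaa".getBytes, 6L -> "bb".getBytes, 7L -> "c".getBytes))
    println(searchFor(log, 6L)) // 15, the physical position of offset 6
    println(searchFor(log, 9L)) // -1, past the end of the set
  }
}
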
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Mon Oct  8 19:13:24 2012
@@ -20,50 +20,44 @@ package kafka.log
 import kafka.api.OffsetRequest
 import java.io.{IOException, File}
 import java.util.{Comparator, Collections, ArrayList}
-import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong, AtomicInteger}
+import java.util.concurrent.atomic._
 import kafka.utils._
+import scala.math._
 import java.text.NumberFormat
 import kafka.server.BrokerTopicStat
-import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
+import kafka.message._
+import kafka.common._
 import kafka.metrics.KafkaMetricsGroup
 import com.yammer.metrics.core.Gauge
-import kafka.common.{KafkaStorageException, KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException}
 
 object Log {
-  val FileSuffix = ".kafka"
+  val LogFileSuffix = ".log"
+  val IndexFileSuffix = ".index"
 
   /**
-   * Find a given range object in a list of ranges by a value in that range. Does a binary search over the ranges
-   * but instead of checking for equality looks within the range. Takes the array size as an option in case
-   * the array grows while searching happens
-   *
-   * TODO: This should move into SegmentList.scala
+   * Search for the greatest range with start <= the target value.
    */
   def findRange[T <: Range](ranges: Array[T], value: Long, arraySize: Int): Option[T] = {
     if(ranges.size < 1)
       return None
 
     // check out of bounds
-    if(value < ranges(0).start || value > ranges(arraySize - 1).start + ranges(arraySize - 1).size)
-      throw new OffsetOutOfRangeException("offset " + value + " is out of range")
-
-    // check at the end
-    if (value == ranges(arraySize - 1).start + ranges(arraySize - 1).size)
+    if(value < ranges(0).start)
       return None
 
     var low = 0
     var high = arraySize - 1
-    while(low <= high) {
-      val mid = (high + low) / 2
+    while(low < high) {
+      val mid = ceil((high + low) / 2.0).toInt
       val found = ranges(mid)
-      if(found.contains(value))
+      if(found.start == value)
         return Some(found)
       else if (value < found.start)
         high = mid - 1
       else
-        low = mid + 1
+        low = mid
     }
-    None
+    Some(ranges(low))
   }
 
   def findRange[T <: Range](ranges: Array[T], value: Long): Option[T] =
@@ -73,13 +67,19 @@ object Log {
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically
    */
-  def nameFromOffset(offset: Long): String = {
+  def filenamePrefixFromOffset(offset: Long): String = {
     val nf = NumberFormat.getInstance()
     nf.setMinimumIntegerDigits(20)
     nf.setMaximumFractionDigits(0)
     nf.setGroupingUsed(false)
-    nf.format(offset) + FileSuffix
+    nf.format(offset)
   }
+  
+  def logFilename(dir: File, offset: Long) = 
+    new File(dir, filenamePrefixFromOffset(offset) + LogFileSuffix)
+  
+  def indexFilename(dir: File, offset: Long) = 
+    new File(dir, filenamePrefixFromOffset(offset) + IndexFileSuffix)
 
   def getEmptyOffsets(timestamp: Long): Seq[Long] =
     if (timestamp == OffsetRequest.LatestTime || timestamp == OffsetRequest.EarliestTime)
@@ -89,52 +89,29 @@ object Log {
 
 
 /**
- * A segment file in the log directory. Each log segment consists of an open message set, a start offset and a size
- */
-class LogSegment(val file: File, val messageSet: FileMessageSet, val start: Long, time: Time) extends Range {
-  var firstAppendTime: Option[Long] = None
-  @volatile var deleted = false
-  /* Return the size in bytes of this log segment */
-  def size: Long = messageSet.sizeInBytes()
-  /* Return the absolute end offset of this log segment */
-  def absoluteEndOffset: Long = start + messageSet.sizeInBytes()
-
-  def updateFirstAppendTime() {
-    if (firstAppendTime.isEmpty)
-      firstAppendTime = Some(time.milliseconds)
-  }
-
-  def append(messages: ByteBufferMessageSet) {
-    if (messages.sizeInBytes > 0) {
-      messageSet.append(messages)
-      updateFirstAppendTime()
-    }
-  }
-
-  override def toString() = "(file=" + file + ", start=" + start + ", size=" + size + ")"
-
-  /**
-   * Truncate this log segment upto absolute offset value. Since the offset specified is absolute, to compute the amount
-   * of data to be deleted, we have to compute the offset relative to start of the log segment
-   * @param offset Absolute offset for this partition
-   */
-  def truncateTo(offset: Long) = {
-    messageSet.truncateTo(offset - start)
-  }
-}
-
-
-/**
- * An append-only log for storing messages. 
+ * An append-only log for storing messages.
+ * 
+ * The log is a sequence of LogSegments, each with a base offset denoting the first message in the segment.
+ * 
+ * New log segments are created according to a configurable policy that controls the size in bytes or time interval
+ * for a given segment.
+ * 
  */
 @threadsafe
-private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
-                          val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
-                          brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
+private[kafka] class Log(val dir: File, 
+                         val maxLogFileSize: Long, 
+                         val maxMessageSize: Int, 
+                         val flushInterval: Int,
+                         val rollIntervalMs: Long, 
+                         val needsRecovery: Boolean, 
+                         val maxIndexSize: Int = (10*1024*1024),
+                         val indexIntervalBytes: Int = 4096,
+                         time: Time = SystemTime,
+                         brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
 
   import kafka.log.Log._
-
+  
   /* A lock that guards all modifications to the log */
   private val lock = new Object
 
@@ -144,22 +121,17 @@ private[kafka] class Log( val dir: File,
   /* last time it was flushed */
   private val lastflushedTime = new AtomicLong(System.currentTimeMillis)
 
-  /* The actual segments of the log */
+  /* the actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
+    
+  /* Calculate the offset of the next message */
+  private var nextOffset: AtomicLong = new AtomicLong(segments.view.last.nextOffset())
 
-  newGauge(
-    name + "-" + "NumLogSegments",
-    new Gauge[Int] {
-      def value() = numberOfSegments
-    }
-  )
-
-  newGauge(
-    name + "-" + "LogEndOffset",
-    new Gauge[Long] {
-      def value() = logEndOffset
-    }
-  )
+  newGauge(name + "-" + "NumLogSegments",
+           new Gauge[Int] { def value() = numberOfSegments })
+
+  newGauge(name + "-" + "LogEndOffset",
+           new Gauge[Long] { def value() = logEndOffset })
 
   /* The name of this log */
   def name  = dir.getName()
@@ -170,21 +142,29 @@ private[kafka] class Log( val dir: File,
     val logSegments = new ArrayList[LogSegment]
     val ls = dir.listFiles()
     if(ls != null) {
-      for(file <- ls if file.isFile && file.toString.endsWith(FileSuffix)) {
+      for(file <- ls if file.isFile && file.toString.endsWith(LogFileSuffix)) {
         if(!file.canRead)
           throw new IOException("Could not read file " + file)
         val filename = file.getName()
-        val start = filename.substring(0, filename.length - FileSuffix.length).toLong
-        val messageSet = new FileMessageSet(file, false)
-        logSegments.add(new LogSegment(file, messageSet, start, time))
+        val start = filename.substring(0, filename.length - LogFileSuffix.length).toLong
+        // TODO: we should ideally rebuild any missing index files, instead of erroring out
+        if(!Log.indexFilename(dir, start).exists)
+          throw new IllegalStateException("Found log file with no corresponding index file.")
+        logSegments.add(new LogSegment(dir = dir, 
+                                       startOffset = start, 
+                                       mutable = false, 
+                                       indexIntervalBytes = indexIntervalBytes, 
+                                       maxIndexSize = maxIndexSize))
       }
     }
 
     if(logSegments.size == 0) {
       // no existing segments, create a new mutable segment
-      val newFile = new File(dir, nameFromOffset(0))
-      val set = new FileMessageSet(newFile, true)
-      logSegments.add(new LogSegment(newFile, set, 0, time))
+      logSegments.add(new LogSegment(dir = dir, 
+                                     startOffset = 0, 
+                                     mutable = true, 
+                                     indexIntervalBytes = indexIntervalBytes, 
+                                     maxIndexSize = maxIndexSize))
     } else {
       // there is at least one existing segment, validate and recover them/it
       // sort segments into ascending order for fast searching
@@ -195,30 +175,48 @@ private[kafka] class Log( val dir: File,
           else 1
         }
       })
-      validateSegments(logSegments)
 
       //make the final section mutable and run recovery on it if necessary
       val last = logSegments.remove(logSegments.size - 1)
-      last.messageSet.close()
-      info("Loading the last segment " + last.file.getAbsolutePath() + " in mutable mode, recovery " + needRecovery)
-      val mutable = new LogSegment(last.file, new FileMessageSet(last.file, true, new AtomicBoolean(needRecovery)), last.start, time)
-      logSegments.add(mutable)
+      last.close()
+      val mutableSegment = new LogSegment(dir = dir, 
+                                          startOffset = last.start, 
+                                          mutable = true, 
+                                          indexIntervalBytes = indexIntervalBytes, 
+                                          maxIndexSize = maxIndexSize)
+      if(needsRecovery)
+        recoverSegment(mutableSegment)
+      logSegments.add(mutableSegment)
     }
     new SegmentList(logSegments.toArray(new Array[LogSegment](logSegments.size)))
   }
-
+  
   /**
-   * Check that the ranges and sizes add up, otherwise we have lost some data somewhere
+   * Run recovery on the given segment. This will rebuild the index from the log file and lop off any invalid bytes from the end of the log.
    */
-  private def validateSegments(segments: ArrayList[LogSegment]) {
-    lock synchronized {
-      for(i <- 0 until segments.size - 1) {
-        val curr = segments.get(i)
-        val next = segments.get(i+1)
-        if(curr.start + curr.size != next.start)
-          throw new KafkaException("The following segments don't validate: " + curr.file.getAbsolutePath() + ", " + next.file.getAbsolutePath())
+  private def recoverSegment(segment: LogSegment) {
+    segment.index.truncate()
+    var validBytes = 0
+    var lastIndexEntry = 0
+    val iter = segment.messageSet.iterator
+    try {
+      while(iter.hasNext) {
+        val entry = iter.next
+        entry.message.ensureValid()
+        if(validBytes - lastIndexEntry > indexIntervalBytes) {
+          segment.index.append(entry.offset, validBytes)
+          lastIndexEntry = validBytes
+        }
+        validBytes += MessageSet.entrySize(entry.message)
       }
-    }
+    } catch {
+      case e: InvalidMessageException => 
+        logger.warn("Found invalid messages in log " + name)
+    }
+    val truncated = segment.messageSet.sizeInBytes - validBytes
+    if(truncated > 0)
+      warn("Truncated " + truncated + " invalid bytes from the log " + name + ".")
+    segment.messageSet.truncateTo(validBytes)
   }
 
   /**
@@ -232,69 +230,110 @@ private[kafka] class Log( val dir: File,
   def close() {
     debug("Closing log " + name)
     lock synchronized {
-      for(seg <- segments.view) {
-        info("Closing log segment " + seg.file.getAbsolutePath)
-        seg.messageSet.close()
-      }
+      for(seg <- segments.view)
+        seg.close()
     }
   }
 
   /**
    * Append this message set to the active segment of the log, rolling over to a fresh segment if necessary.
-   * Returns the offset at which the messages are written.
+   * Returns a tuple containing (first_offset, last_offset) of the newly appended message set, 
+   * or (-1,-1) if the message set is empty
    */
-  def append(messages: ByteBufferMessageSet): Unit = {
-    // validate the messages
-    messages.verifyMessageSize(maxMessageSize)
-    var numberOfMessages = 0
-    for(messageAndOffset <- messages) {
-      if(!messageAndOffset.message.isValid)
-        throw new InvalidMessageException()
-      numberOfMessages += 1;
-    }
+  def append(messages: ByteBufferMessageSet): (Long, Long) = {
+    // check that all messages are valid and see if we have any compressed messages
+    var messageCount = 0
+    var codec: CompressionCodec = NoCompressionCodec
+    for(messageAndOffset <- messages.shallowIterator) {
+      val m = messageAndOffset.message
+      m.ensureValid()
+      if(MessageSet.entrySize(m) > maxMessageSize)
+        throw new MessageSizeTooLargeException("Message size is %d bytes which exceeds the maximum configured message size of %d.".format(MessageSet.entrySize(m), maxMessageSize))
+      messageCount += 1;
+      val messageCodec = m.compressionCodec
+      if(messageCodec != NoCompressionCodec)
+        codec = messageCodec
+    }
+
+    // if we have any valid messages, append them to the log
+    if(messageCount == 0) {
+      (-1L, -1L)
+    } else {
+      BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(messageCount)
+      BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(messageCount)
 
-    BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(numberOfMessages)
-    BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(numberOfMessages)
+      // trim any invalid bytes or partial messages before appending to the on-disk log
+      var validMessages = trimInvalidBytes(messages)
 
-    // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
-    val validByteBuffer = messages.buffer.duplicate()
+      // they are valid, insert them in the log
+      lock synchronized {
+        try {
+          val firstOffset = nextOffset.get
+          
+          // maybe roll the log
+          val segment = maybeRoll(segments.view.last)
+          
+          // assign offsets to the messages
+          validMessages = validMessages.assignOffsets(nextOffset, codec)
+          
+          trace("Appending message set to " + this.name + ": " + validMessages)
+            
+          // now append to the log
+          segment.append(firstOffset, validMessages)
+          val lastOffset = nextOffset.get - 1
+          
+          // maybe flush the log and index
+          maybeFlush(messageCount)
+          
+          // return the offset at which the messages were appended
+          (firstOffset, lastOffset)
+        } catch {
+          case e: IOException => throw new KafkaStorageException("I/O exception in append to log '%s'".format(name), e)
+        }
+      }
+    }
+  }
+  
+  /**
+   * Trim any invalid bytes from the end of this message set (if there are any)
+   */
+  def trimInvalidBytes(messages: ByteBufferMessageSet): ByteBufferMessageSet = {
     val messageSetValidBytes = messages.validBytes
     if(messageSetValidBytes > Int.MaxValue || messageSetValidBytes < 0)
       throw new InvalidMessageSizeException("Illegal length of message set " + messageSetValidBytes + " Message set cannot be appended to log. Possible causes are corrupted produce requests")
-
-    validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
-    val validMessages = new ByteBufferMessageSet(validByteBuffer)
-
-    // they are valid, insert them in the log
-    lock synchronized {
-      try {
-        var segment = segments.view.last
-        maybeRoll(segment)
-        segment = segments.view.last
-        segment.append(validMessages)
-        maybeFlush(numberOfMessages)
-      }
-      catch {
-        case e: IOException =>
-          throw new KafkaStorageException("IO exception in log append", e)
-        case e2 => throw e2
-      }
+    if(messageSetValidBytes == messages.sizeInBytes) {
+      messages
+    } else {
+      // trim invalid bytes
+      val validByteBuffer = messages.buffer.duplicate()
+      validByteBuffer.limit(messageSetValidBytes.asInstanceOf[Int])
+      new ByteBufferMessageSet(validByteBuffer)
     }
   }
 
   /**
-   * Read from the log file at the given offset
+   * Read a message set from the log. 
+   * startOffset - The logical offset to begin reading at
+   * maxLength - The maximum number of bytes to read
+   * maxOffset - The maximum logical offset to include in the resulting message set
    */
-  def read(offset: Long, length: Int): MessageSet = {
-    trace("Reading %d bytes from offset %d in log %s of length %s bytes".format(length, offset, name, size))
+  def read(startOffset: Long, maxLength: Int, maxOffset: Option[Long] = None): MessageSet = {
+    trace("Reading %d bytes from offset %d in log %s of length %d bytes".format(maxLength, startOffset, name, size))
     val view = segments.view
-    Log.findRange(view, offset, view.length) match {
-      case Some(segment) =>
-        if(length <= 0)
-          MessageSet.Empty
-        else
-          segment.messageSet.read((offset - segment.start), length)
-      case _ => MessageSet.Empty
+        
+    // check if the offset is valid and in range
+    val first = view.head.start
+    val next = nextOffset.get
+    if(startOffset == next)
+      return MessageSet.Empty
+    else if(startOffset > next || startOffset < first)
+      throw new OffsetOutOfRangeException("Request for offset %d but we only have log segments in the range %d to %d.".format(startOffset, first, next))
+    
+    // Do the read on the segment with a base offset less than the target offset
+    // TODO: to handle sparse offsets, we need to skip to the next segment if this read doesn't find anything
+    Log.findRange(view, startOffset, view.length) match {
+      case None => throw new OffsetOutOfRangeException("Offset is earlier than the earliest offset")
+      case Some(segment) => segment.read(startOffset, maxLength, maxOffset)
     }
   }
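
For readers following the patch, a minimal sketch of how the new logical-offset append/read contract above can be exercised. It is not part of this commit: `log` is assumed to be an already-constructed kafka.log.Log, and the helper name, payloads and 1 MB read size are illustrative.

import kafka.log.Log
import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

def appendAndReadBack(log: Log) {
  // build a small uncompressed message set; offsets are assigned inside Log.append
  val messages = new ByteBufferMessageSet(NoCompressionCodec,
                                          new Message("hello".getBytes),
                                          new Message("world".getBytes))
  // append returns (first_offset, last_offset), or (-1, -1) for an empty set
  val (firstOffset, lastOffset) = log.append(messages)
  if (firstOffset >= 0) {
    // read the appended range back by logical offset rather than byte position
    val readBack = log.read(startOffset = firstOffset,
                            maxLength = 1024 * 1024,
                            maxOffset = Some(lastOffset + 1))
    for (entry <- readBack)
      println("offset " + entry.offset)
  }
}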
 
@@ -315,7 +354,7 @@ private[kafka] class Log( val dir: File,
         else {
           // If the last segment to be deleted is empty and we roll the log, the new segment will have the same
           // file name. So simply reuse the last segment and reset the modified time.
-          view(numToDelete - 1).file.setLastModified(time.milliseconds)
+          view(numToDelete - 1).messageSet.file.setLastModified(time.milliseconds)
           numToDelete -=1
         }
       }
@@ -329,58 +368,75 @@ private[kafka] class Log( val dir: File,
   def size: Long = segments.view.foldLeft(0L)(_ + _.size)
 
   /**
-   *  Get the absolute offset of the last message in the log
+   *  Get the offset of the next message that will be appended
    */
-  def logEndOffset: Long = segments.view.last.start + segments.view.last.size
+  def logEndOffset: Long = nextOffset.get
 
   /**
    * Roll the log over if necessary
    */
-  private def maybeRoll(segment: LogSegment) {
+  private def maybeRoll(segment: LogSegment): LogSegment = {
     if ((segment.messageSet.sizeInBytes > maxLogFileSize) ||
-       ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)))
+       ((segment.firstAppendTime.isDefined) && (time.milliseconds - segment.firstAppendTime.get > rollIntervalMs)) ||
+       segment.index.isFull)
       roll()
-  }
-
-  private def rollSegment(newOffset: Long) {
-    val newFile = new File(dir, nameFromOffset(newOffset))
-    if (newFile.exists) {
-      warn("newly rolled logsegment " + newFile.getName + " already exists; deleting it first")
-      newFile.delete()
-    }
-    debug("Rolling log '" + name + "' to " + newFile.getName())
-    segments.append(new LogSegment(newFile, new FileMessageSet(newFile, true), newOffset, time))
+    else
+      segment
   }
 
   /**
-   * Create a new segment and make it active
+   * Create a new segment and make it active, and return it
    */
-  def roll() {
+  def roll(): LogSegment = {
     lock synchronized {
-      flush
-      rollSegment(logEndOffset)
+      flush()
+      rollToOffset(logEndOffset)
     }
   }
+  
+  /**
+   * Roll the log over to the given new offset value
+   */
+  private def rollToOffset(newOffset: Long): LogSegment = {
+    val logFile = logFilename(dir, newOffset)
+    val indexFile = indexFilename(dir, newOffset)
+    for(file <- List(logFile, indexFile); if file.exists) {
+      warn("Newly rolled segment file " + file.getName + " already exists; deleting it first")
+      file.delete()
+    }
+    debug("Rolling log '" + name + "' to " + logFile.getName + " and " + indexFile.getName)
+    segments.view.lastOption match {
+      case Some(segment) => segment.index.makeReadOnly()
+      case None => 
+    }
+    val segment = new LogSegment(dir, 
+                                 startOffset = newOffset,
+                                 mutable = true, 
+                                 indexIntervalBytes = indexIntervalBytes, 
+                                 maxIndexSize = maxIndexSize)
+    segments.append(segment)
+    segment
+  }
 
   /**
    * Flush the log if necessary
    */
   private def maybeFlush(numberOfMessages : Int) {
-    if(unflushed.addAndGet(numberOfMessages) >= flushInterval) {
+    if(unflushed.addAndGet(numberOfMessages) >= flushInterval)
       flush()
-    }
   }
 
   /**
    * Flush this log file to the physical disk
    */
   def flush() : Unit = {
-    if (unflushed.get == 0) return
+    if (unflushed.get == 0)
+      return
 
     lock synchronized {
       debug("Flushing log '" + name + "' last flushed: " + getLastFlushedTime + " current time: " +
           time.milliseconds)
-      segments.view.last.messageSet.flush()
+      segments.view.last.flush()
       unflushed.set(0)
       lastflushedTime.set(time.milliseconds)
      }
@@ -389,15 +445,15 @@ private[kafka] class Log( val dir: File,
   def getOffsetsBefore(timestamp: Long, maxNumOffsets: Int): Seq[Long] = {
     val segsArray = segments.view
     var offsetTimeArray: Array[(Long, Long)] = null
-    if (segsArray.last.size > 0)
+    if(segsArray.last.size > 0)
       offsetTimeArray = new Array[(Long, Long)](segsArray.length + 1)
     else
       offsetTimeArray = new Array[(Long, Long)](segsArray.length)
 
-    for (i <- 0 until segsArray.length)
-      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).file.lastModified)
-    if (segsArray.last.size > 0)
-      offsetTimeArray(segsArray.length) = (segsArray.last.start + segsArray.last.messageSet.sizeInBytes(), time.milliseconds)
+    for(i <- 0 until segsArray.length)
+      offsetTimeArray(i) = (segsArray(i).start, segsArray(i).messageSet.file.lastModified)
+    if(segsArray.last.size > 0)
+      offsetTimeArray(segsArray.length) = (logEndOffset, time.milliseconds)
 
     var startIndex = -1
     timestamp match {
@@ -419,7 +475,7 @@ private[kafka] class Log( val dir: File,
 
     val retSize = maxNumOffsets.min(startIndex + 1)
     val ret = new Array[Long](retSize)
-    for (j <- 0 until retSize) {
+    for(j <- 0 until retSize) {
       ret(j) = offsetTimeArray(startIndex)._1
       startIndex -= 1
     }
@@ -427,63 +483,77 @@ private[kafka] class Log( val dir: File,
     ret.toSeq.sortBy(- _)
   }
 
-  /**
-   *  Truncate all segments in the log and start a new segment on a new offset
-   */
-  def truncateAndStartWithNewOffset(newOffset: Long) {
-    lock synchronized {
-      val deletedSegments = segments.trunc(segments.view.size)
-      rollSegment(newOffset)
-      deleteSegments(deletedSegments)
-    }
+  def delete(): Unit = {
+    deleteSegments(segments.contents.get())
+    Utils.rm(dir)
   }
 
+
   /* Attempts to delete all provided segments from a log and returns how many it was able to */
   def deleteSegments(segments: Seq[LogSegment]): Int = {
     var total = 0
     for(segment <- segments) {
-      info("Deleting log segment " + segment.file.getName() + " from " + name)
-      swallow(segment.messageSet.close())
-      if(!segment.file.delete()) {
-        warn("Delete failed.")
+      info("Deleting log segment " + segment.start + " from " + name)
+      if(!segment.messageSet.delete() || !segment.index.delete()) {
+        warn("Delete of log segment " + segment.start + " failed.")
       } else {
         total += 1
       }
     }
     total
   }
-
+  
   def truncateTo(targetOffset: Long) {
+    if(targetOffset < 0)
+      throw new IllegalArgumentException("Cannot truncate to a negative offset (%d).".format(targetOffset))
     lock synchronized {
       val segmentsToBeDeleted = segments.view.filter(segment => segment.start > targetOffset)
       val viewSize = segments.view.size
       val numSegmentsDeleted = deleteSegments(segmentsToBeDeleted)
-
       /* We should not hit this error because segments.view is locked in markedDeletedWhile() */
       if(numSegmentsDeleted != segmentsToBeDeleted.size)
-        error("Failed to delete some segments during log recovery during truncateTo(" + targetOffset +")")
-
+        error("Failed to delete some segments when attempting to truncate to offset " + targetOffset)
       if (numSegmentsDeleted == viewSize) {
         segments.trunc(segments.view.size)
-        rollSegment(targetOffset)
+        rollToOffset(targetOffset)
+        this.nextOffset.set(targetOffset)
       } else {
-        // find the log segment that has this hw
-        val segmentToBeTruncated =
-          segments.view.find(segment => targetOffset >= segment.start && targetOffset < segment.absoluteEndOffset)
-        segmentToBeTruncated match {
-          case Some(segment) =>
-            val truncatedSegmentIndex = segments.view.indexOf(segment)
-            segments.truncLast(truncatedSegmentIndex)
-            segment.truncateTo(targetOffset)
-            info("Truncated log segment %s to target offset %d".format(segment.file.getAbsolutePath, targetOffset))
-          case None =>
-            if(targetOffset > segments.view.last.absoluteEndOffset)
-              error("Target offset %d cannot be greater than the last message offset %d in the log %s".
-                format(targetOffset, segments.view.last.absoluteEndOffset, segments.view.last.file.getAbsolutePath))
+        if(targetOffset > logEndOffset) {
+          error("Target offset %d cannot be greater than the last message offset %d in the log %s".
+                format(targetOffset, logEndOffset, segments.view.last.messageSet.file.getAbsolutePath))
+        } else {
+          // find the log segment that has this hw
+          val segmentToBeTruncated = findRange(segments.view, targetOffset)
+          segmentToBeTruncated match {
+            case Some(segment) =>
+              val truncatedSegmentIndex = segments.view.indexOf(segment)
+              segments.truncLast(truncatedSegmentIndex)
+              segment.truncateTo(targetOffset)
+              this.nextOffset.set(targetOffset)
+              info("Truncated log segment %s to target offset %d".format(segments.view.last.messageSet.file.getAbsolutePath, targetOffset))
+            case None => // nothing to do
+          }
         }
       }
     }
   }
+  
+  /**
+   *  Truncate all segments in the log and start a new segment on a new offset
+   */
+  def truncateAndStartWithNewOffset(newOffset: Long) {
+    lock synchronized {
+      val deletedSegments = segments.trunc(segments.view.size)
+      debug("Truncate and start log '" + name + "' to " + newOffset)
+      segments.append(new LogSegment(dir, 
+                                     newOffset, 
+                                     mutable = true, 
+                                     indexIntervalBytes = indexIntervalBytes, 
+                                     maxIndexSize = maxIndexSize))
+      deleteSegments(deletedSegments)
+      this.nextOffset.set(newOffset)
+    }
+  }
 
   def topicName():String = {
     name.substring(0, name.lastIndexOf("-"))

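To illustrate the two truncation operations above, a hedged sketch (not part of this commit) of how a follower might re-align its local log with a leader; leaderLogStartOffset and leaderLogEndOffset are illustrative inputs, not fields introduced by this patch.

import kafka.log.Log

def realignWithLeader(log: Log, leaderLogStartOffset: Long, leaderLogEndOffset: Long) {
  if (log.logEndOffset > leaderLogEndOffset) {
    // local log is ahead of the leader: drop the messages the leader does not have
    log.truncateTo(leaderLogEndOffset)
  } else if (log.logEndOffset < leaderLogStartOffset) {
    // local log is entirely out of the leader's range: discard it and restart at the leader's start offset
    log.truncateAndStartWithNewOffset(leaderLogStartOffset)
  }
}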
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala?rev=1395729&r1=1395728&r2=1395729&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogManager.scala Mon Oct  8 19:13:24 2012
@@ -46,7 +46,7 @@ private[kafka] class LogManager(val conf
   private val logRetentionSizeMap = config.logRetentionSizeMap
   private val logRetentionMsMap = config.logRetentionHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L)) // convert hours to ms
   private val logRollMsMap = config.logRollHoursMap.map(e => (e._1, e._2 * 60 * 60 * 1000L))
-  this.logIdent = "[Log Manager on Broker " + config.brokerId + "], "
+  this.logIdent = "[Log Manager on Broker " + config.brokerId + "] "
 
   /* Initialize a log for each subdirectory of the main log directory */
   private val logs = new Pool[String, Pool[Int, Log]]()
@@ -69,7 +69,16 @@ private[kafka] class LogManager(val conf
         val topic = Utils.getTopicPartition(dir.getName)._1
         val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
         val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-        val log = new Log(dir, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needRecovery, time, config.brokerId)
+        val log = new Log(dir, 
+                          maxLogFileSize, 
+                          config.maxMessageSize, 
+                          flushInterval, 
+                          rollIntervalMs, 
+                          needRecovery, 
+                          config.logIndexMaxSizeBytes,
+                          config.logIndexIntervalBytes,
+                          time, 
+                          config.brokerId)
         val topicPartition = Utils.getTopicPartition(dir.getName)
         logs.putIfNotExists(topicPartition._1, new Pool[Int, Log]())
         val parts = logs.get(topicPartition._1)
@@ -88,7 +97,7 @@ private[kafka] class LogManager(val conf
       scheduler.scheduleWithRate(cleanupLogs, "kafka-logcleaner-", 60 * 1000, logCleanupIntervalMs, false)
       info("Starting log flusher every " + config.flushSchedulerThreadRate +
                    " ms with the following overrides " + logFlushIntervals)
-      scheduler.scheduleWithRate(flushAllLogs, "kafka-logflusher-",
+      scheduler.scheduleWithRate(flushDirtyLogs, "kafka-logflusher-",
                                  config.flushSchedulerThreadRate, config.flushSchedulerThreadRate, false)
     }
   }
@@ -103,7 +112,7 @@ private[kafka] class LogManager(val conf
       d.mkdirs()
       val rollIntervalMs = logRollMsMap.get(topic).getOrElse(this.logRollDefaultIntervalMs)
       val maxLogFileSize = logFileSizeMap.get(topic).getOrElse(config.logFileSize)
-      new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, false, time, config.brokerId)
+      new Log(d, maxLogFileSize, config.maxMessageSize, flushInterval, rollIntervalMs, needsRecovery = false, config.logIndexMaxSizeBytes, config.logIndexIntervalBytes, time, config.brokerId)
     }
   }
 
@@ -162,7 +171,7 @@ private[kafka] class LogManager(val conf
     val startMs = time.milliseconds
     val topic = Utils.getTopicPartition(log.name)._1
     val logCleanupThresholdMs = logRetentionMsMap.get(topic).getOrElse(this.logCleanupDefaultAgeMs)
-    val toBeDeleted = log.markDeletedWhile(startMs - _.file.lastModified > logCleanupThresholdMs)
+    val toBeDeleted = log.markDeletedWhile(startMs - _.messageSet.file.lastModified > logCleanupThresholdMs)
     val total = log.deleteSegments(toBeDeleted)
     total
   }
@@ -208,9 +217,9 @@ private[kafka] class LogManager(val conf
    * Close all the logs
    */
   def shutdown() {
-    info("shut down")
+    debug("Shutting down.")
     allLogs.foreach(_.close())
-    info("shutted down completedly")
+    debug("Shutdown complete.")
   }
 
   /**
@@ -218,21 +227,22 @@ private[kafka] class LogManager(val conf
    */
   def allLogs() = logs.values.flatMap(_.values)
 
-  private def flushAllLogs() = {
-    debug("Flushing the high watermark of all logs")
-    for (log <- allLogs)
-    {
-      try{
+  /**
+   * Flush any log which has exceeded its flush interval and has unwritten messages.
+   */
+  private def flushDirtyLogs() = {
+    debug("Checking for dirty logs to flush...")
+    for (log <- allLogs) {
+      try {
         val timeSinceLastFlush = System.currentTimeMillis - log.getLastFlushedTime
         var logFlushInterval = config.defaultFlushIntervalMs
         if(logFlushIntervals.contains(log.topicName))
           logFlushInterval = logFlushIntervals(log.topicName)
         debug(log.topicName + " flush interval  " + logFlushInterval +
-                      " last flushed " + log.getLastFlushedTime + " timesincelastFlush: " + timeSinceLastFlush)
+                      " last flushed " + log.getLastFlushedTime + " time since last flush: " + timeSinceLastFlush)
         if(timeSinceLastFlush >= logFlushInterval)
           log.flush
-      }
-      catch {
+      } catch {
         case e =>
           error("Error flushing topic " + log.topicName, e)
           e match {

Added: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala?rev=1395729&view=auto
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala (added)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogSegment.scala Mon Oct  8 19:13:24 2012
@@ -0,0 +1,151 @@
+package kafka.log
+
+import scala.math._
+import java.io.File
+import kafka.common._
+import kafka.message._
+import kafka.utils.{Utils, Range, Time, SystemTime, nonthreadsafe}
+
+/**
+ * A segment of the log. Each segment has two components: a log and an index. The log is a FileMessageSet containing
+ * the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each 
+ * segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in
+ * any previous segment.
+ * 
+ * A segment with a base offset of [base_offset] would be stored in two files, a [base_offset].index and a [base_offset].log file. 
+ */
+@nonthreadsafe
+class LogSegment(val messageSet: FileMessageSet, 
+                 val index: OffsetIndex, 
+                 val start: Long, 
+                 val indexIntervalBytes: Int,
+                 time: Time) extends Range {
+  
+  var firstAppendTime: Option[Long] = None
+  
+  /* the number of bytes since we last added an entry in the offset index */
+  var bytesSinceLastIndexEntry = 0
+  
+  @volatile var deleted = false
+  
+  def this(dir: File, startOffset: Long, mutable: Boolean, indexIntervalBytes: Int, maxIndexSize: Int) = 
+    this(new FileMessageSet(file = Log.logFilename(dir, startOffset), mutable = mutable), 
+         new OffsetIndex(file = Log.indexFilename(dir, startOffset), baseOffset = startOffset, mutable = mutable, maxIndexSize = maxIndexSize),
+         startOffset,
+         indexIntervalBytes,
+         SystemTime)
+    
+  /* Return the size in bytes of this log segment */
+  def size: Long = messageSet.sizeInBytes()
+
+  def updateFirstAppendTime() {
+    if (firstAppendTime.isEmpty)
+      firstAppendTime = Some(time.milliseconds)
+  }
+
+  /**
+   * Append the given messages starting with the given offset. Add
+   * an entry to the index if needed.
+   * 
+   * It is assumed this method is being called from within a lock
+   */
+  def append(offset: Long, messages: ByteBufferMessageSet) {
+    if (messages.sizeInBytes > 0) {
+      // append an entry to the index (if needed)
+      if(bytesSinceLastIndexEntry > indexIntervalBytes) {
+        index.append(offset, messageSet.sizeInBytes().toInt)
+        this.bytesSinceLastIndexEntry = 0
+      }
+      // append the messages
+      messageSet.append(messages)
+      updateFirstAppendTime()
+      this.bytesSinceLastIndexEntry += messages.sizeInBytes.toInt
+    }
+  }
+  
+  /**
+   * Find the physical file position for the least offset >= the given offset. If no offset is found
+   * that meets this criterion before the end of the log, return null.
+   */
+  def translateOffset(offset: Long): OffsetPosition = {
+    val mapping = index.lookup(offset)
+    messageSet.searchFor(offset, mapping.position)
+  }
+  
+  /**
+   * Read a message set from this segment beginning with the first offset
+   * greater than or equal to the startOffset. The message set will include
+   * no more than maxSize bytes and will end before maxOffset if a maxOffset is specified.
+   */
+  def read(startOffset: Long, maxSize: Int, maxOffset: Option[Long]): MessageSet = {
+    if(maxSize <= 0)
+      return MessageSet.Empty
+      
+    val startPosition = translateOffset(startOffset)
+    
+    // if the start position is already off the end of the log, return MessageSet.Empty
+    if(startPosition == null)
+      return MessageSet.Empty
+    
+    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
+    val length = 
+      maxOffset match {
+        case None =>
+          // no max offset, just use the given max size
+          maxSize
+        case Some(offset) => {
+          // there is a max offset, translate it to a file position and use that to calculate the max read size
+          val mapping = translateOffset(offset)
+          val endPosition = 
+            if(mapping == null)
+              messageSet.sizeInBytes().toInt // the max offset is off the end of the log, use the end of the file
+            else
+              mapping.position
+          min(endPosition - startPosition.position, maxSize) 
+        }
+      }
+    messageSet.read(startPosition.position, length)
+  }
+
+  override def toString() = "LogSegment(start=" + start + ", size=" + size + ")"
+
+  /**
+   * Truncate off all index and log entries with offsets greater than or equal to the given offset. 
+   */
+  def truncateTo(offset: Long) {
+    val mapping = translateOffset(offset)
+    if(mapping == null)
+      return
+    index.truncateTo(offset)  
+    messageSet.truncateTo(mapping.position)
+  }
+  
+  /**
+   * Calculate the offset that would be used for the next message to be appended to this segment.
+   * Note that this is expensive.
+   */
+  def nextOffset(): Long = {
+    val ms = read(index.lastOffset, messageSet.sizeInBytes.toInt, None)
+    ms.lastOption match {
+      case None => start
+      case Some(last) => last.nextOffset
+    }
+  }
+  
+  /**
+   * Flush this log segment to disk
+   */
+  def flush() {
+    messageSet.flush()
+    index.flush()
+  }
+  
+  /**
+   * Close this log segment
+   */
+  def close() {
+    Utils.swallow(index.close)
+    Utils.swallow(messageSet.close)
+  }
+  
+}
\ No newline at end of file
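
To make the segment read path concrete, a hedged sketch (not part of this commit) of opening one [base_offset].log / [base_offset].index pair and reading by logical offset; the directory, base offset, index interval and size values are illustrative assumptions.

import java.io.File
import kafka.log.LogSegment
import kafka.message.MessageSet

def readSlice(dir: File): MessageSet = {
  // open the existing segment whose base offset is 0 in read-only (immutable) mode
  val segment = new LogSegment(dir,
                               startOffset = 0L,
                               mutable = false,
                               indexIntervalBytes = 4096,       // illustrative
                               maxIndexSize = 10 * 1024 * 1024) // illustrative
  // read() resolves offset 42 via index.lookup plus messageSet.searchFor,
  // then returns at most 64 KB of messages starting at that offset
  segment.read(startOffset = 42L, maxSize = 64 * 1024, maxOffset = None)
}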


