kafka-dev mailing list archives

From Joe Stein <crypt...@gmail.com>
Subject Re: svn commit: r1296577 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/message/ main/scala/kafka/javaapi/producer/ main/scala/kafka/message/ main/scala/kafka/producer/ main/scala
Date Tue, 06 Mar 2012 17:37:33 GMT
Let me take care of the missing file right now; sorry about that.

If now is a good time to rebase the 0.8 branch to the latest trunk, then yes,
I can help with that, though I probably won't have time until Thursday at the
soonest.

On Tue, Mar 6, 2012 at 12:33 PM, Neha Narkhede <neha.narkhede@gmail.com> wrote:

> Also, I would recommend rebasing the 0.8 branch to the latest trunk
> now. It has been quite a while since the last time I did a rebase.
>
> Joe, would you mind helping with that?
>
> Thanks,
> Neha
>
> On Tue, Mar 6, 2012 at 9:15 AM, Jun Rao <junrao@gmail.com> wrote:
> > Joe,
> >
> > 0.8 now has a compilation error after this check-in. It seems that you
> > forgot to check in a new file. See my comment in KAFKA-240. Could you fix
> > this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Mar 2, 2012 at 9:46 PM, <joestein@apache.org> wrote:
> >
> >> Author: joestein
> >> Date: Sat Mar  3 05:46:43 2012
> >> New Revision: 1296577
> >>
> >> URL: http://svn.apache.org/viewvc?rev=1296577&view=rev
> >> Log:
> >> KAFKA-240 ProducerRequest wire format protocol update and related changes
> >>
> >> Removed:
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiProducerRequest.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
> >> Modified:
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
> >>    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
> >>    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
> >>    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
> >>    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
> >>    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Sat Mar  3 05:46:43 2012
> >> @@ -39,6 +39,15 @@ object PartitionData {
> >>
> >>  case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
> >>   val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
> >> +
> >> +  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
> >> +
> >> +  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
> >> +    if (partition == ProducerRequest.RandomPartition)
> >> +      return randomSelector(topic)
> >> +    else
> >> +      return partition
> >> +  }
> >>  }
> >>
> >>  object TopicData {
> >> @@ -73,6 +82,15 @@ object TopicData {
> >>
> >>  case class TopicData(topic: String, partitionData: Array[PartitionData]) {
> >>   val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
> >> +
> >> +  override def equals(other: Any): Boolean = {
> >> +    other match {
> >> +      case that: TopicData =>
> >> +        ( topic == that.topic &&
> >> +          partitionData.toSeq == that.partitionData.toSeq )
> >> +      case _ => false
> >> +    }
> >> +  }
> >>  }
> >>
> >>  object FetchResponse {
> >>
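
The TopicData class above gains an equals override because its partitionData
field is a Scala Array, and Arrays compare by reference rather than by
contents; converting to a Seq gives element-wise comparison. A minimal
standalone sketch of the difference (illustrative, not from the patch):

    object ArrayEqualitySketch extends App {
      val a = Array(1, 2, 3)
      val b = Array(1, 2, 3)
      println(a == b)             // false: arrays compare by reference
      println(a.toSeq == b.toSeq) // true: Seqs compare element by element
    }
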
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Sat Mar  3 05:46:43 2012
> >> @@ -24,60 +24,108 @@ import kafka.utils._
> >>
> >>  object ProducerRequest {
> >>   val RandomPartition = -1
> >> -
> >> +  val versionId: Short = 0
> >> +
> >>   def readFrom(buffer: ByteBuffer): ProducerRequest = {
> >> -    val topic = Utils.readShortString(buffer, "UTF-8")
> >> -    val partition = buffer.getInt
> >> -    val messageSetSize = buffer.getInt
> >> -    val messageSetBuffer = buffer.slice()
> >> -    messageSetBuffer.limit(messageSetSize)
> >> -    buffer.position(buffer.position + messageSetSize)
> >> -    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
> >> +    val versionId: Short = buffer.getShort
> >> +    val correlationId: Int = buffer.getInt
> >> +    val clientId: String = Utils.readShortString(buffer, "UTF-8")
> >> +    val requiredAcks: Short = buffer.getShort
> >> +    val ackTimeout: Int = buffer.getInt
> >> +    //build the topic structure
> >> +    val topicCount = buffer.getInt
> >> +    val data = new Array[TopicData](topicCount)
> >> +    for(i <- 0 until topicCount) {
> >> +      val topic = Utils.readShortString(buffer, "UTF-8")
> >> +
> >> +      val partitionCount = buffer.getInt
> >> +      //build the partition structure within this topic
> >> +      val partitionData = new Array[PartitionData](partitionCount)
> >> +      for (j <- 0 until partitionCount) {
> >> +        val partition = buffer.getInt
> >> +        val messageSetSize = buffer.getInt
> >> +        val messageSetBuffer = new Array[Byte](messageSetSize)
> >> +        buffer.get(messageSetBuffer,0,messageSetSize)
> >> +        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
> >> +      }
> >> +      data(i) = new TopicData(topic,partitionData)
> >> +    }
> >> +    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
> >>   }
> >>  }
> >>
> >> -class ProducerRequest(val topic: String,
> >> -                      val partition: Int,
> >> -                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
> >> +case class ProducerRequest(val versionId: Short, val correlationId: Int,
> >> +                      val clientId: String,
> >> +                      val requiredAcks: Short,
> >> +                      val ackTimeout: Int,
> >> +                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
> >> +
> >> +  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
> >>
> >>   def writeTo(buffer: ByteBuffer) {
> >> -    Utils.writeShortString(buffer, topic)
> >> -    buffer.putInt(partition)
> >> -    buffer.putInt(messages.serialized.limit)
> >> -    buffer.put(messages.serialized)
> >> -    messages.serialized.rewind
> >> +    buffer.putShort(versionId)
> >> +    buffer.putInt(correlationId)
> >> +    Utils.writeShortString(buffer, clientId, "UTF-8")
> >> +    buffer.putShort(requiredAcks)
> >> +    buffer.putInt(ackTimeout)
> >> +    //save the topic structure
> >> +    buffer.putInt(data.size) //the number of topics
> >> +    data.foreach(d =>{
> >> +      Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
> >> +      buffer.putInt(d.partitionData.size) //the number of partitions
> >> +      d.partitionData.foreach(p => {
> >> +        buffer.putInt(p.partition)
> >> +        buffer.putInt(p.messages.getSerialized().limit)
> >> +        buffer.put(p.messages.getSerialized())
> >> +        p.messages.getSerialized().rewind
> >> +      })
> >> +    })
> >>   }
> >> -
> >> -  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
> >>
> >> -  def getTranslatedPartition(randomSelector: String => Int): Int = {
> >> -    if (partition == ProducerRequest.RandomPartition)
> >> -      return randomSelector(topic)
> >> -    else
> >> -      return partition
> >> +  def sizeInBytes(): Int = {
> >> +    var size = 0
> >> +    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
> >> +    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
> >> +    data.foreach(d =>{
> >> +      size += 2 + d.topic.length + 4
> >> +      d.partitionData.foreach(p => {
> >> +        size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
> >> +      })
> >> +    })
> >> +    size
> >>   }
> >>
> >>   override def toString: String = {
> >>     val builder = new StringBuilder()
> >>     builder.append("ProducerRequest(")
> >> -    builder.append(topic + ",")
> >> -    builder.append(partition + ",")
> >> -    builder.append(messages.sizeInBytes)
> >> +    builder.append(versionId + ",")
> >> +    builder.append(correlationId + ",")
> >> +    builder.append(clientId + ",")
> >> +    builder.append(requiredAcks + ",")
> >> +    builder.append(ackTimeout)
> >> +    data.foreach(d =>{
> >> +      builder.append(":[" + d.topic)
> >> +      d.partitionData.foreach(p => {
> >> +        builder.append(":[")
> >> +        builder.append(p.partition + ",")
> >> +        builder.append(p.messages.sizeInBytes)
> >> +        builder.append("]")
> >> +      })
> >> +      builder.append("]")
> >> +    })
> >>     builder.append(")")
> >>     builder.toString
> >>   }
> >>
> >>   override def equals(other: Any): Boolean = {
> >> -    other match {
> >> +   other match {
> >>       case that: ProducerRequest =>
> >> -        (that canEqual this) && topic == that.topic && partition == that.partition &&
> >> -                messages.equals(that.messages)
> >> +        ( correlationId == that.correlationId &&
> >> +          clientId == that.clientId &&
> >> +          requiredAcks == that.requiredAcks &&
> >> +          ackTimeout == that.ackTimeout &&
> >> +          data.toSeq == that.data.toSeq)
> >>       case _ => false
> >>     }
> >>   }
> >> -
> >> -  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
> >> -
> >> -  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
> >> -
> >> -}
> >> +}
> >> \ No newline at end of file
> >>
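
The readFrom/writeTo pair above defines the new multi-topic request layout: a
fixed header (version_id, correlation_id, client_id, required_acks,
ack_timeout) followed by a topic count, and per topic a partition count and
per-partition message set bytes. A standalone sketch of writing one such body
with java.nio (topic name, sizes, and values here are illustrative
assumptions, not from the patch; a "short string" is a 2-byte length followed
by the UTF-8 bytes, as in Utils.writeShortString):

    import java.nio.ByteBuffer

    object WireFormatSketch extends App {
      def writeShortString(buf: ByteBuffer, s: String): Unit = {
        val bytes = s.getBytes("UTF-8")
        buf.putShort(bytes.length.toShort)
        buf.put(bytes)
      }

      val payload = "hello".getBytes("UTF-8") // stands in for serialized messages
      val buf = ByteBuffer.allocate(256)
      buf.putShort(0)                 // version_id
      buf.putInt(-1)                  // correlation_id
      writeShortString(buf, "demo")   // client_id
      buf.putShort(0)                 // required_acks
      buf.putInt(1)                   // ack_timeout
      buf.putInt(1)                   // number of topics
      writeShortString(buf, "topicA") // topic
      buf.putInt(1)                   // partitions for this topic
      buf.putInt(-1)                  // partition (-1 = random)
      buf.putInt(payload.length)      // message set size in bytes
      buf.put(payload)                // message set bytes
      buf.flip()
      println("request body is " + buf.limit + " bytes")
    }
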
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Sat Mar  3 05:46:43 2012
> >> @@ -22,7 +22,7 @@ import kafka.api.TopicData
> >>
> >>  class FetchResponse( val versionId: Short,
> >>                      val correlationId: Int,
> >> -                     val data: Array[TopicData] ) {
> >> +                     private val data: Array[TopicData] ) {
> >>
> >>   private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Sat Mar  3 05:46:43 2012
> >> @@ -17,36 +17,29 @@
> >>  package kafka.javaapi
> >>
> >>  import kafka.network.Request
> >> -import kafka.api.RequestKeys
> >> +import kafka.api.{RequestKeys, TopicData}
> >>  import java.nio.ByteBuffer
> >>
> >> -class ProducerRequest(val topic: String,
> >> -                      val partition: Int,
> >> -                      val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
> >> +class ProducerRequest(val correlationId: Int,
> >> +                      val clientId: String,
> >> +                      val requiredAcks: Short,
> >> +                      val ackTimeout: Int,
> >> +                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
> >> +
> >>   import Implicits._
> >> -  private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
> >> +  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
> >>
> >>   def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
> >>
> >>   def sizeInBytes(): Int = underlying.sizeInBytes
> >>
> >> -  def getTranslatedPartition(randomSelector: String => Int): Int =
> >> -    underlying.getTranslatedPartition(randomSelector)
> >> -
> >>   override def toString: String =
> >>     underlying.toString
> >>
> >> -  override def equals(other: Any): Boolean = {
> >> -    other match {
> >> -      case that: ProducerRequest =>
> >> -        (that canEqual this) && topic == that.topic && partition == that.partition &&
> >> -                messages.equals(that.messages)
> >> -      case _ => false
> >> -    }
> >> -  }
> >> +  override def equals(other: Any): Boolean = underlying.equals(other)
> >>
> >>   def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
> >>
> >> -  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
> >> +  override def hashCode: Int = underlying.hashCode
> >>
> >> -}
> >> +}
> >> \ No newline at end of file
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Sat Mar  3 05:46:43 2012
> >> @@ -39,7 +39,7 @@ class ByteBufferMessageSet(private val b
> >>
> >>   def validBytes: Long = underlying.validBytes
> >>
> >> -  def serialized():ByteBuffer = underlying.serialized
> >> +  def serialized():ByteBuffer = underlying.getSerialized()
> >>
> >>   def getInitialOffset = initialOffset
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Sat Mar  3 05:46:43 2012
> >> @@ -18,6 +18,8 @@ package kafka.javaapi.producer
> >>
> >>  import kafka.producer.SyncProducerConfig
> >>  import kafka.javaapi.message.ByteBufferMessageSet
> >> +import kafka.javaapi.ProducerRequest
> >> +import kafka.api.{PartitionData, TopicData}
> >>
> >>  class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
> >>
> >> @@ -25,21 +27,17 @@ class SyncProducer(syncProducer: kafka.p
> >>
> >>   val underlying = syncProducer
> >>
> >> -  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
> >> -    import kafka.javaapi.Implicits._
> >> -    underlying.send(topic, partition, messages)
> >> +  def send(producerRequest: kafka.javaapi.ProducerRequest) {
> >> +    underlying.send(producerRequest.underlying)
> >>   }
> >>
> >> -  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
> >> -    kafka.api.ProducerRequest.RandomPartition,
> >> -    messages)
> >> -
> >> -  def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
> >> -    import kafka.javaapi.Implicits._
> >> -    val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
> >> -    for(i <- 0 until produces.length)
> >> -      produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
> >> -    underlying.multiSend(produceRequests)
> >> +  def send(topic: String, messages: ByteBufferMessageSet): Unit = {
> >> +    var data = new Array[TopicData](1)
> >> +    var partition_data = new Array[PartitionData](1)
> >> +    partition_data(0) = new PartitionData(-1,messages.underlying)
> >> +    data(0) = new TopicData(topic,partition_data)
> >> +    val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
> >> +    underlying.send(producerRequest)
> >>   }
> >>
> >>   def close() {
> >>
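
The convenience send(topic, messages) above wraps one message set into a
one-topic, one-partition request, using partition -1 so the broker picks a
random partition. A standalone sketch of that wrapping with stand-in case
classes (the real PartitionData and TopicData live in kafka.api):

    case class PartitionDataSketch(partition: Int, payload: Array[Byte])
    case class TopicDataSketch(topic: String, partitionData: Array[PartitionDataSketch])

    object WrapSingleTopicSketch extends App {
      val RandomPartition = -1
      def wrap(topic: String, payload: Array[Byte]): Array[TopicDataSketch] =
        Array(TopicDataSketch(topic, Array(PartitionDataSketch(RandomPartition, payload))))

      val data = wrap("topicA", "hello".getBytes("UTF-8"))
      println(data(0).topic + " -> partition " + data(0).partitionData(0).partition)
    }
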
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Sat Mar  3 05:46:43 2012
> >> @@ -53,7 +53,7 @@ class ByteBufferMessageSet(private val b
> >>
> >>   def getErrorCode = errorCode
> >>
> >> -  def serialized(): ByteBuffer = buffer
> >> +  def getSerialized(): ByteBuffer = buffer
> >>
> >>   def validBytes: Long = shallowValidBytes
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Sat Mar  3 05:46:43 2012
> >> @@ -40,6 +40,8 @@ class FileMessageSet private[kafka](priv
> >>   private val setSize = new AtomicLong()
> >>   private val setHighWaterMark = new AtomicLong()
> >>
> >> +  def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
> >> +
> >>   if(mutable) {
> >>     if(limit < Long.MaxValue || offset > 0)
> >>       throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Sat Mar  3 05:46:43 2012
> >> @@ -111,4 +111,9 @@ abstract class MessageSet extends Iterab
> >>         throw new InvalidMessageException
> >>   }
> >>
> >> +  /**
> >> +   * Used to allow children to have serialization on implementation
> >> +   */
> >> +  def getSerialized(): ByteBuffer
> >> +
> >>  }
> >>
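
The new abstract getSerialized() lets each MessageSet subclass decide whether
it can expose a raw serialized buffer: ByteBufferMessageSet returns its
backing buffer, while the file-backed FileMessageSet throws. A minimal sketch
of the pattern with stand-in class names:

    import java.nio.ByteBuffer

    abstract class MessageSetSketch {
      // subclasses decide whether a raw serialized view is available
      def getSerialized(): ByteBuffer
    }

    class InMemorySetSketch(buffer: ByteBuffer) extends MessageSetSketch {
      def getSerialized(): ByteBuffer = buffer // memory-backed: expose the buffer
    }

    class FileBackedSetSketch extends MessageSetSketch {
      // file-backed: there is no in-memory buffer to hand out
      def getSerialized(): ByteBuffer = throw new UnsupportedOperationException()
    }
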
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Sat Mar  3 05:46:43 2012
> >> @@ -51,29 +51,10 @@ class SyncProducer(val config: SyncProdu
> >>     if (logger.isTraceEnabled) {
> >>       trace("verifying sendbuffer of size " + buffer.limit)
> >>       val requestTypeId = buffer.getShort()
> >> -      if (requestTypeId == RequestKeys.MultiProduce) {
> >> -        try {
> >> -          val request = MultiProducerRequest.readFrom(buffer)
> >> -          for (produce <- request.produces) {
> >> -            try {
> >> -              for (messageAndOffset <- produce.messages)
> >> -                if (!messageAndOffset.message.isValid)
> >> -                  trace("topic " + produce.topic + " is invalid")
> >> -            }
> >> -            catch {
> >> -              case e: Throwable =>
> >> -                trace("error iterating messages ", e)
> >> -            }
> >> -          }
> >> -        }
> >> -        catch {
> >> -          case e: Throwable =>
> >> -            trace("error verifying sendbuffer ", e)
> >> -        }
> >> -      }
> >> +      val request = ProducerRequest.readFrom(buffer)
> >> +      trace(request.toString)
> >>     }
> >>   }
> >> -
> >>   /**
> >>    * Common functionality for the public send methods
> >>    */
> >> @@ -108,21 +89,15 @@ class SyncProducer(val config: SyncProdu
> >>   /**
> >>    * Send a message
> >>    */
> >> -  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
> >> -    verifyMessageSize(messages)
> >> -    val setSize = messages.sizeInBytes.asInstanceOf[Int]
> >> -    trace("Got message set with " + setSize + " bytes to send")
> >> -    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
> >> -  }
> >> -
> >> -  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
> >> -
> >> -  def multiSend(produces: Array[ProducerRequest]) {
> >> -    for (request <- produces)
> >> -      verifyMessageSize(request.messages)
> >> -    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
> >> -    trace("Got multi message sets with " + setSize + " bytes to send")
> >> -    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
> >> +  def send(producerRequest: ProducerRequest) {
> >> +    producerRequest.data.foreach(d => {
> >> +      d.partitionData.foreach(p => {
> >> +        verifyMessageSize(new ByteBufferMessageSet(p.messages.getSerialized()))
> >> +        val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
> >> +        trace("Got message set with " + setSize + " bytes to send")
> >> +      })
> >> +    })
> >> +    send(new BoundedByteBufferSend(producerRequest))
> >>   }
> >>
> >>   def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Mar  3 05:46:43 2012
> >> @@ -41,4 +41,23 @@ trait SyncProducerConfigShared {
> >>   val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
> >>
> >>   val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
> >> +
> >> +  /* the correlation id of the producer requests */
> >> +  val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
> >> +
> >> +  /* the client application sending the producer requests */
> >> +  val clientId = Utils.getString(props,"producer.request.client_id","")
> >> +
> >> +  /* the required_acks of the producer requests */
> >> +  val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
> >> +
> >> +  /* the ack_timeout of the producer requests */
> >> +  val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
> >>  }
> >> +
> >> +object SyncProducerConfig {
> >> +  val DefaultCorrelationId = -1
> >> +  val DefaultClientId = ""
> >> +  val DefaultRequiredAcks : Short = 0
> >> +  val DefaultAckTimeoutMs = 1
> >> +}
> >> \ No newline at end of file
> >>
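
The new producer.request.* settings above are plain producer Properties with
the listed defaults. A quick sketch of supplying them (property names are
from the patch; the values are arbitrary examples):

    import java.util.Properties

    object ProducerRequestPropsSketch extends App {
      val props = new Properties()
      props.put("producer.request.correlation_id", "7")
      props.put("producer.request.client_id", "demo-client")
      props.put("producer.request.required_acks", "1")
      props.put("producer.request.ack_timeout", "500")
      // SyncProducerConfig reads these via Utils.getInt/getString/getShort
      println(props.getProperty("producer.request.required_acks").toShort)
    }
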
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Mar  3 05:46:43 2012
> >> @@ -17,7 +17,7 @@
> >>
> >>  package kafka.producer.async
> >>
> >> -import kafka.api.ProducerRequest
> >> +import kafka.api.{ProducerRequest, TopicData, PartitionData}
> >>  import kafka.serializer.Encoder
> >>  import kafka.producer._
> >>  import kafka.cluster.{Partition, Broker}
> >> @@ -147,9 +147,22 @@ class DefaultEventHandler[K,V](config: P
> >>
> >>   private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
> >>     if(messagesPerTopic.size > 0) {
> >> -      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
> >> +      val topics = new HashMap[String, ListBuffer[PartitionData]]()
> >> +      val requests = messagesPerTopic.map(f => {
> >> +        val topicName = f._1._1
> >> +        val partitionId = f._1._2
> >> +        val messagesSet = f._2
> >> +        val topic = topics.get(topicName) // checking to see if this topic exists
> >> +        topic match {
> >> +          case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
> >> +          case Some(x) => trace("found " + topicName)
> >> +        }
> >> +        topics(topicName).append(new PartitionData(partitionId, messagesSet))
> >> +      })
> >> +      val topicData = topics.map(kv => new TopicData(kv._1,kv._2.toArray))
> >> +      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray)
> >>       val syncProducer = producerPool.getProducer(brokerId)
> >> -      syncProducer.multiSend(requests)
> >> +      syncProducer.send(producerRequest)
> >>       trace("kafka producer sent messages for topics %s to broker %s:%d"
> >>         .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
> >>     }
> >>
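
The rewritten send above buckets a Map keyed by (topic, partition) into
per-topic partition lists before building a single ProducerRequest. A
standalone sketch of that grouping step, with message sets simplified to
strings:

    import scala.collection.mutable.{HashMap, ListBuffer}

    object GroupByTopicSketch extends App {
      // (topic, partition) -> messages, as DefaultEventHandler.send sees it
      val messagesPerTopic =
        Map(("topicA", 0) -> "m1", ("topicA", 1) -> "m2", ("topicB", 0) -> "m3")

      val topics = new HashMap[String, ListBuffer[(Int, String)]]()
      messagesPerTopic.foreach { case ((topic, partition), messages) =>
        topics.getOrElseUpdate(topic, new ListBuffer[(Int, String)]())
        topics(topic).append((partition, messages))
      }
      // one TopicData-like bucket per topic, each holding its partitions
      topics.foreach { case (t, parts) => println(t + " -> " + parts.mkString(", ")) }
    }
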
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Mar  3 05:46:43 2012
> >> @@ -41,7 +41,6 @@ class KafkaApis(val logManager: LogManag
> >>     apiId match {
> >>       case RequestKeys.Produce => handleProducerRequest(receive)
> >>       case RequestKeys.Fetch => handleFetchRequest(receive)
> >> -      case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
> >>       case RequestKeys.Offsets => handleOffsetRequest(receive)
> >>       case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
> >>       case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
> >> @@ -59,31 +58,38 @@ class KafkaApis(val logManager: LogManag
> >>     None
> >>   }
> >>
> >> -  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
> >> -    val request = MultiProducerRequest.readFrom(receive.buffer)
> >> -    if(requestLogger.isTraceEnabled)
> >> -      requestLogger.trace("Multiproducer request " + request.toString)
> >> -    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
> >> -    None
> >> -  }
> >> -
> >> -  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
> >> -    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
> >> -    try {
> >> -      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
> >> -      trace(request.messages.sizeInBytes + " bytes written to logs.")
> >> -    } catch {
> >> -      case e =>
> >> -        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
> >> -        e match {
> >> -          case _: IOException =>
> >> -            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
> >> -            System.exit(1)
> >> -          case _ =>
> >> +  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {
> >> +    val requestSize = request.data.size
> >> +    val errors = new Array[Int](requestSize)
> >> +    val offsets = new Array[Long](requestSize)
> >> +
> >> +    request.data.foreach(d => {
> >> +      d.partitionData.foreach(p => {
> >> +        val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
> >> +        try {
> >> +          logManager.getOrCreateLog(d.topic, partition).append(p.messages)
> >> +          trace(p.messages.sizeInBytes + " bytes written to logs.")
> >> +          p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
> >>         }
> >> -        throw e
> >> -    }
> >> -    None
> >> +        catch {
> >> +          case e =>
> >> +            //TODO: handle response in ProducerResponse
> >> +            error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
> >> +            e match {
> >> +              case _: IOException =>
> >> +                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
> >> +                Runtime.getRuntime.halt(1)
> >> +              case _ =>
> >> +            }
> >> +          //throw e
> >> +        }
> >> +      })
> >> +    //None
> >> +    })
> >> +    if (request.requiredAcks == 0)
> >> +      None
> >> +    else
> >> +      None //TODO: send when KAFKA-49 can receive this Some(new ProducerResponse(request.versionId, request.correlationId, errors, offsets))
> >>   }
> >>
> >>   def handleFetchRequest(request: Receive): Option[Send] = {
> >>
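
handleProducerRequest now walks every topic and partition in one request,
appends each message set to its log, and reserves per-entry error and offset
slots for a ProducerResponse that is deferred until KAFKA-49 lands. A
standalone sketch of that nested walk (the append is stubbed with a println;
all names here are illustrative):

    object ProduceWalkSketch extends App {
      case class PartData(partition: Int, payload: String)
      case class TopicEntry(topic: String, partitionData: Array[PartData])

      val data = Array(
        TopicEntry("topicA", Array(PartData(0, "m1"), PartData(1, "m2"))),
        TopicEntry("topicB", Array(PartData(0, "m3"))))

      val errors = new Array[Int](data.size)   // one slot per topic entry
      val offsets = new Array[Long](data.size) // filled once responses exist

      data.zipWithIndex.foreach { case (d, i) =>
        d.partitionData.foreach { p =>
          try {
            // stand-in for logManager.getOrCreateLog(topic, partition).append(...)
            println("append " + p.payload + " to " + d.topic + ":" + p.partition)
          } catch {
            case e: Exception => errors(i) = 1 // record the failure, keep going
          }
        }
      }
    }
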
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Mar  3 05:46:43 2012
> >> @@ -195,6 +195,9 @@ object Utils extends Logging {
> >>   def getInt(props: Properties, name: String, default: Int): Int =
> >>     getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
> >>
> >> +  def getShort(props: Properties, name: String, default: Short): Short =
> >> +    getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
> >> +
> >>   /**
> >>    * Read an integer from the properties instance. Throw an exception
> >>    * if the value is not in the given range (inclusive)
> >> @@ -217,6 +220,18 @@ object Utils extends Logging {
> >>       v
> >>   }
> >>
> >> +  def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
> >> +    val v =
> >> +      if(props.containsKey(name))
> >> +        props.getProperty(name).toShort
> >> +      else
> >> +        default
> >> +    if(v < range._1 || v > range._2)
> >> +      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
> >> +    else
> >> +      v
> >> +  }
> >> +
> >>   def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
> >>     val value = buffer.getInt
> >>     if(value < range._1 || value > range._2)
> >> @@ -777,4 +792,4 @@ class SnapshotStats(private val monitorD
> >>
> >>     def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
> >>   }
> >> -}
> >> +}
> >> \ No newline at end of file
> >>
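
getShortInRange mirrors the existing getIntInRange helper: use the property
if present, otherwise the default, and reject anything outside the inclusive
range. A runnable restatement of that helper plus its failure mode (the
property name is reused from the patch; values are examples):

    import java.util.Properties

    object GetShortSketch extends App {
      def getShortInRange(props: Properties, name: String, default: Short,
                          range: (Short, Short)): Short = {
        val v = if (props.containsKey(name)) props.getProperty(name).toShort else default
        if (v < range._1 || v > range._2)
          throw new IllegalArgumentException(
            name + " has value " + v + " which is not in the range " + range + ".")
        else v
      }

      val props = new Properties()
      props.put("producer.request.required_acks", "1")
      println(getShortInRange(props, "producer.request.required_acks", 0, (0, 3))) // 1
      props.put("producer.request.required_acks", "9")
      try getShortInRange(props, "producer.request.required_acks", 0, (0, 3))
      catch { case e: IllegalArgumentException => println(e.getMessage) }
    }
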
> >> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Sat Mar  3 05:46:43 2012
> >> @@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B
> >>     // create a ByteBufferMessageSet that doesn't contain a full message
> >>     // iterating it should get an InvalidMessageSizeException
> >>     val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
> >> -    val buffer = messages.serialized.slice
> >> +    val buffer = messages.getSerialized().slice
> >>     buffer.limit(10)
> >>     val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
> >>     try {
> >> @@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B
> >>     {
> >>       val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
> >>       val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
> >> -      buffer.put(messages.serialized)
> >> +      buffer.put(messages.getSerialized())
> >>       buffer.putShort(4)
> >>       val messagesPlus = new ByteBufferMessageSet(buffer)
> >>       assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
> >> @@ -93,7 +93,7 @@ class ByteBufferMessageSetTest extends B
> >>       //make sure ByteBufferMessageSet is re-iterable.
> >>       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
> >>       //make sure the last offset after iteration is correct
> >> -      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
> >> +      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
> >>     }
> >>
> >>     // test for compressed regular messages
> >> @@ -103,7 +103,7 @@ class ByteBufferMessageSetTest extends B
> >>       //make sure ByteBufferMessageSet is re-iterable.
> >>       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
> >>       //make sure the last offset after iteration is correct
> >> -      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
> >> +      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
> >>     }
> >>
> >>     // test for mixed empty and non-empty messagesets uncompressed
> >> @@ -111,16 +111,16 @@ class ByteBufferMessageSetTest extends B
> >>       val emptyMessageList : List[Message] = Nil
> >>       val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
> >>       val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
> >> -      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
> >> -      buffer.put(emptyMessageSet.serialized)
> >> -      buffer.put(regularMessgeSet.serialized)
> >> +      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
> >> +      buffer.put(emptyMessageSet.getSerialized())
> >> +      buffer.put(regularMessgeSet.getSerialized())
> >>       buffer.rewind
> >>       val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
> >>       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
> >>       //make sure ByteBufferMessageSet is re-iterable.
> >>       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
> >>       //make sure the last offset after iteration is correct
> >> -      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
> >> +      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
> >>     }
> >>
> >>     // test for mixed empty and non-empty messagesets compressed
> >> @@ -128,16 +128,16 @@ class ByteBufferMessageSetTest extends B
> >>       val emptyMessageList : List[Message] = Nil
> >>       val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
> >>       val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
> >> -      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
> >> -      buffer.put(emptyMessageSet.serialized)
> >> -      buffer.put(regularMessgeSet.serialized)
> >> +      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
> >> +      buffer.put(emptyMessageSet.getSerialized())
> >> +      buffer.put(regularMessgeSet.getSerialized())
> >>       buffer.rewind
> >>       val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
> >>       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
> >>       //make sure ByteBufferMessageSet is re-iterable.
> >>       TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
> >>       //make sure the last offset after iteration is correct
> >> -      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
> >> +      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
> >>     }
> >>   }
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sat Mar  3 05:46:43 2012
> >> @@ -381,11 +381,12 @@ class AsyncProducerTest extends JUnit3Su
> >>     val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
> >>     mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
> >>     EasyMock.expectLastCall().andReturn(List(topic1Metadata))
> >> -    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.take(5))))))
> >> +    mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.take(5))))
> >>     EasyMock.expectLastCall
> >> -    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5))))))
> >> -    EasyMock.expectLastCall
> >> -    EasyMock.replay(mockSyncProducer)
> >> +    mockSyncProducer.send(TestUtils.produceRequest(topic, 0, messagesToSet(msgs.takeRight(5))))
> >> +    EasyMock.replay(mockSyncProducer)
> >>
> >>     val producerPool = EasyMock.createMock(classOf[ProducerPool])
> >>     producerPool.getZkClient
> >> @@ -495,10 +496,7 @@ class AsyncProducerTest extends JUnit3Su
> >>   }
> >>
> >>   class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
> >> -    override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
> >> -      Thread.sleep(1000)
> >> -    }
> >> -    override def multiSend(produces: Array[ProducerRequest]) {
> >> +    override def send(produceRequest: ProducerRequest): Unit = {
> >>       Thread.sleep(1000)
> >>     }
> >>   }
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Sat Mar  3 05:46:43 2012
> >> @@ -44,7 +44,7 @@ class SyncProducerTest extends JUnit3Sui
> >>     var failed = false
> >>     val firstStart = SystemTime.milliseconds
> >>     try {
> >> -      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
> >> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
> >>     }catch {
> >>       case e: Exception => failed=true
> >>     }
> >> @@ -54,7 +54,7 @@ class SyncProducerTest extends JUnit3Sui
> >>     Assert.assertTrue((firstEnd-firstStart) < 500)
> >>     val secondStart = SystemTime.milliseconds
> >>     try {
> >> -      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
> >> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
> >>     }catch {
> >>       case e: Exception => failed = true
> >>     }
> >> @@ -63,7 +63,7 @@ class SyncProducerTest extends JUnit3Sui
> >>     Assert.assertTrue((secondEnd-secondStart) < 500)
> >>
> >>     try {
> >> -      producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
> >> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
> >>     }catch {
> >>       case e: Exception => failed=true
> >>     }
> >> @@ -83,7 +83,7 @@ class SyncProducerTest extends JUnit3Sui
> >>     val bytes = new Array[Byte](101)
> >>     var failed = false
> >>     try {
> >> -      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes)))
> >> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes))))
> >>     }catch {
> >>       case e: MessageSizeTooLargeException => failed = true
> >>     }
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Mar  3 05:46:43 2012
> >> @@ -33,6 +33,7 @@ import collection.mutable.ListBuffer
> >>  import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
> >>  import scala.collection.Map
> >>  import kafka.serializer.Encoder
> >> +import kafka.api.{ProducerRequest, TopicData, PartitionData}
> >>
> >>  /**
> >>  * Utility functions to help with testing
> >> @@ -336,7 +337,47 @@ object TestUtils {
> >>       buffer += ("msg" + i)
> >>     buffer
> >>   }
> >> +  /**
> >> +   * Create a wire format request based on simple basic information
> >> +   */
> >> +  def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
> >> +    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message)
> >> +  }
> >> +  def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
> >> +    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
> >> +  }
> >> +
> >> +  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
> >> +    val clientId = SyncProducerConfig.DefaultClientId
> >> +    val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks
> >> +    val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
> >> +    var data = new Array[TopicData](1)
> >> +    var partitionData = new Array[PartitionData](1)
> >> +    partitionData(0) = new PartitionData(partition,message)
> >> +    data(0) = new TopicData(topic,partitionData)
> >> +    val pr = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
> >> +    pr
> >> +  }
> >>
> >> +  def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
> >> +    produceJavaRequest(-1,topic,-1,message)
> >> +  }
> >> +
> >> +  def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
> >> +    produceJavaRequest(-1,topic,partition,message)
> >> +  }
> >> +
> >> +  def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
> >> +    val clientId = "test"
> >> +    val requiredAcks: Short = 0
> >> +    val ackTimeout = 0
> >> +    var data = new Array[TopicData](1)
> >> +    var partitionData = new Array[PartitionData](1)
> >> +    partitionData(0) = new PartitionData(partition,message.underlying)
> >> +    data(0) = new TopicData(topic,partitionData)
> >> +    val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
> >> +    pr
> >> +  }
> >>  }
> >>
> >>  object TestZKUtils {
> >>
> >>
> >>
>



-- 

/*
Joe Stein
http://www.linkedin.com/in/charmalloc
Twitter: @allthingshadoop <http://www.twitter.com/allthingshadoop>
*/
