Date: Tue, 6 Mar 2012 12:37:33 -0500
Subject: Re: svn commit: r1296577 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/message/ main/scala/kafka/javaapi/producer/ main/scala/kafka/message/ main/scala/kafka/producer/ main/scala
From: Joe Stein
To: kafka-dev@incubator.apache.org

let me take care of the missing file right now, sorry about that

if now is a good time to rebase the 0.8 branch to the latest trunk then yes I can help with that

probably won't have time until Thursday at the soonest I think

On Tue, Mar 6, 2012 at 12:33 PM, Neha Narkhede wrote:

> Also, I would recommend rebasing the 0.8 branch to the latest trunk
> now. It has been quite a while since the last time I did a rebase.
>
> Joe, would you mind helping with that ?
>
> Thanks,
> Neha
>
> On Tue, Mar 6, 2012 at 9:15 AM, Jun Rao wrote:
> > Joe,
> >
> > 0.8 now has compilation error after this check in. It seems that you forgot
> > to check in a new file.
> > See my comment in kafka-240. Could you fix this?
> >
> > Thanks,
> >
> > Jun
> >
> > On Fri, Mar 2, 2012 at 9:46 PM, wrote:
> >
> >> Author: joestein
> >> Date: Sat Mar 3 05:46:43 2012
> >> New Revision: 1296577
> >>
> >> URL: http://svn.apache.org/viewvc?rev=1296577&view=rev
> >> Log:
> >> KAFKA-240 ProducerRequest wire format protocol update and related changes
> >>
> >> Removed:
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiProducerRequest.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
> >> Modified:
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
> >>     incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
> >>     incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
> >>     incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
> >>     incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
> >>     incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Sat Mar 3 05:46:43 2012
> >> @@ -39,6 +39,15 @@ object PartitionData {
> >>
> >>  case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset:Long = 0L, messages: MessageSet) {
> >>    val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
> >> +
> >> +  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
> >> +
> >> +  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
> >> +    if (partition == ProducerRequest.RandomPartition)
> >> +      return randomSelector(topic)
> >> +    else
> >> +      return partition
> >> +  }
> >>  }
> >>
> >>  object TopicData {
> >> @@ -73,6 +82,15 @@ object TopicData {
> >>
> >>  case class TopicData(topic: String, partitionData: Array[PartitionData]) {
> >>    val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
> >> +
> >> +  override def equals(other: Any): Boolean = {
> >> +    other match {
> >> +      case that: TopicData =>
> >> +        ( topic == that.topic &&
> >> +          partitionData.toSeq == that.partitionData.toSeq )
> >> +      case _ => false
> >> +    }
> >> +  }
> >>  }
> >>
> >>  object FetchResponse {
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Sat Mar 3 05:46:43 2012
> >> @@ -24,60 +24,108 @@ import kafka.utils._
> >>
> >>  object ProducerRequest {
> >>    val RandomPartition = -1
> >> -
> >> +  val versionId: Short = 0
> >> +
> >>    def readFrom(buffer: ByteBuffer): ProducerRequest = {
> >> -    val topic = Utils.readShortString(buffer, "UTF-8")
> >> -    val partition = buffer.getInt
> >> -    val messageSetSize = buffer.getInt
> >> -    val messageSetBuffer = buffer.slice()
> >> -    messageSetBuffer.limit(messageSetSize)
> >> -    buffer.position(buffer.position + messageSetSize)
> >> -    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
> >> +    val versionId: Short = buffer.getShort
> >> +    val correlationId: Int = buffer.getInt
> >> +    val clientId: String = Utils.readShortString(buffer, "UTF-8")
> >> +    val requiredAcks: Short = buffer.getShort
> >> +    val ackTimeout: Int = buffer.getInt
> >> +    //build the topic structure
> >> +    val topicCount = buffer.getInt
> >> +    val data = new Array[TopicData](topicCount)
> >> +    for(i <- 0 until topicCount) {
> >> +      val topic = Utils.readShortString(buffer, "UTF-8")
> >> +
> >> +      val partitionCount = buffer.getInt
> >> +      //build the partition structure within this topic
> >> +      val partitionData = new Array[PartitionData](partitionCount)
> >> +      for (j <- 0 until partitionCount) {
> >> +        val partition = buffer.getInt
> >> +        val messageSetSize = buffer.getInt
> >> +        val messageSetBuffer = new Array[Byte](messageSetSize)
> >> +        buffer.get(messageSetBuffer,0,messageSetSize)
> >> +        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
> >> +      }
> >> +      data(i) = new TopicData(topic,partitionData)
> >> +    }
> >> +    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
> >>    }
> >>  }
> >>
> >> -class ProducerRequest(val topic: String,
> >> -                      val partition: Int,
> >> -                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
> >> +case class ProducerRequest(val versionId: Short, val correlationId: Int,
> >> +                           val clientId: String,
> >> +                           val requiredAcks: Short,
> >> +                           val ackTimeout: Int,
> >> +                           val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
> >> +
> >> +  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
> >>
> >>    def writeTo(buffer: ByteBuffer) {
> >> -    Utils.writeShortString(buffer, topic)
> >> -    buffer.putInt(partition)
> >> -    buffer.putInt(messages.serialized.limit)
> >> -    buffer.put(messages.serialized)
> >> -    messages.serialized.rewind
> >> +    buffer.putShort(versionId)
> >> +    buffer.putInt(correlationId)
> >> +    Utils.writeShortString(buffer, clientId, "UTF-8")
> >> +    buffer.putShort(requiredAcks)
> >> +    buffer.putInt(ackTimeout)
> >> +    //save the topic structure
> >> +    buffer.putInt(data.size) //the number of topics
> >> +    data.foreach(d =>{
> >> +      Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
> >> +      buffer.putInt(d.partitionData.size) //the number of partitions
> >> +      d.partitionData.foreach(p => {
> >> +        buffer.putInt(p.partition)
> >> +        buffer.putInt(p.messages.getSerialized().limit)
> >> +        buffer.put(p.messages.getSerialized())
> >> +        p.messages.getSerialized().rewind
> >> +      })
> >> +    })
> >>    }
> >> -
> >> -  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
> >>
> >> -  def getTranslatedPartition(randomSelector: String => Int): Int = {
> >> -    if (partition == ProducerRequest.RandomPartition)
> >> -      return randomSelector(topic)
> >> -    else
> >> -      return partition
> >> +  def sizeInBytes(): Int = {
> >> +    var size = 0
> >> +    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
> >> +    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
> >> +    data.foreach(d =>{
> >> +      size += 2 + d.topic.length + 4
> >> +      d.partitionData.foreach(p => {
> >> +        size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
> >> +      })
> >> +    })
> >> +    size
> >>    }
> >>
> >>    override def toString: String = {
> >>      val builder = new StringBuilder()
> >>      builder.append("ProducerRequest(")
> >> -    builder.append(topic + ",")
> >> -    builder.append(partition + ",")
> >> -    builder.append(messages.sizeInBytes)
> >> +    builder.append(versionId + ",")
> >> +    builder.append(correlationId + ",")
> >> +    builder.append(clientId + ",")
> >> +    builder.append(requiredAcks + ",")
> >> +    builder.append(ackTimeout)
> >> +    data.foreach(d =>{
> >> +      builder.append(":[" + d.topic)
> >> +      d.partitionData.foreach(p => {
> >> +        builder.append(":[")
> >> +        builder.append(p.partition + ",")
> >> +        builder.append(p.messages.sizeInBytes)
> >> +        builder.append("]")
> >> +      })
> >> +      builder.append("]")
> >> +    })
> >>      builder.append(")")
> >>      builder.toString
> >>    }
> >>
> >>    override def equals(other: Any): Boolean = {
> >> -    other match {
> >> +    other match {
> >>        case that: ProducerRequest =>
> >> -        (that canEqual this) && topic == that.topic && partition == that.partition &&
> >> -                messages.equals(that.messages)
> >> +        ( correlationId == that.correlationId &&
> >> +          clientId == that.clientId &&
> >> +          requiredAcks == that.requiredAcks &&
> >> +          ackTimeout == that.ackTimeout &&
> >> +          data.toSeq == that.data.toSeq)
> >>        case _ => false
> >>      }
> >>    }
> >> -
> >> -  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
> >> -
> >> -  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
> >> -
> >> -}
> >> +}
> >> \ No newline at end of file
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Sat Mar 3 05:46:43 2012
> >> @@ -22,7 +22,7 @@ import kafka.api.TopicData
> >>
> >>  class FetchResponse( val versionId: Short,
> >>                       val correlationId: Int,
> >> -                     val data: Array[TopicData] ) {
> >> +                     private val data: Array[TopicData] ) {
> >>
> >>    private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Sat Mar 3 05:46:43 2012
> >> @@ -17,36 +17,29 @@
> >>  package kafka.javaapi
> >>
> >>  import kafka.network.Request
> >> -import kafka.api.RequestKeys
> >> +import kafka.api.{RequestKeys, TopicData}
> >>  import java.nio.ByteBuffer
> >>
> >> -class ProducerRequest(val topic: String,
> >> -                      val partition: Int,
> >> -                      val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
> >> +class ProducerRequest(val correlationId: Int,
> >> +                      val clientId: String,
> >> +                      val requiredAcks: Short,
> >> +                      val ackTimeout: Int,
> >> +                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
> >> +
> >>    import Implicits._
> >> -  private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
> >> +  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
> >>
> >>    def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
> >>
> >>    def sizeInBytes(): Int = underlying.sizeInBytes
> >>
> >> -  def getTranslatedPartition(randomSelector: String => Int): Int =
> >> -    underlying.getTranslatedPartition(randomSelector)
> >> -
> >>    override def toString: String =
> >>      underlying.toString
> >>
> >> -  override def equals(other: Any): Boolean = {
> >> -    other match {
> >> -      case that: ProducerRequest =>
> >> -        (that canEqual this) && topic == that.topic && partition == that.partition &&
> >> -                messages.equals(that.messages)
> >> -      case _ => false
> >> -    }
> >> -  }
> >> +  override def equals(other: Any): Boolean = underlying.equals(other)
> >>
> >>    def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
> >>
> >> -  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
> >> +  override def hashCode: Int = underlying.hashCode
> >>
> >> -}
> >> +}
> >> \ No newline at end of file
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Sat Mar 3 05:46:43 2012
> >> @@ -39,7 +39,7 @@ class ByteBufferMessageSet(private val b
> >>
> >>    def validBytes: Long = underlying.validBytes
> >>
> >> -  def serialized():ByteBuffer = underlying.serialized
> >> +  def serialized():ByteBuffer = underlying.getSerialized()
> >>
> >>    def getInitialOffset = initialOffset
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Sat Mar 3 05:46:43 2012
> >> @@ -18,6 +18,8 @@ package kafka.javaapi.producer
> >>
> >>  import kafka.producer.SyncProducerConfig
> >>  import kafka.javaapi.message.ByteBufferMessageSet
> >> +import kafka.javaapi.ProducerRequest
> >> +import kafka.api.{PartitionData, TopicData}
> >>
> >>  class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
> >>
> >> @@ -25,21 +27,17 @@ class SyncProducer(syncProducer: kafka.p
> >>
> >>    val underlying = syncProducer
> >>
> >> -  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
> >> -    import kafka.javaapi.Implicits._
> >> -    underlying.send(topic, partition, messages)
> >> +  def send(producerRequest: kafka.javaapi.ProducerRequest) {
> >> +    underlying.send(producerRequest.underlying)
> >>    }
> >>
> >> -  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
> >> -                                                                      kafka.api.ProducerRequest.RandomPartition,
> >> -                                                                      messages)
> >> -
> >> -  def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
> >> -    import kafka.javaapi.Implicits._
> >> -    val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
> >> -    for(i <- 0 until produces.length)
> >> -      produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
> >> -    underlying.multiSend(produceRequests)
> >> +  def send(topic: String, messages: ByteBufferMessageSet): Unit = {
> >> +    var data = new Array[TopicData](1)
> >> +    var partition_data = new Array[PartitionData](1)
> >> +    partition_data(0) = new PartitionData(-1,messages.underlying)
> >> +    data(0) = new TopicData(topic,partition_data)
> >> +    val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
> >> +    underlying.send(producerRequest)
> >>    }
> >>
> >>    def close() {
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Sat Mar 3 05:46:43 2012
> >> @@ -53,7 +53,7 @@ class ByteBufferMessageSet(private val b
> >>
> >>    def getErrorCode = errorCode
> >>
> >> -  def serialized(): ByteBuffer = buffer
> >> +  def getSerialized(): ByteBuffer = buffer
> >>
> >>    def validBytes: Long = shallowValidBytes
> >>
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Sat Mar 3 05:46:43 2012
> >> @@ -40,6 +40,8 @@ class FileMessageSet private[kafka](priv
> >>    private val setSize = new AtomicLong()
> >>    private val setHighWaterMark = new AtomicLong()
> >>
> >> +  def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
> >> +
> >>    if(mutable) {
> >>      if(limit < Long.MaxValue || offset > 0)
> >>        throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Sat Mar 3 05:46:43 2012
> >> @@ -111,4 +111,9 @@ abstract class MessageSet extends Iterab
> >>        throw new InvalidMessageException
> >>    }
> >>
> >> +  /**
> >> +   * Used to allow children to have serialization on implementation
> >> +   */
> >> +  def getSerialized(): ByteBuffer
> >> +
> >>  }
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Sat Mar 3 05:46:43 2012
> >> @@ -51,29 +51,10 @@ class SyncProducer(val config: SyncProdu
> >>      if (logger.isTraceEnabled) {
> >>        trace("verifying sendbuffer of size " + buffer.limit)
> >>        val requestTypeId = buffer.getShort()
> >> -      if (requestTypeId == RequestKeys.MultiProduce) {
> >> -        try {
> >> -          val request = MultiProducerRequest.readFrom(buffer)
> >> -          for (produce <- request.produces) {
> >> -            try {
> >> -              for (messageAndOffset <- produce.messages)
> >> -                if (!messageAndOffset.message.isValid)
> >> -                  trace("topic " + produce.topic + " is invalid")
> >> -            }
> >> -            catch {
> >> -              case e: Throwable =>
> >> -                trace("error iterating messages ", e)
> >> -            }
> >> -          }
> >> -        }
> >> -        catch {
> >> -          case e: Throwable =>
> >> -            trace("error verifying sendbuffer ", e)
> >> -        }
> >> -      }
> >> +      val request = ProducerRequest.readFrom(buffer)
> >> +      trace(request.toString)
> >>      }
> >>    }
> >> -
> >>    /**
> >>     * Common functionality for the public send methods
> >>     */
> >> @@ -108,21 +89,15 @@ class SyncProducer(val config: SyncProdu
> >>    /**
> >>     * Send a message
> >>     */
> >> -  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
> >> -    verifyMessageSize(messages)
> >> -    val setSize = messages.sizeInBytes.asInstanceOf[Int]
> >> -    trace("Got message set with " + setSize + " bytes to send")
> >> -    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
> >> -  }
> >> -
> >> -  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
> >> -
> >> -  def multiSend(produces: Array[ProducerRequest]) {
> >> -    for (request <- produces)
> >> -      verifyMessageSize(request.messages)
> >> -    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
> >> -    trace("Got multi message sets with " + setSize + " bytes to send")
> >> -    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
> >> +  def send(producerRequest: ProducerRequest) {
> >> +    producerRequest.data.foreach(d => {
> >> +      d.partitionData.foreach(p => {
> >> +        verifyMessageSize(new ByteBufferMessageSet(p.messages.getSerialized()))
> >> +        val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
> >> +        trace("Got message set with " + setSize + " bytes to send")
> >> +      })
> >> +    })
> >> +    send(new BoundedByteBufferSend(producerRequest))
> >>    }
> >>
> >>    def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Mar 3 05:46:43 2012
> >> @@ -41,4 +41,23 @@ trait SyncProducerConfigShared {
> >>    val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
> >>
> >>    val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
> >> +
> >> +  /* the client application sending the producer requests */
> >> +  val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
> >> +
> >> +  /* the client application sending the producer requests */
> >> +  val clientId = Utils.getString(props,"producer.request.client_id","")
> >> +
> >> +  /* the required_acks of the producer requests */
> >> +  val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
> >> +
> >> +  /* the ack_timeout of the producer requests */
> >> +  val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
> >>  }
> >> +
> >> +object SyncProducerConfig {
> >> +  val DefaultCorrelationId = -1
> >> +  val DefaultClientId = ""
> >> +  val DefaultRequiredAcks : Short = 0
> >> +  val DefaultAckTimeoutMs = 1
> >> +}
> >> \ No newline at end of file
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Mar 3 05:46:43 2012
> >> @@ -17,7 +17,7 @@
> >>
> >>  package kafka.producer.async
> >>
> >> -import kafka.api.ProducerRequest
> >> +import kafka.api.{ProducerRequest, TopicData, PartitionData}
> >>  import kafka.serializer.Encoder
> >>  import kafka.producer._
> >>  import kafka.cluster.{Partition, Broker}
> >> @@ -147,9 +147,22 @@ class DefaultEventHandler[K,V](config: P
> >>
> >>    private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
> >>      if(messagesPerTopic.size > 0) {
> >> -      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
> >> +      val topics = new HashMap[String, ListBuffer[PartitionData]]()
> >> +      val requests = messagesPerTopic.map(f => {
> >> +        val topicName = f._1._1
> >> +        val partitionId = f._1._2
> >> +        val messagesSet= f._2
> >> +        val topic = topics.get(topicName) // checking to see if this topics exists
> >> +        topic match {
> >> +          case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
> >> +          case Some(x) => trace("found " + topicName)
> >> +        }
> >> +        topics(topicName).append(new PartitionData(partitionId, messagesSet))
> >> +      })
> >> +      val topicData = topics.map(kv => new TopicData(kv._1,kv._2.toArray))
> >> +      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) //new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, topic_data.toArray)
> >>        val syncProducer = producerPool.getProducer(brokerId)
> >> -      syncProducer.multiSend(requests)
> >> +      syncProducer.send(producerRequest)
> >>        trace("kafka producer sent messages for topics %s to broker %s:%d"
> >>          .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
> >>      }
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Mar 3 05:46:43 2012
> >> @@ -41,7 +41,6 @@ class KafkaApis(val logManager: LogManag
> >>      apiId match {
> >>        case RequestKeys.Produce => handleProducerRequest(receive)
> >>        case RequestKeys.Fetch => handleFetchRequest(receive)
> >> -      case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
> >>        case RequestKeys.Offsets => handleOffsetRequest(receive)
> >>        case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
> >>        case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
> >> @@ -59,31 +58,38 @@ class KafkaApis(val logManager: LogManag
> >>      None
> >>    }
> >>
> >> -  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
> >> -    val request = MultiProducerRequest.readFrom(receive.buffer)
> >> -    if(requestLogger.isTraceEnabled)
> >> -      requestLogger.trace("Multiproducer request " + request.toString)
> >> -    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
> >> -    None
> >> -  }
> >> -
> >> -  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
> >> -    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
> >> -    try {
> >> -      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
> >> -      trace(request.messages.sizeInBytes + " bytes written to logs.")
> >> -    } catch {
> >> -      case e =>
> >> -        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
> >> -        e match {
> >> -          case _: IOException =>
> >> -            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
> >> -            System.exit(1)
> >> -          case _ =>
> >> +  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {
> >> +    val requestSize = request.data.size
> >> +    val errors = new Array[Int](requestSize)
> >> +    val offsets = new Array[Long](requestSize)
> >> +
> >> +    request.data.foreach(d => {
> >> +      d.partitionData.foreach(p => {
> >> +        val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
> >> +        try {
> >> +          logManager.getOrCreateLog(d.topic, partition).append(p.messages)
> >> +          trace(p.messages.sizeInBytes + " bytes written to logs.")
> >> +          p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
> >>          }
> >> -        throw e
> >> -    }
> >> -    None
> >> +        catch {
> >> +          case e =>
> >> +            //TODO: handle response in ProducerResponse
> >> +            error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
> >> +            e match {
> >> +              case _: IOException =>
> >> +                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
> >> +                Runtime.getRuntime.halt(1)
> >> +              case _ =>
> >> +            }
> >> +            //throw e
> >> +        }
> >> +      })
> >> +      //None
> >> +    })
> >> +    if (request.requiredAcks == 0)
> >> +      None
> >> +    else
> >> +      None //TODO: send when KAFKA-49 can receive this Some(new ProducerResponse(request.versionId, request.correlationId, errors, offsets))
> >>    }
> >>
> >>    def handleFetchRequest(request: Receive): Option[Send] = {
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Mar 3 05:46:43 2012
> >> @@ -195,6 +195,9 @@ object Utils extends Logging {
> >>    def getInt(props: Properties, name: String, default: Int): Int =
> >>      getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
> >>
> >> +  def getShort(props: Properties, name: String, default: Short): Short =
> >> +    getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
> >> +
> >>    /**
> >>     * Read an integer from the properties instance. Throw an exception
> >>     * if the value is not in the given range (inclusive)
> >> @@ -217,6 +220,18 @@ object Utils extends Logging {
> >>      v
> >>    }
> >>
> >> +  def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
> >> +    val v =
> >> +      if(props.containsKey(name))
> >> +        props.getProperty(name).toShort
> >> +      else
> >> +        default
> >> +    if(v < range._1 || v > range._2)
> >> +      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
> >> +    else
> >> +      v
> >> +  }
> >> +
> >>    def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
> >>      val value = buffer.getInt
> >>      if(value < range._1 || value > range._2)
> >> @@ -777,4 +792,4 @@ class SnapshotStats(private val monitorD
> >>
> >>      def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
> >>    }
> >> -}
> >> +}
> >> \ No newline at end of file
> >>
> >> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
> >> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
> >> ==============================================================================
> >> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
> >> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Sat Mar 3 05:46:43 2012
> >> @@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B
> >>      // create a ByteBufferMessageSet that doesn't contain a full message
> >>      // iterating it should get an
InvalidMessageSizeException > >> val messages = new ByteBufferMessageSet(NoCompressionCodec, new > >> Message("01234567890123456789".getBytes())) > >> - val buffer = messages.serialized.slice > >> + val buffer = messages.getSerialized().slice > >> buffer.limit(10) > >> val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = > >> buffer, initialOffset = 1000) > >> try { > >> @@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B > >> { > >> val messages = new ByteBufferMessageSet(NoCompressionCodec, new > >> Message("hello".getBytes()), new Message("there".getBytes())) > >> val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2) > >> - buffer.put(messages.serialized) > >> + buffer.put(messages.getSerialized()) > >> buffer.putShort(4) > >> val messagesPlus = new ByteBufferMessageSet(buffer) > >> assertEquals("Adding invalid bytes shouldn't change byte count", > >> messages.validBytes, messagesPlus.validBytes) > >> @@ -93,7 +93,7 @@ class ByteBufferMessageSetTest extends B > >> //make sure ByteBufferMessageSet is re-iterable. > >> TestUtils.checkEquals[Message](messageList.iterator, > >> TestUtils.getMessageIterator(messageSet.iterator)) > >> //make sure the last offset after iteration is correct > >> - assertEquals("offset of last message not expected", > >> messageSet.last.offset, messageSet.serialized.limit) > >> + assertEquals("offset of last message not expected", > >> messageSet.last.offset, messageSet.getSerialized().limit) > >> } > >> > >> // test for compressed regular messages > >> @@ -103,7 +103,7 @@ class ByteBufferMessageSetTest extends B > >> //make sure ByteBufferMessageSet is re-iterable. 
> >> TestUtils.checkEquals[Message](messageList.iterator, > >> TestUtils.getMessageIterator(messageSet.iterator)) > >> //make sure the last offset after iteration is correct > >> - assertEquals("offset of last message not expected", > >> messageSet.last.offset, messageSet.serialized.limit) > >> + assertEquals("offset of last message not expected", > >> messageSet.last.offset, messageSet.getSerialized().limit) > >> } > >> > >> // test for mixed empty and non-empty messagesets uncompressed > >> @@ -111,16 +111,16 @@ class ByteBufferMessageSetTest extends B > >> val emptyMessageList : List[Message] = Nil > >> val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, > >> emptyMessageList: _*) > >> val regularMessgeSet = new > ByteBufferMessageSet(NoCompressionCodec, > >> messageList: _*) > >> - val buffer = > ByteBuffer.allocate(emptyMessageSet.serialized.limit + > >> regularMessgeSet.serialized.limit) > >> - buffer.put(emptyMessageSet.serialized) > >> - buffer.put(regularMessgeSet.serialized) > >> + val buffer = > >> ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + > >> regularMessgeSet.getSerialized().limit) > >> + buffer.put(emptyMessageSet.getSerialized()) > >> + buffer.put(regularMessgeSet.getSerialized()) > >> buffer.rewind > >> val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) > >> TestUtils.checkEquals[Message](messageList.iterator, > >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) > >> //make sure ByteBufferMessageSet is re-iterable. 
> >> TestUtils.checkEquals[Message](messageList.iterator, > >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) > >> //make sure the last offset after iteration is correct > >> - assertEquals("offset of last message not expected", > >> mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) > >> + assertEquals("offset of last message not expected", > >> mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) > >> } > >> > >> // test for mixed empty and non-empty messagesets compressed > >> @@ -128,16 +128,16 @@ class ByteBufferMessageSetTest extends B > >> val emptyMessageList : List[Message] = Nil > >> val emptyMessageSet = new > >> ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*) > >> val regularMessgeSet = new > >> ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*) > >> - val buffer = > ByteBuffer.allocate(emptyMessageSet.serialized.limit + > >> regularMessgeSet.serialized.limit) > >> - buffer.put(emptyMessageSet.serialized) > >> - buffer.put(regularMessgeSet.serialized) > >> + val buffer = > >> ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + > >> regularMessgeSet.getSerialized().limit) > >> + buffer.put(emptyMessageSet.getSerialized()) > >> + buffer.put(regularMessgeSet.getSerialized()) > >> buffer.rewind > >> val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0) > >> TestUtils.checkEquals[Message](messageList.iterator, > >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) > >> //make sure ByteBufferMessageSet is re-iterable. 
> >> TestUtils.checkEquals[Message](messageList.iterator, > >> TestUtils.getMessageIterator(mixedMessageSet.iterator)) > >> //make sure the last offset after iteration is correct > >> - assertEquals("offset of last message not expected", > >> mixedMessageSet.last.offset, mixedMessageSet.serialized.limit) > >> + assertEquals("offset of last message not expected", > >> mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit) > >> } > >> } > >> > >> > >> Modified: > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > >> URL: > >> > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > >> > >> > ============================================================================== > >> --- > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > >> (original) > >> +++ > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala > >> Sat Mar 3 05:46:43 2012 > >> @@ -381,11 +381,12 @@ class AsyncProducerTest extends JUnit3Su > >> val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer]) > >> mockSyncProducer.send(new TopicMetadataRequest(List(topic))) > >> EasyMock.expectLastCall().andReturn(List(topic1Metadata)) > >> - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new > >> ProducerRequest(topic, 0, messagesToSet(msgs.take(5)))))) > >> + mockSyncProducer.send(TestUtils.produceRequest(topic, 0, > >> + messagesToSet(msgs.take(5)))) > >> EasyMock.expectLastCall > >> - mockSyncProducer.multiSend(EasyMock.aryEq(Array(new > >> ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5)))))) > >> - EasyMock.expectLastCall > >> - EasyMock.replay(mockSyncProducer) > >> + mockSyncProducer.send(TestUtils.produceRequest(topic, 0, > >> + messagesToSet(msgs.takeRight(5)))) > >> + EasyMock.replay(mockSyncProducer) > >> > >> val 
producerPool = EasyMock.createMock(classOf[ProducerPool]) > >> producerPool.getZkClient > >> @@ -495,10 +496,7 @@ class AsyncProducerTest extends JUnit3Su > >> } > >> > >> class MockProducer(override val config: SyncProducerConfig) extends > >> SyncProducer(config) { > >> - override def send(topic: String, messages: ByteBufferMessageSet): > >> Unit = { > >> - Thread.sleep(1000) > >> - } > >> - override def multiSend(produces: Array[ProducerRequest]) { > >> + override def send(produceRequest: ProducerRequest): Unit = { > >> Thread.sleep(1000) > >> } > >> } > >> > >> Modified: > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > >> URL: > >> > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > >> > >> > ============================================================================== > >> --- > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > >> (original) > >> +++ > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala > >> Sat Mar 3 05:46:43 2012 > >> @@ -44,7 +44,7 @@ class SyncProducerTest extends JUnit3Sui > >> var failed = false > >> val firstStart = SystemTime.milliseconds > >> try { > >> - producer.send("test", 0, new > ByteBufferMessageSet(compressionCodec > >> = NoCompressionCodec, messages = new Message(messageBytes))) > >> + producer.send(TestUtils.produceRequest("test", 0, new > >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = > new > >> Message(messageBytes)))) > >> }catch { > >> case e: Exception => failed=true > >> } > >> @@ -54,7 +54,7 @@ class SyncProducerTest extends JUnit3Sui > >> Assert.assertTrue((firstEnd-firstStart) < 500) > >> val secondStart = SystemTime.milliseconds > >> try { > >> - producer.send("test", 0, new > ByteBufferMessageSet(compressionCodec > >> = 
NoCompressionCodec, messages = new Message(messageBytes))) > >> + producer.send(TestUtils.produceRequest("test", 0, new > >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = > new > >> Message(messageBytes)))) > >> }catch { > >> case e: Exception => failed = true > >> } > >> @@ -63,7 +63,7 @@ class SyncProducerTest extends JUnit3Sui > >> Assert.assertTrue((secondEnd-secondStart) < 500) > >> > >> try { > >> - producer.multiSend(Array(new ProducerRequest("test", 0, new > >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = > new > >> Message(messageBytes))))) > >> + producer.send(TestUtils.produceRequest("test", 0, new > >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = > new > >> Message(messageBytes)))) > >> }catch { > >> case e: Exception => failed=true > >> } > >> @@ -83,7 +83,7 @@ class SyncProducerTest extends JUnit3Sui > >> val bytes = new Array[Byte](101) > >> var failed = false > >> try { > >> - producer.send("test", 0, new > ByteBufferMessageSet(compressionCodec > >> = NoCompressionCodec, messages = new Message(bytes))) > >> + producer.send(TestUtils.produceRequest("test", 0, new > >> ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = > new > >> Message(bytes)))) > >> }catch { > >> case e: MessageSizeTooLargeException => failed = true > >> } > >> > >> Modified: > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > >> URL: > >> > http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff > >> > >> > ============================================================================== > >> --- > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > >> (original) > >> +++ > >> > incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala > >> Sat Mar 3 05:46:43 2012 > >> @@ -33,6 +33,7 @@ import 
collection.mutable.ListBuffer > >> import kafka.consumer.{KafkaMessageStream, ConsumerConfig} > >> import scala.collection.Map > >> import kafka.serializer.Encoder > >> +import kafka.api.{ProducerRequest, TopicData, PartitionData} > >> > >> /** > >> * Utility functions to help with testing > >> @@ -336,7 +337,47 @@ object TestUtils { > >> buffer += ("msg" + i) > >> buffer > >> } > >> + /** > >> + * Create a wired format request based on simple basic information > >> + */ > >> + def produceRequest(topic: String, message: ByteBufferMessageSet): > >> kafka.api.ProducerRequest = { > >> + > >> > produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message) > >> + } > >> + def produceRequest(topic: String, partition: Int, message: > >> ByteBufferMessageSet): kafka.api.ProducerRequest = { > >> + > >> > produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message) > >> + } > >> + > >> + def produceRequest(correlationId: Int, topic: String, partition: Int, > >> message: ByteBufferMessageSet): kafka.api.ProducerRequest = { > >> + val clientId = SyncProducerConfig.DefaultClientId > >> + val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks > >> + val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs > >> + var data = new Array[TopicData](1) > >> + var partitionData = new Array[PartitionData](1) > >> + partitionData(0) = new PartitionData(partition,message) > >> + data(0) = new TopicData(topic,partitionData) > >> + val pr = new kafka.api.ProducerRequest(correlationId, clientId, > >> requiredAcks, ackTimeout, data) > >> + pr > >> + } > >> > >> + def produceJavaRequest(topic: String, message: > >> kafka.javaapi.message.ByteBufferMessageSet): > kafka.javaapi.ProducerRequest > >> = { > >> + produceJavaRequest(-1,topic,-1,message) > >> + } > >> + > >> + def produceJavaRequest(topic: String, partition: Int, message: > >> kafka.javaapi.message.ByteBufferMessageSet): > kafka.javaapi.ProducerRequest > >> = { > >> + 
produceJavaRequest(-1,topic,partition,message) > >> + } > >> + > >> + def produceJavaRequest(correlationId: Int, topic: String, partition: > >> Int, message: kafka.javaapi.message.ByteBufferMessageSet): > >> kafka.javaapi.ProducerRequest = { > >> + val clientId = "test" > >> + val requiredAcks: Short = 0 > >> + val ackTimeout = 0 > >> + var data = new Array[TopicData](1) > >> + var partitionData = new Array[PartitionData](1) > >> + partitionData(0) = new PartitionData(partition,message.underlying) > >> + data(0) = new TopicData(topic,partitionData) > >> + val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, > >> requiredAcks, ackTimeout, data) > >> + pr > >> + } > >> } > >> > >> object TestZKUtils { > >> > >> > >> > -- /* Joe Stein http://www.linkedin.com/in/charmalloc Twitter: @allthingshadoop */ --90e6ba6e84f675faba04ba967f5c--