Date: Tue, 6 Mar 2012 09:33:14 -0800
Subject: Re: svn commit: r1296577 - in /incubator/kafka/branches/0.8/core/src: main/scala/kafka/api/ main/scala/kafka/javaapi/ main/scala/kafka/javaapi/message/ main/scala/kafka/javaapi/producer/ main/scala/kafka/message/ main/scala/kafka/producer/ main/scala
From: Neha Narkhede <neha.narkhede@gmail.com>
To: kafka-dev@incubator.apache.org

Also, I would recommend rebasing the 0.8 branch to the latest trunk now.
It has been quite a while since the last time I did a rebase. Joe, would
you mind helping with that?
Thanks,
Neha

On Tue, Mar 6, 2012 at 9:15 AM, Jun Rao wrote:
> Joe,
>
> 0.8 now has a compilation error after this check-in. It seems that you
> forgot to check in a new file. See my comment in KAFKA-240. Could you
> fix this?
>
> Thanks,
>
> Jun
>
> On Fri, Mar 2, 2012 at 9:46 PM, wrote:
>
>> Author: joestein
>> Date: Sat Mar  3 05:46:43 2012
>> New Revision: 1296577
>>
>> URL: http://svn.apache.org/viewvc?rev=1296577&view=rev
>> Log:
>> KAFKA-240 ProducerRequest wire format protocol update and related changes
>>
>> Removed:
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/MultiProducerRequest.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/MultiMessageSetSend.scala
>> Modified:
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
>>   incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
>>   incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
>>   incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
>>   incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
>>   incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchResponse.scala Sat Mar  3 05:46:43 2012
>> @@ -39,6 +39,15 @@ object PartitionData {
>>
>>  case class PartitionData(partition: Int, error: Int = ErrorMapping.NoError, initialOffset: Long = 0L, messages: MessageSet) {
>>    val sizeInBytes = 4 + 4 + 8 + 4 + messages.sizeInBytes.intValue()
>> +
>> +  def this(partition: Int, messages: MessageSet) = this(partition, ErrorMapping.NoError, 0L, messages)
>> +
>> +  def getTranslatedPartition(topic: String, randomSelector: String => Int): Int = {
>> +    if (partition == ProducerRequest.RandomPartition)
>> +      return randomSelector(topic)
>> +    else
>> +      return partition
>> +  }
>>  }
>>
>>  object TopicData {
>> @@ -73,6 +82,15 @@ object TopicData {
>>
>>  case class TopicData(topic: String, partitionData: Array[PartitionData]) {
>>    val sizeInBytes = 2 + topic.length + partitionData.foldLeft(4)(_ + _.sizeInBytes)
>> +
>> +  override def equals(other: Any): Boolean = {
>> +    other match {
>> +      case that: TopicData =>
>> +        ( topic == that.topic &&
>> +          partitionData.toSeq == that.partitionData.toSeq )
>> +      case _ => false
>> +    }
>> +  }
>>  }
>>
>>  object FetchResponse {
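Side note: the partition translation that used to live on ProducerRequest now
hangs off PartitionData and takes the topic explicitly. A rough, uncompiled
sketch against this revision (the constant selector is just a stand-in):

  import kafka.api.{PartitionData, ProducerRequest}
  import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

  // RandomPartition (-1) is resolved through the supplied selector;
  // a concrete partition id is returned unchanged.
  val pd = new PartitionData(ProducerRequest.RandomPartition,
    new ByteBufferMessageSet(NoCompressionCodec, new Message("x".getBytes)))
  val partition = pd.getTranslatedPartition("test", _ => 0) // stand-in selector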
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Sat Mar  3 05:46:43 2012
>> @@ -24,60 +24,108 @@ import kafka.utils._
>>
>>  object ProducerRequest {
>>    val RandomPartition = -1
>> -
>> +  val versionId: Short = 0
>> +
>>    def readFrom(buffer: ByteBuffer): ProducerRequest = {
>> -    val topic = Utils.readShortString(buffer, "UTF-8")
>> -    val partition = buffer.getInt
>> -    val messageSetSize = buffer.getInt
>> -    val messageSetBuffer = buffer.slice()
>> -    messageSetBuffer.limit(messageSetSize)
>> -    buffer.position(buffer.position + messageSetSize)
>> -    new ProducerRequest(topic, partition, new ByteBufferMessageSet(messageSetBuffer))
>> +    val versionId: Short = buffer.getShort
>> +    val correlationId: Int = buffer.getInt
>> +    val clientId: String = Utils.readShortString(buffer, "UTF-8")
>> +    val requiredAcks: Short = buffer.getShort
>> +    val ackTimeout: Int = buffer.getInt
>> +    //build the topic structure
>> +    val topicCount = buffer.getInt
>> +    val data = new Array[TopicData](topicCount)
>> +    for(i <- 0 until topicCount) {
>> +      val topic = Utils.readShortString(buffer, "UTF-8")
>> +
>> +      val partitionCount = buffer.getInt
>> +      //build the partition structure within this topic
>> +      val partitionData = new Array[PartitionData](partitionCount)
>> +      for (j <- 0 until partitionCount) {
>> +        val partition = buffer.getInt
>> +        val messageSetSize = buffer.getInt
>> +        val messageSetBuffer = new Array[Byte](messageSetSize)
>> +        buffer.get(messageSetBuffer,0,messageSetSize)
>> +        partitionData(j) = new PartitionData(partition,new ByteBufferMessageSet(ByteBuffer.wrap(messageSetBuffer)))
>> +      }
>> +      data(i) = new TopicData(topic,partitionData)
>> +    }
>> +    new ProducerRequest(versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
>>    }
>>  }
>>
>> -class ProducerRequest(val topic: String,
>> -                      val partition: Int,
>> -                      val messages: ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
>> +case class ProducerRequest(val versionId: Short, val correlationId: Int,
>> +                      val clientId: String,
>> +                      val requiredAcks: Short,
>> +                      val ackTimeout: Int,
>> +                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
>> +
>> +  def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeout: Int, data: Array[TopicData]) = this(ProducerRequest.versionId, correlationId, clientId, requiredAcks, ackTimeout, data)
>>
>>    def writeTo(buffer: ByteBuffer) {
>> -    Utils.writeShortString(buffer, topic)
>> -    buffer.putInt(partition)
>> -    buffer.putInt(messages.serialized.limit)
>> -    buffer.put(messages.serialized)
>> -    messages.serialized.rewind
>> +    buffer.putShort(versionId)
>> +    buffer.putInt(correlationId)
>> +    Utils.writeShortString(buffer, clientId, "UTF-8")
>> +    buffer.putShort(requiredAcks)
>> +    buffer.putInt(ackTimeout)
>> +    //save the topic structure
>> +    buffer.putInt(data.size) //the number of topics
>> +    data.foreach(d =>{
>> +      Utils.writeShortString(buffer, d.topic, "UTF-8") //write the topic
>> +      buffer.putInt(d.partitionData.size) //the number of partitions
>> +      d.partitionData.foreach(p => {
>> +        buffer.putInt(p.partition)
>> +        buffer.putInt(p.messages.getSerialized().limit)
>> +        buffer.put(p.messages.getSerialized())
>> +        p.messages.getSerialized().rewind
>> +      })
>> +    })
>>    }
>> -
>> -  def sizeInBytes(): Int = 2 + topic.length + 4 + 4 + messages.sizeInBytes.asInstanceOf[Int]
>>
>> -  def getTranslatedPartition(randomSelector: String => Int): Int = {
>> -    if (partition == ProducerRequest.RandomPartition)
>> -      return randomSelector(topic)
>> -    else
>> -      return partition
>> +  def sizeInBytes(): Int = {
>> +    var size = 0
>> +    //size, request_type_id, version_id, correlation_id, client_id, required_acks, ack_timeout, data.size
>> +    size = 2 + 4 + 2 + clientId.length + 2 + 4 + 4;
>> +    data.foreach(d =>{
>> +      size += 2 + d.topic.length + 4
>> +      d.partitionData.foreach(p => {
>> +        size += 4 + 4 + p.messages.sizeInBytes.asInstanceOf[Int]
>> +      })
>> +    })
>> +    size
>>    }
>>
>>    override def toString: String = {
>>      val builder = new StringBuilder()
>>      builder.append("ProducerRequest(")
>> -    builder.append(topic + ",")
>> -    builder.append(partition + ",")
>> -    builder.append(messages.sizeInBytes)
>> +    builder.append(versionId + ",")
>> +    builder.append(correlationId + ",")
>> +    builder.append(clientId + ",")
>> +    builder.append(requiredAcks + ",")
>> +    builder.append(ackTimeout)
>> +    data.foreach(d =>{
>> +      builder.append(":[" + d.topic)
>> +      d.partitionData.foreach(p => {
>> +        builder.append(":[")
>> +        builder.append(p.partition + ",")
>> +        builder.append(p.messages.sizeInBytes)
>> +        builder.append("]")
>> +      })
>> +      builder.append("]")
>> +    })
>>      builder.append(")")
>>      builder.toString
>>    }
>>
>>    override def equals(other: Any): Boolean = {
>> -    other match {
>> +   other match {
>>      case that: ProducerRequest =>
>> -        (that canEqual this) && topic == that.topic && partition == that.partition &&
>> -                messages.equals(that.messages)
>> +        ( correlationId == that.correlationId &&
>> +          clientId == that.clientId &&
>> +          requiredAcks == that.requiredAcks &&
>> +          ackTimeout == that.ackTimeout &&
>> +          data.toSeq == that.data.toSeq)
>>      case _ => false
>>      }
>>    }
>> -
>> -  def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
>> -
>> -  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
>> -
>> -}
>> +}
>> \ No newline at end of file
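To make the new request layout concrete, here is a rough, uncompiled sketch
against this revision of building and serializing a one-topic, one-partition
request (the field values are made up; the byte layout follows readFrom and
writeTo above):

  import java.nio.ByteBuffer
  import kafka.api.{PartitionData, ProducerRequest, TopicData}
  import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

  val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
  val data = Array(new TopicData("test", Array(new PartitionData(0, messages))))

  // Wire layout: version_id (int16), correlation_id (int32),
  // client_id (length-prefixed string), required_acks (int16),
  // ack_timeout (int32), topic count (int32); then per topic:
  // topic (string), partition count (int32); then per partition:
  // partition (int32), message set size (int32), message set bytes.
  val request = new ProducerRequest(1, "demo-client", 0.toShort, 500, data)
  val buffer = ByteBuffer.allocate(request.sizeInBytes)
  request.writeTo(buffer)
  buffer.rewind()
  val decoded = ProducerRequest.readFrom(buffer) // round-trips through the new format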
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/FetchResponse.scala Sat Mar  3 05:46:43 2012
>> @@ -22,7 +22,7 @@ import kafka.api.TopicData
>>
>>  class FetchResponse( val versionId: Short,
>>                       val correlationId: Int,
>> -                     val data: Array[TopicData] ) {
>> +                     private val data: Array[TopicData] ) {
>>
>>    private val underlying = new kafka.api.FetchResponse(versionId, correlationId, data)
>>
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Sat Mar  3 05:46:43 2012
>> @@ -17,36 +17,29 @@
>>  package kafka.javaapi
>>
>>  import kafka.network.Request
>> -import kafka.api.RequestKeys
>> +import kafka.api.{RequestKeys, TopicData}
>>  import java.nio.ByteBuffer
>>
>> -class ProducerRequest(val topic: String,
>> -                      val partition: Int,
>> -                      val messages: kafka.javaapi.message.ByteBufferMessageSet) extends Request(RequestKeys.Produce) {
>> +class ProducerRequest(val correlationId: Int,
>> +                      val clientId: String,
>> +                      val requiredAcks: Short,
>> +                      val ackTimeout: Int,
>> +                      val data: Array[TopicData]) extends Request(RequestKeys.Produce) {
>> +
>>    import Implicits._
>> -  private val underlying = new kafka.api.ProducerRequest(topic, partition, messages)
>> +  val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
>>
>>    def writeTo(buffer: ByteBuffer) { underlying.writeTo(buffer) }
>>
>>    def sizeInBytes(): Int = underlying.sizeInBytes
>>
>> -  def getTranslatedPartition(randomSelector: String => Int): Int =
>> -    underlying.getTranslatedPartition(randomSelector)
>> -
>>    override def toString: String =
>>      underlying.toString
>>
>> -  override def equals(other: Any): Boolean = {
>> -    other match {
>> -      case that: ProducerRequest =>
>> -        (that canEqual this) && topic == that.topic && partition == that.partition &&
>> -                messages.equals(that.messages)
>> -      case _ => false
>> -    }
>> -  }
>> +  override def equals(other: Any): Boolean = underlying.equals(other)
>>
>>    def canEqual(other: Any): Boolean = other.isInstanceOf[ProducerRequest]
>>
>> -  override def hashCode: Int = 31 + (17 * partition) + topic.hashCode + messages.hashCode
>> +  override def hashCode: Int = underlying.hashCode
>>
>> -}
>> +}
>> \ No newline at end of file
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/message/ByteBufferMessageSet.scala Sat Mar  3 05:46:43 2012
>> @@ -39,7 +39,7 @@ class ByteBufferMessageSet(private val b
>>
>>    def validBytes: Long = underlying.validBytes
>>
>> -  def serialized():ByteBuffer = underlying.serialized
>> +  def serialized():ByteBuffer = underlying.getSerialized()
>>
>>    def getInitialOffset = initialOffset
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/producer/SyncProducer.scala Sat Mar  3 05:46:43 2012
>> @@ -18,6 +18,8 @@ package kafka.javaapi.producer
>>
>>  import kafka.producer.SyncProducerConfig
>>  import kafka.javaapi.message.ByteBufferMessageSet
>> +import kafka.javaapi.ProducerRequest
>> +import kafka.api.{PartitionData, TopicData}
>>
>>  class SyncProducer(syncProducer: kafka.producer.SyncProducer) {
>>
>> @@ -25,21 +27,17 @@ class SyncProducer(syncProducer: kafka.p
>>
>>    val underlying = syncProducer
>>
>> -  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
>> -    import kafka.javaapi.Implicits._
>> -    underlying.send(topic, partition, messages)
>> +  def send(producerRequest: kafka.javaapi.ProducerRequest) {
>> +    underlying.send(producerRequest.underlying)
>>    }
>>
>> -  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic,
>> -                                                                       kafka.api.ProducerRequest.RandomPartition,
>> -                                                                       messages)
>> -
>> -  def multiSend(produces: Array[kafka.javaapi.ProducerRequest]) {
>> -    import kafka.javaapi.Implicits._
>> -    val produceRequests = new Array[kafka.api.ProducerRequest](produces.length)
>> -    for(i <- 0 until produces.length)
>> -      produceRequests(i) = new kafka.api.ProducerRequest(produces(i).topic, produces(i).partition, produces(i).messages)
>> -    underlying.multiSend(produceRequests)
>> +  def send(topic: String, messages: ByteBufferMessageSet): Unit = {
>> +    var data = new Array[TopicData](1)
>> +    var partition_data = new Array[PartitionData](1)
>> +    partition_data(0) = new PartitionData(-1,messages.underlying)
>> +    data(0) = new TopicData(topic,partition_data)
>> +    val producerRequest = new kafka.api.ProducerRequest(-1, "", 0, 0, data)
>> +    underlying.send(producerRequest)
>>    }
>>
>>    def close() {
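Since multiSend is gone from both APIs, a caller that used to batch topics that
way now builds one request spanning them. A rough, uncompiled sketch against
this revision, assuming `producer` is an already-configured
kafka.javaapi.producer.SyncProducer:

  import kafka.api.{PartitionData, TopicData}
  import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}

  // one request covering two topics, one partition each
  val t1 = new TopicData("topic1", Array(new PartitionData(0,
    new ByteBufferMessageSet(NoCompressionCodec, new Message("a".getBytes)))))
  val t2 = new TopicData("topic2", Array(new PartitionData(1,
    new ByteBufferMessageSet(NoCompressionCodec, new Message("b".getBytes)))))
  producer.send(new kafka.javaapi.ProducerRequest(-1, "", 0.toShort, 0, Array(t1, t2)))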
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala Sat Mar  3 05:46:43 2012
>> @@ -53,7 +53,7 @@ class ByteBufferMessageSet(private val b
>>
>>    def getErrorCode = errorCode
>>
>> -  def serialized(): ByteBuffer = buffer
>> +  def getSerialized(): ByteBuffer = buffer
>>
>>    def validBytes: Long = shallowValidBytes
>>
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Sat Mar  3 05:46:43 2012
>> @@ -40,6 +40,8 @@ class FileMessageSet private[kafka](priv
>>    private val setSize = new AtomicLong()
>>    private val setHighWaterMark = new AtomicLong()
>>
>> +  def getSerialized(): ByteBuffer = throw new java.lang.UnsupportedOperationException()
>> +
>>    if(mutable) {
>>      if(limit < Long.MaxValue || offset > 0)
>>        throw new IllegalArgumentException("Attempt to open a mutable message set with a view or offset, which is not allowed.")
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/MessageSet.scala Sat Mar  3 05:46:43 2012
>> @@ -111,4 +111,9 @@ abstract class MessageSet extends Iterab
>>        throw new InvalidMessageException
>>    }
>>
>> +  /**
>> +   * Used to allow children to have serialization on implementation
>> +   */
>> +  def getSerialized(): ByteBuffer
>> +
>>  }
>>
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Sat Mar  3 05:46:43 2012
>> @@ -51,29 +51,10 @@ class SyncProducer(val config: SyncProdu
>>      if (logger.isTraceEnabled) {
>>        trace("verifying sendbuffer of size " + buffer.limit)
>>        val requestTypeId = buffer.getShort()
>> -      if (requestTypeId == RequestKeys.MultiProduce) {
>> -        try {
>> -          val request = MultiProducerRequest.readFrom(buffer)
>> -          for (produce <- request.produces) {
>> -            try {
>> -              for (messageAndOffset <- produce.messages)
>> -                if (!messageAndOffset.message.isValid)
>> -                  trace("topic " + produce.topic + " is invalid")
>> -            }
>> -            catch {
>> -              case e: Throwable =>
>> -                trace("error iterating messages ", e)
>> -            }
>> -          }
>> -        }
>> -        catch {
>> -          case e: Throwable =>
>> -            trace("error verifying sendbuffer ", e)
>> -        }
>> -      }
>> +      val request = ProducerRequest.readFrom(buffer)
>> +      trace(request.toString)
>>      }
>>    }
>> -
>>    /**
>>     * Common functionality for the public send methods
>>     */
>> @@ -108,21 +89,15 @@ class SyncProducer(val config: SyncProdu
>>    /**
>>     * Send a message
>>     */
>> -  def send(topic: String, partition: Int, messages: ByteBufferMessageSet) {
>> -    verifyMessageSize(messages)
>> -    val setSize = messages.sizeInBytes.asInstanceOf[Int]
>> -    trace("Got message set with " + setSize + " bytes to send")
>> -    send(new BoundedByteBufferSend(new ProducerRequest(topic, partition, messages)))
>> -  }
>> -
>> -  def send(topic: String, messages: ByteBufferMessageSet): Unit = send(topic, ProducerRequest.RandomPartition, messages)
>> -
>> -  def multiSend(produces: Array[ProducerRequest]) {
>> -    for (request <- produces)
>> -      verifyMessageSize(request.messages)
>> -    val setSize = produces.foldLeft(0L)(_ + _.messages.sizeInBytes)
>> -    trace("Got multi message sets with " + setSize + " bytes to send")
>> -    send(new BoundedByteBufferSend(new MultiProducerRequest(produces)))
>> +  def send(producerRequest: ProducerRequest) {
>> +    producerRequest.data.foreach(d => {
>> +      d.partitionData.foreach(p => {
>> +        verifyMessageSize(new ByteBufferMessageSet(p.messages.getSerialized()))
>> +        val setSize = p.messages.sizeInBytes.asInstanceOf[Int]
>> +        trace("Got message set with " + setSize + " bytes to send")
>> +      })
>> +    })
>> +    send(new BoundedByteBufferSend(producerRequest))
>>    }
>>
>>    def send(request: TopicMetadataRequest): Seq[TopicMetadata] = {
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducerConfig.scala Sat Mar  3 05:46:43 2012
>> @@ -41,4 +41,23 @@ trait SyncProducerConfigShared {
>>    val reconnectInterval = Utils.getInt(props, "reconnect.interval", 30000)
>>
>>    val maxMessageSize = Utils.getInt(props, "max.message.size", 1000000)
>> +
>> +  /* the client application sending the producer requests */
>> +  val correlationId = Utils.getInt(props,"producer.request.correlation_id",-1)
>> +
>> +  /* the client application sending the producer requests */
>> +  val clientId = Utils.getString(props,"producer.request.client_id","")
>> +
>> +  /* the required_acks of the producer requests */
>> +  val requiredAcks = Utils.getShort(props,"producer.request.required_acks",0)
>> +
>> +  /* the ack_timeout of the producer requests */
>> +  val ackTimeout = Utils.getInt(props,"producer.request.ack_timeout",1)
>>  }
>> +
>> +object SyncProducerConfig {
>> +  val DefaultCorrelationId = -1
>> +  val DefaultClientId = ""
>> +  val DefaultRequiredAcks : Short = 0
>> +  val DefaultAckTimeoutMs = 1
>> +}
>> \ No newline at end of file
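The four new request fields come straight from the producer's Properties; a
minimal, uncompiled sketch of setting them (the values are made up, and I am
assuming the usual host/port settings a SyncProducerConfig already requires):

  import java.util.Properties
  import kafka.producer.SyncProducerConfig

  val props = new Properties()
  props.put("host", "localhost")
  props.put("port", "9092")
  props.put("producer.request.correlation_id", "42")
  props.put("producer.request.client_id", "demo-client")
  props.put("producer.request.required_acks", "1")
  props.put("producer.request.ack_timeout", "500")
  val config = new SyncProducerConfig(props) // picks the new fields up via SyncProducerConfigShared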
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Sat Mar  3 05:46:43 2012
>> @@ -17,7 +17,7 @@
>>
>>  package kafka.producer.async
>>
>> -import kafka.api.ProducerRequest
>> +import kafka.api.{ProducerRequest, TopicData, PartitionData}
>>  import kafka.serializer.Encoder
>>  import kafka.producer._
>>  import kafka.cluster.{Partition, Broker}
>> @@ -147,9 +147,22 @@ class DefaultEventHandler[K,V](config: P
>>
>>    private def send(brokerId: Int, messagesPerTopic: Map[(String, Int), ByteBufferMessageSet]) {
>>      if(messagesPerTopic.size > 0) {
>> -      val requests = messagesPerTopic.map(f => new ProducerRequest(f._1._1, f._1._2, f._2)).toArray
>> +      val topics = new HashMap[String, ListBuffer[PartitionData]]()
>> +      val requests = messagesPerTopic.map(f => {
>> +        val topicName = f._1._1
>> +        val partitionId = f._1._2
>> +        val messagesSet= f._2
>> +        val topic = topics.get(topicName) // checking to see if this topics exists
>> +        topic match {
>> +          case None => topics += topicName -> new ListBuffer[PartitionData]() //create a new listbuffer for this topic
>> +          case Some(x) => trace("found " + topicName)
>> +        }
>> +        topics(topicName).append(new PartitionData(partitionId, messagesSet))
>> +      })
>> +      val topicData = topics.map(kv => new TopicData(kv._1,kv._2.toArray))
>> +      val producerRequest = new ProducerRequest(config.correlationId, config.clientId, config.requiredAcks, config.ackTimeout, topicData.toArray) //new kafka.javaapi.ProducerRequest(correlation_id, client_id, required_acks, ack_timeout, topic_data.toArray)
>>        val syncProducer = producerPool.getProducer(brokerId)
>> -      syncProducer.multiSend(requests)
>> +      syncProducer.send(producerRequest)
>>        trace("kafka producer sent messages for topics %s to broker %s:%d"
>>          .format(messagesPerTopic, syncProducer.config.host, syncProducer.config.port))
>>      }
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Sat Mar  3 05:46:43 2012
>> @@ -41,7 +41,6 @@ class KafkaApis(val logManager: LogManag
>>      apiId match {
>>        case RequestKeys.Produce => handleProducerRequest(receive)
>>        case RequestKeys.Fetch => handleFetchRequest(receive)
>> -      case RequestKeys.MultiProduce => handleMultiProducerRequest(receive)
>>        case RequestKeys.Offsets => handleOffsetRequest(receive)
>>        case RequestKeys.TopicMetadata => handleTopicMetadataRequest(receive)
>>        case _ => throw new IllegalStateException("No mapping found for handler id " + apiId)
>> @@ -59,31 +58,38 @@ class KafkaApis(val logManager: LogManag
>>      None
>>    }
>>
>> -  def handleMultiProducerRequest(receive: Receive): Option[Send] = {
>> -    val request = MultiProducerRequest.readFrom(receive.buffer)
>> -    if(requestLogger.isTraceEnabled)
>> -      requestLogger.trace("Multiproducer request " + request.toString)
>> -    request.produces.map(handleProducerRequest(_, "MultiProducerRequest"))
>> -    None
>> -  }
>> -
>> -  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String) = {
>> -    val partition = request.getTranslatedPartition(logManager.chooseRandomPartition)
>> -    try {
>> -      logManager.getOrCreateLog(request.topic, partition).append(request.messages)
>> -      trace(request.messages.sizeInBytes + " bytes written to logs.")
>> -    } catch {
>> -      case e =>
>> -        error("Error processing " + requestHandlerName + " on " + request.topic + ":" + partition, e)
>> -        e match {
>> -          case _: IOException =>
>> -            fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
>> -            System.exit(1)
>> -          case _ =>
>> +  private def handleProducerRequest(request: ProducerRequest, requestHandlerName: String): Option[ProducerResponse] = {
>> +    val requestSize = request.data.size
>> +    val errors = new Array[Int](requestSize)
>> +    val offsets = new Array[Long](requestSize)
>> +
>> +    request.data.foreach(d => {
>> +      d.partitionData.foreach(p => {
>> +        val partition = p.getTranslatedPartition(d.topic, logManager.chooseRandomPartition)
>> +        try {
>> +          logManager.getOrCreateLog(d.topic, partition).append(p.messages)
>> +          trace(p.messages.sizeInBytes + " bytes written to logs.")
>> +          p.messages.foreach(m => trace("wrote message %s to disk".format(m.message.checksum)))
>>          }
>> -        throw e
>> -    }
>> -    None
>> +        catch {
>> +          case e =>
>> +            //TODO: handle response in ProducerResponse
>> +            error("Error processing " + requestHandlerName + " on " + d.topic + ":" + partition, e)
>> +            e match {
>> +              case _: IOException =>
>> +                fatal("Halting due to unrecoverable I/O error while handling producer request: " + e.getMessage, e)
>> +                Runtime.getRuntime.halt(1)
>> +              case _ =>
>> +            }
>> +          //throw e
>> +        }
>> +      })
>> +    //None
>> +    })
>> +    if (request.requiredAcks == 0)
>> +      None
>> +    else
>> +      None //TODO: send when KAFKA-49 can receive this Some(new ProducerResponse(request.versionId, request.correlationId, errors, offsets))
>>    }
>>
>>    def handleFetchRequest(request: Receive): Option[Send] = {
>> Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Sat Mar  3 05:46:43 2012
>> @@ -195,6 +195,9 @@ object Utils extends Logging {
>>    def getInt(props: Properties, name: String, default: Int): Int =
>>      getIntInRange(props, name, default, (Int.MinValue, Int.MaxValue))
>>
>> +  def getShort(props: Properties, name: String, default: Short): Short =
>> +    getShortInRange(props, name, default, (Short.MinValue, Short.MaxValue))
>> +
>>    /**
>>     * Read an integer from the properties instance. Throw an exception
>>     * if the value is not in the given range (inclusive)
>> @@ -217,6 +220,18 @@ object Utils extends Logging {
>>        v
>>    }
>>
>> +  def getShortInRange(props: Properties, name: String, default: Short, range: (Short, Short)): Short = {
>> +    val v =
>> +      if(props.containsKey(name))
>> +        props.getProperty(name).toShort
>> +      else
>> +        default
>> +    if(v < range._1 || v > range._2)
>> +      throw new IllegalArgumentException(name + " has value " + v + " which is not in the range " + range + ".")
>> +    else
>> +      v
>> +  }
>> +
>>    def getIntInRange(buffer: ByteBuffer, name: String, range: (Int, Int)): Int = {
>>      val value = buffer.getInt
>>      if(value < range._1 || value > range._2)
>> @@ -777,4 +792,4 @@ class SnapshotStats(private val monitorD
>>
>>      def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
>>    }
>> -}
>> +}
>> \ No newline at end of file
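A quick, uncompiled sketch of what the new helpers do ("acks" is a made-up
property key for illustration):

  import java.util.Properties
  import kafka.utils.Utils

  val props = new Properties()
  props.put("acks", "1")
  Utils.getShort(props, "acks", 0)                                // => 1
  Utils.getShortInRange(props, "acks", 0, (0.toShort, 1.toShort)) // in range, => 1
  // a value outside the range throws IllegalArgumentException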
>> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/message/ByteBufferMessageSetTest.scala Sat Mar  3 05:46:43 2012
>> @@ -33,7 +33,7 @@ class ByteBufferMessageSetTest extends B
>>      // create a ByteBufferMessageSet that doesn't contain a full message
>>      // iterating it should get an InvalidMessageSizeException
>>      val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("01234567890123456789".getBytes()))
>> -    val buffer = messages.serialized.slice
>> +    val buffer = messages.getSerialized().slice
>>      buffer.limit(10)
>>      val messageSetWithNoFullMessage = new ByteBufferMessageSet(buffer = buffer, initialOffset = 1000)
>>      try {
>> @@ -51,7 +51,7 @@ class ByteBufferMessageSetTest extends B
>>      {
>>        val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes()), new Message("there".getBytes()))
>>        val buffer = ByteBuffer.allocate(messages.sizeInBytes.toInt + 2)
>> -      buffer.put(messages.serialized)
>> +      buffer.put(messages.getSerialized())
>>        buffer.putShort(4)
>>        val messagesPlus = new ByteBufferMessageSet(buffer)
>>        assertEquals("Adding invalid bytes shouldn't change byte count", messages.validBytes, messagesPlus.validBytes)
>> @@ -93,7 +93,7 @@ class ByteBufferMessageSetTest extends B
>>        //make sure ByteBufferMessageSet is re-iterable.
>>        TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
>>        //make sure the last offset after iteration is correct
>> -      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
>> +      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
>>      }
>>
>>      // test for compressed regular messages
>> @@ -103,7 +103,7 @@ class ByteBufferMessageSetTest extends B
>>        //make sure ByteBufferMessageSet is re-iterable.
>>        TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(messageSet.iterator))
>>        //make sure the last offset after iteration is correct
>> -      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.serialized.limit)
>> +      assertEquals("offset of last message not expected", messageSet.last.offset, messageSet.getSerialized().limit)
>>      }
>>
>>      // test for mixed empty and non-empty messagesets uncompressed
>> @@ -111,16 +111,16 @@ class ByteBufferMessageSetTest extends B
>>        val emptyMessageList : List[Message] = Nil
>>        val emptyMessageSet = new ByteBufferMessageSet(NoCompressionCodec, emptyMessageList: _*)
>>        val regularMessgeSet = new ByteBufferMessageSet(NoCompressionCodec, messageList: _*)
>> -      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
>> -      buffer.put(emptyMessageSet.serialized)
>> -      buffer.put(regularMessgeSet.serialized)
>> +      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
>> +      buffer.put(emptyMessageSet.getSerialized())
>> +      buffer.put(regularMessgeSet.getSerialized())
>>        buffer.rewind
>>        val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
>>        TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
>>        //make sure ByteBufferMessageSet is re-iterable.
>>        TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
>>        //make sure the last offset after iteration is correct
>> -      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
>> +      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
>>      }
>>
>>      // test for mixed empty and non-empty messagesets compressed
>> @@ -128,16 +128,16 @@ class ByteBufferMessageSetTest extends B
>>        val emptyMessageList : List[Message] = Nil
>>        val emptyMessageSet = new ByteBufferMessageSet(DefaultCompressionCodec, emptyMessageList: _*)
>>        val regularMessgeSet = new ByteBufferMessageSet(DefaultCompressionCodec, messageList: _*)
>> -      val buffer = ByteBuffer.allocate(emptyMessageSet.serialized.limit + regularMessgeSet.serialized.limit)
>> -      buffer.put(emptyMessageSet.serialized)
>> -      buffer.put(regularMessgeSet.serialized)
>> +      val buffer = ByteBuffer.allocate(emptyMessageSet.getSerialized().limit + regularMessgeSet.getSerialized().limit)
>> +      buffer.put(emptyMessageSet.getSerialized())
>> +      buffer.put(regularMessgeSet.getSerialized())
>>        buffer.rewind
>>        val mixedMessageSet = new ByteBufferMessageSet(buffer, 0, 0)
>>        TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
>>        //make sure ByteBufferMessageSet is re-iterable.
>>        TestUtils.checkEquals[Message](messageList.iterator, TestUtils.getMessageIterator(mixedMessageSet.iterator))
>>        //make sure the last offset after iteration is correct
>> -      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.serialized.limit)
>> +      assertEquals("offset of last message not expected", mixedMessageSet.last.offset, mixedMessageSet.getSerialized().limit)
>>      }
>>    }
>>
>> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/AsyncProducerTest.scala Sat Mar  3 05:46:43 2012
>> @@ -381,11 +381,12 @@ class AsyncProducerTest extends JUnit3Su
>>      val mockSyncProducer = EasyMock.createMock(classOf[SyncProducer])
>>      mockSyncProducer.send(new TopicMetadataRequest(List(topic)))
>>      EasyMock.expectLastCall().andReturn(List(topic1Metadata))
>> -    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.take(5))))))
>> +    mockSyncProducer.send(TestUtils.produceRequest(topic, 0,
>> +    messagesToSet(msgs.take(5))))
>>      EasyMock.expectLastCall
>> -    mockSyncProducer.multiSend(EasyMock.aryEq(Array(new ProducerRequest(topic, 0, messagesToSet(msgs.takeRight(5))))))
>> -    EasyMock.expectLastCall
>> -    EasyMock.replay(mockSyncProducer)
>> +    mockSyncProducer.send(TestUtils.produceRequest(topic, 0,
>> +    messagesToSet(msgs.takeRight(5))))
>> +    EasyMock.replay(mockSyncProducer)
>>
>>      val producerPool = EasyMock.createMock(classOf[ProducerPool])
>>      producerPool.getZkClient
>> @@ -495,10 +496,7 @@ class AsyncProducerTest extends JUnit3Su
>>    }
>>
>>    class MockProducer(override val config: SyncProducerConfig) extends SyncProducer(config) {
>> -    override def send(topic: String, messages: ByteBufferMessageSet): Unit = {
>> -      Thread.sleep(1000)
>> -    }
>> -    override def multiSend(produces: Array[ProducerRequest]) {
>> +    override def send(produceRequest: ProducerRequest): Unit = {
>>        Thread.sleep(1000)
>>      }
>>    }
>> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/producer/SyncProducerTest.scala Sat Mar  3 05:46:43 2012
>> @@ -44,7 +44,7 @@ class SyncProducerTest extends JUnit3Sui
>>      var failed = false
>>      val firstStart = SystemTime.milliseconds
>>      try {
>> -      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
>> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
>>      }catch {
>>        case e: Exception => failed=true
>>      }
>> @@ -54,7 +54,7 @@ class SyncProducerTest extends JUnit3Sui
>>      Assert.assertTrue((firstEnd-firstStart) < 500)
>>      val secondStart = SystemTime.milliseconds
>>      try {
>> -      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))
>> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
>>      }catch {
>>        case e: Exception => failed = true
>>      }
>> @@ -63,7 +63,7 @@ class SyncProducerTest extends JUnit3Sui
>>      Assert.assertTrue((secondEnd-secondStart) < 500)
>>
>>      try {
>> -      producer.multiSend(Array(new ProducerRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes)))))
>> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(messageBytes))))
>>      }catch {
>>        case e: Exception => failed=true
>>      }
>> @@ -83,7 +83,7 @@ class SyncProducerTest extends JUnit3Sui
>>      val bytes = new Array[Byte](101)
>>      var failed = false
>>      try {
>> -      producer.send("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes)))
>> +      producer.send(TestUtils.produceRequest("test", 0, new ByteBufferMessageSet(compressionCodec = NoCompressionCodec, messages = new Message(bytes))))
>>      }catch {
>>        case e: MessageSizeTooLargeException => failed = true
>>      }
>>
>> Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
>> URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1296577&r1=1296576&r2=1296577&view=diff
>> ==============================================================================
>> --- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
>> +++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Sat Mar  3 05:46:43 2012
>> @@ -33,6 +33,7 @@ import collection.mutable.ListBuffer
>>  import kafka.consumer.{KafkaMessageStream, ConsumerConfig}
>>  import scala.collection.Map
>>  import kafka.serializer.Encoder
>> +import kafka.api.{ProducerRequest, TopicData, PartitionData}
>>
>>  /**
>>   * Utility functions to help with testing
>> @@ -336,7 +337,47 @@ object TestUtils {
>>        buffer += ("msg" + i)
>>      buffer
>>    }
>> +  /**
>> +   * Create a wired format request based on simple basic information
>> +   */
>> +  def produceRequest(topic: String, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
>> +    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,ProducerRequest.RandomPartition,message)
>> +  }
>> +  def produceRequest(topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
>> +    produceRequest(SyncProducerConfig.DefaultCorrelationId,topic,partition,message)
>> +  }
>> +
>> +  def produceRequest(correlationId: Int, topic: String, partition: Int, message: ByteBufferMessageSet): kafka.api.ProducerRequest = {
>> +    val clientId = SyncProducerConfig.DefaultClientId
>> +    val requiredAcks: Short = SyncProducerConfig.DefaultRequiredAcks
>> +    val ackTimeout = SyncProducerConfig.DefaultAckTimeoutMs
>> +    var data = new Array[TopicData](1)
>> +    var partitionData = new Array[PartitionData](1)
>> +    partitionData(0) = new PartitionData(partition,message)
>> +    data(0) = new TopicData(topic,partitionData)
>> +    val pr = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
>> +    pr
>> +  }
>>
>> +  def produceJavaRequest(topic: String, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
>> +    produceJavaRequest(-1,topic,-1,message)
>> +  }
>> +
>> +  def produceJavaRequest(topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
>> +    produceJavaRequest(-1,topic,partition,message)
>> +  }
>> +
>> +  def produceJavaRequest(correlationId: Int, topic: String, partition: Int, message: kafka.javaapi.message.ByteBufferMessageSet): kafka.javaapi.ProducerRequest = {
>> +    val clientId = "test"
>> +    val requiredAcks: Short = 0
>> +    val ackTimeout = 0
>> +    var data = new Array[TopicData](1)
>> +    var partitionData = new Array[PartitionData](1)
>> +    partitionData(0) = new PartitionData(partition,message.underlying)
>> +    data(0) = new TopicData(topic,partitionData)
>> +    val pr = new kafka.javaapi.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeout, data)
>> +    pr
>> +  }
>>  }
>>
>>  object TestZKUtils {
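With these helpers, the old producer.send(topic, partition, messages) call
sites in the tests reduce to roughly the following (uncompiled sketch;
`producer` is an existing kafka.producer.SyncProducer):

  import kafka.message.{ByteBufferMessageSet, Message, NoCompressionCodec}
  import kafka.utils.TestUtils

  val messages = new ByteBufferMessageSet(NoCompressionCodec, new Message("hello".getBytes))
  producer.send(TestUtils.produceRequest("test", 0, messages)) // fixed partition
  producer.send(TestUtils.produceRequest("test", messages))    // random partition (-1)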