Return-Path: X-Original-To: apmail-incubator-kafka-commits-archive@minotaur.apache.org Delivered-To: apmail-incubator-kafka-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 1E60972DC for ; Mon, 1 Aug 2011 23:43:30 +0000 (UTC) Received: (qmail 81216 invoked by uid 500); 1 Aug 2011 23:43:30 -0000 Delivered-To: apmail-incubator-kafka-commits-archive@incubator.apache.org Received: (qmail 81198 invoked by uid 500); 1 Aug 2011 23:43:30 -0000 Mailing-List: contact kafka-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: kafka-dev@incubator.apache.org Delivered-To: mailing list kafka-commits@incubator.apache.org Received: (qmail 81191 invoked by uid 99); 1 Aug 2011 23:43:30 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Aug 2011 23:43:30 +0000 X-ASF-Spam-Status: No, hits=-1998.9 required=5.0 tests=ALL_TRUSTED,FRT_OFFER2 X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Aug 2011 23:43:14 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6E75F2388AF0; Mon, 1 Aug 2011 23:42:27 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 8bit Subject: svn commit: r1152970 [11/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr... 
Date: Mon, 01 Aug 2011 23:42:17 -0000
To: kafka-commits@incubator.apache.org
From: jkreps@apache.org
X-Mailer: svnmailer-1.0.8
Message-Id: <20110801234227.6E75F2388AF0@eris.apache.org>
X-Virus-Checked: Checked by ClamAV on apache.org

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/OffsetRequest.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,90 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Util;
+
+namespace Kafka.Client.Request
+{
+    /// <summary>
+    /// Constructs an offsets request to send to Kafka.
+    /// </summary>
+    public class OffsetRequest : AbstractRequest
+    {
+        /// <summary>
+        /// The latest time constant (-1).
+        /// </summary>
+        public static readonly long LatestTime = -1L;
+
+        /// <summary>
+        /// The earliest time constant (-2).
+        /// </summary>
+        public static readonly long EarliestTime = -2L;
+
+        /// <summary>
+        /// Initializes a new instance of the OffsetRequest class.
+        /// </summary>
+        public OffsetRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the OffsetRequest class.
+        /// </summary>
+        /// <param name="topic">The topic to request offsets for.</param>
+        /// <param name="partition">The partition to request offsets for.</param>
+        /// <param name="time">The time from which to request offsets.</param>
+        /// <param name="maxOffsets">The maximum amount of offsets to return.</param>
+        public OffsetRequest(string topic, int partition, long time, int maxOffsets)
+        {
+            Topic = topic;
+            Partition = partition;
+            Time = time;
+            MaxOffsets = maxOffsets;
+        }
+
+        /// <summary>
+        /// Gets the time.
+        /// </summary>
+        public long Time { get; private set; }
+
+        /// <summary>
+        /// Gets the maximum number of offsets to return.
+        /// </summary>
+        public int MaxOffsets { get; private set; }
+
+        ///
+        /// Determines if the request has valid settings.
+        ///
+        /// <returns>True if valid and false otherwise.</returns>
+        public override bool IsValid()
+        {
+            return !string.IsNullOrWhiteSpace(Topic);
+        }
+
+        /// <summary>
+        /// Converts the request to an array of bytes that is expected by Kafka:
+        /// total length, request type, topic length, topic, partition, time, max offsets.
+        /// </summary>
+        /// <returns>An array of bytes that represents the request.</returns>
+        public override byte[] GetBytes()
+        {
+            byte[] requestBytes = BitWorks.GetBytesReversed(Convert.ToInt16((int)RequestType.Offsets));
+
+            // The length prefix must count the encoded UTF-8 bytes that follow it,
+            // not the UTF-16 characters of the string; they differ for non-ASCII topics.
+            byte[] topicBytes = Encoding.UTF8.GetBytes(Topic);
+            byte[] topicLengthBytes = BitWorks.GetBytesReversed(Convert.ToInt16(topicBytes.Length));
+            byte[] partitionBytes = BitWorks.GetBytesReversed(Partition);
+            byte[] timeBytes = BitWorks.GetBytesReversed(Time);
+            byte[] maxOffsetsBytes = BitWorks.GetBytesReversed(MaxOffsets);
+
+            List<byte> encodedMessageSet = new List<byte>();
+            encodedMessageSet.AddRange(requestBytes);
+            encodedMessageSet.AddRange(topicLengthBytes);
+            encodedMessageSet.AddRange(topicBytes);
+            encodedMessageSet.AddRange(partitionBytes);
+            encodedMessageSet.AddRange(timeBytes);
+            encodedMessageSet.AddRange(maxOffsetsBytes);
+
+            // Prefix the size of everything written so far (the size field itself is excluded).
+            encodedMessageSet.InsertRange(0, BitWorks.GetBytesReversed(encodedMessageSet.Count));
+
+            return encodedMessageSet.ToArray();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/ProducerRequest.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,98 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Kafka.Client.Util;
+
+namespace Kafka.Client.Request
+{
+    ///
+    /// Constructs a request to send to Kafka.
+    ///
+    public class ProducerRequest : AbstractRequest
+    {
+        /// <summary>
+        /// Initializes a new instance of the ProducerRequest class.
+        /// </summary>
+        public ProducerRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the ProducerRequest class.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="messages">The list of messages to send.</param>
+        public ProducerRequest(string topic, int partition, IList<Message> messages)
+        {
+            Topic = topic;
+            Partition = partition;
+            Messages = messages;
+        }
+
+        /// <summary>
+        /// Gets or sets the messages to publish.
+        /// </summary>
+        public IList<Message> Messages { get; set; }
+
+        /// <summary>
+        /// Determines if the request has valid settings: a non-blank topic and at least one message.
+        /// </summary>
+        /// <returns>True if valid and false otherwise.</returns>
+        public override bool IsValid()
+        {
+            return !string.IsNullOrWhiteSpace(Topic) && Messages != null && Messages.Count > 0;
+        }
+
+        /// <summary>
+        /// Gets the bytes matching the expected Kafka structure.
+        /// </summary>
+        /// <returns>The byte array of the request.</returns>
+        public override byte[] GetBytes()
+        {
+            List<byte> encodedMessageSet = new List<byte>();
+            encodedMessageSet.AddRange(GetInternalBytes());
+
+            // Prepend the request type first, then the size of (type + body), so the
+            // size field ends up in front and does not count itself.
+            byte[] requestBytes = BitWorks.GetBytesReversed(Convert.ToInt16((int)RequestType.Produce));
+            encodedMessageSet.InsertRange(0, requestBytes);
+            encodedMessageSet.InsertRange(0, BitWorks.GetBytesReversed(encodedMessageSet.Count));
+
+            return encodedMessageSet.ToArray();
+        }
+
+        /// <summary>
+        /// Gets the bytes representing the request which is used when generating a multi-request.
+        /// </summary>
+        /// <remarks>
+        /// The <see cref="GetBytes"/> method is used for sending a single <see cref="RequestType.Produce"/>.
+        /// It prefixes this byte array with the request type and the total length. This method
+        /// is used to supply the <see cref="MultiProducerRequest"/> with the contents for its message.
+        /// </remarks>
+        /// <returns>The bytes that represent this <see cref="ProducerRequest"/>.</returns>
+        internal byte[] GetInternalBytes()
+        {
+            // Each message is written as a 4-byte length followed by the message bytes.
+            List<byte> messagePack = new List<byte>();
+            foreach (Message message in Messages)
+            {
+                byte[] messageBytes = message.GetBytes();
+                messagePack.AddRange(BitWorks.GetBytesReversed(messageBytes.Length));
+                messagePack.AddRange(messageBytes);
+            }
+
+            // The length prefix must count the encoded UTF-8 bytes that follow it,
+            // not the UTF-16 characters of the string; they differ for non-ASCII topics.
+            byte[] topicBytes = Encoding.UTF8.GetBytes(Topic);
+            byte[] topicLengthBytes = BitWorks.GetBytesReversed(Convert.ToInt16(topicBytes.Length));
+            byte[] partitionBytes = BitWorks.GetBytesReversed(Partition);
+            byte[] messagePackLengthBytes = BitWorks.GetBytesReversed(messagePack.Count);
+            byte[] messagePackBytes = messagePack.ToArray();
+
+            List<byte> encodedMessageSet = new List<byte>();
+            encodedMessageSet.AddRange(topicLengthBytes);
+            encodedMessageSet.AddRange(topicBytes);
+            encodedMessageSet.AddRange(partitionBytes);
+            encodedMessageSet.AddRange(messagePackLengthBytes);
+            encodedMessageSet.AddRange(messagePackBytes);
+
+            return encodedMessageSet.ToArray();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestContext.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,36 @@
+using System.Net.Sockets;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// The context of a request made to Kafka.
+    /// </summary>
+    /// <typeparam name="T">
+    /// Must be of type <see cref="AbstractRequest"/> and represents the type of request
+    /// sent to Kafka.
+    /// </typeparam>
+    public class RequestContext<T> where T : AbstractRequest
+    {
+        /// <summary>
+        /// Initializes a new instance of the RequestContext class.
+        /// </summary>
+        /// <param name="networkStream">The network stream that sent the message.</param>
+        /// <param name="request">The request sent over the stream.</param>
+        public RequestContext(NetworkStream networkStream, T request)
+        {
+            NetworkStream = networkStream;
+            Request = request;
+        }
+
+        /// <summary>
+        /// Gets the network stream supplied when this context was created.
+        /// </summary>
+        public NetworkStream NetworkStream { get; private set; }
+
+        /// <summary>
+        /// Gets the request object (of type T) associated with this context.
+        /// </summary>
+        public T Request { get; private set; }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestType.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestType.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestType.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/RequestType.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,36 @@
+namespace Kafka.Client
+{
+    /// <summary>
+    /// Request types for Kafka.
+    /// </summary>
+    /// <remarks>
+    /// Many of these are not in play yet.
+    /// </remarks>
+    public enum RequestType
+    {
+        /// <summary>
+        /// Produce a message.
+        /// </summary>
+        Produce = 0,
+
+        /// <summary>
+        /// Fetch a message.
+        /// </summary>
+        Fetch = 1,
+
+        /// <summary>
+        /// Multi-fetch messages.
+        /// </summary>
+        MultiFetch = 2,
+
+        /// <summary>
+        /// Multi-produce messages.
+        /// </summary>
+        MultiProduce = 3,
+
+        /// <summary>
+        /// Gets offsets.
+        /// </summary>
+        Offsets = 4
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/BitWorks.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,69 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client.Util
+{
+    ///
+    /// Utility class for managing bits and bytes.
+    ///
+    public class BitWorks
+    {
+        /// <summary>
+        /// Converts the value to bytes and reverses them.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): <see cref="BitConverter.GetBytes(short)"/> follows the host byte order, so the
+        /// result is big-endian only on little-endian hosts - confirm if other hosts matter.
+        /// </remarks>
+        /// <param name="value">The value to convert to bytes.</param>
+        /// <returns>Bytes representing the value.</returns>
+        public static byte[] GetBytesReversed(short value)
+        {
+            return ReverseBytes(BitConverter.GetBytes(value));
+        }
+
+        /// <summary>
+        /// Converts the value to bytes and reverses them.
+        /// </summary>
+        /// <param name="value">The value to convert to bytes.</param>
+        /// <returns>Bytes representing the value.</returns>
+        public static byte[] GetBytesReversed(int value)
+        {
+            return ReverseBytes(BitConverter.GetBytes(value));
+        }
+
+        /// <summary>
+        /// Converts the value to bytes and reverses them.
+        /// </summary>
+        /// <param name="value">The value to convert to bytes.</param>
+        /// <returns>Bytes representing the value.</returns>
+        public static byte[] GetBytesReversed(long value)
+        {
+            return ReverseBytes(BitConverter.GetBytes(value));
+        }
+
+        /// <summary>
+        /// Reverses the order of an array of bytes in place.
+        /// </summary>
+        /// <param name="inArray">
+        /// The array to reverse. The array itself is modified; no copy is made.
+        /// </param>
+        /// <returns>
+        /// The same array instance that was passed in, now reversed. A null input
+        /// returns null and a zero-length input is returned unchanged.
+        /// </returns>
+        public static byte[] ReverseBytes(byte[] inArray)
+        {
+            if (inArray != null)
+            {
+                Array.Reverse(inArray);
+            }
+
+            return inArray;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Util/Crc32.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,115 @@
+//
+using System;
+using System.Security.Cryptography;
+
+namespace Kafka.Client.Util
+{
+    ///
+    /// From
http://damieng.com/blog/2006/08/08/calculating_crc32_in_c_and_net
+    ///
+    public class Crc32 : HashAlgorithm
+    {
+        // Polynomial and seed used by the parameterless constructor and Compute(byte[]).
+        public const UInt32 DefaultPolynomial = 0xedb88320;
+        public const UInt32 DefaultSeed = 0xffffffff;
+
+        // Running hash value; reset to the seed by Initialize().
+        private UInt32 hash;
+        private UInt32 seed;
+        // Lookup table for this instance's polynomial.
+        private UInt32[] table;
+        // Table for DefaultPolynomial, cached by InitializeTable after it is first built.
+        private static UInt32[] defaultTable;
+
+        /// <summary>
+        /// Initializes a new instance using the default polynomial and seed.
+        /// </summary>
+        public Crc32()
+        {
+            table = InitializeTable(DefaultPolynomial);
+            seed = DefaultSeed;
+            Initialize();
+        }
+
+        /// <summary>
+        /// Initializes a new instance using the supplied polynomial and seed.
+        /// </summary>
+        public Crc32(UInt32 polynomial, UInt32 seed)
+        {
+            table = InitializeTable(polynomial);
+            this.seed = seed;
+            Initialize();
+        }
+
+        /// <summary>
+        /// Resets the running hash to the seed.
+        /// </summary>
+        public override void Initialize()
+        {
+            hash = seed;
+        }
+
+        /// <summary>
+        /// Feeds a buffer range into the running hash; start and length are passed
+        /// through to CalculateHash unchanged.
+        /// </summary>
+        protected override void HashCore(byte[] buffer, int start, int length)
+        {
+            hash = CalculateHash(table, hash, buffer, start, length);
+        }
+
+        /// <summary>
+        /// Complements the running hash and returns it as four bytes, most significant first.
+        /// </summary>
+        protected override byte[] HashFinal()
+        {
+            byte[] hashBuffer = UInt32ToBigEndianBytes(~hash);
+            this.HashValue = hashBuffer;
+            return hashBuffer;
+        }
+
+        /// <summary>
+        /// Gets the size of the hash in bits (32).
+        /// </summary>
+        public override int HashSize
+        {
+            get { return 32; }
+        }
+
+        /// <summary>
+        /// Computes the complemented hash of the whole buffer with the default polynomial and seed.
+        /// </summary>
+        public static UInt32 Compute(byte[] buffer)
+        {
+            return ~CalculateHash(InitializeTable(DefaultPolynomial), DefaultSeed, buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Computes the complemented hash of the whole buffer with the default polynomial and the given seed.
+        /// </summary>
+        public static UInt32 Compute(UInt32 seed, byte[] buffer)
+        {
+            return ~CalculateHash(InitializeTable(DefaultPolynomial), seed, buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Computes the complemented hash of the whole buffer with the given polynomial and seed.
+        /// </summary>
+        public static UInt32 Compute(UInt32 polynomial, UInt32 seed, byte[] buffer)
+        {
+            return ~CalculateHash(InitializeTable(polynomial), seed, buffer, 0, buffer.Length);
+        }
+
+        /// <summary>
+        /// Builds the 256-entry lookup table for a polynomial. The table for
+        /// DefaultPolynomial is stored in a static field and reused on later calls.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): the static cache is read and written without synchronization;
+        /// concurrent first calls may each build their own (identical) table.
+        /// </remarks>
+        private static UInt32[] InitializeTable(UInt32 polynomial)
+        {
+            if (polynomial == DefaultPolynomial && defaultTable != null)
+                return defaultTable;
+
+            UInt32[] createTable = new UInt32[256];
+            for (int i = 0; i < 256; i++)
+            {
+                UInt32 entry = (UInt32)i;
+                // Eight shift steps per entry; XOR in the polynomial whenever the low bit is set.
+                for (int j = 0; j < 8; j++)
+                    if ((entry & 1) == 1)
+                        entry = (entry >> 1) ^ polynomial;
+                    else
+                        entry = entry >> 1;
+                createTable[i] = entry;
+            }
+
+            if (polynomial == DefaultPolynomial)
+                defaultTable = createTable;
+
+            return createTable;
+        }
+
+
private static UInt32 CalculateHash(UInt32[] table, UInt32 seed, byte[] buffer, int start, int size) + { + UInt32 crc = seed; + for (int i = start; i < size; i++) + unchecked + { + crc = (crc >> 8) ^ table[buffer[i] ^ crc & 0xff]; + } + return crc; + } + + private byte[] UInt32ToBigEndianBytes(UInt32 x) + { + return new byte[] { + (byte)((x >> 24) & 0xff), + (byte)((x >> 16) & 0xff), + (byte)((x >> 8) & 0xff), + (byte)(x & 0xff) + }; + } + } +} Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.sln Mon Aug 1 23:41:24 2011 @@ -0,0 +1,38 @@ + +Microsoft Visual Studio Solution File, Format Version 11.00 +# Visual Studio 2010 +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client", "Kafka.Client\Kafka.Client.csproj", "{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.Tests", "Tests\Kafka.Client.Tests\Kafka.Client.Tests.csproj", "{9BA1A0BF-B207-4A11-8883-5F64B113C07D}" +EndProject +Project("{2150E333-8FDC-42A3-9474-1A3956D46DE8}") = "Tests", "Tests", "{06FD20F1-CE06-430E-AF6E-2EBECE6E47B3}" +EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "Kafka.Client.IntegrationTests", "Tests\Kafka.Client.IntegrationTests\Kafka.Client.IntegrationTests.csproj", "{AF29C330-49BD-4648-B692-882E922C435B}" +EndProject +Global + GlobalSection(SolutionConfigurationPlatforms) = preSolution + Debug|Any CPU = Debug|Any CPU + Release|Any CPU = Release|Any CPU + EndGlobalSection + GlobalSection(ProjectConfigurationPlatforms) = postSolution + {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Debug|Any 
CPU.Build.0 = Debug|Any CPU + {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.ActiveCfg = Release|Any CPU + {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}.Release|Any CPU.Build.0 = Release|Any CPU + {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Debug|Any CPU.Build.0 = Debug|Any CPU + {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.ActiveCfg = Release|Any CPU + {9BA1A0BF-B207-4A11-8883-5F64B113C07D}.Release|Any CPU.Build.0 = Release|Any CPU + {AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {AF29C330-49BD-4648-B692-882E922C435B}.Debug|Any CPU.Build.0 = Debug|Any CPU + {AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.ActiveCfg = Release|Any CPU + {AF29C330-49BD-4648-B692-882E922C435B}.Release|Any CPU.Build.0 = Release|Any CPU + EndGlobalSection + GlobalSection(SolutionProperties) = preSolution + HideSolutionNode = FALSE + EndGlobalSection + GlobalSection(NestedProjects) = preSolution + {9BA1A0BF-B207-4A11-8883-5F64B113C07D} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3} + {AF29C330-49BD-4648-B692-882E922C435B} = {06FD20F1-CE06-430E-AF6E-2EBECE6E47B3} + EndGlobalSection +EndGlobal Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Kafka.Client.IntegrationTests.csproj Mon Aug 1 23:41:24 2011 @@ -0,0 +1,64 @@ + + + + Debug + AnyCPU + 8.0.30703 + 2.0 + {AF29C330-49BD-4648-B692-882E922C435B} + Library + Properties + 
    Kafka.Client.IntegrationTests
+    Kafka.Client.IntegrationTests
+    v4.0
+    512
+
+
+    true
+    full
+    false
+    bin\Debug\
+    DEBUG;TRACE
+    prompt
+    4
+
+
+    pdbonly
+    true
+    bin\Release\
+    TRACE
+    prompt
+    4
+
+
+
+    False
+    ..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+    {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}
+    Kafka.Client
+
+
+
+
+
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/KafkaIntegrationTest.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,181 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using System.Threading;
+using Kafka.Client.Request;
+using NUnit.Framework;
+
+namespace Kafka.Client.Tests
+{
+    /// <summary>
+    /// Contains tests that go all the way to Kafka and back.
+    /// The fixture is marked [Ignore] because it needs a running Kafka server.
+    /// </summary>
+    [TestFixture]
+    [Ignore("Requires a Kafka server running to execute")]
+    public class KafkaIntegrationTest
+    {
+        /// <summary>
+        /// Kafka server to test against (a hard-coded address; change it to match your environment).
+        /// </summary>
+        private static readonly string KafkaServer = "192.168.50.203";
+
+        /// <summary>
+        /// Port of the Kafka server to test against.
+        /// </summary>
+        private static readonly int KafkaPort = 9092;
+
+        ///
+        /// Sends a pair of messages to Kafka.
+        ///
+        [Test]
+        public void ProducerSendsMessage()
+        {
+            string payload1 = "kafka 1.";
+            byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+            Message msg1 = new Message(payloadData1);
+
+            string payload2 = "kafka 2.";
+            byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+            Message msg2 = new Message(payloadData2);
+
+            Producer producer = new Producer(KafkaServer, KafkaPort);
+            producer.Send("test", 0, new List<Message> { msg1, msg2 });
+        }
+
+        /// <summary>
+        /// Asynchronously sends 50 random messages to Kafka and waits for the completion callback.
+        /// </summary>
+        [Test]
+        public void ProducerSendsMessageAsynchronously()
+        {
+            bool waiting = true;
+
+            List<Message> messages = GenerateRandomMessages(50);
+
+            Producer producer = new Producer(KafkaServer, KafkaPort);
+            producer.SendAsync(
+                "test",
+                0,
+                messages,
+                (requestContext) => { waiting = false; });
+
+            // Bound the wait so a callback that never fires fails the test
+            // instead of hanging the whole run.
+            DateTime deadline = DateTime.UtcNow.AddSeconds(30);
+            while (waiting && DateTime.UtcNow < deadline)
+            {
+                Console.WriteLine("Keep going...");
+                Thread.Sleep(10);
+            }
+
+            Assert.IsFalse(waiting, "SendAsync callback was not invoked within 30 seconds.");
+        }
+
+        /// <summary>
+        /// Send a multi-produce request to Kafka.
+        /// </summary>
+        [Test]
+        public void ProducerSendMultiRequest()
+        {
+            List<ProducerRequest> requests = new List<ProducerRequest>
+            {
+                new ProducerRequest("test", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
+                new ProducerRequest("test", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
+                new ProducerRequest("testa", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
+                new ProducerRequest("testa", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
+            };
+
+            MultiProducerRequest request = new MultiProducerRequest(requests);
+            Producer producer = new Producer(KafkaServer, KafkaPort);
+            producer.Send(request);
+        }
+
+        ///
+        /// Generates messages for Kafka then gets them back.
+        ///
+        [Test]
+        public void ConsumerFetchMessage()
+        {
+            ProducerSendsMessage();
+
+            Consumer consumer = new Consumer(KafkaServer, KafkaPort);
+            List<Message> messages = consumer.Consume("test", 0, 0);
+
+            foreach (Message msg in messages)
+            {
+                Console.WriteLine(msg);
+            }
+        }
+
+        /// <summary>
+        /// Generates multiple messages for Kafka then gets them back.
+        /// </summary>
+        [Test]
+        public void ConsumerMultiFetchGetsMessage()
+        {
+            ProducerSendMultiRequest();
+
+            Consumer consumer = new Consumer(KafkaServer, KafkaPort);
+            MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+            {
+                new FetchRequest("test", 0, 0),
+                new FetchRequest("test", 0, 0),
+                new FetchRequest("testa", 0, 0)
+            });
+
+            List<List<Message>> messages = consumer.Consume(request);
+
+            for (int ix = 0; ix < messages.Count; ix++)
+            {
+                List<Message> messageSet = messages[ix];
+                Console.WriteLine(string.Format("Request #{0}-->", ix));
+                foreach (Message msg in messageSet)
+                {
+                    Console.WriteLine(msg);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Gets offsets from Kafka for the last 24 hours.
+        /// </summary>
+        [Test]
+        public void ConsumerGetsOffsets()
+        {
+            // Kafka reads the offset-request time as milliseconds since the Unix epoch.
+            // DateTime.Ticks counts 100 ns units since year 0001, a far larger number
+            // that the broker would not read as "24 hours ago".
+            DateTime epoch = new DateTime(1970, 1, 1, 0, 0, 0, DateTimeKind.Utc);
+            long oneDayAgoMs = (long)(DateTime.UtcNow.AddHours(-24) - epoch).TotalMilliseconds;
+            OffsetRequest request = new OffsetRequest("test", 0, oneDayAgoMs, 10);
+
+            Consumer consumer = new Consumer(KafkaServer, KafkaPort);
+            IList<long> list = consumer.GetOffsetsBefore(request);
+
+            foreach (long l in list)
+            {
+                Console.Out.WriteLine(l);
+            }
+        }
+
+        /// <summary>
+        /// Generates a random list of messages, each with a 10000-byte payload.
+        /// </summary>
+        /// <param name="numberOfMessages">The number of messages to generate.</param>
+        /// <returns>A list of random messages.</returns>
+        private static List<Message> GenerateRandomMessages(int numberOfMessages)
+        {
+            List<Message> messages = new List<Message>();
+            for (int ix = 0; ix < numberOfMessages; ix++)
+            {
+                messages.Add(new Message(GenerateRandomBytes(10000)));
+            }
+
+            return messages;
+        }
+
+        /// <summary>
+        /// Generate a random set of bytes.
+        /// </summary>
+        /// <param name="length">Length of the byte array.</param>
+        /// <returns>Random byte array.</returns>
+        private static byte[] GenerateRandomBytes(int length)
+        {
+            byte[] randBytes = new byte[length];
+            RandomGenerator.NextBytes(randBytes);
+
+            return randBytes;
+        }
+
+        /// <summary>
+        /// Single generator shared by every call. Creating a new time-seeded Random per
+        /// call can hand back identical byte sequences when calls happen in a tight loop.
+        /// Not synchronized; the tests in this fixture call it from one thread.
+        /// </summary>
+        private static readonly Random RandomGenerator = new Random();
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.IntegrationTests/Properties/AssemblyInfo.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Kafka.Client.IntegrationTests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Kafka.Client.IntegrationTests")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)] + +// The following GUID is for the ID of the typelib if this project is exposed to COM +[assembly: Guid("7b2387b7-6a58-4e8b-ae06-8aadf1a64949")] + +// Version information for an assembly consists of the following four values: +// +// Major Version +// Minor Version +// Build Number +// Revision +// +// You can specify all the values or you can default the Build and Revision Numbers +// by using the '*' as shown below: +// [assembly: AssemblyVersion("1.0.*")] +[assembly: AssemblyVersion("1.0.0.0")] +[assembly: AssemblyFileVersion("1.0.0.0")] Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Kafka.Client.Tests.csproj Mon Aug 1 23:41:24 2011 @@ -0,0 +1,70 @@ + + + + Debug + AnyCPU + 8.0.30703 + 2.0 + {9BA1A0BF-B207-4A11-8883-5F64B113C07D} + Library + Properties + Kafka.Client.Tests + Kafka.Client.Tests + v4.0 + 512 + + + true + full + false + bin\Debug\ + DEBUG;TRACE + prompt + 4 + + + pdbonly + true + bin\Release\ + TRACE + prompt + 4 + + + + False + ..\..\..\..\lib\nunit\2.5.9\nunit.framework.dll + + + + + + + + + + + + + + + + + + + + + + {A92DD03B-EE4F-4A78-9FB2-279B6348C7D2} + Kafka.Client + + + + + \ No newline at end of file Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs?rev=1152970&view=auto ============================================================================== --- 
incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/MessageTests.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,68 @@
+using System;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Util;
+using NUnit.Framework;
+
+namespace Kafka.Client.Tests
+{
+    /// <summary>
+    /// Tests for the Message class.
+    /// </summary>
+    [TestFixture]
+    public class MessageTests
+    {
+        /// <summary>
+        /// Demonstrates a properly parsed message.
+        /// </summary>
+        [Test]
+        public void ParseFromValid()
+        {
+            Crc32 crc32 = new Crc32();
+
+            string payload = "kafka";
+            byte magic = 0;
+            byte[] payloadData = Encoding.UTF8.GetBytes(payload);
+            // NOTE(review): BitConverter emits host byte order and the value is the payload
+            // length only - confirm this matches what Message.ParseFrom expects in the size field.
+            byte[] payloadSize = BitConverter.GetBytes(payloadData.Length);
+            byte[] checksum = crc32.ComputeHash(payloadData);
+            byte[] messageData = new byte[payloadData.Length + 1 + payloadSize.Length + checksum.Length];
+
+            // Buffer layout built below: [0..3] size, [4] magic, [5..8] checksum, [9..] payload.
+            Buffer.BlockCopy(payloadSize, 0, messageData, 0, payloadSize.Length);
+            messageData[4] = magic;
+            Buffer.BlockCopy(checksum, 0, messageData, payloadSize.Length + 1, checksum.Length);
+            Buffer.BlockCopy(payloadData, 0, messageData, payloadSize.Length + 1 + checksum.Length, payloadData.Length);
+
+            Message message = Message.ParseFrom(messageData);
+
+            Assert.IsNotNull(message);
+            Assert.AreEqual(magic, message.Magic);
+            Assert.IsTrue(payloadData.SequenceEqual(message.Payload));
+            Assert.IsTrue(checksum.SequenceEqual(message.Checksum));
+        }
+
+        ///
+        /// Ensure that the bytes returned from the message are in valid kafka sequence.
+        ///
+        [Test]
+        public void GetBytesValidSequence()
+        {
+            Message message = new Message(new byte[10], (byte)245);
+
+            byte[] bytes = message.GetBytes();
+
+            Assert.IsNotNull(bytes);
+
+            // len(payload) + 1 (magic) + 4 (checksum)
+            Assert.AreEqual(15, bytes.Length);
+
+            // first byte = the magic number
+            Assert.AreEqual((byte)245, bytes[0]);
+
+            // next 4 bytes = the checksum
+            Assert.IsTrue(message.Checksum.SequenceEqual(bytes.Skip(1).Take(4).ToArray()));
+
+            // remaining bytes = the payload
+            Assert.AreEqual(10, bytes.Skip(5).ToArray().Length);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Properties/AssemblyInfo.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// General Information about an assembly is controlled through the following
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Kafka.Client.Tests")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Kafka.Client.Tests")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible
+// to COM components. If you need to access a type in this assembly from
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("bf361ee0-5cbb-4fd6-bded-67bedcb603b8")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/FetchRequestTests.cs Mon Aug 1 23:41:24 2011
@@ -0,0 +1,86 @@
+using System;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Request;
+using Kafka.Client.Util;
+using NUnit.Framework;
+
+namespace Kafka.Client.Request.Tests
+{
+    /// <summary>
+    /// Tests for the FetchRequest class.
+    /// </summary>
+    [TestFixture]
+    public class FetchRequestTests
+    {
+        /// <summary>
+        /// Tests a valid request.
+        /// </summary>
+        [Test]
+        public void IsValidTrue()
+        {
+            FetchRequest request = new FetchRequest("topic", 1, 10L, 100);
+            Assert.IsTrue(request.IsValid());
+        }
+
+        /// <summary>
+        /// Tests an invalid request whose topic is only whitespace.
+        /// </summary>
+        [Test]
+        public void IsValidNoTopic()
+        {
+            FetchRequest request = new FetchRequest(" ", 1, 10L, 100);
+            Assert.IsFalse(request.IsValid());
+        }
+
+        ///
+        /// Tests an invalid request with a null topic.
+ /// + [Test] + public void IsValidNulltopic() + { + FetchRequest request = new FetchRequest(null, 1, 10L, 100); + Assert.IsFalse(request.IsValid()); + } + + /// + /// Tests to ensure that the request follows the expected structure. + /// + [Test] + public void GetBytesValidStructure() + { + string topicName = "topic"; + FetchRequest request = new FetchRequest(topicName, 1, 10L, 100); + + // REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE + int requestSize = 2 + 2 + topicName.Length + 4 + 8 + 4; + + byte[] bytes = request.GetBytes(); + Assert.IsNotNull(bytes); + + // add 4 bytes for the length of the message at the beginning + Assert.AreEqual(requestSize + 4, bytes.Length); + + // first 4 bytes = the message length + Assert.AreEqual(25, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0)); + + // next 2 bytes = the request type + Assert.AreEqual((short)RequestType.Fetch, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0)); + + // next 2 bytes = the topic length + Assert.AreEqual((short)topicName.Length, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0)); + + // next few bytes = the topic + Assert.AreEqual(topicName, Encoding.ASCII.GetString(bytes.Skip(8).Take(topicName.Length).ToArray())); + + // next 4 bytes = the partition + Assert.AreEqual(1, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(8 + topicName.Length).Take(4).ToArray()), 0)); + + // next 8 bytes = the offset + Assert.AreEqual(10, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(12 + topicName.Length).Take(8).ToArray()), 0)); + + // last 4 bytes = the max size + Assert.AreEqual(100, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(20 + +topicName.Length).Take(4).ToArray()), 0)); + } + } +} Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs URL: 
http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiFetchRequestTests.cs Mon Aug 1 23:41:24 2011 @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Kafka.Client.Request; +using Kafka.Client.Util; +using NUnit.Framework; + +namespace Kafka.Client.Request.Tests +{ + /// + /// Tests for the class. + /// + [TestFixture] + public class MultiFetchRequestTests + { + /// + /// Tests a valid multi-consumer request. + /// + [Test] + public void IsValidTrue() + { + List requests = new List + { + new FetchRequest("topic a", 0, 0), + new FetchRequest("topic a", 0, 0), + new FetchRequest("topic b", 0, 0), + new FetchRequest("topic c", 0, 0) + }; + + MultiFetchRequest multiRequest = new MultiFetchRequest(requests); + Assert.IsTrue(multiRequest.IsValid()); + } + + /// + /// Tests for an invalid multi-request with no requests provided. + /// + [Test] + public void IsValidNoRequests() + { + MultiFetchRequest multiRequest = new MultiFetchRequest(new List()); + Assert.IsFalse(multiRequest.IsValid()); + } + + /// + /// Tests for an invalid multi-request with no requests provided. + /// + [Test] + public void IsValidNullRequests() + { + MultiFetchRequest multiRequest = new MultiFetchRequest(null); + Assert.IsFalse(multiRequest.IsValid()); + } + + /// + /// Test to ensure a valid format in the returned byte array as expected by Kafka. 
+ /// + [Test] + public void GetBytesValidFormat() + { + List requests = new List + { + new FetchRequest("topic a", 0, 0), + new FetchRequest("topic a", 0, 0), + new FetchRequest("topic b", 0, 0), + new FetchRequest("topic c", 0, 0) + }; + + MultiFetchRequest request = new MultiFetchRequest(requests); + + // format = len(request) + requesttype + requestcount + requestpackage + // total byte count = 4 + (2 + 2 + 100) + byte[] bytes = request.GetBytes(); + Assert.IsNotNull(bytes); + Assert.AreEqual(108, bytes.Length); + + // first 4 bytes = the length of the request + Assert.AreEqual(104, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0)); + + // next 2 bytes = the RequestType which in this case should be Produce + Assert.AreEqual((short)RequestType.MultiFetch, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0)); + + // next 2 bytes = the number of messages + Assert.AreEqual((short)4, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0)); + } + } +} Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/MultiProducerRequestTests.cs Mon Aug 1 23:41:24 2011 @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using Kafka.Client.Request; +using Kafka.Client.Util; +using NUnit.Framework; + +namespace Kafka.Client.Request.Tests +{ + /// + /// Tests for the class. 
+ /// + [TestFixture] + public class MultiProducerRequestTests + { + /// + /// Tests a valid multi-producer request. + /// + [Test] + public void IsValidTrue() + { + List requests = new List + { + new ProducerRequest("topic a", 0, new List { new Message(new byte[10]) }), + new ProducerRequest("topic a", 0, new List { new Message(new byte[10]) }), + new ProducerRequest("topic b", 0, new List { new Message(new byte[10]) }), + new ProducerRequest("topic c", 0, new List { new Message(new byte[10]) }) + }; + + MultiProducerRequest multiRequest = new MultiProducerRequest(requests); + Assert.IsTrue(multiRequest.IsValid()); + } + + /// + /// Tests for an invalid multi-request with no requests provided. + /// + [Test] + public void IsValidNoRequests() + { + MultiProducerRequest multiRequest = new MultiProducerRequest(new List()); + Assert.IsFalse(multiRequest.IsValid()); + } + + /// + /// Tests for an invalid multi-request with no requests provided. + /// + [Test] + public void IsValidNullRequests() + { + MultiProducerRequest multiRequest = new MultiProducerRequest(null); + Assert.IsFalse(multiRequest.IsValid()); + } + + /// + /// Test to ensure a valid format in the returned byte array as expected by Kafka. 
+ /// + [Test] + public void GetBytesValidFormat() + { + List requests = new List + { + new ProducerRequest("topic a", 0, new List { new Message(new byte[10]) }), + new ProducerRequest("topic a", 0, new List { new Message(new byte[10]) }), + new ProducerRequest("topic b", 0, new List { new Message(new byte[10]) }), + new ProducerRequest("topic c", 0, new List { new Message(new byte[10]) }) + }; + + MultiProducerRequest request = new MultiProducerRequest(requests); + + // format = len(request) + requesttype + requestcount + requestpackage + // total byte count = 4 + (2 + 2 + 144) + byte[] bytes = request.GetBytes(); + Assert.IsNotNull(bytes); + Assert.AreEqual(152, bytes.Length); + + // first 4 bytes = the length of the request + Assert.AreEqual(148, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0)); + + // next 2 bytes = the RequestType which in this case should be Produce + Assert.AreEqual((short)RequestType.MultiProduce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0)); + + // next 2 bytes = the number of messages + Assert.AreEqual((short)4, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0)); + } + } +} Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/OffsetRequestTests.cs Mon Aug 1 23:41:24 2011 @@ -0,0 +1,83 @@ +using System; +using System.Linq; +using System.Text; +using Kafka.Client.Request; +using Kafka.Client.Util; +using NUnit.Framework; + +namespace Kafka.Client.Request.Tests +{ + /// + 
/// Tests the class. + /// + [TestFixture] + public class OffsetRequestTests + { + /// + /// Tests a valid request. + /// + [Test] + public void IsValidTrue() + { + FetchRequest request = new FetchRequest("topic", 1, 10L, 100); + Assert.IsTrue(request.IsValid()); + } + + /// + /// Tests a invalid request with no topic. + /// + [Test] + public void IsValidNoTopic() + { + FetchRequest request = new FetchRequest(" ", 1, 10L, 100); + Assert.IsFalse(request.IsValid()); + } + + /// + /// Tests a invalid request with no topic. + /// + [Test] + public void IsValidNulltopic() + { + FetchRequest request = new FetchRequest(null, 1, 10L, 100); + Assert.IsFalse(request.IsValid()); + } + + /// + /// Validates the list of bytes meet Kafka expectations. + /// + [Test] + public void GetBytesValid() + { + string topicName = "topic"; + OffsetRequest request = new OffsetRequest(topicName, 0, OffsetRequest.LatestTime, 10); + + // format = len(request) + requesttype + len(topic) + topic + partition + time + max + // total byte count = 4 + (2 + 2 + 5 + 4 + 8 + 4) + byte[] bytes = request.GetBytes(); + Assert.IsNotNull(bytes); + Assert.AreEqual(29, bytes.Length); + + // first 4 bytes = the length of the request + Assert.AreEqual(25, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0)); + + // next 2 bytes = the RequestType which in this case should be Produce + Assert.AreEqual((short)RequestType.Offsets, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0)); + + // next 2 bytes = the length of the topic + Assert.AreEqual((short)5, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0)); + + // next 5 bytes = the topic + Assert.AreEqual(topicName, Encoding.ASCII.GetString(bytes.Skip(8).Take(5).ToArray())); + + // next 4 bytes = the partition + Assert.AreEqual(0, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(13).Take(4).ToArray()), 0)); + + // next 8 bytes = time + Assert.AreEqual(OffsetRequest.LatestTime, 
BitConverter.ToInt64(BitWorks.ReverseBytes(bytes.Skip(17).Take(8).ToArray()), 0)); + + // next 4 bytes = max offsets + Assert.AreEqual(10, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(25).Take(4).ToArray()), 0)); + } + } +} Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs (added) +++ incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Request/ProducerRequestTests.cs Mon Aug 1 23:41:24 2011 @@ -0,0 +1,86 @@ +using System; +using System.Collections.Generic; +using System.Linq; +using System.Text; +using Kafka.Client.Request; +using Kafka.Client.Util; +using NUnit.Framework; + +namespace Kafka.Client.Request.Tests +{ + /// + /// Tests for the class. + /// + [TestFixture] + public class ProducerRequestTests + { + /// + /// Tests a valid producer request. + /// + [Test] + public void IsValidTrue() + { + ProducerRequest request = new ProducerRequest( + "topic", 0, new List { new Message(new byte[10]) }); + Assert.IsTrue(request.IsValid()); + } + + /// + /// Tests a invalid producer request with no topic. + /// + [Test] + public void IsValidFalseNoTopic() + { + ProducerRequest request = new ProducerRequest(null, 0, null); + Assert.IsFalse(request.IsValid()); + } + + /// + /// Tests a invalid producer request with no messages to send. + /// + [Test] + public void IsValidFalseNoMessages() + { + ProducerRequest request = new ProducerRequest("topic", 0, null); + Assert.IsFalse(request.IsValid()); + } + + /// + /// Test to ensure a valid format in the returned byte array as expected by Kafka. 
+ /// + [Test] + public void GetBytesValidFormat() + { + string topicName = "topic"; + ProducerRequest request = new ProducerRequest( + topicName, 0, new List { new Message(new byte[10]) }); + + // format = len(request) + requesttype + len(topic) + topic + partition + len(messagepack) + message + // total byte count = 4 + (2 + 2 + 5 + 4 + 4 + 19) + byte[] bytes = request.GetBytes(); + Assert.IsNotNull(bytes); + Assert.AreEqual(40, bytes.Length); + + // first 4 bytes = the length of the request + Assert.AreEqual(36, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Take(4).ToArray()), 0)); + + // next 2 bytes = the RequestType which in this case should be Produce + Assert.AreEqual((short)RequestType.Produce, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(4).Take(2).ToArray()), 0)); + + // next 2 bytes = the length of the topic + Assert.AreEqual((short)5, BitConverter.ToInt16(BitWorks.ReverseBytes(bytes.Skip(6).Take(2).ToArray()), 0)); + + // next 5 bytes = the topic + Assert.AreEqual(topicName, Encoding.ASCII.GetString(bytes.Skip(8).Take(5).ToArray())); + + // next 4 bytes = the partition + Assert.AreEqual(0, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(13).Take(4).ToArray()), 0)); + + // next 4 bytes = the length of the individual messages in the pack + Assert.AreEqual(19, BitConverter.ToInt32(BitWorks.ReverseBytes(bytes.Skip(17).Take(4).ToArray()), 0)); + + // fianl bytes = the individual messages in the pack + Assert.AreEqual(19, bytes.Skip(21).ToArray().Length); + } + } +} Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs (added) +++ 
incubator/kafka/trunk/clients/csharp/src/Kafka/Tests/Kafka.Client.Tests/Util/BitWorksTests.cs Mon Aug 1 23:41:24 2011 @@ -0,0 +1,104 @@ +using System; +using Kafka.Client.Util; +using NUnit.Framework; + +namespace Kafka.Client.Tests.Util +{ + /// + /// Tests for utility class. + /// + [TestFixture] + public class BitWorksTests + { + /// + /// Ensures bytes are returned reversed. + /// + [Test] + public void GetBytesReversedShortValid() + { + short val = (short)100; + byte[] normal = BitConverter.GetBytes(val); + byte[] reversed = BitWorks.GetBytesReversed(val); + + TestReversedArray(normal, reversed); + } + + /// + /// Ensures bytes are returned reversed. + /// + [Test] + public void GetBytesReversedIntValid() + { + int val = 100; + byte[] normal = BitConverter.GetBytes(val); + byte[] reversed = BitWorks.GetBytesReversed(val); + + TestReversedArray(normal, reversed); + } + + /// + /// Ensures bytes are returned reversed. + /// + [Test] + public void GetBytesReversedLongValid() + { + long val = 100L; + byte[] normal = BitConverter.GetBytes(val); + byte[] reversed = BitWorks.GetBytesReversed(val); + + TestReversedArray(normal, reversed); + } + + /// + /// Null array will reverse to a null. + /// + [Test] + public void ReverseBytesNullArray() + { + byte[] arr = null; + Assert.IsNull(BitWorks.ReverseBytes(arr)); + } + + /// + /// Zero length array will reverse to a zero length array. + /// + [Test] + public void ReverseBytesZeroLengthArray() + { + byte[] arr = new byte[0]; + byte[] reversedArr = BitWorks.ReverseBytes(arr); + Assert.IsNotNull(reversedArr); + Assert.AreEqual(0, reversedArr.Length); + } + + /// + /// Array is reversed. 
+ /// + [Test] + public void ReverseBytesValid() + { + byte[] arr = BitConverter.GetBytes((short)1); + byte[] original = new byte[2]; + arr.CopyTo(original, 0); + byte[] reversedArr = BitWorks.ReverseBytes(arr); + + TestReversedArray(original, reversedArr); + } + + /// + /// Performs asserts for two arrays that should be exactly the same, but values + /// in one are in reverse order of the other. + /// + /// The "normal" array. + /// The array that is in reverse order to the "normal" one. + private static void TestReversedArray(byte[] normal, byte[] reversed) + { + Assert.IsNotNull(reversed); + Assert.AreEqual(normal.Length, reversed.Length); + for (int ix = 0; ix < normal.Length; ix++) + { + Assert.AreEqual(normal[ix], reversed[reversed.Length - 1 - ix]); + } + } + } +} Added: incubator/kafka/trunk/clients/go/.gitignore URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/.gitignore?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/go/.gitignore (added) +++ incubator/kafka/trunk/clients/go/.gitignore Mon Aug 1 23:41:24 2011 @@ -0,0 +1,13 @@ +_go_.6 +_obj +6.out +_gotest_.6 +_test +_testmain.go +_testmain.6 +tools/*/_obj +tools/*/_go_.6 +tools/consumer/consumer +tools/publisher/publisher +tools/consumer/test.txt +tools/offsets/offsets Added: incubator/kafka/trunk/clients/go/LICENSE URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/LICENSE?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/go/LICENSE (added) +++ incubator/kafka/trunk/clients/go/LICENSE Mon Aug 1 23:41:24 2011 @@ -0,0 +1,208 @@ + + Apache License + Version 2.0, January 2004 + http://www.apache.org/licenses/ + + TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION + + 1. Definitions. 
+ + "License" shall mean the terms and conditions for use, reproduction, + and distribution as defined by Sections 1 through 9 of this document. + + "Licensor" shall mean the copyright owner or entity authorized by + the copyright owner that is granting the License. + + "Legal Entity" shall mean the union of the acting entity and all + other entities that control, are controlled by, or are under common + control with that entity. For the purposes of this definition, + "control" means (i) the power, direct or indirect, to cause the + direction or management of such entity, whether by contract or + otherwise, or (ii) ownership of fifty percent (50%) or more of the + outstanding shares, or (iii) beneficial ownership of such entity. + + "You" (or "Your") shall mean an individual or Legal Entity + exercising permissions granted by this License. + + "Source" form shall mean the preferred form for making modifications, + including but not limited to software source code, documentation + source, and configuration files. + + "Object" form shall mean any form resulting from mechanical + transformation or translation of a Source form, including but + not limited to compiled object code, generated documentation, + and conversions to other media types. + + "Work" shall mean the work of authorship, whether in Source or + Object form, made available under the License, as indicated by a + copyright notice that is included in or attached to the work + (an example is provided in the Appendix below). + + "Derivative Works" shall mean any work, whether in Source or Object + form, that is based on (or derived from) the Work and for which the + editorial revisions, annotations, elaborations, or other modifications + represent, as a whole, an original work of authorship. For the purposes + of this License, Derivative Works shall not include works that remain + separable from, or merely link (or bind by name) to the interfaces of, + the Work and Derivative Works thereof. 
+ + "Contribution" shall mean any work of authorship, including + the original version of the Work and any modifications or additions + to that Work or Derivative Works thereof, that is intentionally + submitted to Licensor for inclusion in the Work by the copyright owner + or by an individual or Legal Entity authorized to submit on behalf of + the copyright owner. For the purposes of this definition, "submitted" + means any form of electronic, verbal, or written communication sent + to the Licensor or its representatives, including but not limited to + communication on electronic mailing lists, source code control systems, + and issue tracking systems that are managed by, or on behalf of, the + Licensor for the purpose of discussing and improving the Work, but + excluding communication that is conspicuously marked or otherwise + designated in writing by the copyright owner as "Not a Contribution." + + "Contributor" shall mean Licensor and any individual or Legal Entity + on behalf of whom a Contribution has been received by Licensor and + subsequently incorporated within the Work. + + 2. Grant of Copyright License. Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + copyright license to reproduce, prepare Derivative Works of, + publicly display, publicly perform, sublicense, and distribute the + Work and such Derivative Works in Source or Object form. + + 3. Grant of Patent License. 
Subject to the terms and conditions of + this License, each Contributor hereby grants to You a perpetual, + worldwide, non-exclusive, no-charge, royalty-free, irrevocable + (except as stated in this section) patent license to make, have made, + use, offer to sell, sell, import, and otherwise transfer the Work, + where such license applies only to those patent claims licensable + by such Contributor that are necessarily infringed by their + Contribution(s) alone or by combination of their Contribution(s) + with the Work to which such Contribution(s) was submitted. If You + institute patent litigation against any entity (including a + cross-claim or counterclaim in a lawsuit) alleging that the Work + or a Contribution incorporated within the Work constitutes direct + or contributory patent infringement, then any patent licenses + granted to You under this License for that Work shall terminate + as of the date such litigation is filed. + + 4. Redistribution. You may reproduce and distribute copies of the + Work or Derivative Works thereof in any medium, with or without + modifications, and in Source or Object form, provided that You + meet the following conditions: + + (a) You must give any other recipients of the Work or + Derivative Works a copy of this License; and + + (b) You must cause any modified files to carry prominent notices + stating that You changed the files; and + + (c) You must retain, in the Source form of any Derivative Works + that You distribute, all copyright, patent, trademark, and + attribution notices from the Source form of the Work, + excluding those notices that do not pertain to any part of + the Derivative Works; and + + (d) If the Work includes a "NOTICE" text file as part of its + distribution, then any Derivative Works that You distribute must + include a readable copy of the attribution notices contained + within such NOTICE file, excluding those notices that do not + pertain to any part of the Derivative Works, in at least one + of 
the following places: within a NOTICE text file distributed + as part of the Derivative Works; within the Source form or + documentation, if provided along with the Derivative Works; or, + within a display generated by the Derivative Works, if and + wherever such third-party notices normally appear. The contents + of the NOTICE file are for informational purposes only and + do not modify the License. You may add Your own attribution + notices within Derivative Works that You distribute, alongside + or as an addendum to the NOTICE text from the Work, provided + that such additional attribution notices cannot be construed + as modifying the License. + + You may add Your own copyright statement to Your modifications and + may provide additional or different license terms and conditions + for use, reproduction, or distribution of Your modifications, or + for any such Derivative Works as a whole, provided Your use, + reproduction, and distribution of the Work otherwise complies with + the conditions stated in this License. + + 5. Submission of Contributions. Unless You explicitly state otherwise, + any Contribution intentionally submitted for inclusion in the Work + by You to the Licensor shall be under the terms and conditions of + this License, without any additional terms or conditions. + Notwithstanding the above, nothing herein shall supersede or modify + the terms of any separate license agreement you may have executed + with Licensor regarding such Contributions. + + 6. Trademarks. This License does not grant permission to use the trade + names, trademarks, service marks, or product names of the Licensor, + except as required for reasonable and customary use in describing the + origin of the Work and reproducing the content of the NOTICE file. + + 7. Disclaimer of Warranty. 
Unless required by applicable law or + agreed to in writing, Licensor provides the Work (and each + Contributor provides its Contributions) on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + implied, including, without limitation, any warranties or conditions + of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A + PARTICULAR PURPOSE. You are solely responsible for determining the + appropriateness of using or redistributing the Work and assume any + risks associated with Your exercise of permissions under this License. + + 8. Limitation of Liability. In no event and under no legal theory, + whether in tort (including negligence), contract, or otherwise, + unless required by applicable law (such as deliberate and grossly + negligent acts) or agreed to in writing, shall any Contributor be + liable to You for damages, including any direct, indirect, special, + incidental, or consequential damages of any character arising as a + result of this License or out of the use or inability to use the + Work (including but not limited to damages for loss of goodwill, + work stoppage, computer failure or malfunction, or any and all + other commercial damages or losses), even if such Contributor + has been advised of the possibility of such damages. + + 9. Accepting Warranty or Additional Liability. While redistributing + the Work or Derivative Works thereof, You may choose to offer, + and charge a fee for, acceptance of support, warranty, indemnity, + or other liability obligations and/or rights consistent with this + License. However, in accepting such obligations, You may act only + on Your own behalf and on Your sole responsibility, not on behalf + of any other Contributor, and only if You agree to indemnify, + defend, and hold each Contributor harmless for any liability + incurred by, or claims asserted against, such Contributor by reason + of your accepting any such warranty or additional liability. 
+ + END OF TERMS AND CONDITIONS + + APPENDIX: How to apply the Apache License to your work. + + To apply the Apache License to your work, attach the following + boilerplate notice, with the fields enclosed by brackets "[]" + replaced with your own identifying information. (Don't include + the brackets!) The text should be enclosed in the appropriate + comment syntax for the file format. We also recommend that a + file or class name and description of purpose be included on the + same "printed page" as the copyright notice for easier + identification within third-party archives. + + Copyright (c) 2011 NeuStar, Inc. + All rights reserved. + + Licensed under the Apache License, Version 2.0 (the "License"); + you may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + NeuStar, the Neustar logo and related names and logos are registered + trademarks, service marks or tradenames of NeuStar, Inc. All other + product names, company names, marks, logos and symbols may be trademarks + of their respective owners. 
Added: incubator/kafka/trunk/clients/go/Makefile URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/Makefile?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/go/Makefile (added) +++ incubator/kafka/trunk/clients/go/Makefile Mon Aug 1 23:41:24 2011 @@ -0,0 +1,25 @@ +include $(GOROOT)/src/Make.inc + +TARG=kafka +GOFILES=\ + src/kafka.go\ + src/message.go\ + src/converts.go\ + src/consumer.go\ + src/publisher.go\ + src/timing.go\ + src/request.go\ + +include $(GOROOT)/src/Make.pkg + +tools: force + make -C tools/consumer clean all + make -C tools/publisher clean all + make -C tools/offsets clean all + +format: + gofmt -w -tabwidth=2 -tabindent=false src/*.go tools/consumer/*.go tools/publisher/*.go kafka_test.go + +full: format clean install tools + +.PHONY: force Added: incubator/kafka/trunk/clients/go/README.md URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/README.md?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/go/README.md (added) +++ incubator/kafka/trunk/clients/go/README.md Mon Aug 1 23:41:24 2011 @@ -0,0 +1,83 @@ +# Kafka.go - Publisher & Consumer for Kafka in Go # + +Kafka is a distributed publish-subscribe messaging system: (http://sna-projects.com/kafka/) + +Go language: (http://golang.org/)
+ +## Get up and running ## + +Install kafka.go package:
+make install +
+Make the tools (publisher, consumer & offsets)
+make tools +
+Start zookeeper, Kafka server
+For more info on Kafka, see: http://sna-projects.com/kafka/quickstart.php + + + +## Tools ## + +Start a consumer: +

+   ./tools/consumer/consumer -topic test -consumeforever
+  Consuming Messages :
+  From: localhost:9092, topic: test, partition: 0
+   ---------------------- 
+
+ +Now the consumer will just poll until a message is received. + +Publish a message: +

+  ./tools/publisher/publisher -topic test -message "Hello World"
+
+ + +The consumer should output the message. + +## API Usage ## + +### Publishing ### + + +

+
+broker := kafka.NewBrokerPublisher("localhost:9092", "mytesttopic", 0)
+broker.Publish(kafka.NewMessage([]byte("testing 1 2 3")))
+
+
+ +### Consumer ### + +

+broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
+broker.Consume(func(msg *kafka.Message) { msg.Print() })
+
+
+ +Or the consumer can use a channel based approach: + +

+broker := kafka.NewBrokerConsumer("localhost:9092", "mytesttopic", 0, 0, 1048576)
+go broker.ConsumeOnChannel(msgChan, 10, quitChan)
+
+
+ +### Consuming Offsets ### + +

+broker := kafka.NewBrokerOffsetConsumer("localhost:9092", "mytesttopic", 0)
+offsets, err := broker.GetOffsets(-1, 1)
+
+ + +### Contact ### + +jeffreydamick (at) gmail (dot) com + +http://twitter.com/jeffreydamick + +Big thank you to [NeuStar](http://neustar.biz) for sponsoring this work. + + Added: incubator/kafka/trunk/clients/go/kafka_test.go URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/go/kafka_test.go?rev=1152970&view=auto ============================================================================== --- incubator/kafka/trunk/clients/go/kafka_test.go (added) +++ incubator/kafka/trunk/clients/go/kafka_test.go Mon Aug 1 23:41:24 2011 @@ -0,0 +1,128 @@ +/* + * Copyright (c) 2011 NeuStar, Inc. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * NeuStar, the Neustar logo and related names and logos are registered + * trademarks, service marks or tradenames of NeuStar, Inc. All other + * product names, company names, marks, logos and symbols may be trademarks + * of their respective owners. 
+ */
+
+
+package kafka
+
+import (
+	"bytes"
+	"container/list"
+	"testing"
+)
+
+// TestMessageCreation checks that a message built by NewMessage has magic 0
+// and carries the checksum kafka-rb generates for the payload "testing".
+func TestMessageCreation(t *testing.T) {
+	payload := []byte("testing")
+	msg := NewMessage(payload)
+	if msg.magic != 0 {
+		t.Errorf("magic incorrect: expected 0 but got %v", msg.magic)
+	}
+
+	// generated by kafka-rb: e8 f3 5a 06
+	expected := []byte{0xe8, 0xf3, 0x5a, 0x06}
+	if !bytes.Equal(expected, msg.checksum[:]) {
+		t.Errorf("checksum incorrect: expected %X but got %X", expected, msg.checksum[:])
+	}
+}
+
+// TestMessageEncoding compares Encode's output with the bytes generated by
+// kafka-rb, then decodes the encoding again and checks that payload,
+// checksum and magic come back unchanged.
+func TestMessageEncoding(t *testing.T) {
+	payload := []byte("testing")
+	msg := NewMessage(payload)
+
+	// generated by kafka-rb:
+	expected := []byte{0x00, 0x00, 0x00, 0x0c, 0x00, 0xe8, 0xf3, 0x5a, 0x06,
+		0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
+	if !bytes.Equal(expected, msg.Encode()) {
+		t.Errorf("encoding incorrect: expected %X but got %X", expected, msg.Encode())
+	}
+
+	// verify round trip
+	msgDecoded := Decode(msg.Encode())
+	if !bytes.Equal(msgDecoded.payload, payload) {
+		t.Errorf("decoded payload incorrect: expected %X but got %X", payload, msgDecoded.payload)
+	}
+	chksum := []byte{0xE8, 0xF3, 0x5A, 0x06}
+	if !bytes.Equal(msgDecoded.checksum[:], chksum) {
+		t.Errorf("decoded checksum incorrect: expected %X but got %X", chksum, msgDecoded.checksum[:])
+	}
+	if msgDecoded.magic != 0 {
+		t.Errorf("decoded magic incorrect: expected 0 but got %v", msgDecoded.magic)
+	}
+}
+
+// TestRequestHeaderEncoding compares the header produced by
+// EncodeRequestHeader(REQUEST_PRODUCE) for topic "test", partition 0 with the
+// bytes generated by kafka-rb.
+func TestRequestHeaderEncoding(t *testing.T) {
+	broker := newBroker("localhost:9092", "test", 0)
+	request := broker.EncodeRequestHeader(REQUEST_PRODUCE)
+
+	// generated by kafka-rb:
+	expected := []byte{0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
+		0x00, 0x00, 0x00, 0x00}
+
+	// Compare and report the same byte slice, so a failure message shows
+	// exactly what was compared.
+	actual := request.Bytes()
+	if !bytes.Equal(expected, actual) {
+		t.Errorf("expected length: %d but got: %d", len(expected), len(actual))
+		t.Errorf("expected: %X\n but got: %X", expected, actual)
+	}
+}
+
+// TestPublishRequestEncoding encodes a publish request holding one "testing"
+// message and compares it with the bytes generated by kafka-rb.
+func TestPublishRequestEncoding(t *testing.T) {
+	payload := []byte("testing")
+	msg := NewMessage(payload)
+
+	messages := list.New()
+	messages.PushBack(msg)
+	pubBroker := NewBrokerPublisher("localhost:9092", "test", 0)
+	request := pubBroker.broker.EncodePublishRequest(messages)
+
+	// generated by kafka-rb:
+	expected := []byte{0x00, 0x00, 0x00, 0x20, 0x00, 0x00, 0x00, 0x04, 0x74, 0x65, 0x73, 0x74,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00, 0x00, 0x0c,
+		0x00, 0xe8, 0xf3, 0x5a, 0x06, 0x74, 0x65, 0x73, 0x74, 0x69, 0x6e, 0x67}
+
+	if !bytes.Equal(expected, request) {
+		t.Errorf("expected length: %d but got: %d", len(expected), len(request))
+		t.Errorf("expected: %X\n but got: %X", expected, request)
+	}
+}
+
+// TestConsumeRequestEncoding encodes a consume request with arguments
+// (0, 1048576) and compares it with the bytes generated by kafka-rb.
+func TestConsumeRequestEncoding(t *testing.T) {
+	pubBroker := NewBrokerPublisher("localhost:9092", "test", 0)
+	request := pubBroker.broker.EncodeConsumeRequest(0, 1048576)
+
+	// generated by kafka-rb, encode_request_size + encode_request
+	expected := []byte{0x00, 0x00, 0x00, 0x18, 0x00, 0x01, 0x00, 0x04, 0x74,
+		0x65, 0x73, 0x74, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x10, 0x00, 0x00}
+
+	if !bytes.Equal(expected, request) {
+		t.Errorf("expected length: %d but got: %d", len(expected), len(request))
+		t.Errorf("expected: %X\n but got: %X", expected, request)
+	}
+}