kafka-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jkr...@apache.org
Subject svn commit: r1152970 [10/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Propchange: incubator/kafka/trunk/clients/cpp/configure
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/clients/cpp/configure.ac
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/configure.ac?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/configure.ac (added)
+++ incubator/kafka/trunk/clients/cpp/configure.ac Mon Aug  1 23:41:24 2011
@@ -0,0 +1,28 @@
+## LibKafkaConnect
+## A C++ shared library for connecting to Kafka
+
+#
+# Warning this is the first time I've made a configure.ac/Makefile.am thing
+# Please improve it as I have no idea what I am doing
+# @benjamg
+#
+
+AC_INIT([LibKafkaConnect], [0.1])
+AC_PREREQ([2.59])
+
+AC_CONFIG_AUX_DIR([build-aux])
+AM_INIT_AUTOMAKE([foreign -Wall])
+
+AC_PROG_LIBTOOL
+AC_PROG_CXX
+AC_PROG_CPP
+
+AC_CONFIG_MACRO_DIR([build-aux/m4])
+
+#
+# Version number
+#
+AC_SUBST([KAFKACONNECT_VERSION], [1:0:1])
+
+AC_CONFIG_FILES([Makefile])
+AC_OUTPUT
\ No newline at end of file

Added: incubator/kafka/trunk/clients/cpp/src/encoder.hpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/encoder.hpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/encoder.hpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/encoder.hpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,49 @@
+/*
+ * encoder.hpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#ifndef KAFKA_ENCODER_HPP_
+#define KAFKA_ENCODER_HPP_
+
+#include <boost/foreach.hpp>
+#include "encoder_helper.hpp"
+
+namespace kafkaconnect {
+
+template <typename List>
+void encode(std::ostream& stream, const std::string& topic, const uint32_t partition, const List& messages)
+{
+	// Pre-calculate size of message set
+	uint32_t messageset_size = 0;
+	BOOST_FOREACH(const std::string& message, messages)
+	{
+		messageset_size += message_format_header_size + message.length();
+	}
+
+	// Packet format is ... packet size (4 bytes)
+	encoder_helper::raw(stream, htonl(2 + 2 + topic.size() + 4 + 4 + messageset_size));
+
+	// ... magic number (2 bytes)
+	encoder_helper::raw(stream, htons(kafka_format_version));
+
+	// ... topic string size (2 bytes) & topic string
+	encoder_helper::raw(stream, htons(topic.size()));
+	stream << topic;
+
+	// ... partition (4 bytes)
+	encoder_helper::raw(stream, htonl(partition));
+
+	// ... message set size (4 bytes) and message set
+	encoder_helper::raw(stream, htonl(messageset_size));
+	BOOST_FOREACH(const std::string& message, messages)
+	{
+		encoder_helper::message(stream, message);
+	}
+}
+
+}
+
+#endif /* KAFKA_ENCODER_HPP_ */

Added: incubator/kafka/trunk/clients/cpp/src/encoder_helper.hpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/encoder_helper.hpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/encoder_helper.hpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/encoder_helper.hpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,63 @@
+/*
+ * encoder_helper.hpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#ifndef KAFKA_ENCODER_HELPER_HPP_
+#define KAFKA_ENCODER_HELPER_HPP_
+
+#include <ostream>
+#include <string>
+
+#include <arpa/inet.h>
+#include <boost/crc.hpp>
+
+#include <stdint.h>
+
+namespace kafkaconnect {
+namespace test { class encoder_helper; }
+
+const uint16_t kafka_format_version = 0;
+
+const uint8_t message_format_magic_number = 0;
+const uint8_t message_format_extra_data_size = 1 + 4;
+const uint8_t message_format_header_size = message_format_extra_data_size + 4;
+
+class encoder_helper
+{
+private:
+	friend class test::encoder_helper;
+	template <typename T> friend void encode(std::ostream&, const std::string&, const uint32_t, const T&);
+
+	static std::ostream& message(std::ostream& stream, const std::string message)
+	{
+		// Message format is ... message & data size (4 bytes)
+		raw(stream, htonl(message_format_extra_data_size + message.length()));
+
+		// ... magic number (1 byte)
+		stream << message_format_magic_number;
+
+		// ... string crc32 (4 bytes)
+		boost::crc_32_type result;
+		result.process_bytes(message.c_str(), message.length());
+		raw(stream, htonl(result.checksum()));
+
+		// ... message string bytes
+		stream << message;
+
+		return stream;
+	}
+
+	template <typename Data>
+	static std::ostream& raw(std::ostream& stream, const Data& data)
+	{
+		stream.write(reinterpret_cast<const char*>(&data), sizeof(Data));
+		return stream;
+	}
+};
+
+}
+
+#endif /* KAFKA_ENCODER_HELPER_HPP_ */

Added: incubator/kafka/trunk/clients/cpp/src/example.cpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/example.cpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/example.cpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/example.cpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,38 @@
+
+#include <exception>
+#include <cstdlib>
+#include <iostream>
+#include <string>
+
+#include <boost/thread.hpp>
+
+#include "producer.hpp"
+
+int main(int argc, char* argv[])
+{
+	std::string hostname = (argc >= 2) ? argv[1] : "localhost";
+	std::string port = (argc >= 3) ? argv[2] : "9092";
+
+	boost::asio::io_service io_service;
+	std::auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
+	boost::thread bt(boost::bind(&boost::asio::io_service::run, &io_service));
+
+	kafkaconnect::producer producer(io_service);
+	producer.connect(hostname, port);
+
+	while(!producer.is_connected())
+	{
+		boost::this_thread::sleep(boost::posix_time::seconds(1));
+	}
+
+	std::vector<std::string> messages;
+	messages.push_back("So long and thanks for all the fish");
+	messages.push_back("Time is an illusion. Lunchtime doubly so.");
+	producer.send(messages, "test");
+
+	work.reset();
+	io_service.stop();
+
+	return EXIT_SUCCESS;
+}
+

Added: incubator/kafka/trunk/clients/cpp/src/producer.cpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/producer.cpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/producer.cpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/producer.cpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,100 @@
+/*
+ * producer.cpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#include <boost/lexical_cast.hpp>
+
+#include "producer.hpp"
+
+namespace kafkaconnect {
+
+producer::producer(boost::asio::io_service& io_service, const error_handler_function& error_handler)
+	: _connected(false)
+	, _resolver(io_service)
+	, _socket(io_service)
+	, _error_handler(error_handler)
+{
+}
+
+producer::~producer()
+{
+	close();
+}
+
+void producer::connect(const std::string& hostname, const uint16_t port)
+{
+	connect(hostname, boost::lexical_cast<std::string>(port));
+}
+
+void producer::connect(const std::string& hostname, const std::string& servicename)
+{
+	boost::asio::ip::tcp::resolver::query query(hostname, servicename);
+	_resolver.async_resolve(
+		query,
+		boost::bind(
+			&producer::handle_resolve, this,
+			boost::asio::placeholders::error, boost::asio::placeholders::iterator
+		)
+	);
+}
+
+void producer::close()
+{
+	_connected = false;
+	_socket.close();
+}
+
+bool producer::is_connected() const
+{
+	return _connected;
+}
+
+
+void producer::handle_resolve(const boost::system::error_code& error_code, boost::asio::ip::tcp::resolver::iterator endpoints)
+{
+	if (!error_code)
+	{
+		boost::asio::ip::tcp::endpoint endpoint = *endpoints;
+		_socket.async_connect(
+			endpoint,
+			boost::bind(
+				&producer::handle_connect, this,
+				boost::asio::placeholders::error, ++endpoints
+			)
+		);
+	}
+	else { fail_fast_error_handler(error_code); }
+}
+
+void producer::handle_connect(const boost::system::error_code& error_code, boost::asio::ip::tcp::resolver::iterator endpoints)
+{
+	if (!error_code)
+	{
+		// The connection was successful. Send the request.
+		_connected = true;
+	}
+	else if (endpoints != boost::asio::ip::tcp::resolver::iterator())
+	{
+		// TODO: handle connection error (we might not need this as we have others though?)
+
+		// The connection failed, but we have more potential endpoints so throw it back to handle resolve
+		_socket.close();
+		handle_resolve(boost::system::error_code(), endpoints);
+	}
+	else { fail_fast_error_handler(error_code); }
+}
+
+void producer::handle_write_request(const boost::system::error_code& error_code, boost::asio::streambuf* buffer)
+{
+	if (error_code)
+	{
+		fail_fast_error_handler(error_code);
+	}
+
+	delete buffer;
+}
+
+}

Added: incubator/kafka/trunk/clients/cpp/src/producer.hpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/producer.hpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/producer.hpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/producer.hpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,99 @@
+/*
+ * producer.hpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#ifndef KAFKA_PRODUCER_HPP_
+#define KAFKA_PRODUCER_HPP_
+
+#include <string>
+#include <vector>
+
+#include <boost/array.hpp>
+#include <boost/asio.hpp>
+#include <boost/bind.hpp>
+#include <boost/function.hpp>
+#include <stdint.h>
+
+#include "encoder.hpp"
+
+namespace kafkaconnect {
+
+const uint32_t use_random_partition = 0xFFFFFFFF;
+
+class producer
+{
+public:
+	typedef boost::function<void(const boost::system::error_code&)> error_handler_function;
+
+	producer(boost::asio::io_service& io_service, const error_handler_function& error_handler = error_handler_function());
+	~producer();
+
+	void connect(const std::string& hostname, const uint16_t port);
+	void connect(const std::string& hostname, const std::string& servicename);
+
+	void close();
+	bool is_connected() const;
+
+	bool send(const std::string& message, const std::string& topic, const uint32_t partition = kafkaconnect::use_random_partition)
+	{
+		boost::array<std::string, 1> messages = { { message } };
+		return send(messages, topic, partition);
+	}
+
+	// TODO: replace this with a sending of the buffered data so encode is called prior to send this will allow for decoupling from the encoder
+	template <typename List>
+	bool send(const List& messages, const std::string& topic, const uint32_t partition = kafkaconnect::use_random_partition)
+	{
+		if (!is_connected())
+		{
+			return false;
+		}
+
+		// TODO: make this more efficient with memory allocations.
+		boost::asio::streambuf* buffer = new boost::asio::streambuf();
+		std::ostream stream(buffer);
+
+		kafkaconnect::encode(stream, topic, partition, messages);
+
+		boost::asio::async_write(
+			_socket, *buffer,
+			boost::bind(&producer::handle_write_request, this, boost::asio::placeholders::error, buffer)
+		);
+
+		return true;
+	}
+
+
+private:
+	bool _connected;
+	boost::asio::ip::tcp::resolver _resolver;
+	boost::asio::ip::tcp::socket _socket;
+	error_handler_function _error_handler;
+
+	void handle_resolve(const boost::system::error_code& error_code, boost::asio::ip::tcp::resolver::iterator endpoints);
+	void handle_connect(const boost::system::error_code& error_code, boost::asio::ip::tcp::resolver::iterator endpoints);
+	void handle_write_request(const boost::system::error_code& error_code, boost::asio::streambuf* buffer);
+
+	/* Fail Fast Error Handler Braindump
+	 *
+	 * If an error handler is not provided in the constructor then the default response is to throw
+	 * back the boost error_code from asio as a boost system_error exception.
+	 *
+	 * Most likely this will cause whatever thread you have processing boost io to terminate unless caught.
+	 * This is great on debug systems or anything where you use io polling to process any outstanding io,
+	 * however if your io thread is separate and not monitored it is recommended to pass a handler to
+	 * the constructor.
+	 */
+	inline void fail_fast_error_handler(const boost::system::error_code& error_code)
+	{
+		if(_error_handler.empty()) { throw boost::system::system_error(error_code); }
+		else { _error_handler(error_code); }
+	}
+};
+
+}
+
+#endif /* KAFKA_PRODUCER_HPP_ */

Added: incubator/kafka/trunk/clients/cpp/src/tests/encoder_helper_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/tests/encoder_helper_tests.cpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/tests/encoder_helper_tests.cpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/tests/encoder_helper_tests.cpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,71 @@
+/*
+ * encoder_helper_tests.cpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MODULE kafkaconnect
+#include <boost/test/unit_test.hpp>
+
+#include <arpa/inet.h>
+
+#include "../encoder_helper.hpp"
+
+// test wrapper
+namespace kafkaconnect { namespace test {
+class encoder_helper {
+public:
+	static std::ostream& message(std::ostream& stream, const std::string message) { return kafkaconnect::encoder_helper::message(stream, message); }
+	template <typename T> static std::ostream& raw(std::ostream& stream, const T& t) { return kafkaconnect::encoder_helper::raw(stream, t); }
+};
+} }
+
+using namespace kafkaconnect::test;
+
+BOOST_AUTO_TEST_SUITE(kafka_encoder_helper)
+
+BOOST_AUTO_TEST_CASE(encode_raw_char)
+{
+	std::ostringstream stream;
+	char value = 0x1;
+
+	encoder_helper::raw(stream, value);
+
+	BOOST_CHECK_EQUAL(stream.str().length(), 1);
+	BOOST_CHECK_EQUAL(stream.str().at(0), value);
+}
+
+BOOST_AUTO_TEST_CASE(encode_raw_integer)
+{
+	std::ostringstream stream;
+	int value = 0x10203;
+
+	encoder_helper::raw(stream, htonl(value));
+
+	BOOST_CHECK_EQUAL(stream.str().length(), 4);
+	BOOST_CHECK_EQUAL(stream.str().at(0), 0);
+	BOOST_CHECK_EQUAL(stream.str().at(1), 0x1);
+	BOOST_CHECK_EQUAL(stream.str().at(2), 0x2);
+	BOOST_CHECK_EQUAL(stream.str().at(3), 0x3);
+}
+
+BOOST_AUTO_TEST_CASE(encode_message)
+{
+	std::string message = "a simple test";
+	std::ostringstream stream;
+
+	encoder_helper::message(stream, message);
+
+	BOOST_CHECK_EQUAL(stream.str().length(), kafkaconnect::message_format_header_size + message.length());
+	BOOST_CHECK_EQUAL(stream.str().at(3), 5 + message.length());
+	BOOST_CHECK_EQUAL(stream.str().at(4), kafkaconnect::message_format_magic_number);
+
+	for(size_t i = 0; i < message.length(); ++i)
+	{
+		BOOST_CHECK_EQUAL(stream.str().at(9 + i), message.at(i));
+	}
+}
+
+BOOST_AUTO_TEST_SUITE_END()

Added: incubator/kafka/trunk/clients/cpp/src/tests/encoder_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/tests/encoder_tests.cpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/tests/encoder_tests.cpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/tests/encoder_tests.cpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,52 @@
+/*
+ * encoder_tests.cpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MODULE kafkaconnect
+#include <boost/test/unit_test.hpp>
+
+#include <string>
+#include <vector>
+
+#include "../encoder.hpp"
+
+BOOST_AUTO_TEST_CASE(single_message_test)
+{
+	std::ostringstream stream;
+
+	std::vector<std::string> messages;
+	messages.push_back("test message");
+
+	kafkaconnect::encode(stream, "topic", 1, messages);
+
+	BOOST_CHECK_EQUAL(stream.str().length(), 4 + 2 + 2 + strlen("topic") + 4 + 4 + 9 + strlen("test message"));
+	BOOST_CHECK_EQUAL(stream.str().at(3), 2 + 2 + strlen("topic") + 4 + 4 + 9 + strlen("test message"));
+	BOOST_CHECK_EQUAL(stream.str().at(5), 0);
+	BOOST_CHECK_EQUAL(stream.str().at(7), strlen("topic"));
+	BOOST_CHECK_EQUAL(stream.str().at(8), 't');
+	BOOST_CHECK_EQUAL(stream.str().at(8 + strlen("topic") - 1), 'c');
+	BOOST_CHECK_EQUAL(stream.str().at(11 + strlen("topic")), 1);
+	BOOST_CHECK_EQUAL(stream.str().at(15 + strlen("topic")), 9 + strlen("test message"));
+	BOOST_CHECK_EQUAL(stream.str().at(16 + strlen("topic")), 0);
+	BOOST_CHECK_EQUAL(stream.str().at(25 + strlen("topic")), 't');
+}
+
+BOOST_AUTO_TEST_CASE(multiple_message_test)
+{
+	std::ostringstream stream;
+
+	std::vector<std::string> messages;
+	messages.push_back("test message");
+	messages.push_back("another message to check");
+
+	kafkaconnect::encode(stream, "topic", 1, messages);
+
+	BOOST_CHECK_EQUAL(stream.str().length(), 4 + 2 + 2 + strlen("topic") + 4 + 4 + 9 + strlen("test message") + 9 + strlen("another message to check"));
+	BOOST_CHECK_EQUAL(stream.str().at(3), 2 + 2 + strlen("topic") + 4 + 4 + 9 + strlen("test message") + 9 + strlen("another message to check"));
+	BOOST_CHECK_EQUAL(stream.str().at(15 + strlen("topic")), 9 + strlen("test message") + 9 + strlen("another message to check"));
+}
+

Added: incubator/kafka/trunk/clients/cpp/src/tests/producer_tests.cpp
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/cpp/src/tests/producer_tests.cpp?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/cpp/src/tests/producer_tests.cpp (added)
+++ incubator/kafka/trunk/clients/cpp/src/tests/producer_tests.cpp Mon Aug  1 23:41:24 2011
@@ -0,0 +1,59 @@
+/*
+ * producer_tests.cpp
+ *
+ *  Created on: 21 Jun 2011
+ *      Author: Ben Gray (@benjamg)
+ */
+
+#define BOOST_TEST_DYN_LINK
+#define BOOST_TEST_MODULE kafkaconnect
+#include <boost/test/unit_test.hpp>
+
+#include <memory>
+
+#include <boost/thread.hpp>
+
+#include "../producer.hpp"
+
+BOOST_AUTO_TEST_CASE(basic_message_test)
+{
+	boost::asio::io_service io_service;
+	std::auto_ptr<boost::asio::io_service::work> work(new boost::asio::io_service::work(io_service));
+	boost::asio::ip::tcp::acceptor acceptor(io_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), 12345));
+	boost::thread bt(boost::bind(&boost::asio::io_service::run, &io_service));
+
+	kafkaconnect::producer producer(io_service);
+	BOOST_CHECK_EQUAL(producer.is_connected(), false);
+	producer.connect("localhost", 12345);
+
+	boost::asio::ip::tcp::socket socket(io_service);
+	acceptor.accept(socket);
+
+	while(!producer.is_connected())
+	{
+		boost::this_thread::sleep(boost::posix_time::seconds(1));
+	}
+
+	std::vector<std::string> messages;
+	messages.push_back("so long and thanks for all the fish");
+	producer.send(messages, "mice", 42);
+
+	boost::array<char, 1024> buffer;
+	boost::system::error_code error;
+	size_t len = socket.read_some(boost::asio::buffer(buffer), error);
+
+	BOOST_CHECK_EQUAL(len, 4 + 2 + 2 + strlen("mice") + 4 + 4 + 9 + strlen("so long and thanks for all the fish"));
+	BOOST_CHECK_EQUAL(buffer[3], 2 + 2 + strlen("mice") + 4 + 4 + 9 + strlen("so long and thanks for all the fish"));
+	BOOST_CHECK_EQUAL(buffer[5], 0);
+	BOOST_CHECK_EQUAL(buffer[7], strlen("mice"));
+	BOOST_CHECK_EQUAL(buffer[8], 'm');
+	BOOST_CHECK_EQUAL(buffer[8 + strlen("mice") - 1], 'e');
+	BOOST_CHECK_EQUAL(buffer[11 + strlen("mice")], 42);
+	BOOST_CHECK_EQUAL(buffer[15 + strlen("mice")], 9 + strlen("so long and thanks for all the fish"));
+	BOOST_CHECK_EQUAL(buffer[16 + strlen("mice")], 0);
+	BOOST_CHECK_EQUAL(buffer[25 + strlen("mice")], 's');
+
+	work.reset();
+	io_service.stop();
+}
+

Added: incubator/kafka/trunk/clients/csharp/.gitignore
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/.gitignore?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/.gitignore (added)
+++ incubator/kafka/trunk/clients/csharp/.gitignore Mon Aug  1 23:41:24 2011
@@ -0,0 +1,5 @@
+StyleCop.Cache
+bin
+obj
+*.suo
+*.csproj.user
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/LICENSE
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/LICENSE?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/LICENSE (added)
+++ incubator/kafka/trunk/clients/csharp/LICENSE Mon Aug  1 23:41:24 2011
@@ -0,0 +1,202 @@
+
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright 2011 LinkedIn
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/README.md?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/README.md (added)
+++ incubator/kafka/trunk/clients/csharp/README.md Mon Aug  1 23:41:24 2011
@@ -0,0 +1,66 @@
+# .NET Kafka Client
+
+This is a .NET implementation of a client for Kafka using C#.  It provides for a basic implementation that covers most basic functionalities to include a simple Producer and Consumer.
+
+The .NET client will wrap Kafka server error codes to the `KafkaException` class.  Exceptions are not trapped within the library and basically bubble up directly from the TcpClient and its underlying Socket connection.  Clients using this library should look to do their own exception handling regarding these kinds of errors.
+
+## Producer
+
+The Producer can send one or more messages to Kafka in both a synchronous and asynchronous fashion.
+
+### Producer Usage
+
+    string payload1 = "kafka 1.";
+    byte[] payloadData1 = Encoding.UTF8.GetBytes(payload1);
+    Message msg1 = new Message(payloadData1);
+
+    string payload2 = "kafka 2.";
+    byte[] payloadData2 = Encoding.UTF8.GetBytes(payload2);
+    Message msg2 = new Message(payloadData2);
+
+    Producer producer = new Producer("localhost", 9092);
+    producer.Send("test", 0, new List<Message> { msg1, msg2 });
+
+### Asynchronous Producer Usage
+
+    List<Message> messages = GetBunchOfMessages();
+
+    Producer producer = new Producer("localhost", 9092);
+    producer.SendAsync("test", 0, messages, (requestContext) => { /* doing work */ });
+
+### Multi-Producer Usage
+
+    List<ProducerRequest> requests = new List<ProducerRequest>
+    { 
+        new ProducerRequest("test a", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("1: " + DateTime.UtcNow)) }),
+        new ProducerRequest("test b", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("2: " + DateTime.UtcNow)) }),
+        new ProducerRequest("test c", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("3: " + DateTime.UtcNow)) }),
+        new ProducerRequest("test d", 0, new List<Message> { new Message(Encoding.UTF8.GetBytes("4: " + DateTime.UtcNow)) })
+    };
+
+    MultiProducerRequest request = new MultiProducerRequest(requests);
+    Producer producer = new Producer("localhost", 9092);
+    producer.Send(request);
+
+## Consumer
+
+The consumer has several functions of interest: `GetOffsetsBefore` and `Consume`.  `GetOffsetsBefore` will retrieve a list of offsets before a given time and `Consume` will attempt to get a list of messages from Kafka given a topic, partition and offset.  `Consume` allows for both a single and batched request function using the `MultiFetchRequest`.
+
+### Consumer Usage
+
+    Consumer consumer = new Consumer("localhost", 9092);
+    int max = 10;
+    long[] offsets = consumer.GetOffsetsBefore("test", 0, OffsetRequest.LatestTime, max);
+    List<Message> messages = consumer.Consume("test", 0, offsets[0]);
+
+### Consumer Multi-fetch
+
+    Consumer consumer = new Consumer("localhost", 9092);
+    MultiFetchRequest request = new MultiFetchRequest(new List<FetchRequest>
+    {
+        new FetchRequest("testa", 0, 0),
+        new FetchRequest("testb", 0, 0),
+        new FetchRequest("testc", 0, 0)
+    });
+
+    List<List<Message>> messages = consumer.Consume(request);
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/Settings.StyleCop
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/Settings.StyleCop?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/Settings.StyleCop (added)
+++ incubator/kafka/trunk/clients/csharp/Settings.StyleCop Mon Aug  1 23:41:24 2011
@@ -0,0 +1,76 @@
+<StyleCopSettings Version="4.3">
+  <Parsers>
+    <Parser ParserId="Microsoft.StyleCop.CSharp.CsParser">
+      <ParserSettings>
+        <BooleanProperty Name="AnalyzeDesignerFiles">False</BooleanProperty>
+      </ParserSettings>
+    </Parser>
+  </Parsers>
+  <Analyzers>
+    <Analyzer AnalyzerId="Microsoft.StyleCop.CSharp.NamingRules">
+      <Rules>
+        <Rule Name="FieldNamesMustNotBeginWithUnderscore">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+      </Rules>
+      <AnalyzerSettings />
+    </Analyzer>
+    <Analyzer AnalyzerId="Microsoft.StyleCop.CSharp.DocumentationRules">
+      <Rules>
+        <Rule Name="FileMustHaveHeader">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+        <Rule Name="FileHeaderMustShowCopyright">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+        <Rule Name="FileHeaderMustHaveCopyrightText">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+        <Rule Name="FileHeaderMustContainFileName">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+        <Rule Name="FileHeaderFileNameDocumentationMustMatchFileName">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+        <Rule Name="FileHeaderMustHaveValidCompanyText">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+      </Rules>
+      <AnalyzerSettings />
+    </Analyzer>
+    <Analyzer AnalyzerId="Microsoft.StyleCop.CSharp.OrderingRules">
+      <Rules>
+        <Rule Name="UsingDirectivesMustBePlacedWithinNamespace">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+      </Rules>
+      <AnalyzerSettings />
+    </Analyzer>
+    <Analyzer AnalyzerId="Microsoft.StyleCop.CSharp.ReadabilityRules">
+      <Rules>
+        <Rule Name="PrefixLocalCallsWithThis">
+          <RuleSettings>
+            <BooleanProperty Name="Enabled">False</BooleanProperty>
+          </RuleSettings>
+        </Rule>
+      </Rules>
+      <AnalyzerSettings />
+    </Analyzer>
+  </Analyzers>
+</StyleCopSettings>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/lib/nunit/2.5.9/nunit.framework.dll
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/lib/nunit/2.5.9/nunit.framework.dll?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/clients/csharp/lib/nunit/2.5.9/nunit.framework.dll
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/AbstractRequest.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,35 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// Base request to make to Kafka.
+    /// </summary>
+    public abstract class AbstractRequest
+    {
+        /// <summary>
+        /// Gets or sets the topic to publish to.
+        /// </summary>
+        public string Topic { get; set; }
+
+        /// <summary>
+        /// Gets or sets the partition to publish to.
+        /// </summary>
+        public int Partition { get; set; }
+
+        /// <summary>
+        /// Converts the request to an array of bytes that is expected by Kafka.
+        /// </summary>
+        /// <returns>An array of bytes that represents the request.</returns>
+        public abstract byte[] GetBytes();
+
+        /// <summary>
+        /// Determines if the request has valid settings.
+        /// </summary>
+        /// <returns>True if valid and false otherwise.</returns>
+        public abstract bool IsValid();
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumer.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Consumer.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,232 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Request;
+using Kafka.Client.Util;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// Consumes messages from Kafka.
+    /// </summary>
+    public class Consumer
+    {
+        /// <summary>
+        /// Maximum size.
+        /// </summary>
+        private static readonly int MaxSize = 1048576;
+
+        /// <summary>
+        /// Initializes a new instance of the Consumer class.
+        /// </summary>
+        /// <param name="server">The server to connect to.</param>
+        /// <param name="port">The port to connect to.</param>
+        public Consumer(string server, int port)
+        {
+            Server = server;
+            Port = port;
+        }
+
+        /// <summary>
+        /// Gets the server to which the connection is to be established.
+        /// </summary>
+        public string Server { get; private set; }
+
+        /// <summary>
+        /// Gets the port to which the connection is to be established.
+        /// </summary>
+        public int Port { get; private set; }
+
+        /// <summary>
+        /// Consumes messages from Kafka.
+        /// </summary>
+        /// <param name="topic">The topic to consume from.</param>
+        /// <param name="partition">The partition to consume from.</param>
+        /// <param name="offset">The offset to start at.</param>
+        /// <returns>A list of messages from Kafka.</returns>
+        public List<Message> Consume(string topic, int partition, long offset)
+        {
+            return Consume(topic, partition, offset, MaxSize);
+        }
+
+        /// <summary>
+        /// Consumes messages from Kafka.
+        /// </summary>
+        /// <param name="topic">The topic to consume from.</param>
+        /// <param name="partition">The partition to consume from.</param>
+        /// <param name="offset">The offset to start at.</param>
+        /// <param name="maxSize">The maximum size.</param>
+        /// <returns>A list of messages from Kafka.</returns>
+        public List<Message> Consume(string topic, int partition, long offset, int maxSize)
+        {
+            return Consume(new FetchRequest(topic, partition, offset, maxSize));
+        }
+
+        /// <summary>
+        /// Consumes messages from Kafka.
+        /// </summary>
+        /// <param name="request">The request to send to Kafka.</param>
+        /// <returns>A list of messages from Kafka.</returns>
+        public List<Message> Consume(FetchRequest request)
+        {
+            List<Message> messages = new List<Message>();
+            using (KafkaConnection connection = new KafkaConnection(Server, Port))
+            {
+                connection.Write(request.GetBytes());
+                int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(connection.Read(4)), 0);
+
+                if (dataLength > 0) 
+                {
+                    byte[] data = connection.Read(dataLength);
+
+                    int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray<byte>()), 0);
+                    if (errorCode != KafkaException.NoError)
+                    {
+                        throw new KafkaException(errorCode);
+                    }
+
+                    // skip the error code and process the rest
+                    byte[] unbufferedData = data.Skip(2).ToArray();
+
+                    int processed = 0;
+                    int length = unbufferedData.Length - 4;
+                    int messageSize = 0;
+                    while (processed <= length) 
+                    {
+                        messageSize = BitConverter.ToInt32(BitWorks.ReverseBytes(unbufferedData.Skip(processed).Take(4).ToArray<byte>()), 0);
+                        messages.Add(Message.ParseFrom(unbufferedData.Skip(processed).Take(messageSize + 4).ToArray<byte>()));
+                        processed += 4 + messageSize;
+                    }
+                }
+            }
+
+            return messages;
+        }
+
+        /// <summary>
+        /// Executes a multi-fetch operation.
+        /// </summary>
+        /// <param name="request">The request to push to Kafka.</param>
+        /// <returns>
+        /// A list containing sets of messages. The message sets should match the request order.
+        /// </returns>
+        public List<List<Message>> Consume(MultiFetchRequest request)
+        {
+            int fetchRequests = request.ConsumerRequests.Count;
+
+            List<List<Message>> messages = new List<List<Message>>();
+            using (KafkaConnection connection = new KafkaConnection(Server, Port))
+            {
+                connection.Write(request.GetBytes());
+                int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(connection.Read(4)), 0);
+
+                if (dataLength > 0)
+                {
+                    byte[] data = connection.Read(dataLength);
+
+                    int position = 0;
+
+                    int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray<byte>()), 0);
+                    if (errorCode != KafkaException.NoError)
+                    {
+                        throw new KafkaException(errorCode);
+                    }
+
+                    // skip the error code and process the rest
+                    position = position + 2;
+
+                    for (int ix = 0; ix < fetchRequests; ix++)
+                    {
+                        messages.Add(new List<Message>()); 
+
+                        int messageSetSize = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Skip(position).Take(4).ToArray<byte>()), 0);
+                        position = position + 4;
+
+                        errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Skip(position).Take(2).ToArray<byte>()), 0);
+                        if (errorCode != KafkaException.NoError)
+                        {
+                            throw new KafkaException(errorCode);
+                        }
+
+                        // skip the error code and process the rest
+                        position = position + 2;
+
+                        byte[] messageSetBytes = data.Skip(position).ToArray<byte>().Take(messageSetSize).ToArray<byte>();
+
+                        int processed = 0;
+                        int messageSize = 0;
+
+                        // dropped 2 bytes at the end...padding???
+                        while (processed < messageSetBytes.Length - 2)
+                        {
+                            messageSize = BitConverter.ToInt32(BitWorks.ReverseBytes(messageSetBytes.Skip(processed).Take(4).ToArray<byte>()), 0);
+                            messages[ix].Add(Message.ParseFrom(messageSetBytes.Skip(processed).Take(messageSize + 4).ToArray<byte>()));
+                            processed += 4 + messageSize;
+                        }
+
+                        position = position + processed;
+                    }
+                }
+            }
+
+            return messages;
+        }
+
+        /// <summary>
+        /// Get a list of valid offsets (up to maxSize) before the given time.
+        /// </summary>
+        /// <param name="topic">The topic to check.</param>
+        /// <param name="partition">The partition on the topic.</param>
+        /// <param name="time">time in millisecs (if -1, just get from the latest available)</param>
+        /// <param name="maxNumOffsets">The maximum number of offsets to return.</param>
+        /// <returns>List of offsets, in descending order.</returns>
+        public IList<long> GetOffsetsBefore(string topic, int partition, long time, int maxNumOffsets)
+        {
+            return GetOffsetsBefore(new OffsetRequest(topic, partition, time, maxNumOffsets));
+        }
+
+        /// <summary>
+        /// Get a list of valid offsets (up to maxSize) before the given time.
+        /// </summary>
+        /// <param name="request">The offset request.</param>
+        /// <returns>List of offsets, in descending order.</returns>
+        public IList<long> GetOffsetsBefore(OffsetRequest request)
+        {
+            List<long> offsets = new List<long>();
+
+            using (KafkaConnection connection = new KafkaConnection(Server, Port))
+            {
+                connection.Write(request.GetBytes());
+
+                int dataLength = BitConverter.ToInt32(BitWorks.ReverseBytes(connection.Read(4)), 0);
+                
+                if (dataLength > 0)
+                {
+                    byte[] data = connection.Read(dataLength);
+
+                    int errorCode = BitConverter.ToInt16(BitWorks.ReverseBytes(data.Take(2).ToArray<byte>()), 0);
+                    if (errorCode != KafkaException.NoError)
+                    {
+                        throw new KafkaException(errorCode);
+                    }
+
+                    // skip the error code and process the rest
+                    byte[] unbufferedData = data.Skip(2).ToArray();
+
+                    // first four bytes are the number of offsets
+                    int numOfOffsets = BitConverter.ToInt32(BitWorks.ReverseBytes(unbufferedData.Take(4).ToArray<byte>()), 0);
+
+                    int position = 0;
+                    for (int ix = 0; ix < numOfOffsets; ix++)
+                    {
+                        position = (ix * 8) + 4;
+                        offsets.Add(BitConverter.ToInt64(BitWorks.ReverseBytes(unbufferedData.Skip(position).Take(8).ToArray<byte>()), 0));
+                    }
+                }
+            }
+
+            return offsets;
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Kafka.Client.csproj Mon Aug  1 23:41:24 2011
@@ -0,0 +1,69 @@
+<?xml version="1.0" encoding="utf-8"?>
+<Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
+  <PropertyGroup>
+    <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
+    <Platform Condition=" '$(Platform)' == '' ">AnyCPU</Platform>
+    <ProductVersion>8.0.30703</ProductVersion>
+    <SchemaVersion>2.0</SchemaVersion>
+    <ProjectGuid>{A92DD03B-EE4F-4A78-9FB2-279B6348C7D2}</ProjectGuid>
+    <OutputType>Library</OutputType>
+    <AppDesignerFolder>Properties</AppDesignerFolder>
+    <RootNamespace>Kafka.Client</RootNamespace>
+    <AssemblyName>Kafka.Client</AssemblyName>
+    <TargetFrameworkVersion>v4.0</TargetFrameworkVersion>
+    <FileAlignment>512</FileAlignment>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Debug|AnyCPU' ">
+    <DebugSymbols>true</DebugSymbols>
+    <DebugType>full</DebugType>
+    <Optimize>false</Optimize>
+    <OutputPath>bin\Debug\</OutputPath>
+    <DefineConstants>DEBUG;TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <PropertyGroup Condition=" '$(Configuration)|$(Platform)' == 'Release|AnyCPU' ">
+    <DebugType>pdbonly</DebugType>
+    <Optimize>true</Optimize>
+    <OutputPath>bin\Release\</OutputPath>
+    <DefineConstants>TRACE</DefineConstants>
+    <ErrorReport>prompt</ErrorReport>
+    <WarningLevel>4</WarningLevel>
+  </PropertyGroup>
+  <ItemGroup>
+    <Reference Include="System" />
+    <Reference Include="System.Core" />
+    <Reference Include="System.Xml.Linq" />
+    <Reference Include="System.Data.DataSetExtensions" />
+    <Reference Include="Microsoft.CSharp" />
+    <Reference Include="System.Data" />
+    <Reference Include="System.Xml" />
+  </ItemGroup>
+  <ItemGroup>
+    <Compile Include="AbstractRequest.cs" />
+    <Compile Include="Consumer.cs" />
+    <Compile Include="KafkaException.cs" />
+    <Compile Include="RequestContext.cs" />
+    <Compile Include="Request\FetchRequest.cs" />
+    <Compile Include="Request\MultiFetchRequest.cs" />
+    <Compile Include="Request\MultiProducerRequest.cs" />
+    <Compile Include="Request\OffsetRequest.cs" />
+    <Compile Include="Request\ProducerRequest.cs" />
+    <Compile Include="Util\Crc32.cs" />
+    <Compile Include="KafkaConnection.cs" />
+    <Compile Include="Message.cs" />
+    <Compile Include="Producer.cs" />
+    <Compile Include="Properties\AssemblyInfo.cs" />
+    <Compile Include="RequestType.cs" />
+    <Compile Include="Util\BitWorks.cs" />
+  </ItemGroup>
+  <ItemGroup />
+  <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
+  <!-- To modify your build process, add your task inside one of the targets below and uncomment it. 
+       Other similar extension points exist, see Microsoft.Common.targets.
+  <Target Name="BeforeBuild">
+  </Target>
+  <Target Name="AfterBuild">
+  </Target>
+  -->
+</Project>
\ No newline at end of file

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaConnection.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,204 @@
+using System;
+using System.Net.Sockets;
+using System.Threading;
+using Kafka.Client.Request;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// Callback made when a message request is finished being sent asynchronously.
+    /// </summary>
+    /// <typeparam name="T">
+    /// Must be of type <see cref="AbstractRequest"/> and represents the type of message 
+    /// sent to Kafka.
+    /// </typeparam>
+    /// <param name="request">The request that was sent to the server.</param>
+    public delegate void MessageSent<T>(RequestContext<T> request) where T : AbstractRequest;
+
+    /// <summary>
+    /// Manages connections to the Kafka.
+    /// </summary>
+    public class KafkaConnection : IDisposable
+    {
+        /// <summary>
+        /// TCP client that connects to the server.
+        /// </summary>
+        private TcpClient _client;
+
+        /// <summary>
+        /// Initializes a new instance of the KafkaConnection class.
+        /// </summary>
+        /// <param name="server">The server to connect to.</param>
+        /// <param name="port">The port to connect to.</param>
+        public KafkaConnection(string server, int port)
+        {
+            Server = server;
+            Port = port;
+
+            // connection opened
+            _client = new TcpClient(server, port);
+        }
+
+        /// <summary>
+        /// Gets the server to which the connection is to be established.
+        /// </summary>
+        public string Server { get; private set; }
+        
+        /// <summary>
+        /// Gets the port to which the connection is to be established.
+        /// </summary>
+        public int Port { get; private set; }
+
+        /// <summary>
+        /// Reads data from the server.
+        /// </summary>
+        /// <remarks>
+        /// Defaults the amount of time that a read operation blocks waiting for data to <see cref="Timeout.Infinite"/>.
+        /// </remarks>
+        /// <param name="size">The number of bytes to read from the server.</param>
+        /// <returns>The data read from the server as a byte array.</returns>
+        public byte[] Read(int size)
+        {
+            return Read(size, Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Reads data from the server.
+        /// </summary>
+        /// <param name="size">The number of bytes to read from the server.</param>
+        /// <param name="readTimeout">The amount of time that a read operation blocks waiting for data.</param>
+        /// <returns>The data read from the server as a byte array.</returns>
+        public byte[] Read(int size, int readTimeout)
+        {
+            NetworkStream stream = _client.GetStream();
+            stream.ReadTimeout = readTimeout;
+
+            byte[] bytes = new byte[size];
+            bool readComplete = false;
+            int numberOfTries = 0;
+
+            while (!readComplete && numberOfTries < 1000)
+            {
+                if (stream.DataAvailable)
+                {
+                    stream.Read(bytes, 0, size);
+                    readComplete = true;
+                }
+                else
+                {
+                    // wait until the server is ready to send some stuff.
+                    numberOfTries++;
+                    Thread.Sleep(10);
+                }
+            } 
+            
+            return bytes;
+        }
+        
+        /// <summary>
+        /// Writes a producer request to the server asynchronously.
+        /// </summary>
+        /// <param name="request">The request to make.</param>
+        /// <param name="callback">The code to execute once the message is completely sent.</param>
+        public void BeginWrite(ProducerRequest request, MessageSent<ProducerRequest> callback)
+        {
+            NetworkStream stream = _client.GetStream();
+            RequestContext<ProducerRequest> ctx = new RequestContext<ProducerRequest>(stream, request);
+
+            byte[] data = request.GetBytes();
+            stream.BeginWrite(
+                data, 
+                0, 
+                data.Length, 
+                delegate(IAsyncResult asyncResult)
+                {
+                    RequestContext<ProducerRequest> context = (RequestContext<ProducerRequest>)asyncResult.AsyncState;
+
+                    if (callback != null)
+                    {
+                        callback(context);
+                    }
+
+                    context.NetworkStream.EndWrite(asyncResult);
+                    context.NetworkStream.Dispose();
+                }, 
+                ctx);
+        }
+
+        /// <summary>
+        /// Writes a producer request to the server asynchronously.
+        /// </summary>
+        /// <remarks>
+        /// The default callback simply calls the <see cref="NetworkStream.EndWrite"/>. This is
+        /// basically a low level fire and forget call.
+        /// </remarks>
+        /// <param name="data">The data to send to the server.</param>
+        public void BeginWrite(byte[] data)
+        {
+            NetworkStream stream = _client.GetStream();
+            stream.BeginWrite(data, 0, data.Length, (asyncResult) => ((NetworkStream)asyncResult.AsyncState).EndWrite(asyncResult), stream);
+        }
+
+        /// <summary>
+        /// Writes data to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infinite.
+        /// </remarks>
+        /// <param name="data">The data to write to the server.</param>
+        public void Write(byte[] data)
+        {
+            Write(data, Timeout.Infinite);
+        }
+
+        /// <summary>
+        /// Writes a producer request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infinite.
+        /// </remarks>
+        /// <param name="request">The <see cref="ProducerRequest"/> to send to the server.</param>
+        public void Write(ProducerRequest request)
+        {
+            Write(request.GetBytes());
+        }
+
+        /// <summary>
+        /// Writes a multi-producer request to the server.
+        /// </summary>
+        /// <remarks>
+        /// Write timeout is defaulted to infinite.
+        /// </remarks>
+        /// <param name="request">The <see cref="MultiProducerRequest"/> to send to the server.</param>
+        public void Write(MultiProducerRequest request)
+        {
+            Write(request.GetBytes());
+        }
+
+        /// <summary>
+        /// Writes data to the server.
+        /// </summary>
+        /// <param name="data">The data to write to the server.</param>
+        /// <param name="writeTimeout">The amount of time that a write operation blocks waiting for data.</param>
+        public void Write(byte[] data, int writeTimeout)
+        {
+            NetworkStream stream = _client.GetStream();
+            stream.WriteTimeout = writeTimeout;
+
+            // Send the message to the connected TcpServer. 
+            stream.Write(data, 0, data.Length);
+        }
+
+        /// <summary>
+        /// Close the connection to the server.
+        /// </summary>
+        public void Dispose()
+        {
+            if (_client != null)
+            {
+                _client.GetStream().Close();
+                _client.Close();
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/KafkaException.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,81 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// A wrapping of an error code returned from Kafka.
+    /// </summary>
+    public class KafkaException : Exception
+    {
+        /// <summary>
+        /// No error occurred.
+        /// </summary>
+        public const int NoError = 0;
+
+        /// <summary>
+        /// The offset requested was out of range.
+        /// </summary>
+        public const int OffsetOutOfRangeCode = 1;
+
+        /// <summary>
+        /// The message was invalid.
+        /// </summary>
+        public const int InvalidMessageCode = 2;
+
+        /// <summary>
+        /// The wrong partition.
+        /// </summary>
+        public const int WrongPartitionCode = 3;
+
+        /// <summary>
+        /// Invalid message size.
+        /// </summary>
+        public const int InvalidRetchSizeCode = 4;
+
+        /// <summary>
+        /// Initializes a new instance of the KafkaException class.
+        /// </summary>
+        /// <param name="errorCode">The error code generated by a request to Kafka.</param>
+        public KafkaException(int errorCode) : base(GetMessage(errorCode))
+        {
+            ErrorCode = errorCode;
+        }
+
+        /// <summary>
+        /// Gets the error code that was sent from Kafka.
+        /// </summary>
+        public int ErrorCode { get; private set; }
+
+        /// <summary>
+        /// Gets the message for the exception based on the Kafka error code.
+        /// </summary>
+        /// <param name="errorCode">The error code from Kafka.</param>
+        /// <returns>A string message representation </returns>
+        private static string GetMessage(int errorCode)
+        {
+            if (errorCode == OffsetOutOfRangeCode)
+            {
+                return "Offset out of range";
+            }
+            else if (errorCode == InvalidMessageCode)
+            {
+                return "Invalid message";
+            }
+            else if (errorCode == WrongPartitionCode)
+            {
+                return "Wrong partition";
+            }
+            else if (errorCode == InvalidRetchSizeCode)
+            {
+                return "Invalid message size";
+            }
+            else
+            {
+                return "Unknown error";
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Message.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Message.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Message.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Message.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,140 @@
+using System;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Util;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// Message for Kafka.
+    /// </summary>
+    /// <remarks>
+    /// A message. The format of an N byte message is the following:
+    /// <list type="bullet">
+    ///     <item>
+    ///         <description>1 byte "magic" identifier to allow format changes</description>
+    ///     </item>
+    ///     <item>
+    ///         <description>4 byte CRC32 of the payload</description>
+    ///     </item>
+    ///     <item>
+    ///         <description>N - 5 byte payload</description>
+    ///     </item>
+    /// </list>
+    /// </remarks>
+    public class Message
+    {
+        /// <summary>
+        /// Magic identifier for Kafka (compile-time constant format version).
+        /// </summary>
+        private const byte DefaultMagicIdentifier = 0;
+
+        /// <summary>
+        /// Initializes a new instance of the Message class.
+        /// </summary>
+        /// <remarks>
+        /// Uses the <see cref="DefaultMagicIdentifier"/> as a default.
+        /// </remarks>
+        /// <param name="payload">The data for the payload.</param>
+        public Message(byte[] payload) : this(payload, DefaultMagicIdentifier)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the Message class.
+        /// </summary>
+        /// <remarks>
+        /// Initializes the checksum as null.  It will be automatically computed.
+        /// </remarks>
+        /// <param name="payload">The data for the payload.</param>
+        /// <param name="magic">The magic identifier.</param>
+        public Message(byte[] payload, byte magic) : this(payload, magic, null)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the Message class.
+        /// </summary>
+        /// <param name="payload">The data for the payload.</param>
+        /// <param name="magic">The magic identifier.</param>
+        /// <param name="checksum">The checksum for the payload; computed from the payload when null.</param>
+        public Message(byte[] payload, byte magic, byte[] checksum)
+        {
+            Payload = payload;
+            Magic = magic;
+            // Null-coalesce: compute the CRC32 only when no checksum was supplied.
+            Checksum = checksum ?? CalculateChecksum();
+        }
+    
+        /// <summary>
+        /// Gets the magic bytes.
+        /// </summary>
+        public byte Magic { get; private set; }
+        
+        /// <summary>
+        /// Gets the CRC32 checksum for the payload.
+        /// </summary>
+        public byte[] Checksum { get; private set; }
+
+        /// <summary>
+        /// Gets the payload.
+        /// </summary>
+        public byte[] Payload { get; private set; }
+
+        /// <summary>
+        /// Parses a message from a byte array given the format Kafka likes. 
+        /// </summary>
+        /// <param name="data">The data for a message: 4-byte size, 1-byte magic, 4-byte checksum, payload.</param>
+        /// <returns>The message.</returns>
+        public static Message ParseFrom(byte[] data)
+        {
+            // Size is big-endian on the wire; reverse for little-endian BitConverter.
+            int size = BitConverter.ToInt32(BitWorks.ReverseBytes(data.Take(4).ToArray<byte>()), 0);
+            byte magic = data[4];
+            byte[] checksum = data.Skip(5).Take(4).ToArray<byte>();
+            // NOTE(review): size covers magic + checksum + payload, so Take(size) asks for
+            // 5 bytes more than remain; LINQ Take clamps to the available bytes, which is
+            // only correct when `data` is exactly one message — confirm against callers.
+            byte[] payload = data.Skip(9).Take(size).ToArray<byte>();
+
+            return new Message(payload, magic, checksum);
+        }
+
+        /// <summary>
+        /// Converts the message to bytes in the format Kafka likes.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): unlike <see cref="ParseFrom"/>, no 4-byte size prefix is written here;
+        /// presumably the enclosing request adds it — verify against the producer request encoders.
+        /// </remarks>
+        /// <returns>The byte array: magic, checksum, payload.</returns>
+        public byte[] GetBytes()
+        {
+            byte[] encodedMessage = new byte[Payload.Length + 1 + Checksum.Length];
+            encodedMessage[0] = Magic;
+            Buffer.BlockCopy(Checksum, 0, encodedMessage, 1, Checksum.Length);
+            Buffer.BlockCopy(Payload, 0, encodedMessage, 1 + Checksum.Length, Payload.Length);
+
+            return encodedMessage;
+        }
+
+        /// <summary>
+        /// Determines if the message is valid given the payload and its checksum.
+        /// </summary>
+        /// <returns>True if valid and false otherwise.</returns>
+        public bool IsValid()
+        {
+            return Checksum.SequenceEqual(CalculateChecksum());
+        }
+
+        /// <summary>
+        /// Try to show the payload as decoded to UTF-8.
+        /// </summary>
+        /// <returns>The decoded payload as string.</returns>
+        public override string ToString()
+        {
+            return Encoding.UTF8.GetString(Payload);
+        }
+
+        /// <summary>
+        /// Calculates the CRC32 checksum on the payload of the message.
+        /// </summary>
+        /// <returns>The checksum given the payload.</returns>
+        private byte[] CalculateChecksum()
+        { 
+            Crc32 crc32 = new Crc32();
+            return crc32.ComputeHash(Payload);
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producer.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producer.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producer.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Producer.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,135 @@
+using System;
+using System.Collections.Generic;
+using System.Text;
+using Kafka.Client.Request;
+using Kafka.Client.Util;
+
+namespace Kafka.Client
+{
+    /// <summary>
+    /// Sends message to Kafka.
+    /// </summary>
+    /// <remarks>
+    /// Synchronous sends open and dispose a <see cref="KafkaConnection"/> per call;
+    /// asynchronous sends hand the connection to the async write (see notes below).
+    /// </remarks>
+    public class Producer
+    {
+        /// <summary>
+        /// Initializes a new instance of the Producer class.
+        /// </summary>
+        /// <param name="server">The server to connect to.</param>
+        /// <param name="port">The port to connect to.</param>
+        public Producer(string server, int port)
+        {
+            Server = server;
+            Port = port;
+        }
+
+        /// <summary>
+        /// Gets the server to which the connection is to be established.
+        /// </summary>
+        public string Server { get; private set; }
+
+        /// <summary>
+        /// Gets the port to which the connection is to be established.
+        /// </summary>
+        public int Port { get; private set; }
+
+        /// <summary>
+        /// Sends a message to Kafka.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="msg">The message to send.</param>
+        public void Send(string topic, int partition, Message msg)
+        {
+            // Single-message convenience overload; wraps the message in a list.
+            Send(topic, partition, new List<Message> { msg });
+        }
+
+        /// <summary>
+        /// Sends a list of messages to Kafka.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="messages">The list of messages to send.</param>
+        public void Send(string topic, int partition, IList<Message> messages)
+        {
+            Send(new ProducerRequest(topic, partition, messages));
+        }
+
+        /// <summary>
+        /// Sends a request to Kafka.
+        /// </summary>
+        /// <remarks>
+        /// NOTE(review): an invalid request is silently dropped — nothing signals the
+        /// failure to the caller. Consider throwing or returning a status.
+        /// </remarks>
+        /// <param name="request">The request to send to Kafka.</param>
+        public void Send(ProducerRequest request)
+        {
+            if (request.IsValid())
+            {
+                // One connection per request; disposed when the write returns.
+                using (KafkaConnection connection = new KafkaConnection(Server, Port))
+                {
+                    connection.Write(request);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a request to Kafka.
+        /// </summary>
+        /// <param name="request">The request to send to Kafka.</param>
+        public void Send(MultiProducerRequest request)
+        {
+            if (request.IsValid())
+            {
+                using (KafkaConnection connection = new KafkaConnection(Server, Port))
+                {
+                    connection.Write(request);
+                }
+            }
+        }
+
+        /// <summary>
+        /// Sends a list of messages to Kafka.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="messages">The list of messages to send.</param>
+        /// <param name="callback">
+        /// A block of code to execute once the request has been sent to Kafka.  This value may 
+        /// be set to null.
+        /// </param>
+        public void SendAsync(string topic, int partition, IList<Message> messages, MessageSent<ProducerRequest> callback)
+        {
+            SendAsync(new ProducerRequest(topic, partition, messages), callback);
+        }
+
+        /// <summary>
+        /// Send a request to Kafka asynchronously.
+        /// </summary>
+        /// <remarks>
+        /// If the callback is not specified then the method behaves as a fire-and-forget call
+        /// with the callback being ignored.  By the time this callback is executed, the 
+        /// <see cref="RequestContext.NetworkStream"/> will already have been closed given an 
+        /// internal call <see cref="NetworkStream.EndWrite"/>.
+        /// </remarks>
+        /// <param name="request">The request to send to Kafka.</param>
+        /// <param name="callback">
+        /// A block of code to execute once the request has been sent to Kafka.  This value may 
+        /// be set to null.
+        /// </param>
+        public void SendAsync(ProducerRequest request, MessageSent<ProducerRequest> callback)
+        {
+            if (request.IsValid())
+            {
+                // NOTE(review): this connection is never disposed by this method — it cannot be
+                // disposed here because the write is asynchronous; presumably KafkaConnection
+                // cleans up after EndWrite. TODO confirm, otherwise this leaks the socket.
+                KafkaConnection connection = new KafkaConnection(Server, Port);
+
+                if (callback == null)
+                {
+                    // fire and forget
+                    connection.BeginWrite(request.GetBytes());
+                }
+                else
+                {
+                    // execute with callback
+                    connection.BeginWrite(request, callback);
+                }
+            }
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Properties/AssemblyInfo.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,36 @@
+using System.Reflection;
+using System.Runtime.CompilerServices;
+using System.Runtime.InteropServices;
+
+// Assembly metadata for the Kafka.Client library (title, version, COM visibility).
+// General Information about an assembly is controlled through the following 
+// set of attributes. Change these attribute values to modify the information
+// associated with an assembly.
+[assembly: AssemblyTitle("Kafka.Client")]
+[assembly: AssemblyDescription("")]
+[assembly: AssemblyConfiguration("")]
+[assembly: AssemblyCompany("Microsoft")]
+[assembly: AssemblyProduct("Kafka.Client")]
+[assembly: AssemblyCopyright("Copyright © Microsoft 2011")]
+[assembly: AssemblyTrademark("")]
+[assembly: AssemblyCulture("")]
+
+// Setting ComVisible to false makes the types in this assembly not visible 
+// to COM components.  If you need to access a type in this assembly from 
+// COM, set the ComVisible attribute to true on that type.
+[assembly: ComVisible(false)]
+
+// The following GUID is for the ID of the typelib if this project is exposed to COM
+[assembly: Guid("93d702e5-9998-49a8-8c16-5b04b3ba55c1")]
+
+// Version information for an assembly consists of the following four values:
+//
+//      Major Version
+//      Minor Version 
+//      Build Number
+//      Revision
+//
+// You can specify all the values or you can default the Build and Revision Numbers 
+// by using the '*' as shown below:
+// [assembly: AssemblyVersion("1.0.*")]
+[assembly: AssemblyVersion("1.0.0.0")]
+[assembly: AssemblyFileVersion("1.0.0.0")]

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/FetchRequest.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,113 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Util;
+
+namespace Kafka.Client.Request
+{
+    /// <summary>
+    /// Constructs a request to send to Kafka.
+    /// </summary>
+    public class FetchRequest : AbstractRequest
+    {
+        /// <summary>
+        /// Maximum size (1 MiB) used when no explicit max size is given.
+        /// </summary>
+        private const int DefaultMaxSize = 1048576;
+
+        /// <summary>
+        /// Initializes a new instance of the FetchRequest class.
+        /// </summary>
+        public FetchRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the FetchRequest class.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="offset">The offset in the topic/partition to retrieve from.</param>
+        public FetchRequest(string topic, int partition, long offset)
+            : this(topic, partition, offset, DefaultMaxSize)
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the FetchRequest class.
+        /// </summary>
+        /// <param name="topic">The topic to publish to.</param>
+        /// <param name="partition">The partition to publish to.</param>
+        /// <param name="offset">The offset in the topic/partition to retrieve from.</param>
+        /// <param name="maxSize">The maximum size.</param>
+        public FetchRequest(string topic, int partition, long offset, int maxSize)
+        {
+            Topic = topic;
+            Partition = partition;
+            Offset = offset;
+            MaxSize = maxSize;
+        }
+
+        /// <summary>
+        /// Gets or sets the offset to request.
+        /// </summary>
+        public long Offset { get; set; }
+
+        /// <summary>
+        /// Gets or sets the maximum size to pass in the request.
+        /// </summary>
+        public int MaxSize { get; set; }
+
+        /// <summary>
+        /// Determines if the request has valid settings.
+        /// </summary>
+        /// <returns>True if valid and false otherwise.</returns>
+        public override bool IsValid()
+        {
+            return !string.IsNullOrWhiteSpace(Topic);
+        }
+
+        /// <summary>
+        /// Gets the bytes matching the expected Kafka structure. 
+        /// </summary>
+        /// <returns>The byte array of the request.</returns>
+        public override byte[] GetBytes()
+        {
+            byte[] internalBytes = GetInternalBytes();
+
+            List<byte> request = new List<byte>();
+
+            // add the 2 for the RequestType.Fetch
+            request.AddRange(BitWorks.GetBytesReversed(internalBytes.Length + 2));
+            request.AddRange(BitWorks.GetBytesReversed((short)RequestType.Fetch));
+            request.AddRange(internalBytes);
+
+            return request.ToArray<byte>();
+        }
+
+        /// <summary>
+        /// Gets the bytes representing the request which is used when generating a multi-request.
+        /// </summary>
+        /// <remarks>
+        /// The <see cref="GetBytes"/> method is used for sending a single <see cref="RequestType.Fetch"/>.
+        /// It prefixes this byte array with the request type and the number of messages. This method
+        /// is used to supply the <see cref="MultiFetchRequest"/> with the contents for its message.
+        /// </remarks>
+        /// <returns>The bytes that represent this <see cref="FetchRequest"/>.</returns>
+        internal byte[] GetInternalBytes()
+        {
+            // Layout: TOPIC LENGTH (2) + TOPIC + PARTITION (4) + OFFSET (8) + MAX SIZE (4).
+            // (The previously computed requestSize local was unused and has been removed.)
+            List<byte> request = new List<byte>();
+            request.AddRange(BitWorks.GetBytesReversed((short)Topic.Length));
+            request.AddRange(Encoding.ASCII.GetBytes(Topic));
+            request.AddRange(BitWorks.GetBytesReversed(Partition));
+            request.AddRange(BitWorks.GetBytesReversed(Offset));
+            request.AddRange(BitWorks.GetBytesReversed(MaxSize));
+
+            return request.ToArray<byte>();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiFetchRequest.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,62 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Util;
+
+namespace Kafka.Client.Request
+{
+    /// <summary>
+    /// Constructs a multi-consumer request to send to Kafka.
+    /// </summary>
+    public class MultiFetchRequest : AbstractRequest
+    {
+        /// <summary>
+        /// Initializes a new instance of the MultiFetchRequest class.
+        /// </summary>
+        /// <param name="requests">Requests to package up and batch.</param>
+        public MultiFetchRequest(IList<FetchRequest> requests)
+        {
+            ConsumerRequests = requests;
+        }
+
+        /// <summary>
+        /// Gets or sets the consumer requests to be batched into this multi-request.
+        /// </summary>
+        public IList<FetchRequest> ConsumerRequests { get; set; }
+
+        /// <summary>
+        /// Determines if the request has valid settings.
+        /// </summary>
+        /// <remarks>
+        /// Valid when there is at least one consumer request and every one of them is
+        /// itself valid. (The previous Select(...).Count() expression counted all
+        /// elements regardless of validity, so invalid sub-requests were never caught.)
+        /// </remarks>
+        /// <returns>True if valid and false otherwise.</returns>
+        public override bool IsValid()
+        {
+            return ConsumerRequests != null && ConsumerRequests.Count > 0
+                && ConsumerRequests.All(itm => itm.IsValid());
+        }
+
+        /// <summary>
+        /// Gets the bytes matching the expected Kafka structure. 
+        /// </summary>
+        /// <returns>The byte array of the request: total length, request type, request count, then each request body.</returns>
+        public override byte[] GetBytes()
+        {
+            byte[] requestBytes = BitWorks.GetBytesReversed(Convert.ToInt16((int)RequestType.MultiFetch));
+            byte[] consumerRequestCountBytes = BitWorks.GetBytesReversed(Convert.ToInt16(ConsumerRequests.Count));
+
+            List<byte> encodedMessageSet = new List<byte>();
+            encodedMessageSet.AddRange(requestBytes);
+            encodedMessageSet.AddRange(consumerRequestCountBytes);
+
+            foreach (FetchRequest consumerRequest in ConsumerRequests)
+            {
+                encodedMessageSet.AddRange(consumerRequest.GetInternalBytes());
+            }
+
+            // Prefix with the total size of everything that follows.
+            encodedMessageSet.InsertRange(0, BitWorks.GetBytesReversed(encodedMessageSet.Count));
+
+            return encodedMessageSet.ToArray();
+        }
+    }
+}

Added: incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs (added)
+++ incubator/kafka/trunk/clients/csharp/src/Kafka/Kafka.Client/Request/MultiProducerRequest.cs Mon Aug  1 23:41:24 2011
@@ -0,0 +1,71 @@
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using Kafka.Client.Util;
+
+namespace Kafka.Client.Request
+{
+    /// <summary>
+    /// Constructs a request containing multiple producer requests to send to Kafka.
+    /// </summary>
+    public class MultiProducerRequest : AbstractRequest
+    {
+        /// <summary>
+        /// Initializes a new instance of the MultiProducerRequest class.
+        /// </summary>
+        public MultiProducerRequest()
+        {
+        }
+
+        /// <summary>
+        /// Initializes a new instance of the MultiProducerRequest class.
+        /// </summary>
+        /// <param name="producerRequests">
+        /// The list of individual producer requests to send in this request.
+        /// </param>
+        public MultiProducerRequest(IList<ProducerRequest> producerRequests)
+        {
+            ProducerRequests = producerRequests;
+        }
+
+        /// <summary>
+        /// Gets or sets the list of producer requests to be sent in batch.
+        /// </summary>
+        public IList<ProducerRequest> ProducerRequests { get; set; }
+
+        /// <summary>
+        /// Determines if the request has valid settings.
+        /// </summary>
+        /// <returns>True if valid and false otherwise.</returns>
+        public override bool IsValid()
+        {
+            return ProducerRequests != null && ProducerRequests.Count > 0
+                && ProducerRequests.Select(itm => !itm.IsValid()).Count() > 0;
+        }
+
+        /// <summary>
+        /// Gets the bytes matching the expected Kafka structure. 
+        /// </summary>
+        /// <returns>The byte array of the request.</returns>
+        public override byte[] GetBytes()
+        {
+            List<byte> messagePack = new List<byte>();
+            byte[] requestBytes = BitWorks.GetBytesReversed(Convert.ToInt16((int)RequestType.MultiProduce));
+            byte[] producerRequestCountBytes = BitWorks.GetBytesReversed(Convert.ToInt16(ProducerRequests.Count));
+
+            List<byte> encodedMessageSet = new List<byte>();
+            encodedMessageSet.AddRange(requestBytes);
+            encodedMessageSet.AddRange(producerRequestCountBytes);
+
+            foreach (ProducerRequest producerRequest in ProducerRequests)
+            {
+                encodedMessageSet.AddRange(producerRequest.GetInternalBytes());
+            }
+
+            encodedMessageSet.InsertRange(0, BitWorks.GetBytesReversed(encodedMessageSet.Count));
+
+            return encodedMessageSet.ToArray();
+        }
+    }
+}



Mime
View raw message