incubator-kafka-commits mailing list archives

From jkr...@apache.org
Subject svn commit: r1152970 [13/26] - in /incubator/kafka: branches/ site/ trunk/ trunk/bin/ trunk/clients/ trunk/clients/clojure/ trunk/clients/clojure/leiningen/ trunk/clients/clojure/resources/ trunk/clients/clojure/src/ trunk/clients/clojure/src/kafka/ tr...
Date Mon, 01 Aug 2011 23:42:17 GMT
Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/MessageTest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,45 @@
+<?php
+
+/**
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_MessageTest extends PHPUnit_Framework_TestCase
+{
+	private $test;
+	private $encoded;
+	private $msg;
+	public function setUp() {
+		$this->test = 'a sample string';
+		$this->encoded = Kafka_Encoder::encode_message($this->test);
+		$this->msg = new Kafka_Message($this->encoded);
+		
+	}
+	
+	public function testPayload() {
+		$this->assertEquals($this->test, $this->msg->payload());
+	}
+	
+	public function testValid() {
+		$this->assertTrue($this->msg->isValid());
+	}
+	
+	public function testEncode() {
+		$this->assertEquals($this->encoded, $this->msg->encode());
+	}
+	
+	public function testChecksum() {
+		$this->assertInternalType('integer', $this->msg->checksum());
+	}
+	
+	public function testSize() {
+		$this->assertEquals(strlen($this->test), $this->msg->size());
+	}
+	
+	public function testToString() {
+		$this->assertInternalType('string', $this->msg->__toString());
+	}
+	
+	public function testMagic() {
+		$this->assertInternalType('integer', $this->msg->magic());
+	}
+}

Added: incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/Kafka/ProducerTest.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,59 @@
+<?php
+
+/**
+ * Override connect() method of base class
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_ProducerMock extends Kafka_Producer {
+	public function connect() {
+		if (!is_resource($this->conn)) {
+			$this->conn = fopen('php://temp', 'w+b');
+		}
+	}
+	
+	public function getData() {
+		$this->connect();
+		rewind($this->conn);
+		return stream_get_contents($this->conn);
+	}
+}
+
+/**
+ * Tests for Kafka_Producer, using an in-memory stream in place of a real broker connection
+ *
+ * @author Lorenzo Alberton <l.alberton@quipo.it>
+ */
+class Kafka_ProducerTest extends PHPUnit_Framework_TestCase
+{
+	/**
+	 * @var Kafka_Producer
+	 */
+	private $producer;
+	
+	public function setUp() {
+		$this->producer = new Kafka_ProducerMock('localhost', 1234);
+	}
+	
+	public function tearDown() {
+		$this->producer->close();
+		unset($this->producer);
+	}
+
+
+	public function testProducer() {
+		$messages = array(
+			'test 1',
+			'test 2 abc',
+		);
+		$topic = 'a topic';
+		$partition = 3;
+		$this->producer->send($messages, $topic, $partition);
+		$sent = $this->producer->getData();
+		$this->assertContains($topic, $sent);
+		$this->assertContains($partition, $sent);
+		foreach ($messages as $msg) {
+			$this->assertContains($msg, $sent);
+		}
+	}
+}

Added: incubator/kafka/trunk/clients/php/src/tests/bootstrap.php
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/bootstrap.php?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/bootstrap.php (added)
+++ incubator/kafka/trunk/clients/php/src/tests/bootstrap.php Mon Aug  1 23:41:24 2011
@@ -0,0 +1,36 @@
+<?php
+
+function test_autoload($className)
+{
+	$classFile = str_replace('_', DIRECTORY_SEPARATOR, $className) . '.php';
+	if (function_exists('stream_resolve_include_path')) {
+		$file = stream_resolve_include_path($classFile);
+	} else {
+		foreach (explode(PATH_SEPARATOR, get_include_path()) as $path) {
+			if (file_exists($path . '/' . $classFile)) {
+				$file = $path . '/' . $classFile;
+				break;
+			}
+		}
+	}
+	/* If the class file was found, include it */
+	if (!empty($file)) {
+		include $file;
+		return;
+	}
+
+	throw new RuntimeException($className. ' not found');
+}
+
+// register the autoloader
+spl_autoload_register('test_autoload');
+
+set_include_path(
+	implode(PATH_SEPARATOR, array(
+		realpath(dirname(__FILE__).'/../lib'),
+		get_include_path(),
+	))
+);
+
+date_default_timezone_set('Europe/London');
+ 
\ No newline at end of file

Added: incubator/kafka/trunk/clients/php/src/tests/phpunit.xml
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/php/src/tests/phpunit.xml?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/php/src/tests/phpunit.xml (added)
+++ incubator/kafka/trunk/clients/php/src/tests/phpunit.xml Mon Aug  1 23:41:24 2011
@@ -0,0 +1,17 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<phpunit 
+	bootstrap="./bootstrap.php"
+	colors="true"
+	backupGlobals="false"
+	backupStaticAttributes="false">
+
+    <testsuite name="Kafka PHP Client Test Suite">
+        <directory>./Kafka</directory>
+    </testsuite>
+
+    <filter>
+        <blacklist>
+            <directory>./</directory>
+        </blacklist>
+    </filter>
+</phpunit>

Added: incubator/kafka/trunk/clients/python/kafka.py
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/python/kafka.py?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/python/kafka.py (added)
+++ incubator/kafka/trunk/clients/python/kafka.py Mon Aug  1 23:41:24 2011
@@ -0,0 +1,68 @@
+# Copyright 2010 LinkedIn
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import socket
+import struct
+import binascii
+import sys
+
+PRODUCE_REQUEST_ID = 0
+
+def encode_message(message):
+    # <MAGIC_BYTE: char> <CRC32: int> <PAYLOAD: bytes>
+    return struct.pack('>B', 0) + \
+           struct.pack('>i', binascii.crc32(message)) + \
+           message
+
+def encode_produce_request(topic, partition, messages):
+    # encode messages as <LEN: int><MESSAGE_BYTES>
+    encoded = [encode_message(message) for message in messages]
+    message_set = ''.join([struct.pack('>i', len(m)) + m for m in encoded])
+    
+    # create the request as <REQUEST_SIZE: int> <REQUEST_ID: short> <TOPIC_LENGTH: short> <TOPIC: bytes> <PARTITION: int> <BUFFER_SIZE: int> <BUFFER: bytes>
+    data = struct.pack('>H', PRODUCE_REQUEST_ID) + \
+           struct.pack('>H', len(topic)) + topic + \
+           struct.pack('>i', partition) + \
+           struct.pack('>i', len(message_set)) + message_set
+    return struct.pack('>i', len(data)) + data
+
+
+class KafkaProducer:
+    def __init__(self, host, port):
+        self.REQUEST_KEY = 0
+        self.connection = socket.socket()
+        self.connection.connect((host, port))
+
+    def close(self):
+        self.connection.close()
+
+    def send(self, messages, topic, partition = 0):
+        self.connection.sendall(encode_produce_request(topic, partition, messages))
+    
+if __name__ == '__main__':
+    if len(sys.argv) < 4:
+        print >> sys.stderr, 'USAGE: python', sys.argv[0], 'host port topic'
+        sys.exit(1)
+    host = sys.argv[1]
+    port = int(sys.argv[2])
+    topic = sys.argv[3]
+
+    producer = KafkaProducer(host, port)
+
+    while True:
+        print 'Enter comma separated messages: ',
+        line = sys.stdin.readline()
+        messages = line.split(',')
+        producer.send(messages, topic)
+        print 'Sent', len(messages), 'messages successfully'
\ No newline at end of file

Added: incubator/kafka/trunk/clients/python/setup.py
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/python/setup.py?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/python/setup.py (added)
+++ incubator/kafka/trunk/clients/python/setup.py Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+from distutils.core import setup
+ 
+setup(
+    name='kafka-python-client',
+    version='0.6',
+    description='This library implements a Kafka client',
+    author='LinkedIn.com',
+    url='https://github.com/kafka-dev/kafka',
+    package_dir={'': '.'},
+    py_modules=[
+        'kafka',
+    ],
+)

Added: incubator/kafka/trunk/clients/ruby/LICENSE
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/LICENSE?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/LICENSE (added)
+++ incubator/kafka/trunk/clients/ruby/LICENSE Mon Aug  1 23:41:24 2011
@@ -0,0 +1,202 @@
+
+                              Apache License
+                        Version 2.0, January 2004
+                     http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+   "License" shall mean the terms and conditions for use, reproduction,
+   and distribution as defined by Sections 1 through 9 of this document.
+
+   "Licensor" shall mean the copyright owner or entity authorized by
+   the copyright owner that is granting the License.
+
+   "Legal Entity" shall mean the union of the acting entity and all
+   other entities that control, are controlled by, or are under common
+   control with that entity. For the purposes of this definition,
+   "control" means (i) the power, direct or indirect, to cause the
+   direction or management of such entity, whether by contract or
+   otherwise, or (ii) ownership of fifty percent (50%) or more of the
+   outstanding shares, or (iii) beneficial ownership of such entity.
+
+   "You" (or "Your") shall mean an individual or Legal Entity
+   exercising permissions granted by this License.
+
+   "Source" form shall mean the preferred form for making modifications,
+   including but not limited to software source code, documentation
+   source, and configuration files.
+
+   "Object" form shall mean any form resulting from mechanical
+   transformation or translation of a Source form, including but
+   not limited to compiled object code, generated documentation,
+   and conversions to other media types.
+
+   "Work" shall mean the work of authorship, whether in Source or
+   Object form, made available under the License, as indicated by a
+   copyright notice that is included in or attached to the work
+   (an example is provided in the Appendix below).
+
+   "Derivative Works" shall mean any work, whether in Source or Object
+   form, that is based on (or derived from) the Work and for which the
+   editorial revisions, annotations, elaborations, or other modifications
+   represent, as a whole, an original work of authorship. For the purposes
+   of this License, Derivative Works shall not include works that remain
+   separable from, or merely link (or bind by name) to the interfaces of,
+   the Work and Derivative Works thereof.
+
+   "Contribution" shall mean any work of authorship, including
+   the original version of the Work and any modifications or additions
+   to that Work or Derivative Works thereof, that is intentionally
+   submitted to Licensor for inclusion in the Work by the copyright owner
+   or by an individual or Legal Entity authorized to submit on behalf of
+   the copyright owner. For the purposes of this definition, "submitted"
+   means any form of electronic, verbal, or written communication sent
+   to the Licensor or its representatives, including but not limited to
+   communication on electronic mailing lists, source code control systems,
+   and issue tracking systems that are managed by, or on behalf of, the
+   Licensor for the purpose of discussing and improving the Work, but
+   excluding communication that is conspicuously marked or otherwise
+   designated in writing by the copyright owner as "Not a Contribution."
+
+   "Contributor" shall mean Licensor and any individual or Legal Entity
+   on behalf of whom a Contribution has been received by Licensor and
+   subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   copyright license to reproduce, prepare Derivative Works of,
+   publicly display, publicly perform, sublicense, and distribute the
+   Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+   this License, each Contributor hereby grants to You a perpetual,
+   worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+   (except as stated in this section) patent license to make, have made,
+   use, offer to sell, sell, import, and otherwise transfer the Work,
+   where such license applies only to those patent claims licensable
+   by such Contributor that are necessarily infringed by their
+   Contribution(s) alone or by combination of their Contribution(s)
+   with the Work to which such Contribution(s) was submitted. If You
+   institute patent litigation against any entity (including a
+   cross-claim or counterclaim in a lawsuit) alleging that the Work
+   or a Contribution incorporated within the Work constitutes direct
+   or contributory patent infringement, then any patent licenses
+   granted to You under this License for that Work shall terminate
+   as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+   Work or Derivative Works thereof in any medium, with or without
+   modifications, and in Source or Object form, provided that You
+   meet the following conditions:
+
+   (a) You must give any other recipients of the Work or
+       Derivative Works a copy of this License; and
+
+   (b) You must cause any modified files to carry prominent notices
+       stating that You changed the files; and
+
+   (c) You must retain, in the Source form of any Derivative Works
+       that You distribute, all copyright, patent, trademark, and
+       attribution notices from the Source form of the Work,
+       excluding those notices that do not pertain to any part of
+       the Derivative Works; and
+
+   (d) If the Work includes a "NOTICE" text file as part of its
+       distribution, then any Derivative Works that You distribute must
+       include a readable copy of the attribution notices contained
+       within such NOTICE file, excluding those notices that do not
+       pertain to any part of the Derivative Works, in at least one
+       of the following places: within a NOTICE text file distributed
+       as part of the Derivative Works; within the Source form or
+       documentation, if provided along with the Derivative Works; or,
+       within a display generated by the Derivative Works, if and
+       wherever such third-party notices normally appear. The contents
+       of the NOTICE file are for informational purposes only and
+       do not modify the License. You may add Your own attribution
+       notices within Derivative Works that You distribute, alongside
+       or as an addendum to the NOTICE text from the Work, provided
+       that such additional attribution notices cannot be construed
+       as modifying the License.
+
+   You may add Your own copyright statement to Your modifications and
+   may provide additional or different license terms and conditions
+   for use, reproduction, or distribution of Your modifications, or
+   for any such Derivative Works as a whole, provided Your use,
+   reproduction, and distribution of the Work otherwise complies with
+   the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+   any Contribution intentionally submitted for inclusion in the Work
+   by You to the Licensor shall be under the terms and conditions of
+   this License, without any additional terms or conditions.
+   Notwithstanding the above, nothing herein shall supersede or modify
+   the terms of any separate license agreement you may have executed
+   with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+   names, trademarks, service marks, or product names of the Licensor,
+   except as required for reasonable and customary use in describing the
+   origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+   agreed to in writing, Licensor provides the Work (and each
+   Contributor provides its Contributions) on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+   implied, including, without limitation, any warranties or conditions
+   of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+   PARTICULAR PURPOSE. You are solely responsible for determining the
+   appropriateness of using or redistributing the Work and assume any
+   risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+   whether in tort (including negligence), contract, or otherwise,
+   unless required by applicable law (such as deliberate and grossly
+   negligent acts) or agreed to in writing, shall any Contributor be
+   liable to You for damages, including any direct, indirect, special,
+   incidental, or consequential damages of any character arising as a
+   result of this License or out of the use or inability to use the
+   Work (including but not limited to damages for loss of goodwill,
+   work stoppage, computer failure or malfunction, or any and all
+   other commercial damages or losses), even if such Contributor
+   has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+   the Work or Derivative Works thereof, You may choose to offer,
+   and charge a fee for, acceptance of support, warranty, indemnity,
+   or other liability obligations and/or rights consistent with this
+   License. However, in accepting such obligations, You may act only
+   on Your own behalf and on Your sole responsibility, not on behalf
+   of any other Contributor, and only if You agree to indemnify,
+   defend, and hold each Contributor harmless for any liability
+   incurred by, or claims asserted against, such Contributor by reason
+   of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+APPENDIX: How to apply the Apache License to your work.
+
+   To apply the Apache License to your work, attach the following
+   boilerplate notice, with the fields enclosed by brackets "[]"
+   replaced with your own identifying information. (Don't include
+   the brackets!)  The text should be enclosed in the appropriate
+   comment syntax for the file format. We also recommend that a
+   file or class name and description of purpose be included on the
+   same "printed page" as the copyright notice for easier
+   identification within third-party archives.
+
+Copyright 2011 LinkedIn
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.

Added: incubator/kafka/trunk/clients/ruby/README.md
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/README.md?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/README.md (added)
+++ incubator/kafka/trunk/clients/ruby/README.md Mon Aug  1 23:41:24 2011
@@ -0,0 +1,62 @@
+# kafka-rb
+kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service.
+
+## Requirements
+You need to have access to your Kafka instance and be able to connect through TCP. You can obtain a copy and instructions on how to set up Kafka at https://github.com/kafka-dev/kafka
+
+## Installation
+sudo gem install kafka-rb
+
+(the code works fine with JRuby, Ruby 1.8.x and Ruby 1.9.x)
+
+## Usage
+
+### Sending a simple message
+
+    require 'kafka'
+    producer = Kafka::Producer.new
+    message = Kafka::Message.new("some random message content")
+    producer.send(message)
+
+### Sending a sequence of messages
+
+    require 'kafka'
+    producer = Kafka::Producer.new
+    message1 = Kafka::Message.new("some random message content")
+    message2 = Kafka::Message.new("some more content")
+    producer.send([message1, message2])
+
+### Batching a bunch of messages using the block syntax
+
+    require 'kafka'
+    producer = Kafka::Producer.new
+    producer.batch do |messages|
+        puts "Batching a send of multiple messages.."
+        messages << Kafka::Message.new("first message to send")
+        messages << Kafka::Message.new("second message to send")
+    end
+
+* the messages are sent all at once, in a single produce request, after the block finishes executing
+
+### Consuming messages one by one
+
+    require 'kafka'
+    consumer = Kafka::Consumer.new
+    messages = consumer.consume
+
+
+### Consuming messages using a block loop
+
+    require 'kafka'
+    consumer = Kafka::Consumer.new
+    consumer.loop do |messages|
+        puts "Received"
+        puts messages
+    end
+
+
+Contact for questions
+
+alejandrocrosa at(@) gmail.com
+
+http://twitter.com/alejandrocrosa

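The examples above all use the client defaults (a broker on localhost:9092, topic "test", partition 0). As the producer and consumer specs later in this commit show, both Kafka::Producer.new and Kafka::Consumer.new also accept an options hash; a short sketch with an illustrative host and topic, assuming a broker is reachable there:

    require 'kafka'
    # point the client at a specific broker, topic and partition
    producer = Kafka::Producer.new(:host => "broker.example.com", :port => 9092,
                                   :topic => "events", :partition => 0)
    # poll every 5 seconds instead of the 2-second default
    consumer = Kafka::Consumer.new(:host => "broker.example.com", :port => 9092,
                                   :topic => "events", :polling => 5)
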
Added: incubator/kafka/trunk/clients/ruby/Rakefile
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/Rakefile?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/Rakefile (added)
+++ incubator/kafka/trunk/clients/ruby/Rakefile Mon Aug  1 23:41:24 2011
@@ -0,0 +1,61 @@
+require 'rubygems'
+require 'rake/gempackagetask'
+require 'rubygems/specification'
+require 'date'
+require 'rspec/core/rake_task'
+
+GEM = 'kafka-rb'
+GEM_NAME = 'Kafka Client'
+GEM_VERSION = '0.0.5'
+AUTHORS = ['Alejandro Crosa']
+EMAIL = "alejandrocrosa@gmail.com"
+HOMEPAGE = "http://github.com/acrosa/kafka-rb"
+SUMMARY = "A Ruby client for the Kafka distributed publish/subscribe messaging service"
+DESCRIPTION = "kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service."
+
+spec = Gem::Specification.new do |s|
+  s.name = GEM
+  s.version = GEM_VERSION
+  s.platform = Gem::Platform::RUBY
+  s.has_rdoc = true
+  s.extra_rdoc_files = ["LICENSE"]
+  s.summary = SUMMARY
+  s.description = DESCRIPTION
+  s.authors = AUTHORS
+  s.email = EMAIL
+  s.homepage = HOMEPAGE
+  s.add_development_dependency "rspec"
+  s.require_path = 'lib'
+  s.autorequire = GEM
+  s.files = %w(LICENSE README.md Rakefile) + Dir.glob("{lib,tasks,spec}/**/*")
+end
+
+task :default => :spec
+
+desc "Run specs"
+RSpec::Core::RakeTask.new do |t|
+  t.pattern = FileList['spec/**/*_spec.rb']
+  t.rspec_opts = %w(-fs --color)
+end
+
+Rake::GemPackageTask.new(spec) do |pkg|
+  pkg.gem_spec = spec
+end
+
+desc "install the gem locally"
+task :install => [:package] do
+  sh %{sudo gem install pkg/#{GEM}-#{GEM_VERSION}}
+end
+
+desc "create a gemspec file"
+task :make_spec do
+  File.open("#{GEM}.gemspec", "w") do |file|
+    file.puts spec.to_ruby
+  end
+end
+
+desc "Run all examples with RCov"
+RSpec::Core::RakeTask.new(:rcov) do |t|
+  t.pattern = FileList['spec/**/*_spec.rb']
+  t.rcov = true
+end

Added: incubator/kafka/trunk/clients/ruby/TODO
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/TODO?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/TODO (added)
+++ incubator/kafka/trunk/clients/ruby/TODO Mon Aug  1 23:41:24 2011
@@ -0,0 +1 @@
+* should persist the offset somewhere (currently considering alternatives)

Added: incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec (added)
+++ incubator/kafka/trunk/clients/ruby/kafka-rb.gemspec Mon Aug  1 23:41:24 2011
@@ -0,0 +1,32 @@
+# -*- encoding: utf-8 -*-
+
+Gem::Specification.new do |s|
+  s.name = %q{kafka-rb}
+  s.version = "0.0.5"
+
+  s.required_rubygems_version = Gem::Requirement.new(">= 0") if s.respond_to? :required_rubygems_version=
+  s.authors = ["Alejandro Crosa"]
+  s.autorequire = %q{kafka-rb}
+  s.date = %q{2011-01-13}
+  s.description = %q{kafka-rb allows you to produce and consume messages using the Kafka distributed publish/subscribe messaging service.}
+  s.email = %q{alejandrocrosa@gmail.com}
+  s.extra_rdoc_files = ["LICENSE"]
+  s.files = ["LICENSE", "README.md", "Rakefile", "lib/kafka", "lib/kafka/batch.rb", "lib/kafka/consumer.rb", "lib/kafka/error_codes.rb", "lib/kafka/io.rb", "lib/kafka/message.rb", "lib/kafka/producer.rb", "lib/kafka/request_type.rb", "lib/kafka.rb", "spec/batch_spec.rb", "spec/consumer_spec.rb", "spec/io_spec.rb", "spec/kafka_spec.rb", "spec/message_spec.rb", "spec/producer_spec.rb", "spec/spec_helper.rb"]
+  s.homepage = %q{http://github.com/acrosa/kafka-rb}
+  s.require_paths = ["lib"]
+  s.rubygems_version = %q{1.3.7}
+  s.summary = %q{A Ruby client for the Kafka distributed publish/subscribe messaging service}
+
+  if s.respond_to? :specification_version then
+    current_version = Gem::Specification::CURRENT_SPECIFICATION_VERSION
+    s.specification_version = 3
+
+    if Gem::Version.new(Gem::VERSION) >= Gem::Version.new('1.2.0') then
+      s.add_development_dependency(%q<rspec>, [">= 0"])
+    else
+      s.add_dependency(%q<rspec>, [">= 0"])
+    end
+  else
+    s.add_dependency(%q<rspec>, [">= 0"])
+  end
+end

Added: incubator/kafka/trunk/clients/ruby/lib/kafka.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,13 @@
+require 'socket'
+require 'zlib'
+
+require File.join(File.dirname(__FILE__), "kafka", "io")
+require File.join(File.dirname(__FILE__), "kafka", "request_type")
+require File.join(File.dirname(__FILE__), "kafka", "error_codes")
+require File.join(File.dirname(__FILE__), "kafka", "batch")
+require File.join(File.dirname(__FILE__), "kafka", "message")
+require File.join(File.dirname(__FILE__), "kafka", "producer")
+require File.join(File.dirname(__FILE__), "kafka", "consumer")
+
+module Kafka
+end

Added: incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/batch.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,13 @@
+module Kafka
+  class Batch
+    attr_accessor :messages
+
+    def initialize
+      self.messages = []
+    end
+
+    def << (message)
+      self.messages << message
+    end
+  end
+end
\ No newline at end of file

Added: incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/consumer.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,135 @@
+module Kafka
+  class Consumer
+
+    include Kafka::IO
+
+    CONSUME_REQUEST_TYPE = Kafka::RequestType::FETCH
+    MAX_SIZE = 1048576 # 1 MB
+    DEFAULT_POLLING_INTERVAL = 2 # 2 seconds
+    MAX_OFFSETS = 100
+
+    attr_accessor :topic, :partition, :offset, :max_size, :request_type, :polling
+
+    def initialize(options = {})
+      self.topic        = options[:topic]        || "test"
+      self.partition    = options[:partition]    || 0
+      self.host         = options[:host]         || "localhost"
+      self.port         = options[:port]         || 9092
+      self.offset       = options[:offset]       || -2
+      self.max_size     = options[:max_size]     || MAX_SIZE
+      self.request_type = options[:request_type] || CONSUME_REQUEST_TYPE
+      self.polling      = options[:polling]      || DEFAULT_POLLING_INTERVAL
+      self.connect(self.host, self.port)
+
+      if @offset < 0
+         send_offsets_request
+         offsets = read_offsets_response
+         raise Exception, "No offsets for #@topic-#@partition" if offsets.empty?
+         @offset = offsets[0]
+      end
+    end
+
+    # REQUEST TYPE ID + TOPIC LENGTH + TOPIC + PARTITION + OFFSET + MAX SIZE
+    def request_size
+      2 + 2 + topic.length + 4 + 8 + 4
+    end
+
+    def encode_request_size
+      [self.request_size].pack("N")
+    end
+
+    def encode_request(request_type, topic, partition, offset, max_size)
+      request_type = [request_type].pack("n")
+      topic        = [topic.length].pack('n') + topic
+      partition    = [partition].pack("N")
+      offset       = [offset].pack("Q").reverse # DIY 64bit big endian integer
+      max_size     = [max_size].pack("N")
+
+      request_type + topic + partition + offset + max_size
+    end
+
+    def offsets_request_size
+       2 + 2 + topic.length + 4 + 8 + 4
+    end
+
+    def encode_offsets_request_size
+       [offsets_request_size].pack('N')
+    end
+
+    # Query the server for the offsets
+    def encode_offsets_request(topic, partition, time, max_offsets)
+       req         = [Kafka::RequestType::OFFSETS].pack('n')
+       topic       = [topic.length].pack('n') + topic
+       partition   = [partition].pack('N')
+       time        = [time].pack("q").reverse # DIY 64bit big endian integer
+       max_offsets = [max_offsets].pack('N')
+
+       req + topic + partition + time + max_offsets
+    end
+
+    def consume
+      self.send_consume_request         # request data
+      data = self.read_data_response    # read data response
+      self.parse_message_set_from(data) # parse message set
+    end
+
+    def loop(&block)
+      messages = []
+      while(true) do
+        messages = self.consume
+        block.call(messages) if messages && !messages.empty?
+        sleep(self.polling)
+      end
+    end
+
+    def read_data_response
+      data_length = self.socket.read(4).unpack("N").shift # read length
+      data = self.socket.read(data_length)                # read message set
+      data[2, data.length]                                # skip the 2-byte error code at the start of the response
+    end
+
+    def send_consume_request
+      self.write(self.encode_request_size) # write request_size
+      self.write(self.encode_request(self.request_type, self.topic, self.partition, self.offset, self.max_size)) # write request
+    end
+
+    def send_offsets_request
+      self.write(self.encode_offsets_request_size) # write request_size
+      self.write(self.encode_offsets_request(@topic, @partition, -2, MAX_OFFSETS)) # write request
+    end
+
+    def read_offsets_response
+      data_length = self.socket.read(4).unpack('N').shift # read length
+      data = self.socket.read(data_length)                # read message
+
+      pos = 0
+      error_code = data[pos,2].unpack('n')[0]
+      raise Exception, Kafka::ErrorCodes::to_s(error_code) if error_code != Kafka::ErrorCodes::NO_ERROR
+
+      pos += 2
+      count = data[pos,4].unpack('N')[0]
+      pos += 4
+
+      res = []
+      while pos != data.size
+         res << data[pos,8].reverse.unpack('q')[0]
+         pos += 8
+      end
+
+      res
+    end
+
+    def parse_message_set_from(data)
+      messages = []
+      processed = 0
+      length = data.length - 4
+      while(processed <= length) do
+        message_size = data[processed, 4].unpack("N").shift
+        messages << Kafka::Message.parse_from(data[processed, message_size + 4])
+        processed += 4 + message_size
+      end
+      self.offset += processed
+      messages
+    end
+  end
+end

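Consumer#encode_request above packs a fetch request exactly as the comment on request_size describes: a 2-byte request type id, a 2-byte topic length, the topic, a 4-byte partition, an 8-byte offset and a 4-byte max size, with the 4-byte request size written separately beforehand. A small sketch of the same packing, mirroring the pack calls in encode_request (it assumes the kafka-rb code above is loaded):

    require 'kafka'

    bytes  = [Kafka::RequestType::FETCH].pack("n")  # request type id, 2 bytes
    bytes += ["test".length].pack("n") + "test"     # topic length + topic
    bytes += [0].pack("N")                          # partition, 4 bytes
    bytes += [0].pack("Q").reverse                  # offset, 8 bytes (same DIY big-endian trick as above)
    bytes += [1048576].pack("N")                    # max fetch size, 4 bytes

    bytes.length  # => 24, the value Consumer#request_size reports for the default "test" topic
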
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/error_codes.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,21 @@
+module Kafka
+  module ErrorCodes
+    NO_ERROR                = 0
+    OFFSET_OUT_OF_RANGE     = 1
+    INVALID_MESSAGE_CODE    = 2
+    WRONG_PARTITION_CODE    = 3
+    INVALID_FETCH_SIZE_CODE = 4
+
+    STRINGS = {
+      0 => 'No error',
+      1 => 'Offset out of range',
+      2 => 'Invalid message code',
+      3 => 'Wrong partition code',
+      4 => 'Invalid fetch size code',
+    }
+
+    def self.to_s(code)
+      STRINGS[code] || 'Unknown error'
+    end
+  end
+end

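Kafka::ErrorCodes.to_s is what Consumer#read_offsets_response uses to turn a non-zero error code from the broker into a readable exception message, for example:

    Kafka::ErrorCodes.to_s(0)   # => "No error"
    Kafka::ErrorCodes.to_s(1)   # => "Offset out of range"
    Kafka::ErrorCodes.to_s(99)  # => "Unknown error"
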
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/io.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,39 @@
+module Kafka
+  module IO
+    attr_accessor :socket, :host, :port
+
+    def connect(host, port)
+      raise ArgumentError, "No host or port specified" unless host && port
+      self.host = host
+      self.port = port
+      self.socket = TCPSocket.new(host, port)
+    end
+
+    def reconnect
+      self.disconnect
+      self.socket = self.connect(self.host, self.port)
+    end
+
+    def disconnect
+      self.socket.close rescue nil
+      self.socket = nil
+    end
+
+    def write(data)
+      self.reconnect unless self.socket
+      self.socket.write(data)
+    rescue Errno::ECONNRESET, Errno::EPIPE, Errno::ECONNABORTED
+      self.reconnect
+      self.socket.write(data) # retry
+    end
+
+    def read(length)
+      begin
+        self.socket.read(length)
+      rescue Errno::EAGAIN
+        self.disconnect
+        raise Errno::EAGAIN, "Timeout reading from the socket"
+      end
+    end
+  end
+end

Added: incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/message.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,35 @@
+module Kafka
+
+  # A message. The format of an N byte message is the following:
+  # 1 byte "magic" identifier to allow format changes
+  # 4 byte CRC32 of the payload
+  # N - 5 byte payload
+  class Message
+
+    MAGIC_IDENTIFIER_DEFAULT = 0
+
+    attr_accessor :magic, :checksum, :payload
+
+    def initialize(payload = nil, magic = MAGIC_IDENTIFIER_DEFAULT, checksum = nil)
+      self.magic    = magic
+      self.payload  = payload
+      self.checksum = checksum || self.calculate_checksum
+    end
+
+    def calculate_checksum
+      Zlib.crc32(self.payload)
+    end
+
+    def valid?
+      self.checksum == Zlib.crc32(self.payload)
+    end
+
+    def self.parse_from(binary)
+      size     = binary[0, 4].unpack("N").shift.to_i
+      magic    = binary[4, 1].unpack("C").shift
+      checksum = binary[5, 4].unpack("N").shift
+      payload  = binary[9, size] # payload starts after the 4-byte size, 1-byte magic and 4-byte checksum
+      return Kafka::Message.new(payload, magic, checksum)
+    end
+  end
+end

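The comment at the top of the Message class describes the wire layout: a 1-byte magic identifier, a 4-byte CRC32 of the payload, then the payload, preceded by a 4-byte size when it appears in a message set. A minimal round-trip sketch of that layout, assuming the kafka-rb code above is loaded:

    require 'kafka'

    payload = "ale"
    binary  = [payload.length + 5].pack("N") +   # size: magic + checksum + payload
              [0].pack("C") +                    # magic byte
              [Zlib.crc32(payload)].pack("N") +  # CRC32 of the payload
              payload

    message = Kafka::Message.parse_from(binary)
    message.payload  # => "ale"
    message.valid?   # => true
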
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/producer.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,49 @@
+module Kafka
+  class Producer
+
+    include Kafka::IO
+
+    PRODUCE_REQUEST_ID = Kafka::RequestType::PRODUCE
+
+    attr_accessor :topic, :partition
+
+    def initialize(options = {})
+      self.topic     = options[:topic]      || "test"
+      self.partition = options[:partition]  || 0
+      self.host      = options[:host]       || "localhost"
+      self.port      = options[:port]       || 9092
+      self.connect(self.host, self.port)
+    end
+
+    def encode(message)
+      [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
+    end
+
+    def encode_request(topic, partition, messages)
+      message_set = Array(messages).collect { |message|
+        encoded_message = self.encode(message)
+        [encoded_message.length].pack("N") + encoded_message
+      }.join("")
+
+      request   = [PRODUCE_REQUEST_ID].pack("n")
+      topic     = [topic.length].pack("n") + topic
+      partition = [partition].pack("N")
+      messages  = [message_set.length].pack("N") + message_set
+
+      data = request + topic + partition + messages
+
+      return [data.length].pack("N") + data
+    end
+
+    def send(messages)
+      self.write(self.encode_request(self.topic, self.partition, messages))
+    end
+
+    def batch(&block)
+      batch = Kafka::Batch.new
+      block.call( batch )
+      self.send(batch.messages)
+      batch.messages.clear
+    end
+  end
+end

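Producer#encode_request frames a produce request as a 4-byte total size, a 2-byte request id, a 2-byte topic length, the topic, a 4-byte partition, a 4-byte message-set size, and then the length-prefixed encoded messages. A sketch of reading those fields back, matching the values the producer spec later in this commit asserts (it assumes the kafka-rb code above is loaded and that Producer.new can open its socket, e.g. to a broker on localhost:9092):

    require 'kafka'

    producer = Kafka::Producer.new(:topic => "test")
    bytes    = producer.encode_request("test", 3, Kafka::Message.new("ale"))

    bytes[0, 4].unpack("N").first   # => 28    size of everything that follows
    bytes[4, 2].unpack("n").first   # => 0     PRODUCE request id
    bytes[6, 2].unpack("n").first   # => 4     topic length
    bytes[8, 4]                     # => "test"
    bytes[12, 4].unpack("N").first  # => 3     partition
    bytes[16, 4].unpack("N").first  # => 12    message set size (4-byte length + 1 + 4 + "ale".length)
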
Added: incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb (added)
+++ incubator/kafka/trunk/clients/ruby/lib/kafka/request_type.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,9 @@
+module Kafka
+  module RequestType
+    PRODUCE      = 0
+    FETCH        = 1
+    MULTIFETCH   = 2
+    MULTIPRODUCE = 3
+    OFFSETS      = 4
+  end
+end
\ No newline at end of file

Added: incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/batch_spec.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,21 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Batch do
+
+  before(:each) do
+    @batch = Batch.new
+  end
+
+  describe "batch messages" do
+    it "holds all messages to be sent" do
+      @batch.should respond_to(:messages)
+      @batch.messages.class.should eql(Array)
+    end
+
+    it "supports queueing/adding messages to be sent" do
+      @batch.messages << mock(Kafka::Message.new("one"))
+      @batch.messages << mock(Kafka::Message.new("two"))
+      @batch.messages.length.should eql(2)
+    end
+  end
+end
\ No newline at end of file

Added: incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/consumer_spec.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,120 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Consumer do
+
+  before(:each) do
+    @mocked_socket = mock(TCPSocket)
+    TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
+    @consumer = Consumer.new
+  end
+
+  describe "Kafka Consumer" do
+
+    it "should have a CONSUME_REQUEST_TYPE" do
+      Consumer::CONSUME_REQUEST_TYPE.should eql(1)
+      @consumer.should respond_to(:request_type)
+    end
+
+    it "should have a topic and a partition" do
+      @consumer.should respond_to(:topic)
+      @consumer.should respond_to(:partition)
+    end
+
+    it "should have a polling option, and a default value" do
+      Consumer::DEFAULT_POLLING_INTERVAL.should eql(2)
+      @consumer.should respond_to(:polling)
+      @consumer.polling.should eql(2)
+    end
+
+    it "should set a topic and partition on initialize" do
+      @consumer = Consumer.new({ :host => "localhost", :port => 9092, :topic => "testing" })
+      @consumer.topic.should eql("testing")
+      @consumer.partition.should eql(0)
+      @consumer = Consumer.new({ :topic => "testing", :partition => 3 })
+      @consumer.partition.should eql(3)
+    end
+
+    it "should set default host and port if none is specified" do
+      @consumer = Consumer.new
+      @consumer.host.should eql("localhost")
+      @consumer.port.should eql(9092)
+    end
+
+    it "should have a default offset, and be able to set it" do
+      @consumer.offset.should eql(0)
+      @consumer = Consumer.new({ :offset => 1111 })
+      @consumer.offset.should eql(1111)
+    end
+
+    it "should have a max size" do
+      Consumer::MAX_SIZE.should eql(1048576)
+      @consumer.max_size.should eql(1048576)
+    end
+
+    it "should return the size of the request" do
+      @consumer.request_size.should eql(24)
+      @consumer.topic = "someothertopicname"
+      @consumer.request_size.should eql(38)
+      @consumer.encode_request_size.should eql([@consumer.request_size].pack("N"))
+    end
+
+    it "should encode a request to consume" do
+      bytes = [Kafka::Consumer::CONSUME_REQUEST_TYPE].pack("n") + ["test".length].pack("n") + "test" + [0].pack("N") + [0].pack("L_") + [Kafka::Consumer::MAX_SIZE].pack("N")
+      @consumer.encode_request(Kafka::Consumer::CONSUME_REQUEST_TYPE, "test", 0, 0, Kafka::Consumer::MAX_SIZE).should eql(bytes)
+    end
+
+    it "should read the response data" do
+      bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
+      @mocked_socket.should_receive(:read).exactly(:twice).and_return(bytes)
+      @consumer.read_data_response.should eql(bytes[2, bytes.length])
+    end
+
+    it "should send a consumer request" do
+      @consumer.stub!(:encode_request_size).and_return(666)
+      @consumer.stub!(:encode_request).and_return("someencodedrequest")
+      @consumer.should_receive(:write).with("someencodedrequest").exactly(:once).and_return(true)
+      @consumer.should_receive(:write).with(666).exactly(:once).and_return(true)
+      @consumer.send_consume_request.should eql(true)
+    end
+
+    it "should parse a message set from bytes" do
+      bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
+      message = @consumer.parse_message_set_from(bytes).first
+      message.payload.should eql("ale")
+      message.checksum.should eql(1120192889)
+      message.magic.should eql(0)
+      message.valid?.should eql(true)
+    end
+
+    it "should consume messages" do
+      @consumer.should_receive(:send_consume_request).and_return(true)
+      @consumer.should_receive(:read_data_response).and_return("")
+      @consumer.consume.should eql([])
+    end
+
+    it "should loop and execute a block with the consumed messages" do
+      @consumer.stub!(:consume).and_return([mock(Kafka::Message)])
+      messages = []
+      messages.should_receive(:<<).exactly(:once).and_return([])
+      @consumer.loop do |message|
+        messages << message
+        break # we don't wanna loop forever on the test
+      end
+    end
+
+    it "should loop (every N seconds, configurable on polling attribute), and execute a block with the consumed messages" do
+      @consumer = Consumer.new({ :polling => 1 })
+      @consumer.stub!(:consume).and_return([mock(Kafka::Message)])
+      messages = []
+      messages.should_receive(:<<).exactly(:twice).and_return([])
+      executed_times = 0
+      @consumer.loop do |message|
+        messages << message
+        executed_times += 1
+        break if executed_times >= 2 # we don't wanna loop forever on the test, only 2 seconds
+      end
+
+      executed_times.should eql(2)
+    end
+  end
+end

Added: incubator/kafka/trunk/clients/ruby/spec/io_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/io_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/io_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/io_spec.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,77 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+class IOTest
+  include Kafka::IO
+end
+
+describe IO do
+
+  before(:each) do
+    @mocked_socket = mock(TCPSocket)
+    TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
+    @io = IOTest.new
+    @io.connect("somehost", 9093)
+  end
+
+  describe "default methods" do
+    it "has a socket, a host and a port" do
+      [:socket, :host, :port].each do |m|
+        @io.should respond_to(m.to_sym)
+      end
+    end
+
+    it "raises an exception if no host or port is specified" do
+      lambda {
+        io = IOTest.new
+        io.connect
+      }.should raise_error(ArgumentError)
+    end
+    
+    it "should remember the port and host on connect" do
+      @io.connect("somehost", 9093)
+      @io.host.should eql("somehost")
+      @io.port.should eql(9093)
+    end
+
+    it "should write to a socket" do
+      data = "some data"
+      @mocked_socket.should_receive(:write).with(data).and_return(9)
+      @io.write(data).should eql(9)
+    end
+
+    it "should read from a socket" do
+      length = 200
+      @mocked_socket.should_receive(:read).with(length).and_return(nil)
+      @io.read(length)
+    end
+
+    it "should disconnect on a timeout when reading from a socket (to avoid a protocol desync state)" do
+      length = 200
+      @mocked_socket.should_receive(:read).with(length).and_raise(Errno::EAGAIN)
+      @io.should_receive(:disconnect)
+      lambda { @io.read(length) }.should raise_error(Errno::EAGAIN)
+    end
+
+    it "should disconnect" do
+      @io.should respond_to(:disconnect)
+      @mocked_socket.should_receive(:close).and_return(nil)
+      @io.disconnect
+    end
+
+    it "should reconnect" do
+      @mocked_socket.should_receive(:close)
+      @io.should_receive(:connect)
+      @io.reconnect
+    end
+
+    it "should reconnect on a broken pipe error" do
+      [Errno::ECONNABORTED, Errno::EPIPE, Errno::ECONNRESET].each do |error|
+        @mocked_socket.should_receive(:write).exactly(:twice).and_raise(error)
+        @mocked_socket.should_receive(:close).exactly(:once).and_return(nil)
+        lambda {
+          @io.write("some data to send")
+        }.should raise_error(error)
+      end
+    end
+  end
+end

Added: incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/kafka_spec.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,7 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Kafka do
+
+  before(:each) do
+  end
+end
\ No newline at end of file

Added: incubator/kafka/trunk/clients/ruby/spec/message_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/message_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/message_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/message_spec.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,55 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Message do
+
+  before(:each) do
+    @message = Message.new
+  end
+
+  describe "Kafka Message" do
+    it "should have a default magic number" do
+      Message::MAGIC_IDENTIFIER_DEFAULT.should eql(0)
+    end
+
+    it "should have a magic field, a checksum and a payload" do
+      [:magic, :checksum, :payload].each do |field|
+        @message.should respond_to(field.to_sym)
+      end
+    end
+
+    it "should set a default value of zero" do
+      @message.magic.should eql(Kafka::Message::MAGIC_IDENTIFIER_DEFAULT)
+    end
+
+    it "should allow setting a custom magic number" do
+      @message = Message.new("ale", 1)
+      @message.magic.should eql(1)
+    end
+
+    it "should calculate the checksum (crc32 of a given message)" do
+      @message.payload = "ale"
+      @message.calculate_checksum.should eql(1120192889)
+      @message.payload = "alejandro"
+      @message.calculate_checksum.should eql(2865078607)
+    end
+
+    it "should say if the message is valid using the crc32 signature" do
+      @message.payload  = "alejandro"
+      @message.checksum = 2865078607
+      @message.valid?.should eql(true)
+      @message.checksum = 0
+      @message.valid?.should eql(false)
+      @message = Message.new("alejandro", 0, 66666666) # 66666666 is a deliberately wrong checksum
+      @message.valid?.should eql(false)
+    end
+
+    it "should parse a message from bytes" do
+      bytes = [12].pack("N") + [0].pack("C") + [1120192889].pack("N") + "ale"
+      message = Kafka::Message.parse_from(bytes)
+      message.valid?.should eql(true)
+      message.magic.should eql(0)
+      message.checksum.should eql(1120192889)
+      message.payload.should eql("ale")
+    end
+  end
+end

Added: incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/producer_spec.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,95 @@
+require File.dirname(__FILE__) + '/spec_helper'
+
+describe Producer do
+
+  before(:each) do
+    @mocked_socket = mock(TCPSocket)
+    TCPSocket.stub!(:new).and_return(@mocked_socket) # don't use a real socket
+    @producer = Producer.new
+  end
+
+  describe "Kafka Producer" do
+    it "should have a PRODUCE_REQUEST_ID" do
+      Producer::PRODUCE_REQUEST_ID.should eql(0)
+    end
+
+    it "should have a topic and a partition" do
+      @producer.should respond_to(:topic)
+      @producer.should respond_to(:partition)
+    end
+
+    it "should set a topic and partition on initialize" do
+      @producer = Producer.new({ :host => "localhost", :port => 9092, :topic => "testing" })
+      @producer.topic.should eql("testing")
+      @producer.partition.should eql(0)
+      @producer = Producer.new({ :topic => "testing", :partition => 3 })
+      @producer.partition.should eql(3)
+    end
+
+    it "should set default host and port if none is specified" do
+      @producer = Producer.new
+      @producer.host.should eql("localhost")
+      @producer.port.should eql(9092)
+    end
+
+    describe "Message Encoding" do
+      it "should encode a message" do
+        message = Kafka::Message.new("alejandro")
+        full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload
+        @producer.encode(message).should eql(full_message)
+      end
+      
+      it "should encode an empty message" do
+        message = Kafka::Message.new()
+        full_message = [message.magic].pack("C") + [message.calculate_checksum].pack("N") + message.payload.to_s
+        @producer.encode(message).should eql(full_message)
+      end
+    end
+
+    describe "Request Encoding" do
+      it "should binary encode an empty request" do
+        bytes = @producer.encode_request("test", 0, [])
+        bytes.length.should eql(20)
+        bytes.should eql("\000\000\000\020\000\000\000\004test\000\000\000\000\000\000\000\000")
+      end
+
+      it "should binary encode a request with a message, using a specific wire format" do
+        message = Kafka::Message.new("ale")
+        bytes = @producer.encode_request("test", 3, message)
+        data_size  = bytes[0, 4].unpack("N").shift
+        request_id = bytes[4, 2].unpack("n").shift
+        topic_length = bytes[6, 2].unpack("n").shift
+        topic = bytes[8, 4]
+        partition = bytes[12, 4].unpack("N").shift
+        messages_length = bytes[16, 4].unpack("N").shift
+        messages = bytes[20, messages_length]
+
+        bytes.length.should eql(32)
+        data_size.should eql(28)
+        request_id.should eql(0)
+        topic_length.should eql(4)
+        topic.should eql("test")
+        partition.should eql(3)
+        messages_length.should eql(12)
+      end
+    end
+  end
+
+  it "should send messages" do
+    @producer.should_receive(:write).and_return(32)
+    message = Kafka::Message.new("ale")
+    @producer.send(message).should eql(32)
+  end
+
+  describe "Message Batching" do
+    it "should batch messages and send them at once" do
+      message1 = Kafka::Message.new("one")
+      message2 = Kafka::Message.new("two")
+      @producer.should_receive(:send).with([message1, message2]).exactly(:once).and_return(nil)
+      @producer.batch do |messages|
+        messages << message1
+        messages << message2
+      end
+    end
+  end
+end
\ No newline at end of file

Added: incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb (added)
+++ incubator/kafka/trunk/clients/ruby/spec/spec_helper.rb Mon Aug  1 23:41:24 2011
@@ -0,0 +1,4 @@
+require 'rubygems'
+require 'kafka'
+
+include Kafka
\ No newline at end of file

Added: incubator/kafka/trunk/config/consumer.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/consumer.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/consumer.properties (added)
+++ incubator/kafka/trunk/config/consumer.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,15 @@
+# see kafka.consumer.ConsumerConfig for more details
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=127.0.0.1:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+#consumer group id
+groupid=test-consumer-group
+
+#consumer timeout
+#consumer.timeout.ms=5000

Added: incubator/kafka/trunk/config/log4j.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/log4j.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/log4j.properties (added)
+++ incubator/kafka/trunk/config/log4j.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,22 @@
+log4j.rootLogger=INFO, stdout
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n
+
+#log4j.appender.fileAppender=org.apache.log4j.FileAppender
+#log4j.appender.fileAppender.File=kafka-request.log
+#log4j.appender.fileAppender.layout=org.apache.log4j.PatternLayout
+#log4j.appender.fileAppender.layout.ConversionPattern= %-4r [%t] %-5p %c %x - %m%n
+
+
+# Turn on all our debugging info
+log4j.logger.kafka=INFO,stdout
+#log4j.logger.kafka.producer.async.DefaultEventHandler=DEBUG,stdout
+#log4j.logger.kafka.consumer.PartitionTopicInfo=TRACE,stdout
+#log4j.logger.kafka.request.logger=TRACE,fileAppender
+#log4j.additivity.kafka.request.logger=false
+#log4j.logger.kafka.network.Processor=TRACE,fileAppender
+#log4j.additivity.kafka.network.Processor=false
+#log4j.logger.org.I0Itec.zkclient.ZkClient=DEBUG
+

Added: incubator/kafka/trunk/config/server.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/server.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/server.properties (added)
+++ incubator/kafka/trunk/config/server.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,64 @@
+# see kafka.server.KafkaConfig for additional details and defaults
+
+# the id of the broker
+brokerid=0
+
+# hostname of broker. If not set, will pick up from the value returned
+# from getLocalHost.  If there are multiple interfaces getLocalHost
+# may not be what you want.
+# hostname=
+
+# number of logical partitions on this broker
+num.partitions=1
+
+# the port the socket server runs on
+port=9092
+
+# the number of processor threads the socket server uses. Defaults to the number of cores on the machine
+num.threads=8
+
+# the directory in which to store log files
+log.dir=/tmp/kafka-logs
+
+# the send buffer used by the socket server 
+socket.send.buffer=1048576
+
+# the receive buffer used by the socket server
+socket.receive.buffer=1048576
+
+# the maximum size of a log segment
+log.file.size=536870912
+
+# the interval between running cleanup on the logs
+log.cleanup.interval.mins=1
+
+# the minimum age of a log file to be eligible for deletion
+log.retention.hours=168
+
+#the number of messages to accept without flushing the log to disk
+log.flush.interval=1
+
+#set the following properties to use zookeeper
+
+# enable connecting to zookeeper
+enable.zookeeper=true
+
+# zk connection string
+# comma separated host:port pairs, each corresponding to a zk
+# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002"
+zk.connect=localhost:2181
+
+# timeout in ms for connecting to zookeeper
+zk.connectiontimeout.ms=1000000
+
+# time based topic flush intervals in ms
+#topic.flush.intervals.ms=topic:1000
+
+# default time based flush interval in ms
+log.default.flush.interval.ms=2000
+
+# the interval (in ms) at which logs are checked to see if they need to be flushed to disk.
+log.default.flush.scheduler.interval.ms=1000
+
+# topic partition count map
+# topic.partition.count.map=topic1:3, topic2:4

Added: incubator/kafka/trunk/config/zookeeper.properties
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/config/zookeeper.properties?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/config/zookeeper.properties (added)
+++ incubator/kafka/trunk/config/zookeeper.properties Mon Aug  1 23:41:24 2011
@@ -0,0 +1,4 @@
+# the directory where the snapshot is stored.
+dataDir=/tmp/zookeeper
+# the port at which the clients will connect
+clientPort=2181

Added: incubator/kafka/trunk/contrib/hadoop-consumer/README
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/README?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/README (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/README Mon Aug  1 23:41:24 2011
@@ -0,0 +1,66 @@
+This is a Hadoop job that pulls data from a Kafka server into HDFS.
+
+It requires the following inputs from a configuration file 
+(test/test.properties is an example)
+
+kafka.etl.topic : the topic to be fetched;
+
+input		: input directory containing topic offsets; it can be
+		  generated by DataGenerator; the number of files in this
+		  directory determines the number of mappers in the
+		  Hadoop job;
+
+output		: output directory containing kafka data and updated 
+		  topic offsets;
+
+kafka.request.limit : limits the number of events fetched.
+
+KafkaETLRecordReader is a record reader associated with KafkaETLInputFormat.
+It fetches kafka data from the server. It starts from provided offsets 
+(specified by "input") and stops when it reaches the largest available offsets 
+or the specified limit (specified by "kafka.request.limit").
+
+KafkaETLJob contains some helper functions to initialize the job configuration.
+
+SimpleKafkaETLJob sets up job properties and submits the Hadoop job.
+
+SimpleKafkaETLMapper dumps Kafka data into HDFS.
+
+HOW TO RUN:
+In order to run this, make sure the HADOOP_HOME environment variable points to 
+your hadoop installation directory.
+
+1. Compile using "sbt" to create a package for the hadoop consumer code.
+./sbt package
+
+2. Run the hadoop-setup.sh script that enables write permission on the 
+   required HDFS directory
+
+3. Produce test events on the Kafka server and generate offset files
+  1) Start kafka server [ Follow the quick start - 
+                        http://sna-projects.com/kafka/quickstart.php ]
+
+  2) Update test/test.properties to change the following parameters:  
+   kafka.etl.topic 	: topic name
+   event.count		: number of events to be generated
+   kafka.server.uri     : kafka server uri;
+   input                : hdfs directory of offset files
+
+  3) Produce test events to the Kafka server and generate offset files
+   ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties
+
+4. Fetch generated topic into HDFS:
+  1) Update test/test.properties to change the following parameters:
+	hadoop.job.ugi	: id and group 
+	input           : input location 
+	output	        : output location 
+	kafka.request.limit: limit the number of events to be fetched; 
+			     -1 means no limitation.
+        hdfs.default.classpath.dir : hdfs location of jars
+
+  2) copy jars into hdfs
+   ./copy-jars.sh ${hdfs.default.classpath.dir}
+
+  3) Fetch data
+  ./run-class.sh kafka.etl.impl.SimpleKafkaETLJob test/test.properties
+
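
An illustrative test/test.properties covering the parameters the README above lists might look as follows (all values are placeholders, not the contents of the committed test/test.properties):

  kafka.etl.topic=SimpleTestEvent
  event.count=1000
  kafka.server.uri=tcp://localhost:9092
  input=/tmp/kafka/data/offsets
  output=/tmp/kafka/data/output
  kafka.request.limit=-1
  hdfs.default.classpath.dir=/tmp/kafka/lib
  hadoop.job.ugi=hadoop,hadoop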

Added: incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,55 @@
+#!/bin/bash
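+# copy the kafka, scala-library and hadoop-consumer jars into the HDFS directory given as the first argument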
+
+if [ $# -lt 1 ];
+then
+  echo "USAGE: $0 dir"
+  exit 1
+fi
+
+base_dir=$(dirname $0)/../..
+
+hadoop=${HADOOP_HOME}/bin/hadoop
+
+echo "$hadoop fs -rmr $1"
+$hadoop fs -rmr $1
+
+echo "$hadoop fs -mkdir $1"
+$hadoop fs -mkdir $1
+
+# include hadoop-consumer target jars
+for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
+do
+   echo "$hadoop fs -put $file $1/"
+   $hadoop fs -put $file $1/ 
+done
+
+# include kafka core jar
+echo "$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar; $1/"
+$hadoop fs -put $base_dir/core/target/scala_2.8.0/kafka-*.jar $1/ 
+
+# include core lib jars
+for file in $base_dir/core/lib/*.jar;
+do
+   echo "$hadoop fs -put $file $1/"
+   $hadoop fs -put $file $1/ 
+done
+
+for file in $base_dir/core/lib_managed/scala_2.8.0/compile/*.jar;
+do
+   echo "$hadoop fs -put $file $1/"
+   $hadoop fs -put $file $1/ 
+done
+
+# include scala library jar
+echo "$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar; $1/"
+$hadoop fs -put $base_dir/project/boot/scala-2.8.0/lib/scala-library.jar $1/
+
+local_dir=$(dirname $0)
+
+# include hadoop-consumer lib jars
+for file in $local_dir/lib/*.jar;
+do
+   echo "$hadoop fs -put $file $1/"
+   $hadoop fs -put $file $1/ 
+done
+

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/copy-jars.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,6 @@
+#!/bin/bash
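+# open up write permission on the HDFS /tmp directory used by the hadoop-consumer job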
+
+hadoop=${HADOOP_HOME}/bin/hadoop
+
+$hadoop fs -chmod ugoa+w /tmp
+

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/hadoop-setup.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/avro-1.4.0.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/avro-1.4.0.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/avro-1.4.0.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/commons-logging-1.0.4.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/hadoop-0.20.2-core.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-core-asl-1.5.5.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/jackson-mapper-asl-1.5.5.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/pig-0.8.0-core.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/pig-0.8.0-core.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/pig-0.8.0-core.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/lib/piggybank.jar
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/lib/piggybank.jar?rev=1152970&view=auto
==============================================================================
Binary file - no diff available.

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/lib/piggybank.jar
------------------------------------------------------------------------------
    svn:mime-type = application/octet-stream

Added: incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh Mon Aug  1 23:41:24 2011
@@ -0,0 +1,51 @@
+#!/bin/bash
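+# run the given class with the kafka, hadoop-consumer and scala jars on the classpath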
+
+if [ $# -lt 1 ];
+then
+  echo "USAGE: $0 classname [opts]"
+  exit 1
+fi
+
+base_dir=$(dirname $0)/../..
+
+# include kafka jars
+for file in $base_dir/core/target/scala_2.8.0/kafka-*.jar
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/contrib/hadoop-consumer/lib_managed/scala_2.8.0/compile/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+local_dir=$(dirname $0)
+
+# include hadoop-consumer jars
+for file in $base_dir/contrib/hadoop-consumer/target/scala_2.8.0/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+for file in $base_dir/contrib/hadoop-consumer/lib/*.jar;
+do
+  CLASSPATH=$CLASSPATH:$file
+done
+
+CLASSPATH=$CLASSPATH:$base_dir/project/boot/scala-2.8.0/lib/scala-library.jar
+
+echo $CLASSPATH
+
+CLASSPATH=dist:$CLASSPATH:${HADOOP_HOME}/conf
+
+#if [ -z "$KAFKA_OPTS" ]; then
+#  KAFKA_OPTS="-Xmx512M -server -Dcom.sun.management.jmxremote"
+#fi
+
+if [ -z "$JAVA_HOME" ]; then
+  JAVA="java"
+else
+  JAVA="$JAVA_HOME/bin/java"
+fi
+
+$JAVA $KAFKA_OPTS -cp $CLASSPATH $@

Propchange: incubator/kafka/trunk/contrib/hadoop-consumer/run-class.sh
------------------------------------------------------------------------------
    svn:executable = *

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLContext.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,270 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.zip.CRC32;
+import kafka.api.FetchRequest;
+import kafka.javaapi.MultiFetchResponse;
+import kafka.api.OffsetRequest;
+import kafka.common.ErrorMapping;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import kafka.message.MessageSet;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import java.nio.ByteBuffer;
+
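+/**
+ * Per-input helper that wraps a SimpleConsumer for one topic/partition:
+ * it determines the available offset range, issues multifetch requests
+ * starting from the provided offset, hands messages to the record reader,
+ * and writes the updated offset out through MultipleOutputs.
+ */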
+@SuppressWarnings({ "deprecation"})
+public class KafkaETLContext {
+    
+    static protected int MAX_RETRY_TIME = 1;
+    final static String CLIENT_BUFFER_SIZE = "client.buffer.size";
+    final static String CLIENT_TIMEOUT = "client.so.timeout";
+
+    final static int DEFAULT_BUFFER_SIZE = 1 * 1024 * 1024;
+    final static int DEFAULT_TIMEOUT = 60000; // one minute
+
+    final static KafkaETLKey DUMMY_KEY = new KafkaETLKey();
+    
+    protected int _index; /*index of context*/
+    protected String _input = null; /*input string*/
+    protected KafkaETLRequest _request = null;
+    protected SimpleConsumer _consumer = null; /*simple consumer*/
+
+    protected long[] _offsetRange = {0, 0};  /*offset range*/
+    protected long _offset = Long.MAX_VALUE; /*current offset*/
+    protected long _count; /*current count*/
+
+    protected MultiFetchResponse _response = null;  /*fetch response*/
+    protected Iterator<MessageAndOffset> _messageIt = null; /*message iterator*/
+    
+    protected int _retry = 0;
+    protected long _requestTime = 0; /*accumulative request time*/
+    protected long _startTime = -1;
+    
+    protected int _bufferSize;
+    protected int _timeout;
+    protected Reporter _reporter;
+    
+    protected MultipleOutputs _mos;
+    protected OutputCollector<KafkaETLKey, BytesWritable> _offsetOut = null;
+    
+    public long getTotalBytes() {
+        return (_offsetRange[1] > _offsetRange[0])? _offsetRange[1] - _offsetRange[0] : 0;
+    }
+    
+    public long getReadBytes() {
+        return _offset - _offsetRange[0];
+    }
+    
+    public long getCount() {
+        return _count;
+    }
+    
+    /**
+     * construct using input string
+     */
+    @SuppressWarnings("unchecked")
+    public KafkaETLContext(JobConf job, Props props, Reporter reporter, 
+                                    MultipleOutputs mos, int index, String input) 
+    throws Exception {
+        
+        _bufferSize = getClientBufferSize(props);
+        _timeout = getClientTimeout(props);
+        System.out.println("bufferSize=" +_bufferSize);
+        System.out.println("timeout=" + _timeout);
+        _reporter = reporter;
+        _mos = mos;
+        
+        // read topic and current offset from input
+        _index= index; 
+        _input = input;
+        _request = new KafkaETLRequest(input.trim());
+        
+        // read data from queue
+        URI uri = _request.getURI();
+        _consumer = new SimpleConsumer(uri.getHost(), uri.getPort(), _timeout, _bufferSize);
+        
+        // get available offset range
+        _offsetRange = getOffsetRange();
+        System.out.println("Connected to node " + uri 
+                + " beginning reading at offset " + _offsetRange[0]
+                + " latest offset=" + _offsetRange[1]);
+
+        _offset = _offsetRange[0];
+        _count = 0;
+        _requestTime = 0;
+        _retry = 0;
+        
+        _startTime = System.currentTimeMillis();
+    }
+    
+    public boolean hasMore () {
+        return _messageIt != null && _messageIt.hasNext() 
+                || _response != null && _response.iterator().hasNext()
+                || _offset < _offsetRange[1]; 
+    }
+    
+    public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
+        if ( !hasMore() ) return false;
+        
+        boolean gotNext = get(key, value);
+
+        if(_response != null) {
+            Iterator<ByteBufferMessageSet> iter = _response.iterator();
+            while ( !gotNext && iter.hasNext()) {
+                ByteBufferMessageSet msgSet = iter.next();
+                if ( hasError(msgSet)) return false;
+                _messageIt =  (Iterator<MessageAndOffset>) msgSet.iterator();
+                gotNext = get(key, value);
+            }
+        }
+        return gotNext;
+    }
+    
+    public boolean fetchMore () throws IOException {
+        if (!hasMore()) return false;
+        
+        FetchRequest fetchRequest = 
+            new FetchRequest(_request.getTopic(), _request.getPartition(), _offset, _bufferSize);
+        List<FetchRequest> array = new ArrayList<FetchRequest>();
+        array.add(fetchRequest);
+
+        long tempTime = System.currentTimeMillis();
+        _response = _consumer.multifetch(array);
+        _requestTime += (System.currentTimeMillis() - tempTime);
+        
+        return true;
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void output(String fileprefix) throws IOException {
+       String offsetString = _request.toString(_offset);
+
+        if (_offsetOut == null)
+            _offsetOut = (OutputCollector<KafkaETLKey, BytesWritable>)
+                                    _mos.getCollector("offsets", fileprefix+_index, _reporter);
+        _offsetOut.collect(DUMMY_KEY, new BytesWritable(offsetString.getBytes("UTF-8")));
+        
+    }
+    
+    public void close() throws IOException {
+        if (_consumer != null) _consumer.close();
+        
+        String topic = _request.getTopic();
+        long endTime = System.currentTimeMillis();
+        _reporter.incrCounter(topic, "read-time(ms)", endTime - _startTime);
+        _reporter.incrCounter(topic, "request-time(ms)", _requestTime);
+        
+        long bytesRead = _offset - _offsetRange[0];
+        double megaRead = bytesRead / (1024.0*1024.0);
+        _reporter.incrCounter(topic, "data-read(mb)", (long) megaRead);
+        _reporter.incrCounter(topic, "event-count", _count);
+    }
+    
+    protected boolean get(KafkaETLKey key, BytesWritable value) throws IOException {
+        if (_messageIt != null && _messageIt.hasNext()) {
+            MessageAndOffset msgAndOffset = _messageIt.next();
+            
+            ByteBuffer buf = msgAndOffset.message().payload();
+            int origSize = buf.remaining();
+            byte[] bytes = new byte[origSize];
+            buf.get(bytes, buf.position(), origSize);
+            value.set(bytes, 0, origSize);
+            
+            key.set(_index, _offset, msgAndOffset.message().checksum());
+            
+            _offset += msgAndOffset.offset();  //increase offset
+            _count ++;  //increase count
+            
+            return true;
+        }
+        else return false;
+    }
+    
+    /**
+     * Get offset ranges
+     */
+    protected long[] getOffsetRange() throws IOException {
+
+        /* get smallest and largest offsets*/
+        long[] range = new long[2];
+
+        long[] startOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
+                OffsetRequest.EarliestTime(), 1);
+        if (startOffsets.length != 1)
+            throw new IOException("input:" + _input + " Expect one smallest offset but get "
+                                            + startOffsets.length);
+        range[0] = startOffsets[0];
+        
+        long[] endOffsets = _consumer.getOffsetsBefore(_request.getTopic(), _request.getPartition(),
+                                        OffsetRequest.LatestTime(), 1);
+        if (endOffsets.length != 1)
+            throw new IOException("input:" + _input + " Expect one latest offset but get " 
+                                            + endOffsets.length);
+        range[1] = endOffsets[0];
+
+        /*adjust range based on input offsets*/
+        if ( _request.isValidOffset()) {
+            long startOffset = _request.getOffset();
+            if (startOffset > range[0]) {
+                System.out.println("Update starting offset with " + startOffset);
+                range[0] = startOffset;
+            }
+            else {
+                System.out.println("WARNING: given starting offset " + startOffset 
+                                            + " is smaller than the smallest one " + range[0] 
+                                            + ". Will ignore it.");
+            }
+        }
+        System.out.println("Using offset range [" + range[0] + ", " + range[1] + "]");
+        return range;
+    }
+    
+    /**
+     * Checks the error code of a fetched message set to decide whether
+     * processing can continue; an out-of-range offset is retried once.
+     */
+    protected boolean hasError(ByteBufferMessageSet messages)
+            throws IOException {
+        int errorCode = messages.getErrorCode();
+        if (errorCode == ErrorMapping.OffsetOutOfRangeCode()) {
+            /* offset cannot cross the maximum offset (guaranteed by Kafka protocol).
+               Kafka server may delete old files from time to time */
+            System.err.println("WARNING: current offset=" + _offset + ". It is out of range.");
+
+            if (_retry >= MAX_RETRY_TIME)  return true;
+            _retry++;
+            // get the current offset range
+            _offsetRange = getOffsetRange();
+            _offset =  _offsetRange[0];
+            return false;
+        } else if (errorCode == ErrorMapping.InvalidMessageCode()) {
+            throw new IOException(_input + " current offset=" + _offset
+                    + " : invalid offset.");
+        } else if (errorCode == ErrorMapping.WrongPartitionCode()) {
+            throw new IOException(_input + " : wrong partition");
+        } else if (errorCode != ErrorMapping.NoError()) {
+            throw new IOException(_input + " current offset=" + _offset
+                    + " error:" + errorCode);
+        } else
+            return false;
+    }
+    
+    public static int getClientBufferSize(Props props) throws Exception {
+        return props.getInt(CLIENT_BUFFER_SIZE, DEFAULT_BUFFER_SIZE);
+    }
+
+    public static int getClientTimeout(Props props) throws Exception {
+        return props.getInt(CLIENT_TIMEOUT, DEFAULT_TIMEOUT);
+    }
+
+}

Added: incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java
URL: http://svn.apache.org/viewvc/incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java?rev=1152970&view=auto
==============================================================================
--- incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java (added)
+++ incubator/kafka/trunk/contrib/hadoop-consumer/src/main/java/kafka/etl/KafkaETLInputFormat.java Mon Aug  1 23:41:24 2011
@@ -0,0 +1,61 @@
+package kafka.etl;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.Map;
+import kafka.consumer.SimpleConsumer;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapred.OutputCollector;
+import org.apache.hadoop.mapred.lib.MultipleOutputs;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.SequenceFileInputFormat;
+
+
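+/**
+ * Input format for the ETL job: reads the offset files provided as
+ * SequenceFiles and creates a KafkaETLRecordReader for each split.
+ */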
+@SuppressWarnings("deprecation")
+public class KafkaETLInputFormat 
+extends SequenceFileInputFormat<KafkaETLKey, BytesWritable> {
+
+    protected Props _props;
+    protected int _bufferSize;
+    protected int _soTimeout;
+
+    protected Map<Integer, URI> _nodes;
+    protected int _partition;
+    protected int _nodeId;
+    protected String _topic;
+    protected SimpleConsumer _consumer;
+
+    protected MultipleOutputs _mos;
+    protected OutputCollector<BytesWritable, BytesWritable> _offsetOut = null;
+
+    protected long[] _offsetRange;
+    protected long _startOffset;
+    protected long _offset;
+    protected boolean _toContinue = true;
+    protected int _retry;
+    protected long _timestamp;
+    protected long _count;
+    protected boolean _ignoreErrors = false;
+
+    @Override
+    public RecordReader<KafkaETLKey, BytesWritable> getRecordReader(InputSplit split,
+                                    JobConf job, Reporter reporter)
+                                    throws IOException {
+        return new KafkaETLRecordReader(split, job, reporter);
+    }
+
+    @Override
+    protected boolean isSplitable(FileSystem fs, Path file) {
+        return super.isSplitable(fs, file);
+    }
+
+    @Override
+    public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
+        return super.getSplits(conf, numSplits);
+    }
+}


