crunch-commits mailing list archives

From mkw...@apache.org
Subject [1/3] crunch git commit: CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable
Date Mon, 23 May 2016 20:21:48 GMT
Repository: crunch
Updated Branches:
  refs/heads/master c09c4ee2d -> 360d72a4f
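
For readers skimming the patch, the sketch below (not part of the commit) pieces together how the new source is meant to be wired into a pipeline, based on the KafkaSourceIT test further down. The broker address and topic name are placeholders, and "read everything currently in the topic" is only one possible offset policy.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import kafka.api.OffsetRequest;
    import org.apache.crunch.PTable;
    import org.apache.crunch.Pair;
    import org.apache.crunch.Pipeline;
    import org.apache.crunch.TableSource;
    import org.apache.crunch.impl.mr.MRPipeline;
    import org.apache.crunch.kafka.KafkaSource;
    import org.apache.crunch.kafka.KafkaUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.kafka.common.TopicPartition;

    public class KafkaSourceUsageSketch {
      public static void main(String[] args) {
        // Placeholder consumer configuration; the broker address is an assumption.
        Properties consumerProps = new Properties();
        consumerProps.setProperty("bootstrap.servers", "localhost:9092");
        String topic = "my-topic";

        // Read everything currently in the topic: earliest through latest offset per partition.
        Map<TopicPartition, Long> start = KafkaUtils.getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
        Map<TopicPartition, Long> end = KafkaUtils.getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
        Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : start.entrySet()) {
          offsets.put(entry.getKey(), Pair.of(entry.getValue(), end.get(entry.getKey())));
        }

        // Keys and values come back as BytesWritable, per the subject of this commit.
        Pipeline pipeline = new MRPipeline(KafkaSourceUsageSketch.class, new Configuration());
        TableSource<BytesWritable, BytesWritable> source = new KafkaSource(consumerProps, offsets);
        PTable<BytesWritable, BytesWritable> records = pipeline.read(source);
        for (Pair<BytesWritable, BytesWritable> record : records.materialize()) {
          System.out.println(new String(record.first().getBytes()));
        }
        pipeline.done();
      }
    }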


http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
new file mode 100644
index 0000000..ce97ec1
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaRecordsIterableIT.java
@@ -0,0 +1,415 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.kafka.common.errors.TimeoutException;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Matchers;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordsIterableIT {
+
+  @Mock
+  private Consumer<String, String> mockedConsumer;
+
+  @Mock
+  private ConsumerRecords<String, String> records;
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private Map<TopicPartition, Long> startOffsets;
+  private Map<TopicPartition, Long> stopOffsets;
+  private Map<TopicPartition, Pair<Long, Long>> offsets;
+  private Consumer<String, String> consumer;
+  private Properties props;
+  private Properties consumerProps;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setup() {
+    topic = testName.getMethodName();
+
+    props = ClusterTest.getConsumerProperties();
+
+    startOffsets = new HashMap<>();
+    stopOffsets = new HashMap<>();
+    offsets = new HashMap<>();
+    for (int i = 0; i < 4; i++) {
+      TopicPartition tp = new TopicPartition(topic, i);
+      startOffsets.put(tp, 0L);
+      stopOffsets.put(tp, 100L);
+
+      offsets.put(tp, Pair.of(0L, 100L));
+    }
+
+
+    consumerProps = new Properties();
+    consumerProps.putAll(props);
+  }
+
+  @After
+  public void shutdown() {
+  }
+
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullConsumer() {
+    new KafkaRecordsIterable(null, offsets, new Properties());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullOffsets() {
+    new KafkaRecordsIterable<>(consumer, null, new Properties());
+  }
+
+  @Test(expected=IllegalArgumentException.class)
+  public void emptyOffsets() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer,
+        Collections.<TopicPartition, Pair<Long, Long>>emptyMap(), new Properties());
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void nullProperties() {
+    new KafkaRecordsIterable(consumer, offsets, null);
+  }
+
+  @Test
+  public void iterateOverValues() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      assertTrue(keys.remove(event.first()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  @Test
+  public void iterateOverOneValue() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 1;
+    int numPerLoop = 1;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+      System.out.println(entry.getKey() + " start:" + entry.getValue() + " end:" + stopOffsets.get(entry.getKey()));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      assertTrue(keys.remove(event.first()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  @Test
+  public void iterateOverNothing() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    writeData(props, topic, "batch", loops, numPerLoop);
+
+    //set the start offsets equal to the stop offsets so nothing will be iterated over
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStartOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    assertThat(count, is(0));
+  }
+
+  @Test
+  public void iterateOverPartial() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+    int numPerPartition = 50;
+
+    writeData(props, topic, "batch", loops, numPerLoop);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    //set the stop offsets to start + numPerPartition so only part of each partition is read
+    startOffsets = getStartOffsets(props, topic);
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), entry.getValue() + numPerPartition));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    assertThat(count, is(startOffsets.size() * numPerPartition));
+  }
+
+  @Test
+  public void dontIteratePastStop() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+
+    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+    //capture the start and stop offsets before writing the second batch so iteration stops at the end of the first batch
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(consumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      assertTrue(keys.remove(event.first()));
+      assertThat(secondKeys, not(hasItem(event.first())));
+      count++;
+    }
+
+    assertThat(count, is(loops * numPerLoop));
+    assertThat(keys.size(), is(0));
+  }
+
+  @Test
+  public void iterateSkipInitialValues() {
+    consumer = new KafkaConsumer<String, String>(consumerProps, new ClusterTest.StringSerDe(), new ClusterTest.StringSerDe());
+    int loops = 10;
+    int numPerLoop = 100;
+
+    List<String> keys = writeData(props, topic, "batch1", loops, numPerLoop);
+
+    //start at the current stop offsets so the values from the first batch are skipped
+    startOffsets = getStopOffsets(props, topic);
+
+    List<String> secondKeys = writeData(props, topic, "batch2", loops, numPerLoop);
+
+    stopOffsets = getStopOffsets(props, topic);
+
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<String, String>(consumer, offsets,
+        new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(secondKeys, hasItem(event.first()));
+      assertTrue(secondKeys.remove(event.first()));
+      assertThat(keys, not(hasItem(event.first())));
+      count++;
+    }
+
+    assertThat(count, is(loops * numPerLoop));
+    assertThat(secondKeys.size(), is(0));
+  }
+
+  @Test
+  public void iterateValuesWithExceptions() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+
+    for(int i = 0; i < 25; i++){
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 0, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 1, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 2, i, "key", null));
+      returnedRecords.add(new ConsumerRecord<String, String>(topic, 3, i, "key", null));
+    }
+
+    offsets = new HashMap<>();
+    offsets.put(new TopicPartition(topic, 0), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 1), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 2), Pair.of(0L, 25L));
+    offsets.put(new TopicPartition(topic, 3), Pair.of(0L, 25L));
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong()))
+        //request for the first poll
+        .thenReturn(null)
+        //fail twice
+        .thenThrow(new TimeoutException("fail1"))
+        .thenThrow(new TimeoutException("fail2"))
+        //request that will give data
+        .thenReturn(records)
+        //null response signals there is no more data to retrieve
+        .thenReturn(null);
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    //should have received all of the mocked records despite the transient poll failures
+    assertThat(count, is(returnedRecords.size()));
+  }
+
+  @Test
+  public void iterateValuesAfterStopOffsets() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+          entry.getKey().partition(), entry.getValue() + 1, "key", null));
+    }
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong())).thenReturn(records).thenReturn(records).thenReturn(null);
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      count++;
+    }
+
+    assertThat(count, is(0));
+
+  }
+
+  @Test(expected = RetriableException.class)
+  public void iterateRetriableExceptionMaxExceeded() {
+    List<ConsumerRecord<String, String>> returnedRecords = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : stopOffsets.entrySet()) {
+      returnedRecords.add(new ConsumerRecord<String, String>(entry.getKey().topic(),
+          entry.getKey().partition(), entry.getValue() + 1, "key", null));
+    }
+
+    when(records.isEmpty()).thenReturn(false);
+    when(records.iterator()).thenReturn(returnedRecords.iterator());
+    when(mockedConsumer.poll(Matchers.anyLong()))
+        //empty response for the first poll call
+        .thenReturn(null)
+        //retry 5 times then fail
+        .thenThrow(new TimeoutException("fail1"))
+        .thenThrow(new TimeoutException("fail2"))
+        .thenThrow(new TimeoutException("fail3"))
+        .thenThrow(new TimeoutException("fail4"))
+        .thenThrow(new TimeoutException("fail5"))
+        .thenThrow(new TimeoutException("fail6"));
+
+    Iterable<Pair<String, String>> data = new KafkaRecordsIterable<>(mockedConsumer, offsets, new Properties());
+
+    data.iterator().next();
+  }
+
+  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+  }
+
+  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+  }
+}
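
Not part of the patch: a minimal sketch of driving KafkaRecordsIterable directly, mirroring iterateOverValues() above. The constructor shape (consumer, per-partition offset ranges, retry properties) comes from the tests; the broker address, topic, and offset range are placeholders.

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Properties;

    import org.apache.crunch.Pair;
    import org.apache.crunch.kafka.KafkaRecordsIterable;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaRecordsIterableSketch {
      public static void main(String[] args) {
        // Placeholder consumer configuration; the broker address is an assumption.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092");
        Consumer<String, String> consumer =
            new KafkaConsumer<>(props, new StringDeserializer(), new StringDeserializer());

        // Read offsets [0, 100) of partition 0; iteration stops at the ending offset.
        Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0), Pair.of(0L, 100L));

        Iterable<Pair<String, String>> data =
            new KafkaRecordsIterable<>(consumer, offsets, new Properties());
        for (Pair<String, String> record : data) {
          System.out.println(record.first() + " -> " + record.second());
        }
        consumer.close();
      }
    }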

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
new file mode 100644
index 0000000..3800c24
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaSourceIT.java
@@ -0,0 +1,169 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.MapFn;
+import org.apache.crunch.PCollection;
+import org.apache.crunch.PTable;
+import org.apache.crunch.Pair;
+import org.apache.crunch.Pipeline;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.MRPipeline;
+import org.apache.crunch.io.From;
+import org.apache.crunch.io.To;
+import org.apache.crunch.test.TemporaryPath;
+import org.apache.crunch.types.avro.Avros;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+
+public class KafkaSourceIT {
+
+  @Rule
+  public TemporaryPath path = new TemporaryPath();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Properties consumerProps;
+  private Configuration config;
+  private String topic;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setupTest() {
+    topic = testName.getMethodName();
+    consumerProps = ClusterTest.getConsumerProperties();
+    config = ClusterTest.getConsumerConfig();
+  }
+
+  @Test
+  public void sourceReadData() {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Configuration config = ClusterTest.getConf();
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (Pair<BytesWritable, BytesWritable> values : read.materialize()) {
+      assertThat(keys, hasItem(new String(values.first().getBytes())));
+      numRecordsFound++;
+      keysRead.add(new String(values.first().getBytes()));
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
+
+
+  @Test
+  public void sourceReadDataThroughPipeline() {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    Configuration config = ClusterTest.getConf();
+
+    Pipeline pipeline = new MRPipeline(KafkaSourceIT.class, config);
+    pipeline.enableDebug();
+
+    TableSource<BytesWritable, BytesWritable> kafkaSource = new KafkaSource(consumerProps, offsets);
+
+    PTable<BytesWritable, BytesWritable> read = pipeline.read(kafkaSource);
+    Path out = path.getPath("out");
+    read.parallelDo(new SimpleConvertFn(), Avros.strings()).write(To.textFile(out));
+
+    pipeline.run();
+
+    PCollection<String> persistedKeys = pipeline.read(From.textFile(out));
+
+    Set<String> keysRead = new HashSet<>();
+    int numRecordsFound = 0;
+    for (String value : persistedKeys.materialize()) {
+      assertThat(keys, hasItem(value));
+      numRecordsFound++;
+      keysRead.add(value);
+    }
+
+    assertThat(numRecordsFound, is(keys.size()));
+    assertThat(keysRead.size(), is(keys.size()));
+
+    pipeline.done();
+  }
+
+
+  private static class SimpleConvertFn extends MapFn<Pair<BytesWritable, BytesWritable>, String> {
+    @Override
+    public String map(Pair<BytesWritable, BytesWritable> input) {
+      return new String(input.first().getBytes());
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
new file mode 100644
index 0000000..38c3fce
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaUtilsIT.java
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.cluster.Broker;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+
+public class KafkaUtilsIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private static Broker broker;
+
+  @BeforeClass
+  public static void startup() throws Exception {
+    ClusterTest.startTest();
+
+    Properties props = ClusterTest.getConsumerProperties();
+    String brokerHostPorts = props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG);
+
+    String brokerHostPortString = brokerHostPorts.split(",")[0];
+    String[] brokerHostPort = brokerHostPortString.split(":");
+
+    String brokerHost = brokerHostPort[0];
+    int brokerPort = Integer.parseInt(brokerHostPort[1]);
+
+    broker = new Broker(0, brokerHost, brokerPort, SecurityProtocol.PLAINTEXT);
+  }
+
+  @AfterClass
+  public static void shutdown() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setup() throws IOException {
+    topic = "topic-" + testName.getMethodName();
+  }
+
+  @Test
+  public void getKafkaProperties() {
+    Configuration config = new Configuration(false);
+    String propertyKey = "fake.kafka.property";
+    String propertyValue = testName.getMethodName();
+    config.set(propertyKey, propertyValue);
+
+    Properties props = KafkaUtils.getKafkaConnectionProperties(config);
+    assertThat(props.get(propertyKey), is((Object) propertyValue));
+  }
+
+  @Test
+  public void addKafkaProperties() {
+    String propertyKey = "fake.kafka.property";
+    String propertyValue = testName.getMethodName();
+
+    Properties props = new Properties();
+    props.setProperty(propertyKey, propertyValue);
+
+    Configuration config = new Configuration(false);
+
+    KafkaUtils.addKafkaConnectionProperties(props, config);
+    assertThat(config.get(propertyKey), is(propertyValue));
+  }
+
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getBrokerOffsetsKafkaNullProperties() throws IOException {
+    KafkaUtils.getBrokerOffsets((Properties) null, kafka.api.OffsetRequest.LatestTime(), topic);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getBrokerOffsetsKafkaNullTopics() throws IOException {
+    KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime(), (String[]) null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getBrokerOffsetsKafkaEmptyTopics() throws IOException {
+    KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), kafka.api.OffsetRequest.LatestTime());
+  }
+
+  @Test(timeout = 10000)
+  public void getLatestBrokerOffsetsKafka() throws IOException, InterruptedException {
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
+    while (true) {
+      Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
+          kafka.api.OffsetRequest.LatestTime(), topic);
+
+      assertNotNull(offsets);
+      assertThat(offsets.size(), is(4));
+      boolean allMatch = true;
+      for (int i = 0; i < 4; i++) {
+        TopicPartition tp = new TopicPartition(topic, i);
+        assertThat(offsets.keySet(), hasItem(tp));
+        allMatch &= (offsets.get(tp) == 1L);
+      }
+      if (allMatch) {
+        break;
+      }
+      Thread.sleep(100L);
+    }
+  }
+
+  @Test
+  public void getEarliestBrokerOffsetsKafka() throws IOException {
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 1);
+
+    Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(),
+        kafka.api.OffsetRequest.EarliestTime(), topic);
+
+    assertNotNull(offsets);
+    //topics are created with 4 partitions by default
+    assertThat(offsets.size(), is(4));
+    for (int i = 0; i < 4; i++) {
+      assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
+      assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
+    }
+  }
+
+  @Test
+  public void getBrokerOffsetsKafkaWithTimeBeforeTopicExists() throws IOException {
+    ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 1, 4);
+
+    // A time of 1L (1 ms after epoch) should be before the topic was created
+    Map<TopicPartition, Long> offsets = KafkaUtils.getBrokerOffsets(ClusterTest.getConsumerProperties(), 1L, topic);
+
+    assertNotNull(offsets);
+    //topics are created with 4 partitions by default
+    assertThat(offsets.size(), is(4));
+    for (int i = 0; i < 4; i++) {
+      assertThat(offsets.keySet(), hasItem(new TopicPartition(topic, i)));
+      assertThat(offsets.get(new TopicPartition(topic, i)), is(0L));
+    }
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void getBrokerOffsetsNoHostAvailable() throws IOException {
+    Properties testProperties = ClusterTest.getConsumerProperties();
+    testProperties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
+    testProperties.setProperty("metadata.broker.list", "dummyBrokerHost1:0000,dummyBrokerHost2:0000");
+    assertNotNull(KafkaUtils.getBrokerOffsets(testProperties, kafka.api.OffsetRequest.LatestTime(), topic));
+  }
+
+  @Test
+  public void getBrokerOffsetsSomeHostsUnavailable() throws IOException {
+    final Broker bad = new Broker(0, "dummyBrokerHost1", 0, SecurityProtocol.PLAINTEXT);
+    assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(broker, bad), kafka.api.OffsetRequest.LatestTime(), topic));
+    assertNotNull(KafkaUtils.getBrokerOffsets(Arrays.asList(bad, broker), kafka.api.OffsetRequest.LatestTime(), topic));
+  }
+}
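
Not part of the patch: a short sketch of the Properties/Configuration bridging exercised by getKafkaProperties() and addKafkaProperties() above; the broker value is a placeholder.

    import java.util.Properties;

    import org.apache.crunch.kafka.KafkaUtils;
    import org.apache.hadoop.conf.Configuration;

    public class KafkaUtilsConfigSketch {
      public static void main(String[] args) {
        // Copy Kafka connection settings into a Hadoop Configuration so they can travel
        // with the job, then pull them back out as Properties on the other side.
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker

        Configuration conf = new Configuration(false);
        KafkaUtils.addKafkaConnectionProperties(props, conf);

        Properties restored = KafkaUtils.getKafkaConnectionProperties(conf);
        System.out.println(restored.getProperty("bootstrap.servers"));
      }
    }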

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
new file mode 100644
index 0000000..d760a02
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputFormatIT.java
@@ -0,0 +1,407 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.crunch.kafka.KafkaSource;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaInputFormatIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Mock
+  private TaskAttemptContext taskContext;
+
+  @Mock
+  private FormatBundle bundle;
+  private Properties consumerProps;
+  private Configuration config;
+  private String topic;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setupTest() {
+    topic = testName.getMethodName();
+    consumerProps = ClusterTest.getConsumerProperties();
+
+    consumerProps.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+    consumerProps.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+
+    config = ClusterTest.getConsumerConfig();
+
+    config.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+    config.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaSource.BytesDeserializer.class.getName());
+  }
+
+  @Test
+  public void getSplitsFromFormat() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    KafkaInputFormat inputFormat = new KafkaInputFormat();
+    inputFormat.setConf(config);
+    List<InputSplit> splits = inputFormat.getSplits(null);
+
+    assertThat(splits.size(), is(offsets.size()));
+
+    for (InputSplit split : splits) {
+      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+      Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
+      assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
+      assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
+    }
+  }
+
+  @Test
+  public void getSplitsSameStartEnd() throws IOException, InterruptedException {
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for(int i = 0; i < 10; i++) {
+      offsets.put(new TopicPartition(topic, i), Pair.of((long)i, (long)i));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    KafkaInputFormat inputFormat = new KafkaInputFormat();
+    inputFormat.setConf(config);
+    List<InputSplit> splits = inputFormat.getSplits(null);
+
+    assertThat(splits.size(), is(0));
+  }
+
+  @Test
+  public void getSplitsCreateReaders() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    KafkaInputFormat inputFormat = new KafkaInputFormat();
+    inputFormat.setConf(config);
+    List<InputSplit> splits = inputFormat.getSplits(null);
+
+    assertThat(splits.size(), is(offsets.size()));
+
+    for (InputSplit split : splits) {
+      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+      Pair<Long, Long> startEnd = offsets.get(inputSplit.getTopicPartition());
+      assertThat(inputSplit.getStartingOffset(), is(startEnd.first()));
+      assertThat(inputSplit.getEndingOffset(), is(startEnd.second()));
+    }
+
+    //create readers and consume the data
+    when(taskContext.getConfiguration()).thenReturn(config);
+    Set<String> keysRead = new HashSet<>();
+    //read all data from all splits
+    for (InputSplit split : splits) {
+      KafkaInputSplit inputSplit = (KafkaInputSplit) split;
+      long start = inputSplit.getStartingOffset();
+      long end = inputSplit.getEndingOffset();
+
+      RecordReader<BytesWritable, BytesWritable> recordReader = inputFormat.createRecordReader(split, taskContext);
+      recordReader.initialize(split, taskContext);
+
+      int numRecordsFound = 0;
+      while (recordReader.nextKeyValue()) {
+        keysRead.add(new String(recordReader.getCurrentKey().getBytes()));
+        assertThat(keys, hasItem(new String(recordReader.getCurrentKey().getBytes())));
+        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
+        numRecordsFound++;
+      }
+      recordReader.close();
+
+      //assert that it encountered a partition's worth of data
+      assertThat(((long) numRecordsFound), is(end - start));
+    }
+
+    //validate the same number of unique keys was read as were written.
+    assertThat(keysRead.size(), is(keys.size()));
+  }
+
+  @Test
+  public void writeOffsetsToFormatBundle() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    String topic = testName.getMethodName();
+    int numPartitions = 10;
+    for (int i = 0; i < numPartitions; i++) {
+      TopicPartition tAndP = new TopicPartition(topic, i);
+      offsets.put(tAndP, Pair.of((long) i, i * 10L));
+    }
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+    //number of Partitions * 2 for start and end + 1 for the topic
+    verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture());
+
+    List<String> keyValues = keyCaptor.getAllValues();
+    List<String> valueValues = valueCaptor.getAllValues();
+
+    String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
+    assertThat(keyValues, hasItem(partitionKey));
+
+    String partitions = valueValues.get(keyValues.indexOf(partitionKey));
+    List<String> parts = Arrays.asList(partitions.split(","));
+
+    for (int i = 0; i < numPartitions; i++) {
+      assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
+      String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i);
+      String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i);
+      assertThat(keyValues, hasItem(startKey));
+      assertThat(keyValues, hasItem(endKey));
+      assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
+      assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
+      assertThat(parts, hasItem(Long.toString(i)));
+    }
+  }
+
+  @Test
+  public void writeOffsetsToFormatBundleSpecialCharacters() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    String topic = "partitions." + testName.getMethodName();
+    int numPartitions = 10;
+    for (int i = 0; i < numPartitions; i++) {
+      TopicPartition tAndP = new TopicPartition(topic, i);
+      offsets.put(tAndP, Pair.of((long) i, i * 10L));
+    }
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+    //number of Partitions * 2 for start and end + 1 for the topic
+    verify(bundle, times((numPartitions * 2) + 1)).set(keyCaptor.capture(), valueCaptor.capture());
+
+    List<String> keyValues = keyCaptor.getAllValues();
+    List<String> valueValues = valueCaptor.getAllValues();
+
+    String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
+    assertThat(keyValues, hasItem(partitionKey));
+
+    String partitions = valueValues.get(keyValues.indexOf(partitionKey));
+    List<String> parts = Arrays.asList(partitions.split(","));
+
+    for (int i = 0; i < numPartitions; i++) {
+      assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
+      String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i);
+      String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i);
+      assertThat(keyValues, hasItem(startKey));
+      assertThat(keyValues, hasItem(endKey));
+      assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
+      assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
+      assertThat(parts, hasItem(Long.toString(i)));
+    }
+  }
+
+  @Test
+  public void writeOffsetsToFormatBundleMultipleTopics() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      String topic = testName.getMethodName() + j;
+      topics.add(topic);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topic, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    ArgumentCaptor<String> keyCaptor = ArgumentCaptor.forClass(String.class);
+    ArgumentCaptor<String> valueCaptor = ArgumentCaptor.forClass(String.class);
+
+    //number of Partitions * 2 for start and end + num of topics
+    verify(bundle, times((numTopics * numPartitions * 2) + numTopics)).set(keyCaptor.capture(), valueCaptor.capture());
+
+    List<String> keyValues = keyCaptor.getAllValues();
+    List<String> valueValues = valueCaptor.getAllValues();
+
+    for (String topic : topics) {
+
+      String partitionKey = KafkaInputFormat.generateTopicPartitionsKey(topic);
+      assertThat(keyValues, hasItem(partitionKey));
+
+      String partitions = valueValues.get(keyValues.indexOf(partitionKey));
+      List<String> parts = Arrays.asList(partitions.split(","));
+
+      for (int i = 0; i < numPartitions; i++) {
+        assertThat(keyValues, hasItem(KafkaInputFormat.generateTopicPartitionsKey(topic)));
+        String startKey = KafkaInputFormat.generatePartitionStartKey(topic, i);
+        String endKey = KafkaInputFormat.generatePartitionEndKey(topic, i);
+        assertThat(keyValues, hasItem(startKey));
+        assertThat(keyValues, hasItem(endKey));
+        assertThat(valueValues.get(keyValues.indexOf(startKey)), is(Long.toString(i)));
+        assertThat(valueValues.get(keyValues.indexOf(endKey)), is(Long.toString(i * 10L)));
+        assertThat(parts, hasItem(Long.toString(i)));
+      }
+    }
+  }
+
+  @Test
+  public void getOffsetsFromConfig() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      String topic = testName.getMethodName() + ".partitions" + j;
+      topics.add(topic);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topic, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    Configuration config = new Configuration(false);
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
+
+    assertThat(returnedOffsets.size(), is(offsets.size()));
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      Pair<Long, Long> valuePair = returnedOffsets.get(entry.getKey());
+      assertThat(valuePair, is(entry.getValue()));
+    }
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void getOffsetsFromConfigMissingStart() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      String topic = testName.getMethodName() + ".partitions" + j;
+      topics.add(topic);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topic, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    Configuration config = new Configuration(false);
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    config.unset("org.apache.crunch.kafka.offsets.topic."+topics.iterator().next()+".partitions.0.start");
+
+    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
+  }
+
+  @Test(expected=IllegalStateException.class)
+  public void getOffsetsFromConfigMissingEnd() {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    Set<String> topics = new HashSet<>();
+
+    int numPartitions = 10;
+    int numTopics = 10;
+    for (int j = 0; j < numTopics; j++) {
+      String topic = testName.getMethodName() + ".partitions" + j;
+      topics.add(topic);
+      for (int i = 0; i < numPartitions; i++) {
+        TopicPartition tAndP = new TopicPartition(topic, i);
+        offsets.put(tAndP, Pair.of((long) i, i * 10L));
+      }
+    }
+
+    Configuration config = new Configuration(false);
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    config.unset("org.apache.crunch.kafka.offsets.topic."+topics.iterator().next()+".partitions.0.end");
+
+    Map<TopicPartition, Pair<Long, Long>> returnedOffsets = KafkaInputFormat.getOffsets(config);
+  }
+
+}
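
Not part of the patch: a sketch of the offsets round trip that getOffsetsFromConfig() above relies on; the topic name and offset ranges are placeholders.

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.crunch.Pair;
    import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.kafka.common.TopicPartition;

    public class KafkaOffsetsConfigSketch {
      public static void main(String[] args) {
        // Per-partition (start, end) offsets are serialized into the job Configuration
        // and read back by the input format when it plans its splits.
        Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
        offsets.put(new TopicPartition("my-topic", 0), Pair.of(0L, 100L));
        offsets.put(new TopicPartition("my-topic", 1), Pair.of(0L, 100L));

        Configuration conf = new Configuration(false);
        KafkaInputFormat.writeOffsetsToConfiguration(offsets, conf);

        Map<TopicPartition, Pair<Long, Long>> restored = KafkaInputFormat.getOffsets(conf);
        System.out.println(restored);
      }
    }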

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
new file mode 100644
index 0000000..3833e9d
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaInputSplitTest.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import kafka.api.OffsetRequest;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class KafkaInputSplitTest {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  @Test
+  public void createSplit() throws IOException, InterruptedException {
+    String topic = testName.getMethodName();
+    int partition = 18;
+    long startingOffset = 10;
+    long endingOffset = 23;
+
+
+    KafkaInputSplit split = new KafkaInputSplit(topic, partition, startingOffset, endingOffset);
+    assertThat(split.getStartingOffset(), is(startingOffset));
+    assertThat(split.getEndingOffset(), is(endingOffset));
+    assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition)));
+    assertThat(split.getLength(), is(endingOffset - startingOffset));
+    assertThat(split.getLocations(), is(new String[0]));
+  }
+
+  @Test
+  public void createSplitEarliestOffset() throws IOException, InterruptedException {
+    String topic = testName.getMethodName();
+    int partition = 18;
+    long endingOffset = 23;
+
+    KafkaInputSplit split = new KafkaInputSplit(topic, partition, -1L, endingOffset);
+    assertThat(split.getStartingOffset(), is(-1L));
+    assertThat(split.getEndingOffset(), is(endingOffset));
+    assertThat(split.getTopicPartition(), is(new TopicPartition(topic, partition)));
+    assertThat(split.getLength(), is(endingOffset));
+    assertThat(split.getLocations(), is(new String[0]));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
new file mode 100644
index 0000000..ba5b65b
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/inputformat/KafkaRecordReaderIT.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.ClusterTest;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskAttemptContext;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.crunch.kafka.KafkaUtils.getBrokerOffsets;
+import static org.hamcrest.CoreMatchers.notNullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+import static org.junit.matchers.JUnitMatchers.hasItem;
+import static org.mockito.Mockito.when;
+
+@RunWith(MockitoJUnitRunner.class)
+public class KafkaRecordReaderIT {
+
+  @Mock
+  private TaskAttemptContext context;
+
+  @Rule
+  public TestName testName = new TestName();
+  private Properties consumerProps;
+  private Configuration config;
+
+  @BeforeClass
+  public static void setup() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  private String topic;
+
+  @Before
+  public void setupTest() {
+    topic = testName.getMethodName();
+    consumerProps = ClusterTest.getConsumerProperties();
+    config = ClusterTest.getConsumerConfig();
+    when(context.getConfiguration()).thenReturn(config);
+  }
+
+  @Test
+  public void readData() throws IOException, InterruptedException {
+    List<String> keys = ClusterTest.writeData(ClusterTest.getProducerProperties(), topic, "batch", 10, 10);
+
+    Map<TopicPartition, Long> startOffsets = getBrokerOffsets(consumerProps, OffsetRequest.EarliestTime(), topic);
+    Map<TopicPartition, Long> endOffsets = getBrokerOffsets(consumerProps, OffsetRequest.LatestTime(), topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      Long endingOffset = endOffsets.get(entry.getKey());
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), endingOffset));
+    }
+
+    KafkaInputFormat.writeOffsetsToConfiguration(offsets, config);
+
+    Set<String> keysRead = new HashSet<>();
+    //read all data from all splits
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> partitionInfo : offsets.entrySet()) {
+      KafkaInputSplit split = new KafkaInputSplit(partitionInfo.getKey().topic(), partitionInfo.getKey().partition(),
+          partitionInfo.getValue().first(), partitionInfo.getValue().second());
+
+      KafkaRecordReader<String, String> recordReader = new KafkaRecordReader<>();
+      recordReader.initialize(split, context);
+
+      int numRecordsFound = 0;
+      while (recordReader.nextKeyValue()) {
+        keysRead.add(recordReader.getCurrentKey());
+        assertThat(keys, hasItem(recordReader.getCurrentKey()));
+        assertThat(recordReader.getCurrentValue(), is(notNullValue()));
+        numRecordsFound++;
+      }
+      recordReader.close();
+
+      //assert that it encountered a partition's worth of data
+      assertThat(((long) numRecordsFound), is(partitionInfo.getValue().second() - partitionInfo.getValue().first()));
+    }
+
+    //validate the same number of unique keys was read as were written.
+    assertThat(keysRead.size(), is(keys.size()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
new file mode 100644
index 0000000..ede3cf0
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/EmbeddedZookeeper.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.zookeeper.server.NIOServerCnxnFactory;
+import org.apache.zookeeper.server.ZooKeeperServer;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+
+/**
+ * Embedded Zookeeper instance for testing purposes.
+ * <p>
+ * Adapted from the {@code kafka.zk.EmbeddedZookeeper} class.
+ * </p>
+ */
+class EmbeddedZookeeper {
+
+  private final File snapshotDir;
+  private final File logDir;
+  private final NIOServerCnxnFactory factory;
+
+  /**
+   * Constructs an embedded Zookeeper instance.
+   *
+   * @param connectString Zookeeper connection string.
+   *
+   * @throws IOException if an error occurs during Zookeeper initialization.
+   */
+  public EmbeddedZookeeper(String connectString) throws IOException {
+    this.snapshotDir = KafkaTestUtils.getTempDir();
+    this.logDir = KafkaTestUtils.getTempDir();
+    this.factory = new NIOServerCnxnFactory();
+    String hostname = connectString.split(":")[0];
+    int port = Integer.valueOf(connectString.split(":")[1]);
+    int maxClientConnections = 1024;
+    factory.configure(new InetSocketAddress(hostname, port), maxClientConnections);
+    try {
+      int tickTime = 500;
+      factory.startup(new ZooKeeperServer(snapshotDir, logDir, tickTime));
+    } catch (InterruptedException e) {
+      throw new IOException(e);
+    }
+  }
+
+  /**
+   * Shuts down the embedded Zookeeper instance.
+   */
+  public void shutdown() throws IOException {
+    factory.shutdown();
+    FileUtils.deleteDirectory(snapshotDir);
+    FileUtils.deleteDirectory(logDir);
+  }
+}
+
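
A rough usage sketch for the class above (illustrative only, not part of the patch; the connect
string is a placeholder):

  // Start an embedded Zookeeper for a test and make sure it is shut down afterwards.
  EmbeddedZookeeper zookeeper = new EmbeddedZookeeper("localhost:21810");
  try {
    // ... exercise code that talks to localhost:21810 ...
  } finally {
    zookeeper.shutdown();
  }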

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
new file mode 100644
index 0000000..f47f168
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaBrokerTestHarness.java
@@ -0,0 +1,369 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.kafka.utils;
+
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.Time;
+import org.apache.commons.io.FileUtils;
+import scala.Option;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Properties;
+
+import static scala.collection.JavaConversions.asJavaIterable;
+
+/**
+ * A test harness that brings up some number of Kafka broker nodes.
+ * <p>
+ * Adapted from the {@code kafka.integration.KafkaServerTestHarness} class.
+ * </p>
+ */
+public class KafkaBrokerTestHarness extends ZookeeperTestHarness {
+
+  /**
+   * Producer send acknowledgment timeout in milliseconds.
+   */
+  public static final String KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS = "request.timeout.ms";
+
+  /**
+   * Producer send retry maximum count.
+   */
+  public static final String KAFKA_PRODUCER_RETRY_MAX = "message.send.max.retries";
+
+  /**
+   * Producer send retry backoff interval in milliseconds.
+   */
+  public static final String KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS = "retry.backoff.ms";
+
+  /**
+   * Comma-delimited Kafka Zookeeper quorum list.
+   */
+  public static final String KAFKA_ZOOKEEPERS = "zookeeper.connect";
+
+  /**
+   * Comma-delimited list of Kafka brokers, for producer bootstrapping purposes.
+   */
+  public static final String KAFKA_BROKERS = "metadata.broker.list";
+
+  /**
+   * Default number of brokers in the Kafka cluster.
+   */
+  public static final int DEFAULT_BROKERS = 1;
+
+  /**
+   * Default number of partitions per Kafka topic.
+   */
+  public static final int PARTITIONS_PER_TOPIC = 4;
+
+  private List<KafkaConfig> brokerConfigs;
+  private List<KafkaServer> brokers;
+  private File clientConfig;
+  private boolean setUp;
+  private boolean tornDown;
+
+  /**
+   * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers.
+   */
+  public KafkaBrokerTestHarness() {
+    this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0]);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the {@link #DEFAULT_BROKERS default} number of brokers and the supplied
+   * {@link Properties} which will be applied to the brokers.
+   *
+   * @param properties
+   *            the additional {@link Properties} supplied to the brokers
+   * @throws IllegalArgumentException
+   *             if {@code properties} is {@code null}
+   */
+  public KafkaBrokerTestHarness(Properties properties) {
+    this(DEFAULT_BROKERS, KafkaTestUtils.getPorts(1)[0], properties);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port.
+   *
+   * @param brokers Number of Kafka brokers to start up.
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   *
+   * @throws IllegalArgumentException if {@code brokers} is less than 1.
+   */
+  public KafkaBrokerTestHarness(int brokers, int zookeeperPort) {
+    this(getBrokerConfig(brokers, zookeeperPort), zookeeperPort);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given number of brokers and Zookeeper port.
+   *
+   * @param brokers
+   *            Number of Kafka brokers to start up.
+   * @param zookeeperPort
+   *            The port number to use for Zookeeper client connections.
+   * @param properties
+   *            the additional {@link Properties} supplied to the brokers
+   *
+   * @throws IllegalArgumentException
+   *             if {@code brokers} is less than 1 or if {@code properties} is {@code null}
+   */
+  public KafkaBrokerTestHarness(int brokers, int zookeeperPort, Properties properties) {
+    this(getBrokerConfig(brokers, zookeeperPort, properties), zookeeperPort);
+  }
+
+  /**
+   * Creates a new Kafka broker test harness using the given broker configuration properties and Zookeeper port.
+   *
+   * @param brokerConfigs List of Kafka broker configurations.
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   *
+   * @throws IllegalArgumentException if {@code brokerConfigs} is {@code null} or empty.
+   */
+  public KafkaBrokerTestHarness(List<KafkaConfig> brokerConfigs, int zookeeperPort) {
+    super(zookeeperPort);
+    if (brokerConfigs == null || brokerConfigs.isEmpty()) {
+      throw new IllegalArgumentException("Must supply at least one broker configuration.");
+    }
+    this.brokerConfigs = brokerConfigs;
+    this.brokers = null;
+    this.setUp = false;
+    this.tornDown = false;
+  }
+
+  /**
+   * Start up the Kafka broker cluster.
+   *
+   * @throws IOException if an error occurs during Kafka broker startup.
+   * @throws IllegalStateException if the Kafka broker cluster has already been {@link #setUp() set up}.
+   */
+  @Override
+  public void setUp() throws IOException {
+    if (setUp) {
+      throw new IllegalStateException("Already setup, cannot setup again");
+    }
+    setUp = true;
+
+    // Start up zookeeper.
+    super.setUp();
+
+    brokers = new ArrayList<KafkaServer>(brokerConfigs.size());
+    for (KafkaConfig config : brokerConfigs) {
+      brokers.add(startBroker(config));
+    }
+
+    // Write out Kafka client config to a temp file.
+    clientConfig = new File(KafkaTestUtils.getTempDir(), "kafka-config.xml");
+    FileWriter writer = new FileWriter(clientConfig);
+    writer.append("<configuration>");
+    for (String prop : Arrays.asList(KAFKA_BROKERS, KAFKA_ZOOKEEPERS)) {
+      writer.append("<property>");
+      writer.append("<name>").append(prop).append("</name>");
+      writer.append("<value>").append(getProps().getProperty(prop)).append("</value>");
+      writer.append("</property>");
+    }
+    writer.append("</configuration>");
+    writer.close();
+  }
+
+  /**
+   * Shut down the Kafka broker cluster. Attempting to {@link #setUp()} a cluster again after calling this method is not allowed;
+   * a new {@code KafkaBrokerTestHarness} must be created instead.
+   *
+   * @throws IllegalStateException if the Kafka broker cluster has already been {@link #tearDown() torn down} or has not been
+   *      {@link #setUp() set up}.
+   */
+  @Override
+  public void tearDown() throws IOException {
+    if (!setUp) {
+      throw new IllegalStateException("Not set up, cannot tear down");
+    }
+    if (tornDown) {
+      throw new IllegalStateException("Already torn down, cannot tear down again");
+    }
+    tornDown = true;
+
+    for (KafkaServer broker : brokers) {
+      broker.shutdown();
+    }
+
+    for (KafkaServer broker : brokers) {
+      for (String logDir : asJavaIterable(broker.config().logDirs())) {
+        FileUtils.deleteDirectory(new File(logDir));
+      }
+    }
+
+    // Shutdown zookeeper
+    super.tearDown();
+  }
+
+  /**
+   * Returns properties for a Kafka producer.
+   *
+   * @return Producer properties.
+   */
+  public Properties getProducerProps() {
+    StringBuilder brokers = new StringBuilder();
+    for (int i = 0; i < brokerConfigs.size(); ++i) {
+      KafkaConfig config = brokerConfigs.get(i);
+      brokers.append((i > 0) ? "," : "").append(config.hostName()).append(":").append(config.port());
+    }
+
+    Properties props = new Properties();
+    props.setProperty(KAFKA_BROKERS, brokers.toString());
+    props.setProperty(KAFKA_PRODUCER_ACK_TIMEOUT_MILLIS, "10000");
+
+    // These two properties are increased from their defaults to handle the case where auto.create.topics.enable is
+    // disabled and a test creates a topic and immediately writes to it.
+    props.setProperty(KAFKA_PRODUCER_RETRY_INTERVAL_MILLIS, Integer.toString(500));
+    props.setProperty(KAFKA_PRODUCER_RETRY_MAX, Integer.toString(10));
+
+    return props;
+  }
+
+  /**
+   * Returns properties for a Kafka consumer.
+   *
+   * @return Consumer properties.
+   */
+  public Properties getConsumerProps() {
+    Properties props = new Properties();
+    props.setProperty(KAFKA_ZOOKEEPERS, zookeeperConnect);
+    return props;
+  }
+
+  /**
+   * Returns combined properties suitable for either a Kafka producer or a consumer.
+   *
+   * @return Combined producer and consumer properties.
+   */
+  public Properties getProps() {
+    // Combine producer and consumer properties.
+    Properties props = getProducerProps();
+    props.putAll(getConsumerProps());
+    return props;
+  }
+
+  /**
+   * Returns configuration properties for each Kafka broker in the cluster.
+   *
+   * @return Broker properties.
+   */
+  public List<Properties> getBrokerProps() {
+    List<Properties> props = new ArrayList<Properties>(brokers.size());
+    for (KafkaServer broker : brokers) {
+      Properties prop = new Properties();
+      prop.putAll(broker.config().props());
+      props.add(prop);
+    }
+    return props;
+  }
+
+  /**
+   * Creates a collection of Kafka broker configurations based on the number of brokers and the Zookeeper port.
+   * @param brokers the number of brokers to create configuration for.
+   * @param zookeeperPort the zookeeper port for the brokers to connect to.
+   * @return configuration for a collection of brokers.
+   * @throws IllegalArgumentException if {@code brokers} is less than 1
+   */
+  public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort) {
+    return getBrokerConfig(brokers, zookeeperPort, new Properties());
+  }
+
+  /**
+   * Creates a collection of Kafka broker configurations based on the number of brokers and the Zookeeper port.
+   * @param brokers the number of brokers to create configuration for.
+   * @param zookeeperPort the zookeeper port for the brokers to connect to.
+   * @param baseProperties base properties applied to each broker config.  These properties take precedence
+   *                       over any default properties.
+   * @return configuration for a collection of brokers.
+   * @throws IllegalArgumentException if {@code brokers} is less than 1 or {@code baseProperties} is {@code null}.
+   */
+  public static List<KafkaConfig> getBrokerConfig(int brokers, int zookeeperPort, Properties baseProperties) {
+    if (brokers < 1) {
+      throw new IllegalArgumentException("Invalid broker count: " + brokers);
+    }
+    if (baseProperties == null) {
+      throw new IllegalArgumentException("The 'baseProperties' cannot be 'null'.");
+    }
+
+    int[] ports = KafkaTestUtils.getPorts(brokers);
+
+    List<KafkaConfig> configs = new ArrayList<KafkaConfig>(brokers);
+    for (int i = 0; i < brokers; ++i) {
+      Properties props = new Properties();
+      props.setProperty(KAFKA_ZOOKEEPERS, "localhost:" + zookeeperPort);
+      props.setProperty("broker.id", String.valueOf(i + 1));
+      props.setProperty("host.name", "localhost");
+      props.setProperty("port", String.valueOf(ports[i]));
+      props.setProperty("log.dir", KafkaTestUtils.getTempDir().getAbsolutePath());
+      props.setProperty("log.flush.interval.messages", String.valueOf(1));
+      props.setProperty("num.partitions", String.valueOf(PARTITIONS_PER_TOPIC));
+      props.setProperty("default.replication.factor", String.valueOf(brokers));
+      props.setProperty("auto.create.topics.enable", Boolean.FALSE.toString());
+
+      props.putAll(baseProperties);
+
+      configs.add(new KafkaConfig(props));
+    }
+    return configs;
+  }
+
+  /**
+   * Returns the location of the Kafka client configuration file containing broker and Zookeeper connection properties.
+   * <p>
+   * This file can be loaded with the {@code -conf} command-line option to easily configure Kafka connectivity.
+   * </p>
+   *
+   * @return Kafka client configuration file path
+   */
+  public String getClientConfigPath() {
+    return clientConfig.getAbsolutePath();
+  }
+
+  private static KafkaServer startBroker(KafkaConfig config) {
+    KafkaServer server = new KafkaServer(config, new SystemTime(), Option.<String>empty());
+    server.startup();
+    return server;
+  }
+
+  private static class SystemTime implements Time {
+    @Override
+    public long milliseconds() {
+      return System.currentTimeMillis();
+    }
+
+    @Override
+    public long nanoseconds() {
+      return System.nanoTime();
+    }
+
+    @Override
+    public void sleep(long ms) {
+      try {
+        Thread.sleep(ms);
+      } catch (InterruptedException e) {
+        // Ignore
+      }
+    }
+  }
+}
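
A rough lifecycle sketch for the harness above (illustrative only, not part of the patch), as it
might appear inside a test:

  // Bring up a single-broker cluster with default settings, use its client properties,
  // and tear everything down; setUp()/tearDown() may each be called only once.
  KafkaBrokerTestHarness harness = new KafkaBrokerTestHarness();
  harness.setUp();
  try {
    Properties producerProps = harness.getProducerProps();
    Properties consumerProps = harness.getConsumerProps();
    // ... create topics and produce/consume test data here ...
  } finally {
    harness.tearDown();
  }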

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
new file mode 100644
index 0000000..f8eb2ff
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/KafkaTestUtils.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.ServerSocket;
+import java.util.HashSet;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * Assorted Kafka testing utility methods.
+ */
+public class KafkaTestUtils {
+
+  private static final Random RANDOM = new Random();
+  private static final String TEMP_DIR_PREFIX = "kafka-";
+
+  private static final Set<Integer> USED_PORTS = new HashSet<Integer>();
+
+  /**
+   * Creates and returns a new, randomly named temporary directory. It is marked for deletion when the JVM exits.
+   *
+   * @return a new temporary directory.
+   *
+   * @throws RuntimeException if a new temporary directory could not be created.
+   */
+  public static File getTempDir() {
+    File file = new File(System.getProperty("java.io.tmpdir"), TEMP_DIR_PREFIX + RANDOM.nextInt(10000000));
+    if (!file.mkdirs()) {
+      throw new RuntimeException("could not create temp directory: " + file.getAbsolutePath());
+    }
+    file.deleteOnExit();
+    return file;
+  }
+
+  /**
+   * Returns an array containing the specified number of available local ports.
+   *
+   * @param count Number of local ports to identify and return.
+   *
+   * @return an array of available local port numbers.
+   *
+   * @throws RuntimeException if an I/O error occurs opening or closing a socket.
+   */
+  public static int[] getPorts(int count) {
+    int[] ports = new int[count];
+    Set<ServerSocket> openSockets = new HashSet<ServerSocket>(count + USED_PORTS.size());
+
+    for (int i = 0; i < count; ) {
+      try {
+        ServerSocket socket = new ServerSocket(0);
+        int port = socket.getLocalPort();
+        openSockets.add(socket);
+
+        // Disallow port reuse.
+        if (!USED_PORTS.contains(port)) {
+          ports[i++] = port;
+          USED_PORTS.add(port);
+        }
+      } catch (IOException e) {
+        throw new RuntimeException("could not open socket", e);
+      }
+    }
+
+    // Close the sockets so that their port numbers can be used by the caller.
+    for (ServerSocket socket : openSockets) {
+      try {
+        socket.close();
+      } catch (IOException e) {
+        throw new RuntimeException("could not close socket", e);
+      }
+    }
+
+    return ports;
+  }
+}
+
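
A short sketch of how the harness classes above use these helpers (illustrative only, not part of
the patch; the property names mirror those set in KafkaBrokerTestHarness.getBrokerConfig):

  // Reserve a free local port and a scratch log directory for a broker configuration.
  int[] ports = KafkaTestUtils.getPorts(1);
  File logDir = KafkaTestUtils.getTempDir();

  Properties brokerProps = new Properties();
  brokerProps.setProperty("port", String.valueOf(ports[0]));
  brokerProps.setProperty("log.dir", logDir.getAbsolutePath());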

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
new file mode 100644
index 0000000..6ee102e
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZkStringSerializer.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.utils;
+
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import java.nio.charset.Charset;
+
+/**
+ * A {@link ZkSerializer Zookeeper serializer} for {@link String} objects.
+ * <p>
+ * Ported from the {@code kafka.utils.ZKStringSerializer} Scala object.
+ * </p>
+ */
+public class ZkStringSerializer implements ZkSerializer {
+
+  private static final Charset UTF_8 = Charset.forName("UTF-8");
+
+  @Override
+  public byte[] serialize(Object data) throws ZkMarshallingError {
+    return ((String) data).getBytes(UTF_8);
+  }
+
+  @Override
+  public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+    return bytes != null ? new String(bytes, UTF_8) : null;
+  }
+}
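
For completeness, the serializer simply round-trips UTF-8 strings (illustrative only, not part of
the patch):

  ZkStringSerializer serializer = new ZkStringSerializer();
  byte[] bytes = serializer.serialize("/brokers/ids");
  String path = (String) serializer.deserialize(bytes);  // yields "/brokers/ids"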

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
new file mode 100644
index 0000000..c4a7e15
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/utils/ZookeeperTestHarness.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.crunch.kafka.utils;
+
+import kafka.utils.ZkUtils;
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.ZkConnection;
+
+import java.io.IOException;
+
+/**
+ * A test harness that brings up an embedded Zookeeper instance.
+ * <p>
+ * Adapted from the {@code kafka.zk.ZooKeeperTestHarness} class.
+ * </p>
+ */
+public class ZookeeperTestHarness {
+
+  /**
+   * Zookeeper connection info.
+   */
+  protected final String zookeeperConnect;
+
+  private EmbeddedZookeeper zookeeper;
+  private final int zkConnectionTimeout;
+  private final int zkSessionTimeout;
+
+  /**
+   * Zookeeper client connection.
+   */
+  protected ZkUtils zkUtils;
+
+  /**
+   * Creates a new Zookeeper test harness.
+   */
+  public ZookeeperTestHarness() {
+    this(KafkaTestUtils.getPorts(1)[0]);
+  }
+
+  /**
+   * Creates a new Zookeeper service test harness using the given port.
+   *
+   * @param zookeeperPort The port number to use for Zookeeper client connections.
+   */
+  public ZookeeperTestHarness(int zookeeperPort) {
+    this.zookeeper = null;
+    this.zkUtils = null;
+    this.zkConnectionTimeout = 6000;
+    this.zkSessionTimeout = 6000;
+    this.zookeeperConnect = "localhost:" + zookeeperPort;
+  }
+
+  /**
+   * Returns a client for communicating with the Zookeeper service.
+   *
+   * @return A Zookeeper client.
+   *
+   * @throws IllegalStateException
+   *             if Zookeeper has not yet been {@link #setUp()}, or has already been {@link #tearDown() torn down}.
+   */
+  public ZkClient getZkClient() {
+    if (zkUtils == null) {
+      throw new IllegalStateException("Zookeeper service is not active");
+    }
+    return zkUtils.zkClient();
+  }
+
+  public ZkUtils getZkUtils() {
+    return zkUtils;
+  }
+
+  /**
+   * Start up Zookeeper.
+   *
+   * @throws IOException if an error occurs during Zookeeper initialization.
+   */
+  public void setUp() throws IOException {
+    zookeeper = new EmbeddedZookeeper(zookeeperConnect);
+    ZkClient zkClient = new ZkClient(zookeeperConnect, zkSessionTimeout, zkConnectionTimeout, new ZkStringSerializer());
+    ZkConnection connection = new ZkConnection(zookeeperConnect, zkSessionTimeout);
+    zkUtils = new ZkUtils(zkClient, connection, false);
+  }
+
+  /**
+   * Shut down Zookeeper.
+   */
+  public void tearDown() throws IOException {
+    if (zkUtils != null) {
+      zkUtils.close();
+      zkUtils = null;
+    }
+    if (zookeeper != null) {
+      zookeeper.shutdown();
+      zookeeper = null;
+    }
+  }
+}
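
KafkaBrokerTestHarness above extends this class, but it can also be used on its own when a test
only needs Zookeeper. A rough sketch (illustrative only, not part of the patch):

  // Start an embedded Zookeeper, obtain a client, and shut everything down afterwards.
  ZookeeperTestHarness zkHarness = new ZookeeperTestHarness();
  zkHarness.setUp();
  try {
    ZkClient client = zkHarness.getZkClient();
    // ... exercise Zookeeper-dependent code ...
  } finally {
    zkHarness.tearDown();
  }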

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/resources/log4j.properties b/crunch-kafka/src/test/resources/log4j.properties
new file mode 100644
index 0000000..0b3eeee
--- /dev/null
+++ b/crunch-kafka/src/test/resources/log4j.properties
@@ -0,0 +1,29 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, A1
+log4j.appender.A1=org.apache.log4j.ConsoleAppender
+log4j.appender.A1.layout=org.apache.log4j.PatternLayout
+
+# Print the date in ISO 8601 format
+log4j.appender.A1.layout.ConversionPattern=%d [%t] %-5p %c - %m%n
+
+# Limit Apache logging to keep us from being overwhelmed as our tests stop and restart servers.
+log4j.logger.org.apache.hadoop=WARN
+log4j.logger.org.apache.zookeeper=WARN
+log4j.logger.org.mortbay=WARN
+log4j.logger.org.apache.zookeeper.client.ZooKeeperSaslClient=ERROR
+log4j.logger.org.apache.hadoop.conf.Configuration=ERROR
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 5b390ee..78ea085 100644
--- a/pom.xml
+++ b/pom.xml
@@ -54,6 +54,7 @@ under the License.
     <module>crunch-spark</module>
     <module>crunch-hive</module>
     <module>crunch-dist</module>
+    <module>crunch-kafka</module>
   </modules>
   <profiles>
     <profile>
@@ -103,6 +104,7 @@ under the License.
     <hbase.version>1.0.0</hbase.version>
     <avro.classifier>hadoop2</avro.classifier>
 
+    <kafka.version>0.9.0.1</kafka.version>
     <scala.base.version>2.10</scala.base.version>
     <scala.version>2.10.4</scala.version>
     <scalatest.version>2.2.4</scalatest.version>
@@ -455,6 +457,17 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka-clients</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.kafka</groupId>
+        <artifactId>kafka_${scala.base.version}</artifactId>
+        <version>${kafka.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.code.findbugs</groupId>
         <artifactId>jsr305</artifactId>
         <version>${jsr305.version}</version>

