crunch-commits mailing list archives

From mkw...@apache.org
Subject [2/3] crunch git commit: CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable
Date Mon, 23 May 2016 20:21:49 GMT
CRUNCH-606: Kafka Source for Crunch which supports reading data as BytesWritable

* Some of the code was contributed by Bryan Baugher and Andrew Olson


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/321cfef6
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/321cfef6
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/321cfef6

Branch: refs/heads/master
Commit: 321cfef6e85325ab7a4d9548686a96972f6f31fd
Parents: c09c4ee
Author: Micah Whitacre <mkwhit@gmail.com>
Authored: Mon Apr 11 09:47:33 2016 -0500
Committer: Micah Whitacre <mkwhit@gmail.com>
Committed: Mon May 23 15:13:36 2016 -0500

----------------------------------------------------------------------
 crunch-kafka/pom.xml                            |  82 ++++
 .../java/org/apache/crunch/kafka/KafkaData.java |  63 +++
 .../crunch/kafka/KafkaRecordsIterable.java      | 294 +++++++++++++
 .../org/apache/crunch/kafka/KafkaSource.java    | 225 ++++++++++
 .../org/apache/crunch/kafka/KafkaUtils.java     | 301 ++++++++++++++
 .../kafka/inputformat/KafkaInputFormat.java     | 235 +++++++++++
 .../kafka/inputformat/KafkaInputSplit.java      | 117 ++++++
 .../kafka/inputformat/KafkaRecordReader.java    | 152 +++++++
 .../org/apache/crunch/kafka/ClusterTest.java    | 217 ++++++++++
 .../org/apache/crunch/kafka/KafkaDataIT.java    | 118 ++++++
 .../crunch/kafka/KafkaRecordsIterableIT.java    | 415 +++++++++++++++++++
 .../org/apache/crunch/kafka/KafkaSourceIT.java  | 169 ++++++++
 .../org/apache/crunch/kafka/KafkaUtilsIT.java   | 188 +++++++++
 .../kafka/inputformat/KafkaInputFormatIT.java   | 407 ++++++++++++++++++
 .../kafka/inputformat/KafkaInputSplitTest.java  |  65 +++
 .../kafka/inputformat/KafkaRecordReaderIT.java  | 122 ++++++
 .../crunch/kafka/utils/EmbeddedZookeeper.java   | 102 +++++
 .../kafka/utils/KafkaBrokerTestHarness.java     | 369 +++++++++++++++++
 .../crunch/kafka/utils/KafkaTestUtils.java      |  94 +++++
 .../crunch/kafka/utils/ZkStringSerializer.java  |  43 ++
 .../kafka/utils/ZookeeperTestHarness.java       | 112 +++++
 .../src/test/resources/log4j.properties         |  29 ++
 pom.xml                                         |  13 +
 23 files changed, 3932 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
new file mode 100644
index 0000000..a96a9b0
--- /dev/null
+++ b/crunch-kafka/pom.xml
@@ -0,0 +1,82 @@
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.crunch</groupId>
+    <artifactId>crunch-parent</artifactId>
+    <version>0.14.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>crunch-kafka</artifactId>
+  <name>Apache Crunch for Kafka</name>
+
+  <dependencies>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-core</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-clients</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka_2.10</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.scala-lang</groupId>
+      <artifactId>scala-library</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.hadoop</groupId>
+      <artifactId>hadoop-client</artifactId>
+    </dependency>
+
+
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-api</artifactId>
+    </dependency>
+    <dependency>
+      <groupId>org.slf4j</groupId>
+      <artifactId>slf4j-log4j12</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.apache.crunch</groupId>
+      <artifactId>crunch-test</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>junit</groupId>
+      <artifactId>junit</artifactId>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>org.hamcrest</groupId>
+      <artifactId>hamcrest-all</artifactId>
+      <scope>test</scope>
+    </dependency>
+  </dependencies>
+
+</project>

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
new file mode 100644
index 0000000..6543aad
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaData.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.SourceTarget;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskInputOutputContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+class KafkaData<K, V> implements ReadableData<Pair<K, V>> {
+
+  private static final long serialVersionUID = -6582212311361579556L;
+
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+  private final Properties props;
+
+  public KafkaData(Properties connectionProperties,
+                   Map<TopicPartition, Pair<Long, Long>> offsets) {
+    this.props = connectionProperties;
+    this.offsets = offsets;
+  }
+
+
+  @Override
+  public Set<SourceTarget<?>> getSourceTargets() {
+    return null;
+  }
+
+  @Override
+  public void configure(Configuration conf) {
+    //no-op
+  }
+
+  @Override
+  public Iterable<Pair<K, V>> read(TaskInputOutputContext<?, ?, ?, ?> context) throws IOException {
+    Consumer<K, V> consumer = new KafkaConsumer<K, V>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
new file mode 100644
index 0000000..8fec7f8
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaRecordsIterable.java
@@ -0,0 +1,294 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import org.apache.crunch.Pair;
+import org.apache.crunch.kafka.inputformat.KafkaRecordReader;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Properties;
+import java.util.Set;
+
+
+class KafkaRecordsIterable<K, V> implements Iterable<Pair<K, V>> {
+
+  /**
+   * Logger
+   */
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordsIterable.class);
+
+  /**
+   * The Kafka consumer responsible for retrieving messages.
+   */
+  private final Consumer<K, V> consumer;
+
+  /**
+   * The start and end offsets for each topic partition to be consumed.
+   */
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * Tracks if the iterable is empty.
+   */
+  private final boolean isEmpty;
+
+  /**
+   * The poll timeout used for each request to Kafka.
+   */
+  private final long scanPollTime;
+
+  private final int maxRetryAttempts;
+
+  /**
+   * Creates the iterable that will pull values for a collection of topics using the provided {@code consumer} between
+   * the start and end offsets described by the {@code offsets}.
+   * @param consumer The consumer for pulling the data from Kafka.  The consumer will be closed automatically once all
+   *                 of the records have been consumed.
+   * @param offsets the start and end offsets for each topic and partition bounding the data to pull
+   * @param properties properties for tweaking the behavior of the iterable.
+   * @throws IllegalArgumentException if any of the arguments are {@code null} or empty.
+   */
+  public KafkaRecordsIterable(Consumer<K, V> consumer, Map<TopicPartition, Pair<Long, Long>> offsets,
+                              Properties properties) {
+    if (consumer == null) {
+      throw new IllegalArgumentException("The 'consumer' cannot be 'null'.");
+    }
+    this.consumer = consumer;
+
+    if (properties == null) {
+      throw new IllegalArgumentException("The 'properties' cannot be 'null'.");
+    }
+
+    String retryString = properties.getProperty(KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY,
+        KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING);
+    maxRetryAttempts = Integer.parseInt(retryString);
+
+    if (offsets == null || offsets.isEmpty()) {
+      throw new IllegalArgumentException("The 'offsets' cannot be 'null' or empty.");
+    }
+
+    //filter out any topics and partitions that do not have offset ranges that will produce data.
+    Map<TopicPartition, Pair<Long, Long>> filteredOffsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      Pair<Long, Long> value = entry.getValue();
+      //if the start offset is less than the end offset then there is data to consume
+      if (value.first() < value.second()) {
+        filteredOffsets.put(entry.getKey(), value);
+      } else {
+        LOG.debug("Removing offsets for {} because start is not less than the end offset.", entry.getKey());
+      }
+    }
+
+    //check whether, based on the filtered offsets, there is any data to retrieve.
+    //there will be data only if at least one start offset is less than its stop offset
+    isEmpty = filteredOffsets.isEmpty();
+    if (isEmpty) {
+      LOG.warn("Iterable for Kafka is empty because the filtered offsets are empty.");
+    }
+
+    //keep only the offset ranges that will yield data
+    this.offsets = filteredOffsets;
+
+    scanPollTime = Long.parseLong(properties.getProperty(KafkaSource.CONSUMER_POLL_TIMEOUT_KEY,
+        Long.toString(KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT)));
+  }
+
+  @Override
+  public Iterator<Pair<K, V>> iterator() {
+    if (isEmpty) {
+      LOG.debug("Returning an empty iterator because there are no offset ranges to consume.");
+      return Collections.emptyIterator();
+    }
+    //Assign consumer to all of the partitions
+    LOG.debug("Assigning topics and partitions and seeking to start offsets.");
+
+    consumer.assign(new LinkedList<>(offsets.keySet()));
+    //poll(0) primes the consumer before seeking; flagged as a hack that may be removable
+    consumer.poll(0);
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      consumer.seek(entry.getKey(), entry.getValue().first());
+    }
+
+    return new RecordsIterator<K, V>(consumer, offsets, scanPollTime, maxRetryAttempts);
+  }
+
+  private static class RecordsIterator<K, V> implements Iterator<Pair<K, V>> {
+
+    private final Consumer<K, V> consumer;
+    private final Map<TopicPartition, Pair<Long, Long>> offsets;
+    private final long pollTime;
+    private final int maxNumAttempts;
+    private ConsumerRecords<K, V> records;
+    private Iterator<ConsumerRecord<K, V>> currentIterator;
+    private final Set<TopicPartition> remainingPartitions;
+
+    private Pair<K, V> next;
+
+    public RecordsIterator(Consumer<K, V> consumer,
+                           Map<TopicPartition, Pair<Long, Long>> offsets, long pollTime, int maxNumRetries) {
+      this.consumer = consumer;
+      remainingPartitions = new HashSet<>(offsets.keySet());
+      this.offsets = offsets;
+      this.pollTime = pollTime;
+      this.maxNumAttempts = maxNumRetries;
+    }
+
+    @Override
+    public boolean hasNext() {
+      if (next != null)
+        return true;
+
+      //if partitions to consume then pull next value
+      if (remainingPartitions.size() > 0) {
+        next = getNext();
+      }
+
+      return next != null;
+    }
+
+    @Override
+    public Pair<K, V> next() {
+      if (next == null) {
+        next = getNext();
+      }
+
+      if (next != null) {
+        Pair<K, V> returnedNext = next;
+        //prime for next call
+        next = getNext();
+        //return the current next
+        return returnedNext;
+      } else {
+        throw new NoSuchElementException("No more elements.");
+      }
+    }
+
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("remove is not supported.");
+    }
+
+    /**
+     * Gets the current iterator.
+     *
+     * @return the current iterator or {@code null} if there are no more values to consume.
+     */
+    private Iterator<ConsumerRecord<K, V>> getIterator() {
+      if (!remainingPartitions.isEmpty()) {
+        if (currentIterator != null && currentIterator.hasNext()) {
+          return currentIterator;
+        }
+        LOG.debug("Retrieving next set of records.");
+        int numTries = 0;
+        boolean success = false;
+        while (!success && numTries < maxNumAttempts) {
+          try {
+            records = consumer.poll(pollTime);
+            success = true;
+          } catch (RetriableException re) {
+            numTries++;
+            if (numTries < maxNumAttempts) {
+              LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
+            } else {
+              LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumAttempts, re);
+              throw re;
+            }
+          }
+        }
+        if (records == null || records.isEmpty()) {
+          LOG.debug("Retrieved empty records.");
+          currentIterator = null;
+          return null;
+        }
+        currentIterator = records.iterator();
+        return currentIterator;
+      }
+
+      LOG.debug("No more partitions to consume therefore not retrieving any more records.");
+      return null;
+    }
+
+    /**
+     * Internal method for retrieving the next value to return from the iterator.
+     *
+     * @return the next record, or {@code null} if there are no more values to retrieve.
+     */
+    private Pair<K, V> getNext() {
+      while (!remainingPartitions.isEmpty()) {
+        Iterator<ConsumerRecord<K, V>> iterator = getIterator();
+
+        while (iterator != null && iterator.hasNext()) {
+          ConsumerRecord<K, V> record = iterator.next();
+          TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
+          long offset = record.offset();
+
+          if (withinRange(topicPartition, offset)) {
+            LOG.debug("Retrieving value for {} with offset {}.", topicPartition, offset);
+            return Pair.of(record.key(), record.value());
+          }
+          LOG.debug("Value for {} with offset {} is outside of range, skipping.", topicPartition, offset);
+        }
+      }
+
+      LOG.debug("Closing the consumer because there are no more remaining partitions.");
+      consumer.close();
+
+      LOG.debug("Consumed data from all partitions.");
+      return null;
+
+    }
+
+    /**
+     * Checks whether the value for {@code topicPartition} with an {@code offset} is within the scan range.
+     * If the value is not then {@code false} is returned, otherwise {@code true}.
+     *
+     * @param topicPartition the partition for the offset
+     * @param offset the offset in the partition
+     * @return {@code true} if the value is within the expected consumption range, otherwise {@code false}.
+     */
+    private boolean withinRange(TopicPartition topicPartition, long offset) {
+      long endOffset = offsets.get(topicPartition).second();
+      //end offsets are one higher than the last written value.
+      boolean emit = offset < endOffset;
+      if (offset >= endOffset - 1) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Completed consuming partition {} with offset {} and ending offset {}.",
+              new Object[]{topicPartition, offset, endOffset});
+        }
+        remainingPartitions.remove(topicPartition);
+        consumer.pause(topicPartition);
+      }
+      LOG.debug("Evaluated value for partition {} with offset {} against the end of the range.", topicPartition, offset);
+      return emit;
+    }
+  }
+}
+
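
For reference, the following is a minimal sketch (not part of this commit) of driving the iterable directly, much as KafkaData#read() does. The broker address, topic name "abc", and offset range are placeholders, and because the class is package-private such code would need to live in the org.apache.crunch.kafka package, as the integration tests do.

package org.apache.crunch.kafka;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.Pair;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class KafkaRecordsIterableSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker-1:9092"); // hypothetical broker
    props.setProperty("key.deserializer", KafkaSource.BytesDeserializer.class.getName());
    props.setProperty("value.deserializer", KafkaSource.BytesDeserializer.class.getName());

    // consume offsets [0, 100) from partition 0 of a hypothetical topic "abc"
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    offsets.put(new TopicPartition("abc", 0), Pair.of(0L, 100L));

    // the iterable closes the consumer once every partition has been drained
    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
    for (Pair<BytesWritable, BytesWritable> record : new KafkaRecordsIterable<>(consumer, offsets, props)) {
      System.out.println(record.first() + " -> " + record.second().getLength() + " bytes");
    }
  }
}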

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
new file mode 100644
index 0000000..485604d
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaSource.java
@@ -0,0 +1,225 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import org.apache.crunch.DoFn;
+import org.apache.crunch.Pair;
+import org.apache.crunch.ReadableData;
+import org.apache.crunch.Source;
+import org.apache.crunch.TableSource;
+import org.apache.crunch.impl.mr.run.CrunchMapper;
+import org.apache.crunch.io.CrunchInputs;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.crunch.io.ReadableSource;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
+import org.apache.crunch.types.Converter;
+import org.apache.crunch.types.PTableType;
+import org.apache.crunch.types.PType;
+import org.apache.crunch.types.writable.Writables;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * A Crunch Source that will retrieve events from Kafka given start and end offsets.  The source is not designed to
+ * process unbounded data but instead to retrieve data within a specified offset range.
+ * <p>
+ *
+ * The values retrieved from Kafka are returned as raw bytes inside of a {@link BytesWritable}.  If callers
+ * need topic-specific parsing logic then they are encouraged to create a separate Kafka source per topic
+ * and apply a {@link DoFn} to parse the payload.
+ */
+public class KafkaSource
+    implements TableSource<BytesWritable, BytesWritable>, ReadableSource<Pair<BytesWritable, BytesWritable>> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaSource.class);
+
+  private final FormatBundle inputBundle;
+  private final Properties props;
+  private final Map<TopicPartition, Pair<Long, Long>> offsets;
+
+  /**
+   * The consistent PType describing all of the data being retrieved from Kafka as a BytesWritable.
+   */
+  private static final PTableType<BytesWritable, BytesWritable> KAFKA_SOURCE_TYPE =
+      Writables.tableOf(Writables.writables(BytesWritable.class), Writables.writables(BytesWritable.class));
+
+  /**
+   * Constant to indicate how long the reader waits before timing out when retrieving data from Kafka.
+   */
+  public static final String CONSUMER_POLL_TIMEOUT_KEY = "org.apache.crunch.kafka.consumer.poll.timeout";
+
+  /**
+   * Default timeout value for {@link #CONSUMER_POLL_TIMEOUT_KEY} of 1 second.
+   */
+  public static final long CONSUMER_POLL_TIMEOUT_DEFAULT = 1000L;
+
+
+  /**
+   * Constructs a Kafka source that will read data from the Kafka cluster identified by the {@code kafkaConnectionProperties}
+   * and from the specific topics and partitions identified in the {@code offsets}.
+   * @param kafkaConnectionProperties The connection properties for reading from Kafka.  These properties will be honored
+   *                                  with the exception of the {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+   *                                  {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}, which are always overridden.
+   * @param offsets A map of {@link TopicPartition} to a pair of start and end offsets respectively.  The start and end offsets
+   *                are evaluated at [start, end) where the ending offset is excluded.  Each TopicPartition must have a
+   *                non-null pair describing its offsets.  The start offset should be less than the end offset.  If the values
+   *                are equal or start is greater than the end then that partition will be skipped.
+   */
+  public KafkaSource(Properties kafkaConnectionProperties, Map<TopicPartition, Pair<Long, Long>> offsets) {
+    this.props = copyAndSetProperties(kafkaConnectionProperties);
+
+    inputBundle = createFormatBundle(props, offsets);
+
+    this.offsets = Collections.unmodifiableMap(new HashMap<>(offsets));
+  }
+
+  @Override
+  public Source<Pair<BytesWritable, BytesWritable>> inputConf(String key, String value) {
+    inputBundle.set(key, value);
+    return this;
+  }
+
+  @Override
+  public PType<Pair<BytesWritable, BytesWritable>> getType() {
+    return KAFKA_SOURCE_TYPE;
+  }
+
+  @Override
+  public Converter<?, ?, ?, ?> getConverter() {
+    return KAFKA_SOURCE_TYPE.getConverter();
+  }
+
+  @Override
+  public PTableType<BytesWritable, BytesWritable> getTableType() {
+    return KAFKA_SOURCE_TYPE;
+  }
+
+  @Override
+  public long getSize(Configuration configuration) {
+    // TODO something smarter here.
+    return 1000L * 1000L * 1000L;
+  }
+
+  @Override
+  public String toString() {
+    return "KafkaSource("+props.getProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG)+")";
+  }
+
+  @Override
+  public long getLastModifiedAt(Configuration configuration) {
+    LOG.warn("Cannot determine last modified time for source: {}", toString());
+    return -1;
+  }
+
+  private static <K, V> FormatBundle createFormatBundle(Properties kafkaConnectionProperties,
+                                                        Map<TopicPartition, Pair<Long, Long>> offsets) {
+
+    FormatBundle<KafkaInputFormat> bundle = FormatBundle.forInput(KafkaInputFormat.class);
+
+    KafkaInputFormat.writeOffsetsToBundle(offsets, bundle);
+
+    for (String name : kafkaConnectionProperties.stringPropertyNames()) {
+      bundle.set(name, kafkaConnectionProperties.getProperty(name));
+    }
+
+    return bundle;
+  }
+
+  private static <K, V> Properties copyAndSetProperties(Properties kafkaConnectionProperties) {
+    Properties props = new Properties();
+    props.putAll(kafkaConnectionProperties);
+
+    //Setting the key/value deserializer to ensure proper translation from Kafka to PType format.
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, BytesDeserializer.class.getName());
+
+    return props;
+  }
+
+
+  @Override
+  public Iterable<Pair<BytesWritable, BytesWritable>> read(Configuration conf) throws IOException {
+    // consumer will get closed when the iterable is fully consumed.
+    // skip using the input format/splits since this will be read in a single JVM and doesn't need the complexity
+    // of parallelism when reading.
+    Consumer<BytesWritable, BytesWritable> consumer = new KafkaConsumer<>(props);
+    return new KafkaRecordsIterable<>(consumer, offsets, props);
+  }
+
+
+  @Override
+  public void configureSource(Job job, int inputId) throws IOException {
+    Configuration conf = job.getConfiguration();
+    //an id of -1 indicates that this is the only input so just use it directly
+    if (inputId == -1) {
+      job.setMapperClass(CrunchMapper.class);
+      job.setInputFormatClass(inputBundle.getFormatClass());
+      inputBundle.configure(conf);
+    } else {
+      //there are multiple inputs for this mapper so register it with CrunchInputs, using a fake path just to
+      //make it play well with other file-based inputs.
+      Path dummy = new Path("/kafka/" + inputId);
+      CrunchInputs.addInputPath(job, dummy, inputBundle, inputId);
+    }
+  }
+
+  @Override
+  public ReadableData<Pair<BytesWritable, BytesWritable>> asReadable() {
+    // skip using the input format/splits since this will be read in a single JVM and doesn't need the complexity
+    // of parallelism when reading.
+    return new KafkaData<>(props, offsets);
+  }
+
+
+  /**
+   * Basic {@link Deserializer} which simply wraps the payload as a {@link BytesWritable}.
+   */
+  public static class BytesDeserializer implements Deserializer<BytesWritable> {
+
+    @Override
+    public void configure(Map<String, ?> configProperties, boolean isKey) {
+      //no-op
+    }
+
+    @Override
+    public BytesWritable deserialize(String topic, byte[] valueBytes) {
+      return new BytesWritable(valueBytes);
+    }
+
+    @Override
+    public void close() {
+      //no-op
+    }
+  }
+}
\ No newline at end of file
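
For reference, a hedged sketch (not part of this commit) of wiring the source into a pipeline end to end, using KafkaUtils to discover the offset bounds. The broker addresses, the topic "abc", and the wrapper class are placeholders.

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.crunch.PTable;
import org.apache.crunch.Pair;
import org.apache.crunch.Pipeline;
import org.apache.crunch.impl.mr.MRPipeline;
import org.apache.crunch.kafka.KafkaSource;
import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.kafka.common.TopicPartition;

public class KafkaSourceExample {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("bootstrap.servers", "broker-1:9092");    // used by the new consumer API
    props.setProperty("metadata.broker.list", "broker-1:9092"); // used by KafkaUtils#getBrokerOffsets

    // bound the read to everything currently retained in the hypothetical topic "abc"
    Map<TopicPartition, Long> start =
        KafkaUtils.getBrokerOffsets(props, kafka.api.OffsetRequest.EarliestTime(), "abc");
    Map<TopicPartition, Long> end =
        KafkaUtils.getBrokerOffsets(props, kafka.api.OffsetRequest.LatestTime(), "abc");

    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    for (Map.Entry<TopicPartition, Long> e : start.entrySet()) {
      offsets.put(e.getKey(), Pair.of(e.getValue(), end.get(e.getKey())));
    }

    Pipeline pipeline = new MRPipeline(KafkaSourceExample.class, new Configuration());
    PTable<BytesWritable, BytesWritable> records = pipeline.read(new KafkaSource(props, offsets));
    // ...apply a DoFn to parse the raw payloads before writing them out...
    pipeline.done();
  }
}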

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
new file mode 100644
index 0000000..aeea6fb
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/KafkaUtils.java
@@ -0,0 +1,301 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.cluster.Broker;
+import kafka.cluster.BrokerEndPoint;
+import kafka.cluster.EndPoint;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.OffsetResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.TopicMetadataRequest;
+import kafka.javaapi.TopicMetadataResponse;
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.protocol.SecurityProtocol;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Random;
+
+/**
+ * Simple utilities for retrieving offset and Kafka information to assist in setting up and configuring a
+ * {@link KafkaSource} instance.
+ */
+public class KafkaUtils {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+
+  private static final String CLIENT_ID = "crunch-kafka-client";
+
+  private static final Random RANDOM = new Random();
+
+  /**
+   * Configuration property for the number of retry attempts that will be made to Kafka.
+   */
+  public static final String KAFKA_RETRY_ATTEMPTS_KEY = "org.apache.crunch.kafka.retry.attempts";
+
+  /**
+   * Default number of retry attempts.
+   */
+  public static final int KAFKA_RETRY_ATTEMPTS_DEFAULT = 5;
+  public static final String KAFKA_RETRY_ATTEMPTS_DEFAULT_STRING = Integer.toString(KAFKA_RETRY_ATTEMPTS_DEFAULT);
+
+  /**
+   * Converts the provided {@code config} into a {@link Properties} object to connect with Kafka.
+   * @param config the config from which to read properties
+   * @return a properties instance populated with all of the values inside the provided {@code config}.
+   */
+  public static Properties getKafkaConnectionProperties(Configuration config) {
+    Properties props = new Properties();
+    for (Map.Entry<String, String> value : config) {
+      props.setProperty(value.getKey(), value.getValue());
+    }
+
+    return props;
+  }
+
+  /**
+   * Adds the {@code properties} to the provided {@code config} instance.
+   * @param properties the properties to add to the config.
+   * @param config the configuration instance to be modified.
+   * @return the config instance with the populated properties
+   */
+  public static Configuration addKafkaConnectionProperties(Properties properties, Configuration config) {
+    for (String name : properties.stringPropertyNames()) {
+      config.set(name, properties.getProperty(name));
+    }
+    return config;
+  }
+
+  /**
+   * Returns a {@link TopicMetadataRequest} from the given topics
+   *
+   * @param topics an array of topics you want metadata for
+   * @return a {@link TopicMetadataRequest} from the given topics
+   * @throws IllegalArgumentException if topics is {@code null} or empty, or if any of the topics is null, empty or blank
+   */
+  private static TopicMetadataRequest getTopicMetadataRequest(String... topics) {
+    if (topics == null)
+      throw new IllegalArgumentException("topics cannot be null");
+    if (topics.length == 0)
+      throw new IllegalArgumentException("topics cannot be empty");
+
+    for (String topic : topics)
+      if (StringUtils.isBlank(topic))
+        throw new IllegalArgumentException("No topic can be null, empty or blank");
+
+    return new TopicMetadataRequest(Arrays.asList(topics));
+  }
+
+  /**
+   * <p>
+   * Retrieves the offset values for an array of topics at the specified time.
+   * </p>
+   * <p>
+   * If the Kafka cluster does not have the logs for the partition at the specified time, or if the topic did not exist
+   * at that time, this will instead return the earliest offset for that partition.
+   * </p>
+   *
+   * @param properties the properties containing the configuration for kafka
+   * @param time       the time at which we want to know what the offset values were
+   * @param topics     the topics we want to know the offset values of
+   * @return the offset values for an array of topics at the specified time
+   * @throws IllegalArgumentException if properties is {@code null} or if topics is {@code null} or empty or if any of
+   *                                  the topics are {@code null}, empty or blank, or if there is an error parsing the
+   *                                  properties.
+   * @throws IllegalStateException if there is an error communicating with the Kafka cluster to retrieve information.
+   */
+  public static Map<TopicPartition, Long> getBrokerOffsets(Properties properties, long time, String... topics) {
+    if (properties == null)
+      throw new IllegalArgumentException("properties cannot be null");
+
+    final List<Broker> brokers = getBrokers(properties);
+    Collections.shuffle(brokers, RANDOM);
+
+    return getBrokerOffsets(brokers, time, topics);
+  }
+
+  // Visible for testing
+  static Map<TopicPartition, Long> getBrokerOffsets(List<Broker> brokers, long time, String... topics) {
+    if (topics == null)
+      throw new IllegalArgumentException("topics cannot be null");
+    if (topics.length == 0)
+      throw new IllegalArgumentException("topics cannot be empty");
+
+    for (String topic : topics)
+      if (StringUtils.isBlank(topic))
+        throw new IllegalArgumentException("No topic can be null, empty or blank");
+
+    TopicMetadataResponse topicMetadataResponse = null;
+
+    final TopicMetadataRequest topicMetadataRequest = getTopicMetadataRequest(topics);
+
+    for (final Broker broker : brokers) {
+      final SimpleConsumer consumer = getSimpleConsumer(broker);
+      try {
+        topicMetadataResponse = consumer.send(topicMetadataRequest);
+        break;
+      } catch (Exception err) {
+        EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+        LOG.warn(String.format("Fetching topic metadata for topic(s) '%s' from broker '%s' failed",
+            Arrays.toString(topics), endpoint.host()), err);
+      } finally {
+        consumer.close();
+      }
+    }
+
+    if (topicMetadataResponse == null) {
+      throw new IllegalStateException(String.format("Fetching topic metadata for topic(s) '%s' from broker(s) '%s' failed",
+          Arrays.toString(topics), Arrays.toString(brokers.toArray())));
+    }
+
+    // From the topic metadata, build a PartitionOffsetRequestInfo for each partition of each topic. It should be noted that
+    // only the leader Broker has the partition offset information[1] so save the leader Broker so we
+    // can send the request to it.
+    // [1] - https://cwiki.apache.org/KAFKA/a-guide-to-the-kafka-protocol.html#AGuideToTheKafkaProtocol-OffsetAPI
+    Map<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequests =
+        new HashMap<>();
+
+    for (TopicMetadata metadata : topicMetadataResponse.topicsMetadata()) {
+      for (PartitionMetadata partition : metadata.partitionsMetadata()) {
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
+            new HashMap<>();
+
+        BrokerEndPoint brokerEndPoint = partition.leader();
+        Broker leader = new Broker(0, brokerEndPoint.host(), brokerEndPoint.port(), SecurityProtocol.PLAINTEXT);
+
+        if (brokerRequests.containsKey(leader))
+          requestInfo = brokerRequests.get(leader);
+
+        requestInfo.put(new TopicAndPartition(metadata.topic(), partition.partitionId()), new PartitionOffsetRequestInfo(
+            time, 1));
+
+        brokerRequests.put(leader, requestInfo);
+      }
+    }
+
+    Map<TopicPartition, Long> topicPartitionToOffset = new HashMap<>();
+
+    // Send the offset request to the leader broker
+    for (Map.Entry<Broker, Map<TopicAndPartition, PartitionOffsetRequestInfo>> brokerRequest : brokerRequests.entrySet()) {
+      SimpleConsumer simpleConsumer = getSimpleConsumer(brokerRequest.getKey());
+
+      OffsetResponse offsetResponse = null;
+      try {
+        OffsetRequest offsetRequest = new OffsetRequest(brokerRequest.getValue(), kafka.api.OffsetRequest.CurrentVersion(),
+            CLIENT_ID);
+        offsetResponse = simpleConsumer.getOffsetsBefore(offsetRequest);
+      } finally {
+        simpleConsumer.close();
+      }
+
+      Map<TopicPartition, Long> earliestOffsets = null;
+
+      // Retrieve/parse the results
+      for (Map.Entry<TopicAndPartition, PartitionOffsetRequestInfo> entry : brokerRequest.getValue().entrySet()) {
+        TopicAndPartition topicAndPartition = entry.getKey();
+        TopicPartition topicPartition = new TopicPartition(topicAndPartition.topic(), topicAndPartition.partition());
+        long[] offsets = offsetResponse.offsets(topicAndPartition.topic(), topicAndPartition.partition());
+        long offset;
+
+        // The Kafka API will return no value if a time is given for which there is no log containing messages from that time
+        // (i.e. before a topic existed or in a log that was rolled/cleaned)
+        if (offsets.length > 0) {
+          offset = offsets[0];
+        } else {
+          LOG.info("Kafka did not have an offset for topic/partition [{}]. Returning earliest known offset instead",
+              topicAndPartition);
+
+          // This shouldn't happen, but if Kafka's API did not provide us with a value and we are already asking for
+          // the earliest time, we can't be sure what to do, so quit
+          if (time == kafka.api.OffsetRequest.EarliestTime())
+            throw new IllegalStateException("We requested the earliest offsets for topic [" + topicAndPartition.topic()
+                + "] but Kafka returned no values");
+
+          // Load the earliest offsets for the topic if it hasn't been loaded already
+          if (earliestOffsets == null)
+            earliestOffsets = getBrokerOffsets(Arrays.asList(brokerRequest.getKey()),
+                kafka.api.OffsetRequest.EarliestTime(), topicAndPartition.topic());
+
+          offset = earliestOffsets.get(topicPartition);
+        }
+
+        topicPartitionToOffset.put(topicPartition, offset);
+      }
+    }
+
+    return topicPartitionToOffset;
+  }
+
+  /**
+   * Returns a {@link SimpleConsumer} connected to the given {@link Broker}
+   */
+  private static SimpleConsumer getSimpleConsumer(final Broker broker) {
+    // BrokerHost, BrokerPort, timeout, buffer size, client id
+    EndPoint endpoint = broker.endPoints().get(SecurityProtocol.PLAINTEXT).get();
+    return new SimpleConsumer(endpoint.host(), endpoint.port(), 100000, 64 * 1024, CLIENT_ID);
+  }
+
+  /**
+   * Returns a {@link Broker} list from the given {@link Properties}
+   *
+   * @param properties the {@link Properties} with configuration to connect to a Kafka broker
+   */
+  private static List<Broker> getBrokers(final Properties properties) {
+    if (properties == null)
+      throw new IllegalArgumentException("properties cannot be null");
+
+    String commaDelimitedBrokerList = properties.getProperty("metadata.broker.list");
+    if (commaDelimitedBrokerList == null)
+      throw new IllegalArgumentException("Unable to find 'metadata.broker.list' in given properties");
+
+    // Split broker list into host/port pairs
+    String[] brokerPortList = commaDelimitedBrokerList.split(",");
+    if (brokerPortList.length < 1)
+      throw new IllegalArgumentException("Unable to parse broker list : [" + Arrays.toString(brokerPortList) + "]");
+
+    final List<Broker> brokers = new ArrayList<Broker>(brokerPortList.length);
+    for (final String brokerHostPortString : brokerPortList) {
+      // Split host/port
+      String[] brokerHostPort = brokerHostPortString.split(":");
+      if (brokerHostPort.length != 2)
+        throw new IllegalArgumentException("Unable to parse host/port from broker string : ["
+            + Arrays.toString(brokerHostPort) + "] from broker list : [" + Arrays.toString(brokerPortList) + "]");
+      try {
+        brokers.add(new Broker(0, brokerHostPort[0], Integer.parseInt(brokerHostPort[1]), SecurityProtocol.PLAINTEXT));
+      } catch (NumberFormatException e) {
+        throw new IllegalArgumentException("Error parsing broker port : " + brokerHostPort[1], e);
+      }
+    }
+    return brokers;
+  }
+
+}
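
For reference, a small sketch (not part of this commit) of the Properties/Configuration helpers above; the broker list is a placeholder. The settings are copied into a Hadoop Configuration for the job and then recovered on the task side.

import java.util.Properties;

import org.apache.crunch.kafka.KafkaUtils;
import org.apache.hadoop.conf.Configuration;

public class KafkaConnectionPropertiesSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("metadata.broker.list", "broker-1:9092,broker-2:9092"); // hypothetical brokers

    // copy the connection settings into a Hadoop Configuration for the job
    Configuration conf = KafkaUtils.addKafkaConnectionProperties(props, new Configuration());

    // recover them on the task side; note this copies every entry in the Configuration,
    // not just the Kafka-specific ones
    Properties recovered = KafkaUtils.getKafkaConnectionProperties(conf);
    System.out.println(recovered.getProperty("metadata.broker.list"));
  }
}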

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
new file mode 100644
index 0000000..eba4a97
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputFormat.java
@@ -0,0 +1,235 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.Pair;
+import org.apache.crunch.io.FormatBundle;
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Basic input format for reading data from Kafka.  Data is read and maintained in its pure byte form and wrapped
+ * inside of a {@link BytesWritable} instance.
+ *
+ * Populating the configuration of the input format is handled with the convenience method of
+ * {@link #writeOffsetsToConfiguration(Map, Configuration)}.  This should be done to ensure
+ * the Kafka offset information is available when the input format {@link #getSplits(JobContext) creates its splits}
+ * and {@link #createRecordReader(InputSplit, TaskAttemptContext) readers}.
+ */
+public class KafkaInputFormat extends InputFormat<BytesWritable, BytesWritable> implements Configurable {
+
+  /**
+   * Constant for constructing configuration keys for the input format.
+   */
+  private static final String KAFKA_INPUT_OFFSETS_BASE = "org.apache.crunch.kafka.offsets.topic";
+
+  /**
+   * Constant used for building configuration keys and specifying partitions.
+   */
+  private static final String PARTITIONS = "partitions";
+
+  /**
+   * Constant used for building configuration keys and specifying the start of a partition.
+   */
+  private static final String START = "start";
+
+  /**
+   * Constant used for building configuration keys and specifying the end of a partition.
+   */
+  private static final String END = "end";
+
+  /**
+   * Regex to discover all of the defined partitions which should be consumed by the input format.
+   */
+  private static final String TOPIC_KEY_REGEX = KAFKA_INPUT_OFFSETS_BASE + "\\..*\\." + PARTITIONS + "$";
+
+  private Configuration configuration;
+
+  @Override
+  public List<InputSplit> getSplits(JobContext jobContext) throws IOException, InterruptedException {
+    Map<TopicPartition, Pair<Long, Long>> offsets = getOffsets(getConf());
+    List<InputSplit> splits = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      TopicPartition topicPartition = entry.getKey();
+
+      long start = entry.getValue().first();
+      long end = entry.getValue().second();
+      //skip partitions whose offset range is empty
+      if (start != end) {
+        splits.add(new KafkaInputSplit(topicPartition.topic(), topicPartition.partition(), start, end));
+      }
+    }
+
+    return splits;
+  }
+
+  @Override
+  public RecordReader<BytesWritable, BytesWritable> createRecordReader(InputSplit inputSplit, TaskAttemptContext taskAttemptContext)
+      throws IOException, InterruptedException {
+    return new KafkaRecordReader<>();
+  }
+
+  @Override
+  public void setConf(Configuration configuration) {
+    this.configuration = configuration;
+  }
+
+  @Override
+  public Configuration getConf() {
+    return configuration;
+  }
+
+
+  //The following methods are used for reading and writing Kafka partition offset information into Hadoop's Configuration
+  //objects and into Crunch's FormatBundle.  A specific Kafka topic may have one or many partitions, and each
+  //partition needs a start and end offset.  Assuming a topic named "abc" with 2 partitions, the
+  //configuration would be populated with the following:
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions = [0,1]
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = <partition start>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.0.end = <partition end>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.start = <partition start>
+  // org.apache.crunch.kafka.offsets.topic.abc.partitions.1.end = <partition end>
+
+  /**
+   * Writes the start and end offsets for the provided topic partitions to the {@code bundle}.
+   * @param offsets The starting and ending offsets for the topics and partitions.
+   * @param bundle the bundle into which the information should be persisted.
+   */
+  public static void writeOffsetsToBundle(Map<TopicPartition, Pair<Long, Long>> offsets, FormatBundle bundle) {
+    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
+      bundle.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Writes the start and end offsets for the provided topic partitions to the {@code config}.
+   * @param offsets The starting and ending offsets for the topics and partitions.
+   * @param config the config into which the information should be persisted.
+   */
+  public static void writeOffsetsToConfiguration(Map<TopicPartition, Pair<Long, Long>> offsets, Configuration config) {
+    for (Map.Entry<String, String> entry : generateValues(offsets).entrySet()) {
+      config.set(entry.getKey(), entry.getValue());
+    }
+  }
+
+  /**
+   * Reads the {@code configuration} to determine which topics, partitions, and offsets should be used for reading data.
+   *
+   * @param configuration the configuration to derive the data to read.
+   * @return a map of {@link TopicPartition} to a pair of start and end offsets.
+   * @throws IllegalStateException if the {@code configuration} does not have the start and end offsets set properly
+   * for a partition.
+   */
+  public static Map<TopicPartition, Pair<Long, Long>> getOffsets(Configuration configuration) {
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    //find configuration for all of the topics with defined partitions
+    Map<String, String> topicPartitionKeys = configuration.getValByRegex(TOPIC_KEY_REGEX);
+
+    //for each topic, process its partitions
+    for (String key : topicPartitionKeys.keySet()) {
+      String topic = getTopicFromKey(key);
+      int[] partitions = configuration.getInts(key);
+      //for each partition find and add the start/end offset
+      for (int partitionId : partitions) {
+        TopicPartition topicPartition = new TopicPartition(topic, partitionId);
+        long start = configuration.getLong(generatePartitionStartKey(topic, partitionId),Long.MIN_VALUE);
+        long end = configuration.getLong(generatePartitionEndKey(topic, partitionId),
+            Long.MIN_VALUE);
+
+        if (start == Long.MIN_VALUE || end == Long.MIN_VALUE) {
+          throw new IllegalStateException("The " + topicPartition + " has an invalid start: " + start + " or end: " + end
+              + " offset configured.");
+        }
+
+        offsets.put(topicPartition, Pair.of(start, end));
+      }
+    }
+
+    return offsets;
+  }
+
+  private static Map<String, String> generateValues(Map<TopicPartition, Pair<Long, Long>> offsets) {
+    Map<String, String> offsetConfigValues = new HashMap<>();
+    Map<String, Set<Integer>> topicsPartitions = new HashMap<>();
+
+    for (Map.Entry<TopicPartition, Pair<Long, Long>> entry : offsets.entrySet()) {
+      TopicPartition topicPartition = entry.getKey();
+      String topic = topicPartition.topic();
+      int partition = topicPartition.partition();
+      String startKey = generatePartitionStartKey(topic, partition);
+      String endKey = generatePartitionEndKey(topic, partition);
+      //Add the start and end offsets for a specific partition
+      offsetConfigValues.put(startKey, Long.toString(entry.getValue().first()));
+      offsetConfigValues.put(endKey, Long.toString(entry.getValue().second()));
+
+      Set<Integer> partitions = topicsPartitions.get(topic);
+      if (partitions == null) {
+        partitions = new HashSet<>();
+        topicsPartitions.put(topic, partitions);
+      }
+      partitions.add(partition);
+    }
+
+    //generate the partitions values for each topic
+    for (Map.Entry<String, Set<Integer>> entry : topicsPartitions.entrySet()) {
+      String key = KAFKA_INPUT_OFFSETS_BASE + "." + entry.getKey() + "." + PARTITIONS;
+      Set<Integer> partitions = entry.getValue();
+      String partitionsString = StringUtils.join(partitions, ",");
+      offsetConfigValues.put(key, partitionsString);
+    }
+
+    return offsetConfigValues;
+  }
+
+  static String generatePartitionStartKey(String topic, int partition) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + START;
+  }
+
+  static String generatePartitionEndKey(String topic, int partition) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS + "." + partition + "." + END;
+  }
+
+  static String generateTopicPartitionsKey(String topic) {
+    return KAFKA_INPUT_OFFSETS_BASE + "." + topic + "." + PARTITIONS;
+  }
+
+  static String getTopicFromKey(String key) {
+    //strip off the base key + a trailing "."
+    String value = key.substring(KAFKA_INPUT_OFFSETS_BASE.length() + 1);
+    //strip off the end part + a preceding "."
+    value = value.substring(0, (value.length() - (PARTITIONS.length() + 1)));
+
+    return value;
+  }
+}
\ No newline at end of file
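
For reference, a sketch (not part of this commit) of the offset round trip described in the comments above, using a hypothetical topic "abc" with two partitions; writeOffsetsToConfiguration and getOffsets are the public entry points exercised.

import java.util.HashMap;
import java.util.Map;

import org.apache.crunch.Pair;
import org.apache.crunch.kafka.inputformat.KafkaInputFormat;
import org.apache.hadoop.conf.Configuration;
import org.apache.kafka.common.TopicPartition;

public class KafkaOffsetsRoundTripSketch {
  public static void main(String[] args) {
    // hypothetical topic "abc" with two partitions and [start, end) offset ranges
    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
    offsets.put(new TopicPartition("abc", 0), Pair.of(0L, 100L));
    offsets.put(new TopicPartition("abc", 1), Pair.of(50L, 150L));

    Configuration conf = new Configuration();
    // writes keys such as org.apache.crunch.kafka.offsets.topic.abc.partitions = 0,1
    // and org.apache.crunch.kafka.offsets.topic.abc.partitions.0.start = 0, etc.
    KafkaInputFormat.writeOffsetsToConfiguration(offsets, conf);

    // the input format reads the same keys back when creating splits and record readers
    Map<TopicPartition, Pair<Long, Long>> roundTripped = KafkaInputFormat.getOffsets(conf);
    System.out.println(roundTripped.equals(offsets)); // true
  }
}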

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
new file mode 100644
index 0000000..c8ebc6a
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaInputSplit.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * InputSplit that represents retrieving data from a single {@link TopicPartition} between the specified start
+ * and end offsets.
+ */
+public class KafkaInputSplit extends InputSplit implements Writable {
+
+  private long startingOffset;
+  private long endingOffset;
+  private TopicPartition topicPartition;
+
+  /**
+   * Nullary constructor used by the framework to create the instance before {@link #readFields(DataInput)} is called.
+   */
+  public KafkaInputSplit() {
+
+  }
+
+  /**
+   * Constructs an input split for the provided {@code topic} and {@code partition} restricting data to be between
+   * the {@code startingOffset} and {@code endingOffset}.
+   * @param topic the topic for the split
+   * @param partition the partition for the topic
+   * @param startingOffset the start of the split
+   * @param endingOffset the end of the split
+   */
+  public KafkaInputSplit(String topic, int partition, long startingOffset, long endingOffset) {
+    this.startingOffset = startingOffset;
+    this.endingOffset = endingOffset;
+    topicPartition = new TopicPartition(topic, partition);
+  }
+
+  @Override
+  public long getLength() throws IOException, InterruptedException {
+    // The length is only a hint at the number of bytes in the split, so the record count is an acceptable approximation.
+    return startingOffset > 0 ? endingOffset - startingOffset : endingOffset;
+  }
+
+  @Override
+  public String[] getLocations() throws IOException, InterruptedException {
+    //Leave empty since data locality is not really a concern for Kafka.
+    return new String[0];
+  }
+
+  /**
+   * Returns the topic and partition for the split
+   * @return the topic and partition for the split
+   */
+  public TopicPartition getTopicPartition() {
+    return topicPartition;
+  }
+
+  /**
+   * Returns the starting offset for the split
+   * @return the starting offset for the split
+   */
+  public long getStartingOffset() {
+    return startingOffset;
+  }
+
+  /**
+   * Returns the ending offset for the split
+   * @return the ending offset for the split
+   */
+  public long getEndingOffset() {
+    return endingOffset;
+  }
+
+  @Override
+  public void write(DataOutput dataOutput) throws IOException {
+    dataOutput.writeUTF(topicPartition.topic());
+    dataOutput.writeInt(topicPartition.partition());
+    dataOutput.writeLong(startingOffset);
+    dataOutput.writeLong(endingOffset);
+  }
+
+  @Override
+  public void readFields(DataInput dataInput) throws IOException {
+    String topic = dataInput.readUTF();
+    int partition = dataInput.readInt();
+    startingOffset = dataInput.readLong();
+    endingOffset = dataInput.readLong();
+
+    topicPartition = new TopicPartition(topic, partition);
+  }
+
+  @Override
+  public String toString() {
+    return getTopicPartition() + " Start: " + startingOffset + " End: " + endingOffset;
+  }
+}

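Since the split is shipped between the planner and the tasks via the Writable contract, here is a
minimal sketch of the serialize/deserialize round trip, assuming an in-memory byte stream; the topic
name and offsets are illustrative only.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    import org.apache.crunch.kafka.inputformat.KafkaInputSplit;

    public class KafkaInputSplitRoundTrip {
      public static void main(String[] args) throws IOException {
        KafkaInputSplit split = new KafkaInputSplit("events", 0, 100L, 500L);

        // Serialize the split the same way the framework does when distributing work.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        split.write(new DataOutputStream(bytes));

        // Deserialize into a fresh instance created through the nullary constructor.
        KafkaInputSplit copy = new KafkaInputSplit();
        copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

        System.out.println(copy);   // e.g. "events-0 Start: 100 End: 500"
      }
    }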
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
new file mode 100644
index 0000000..1420519
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/inputformat/KafkaRecordReader.java
@@ -0,0 +1,152 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka.inputformat;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.RetriableException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.Iterator;
+
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_DEFAULT;
+import static org.apache.crunch.kafka.KafkaUtils.KAFKA_RETRY_ATTEMPTS_KEY;
+import static org.apache.crunch.kafka.KafkaUtils.getKafkaConnectionProperties;
+import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_DEFAULT;
+import static org.apache.crunch.kafka.KafkaSource.CONSUMER_POLL_TIMEOUT_KEY;
+
+/**
+ * A {@link RecordReader} for pulling data from Kafka.
+ * @param <K> the key of the records from Kafka
+ * @param <V> the value of the records from Kafka
+ */
+public class KafkaRecordReader<K, V> extends RecordReader<K, V> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(KafkaRecordReader.class);
+
+  private Consumer<K, V> consumer;
+  private ConsumerRecord<K, V> record;
+  private long endingOffset;
+  private Iterator<ConsumerRecord<K, V>> recordIterator;
+  private long consumerPollTimeout;
+  private long maxNumberOfRecords;
+  private long startingOffset;
+  private int maxNumberAttempts;
+
+  @Override
+  public void initialize(InputSplit inputSplit, TaskAttemptContext taskAttemptContext) throws IOException, InterruptedException {
+    consumer = new KafkaConsumer<>(getKafkaConnectionProperties(taskAttemptContext.getConfiguration()));
+    KafkaInputSplit split = (KafkaInputSplit) inputSplit;
+    TopicPartition topicPartition = split.getTopicPartition();
+    consumer.assign(Collections.singletonList(topicPartition));
+    //poll with a timeout of 0 so the consumer fetches partition metadata without retrieving any data
+    consumer.poll(0);
+    //now seek to the desired start location
+    startingOffset = split.getStartingOffset();
+    consumer.seek(topicPartition, startingOffset);
+
+    endingOffset = split.getEndingOffset();
+
+    maxNumberOfRecords = endingOffset - startingOffset;
+    if (LOG.isInfoEnabled()) {
+      LOG.info("Reading data from {} between {} and {}", new Object[]{topicPartition, startingOffset, endingOffset});
+    }
+
+    Configuration config = taskAttemptContext.getConfiguration();
+    consumerPollTimeout = config.getLong(CONSUMER_POLL_TIMEOUT_KEY, CONSUMER_POLL_TIMEOUT_DEFAULT);
+    maxNumberAttempts = config.getInt(KAFKA_RETRY_ATTEMPTS_KEY, KAFKA_RETRY_ATTEMPTS_DEFAULT);
+  }
+
+  @Override
+  public boolean nextKeyValue() throws IOException, InterruptedException {
+    recordIterator = getRecords();
+    record = recordIterator.hasNext() ? recordIterator.next() : null;
+    if (LOG.isDebugEnabled()) {
+      if (record != null) {
+        LOG.debug("nextKeyValue: Retrieved record with offset {}", record.offset());
+      } else {
+        LOG.debug("nextKeyValue: Retrieved null record");
+      }
+    }
+    return record != null && record.offset() < endingOffset;
+  }
+
+  @Override
+  public K getCurrentKey() throws IOException, InterruptedException {
+    return record == null ? null : record.key();
+  }
+
+  @Override
+  public V getCurrentValue() throws IOException, InterruptedException {
+    return record == null ? null : record.value();
+  }
+
+  @Override
+  public float getProgress() throws IOException, InterruptedException {
+    //not the most accurate measure but it gives a reasonable estimate of progress
+    return record == null ? 0.0f : ((float) (record.offset() - startingOffset)) / maxNumberOfRecords;
+  }
+
+  private Iterator<ConsumerRecord<K, V>> getRecords() {
+    if (recordIterator == null || !recordIterator.hasNext()) {
+      ConsumerRecords<K, V> records = null;
+      int numTries = 0;
+      boolean success = false;
+      while (!success && numTries < maxNumberAttempts) {
+        try {
+          records = consumer.poll(consumerPollTimeout);
+          success = true;
+        } catch (RetriableException re) {
+          numTries++;
+          if (numTries < maxNumberAttempts) {
+            LOG.warn("Error pulling messages from Kafka. Retrying with attempt {}", numTries, re);
+          } else {
+            LOG.error("Error pulling messages from Kafka. Exceeded maximum number of attempts {}", maxNumberAttempts, re);
+            throw re;
+          }
+        }
+      }
+
+      if (LOG.isDebugEnabled()) {
+        if (records == null || records.isEmpty()) {
+          LOG.debug("No records retrieved from Kafka therefore nothing to iterate over.");
+        } else {
+          LOG.debug("Retrieved records from Kafka to iterate over.");
+        }
+      }
+      return records != null ? records.iterator() : ConsumerRecords.<K, V>empty().iterator();
+    }
+    return recordIterator;
+  }
+
+  @Override
+  public void close() throws IOException {
+    LOG.debug("Closing the record reader.");
+    if(consumer != null) {
+      consumer.close();
+    }
+  }
+}
\ No newline at end of file

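A minimal sketch of the call sequence the framework drives against this reader. The split and
TaskAttemptContext are assumed to be supplied by KafkaInputFormat and MapReduce (their construction
is elided here), and the BytesWritable key/value types reflect the intent of this patch rather than
a requirement of the reader itself.

    import java.io.IOException;

    import org.apache.crunch.kafka.inputformat.KafkaInputSplit;
    import org.apache.crunch.kafka.inputformat.KafkaRecordReader;
    import org.apache.hadoop.io.BytesWritable;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;

    public class KafkaReaderDriver {
      // Drains one split, mirroring the initialize/nextKeyValue/close sequence MapReduce performs.
      // The context's Configuration must already carry the Kafka connection properties plus,
      // optionally, CONSUMER_POLL_TIMEOUT_KEY and KAFKA_RETRY_ATTEMPTS_KEY overrides.
      static long drain(KafkaInputSplit split, TaskAttemptContext context)
          throws IOException, InterruptedException {
        KafkaRecordReader<BytesWritable, BytesWritable> reader = new KafkaRecordReader<>();
        reader.initialize(split, context);
        long count = 0;
        try {
          while (reader.nextKeyValue()) {
            BytesWritable key = reader.getCurrentKey();
            BytesWritable value = reader.getCurrentValue();
            count++;   // a real mapper would emit key/value here
          }
        } finally {
          reader.close();
        }
        return count;
      }
    }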
http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
new file mode 100644
index 0000000..836039c
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/ClusterTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.Decoder;
+import kafka.serializer.Encoder;
+import kafka.utils.VerifiableProperties;
+import org.apache.crunch.impl.mr.run.RuntimeParameters;
+import org.apache.crunch.kafka.inputformat.KafkaInputFormatIT;
+import org.apache.crunch.kafka.inputformat.KafkaRecordReaderIT;
+import org.apache.crunch.kafka.utils.KafkaBrokerTestHarness;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.Serializer;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Suite;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+@RunWith(Suite.class)
+@Suite.SuiteClasses({
+    //org.apache.crunch.kafka
+    KafkaSourceIT.class, KafkaRecordsIterableIT.class, KafkaDataIT.class, KafkaUtilsIT.class,
+    //org.apache.crunch.kafka.inputformat
+    KafkaRecordReaderIT.class, KafkaInputFormatIT.class,
+})
+public class ClusterTest {
+
+
+  private static TemporaryFolder folder = new TemporaryFolder();
+  private static KafkaBrokerTestHarness kafka;
+  private static boolean runAsSuite = false;
+  private static Configuration conf;
+  private static FileSystem fs;
+
+  @BeforeClass
+  public static void startSuite() throws Exception {
+    runAsSuite = true;
+    startKafka();
+    setupFileSystem();
+  }
+
+  @AfterClass
+  public static void endSuite() throws Exception {
+    stopKafka();
+  }
+
+  public static void startTest() throws Exception {
+    if (!runAsSuite) {
+      startKafka();
+      setupFileSystem();
+    }
+  }
+
+  public static void endTest() throws Exception {
+    if (!runAsSuite) {
+      stopKafka();
+    }
+  }
+
+  private static void stopKafka() throws IOException {
+    kafka.tearDown();
+  }
+
+  private static void startKafka() throws IOException {
+    Properties props = new Properties();
+    props.setProperty("auto.create.topics.enable", Boolean.TRUE.toString());
+
+    kafka = new KafkaBrokerTestHarness(props);
+    kafka.setUp();
+  }
+
+  private static void setupFileSystem() throws IOException {
+    folder.create();
+
+    conf = new Configuration();
+    conf.set(RuntimeParameters.TMP_DIR, folder.getRoot().getAbsolutePath());
+    // Run Map/Reduce tests in process.
+    conf.set("mapreduce.jobtracker.address", "local");
+  }
+
+  public static Configuration getConf() {
+    // Clone the configuration so it doesn't get modified for other tests.
+    return new Configuration(conf);
+  }
+
+  public static Properties getConsumerProperties() {
+    Properties props = new Properties();
+    props.putAll(kafka.getProps());
+    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
+    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringSerDe.class.getName());
+    //set this because it is still needed by some APIs.
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
+    props.setProperty("enable.auto.commit", Boolean.toString(false));
+
+    //when set, this causes problems initializing the consumer.
+    props.remove(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
+    return props;
+  }
+
+  public static Properties getProducerProperties() {
+    Properties props = new Properties();
+    props.putAll(kafka.getProps());
+    //set this because it is still needed by some APIs.
+    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, props.getProperty("metadata.broker.list"));
+    return props;
+  }
+
+  public static Configuration getConsumerConfig() {
+    Configuration kafkaConfig = new Configuration(conf);
+    KafkaUtils.addKafkaConnectionProperties(getConsumerProperties(), kafkaConfig);
+    return kafkaConfig;
+  }
+
+  public static List<String> writeData(Properties props, String topic, String batch, int loops, int numValuesPerLoop) {
+    Properties producerProps = new Properties();
+    producerProps.putAll(props);
+    producerProps.setProperty("serializer.class", StringEncoderDecoder.class.getName());
+    producerProps.setProperty("key.serializer.class", StringEncoderDecoder.class.getName());
+
+    // Set the default compression used to be snappy
+    producerProps.setProperty("compression.codec", "snappy");
+    producerProps.setProperty("request.required.acks", "1");
+
+    ProducerConfig producerConfig = new ProducerConfig(producerProps);
+
+    Producer<String, String> producer = new Producer<>(producerConfig);
+    List<String> keys = new LinkedList<>();
+    try {
+      for (int i = 0; i < loops; i++) {
+        List<KeyedMessage<String, String>> events = new LinkedList<>();
+        for (int j = 0; j < numValuesPerLoop; j++) {
+          String key = "key" + batch + i + j;
+          String value = "value" + batch + i + j;
+          keys.add(key);
+          events.add(new KeyedMessage<>(topic, key, value));
+        }
+        producer.send(events);
+      }
+    } finally {
+      producer.close();
+    }
+    return keys;
+  }
+
+
+  public static class StringSerDe implements Serializer<String>, Deserializer<String> {
+
+    @Override
+    public void configure(Map map, boolean b) {
+
+    }
+
+    @Override
+    public byte[] serialize(String topic, String value) {
+      return value.getBytes();
+    }
+
+    @Override
+    public String deserialize(String topic, byte[] bytes) {
+      return new String(bytes);
+    }
+
+    @Override
+    public void close() {
+
+    }
+  }
+
+  public static class StringEncoderDecoder implements Encoder<String>, Decoder<String> {
+
+    public StringEncoderDecoder() {
+
+    }
+
+    public StringEncoderDecoder(VerifiableProperties props) {
+
+    }
+
+    @Override
+    public String fromBytes(byte[] bytes) {
+      return new String(bytes);
+    }
+
+    @Override
+    public byte[] toBytes(String value) {
+      return value.getBytes();
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/321cfef6/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
new file mode 100644
index 0000000..595a94b
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/KafkaDataIT.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.crunch.kafka;
+
+
+import kafka.api.OffsetRequest;
+import org.apache.crunch.Pair;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+import static org.apache.crunch.kafka.ClusterTest.writeData;
+import static org.hamcrest.Matchers.hasItem;
+import static org.hamcrest.core.Is.is;
+import static org.hamcrest.core.IsNot.not;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+public class KafkaDataIT {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private String topic;
+  private Map<TopicPartition, Long> startOffsets;
+  private Map<TopicPartition, Long> stopOffsets;
+  private Map<TopicPartition, Pair<Long, Long>> offsets;
+  private Properties props;
+
+  @BeforeClass
+  public static void init() throws Exception {
+    ClusterTest.startTest();
+  }
+
+  @AfterClass
+  public static void cleanup() throws Exception {
+    ClusterTest.endTest();
+  }
+
+  @Before
+  public void setup() {
+    topic = testName.getMethodName();
+
+    props = ClusterTest.getConsumerProperties();
+
+    startOffsets = new HashMap<>();
+    stopOffsets = new HashMap<>();
+    offsets = new HashMap<>();
+    for (int i = 0; i < 4; i++) {
+      TopicPartition tp = new TopicPartition(topic, i);
+      startOffsets.put(tp, 0L);
+      stopOffsets.put(tp, 100L);
+
+      offsets.put(tp, Pair.of(0L, 100L));
+    }
+  }
+
+  @Test
+  public void getDataIterable() throws IOException {
+    int loops = 10;
+    int numPerLoop = 100;
+    int total = loops * numPerLoop;
+    List<String> keys = writeData(props, topic, "batch", loops, numPerLoop);
+
+    startOffsets = getStartOffsets(props, topic);
+    stopOffsets = getStopOffsets(props, topic);
+
+    Map<TopicPartition, Pair<Long, Long>> offsets = new HashMap<>();
+    for (Map.Entry<TopicPartition, Long> entry : startOffsets.entrySet()) {
+      offsets.put(entry.getKey(), Pair.of(entry.getValue(), stopOffsets.get(entry.getKey())));
+    }
+
+    Iterable<Pair<String, String>> data = new KafkaData<String, String>(props, offsets).read(null);
+
+    int count = 0;
+    for (Pair<String, String> event : data) {
+      assertThat(keys, hasItem(event.first()));
+      assertTrue(keys.remove(event.first()));
+      count++;
+    }
+
+    assertThat(count, is(total));
+    assertThat(keys.size(), is(0));
+  }
+
+  private static Map<TopicPartition, Long> getStopOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.LatestTime(), topic);
+  }
+
+  private static Map<TopicPartition, Long> getStartOffsets(Properties props, String topic) {
+    return KafkaUtils.getBrokerOffsets(props, OffsetRequest.EarliestTime(), topic);
+  }
+}

