crunch-commits mailing list archives

From mkw...@apache.org
Subject crunch git commit: CRUNCH-611: Added an API for reading and writing offsets, along with a simple implementation that supports doing so from HDFS.
Date Sat, 30 Jul 2016 20:48:47 GMT
Repository: crunch
Updated Branches:
  refs/heads/master b0491f20a -> 157ae25b4


CRUNCH-611: Added an API for reading and writing offsets, along with a simple implementation that supports doing so from HDFS.

Signed-off-by: Micah Whitacre <mkwhit@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/crunch/repo
Commit: http://git-wip-us.apache.org/repos/asf/crunch/commit/157ae25b
Tree: http://git-wip-us.apache.org/repos/asf/crunch/tree/157ae25b
Diff: http://git-wip-us.apache.org/repos/asf/crunch/diff/157ae25b

Branch: refs/heads/master
Commit: 157ae25b48ba5fb46001df7bb05c753a188ade8d
Parents: b0491f2
Author: Micah Whitacre <mkwhit@gmail.com>
Authored: Wed Jul 13 10:18:17 2016 -0500
Committer: Micah Whitacre <mkwhit@apache.org>
Committed: Sat Jul 30 15:42:40 2016 -0500

----------------------------------------------------------------------
 crunch-kafka/pom.xml                            |  19 +
 .../kafka/offset/AbstractOffsetReader.java      |  24 ++
 .../kafka/offset/AbstractOffsetWriter.java      |  17 +
 .../crunch/kafka/offset/OffsetReader.java       |  44 ++
 .../crunch/kafka/offset/OffsetWriter.java       |  34 ++
 .../kafka/offset/hdfs/HDFSOffsetReader.java     | 142 +++++++
 .../kafka/offset/hdfs/HDFSOffsetWriter.java     | 165 ++++++++
 .../crunch/kafka/offset/hdfs/Offsets.java       | 316 ++++++++++++++
 .../kafka/offset/hdfs/HDFSOffsetReaderTest.java | 220 ++++++++++
 .../kafka/offset/hdfs/HDFSOffsetWriterTest.java | 185 ++++++++
 .../crunch/kafka/offset/hdfs/OffsetsTest.java   | 418 +++++++++++++++++++
 pom.xml                                         |  19 +
 12 files changed, 1603 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/crunch-kafka/pom.xml b/crunch-kafka/pom.xml
index 7d6256b..dddf859 100644
--- a/crunch-kafka/pom.xml
+++ b/crunch-kafka/pom.xml
@@ -52,6 +52,25 @@ under the License.
       <artifactId>hadoop-client</artifactId>
     </dependency>
 
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-databind</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>com.fasterxml.jackson.core</groupId>
+      <artifactId>jackson-annotations</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+    <dependency>
+      <groupId>joda-time</groupId>
+      <artifactId>joda-time</artifactId>
+      <scope>compile</scope>
+      <optional>true</optional>
+    </dependency>
+
 
     <dependency>
       <groupId>org.slf4j</groupId>

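The three new dependencies above are declared with <optional>true</optional>, so they do not propagate transitively to consumers of crunch-kafka. A downstream project that wants the HDFS offset storage would need to declare them itself; a minimal sketch, with versions assumed to match the parent pom's dependencyManagement below (2.6.1 for Jackson, 2.8.1 for Joda-Time):

    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-databind</artifactId>
      <version>2.6.1</version>
    </dependency>
    <dependency>
      <groupId>com.fasterxml.jackson.core</groupId>
      <artifactId>jackson-annotations</artifactId>
      <version>2.6.1</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.8.1</version>
    </dependency>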
http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java
new file mode 100644
index 0000000..0856c64
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetReader.java
@@ -0,0 +1,24 @@
+package org.apache.crunch.kafka.offset;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Base implementation of {@link OffsetReader}
+ */
+public abstract class AbstractOffsetReader implements OffsetReader {
+
+  @Override
+  public Map<TopicPartition, Long> readOffsets(long persistedOffsetTime) throws IOException {
+    throw new UnsupportedOperationException("Operation to read old offsets is not supported");
+  }
+
+  @Override
+  public List<Long> getStoredOffsetPersistenceTimes() throws IOException {
+    throw new UnsupportedOperationException("Operation to retrieve old offset persistence times is not supported");
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java
new file mode 100644
index 0000000..493a499
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/AbstractOffsetWriter.java
@@ -0,0 +1,17 @@
+package org.apache.crunch.kafka.offset;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Base implementation of {@link OffsetWriter}
+ */
+public abstract class AbstractOffsetWriter implements OffsetWriter {
+
+  @Override
+  public void write(Map<TopicPartition, Long> offsets) throws IOException {
+    write(System.currentTimeMillis(), offsets);
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java
new file mode 100644
index 0000000..4d4056b
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetReader.java
@@ -0,0 +1,44 @@
+package org.apache.crunch.kafka.offset;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reader API that supports reading offset information from an underlying storage mechanism.
+ */
+public interface OffsetReader extends Closeable {
+
+  /**
+   * Reads the last stored offsets.
+   *
+   * @return the last stored offsets.  If there are no stored offsets an empty collection will be returned.
+   * @throws IOException if there is an error reading from the underlying storage.
+   */
+  Map<TopicPartition, Long> readLatestOffsets() throws IOException;
+
+  /**
+   * Reads the offsets for a given {@code persistedOffsetTime}.  Note that not all storage mechanisms support
+   * complete historical offset information.  Use {@link #getStoredOffsetPersistenceTimes()} to find valid values
+   * to specify for {@code persistedOffsetTime}.
+   *
+   * @param persistedOffsetTime the persistence time when offsets were written to the underlying storage system.
+   * @return the offsets persisted at the specified {@code persistedOffsetTime}.  If no offsets were persisted
+   * at that time or are available to be retrieved then {@code null} will be returned.
+   * @throws IOException if there is an error reading from the underlying storage.
+   */
+  Map<TopicPartition, Long> readOffsets(long persistedOffsetTime) throws IOException;
+
+  /**
+   * Returns the list of persistence times at which offsets have been written to the underlying storage mechanism.
+   * The persistence times are returned in order from earliest to latest.
+   *
+   * @return the collection of persistence times in the form of milliseconds since epoch.  If there are no historical
+   * persistence times then an empty list is returned.
+   * @throws IOException if there is an error reading from the underlying storage.
+   */
+  List<Long> getStoredOffsetPersistenceTimes() throws IOException;
+}
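
To show how this reader contract is meant to be consumed, here is a minimal, hypothetical sketch using the HDFS implementation added below; the Configuration and the "/offsets/my-pipeline" path are illustrative placeholders, not part of this commit:

    import java.io.IOException;
    import java.util.Map;

    import org.apache.crunch.kafka.offset.OffsetReader;
    import org.apache.crunch.kafka.offset.hdfs.HDFSOffsetReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kafka.common.TopicPartition;

    public class ReadLatestOffsetsExample {
      public static void main(String[] args) throws IOException {
        // "/offsets/my-pipeline" is an assumed storage location for illustration only.
        try (OffsetReader reader = new HDFSOffsetReader(new Configuration(), new Path("/offsets/my-pipeline"))) {
          // An empty map means nothing has been persisted yet.
          Map<TopicPartition, Long> latest = reader.readLatestOffsets();
          for (Map.Entry<TopicPartition, Long> entry : latest.entrySet()) {
            System.out.println(entry.getKey() + " -> " + entry.getValue());
          }
        }
      }
    }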

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java
new file mode 100644
index 0000000..0f0056e
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/OffsetWriter.java
@@ -0,0 +1,34 @@
+package org.apache.crunch.kafka.offset;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Writer for persisting offset information.
+ */
+public interface OffsetWriter extends Closeable {
+
+  /**
+   * Persists the {@code offsets} to a configured location, using the current time as the as-of time.
+   *
+   * @param offsets the offsets to persist
+   * @throws IllegalArgumentException if the {@code offsets} are {@code null}.
+   * @throws IOException              if there is an error persisting the offsets.
+   */
+  void write(Map<TopicPartition, Long> offsets) throws IOException;
+
+  /**
+   * Persists the {@code offsets} to a configured location, recording {@code asOfTime} as metadata that indicates
+   * the time in milliseconds at which the offsets were accurate.
+   *
+   * @param asOfTime the time, in milliseconds since epoch, as of which the offsets are
+   *                 accurate.
+   * @param offsets  the offsets to persist
+   * @throws IllegalArgumentException if the {@code offsets} are {@code null} or the {@code asOfTime} is less than 0.
+   * @throws IOException              if there is an error persisting the offsets.
+   */
+  void write(long asOfTime, Map<TopicPartition, Long> offsets) throws IOException;
+}
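
A matching sketch for the writer side, under the same placeholder assumptions; the single-argument write(Map) stamps the offsets with System.currentTimeMillis() via AbstractOffsetWriter:

    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;

    import org.apache.crunch.kafka.offset.OffsetWriter;
    import org.apache.crunch.kafka.offset.hdfs.HDFSOffsetWriter;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kafka.common.TopicPartition;

    public class WriteOffsetsExample {
      public static void main(String[] args) throws IOException {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("events", 0), 42L);  // assumed topic and offset values

        try (OffsetWriter writer = new HDFSOffsetWriter(new Configuration(), new Path("/offsets/my-pipeline"))) {
          // Uses the current wall-clock time as the as-of time.
          writer.write(offsets);
        }
      }
    }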

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java
new file mode 100644
index 0000000..a497570
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReader.java
@@ -0,0 +1,142 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.crunch.kafka.offset.AbstractOffsetReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Reader implementation that reads offset information from HDFS.
+ */
+public class HDFSOffsetReader extends AbstractOffsetReader {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetReader.class);
+
+  private final Configuration config;
+  private final Path baseOffsetStoragePath;
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Creates a reader instance for interacting with the storage specified by the {@code config} and with
+   * the base storage path of {@code baseStoragePath}.
+   *
+   * @param config                the config for interacting with the underlying data store.
+   * @param baseOffsetStoragePath the base storage path for offset information.  If the path does not exist,
+   *                              reads will simply find no persisted offsets.
+   * @throws IllegalArgumentException if either argument is {@code null}.
+   */
+  public HDFSOffsetReader(Configuration config, Path baseOffsetStoragePath) {
+    if (config == null) {
+      throw new IllegalArgumentException("The 'config' cannot be 'null'.");
+    }
+    if (baseOffsetStoragePath == null) {
+      throw new IllegalArgumentException("The 'baseOffsetStoragePath' cannot be 'null'.");
+    }
+    this.config = config;
+    this.baseOffsetStoragePath = baseOffsetStoragePath;
+  }
+
+  @Override
+  public Map<TopicPartition, Long> readLatestOffsets() throws IOException {
+    List<Long> storedOffsetPersistenceTimes = getStoredOffsetPersistenceTimes(true);
+    if (storedOffsetPersistenceTimes.isEmpty()) {
+      return Collections.emptyMap();
+    }
+
+    long persistedTime = storedOffsetPersistenceTimes.get(0);
+
+    Map<TopicPartition, Long> offsets = readOffsets(persistedTime);
+
+    return offsets == null ? Collections.<TopicPartition, Long>emptyMap() : offsets;
+  }
+
+  @Override
+  public Map<TopicPartition, Long> readOffsets(long persistedOffsetTime) throws IOException {
+    Path offsetFilePath = HDFSOffsetWriter.getPersistedTimeStoragePath(baseOffsetStoragePath, persistedOffsetTime);
+
+    FileSystem fs = getFileSystem();
+    if (fs.isFile(offsetFilePath)) {
+      InputStream inputStream = fs.open(offsetFilePath);
+      try {
+        Offsets offsets = MAPPER.readValue(inputStream, Offsets.class);
+        Map<TopicPartition, Long> partitionsMap = new HashMap<>();
+        for (Offsets.PartitionOffset partitionOffset : offsets.getOffsets()) {
+          partitionsMap.put(new TopicPartition(partitionOffset.getTopic(), partitionOffset.getPartition()),
+              partitionOffset.getOffset());
+        }
+        return partitionsMap;
+      } finally {
+        inputStream.close();
+      }
+    }
+
+    LOG.error("Offset file at {} is not a file or does not exist.", offsetFilePath);
+    return null;
+  }
+
+  @Override
+  public List<Long> getStoredOffsetPersistenceTimes() throws IOException {
+    return getStoredOffsetPersistenceTimes(false);
+  }
+
+  private List<Long> getStoredOffsetPersistenceTimes(boolean newestFirst) throws IOException {
+    List<Long> persistedTimes = new LinkedList<>();
+    FileSystem fs = getFileSystem();
+    try {
+      FileStatus[] fileStatuses = fs.listStatus(baseOffsetStoragePath);
+      for (FileStatus status : fileStatuses) {
+        if (status.isFile()) {
+          String fileName = status.getPath().getName();
+          try {
+            persistedTimes.add(HDFSOffsetWriter.fileNameToPersistenceTime(fileName));
+          } catch (IllegalArgumentException iae) {
+            LOG.info("Skipping file {} due to filename not being of the correct format.", status.getPath(),
+                iae);
+          }
+        } else {
+          LOG.info("Skippping {} because it is not a file.", status.getPath());
+        }
+      }
+    } catch (FileNotFoundException fnfe) {
+      LOG.error("Unable to retrieve prior offsets.", fnfe);
+    }
+
+    //natural order should put oldest (smallest long) first. This will put newest first.
+    if (newestFirst) {
+      Collections.sort(persistedTimes, Collections.reverseOrder());
+    } else {
+      Collections.sort(persistedTimes);
+    }
+    return Collections.unmodifiableList(persistedTimes);
+  }
+
+  @Override
+  public void close() throws IOException {
+    //no-op
+  }
+
+  /**
+   * Returns the {@link FileSystem} instance for writing data.  Callers are not responsible for closing the instance.
+   *
+   * @return the {@link FileSystem} instance for writing data.
+   * @throws IOException error retrieving underlying file system.
+   */
+  protected FileSystem getFileSystem() throws IOException {
+    return FileSystem.get(config);
+  }
+}
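
The two read paths compose: getStoredOffsetPersistenceTimes() enumerates persisted snapshots from earliest to latest, and readOffsets(long) loads one of them, returning null when no file exists for that time. A hedged sketch of replaying the oldest snapshot, again with a placeholder path:

    import java.io.IOException;
    import java.util.List;
    import java.util.Map;

    import org.apache.crunch.kafka.offset.hdfs.HDFSOffsetReader;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kafka.common.TopicPartition;

    public class ReadOldestOffsetsExample {
      public static void main(String[] args) throws IOException {
        try (HDFSOffsetReader reader = new HDFSOffsetReader(new Configuration(), new Path("/offsets/my-pipeline"))) {
          List<Long> times = reader.getStoredOffsetPersistenceTimes();  // earliest to latest
          if (!times.isEmpty()) {
            // readOffsets returns null if no file exists for the requested time.
            Map<TopicPartition, Long> oldest = reader.readOffsets(times.get(0));
            System.out.println("Offsets at " + times.get(0) + ": " + oldest);
          }
        }
      }
    }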

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java
new file mode 100644
index 0000000..9762d1d
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriter.java
@@ -0,0 +1,165 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.lang.StringUtils;
+import org.apache.crunch.kafka.offset.AbstractOffsetWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.joda.time.format.DateTimeFormat;
+import org.joda.time.format.DateTimeFormatter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Offset writer implementation that stores the offsets in HDFS.
+ */
+public class HDFSOffsetWriter extends AbstractOffsetWriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetWriter.class);
+
+  /**
+   * Format pattern used to translate persistence times into valid file names.
+   */
+  public static final String PERSIST_TIME_FORMAT = "yyyy-MM-dd'T'HH-mm-ssZ";
+
+  /**
+   * Formatter to use when creating the file names in a URI compliant format.
+   */
+  public static final DateTimeFormatter FILE_FORMATTER = DateTimeFormat.forPattern(PERSIST_TIME_FORMAT).withZoneUTC();
+
+  /**
+   * File extension for storing the offsets.
+   */
+  public static final String FILE_FORMAT_EXTENSION = ".json";
+
+  /**
+   * Configuration for the underlying storage.
+   */
+  private final Configuration config;
+
+  /**
+   * Mapper for converting data into JSON
+   */
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  /**
+   * Base storage path for offset data
+   */
+  private final Path baseStoragePath;
+
+  /**
+   * Creates a writer instance for interacting with the storage specified by the {@code config} and with
+   * the base storage path of {@code baseStoragePath}.
+   *
+   * @param config          the config for interacting with the underlying data store.
+   * @param baseStoragePath the base storage path for offset information.
+   * @throws IllegalArgumentException if either argument is {@code null}.
+   */
+  public HDFSOffsetWriter(Configuration config, Path baseStoragePath) {
+    if (config == null) {
+      throw new IllegalArgumentException("The 'config' cannot be 'null'.");
+    }
+    if (baseStoragePath == null) {
+      throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'.");
+    }
+    this.config = config;
+    this.baseStoragePath = baseStoragePath;
+  }
+
+  @Override
+  public void write(long asOfTime, Map<TopicPartition, Long> offsets) throws IOException {
+    if (offsets == null) {
+      throw new IllegalArgumentException("The 'offsets' cannot be 'null'.");
+    }
+    if (asOfTime < 0) {
+      throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0.");
+    }
+    List<Offsets.PartitionOffset> partitionOffsets = new LinkedList<>();
+    for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
+      partitionOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setOffset(entry.getValue())
+          .setTopic(entry.getKey().topic())
+          .setPartition(entry.getKey().partition()).build());
+    }
+
+    Offsets storageOffsets = Offsets.Builder.newBuilder().setOffsets(partitionOffsets)
+        .setAsOfTime(asOfTime).build();
+
+    FileSystem fs = getFileSystem();
+    Path offsetPath = getPersistedTimeStoragePath(baseStoragePath, asOfTime);
+    LOG.debug("Writing offsets to {} with as of time {}", offsetPath, asOfTime);
+    try (FSDataOutputStream fsDataOutputStream = fs.create(offsetPath, true)) {
+      MAPPER.writeValue(fsDataOutputStream, storageOffsets);
+      fsDataOutputStream.flush();
+    }
+    LOG.debug("Completed writing offsets to {}", offsetPath);
+  }
+
+  @Override
+  public void close() throws IOException {
+    //no-op
+  }
+
+  /**
+   * Returns the {@link FileSystem} instance for writing data.  Callers are not responsible for closing the instance.
+   *
+   * @return the {@link FileSystem} instance for writing data.
+   * @throws IOException error retrieving underlying file system.
+   */
+  protected FileSystem getFileSystem() throws IOException {
+    return FileSystem.get(config);
+  }
+
+  /**
+   * Creates a {@link Path} for storing the offsets for a specified {@code persistedTime}.
+   *
+   * @param baseStoragePath The base path the offsets will be stored at.
+   * @param persistedTime   the time of the data being persisted.
+   * @return The path to where the offset information should be stored.
+   * @throws IllegalArgumentException if the {@code baseStoragePath} is {@code null}.
+   */
+  public static Path getPersistedTimeStoragePath(Path baseStoragePath, long persistedTime) {
+    if (baseStoragePath == null) {
+      throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'.");
+    }
+    return new Path(baseStoragePath, persistenceTimeToFileName(persistedTime));
+  }
+
+  /**
+   * Converts a {@code fileName} into the time the offsets were persisted.
+   *
+   * @param fileName the file name to parse.
+   * @return the time in milliseconds since epoch that the offsets were stored.
+   * @throws IllegalArgumentException if the {@code fileName} is not of the correct format or is {@code null} or
+   *                                  empty.
+   */
+  public static long fileNameToPersistenceTime(String fileName) {
+    if (StringUtils.isBlank(fileName)) {
+      throw new IllegalArgumentException("the 'fileName' cannot be 'null' or empty");
+    }
+    String formattedTimeString = StringUtils.removeEnd(fileName, FILE_FORMAT_EXTENSION);
+    DateTime persistedTime = FILE_FORMATTER.parseDateTime(formattedTimeString);
+    return persistedTime.getMillis();
+  }
+
+  /**
+   * Converts a {@code persistedTime} into a file name for persisting the offsets.
+   *
+   * @param persistedTime the persisted time to use to generate the file name.
+   * @return the file name to use when persisting the data.
+   */
+  public static String persistenceTimeToFileName(long persistedTime) {
+    DateTime dateTime = new DateTime(persistedTime, DateTimeZone.UTC);
+    String formattedTime = FILE_FORMATTER.print(dateTime);
+    return formattedTime + FILE_FORMAT_EXTENSION;
+  }
+}
\ No newline at end of file
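
The static helpers round-trip between persistence times and file names, which is what HDFSOffsetReader relies on when listing stored offsets. A small sketch using the timestamp exercised by the tests below:

    import org.apache.crunch.kafka.offset.hdfs.HDFSOffsetWriter;

    public class FileNameRoundTripExample {
      public static void main(String[] args) {
        long timestamp = 1464898337000L;  // 2016-06-02T20:12:17Z
        String fileName = HDFSOffsetWriter.persistenceTimeToFileName(timestamp);
        System.out.println(fileName);  // "2016-06-02T20-12-17+0000.json"
        long parsed = HDFSOffsetWriter.fileNameToPersistenceTime(fileName);
        System.out.println(parsed == timestamp);  // true
      }
    }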

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java
new file mode 100644
index 0000000..e5c80e0
--- /dev/null
+++ b/crunch-kafka/src/main/java/org/apache/crunch/kafka/offset/hdfs/Offsets.java
@@ -0,0 +1,316 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonInclude;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonDeserialize;
+import kafka.api.OffsetRequest;
+import org.apache.commons.lang.StringUtils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Set;
+
+/**
+ * Simple object representing a collection of Kafka topic and partition offset information, to make storing
+ * this information easier.
+ */
+@JsonDeserialize(builder = Offsets.Builder.class)
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class Offsets {
+
+  private final long offsetsAsOfTime;
+
+  private final List<PartitionOffset> offsets;
+
+  private Offsets(long asOfTime, List<PartitionOffset> offsets) {
+    offsetsAsOfTime = asOfTime;
+    this.offsets = offsets;
+  }
+
+  /**
+   * Returns the time, in milliseconds since epoch, as of which the offset information was retrieved or is valid.
+   *
+   * @return the time, in milliseconds since epoch, as of which the offset information was retrieved or is valid.
+   */
+  @JsonProperty("asOfTime")
+  public long getAsOfTime() {
+    return offsetsAsOfTime;
+  }
+
+  /**
+   * The collection of offset information for specific topics and partitions.
+   *
+   * @return collection of offset information for specific topics and partitions.
+   */
+  @JsonProperty("offsets")
+  public List<PartitionOffset> getOffsets() {
+    return offsets;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(offsetsAsOfTime, offsets);
+  }
+
+  @Override
+  public boolean equals(Object obj) {
+    if (obj == null) {
+      return false;
+    }
+
+    if (obj instanceof Offsets) {
+      Offsets that = (Offsets) obj;
+
+      return this.offsetsAsOfTime == that.offsetsAsOfTime
+          && this.offsets.equals(that.offsets);
+    }
+
+    return false;
+  }
+
+
+  /**
+   * Builder for the {@link Offsets}.
+   */
+  @JsonIgnoreProperties(ignoreUnknown = true)
+  public static class Builder {
+
+    private long asOf = -1;
+    private List<PartitionOffset> offsets = Collections.emptyList();
+
+    /**
+     * Creates a new Builder instance.
+     *
+     * @return a new Builder instance.
+     */
+    public static Builder newBuilder() {
+      return new Builder();
+    }
+
+    /**
+     * Sets the as of time for the collection of offsets.
+     *
+     * @param asOfTime the as of time for the offsets.
+     * @return builder instance
+     * @throws IllegalArgumentException if the {@code asOfTime} is less than 0.
+     */
+    @JsonProperty("asOfTime")
+    public Builder setAsOfTime(long asOfTime) {
+      if (asOfTime < 0) {
+        throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0.");
+      }
+      this.asOf = asOfTime;
+      return this;
+    }
+
+    /**
+     * Sets the collection of offsets.
+     *
+     * @param offsets the collection of offsets
+     * @return builder instance
+     * @throws IllegalArgumentException if the {@code offsets} is {@code null}.
+     */
+    @JsonProperty("offsets")
+    public Builder setOffsets(List<PartitionOffset> offsets) {
+      if (offsets == null) {
+        throw new IllegalArgumentException("The 'offsets' cannot be 'null'.");
+      }
+      List<PartitionOffset> sortedOffsets = new LinkedList<>(offsets);
+
+      Collections.sort(sortedOffsets);
+
+      this.offsets = Collections.unmodifiableList(sortedOffsets);
+
+      return this;
+    }
+
+    /**
+     * Builds an instance.
+     *
+     * @return a built instance
+     * @throws IllegalStateException if the {@link #setAsOfTime(long) asOfTime} is not set or the specified
+     *                               {@link #setOffsets(List) offsets} contains duplicate entries for a topic partition.
+     */
+    public Offsets build() {
+      if (asOf < 0) {
+        throw new IllegalStateException("The 'asOfTime' cannot be less than 0.");
+      }
+
+      Set<String> uniqueTopicPartitions = new HashSet<>();
+      for (PartitionOffset partitionOffset : offsets) {
+        // ':' is not a valid character in Kafka topic names, so this key cannot collide across topics.
+        uniqueTopicPartitions.add(partitionOffset.getTopic() + ":" + partitionOffset.getPartition());
+      }
+
+      if (uniqueTopicPartitions.size() != offsets.size()) {
+        throw new IllegalStateException("The 'offsets' contains duplicate entries for a topic and partition.");
+      }
+
+      return new Offsets(asOf, offsets);
+    }
+  }
+
+
+  /**
+   * Simple object that represents a specific topic, partition, and its offset value.
+   */
+  @JsonDeserialize(builder = PartitionOffset.Builder.class)
+  @JsonInclude(JsonInclude.Include.NON_NULL)
+  public static class PartitionOffset implements Comparable<PartitionOffset> {
+
+    private final String topic;
+    private final int partition;
+    private final long offset;
+
+    private PartitionOffset(String topic, int partition, long offset) {
+      this.topic = topic;
+      this.partition = partition;
+      this.offset = offset;
+    }
+
+    /**
+     * Returns the topic
+     *
+     * @return the topic
+     */
+    public String getTopic() {
+      return topic;
+    }
+
+    /**
+     * Returns the partition
+     *
+     * @return the partition
+     */
+    public int getPartition() {
+      return partition;
+    }
+
+    /**
+     * Returns the offset
+     *
+     * @return the offset
+     */
+    public long getOffset() {
+      return offset;
+    }
+
+    @Override
+    public int compareTo(PartitionOffset other) {
+      int compare = topic.compareTo(other.topic);
+      if (compare == 0) {
+        compare = Integer.compare(partition, other.partition);
+        if (compare == 0) {
+          return Long.compare(offset, other.offset);
+        }
+      }
+      return compare;
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (obj == null) {
+        return false;
+      }
+
+      if (obj instanceof PartitionOffset) {
+        PartitionOffset that = (PartitionOffset) obj;
+
+        return compareTo(that) == 0;
+      }
+      return false;
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(topic, partition, offset);
+    }
+
+    /**
+     * Builder for {@link PartitionOffset}
+     */
+    @JsonIgnoreProperties(ignoreUnknown = true)
+    public static class Builder {
+
+      private String topic;
+      private int partition = -1;
+      private long offset = OffsetRequest.EarliestTime();
+
+      /**
+       * Creates a new builder instance.
+       *
+       * @return a new builder instance.
+       */
+      public static Builder newBuilder() {
+        return new Builder();
+      }
+
+      /**
+       * Set the {@code topic} for the partition offset being built
+       *
+       * @param topic the topic for the partition offset being built.
+       * @return builder instance
+       * @throws IllegalArgumentException if the {@code topic} is {@code null} or empty.
+       */
+      @JsonProperty("topic")
+      public Builder setTopic(String topic) {
+        if (StringUtils.isBlank(topic)) {
+          throw new IllegalArgumentException("The 'topic' cannot be null or empty.");
+        }
+        this.topic = topic;
+        return this;
+      }
+
+      /**
+       * Set the {@code partition} for the partition offset being built
+       *
+       * @param partition the partition for the partition offset being built.
+       * @return builder instance
+       * @throws IllegalArgumentException if the {@code partition} is less than 0.
+       */
+      @JsonProperty("partition")
+      public Builder setPartition(int partition) {
+        if (partition < 0) {
+          throw new IllegalArgumentException("The 'partition' cannot be less than 0.");
+        }
+        this.partition = partition;
+        return this;
+      }
+
+      /**
+       * Set the {@code offset} for the partition offset being built.  If the {@code offset} is not
+       * set then it defaults to {@link OffsetRequest#EarliestTime()}.
+       *
+       * @param offset the offset for the partition offset being built.
+       * @return builder instance
+       */
+      @JsonProperty("offset")
+      public Builder setOffset(long offset) {
+        this.offset = offset;
+        return this;
+      }
+
+      /**
+       * Builds a PartitionOffset instance.
+       *
+       * @return the built PartitionOffset instance.
+       * @throws IllegalStateException if the {@code topic} or {@code partition} are never set or configured
+       *                               to invalid values.
+       */
+      public PartitionOffset build() {
+        if (StringUtils.isBlank(topic)) {
+          throw new IllegalStateException("The 'topic' cannot be null or empty.");
+        }
+
+        if (partition < 0) {
+          throw new IllegalStateException("The 'partition' cannot be less than 0.");
+        }
+
+        return new PartitionOffset(topic, partition, offset);
+      }
+    }
+  }
+}
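
Offsets and PartitionOffset are builder-based value objects that Jackson round-trips through the @JsonDeserialize(builder = ...) annotations. A minimal sketch; the topic name and numeric values are illustrative:

    import java.io.IOException;
    import java.util.Arrays;

    import com.fasterxml.jackson.databind.ObjectMapper;
    import org.apache.crunch.kafka.offset.hdfs.Offsets;

    public class OffsetsJsonExample {
      public static void main(String[] args) throws IOException {
        Offsets offsets = Offsets.Builder.newBuilder()
            .setAsOfTime(System.currentTimeMillis())
            .setOffsets(Arrays.asList(
                Offsets.PartitionOffset.Builder.newBuilder()
                    .setTopic("events").setPartition(0).setOffset(42L).build()))
            .build();

        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(offsets);
        System.out.println(json);

        // The builder-based deserialization restores an equal instance.
        Offsets roundTripped = mapper.readValue(json, Offsets.class);
        System.out.println(roundTripped.equals(offsets));  // true
      }
    }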

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java
new file mode 100644
index 0000000..faead74
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetReaderTest.java
@@ -0,0 +1,220 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+import org.apache.crunch.kafka.offset.OffsetReader;
+import org.apache.crunch.kafka.offset.OffsetWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+
+import static org.hamcrest.CoreMatchers.nullValue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class HDFSOffsetReaderTest {
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Path basePath;
+  private FileSystem fileSystem;
+  private OffsetWriter writer;
+  private OffsetReader reader;
+
+
+  @Before
+  public void setup() throws IOException {
+    Configuration config = new Configuration();
+    config.set(FileSystem.DEFAULT_FS, tempFolder.newFolder().getAbsolutePath());
+
+    fileSystem = FileSystem.newInstance(config);
+    basePath = new Path(tempFolder.newFolder().toString(), testName.getMethodName());
+
+    writer = new HDFSOffsetWriter(config, basePath);
+
+    reader = new HDFSOffsetReader(config, basePath);
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    writer.close();
+    reader.close();
+    fileSystem.close();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void constructNullConfig() {
+    new HDFSOffsetReader(null, new Path("/"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void constructNullPath() {
+    new HDFSOffsetReader(new Configuration(), null);
+  }
+
+  @Test
+  public void getStoredOffsetPersistenceTimesNoValues() throws IOException {
+    List<Long> storedOffsetPersistenceTimes = reader.getStoredOffsetPersistenceTimes();
+    assertThat(storedOffsetPersistenceTimes, is(Collections.<Long>emptyList()));
+  }
+
+  @Test
+  public void getStoredOffsetPersistenceTimesMultipleValues() throws IOException {
+    long current = 1464992662000L;
+    List<Long> persistedTimes = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      persistedTimes.add(current + (i * 18000));
+    }
+
+    for (Long t : persistedTimes) {
+      // Let any write failure fail the test instead of swallowing it.
+      writer.write(t, Collections.<TopicPartition, Long>emptyMap());
+    }
+
+    List<Long> storedTimes = reader.getStoredOffsetPersistenceTimes();
+
+    assertThat(storedTimes, is(persistedTimes));
+  }
+
+  @Test
+  public void readOffsetNoMatchForTime() throws IOException {
+    Map<TopicPartition, Long> offsets = reader.readOffsets(12345L);
+    assertThat(offsets, is(nullValue()));
+  }
+
+  @Test
+  public void readOffsetLatestNone() throws IOException {
+    assertThat(reader.readLatestOffsets(), is(Collections.<TopicPartition, Long>emptyMap()));
+  }
+
+  @Test
+  public void readOffsetLatest() throws IOException {
+    long current = 1464992662000L;
+    List<Long> persistedTimes = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      persistedTimes.add(current + (i * 18000));
+    }
+
+    for (Long t : persistedTimes) {
+      writer.write(t, Collections.<TopicPartition, Long>emptyMap());
+    }
+
+    long expectedTime = persistedTimes.get(persistedTimes.size() - 1);
+
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    for (int i = 0; i < 9; i++) {
+      for (int j = 0; j < 5; j++) {
+        offsets.put(new TopicPartition("topic" + i, j), (long) j);
+      }
+    }
+
+    writer.write(expectedTime, offsets);
+
+    Map<TopicPartition, Long> retrievedOffsets = reader.readLatestOffsets();
+
+    assertThat(retrievedOffsets, is(offsets));
+  }
+
+
+  @Test
+  public void readOffsetForTime() throws IOException {
+    long current = 1464992662000L;
+    List<Long> persistedTimes = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      persistedTimes.add(current + (i * 18000));
+    }
+    for (Long t : persistedTimes) {
+      writer.write(t, Collections.<TopicPartition, Long>emptyMap());
+    }
+
+    long expectedTime = persistedTimes.get(2);
+
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+    for (int i = 0; i < 9; i++) {
+      for (int j = 0; j < 5; j++) {
+        offsets.put(new TopicPartition("topic" + i, j), (long) j);
+      }
+    }
+
+    writer.write(expectedTime, offsets);
+
+    Map<TopicPartition, Long> retrievedOffsets = reader.readOffsets(expectedTime);
+
+    assertThat(retrievedOffsets, is(offsets));
+  }
+
+
+  @Test
+  public void skipReadingDirectory() throws IOException {
+    long current = 1464992662000L;
+    List<Long> persistedTimes = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      persistedTimes.add(current + (i * 18000));
+    }
+
+    for (Long t : persistedTimes) {
+      writer.write(t, Collections.<TopicPartition, Long>emptyMap());
+    }
+    fileSystem.mkdirs(new Path(basePath, "imadirectory"));
+
+    List<Long> storedTimes = reader.getStoredOffsetPersistenceTimes();
+
+    assertThat(storedTimes, is(persistedTimes));
+  }
+
+  @Test
+  public void skipInvalidFile() throws IOException {
+    long current = 1464992662000L;
+    List<Long> persistedTimes = new LinkedList<>();
+    for (int i = 0; i < 10; i++) {
+      persistedTimes.add(current + (i * 18000));
+    }
+
+    for (Long t : persistedTimes) {
+      writer.write(t, Collections.<TopicPartition, Long>emptyMap());
+    }
+
+    fileSystem.createNewFile(new Path(basePath, "imabadfile.json"));
+    fileSystem.createNewFile(new Path(basePath, "imabadfile.txt"));
+
+    List<Long> storedTimes = reader.getStoredOffsetPersistenceTimes();
+
+    assertThat(storedTimes, is(persistedTimes));
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java
new file mode 100644
index 0000000..70922d8
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/HDFSOffsetWriterTest.java
@@ -0,0 +1,185 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class HDFSOffsetWriterTest {
+
+  private static final ObjectMapper MAPPER = new ObjectMapper();
+
+  @Rule
+  public TemporaryFolder tempFolder = new TemporaryFolder();
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private Configuration config;
+
+  private Path basePath;
+  private FileSystem fileSystem;
+  private HDFSOffsetWriter writer;
+
+
+  @Before
+  public void setup() throws IOException {
+    config = new Configuration();
+    config.set(FileSystem.DEFAULT_FS, tempFolder.newFolder().getAbsolutePath());
+
+    fileSystem = FileSystem.newInstance(config);
+    basePath = new Path(tempFolder.newFolder().toString(), testName.getMethodName());
+
+    writer = new HDFSOffsetWriter(config, basePath);
+  }
+
+  @After
+  public void cleanup() throws IOException {
+    writer.close();
+    fileSystem.close();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void constructNullConfig() {
+    new HDFSOffsetWriter(null, new Path("/"));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void constructNullPath() {
+    new HDFSOffsetWriter(new Configuration(), null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void writeNullOffsets() throws IOException {
+    writer.write(10L, null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void writeInvalidAsOfTime() throws IOException {
+    writer.write(-1L, Collections.<TopicPartition, Long>emptyMap());
+  }
+
+  @Test
+  public void writeEmptyOffsets() throws IOException {
+    long persistTime = System.currentTimeMillis();
+    Map<TopicPartition, Long> offsets = Collections.emptyMap();
+
+    writer.write(persistTime, offsets);
+
+    Path expectedPath = HDFSOffsetWriter.getPersistedTimeStoragePath(basePath, persistTime);
+
+    try (InputStream in = fileSystem.open(expectedPath)) {
+      Offsets persistedOffsets = MAPPER.readValue(in, Offsets.class);
+      assertThat(persistedOffsets.getAsOfTime(), is(persistTime));
+      assertThat(persistedOffsets.getOffsets(), is(Collections.<Offsets.PartitionOffset>emptyList()));
+    }
+  }
+
+  @Test
+  public void writeOffsets() throws IOException {
+    long persistTime = System.currentTimeMillis();
+    Map<TopicPartition, Long> offsets = new HashMap<>();
+
+    for (int i = 0; i < 9; i++) {
+      for (int j = 0; j < 5; j++) {
+        offsets.put(new TopicPartition("topic" + i, j), (long) j);
+      }
+
+    }
+
+    writer.write(persistTime, offsets);
+
+    Path expectedPath = HDFSOffsetWriter.getPersistedTimeStoragePath(basePath, persistTime);
+
+    try (InputStream in = fileSystem.open(expectedPath)) {
+      Offsets persistedOffsets = MAPPER.readValue(in, Offsets.class);
+      assertThat(persistedOffsets.getAsOfTime(), is(persistTime));
+      assertThat(persistedOffsets.getOffsets().size(), is(offsets.size()));
+
+      Iterator<Offsets.PartitionOffset> partitionOffsets = persistedOffsets.getOffsets().iterator();
+      for (int i = 0; i < 9; i++) {
+        for (int j = 0; j < 5; j++) {
+          assertTrue(partitionOffsets.hasNext());
+          Offsets.PartitionOffset partitionOffset = partitionOffsets.next();
+          assertThat(partitionOffset.getPartition(), is(j));
+          assertThat(partitionOffset.getOffset(), is((long) j));
+          assertThat(partitionOffset.getTopic(), is("topic" + i));
+        }
+      }
+    }
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void getPersistedStoragePathNullBase() {
+    HDFSOffsetWriter.getPersistedTimeStoragePath(null, 10L);
+  }
+
+  @Test
+  public void getPersistedStoragePath() {
+    //Timestamp of 02 Jun 2016 20:12:17 GMT
+    //2016-06-02T20:12:17Z
+    long timestamp = 1464898337000L;
+
+    String expectedFileName = HDFSOffsetWriter.FILE_FORMATTER.print(timestamp)
+        + HDFSOffsetWriter.FILE_FORMAT_EXTENSION;
+    Path filePath = HDFSOffsetWriter.getPersistedTimeStoragePath(basePath, timestamp);
+
+    assertThat(filePath, is(new Path(basePath, expectedFileName)));
+  }
+
+  @Test
+  public void timeToFileName() {
+    //Timestamp of 02 Jun 2016 20:12:17 GMT
+    //2016-06-02T20:12:17Z
+    long timestamp = 1464898337000L;
+
+    String expectedFileName = "2016-06-02T20-12-17+0000" + HDFSOffsetWriter.FILE_FORMAT_EXTENSION;
+
+    assertThat(HDFSOffsetWriter.persistenceTimeToFileName(timestamp), is(expectedFileName));
+  }
+
+  @Test
+  public void fileNameToTime() {
+    //Timestamp of 02 Jun 2016 20:12:17 GMT
+    //2016-06-02T20:12:17Z
+    long timestamp = 1464898337000L;
+
+    String expectedFileName = "2016-06-02T20-12-17+0000" + HDFSOffsetWriter.FILE_FORMAT_EXTENSION;
+
+    assertThat(HDFSOffsetWriter.fileNameToPersistenceTime(expectedFileName), is(timestamp));
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void fileNameToTimeNullFileName() {
+    HDFSOffsetWriter.fileNameToPersistenceTime(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void fileNameToTimeEmptyFileName() {
+    HDFSOffsetWriter.fileNameToPersistenceTime("");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void fileNameToTimeInvalidFileName() {
+    HDFSOffsetWriter.fileNameToPersistenceTime("2016-06-02T20:12:17.000Z.json");
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java
----------------------------------------------------------------------
diff --git a/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java
new file mode 100644
index 0000000..e976da8
--- /dev/null
+++ b/crunch-kafka/src/test/java/org/apache/crunch/kafka/offset/hdfs/OffsetsTest.java
@@ -0,0 +1,418 @@
+package org.apache.crunch.kafka.offset.hdfs;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import kafka.api.OffsetRequest;
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.junit.BeforeClass;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.LinkedList;
+import java.util.List;
+
+import static org.hamcrest.Matchers.greaterThan;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.core.Is.is;
+import static org.junit.Assert.assertThat;
+
+public class OffsetsTest {
+
+  @Rule
+  public TestName testName = new TestName();
+
+  private static ObjectMapper mapper;
+
+  @BeforeClass
+  public static void setup() {
+    mapper = new ObjectMapper();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void buildOffsetNullOffsets() {
+    Offsets.Builder.newBuilder().setOffsets(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void buildInvalidAsOfTime() {
+    Offsets.Builder.newBuilder().setAsOfTime(-1);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void buildNoAsOfTime() {
+    Offsets.Builder.newBuilder().build();
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void buildPartitionNullTopic() {
+    Offsets.PartitionOffset.Builder.newBuilder().setTopic(null);
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void buildPartitionEmptyTopic() {
+    Offsets.PartitionOffset.Builder.newBuilder().setTopic(" ");
+  }
+
+  @Test(expected = IllegalArgumentException.class)
+  public void buildPartitionInvalidPartition() {
+    Offsets.PartitionOffset.Builder.newBuilder().setPartition(-1);
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void buildPartitionNoTopicSet() {
+    Offsets.PartitionOffset.Builder.newBuilder().setPartition(10).setOffset(10L).build();
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void buildPartitionNoPartitionSet() {
+    Offsets.PartitionOffset.Builder.newBuilder().setTopic(testName.getMethodName()).setOffset(10L).build();
+  }
+
+  @Test
+  public void buildPartitionOffset() {
+    Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset.getOffset(), is(10L));
+    assertThat(partitionOffset.getPartition(), is(1));
+    assertThat(partitionOffset.getTopic(), is(testName.getMethodName()));
+  }
+
+  @Test
+  public void buildPartitionOffsetNoOffsetSet() {
+    Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setPartition(1).build();
+
+    assertThat(partitionOffset.getOffset(), is(OffsetRequest.EarliestTime()));
+    assertThat(partitionOffset.getPartition(), is(1));
+    assertThat(partitionOffset.getTopic(), is(testName.getMethodName()));
+  }
+
+  @Test
+  public void partitionOffsetSame() {
+    Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset.equals(partitionOffset), is(true));
+    assertThat(partitionOffset.compareTo(partitionOffset), is(0));
+  }
+
+  @Test
+  public void partitionOffsetEqual() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(true));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(0));
+  }
+
+  @Test
+  public void partitionOffsetNotEqualDiffTopic() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic("abc").setOffset(10L).setPartition(1).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(false));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(lessThan(0)));
+  }
+
+  @Test
+  public void partitionOffsetNotEqualDiffPartition() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(0).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(false));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(lessThan(0)));
+  }
+
+  @Test
+  public void partitionOffsetNotEqualDiffOffset() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(9L).setPartition(1).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(false));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(lessThan(0)));
+  }
+
+
+  @Test
+  public void partitionOffsetNotEqualDiffGreaterTopic() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic("abc").setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(false));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(greaterThan(0)));
+  }
+
+  @Test
+  public void partitionOffsetNotEqualDiffGreaterPartition() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(2).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(false));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(greaterThan(0)));
+  }
+
+  @Test
+  public void partitionOffsetNotEqualDiffGreaterOffset() {
+    Offsets.PartitionOffset partitionOffset1 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(12L).setPartition(1).build();
+
+    Offsets.PartitionOffset partitionOffset2 = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(10L).setPartition(1).build();
+
+    assertThat(partitionOffset1.equals(partitionOffset2), is(false));
+    assertThat(partitionOffset1.compareTo(partitionOffset2), is(greaterThan(0)));
+  }
+
+  @Test
+  public void jsonSerializationPartitionOffset() throws IOException {
+    Offsets.PartitionOffset partitionOffset = Offsets.PartitionOffset.Builder.newBuilder()
+        .setTopic(testName.getMethodName()).setOffset(12L).setPartition(1).build();
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    mapper.writeValue(baos, partitionOffset);
+
+    Offsets.PartitionOffset readOffset = mapper.readValue(baos.toByteArray(), Offsets.PartitionOffset.class);
+
+    assertThat(readOffset, is(partitionOffset));
+  }
+
+
+  @Test
+  public void buildOffsetsNoOffsets() {
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+    assertThat(offsets.getOffsets(), is(Collections.<Offsets.PartitionOffset>emptyList()));
+  }
+
+  @Test
+  public void buildOffsetsSortOffsets() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    List<Offsets.PartitionOffset> returnedOffsets = offsets.getOffsets();
+    int count = 1;
+
+    //iterate in the expected order
+    for (Offsets.PartitionOffset o : returnedOffsets) {
+      assertThat(o.getTopic(), is("topic" + count));
+      assertThat(o.getPartition(), is(partition));
+      assertThat(o.getOffset(), is(offset));
+      count++;
+    }
+    assertThat(count, is(10));
+  }
+
+  @Test
+  public void offsetsSame() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    assertThat(offsets.equals(offsets), is(true));
+  }
+
+  @Test
+  public void offsetsEqual() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    assertThat(offsets.equals(offsets2), is(true));
+  }
+
+  @Test
+  public void offsetsDiffAsOfTime() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(11).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    assertThat(offsets.equals(offsets2), is(false));
+  }
+
+  @Test
+  public void offsetsDiffOffsets() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+    List<Offsets.PartitionOffset> secondOffsets = new LinkedList<>();
+    for(int i = 0; i < 5; i++){
+      secondOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(secondOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    assertThat(offsets.equals(offsets2), is(false));
+  }
+
+  @Test(expected = IllegalStateException.class)
+  public void offsetsDuplicates() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+    reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic9").setPartition(0).build());
+
+    Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+  }
+
+  @Test
+  public void offsetsDiffListInstances() {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      reversedOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+    List<Offsets.PartitionOffset> secondOffsets = new LinkedList<>();
+    for(int i = 0; i < 9; i++){
+      secondOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + (9 - i))
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(secondOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    assertThat(offsets.equals(offsets2), is(true));
+  }
+
+  @Test
+  public void offsetsEqualEmptyOffsets() {
+    List<Offsets.PartitionOffset> reversedOffsets = new LinkedList<>();
+
+    List<Offsets.PartitionOffset> secondOffsets = new LinkedList<>();
+
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(reversedOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    Offsets offsets2 = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(secondOffsets).build();
+    assertThat(offsets.getAsOfTime(), is(10L));
+
+    assertThat(offsets.equals(offsets2), is(true));
+  }
+
+  @Test
+  public void jsonSerializationOffsets() throws IOException {
+    int partition = 0;
+    long offset = 10L;
+
+    List<Offsets.PartitionOffset> partitionOffsets = new LinkedList<>();
+    for(int i = 0; i < 100; i++){
+      partitionOffsets.add(Offsets.PartitionOffset.Builder.newBuilder().setTopic("topic" + i)
+          .setPartition(partition).setOffset(offset).build());
+    }
+
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10).setOffsets(partitionOffsets).build();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    mapper.writeValue(baos, offsets);
+
+    Offsets readOffsets = mapper.readValue(baos.toByteArray(), Offsets.class);
+
+    assertThat(readOffsets, is(offsets));
+  }
+
+  @Test
+  public void jsonSerializationOffsetsEmpty() throws IOException {
+
+    Offsets offsets = Offsets.Builder.newBuilder().setAsOfTime(10)
+        .setOffsets(Collections.<Offsets.PartitionOffset>emptyList()).build();
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    mapper.writeValue(baos, offsets);
+
+    Offsets readOffsets = mapper.readValue(baos.toByteArray(), Offsets.class);
+
+    assertThat(readOffsets, is(offsets));
+  }
+}

http://git-wip-us.apache.org/repos/asf/crunch/blob/157ae25b/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 78ea085..47e58ba 100644
--- a/pom.xml
+++ b/pom.xml
@@ -90,6 +90,7 @@ under the License.
     <parquet.version>1.8.1</parquet.version>
     <javassist.version>3.16.1-GA</javassist.version>
     <jackson.version>1.8.8</jackson.version>
+    <jackson.databind.version>2.6.1</jackson.databind.version>
     <protobuf-java.version>2.5.0</protobuf-java.version>
     <libthrift.version>0.8.0</libthrift.version>
     <slf4j.version>1.6.1</slf4j.version>
@@ -298,6 +299,24 @@ under the License.
       </dependency>
 
       <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-databind</artifactId>
+        <version>${jackson.databind.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>com.fasterxml.jackson.core</groupId>
+        <artifactId>jackson-annotations</artifactId>
+        <version>${jackson.databind.version}</version>
+      </dependency>
+
+      <dependency>
+        <groupId>joda-time</groupId>
+        <artifactId>joda-time</artifactId>
+        <version>2.8.1</version>
+      </dependency>
+
+      <dependency>
         <groupId>com.google.protobuf</groupId>
         <artifactId>protobuf-java</artifactId>
         <version>${protobuf-java.version}</version>

