apex-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From hsy...@apache.org
Subject [1/5] apex-malhar git commit: APEXMALHAR-2076 #resolve #comment add AbstractTupleUniqueExactlyOnceKafkaOutputOperator
Date Thu, 07 Jul 2016 17:01:39 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master c4a11299b -> cc9d50366


APEXMALHAR-2076 #resolve #comment add AbstractTupleUniqueExactlyOnceKafkaOutputOperator


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/33a5c2ec
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/33a5c2ec
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/33a5c2ec

Branch: refs/heads/master
Commit: 33a5c2ec95c9ee6f33023ea4ae82d156a140cb25
Parents: 72de840
Author: brightchen <bright@datatorrent.com>
Authored: Wed May 25 13:01:26 2016 -0700
Committer: brightchen <bright@datatorrent.com>
Committed: Thu Jun 2 11:18:21 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaOutputOperator.java      |   2 +-
 ...pleUniqueExactlyOnceKafkaOutputOperator.java | 610 +++++++++++++++++++
 .../contrib/kafka/KafkaMetadataUtil.java        | 121 +++-
 .../datatorrent/contrib/kafka/KafkaUtil.java    | 358 +++++++++++
 ...upleUniqueExactlyOnceOutputOperatorTest.java | 512 ++++++++++++++++
 .../contrib/kafka/KafkaUtilTester.java          | 128 ++++
 contrib/src/test/resources/log4j.properties     |   1 +
 7 files changed, 1713 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
index f0835c4..8003669 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractKafkaOutputOperator.java
@@ -100,7 +100,7 @@ public abstract class AbstractKafkaOutputOperator<K, V> implements Operator
 
     return new ProducerConfig(configProperties);
   }
-
+  
   public Producer<K, V> getProducer()
   {
     return producer;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java
new file mode 100644
index 0000000..15fea37
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/AbstractTupleUniqueExactlyOnceKafkaOutputOperator.java
@@ -0,0 +1,610 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.classification.InterfaceStability;
+
+import com.google.common.base.Splitter;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.common.util.Pair;
+
+import kafka.api.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+import kafka.serializer.StringDecoder;
+
+/**
+ * Assumptions: - the values of incoming tuples are not duplicated (at least
+ * within one window) among all operator partitions. - one Kafka partition
+ * can be written by multiple operator partitions at the same time. - the
+ * Kafka partition is decided by the tuple value itself (it does not depend
+ * on the operator partition).
+ * 
+ * Notes: - the order of data could change on replay. - the data could go
+ * to a different partition on replay, for example if the upstream operator
+ * failed.
+ * 
+ * Implementation: for each Kafka partition, load the minimum last window and
+ * the minimum offset of the last window of all operator partitions, and then
+ * load the tuples from Kafka based on this minimum offset. When processing a
+ * tuple, if the window id is less than the minimum last window, just ignore the
+ * tuple. If the window id equals the loaded minimum window id and the tuple
+ * equals any of the loaded tuples, ignore it. Otherwise, send it to Kafka.
+ * 
+ * @displayName Abstract Tuple Unique Exactly Once Kafka Output
+ * @category Messaging
+ * @tags output operator
+ */
+@InterfaceStability.Evolving
+public abstract class AbstractTupleUniqueExactlyOnceKafkaOutputOperator<T, K, V>
+    extends AbstractKafkaOutputOperator<K, V>
+{
+  public static final String DEFAULT_CONTROL_TOPIC = "ControlTopic";
+  protected transient int partitionNum = 1;
+
+  /**
+   * allow client set the partitioner as partitioner may need some attributes
+   */
+  protected kafka.producer.Partitioner partitioner;
+
+  protected transient int operatorPartitionId;
+
+  protected String controlTopic = DEFAULT_CONTROL_TOPIC;
+
+  //Each control info record carries its generation time; records older than this look-back period (in milliseconds) are not considered when loading recovery control info.
+  protected int controlInfoTrackBackTime = 120000;
+
+  /**
+   * max number of offset need to check
+   */
+  protected int maxNumOffsetsOfControl = 1000;
+
+  protected String controlProducerProperties;
+  protected Set<String> brokerSet;
+  
+  protected transient long currentWindowId;
+
+  /**
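+   * the map from Kafka partition id to the control offset, used as the start
+   * offset to load the recovery control information. Note: this only keeps the
+   * information of this operator partition. NOTE(review): the original comment
+   * said this map is checkpointed, but the field is declared transient and is
+   * rebuilt by loadLastMessages() - confirm which is intended.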
+   */
+  protected transient Map<Integer, Long> partitionToLastControlOffset = Maps.newHashMap();
+
+  /**
+   * keep the minimal last window id for recovery. If only one partition
+   * crashed, it is ok just use the last window id of this operator partition as
+   * the recovery window id If all operator partitions crashed, should use the
+   * minimal last window id as the recovery window id, as the data may go to the
+   * other partitions. But as the operator can't distinguish which is the case.
+   * use the most general one.
+   */
+  protected transient long minRecoveryWindowId = -2;
+  protected transient long maxRecoveryWindowId = -2;
+
+  /**
+   * A map from Kafka partition id to the last messages written to this Kafka
+   * partition. This information is loaded based on
+   * RecoveryControlInfo.kafkaPartitionIdToOffset
+   */
+  protected transient Map<Integer, List<Pair<byte[], byte[]>>> partitionToLastMsgs = Maps.newHashMap();
+
+  /**
+   * The messages are assumed to be written to the Kafka partition decided by
+   * tupleToKeyValue(T tuple) and the partitioner, but this also depends on the
+   * system; for example, the topic could have been created with only one
+   * partition. Kafka partitions are not distinguished if the partition is not reliable.
+   */
+  protected transient Set<Pair<byte[], byte[]>> totalLastMsgs = Sets.newHashSet();
+
+  protected transient RecoveryControlInfo controlInfo = new RecoveryControlInfo();
+  protected transient Producer<String, String> controlDataProducer;
+  protected transient StringDecoder controlInfoDecoder;
+
+  /**
+   * Initializes the operator. The broker set is resolved first, then the parent
+   * setup runs, and afterwards the control-topic decoder and producer are created,
+   * a partitioner is instantiated if none was set, and the recovery control data
+   * is loaded from the control topic.
+   *
+   * @param context the operator context; its id is used as the operator partition id
+   */
+  @Override
+  public void setup(OperatorContext context)
+  {
+    getBrokerSet();
+    
+    super.setup(context);
+    controlInfoDecoder = new StringDecoder(null);
+
+    operatorPartitionId = context.getId();
+
+    controlDataProducer = new Producer<String, String>(createKafkaControlProducerConfig());
+
+    if (partitioner == null) {
+      createDefaultPartitioner();
+    }
+
+    //must run last: it needs the broker set and fills the recovery window ids and last messages
+    loadControlData();
+  }
+
+  /**
+   * Remembers the id of the window that is starting; it is used to decide whether
+   * tuples must be skipped and is written to the control topic at the end of the window.
+   *
+   * @param windowId id of the window that begins
+   */
+  @Override
+  public void beginWindow(long windowId)
+  {
+    currentWindowId = windowId;
+  }
+
+  /**
+   * Ends the current window by writing the control information of this operator
+   * partition to the control topic.
+   */
+  @Override
+  public void endWindow()
+  {
+    //we'd better flush the cached tuples, but Kafka 0.8.1 doesn't support flush.
+    //keep the control information of this operator partition to control topic
+    saveControlData();
+  }
+
+  protected void createDefaultPartitioner()
+  {
+    try {
+      String className = (String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_PARTITIONER);
+      if (className != null) {
+        partitioner = (kafka.producer.Partitioner)Class.forName(className).newInstance();
+      }
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to initialize partitioner", e);
+    }
+  }
+
+  /**
+   * Loads the recovery control data from partition 0 of the control topic. Control
+   * messages are read in batches, walking backwards from the latest offset, until
+   * loadControlInfoIntermedia reports that enough control information was collected
+   * or the beginning of the topic is reached. The collected information is then used
+   * to initialize the recovery window ids and the last written messages.
+   *
+   * Returns without loading anything if exactly one consumer could not be created for
+   * the control topic, if there is no consumer for partition 0, or if the latest
+   * offset is not positive.
+   */
+  protected void loadControlData()
+  {
+    long loadDataTime = System.currentTimeMillis();
+
+    final String clientNamePrefix = getClientNamePrefix();
+    Map<Integer, SimpleConsumer> consumers = KafkaUtil.createSimpleConsumers(clientNamePrefix, brokerSet, controlTopic);
+    //the control topic is expected to yield exactly one consumer, looked up below under key 0
+    if (consumers == null || consumers.size() != 1) {
+      logger.error("The consumer for recovery information was not expected. {}", consumers);
+      return;
+    }
+    final SimpleConsumer consumer = consumers.get(0);
+    if (consumer == null) {
+      logger.error("No consumer for recovery information.");
+      return;
+    }
+
+    long latestOffset = KafkaMetadataUtil.getLastOffset(consumer, controlTopic, 0, OffsetRequest.LatestTime(),
+        KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0));
+    logger.debug("latestOffsets: {}", latestOffset);
+    //a non-positive value means no control message is available (or the lookup failed)
+    if (latestOffset <= 0) {
+      return;
+    }
+
+    int batchMessageSize = 100;
+    List<Pair<byte[], byte[]>> messages = Lists.newArrayList();
+
+    boolean isControlMessageEnough = false;
+    Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo = Maps.newHashMap();
+
+    while (latestOffset > 0 && !isControlMessageEnough) {
+      long startOffset = latestOffset - batchMessageSize + 1;
+      if (startOffset < 0) {
+        startOffset = 0;
+      }
+
+      //read offsets as batch and handle them.
+      messages.clear();
+      KafkaUtil.readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, controlTopic, 0),
+          controlTopic, 0, startOffset, latestOffset - 1, messages, 3);
+      //NOTE(review): if readMessagesBetween returns messages in ascending offset order, the break
+      //below stops at the first (oldest) stale message and skips newer messages of the same batch - confirm.
+      //NOTE(review): batches are visited newest first, so a record from an older batch can replace a
+      //newer record of the same operator partition in the map - confirm this is intended.
+      for (Pair<byte[], byte[]> message : messages) {
+        //handle the message; we have to handle all the message.
+        RecoveryControlInfo rci = RecoveryControlInfo.fromString((String)controlInfoDecoder.fromBytes(message.second));
+        isControlMessageEnough = (loadControlInfoIntermedia(rci, loadDataTime,
+            operatorPartitionIdToLastControlInfo) == 0);
+
+        if (isControlMessageEnough) {
+          break;
+        }
+      }
+
+      //NOTE(review): the next batch ends at (startOffset - 2) because the read above uses latestOffset - 1;
+      //if the end offset is inclusive, the message at (startOffset - 1) is never read - confirm.
+      latestOffset = startOffset - 1;
+    }
+
+    loadRecoveryWindowId(operatorPartitionIdToLastControlInfo);
+    loadLastMessages(operatorPartitionIdToLastControlInfo);
+  }
+
+  /**
+   * load the recovery window id. right now use the minimal window id as the
+   * recovery window id Different Operator partitions maybe crashed at different
+   * window. use the minimal window of all operator partitions as the window for
+   * recovery.
+   * 
+   * @param operatorPartitionIdToLastWindowId
+   */
+  protected void loadRecoveryWindowId(Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo)
+  {
+    for (RecoveryControlInfo rci : operatorPartitionIdToLastControlInfo.values()) {
+      if (minRecoveryWindowId < 0 || rci.windowId < minRecoveryWindowId) {
+        minRecoveryWindowId = rci.windowId;
+      }
+      if (maxRecoveryWindowId < 0 || rci.windowId > maxRecoveryWindowId) {
+        maxRecoveryWindowId = rci.windowId;
+      }
+    }
+  }
+
+  /**
+   * load control information from intermedia to
+   * 
+   * @param operatorPartitionIdToLastWindowId
+   * @param operatorToKafkaToOffset
+   */
+  protected void loadLastMessages(Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo)
+  {
+    partitionToLastControlOffset.clear();
+
+    for (Map.Entry<Integer, RecoveryControlInfo> entry : operatorPartitionIdToLastControlInfo.entrySet()) {
+      RecoveryControlInfo rci = entry.getValue();
+      if (rci.windowId == this.minRecoveryWindowId) {
+        //get the minimal offset
+        for (Map.Entry<Integer, Long> kafkaPartitionEntry : rci.kafkaPartitionIdToOffset.entrySet()) {
+          Long offset = partitionToLastControlOffset.get(kafkaPartitionEntry.getKey());
+          if (offset == null || offset > kafkaPartitionEntry.getValue()) {
+            partitionToLastControlOffset.put(kafkaPartitionEntry.getKey(), kafkaPartitionEntry.getValue());
+          }
+        }
+      }
+    }
+
+    partitionToLastMsgs.clear();
+
+    KafkaUtil.readMessagesAfterOffsetTo(getClientNamePrefix(), brokerSet, getTopic(), partitionToLastControlOffset,
+        partitionToLastMsgs);
+
+    loadTotalLastMsgs();
+  }
+
+  /**
+   * Rebuilds totalLastMsgs as the union of the messages of all Kafka partitions
+   * held in partitionToLastMsgs.
+   */
+  protected void loadTotalLastMsgs()
+  {
+    totalLastMsgs.clear();
+    if (partitionToLastMsgs != null && !partitionToLastMsgs.isEmpty()) {
+      for (List<Pair<byte[], byte[]>> partitionMsgs : partitionToLastMsgs.values()) {
+        totalLastMsgs.addAll(partitionMsgs);
+      }
+    }
+  }
+
+  protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime,
+      Map<Integer, RecoveryControlInfo> operatorPartitionIdToLastControlInfo)
+  {
+    if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) {
+      return 0;
+    }
+
+    //The record should be in ascent order, so the later should override the previous
+    operatorPartitionIdToLastControlInfo.put(controlInfo.partitionIdOfOperator, controlInfo);
+
+    return 1;
+  }
+
+  /**
+   * Current implementation we can get the number of operator partitions. So we
+   * we use the controlInfoTrackBackTime to control the trace back of control
+   * information.
+   * 
+   * @param controlInfo
+   * @param loadDataTime
+   * @param operatorPartitionIdToLastWindowId
+   * @param operatorToKafkaToOffset
+   * @return 0 if control information is enough and don't need to load any more
+   */
+  protected int loadControlInfoIntermedia(RecoveryControlInfo controlInfo, long loadDataTime,
+      Map<Integer, Long> operatorPartitionIdToLastWindowId, Map<Integer, Map<Integer, Long>> operatorToKafkaToOffset)
+  {
+    if (controlInfo.generateTime + controlInfoTrackBackTime < loadDataTime) {
+      return 0;
+    }
+
+    //The record should be in ascent order, so the later should override the previous
+    operatorPartitionIdToLastWindowId.put(controlInfo.partitionIdOfOperator, controlInfo.windowId);
+    operatorToKafkaToOffset.put(controlInfo.partitionIdOfOperator, controlInfo.kafkaPartitionIdToOffset);
+
+    return 1;
+  }
+
+  /**
+   * save the control data. each operator partition only save its control data
+   */
+  protected void saveControlData()
+  {
+    controlInfo.generateTime = System.currentTimeMillis();
+    controlInfo.partitionIdOfOperator = operatorPartitionId;
+    controlInfo.windowId = this.currentWindowId;
+    if (controlInfo.kafkaPartitionIdToOffset == null) {
+      controlInfo.kafkaPartitionIdToOffset = Maps.newHashMap();
+    } else {
+      controlInfo.kafkaPartitionIdToOffset.clear();
+    }
+    KafkaMetadataUtil.getLastOffsetsTo(getClientNamePrefix(), brokerSet, getTopic(),
+        controlInfo.kafkaPartitionIdToOffset);
+
+    //send to control topic
+    controlDataProducer.send(new KeyedMessage<String, String>(getControlTopic(), null, 0, controlInfo.toString()));
+  }
+
+  /**
+   * @return the runtime class name of this operator with every '$' replaced by '.',
+   *         used as the prefix of the Kafka client names
+   */
+  protected String getClientNamePrefix()
+  {
+    return getClass().getName().replace('$', '.');
+  }
+
+
+  protected Set<String> getBrokerSet()
+  {
+    if (brokerSet == null) {
+      brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST));
+    }
+    return brokerSet;
+  }
+
+  /**
+   * This input port receives tuples that will be written out to Kafka.
+   */
+  public final transient DefaultInputPort<T> inputPort = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      processTuple(tuple);
+    }
+  };
+
+  /**
+   * Converts the tuple to a Kafka key/value pair and sends it to the data topic unless
+   * skipTuple decides that it must be skipped. Kept as a separate method so that
+   * sub-classes can override it.
+   * 
+   * @param tuple the incoming tuple
+   */
+  protected void processTuple(T tuple)
+  {
+    final Pair<K, V> kafkaMessage = tupleToKeyValue(tuple);
+    final int partitionKey = getPartitionKey(kafkaMessage.first);
+
+    if (skipTuple(partitionKey, kafkaMessage)) {
+      return;
+    }
+    getProducer().send(
+        new KeyedMessage<K, V>(getTopic(), kafkaMessage.first, partitionKey, kafkaMessage.second));
+    sendCount++;
+  }
+
+  protected boolean skipTuple(int partitionId, Pair<K, V> msg)
+  {
+    if (currentWindowId <= minRecoveryWindowId) {
+      return true;
+    }
+    if (currentWindowId > maxRecoveryWindowId + 1) {
+      return false;
+    }
+
+    return isDuplicateTuple(partitionId, msg);
+  }
+
+  protected boolean isDuplicateTuple(int partitionId, Pair<K, V> msg)
+  {
+    Collection<Pair<byte[], byte[]>> lastMsgs = partitionToLastMsgs.get(partitionId);
+
+    //check depended on the partition only
+    if (lastMsgs == null || lastMsgs.isEmpty()) {
+      lastMsgs = totalLastMsgs;
+    }
+
+    for (Pair<byte[], byte[]> cachedMsg : lastMsgs) {
+      if (equals(cachedMsg, msg)) {
+        return true;
+      }
+    }
+    return false;
+
+  }
+
+  protected boolean equals(Pair<byte[], byte[]> cachedMsg, Pair<K, V> msg)
+  {
+    if (cachedMsg.first == null ^ msg.first == null) {
+      return false;
+    }
+    if (cachedMsg.second == null ^ msg.second == null) {
+      return false;
+    }
+
+    if (cachedMsg.first == null && msg.first == null && cachedMsg.second == null && msg.second == null) {
+      return true;
+    }
+
+    if (!equals(cachedMsg.first, msg.first)) {
+      return false;
+    }
+
+    return equals(cachedMsg.second, msg.second);
+  }
+
+  /**
+   * Compares the raw bytes read back from Kafka with a key or value about to be sent.
+   * Called by equals(Pair, Pair) for the key and for the value of a message.
+   * NOTE(review): both arguments can be null when the key (or value) is null on both
+   * sides while the other part of the message is not - implementations should handle it.
+   * 
+   * @param bytes the serialized key or value loaded from Kafka
+   * @param value the key or value to compare with
+   * @return true if the bytes represent the given value
+   */
+  protected abstract <M> boolean equals(byte[] bytes, M value);
+
+  /**
+   * get the partition key. for 0.8.1, If a partition key is provided it will
+   * override the key for the purpose of partitioning but will not be stored.
+   * 
+   * @param key the message key, may be null
+   * @return the partitioner result if a partitioner is set, otherwise the hash code of
+   *         the key, or 0 if the key is null
+   */
+  protected int getPartitionKey(K key)
+  {
+    if (partitioner != null) {
+      //NOTE(review): partitionNum is initialized to 1 and not updated in the visible code - confirm it is set elsewhere
+      return partitioner.partition(key, partitionNum);
+    }
+
+    if (key != null) {
+      //NOTE(review): the hash code can be negative or larger than the number of partitions, while
+      //isDuplicateTuple uses this value to look up partitionToLastMsgs - confirm this is intended
+      return key.hashCode();
+    }
+
+    //stick to the Kafka partition, so can't use round robbin
+    return 0;
+  }
+
+  /**
+   * Builds the configuration of the control-topic producer. The comma separated
+   * "key=value" control producer properties (falling back to the data producer
+   * properties when not set) are applied on top of a copy of the operator
+   * configuration, and the serializers are forced to the String encoder. The copy
+   * keeps these overrides out of the Properties object returned by
+   * getConfigProperties().
+   * 
+   * @return the producer configuration for the control topic
+   */
+  protected ProducerConfig createKafkaControlProducerConfig()
+  {
+    if (controlProducerProperties == null || controlProducerProperties.isEmpty()) {
+      controlProducerProperties = getProducerProperties();
+    }
+
+    Properties prop = new Properties();
+    if (controlProducerProperties != null) {
+      for (String propString : controlProducerProperties.split(",")) {
+        //split on the first '=' only: the value may be empty or contain '=' itself
+        String[] keyVal = propString.split("=", 2);
+        if (keyVal.length != 2) {
+          continue;
+        }
+        prop.put(StringUtils.trim(keyVal[0]), StringUtils.trim(keyVal[1]));
+      }
+    }
+
+    //only support String encoder now, overwrite
+    prop.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+    prop.setProperty("key.serializer.class", "kafka.serializer.StringEncoder");
+
+    //merge into a copy, so the String encoder override does not leak into the data producer configuration
+    Properties controlConfigProperties = new Properties();
+    controlConfigProperties.putAll(this.getConfigProperties());
+    controlConfigProperties.putAll(prop);
+
+    return new ProducerConfig(controlConfigProperties);
+  }
+
+  /**
+   * Tell the operator how to convert a input tuple to a kafka key value pair
+   * 
+   * @param tuple
+   * @return A kafka key value pair.
+   */
+  protected abstract Pair<K, V> tupleToKeyValue(T tuple);
+
+  /**
+   * @return the partitioner used to compute the partition key, may be null
+   */
+  public kafka.producer.Partitioner getPartitioner()
+  {
+    return partitioner;
+  }
+
+  /**
+   * @param partitioner the partitioner to use; if left null, setup tries to create one
+   *        from the producer configuration
+   */
+  public void setPartitioner(kafka.producer.Partitioner partitioner)
+  {
+    this.partitioner = partitioner;
+  }
+
+  /**
+   * @return the name of the topic the recovery control information is written to
+   */
+  public String getControlTopic()
+  {
+    return controlTopic;
+  }
+
+  /**
+   * @param controlTopic the name of the topic the recovery control information is written to
+   */
+  public void setControlTopic(String controlTopic)
+  {
+    this.controlTopic = controlTopic;
+  }
+
+  /**
+   * @return the comma separated "key=value" properties of the control producer
+   */
+  public String getControlProducerProperties()
+  {
+    return controlProducerProperties;
+  }
+
+  /**
+   * @param controlProducerProperties comma separated "key=value" properties of the control
+   *        producer; the data producer properties are used when null or empty
+   */
+  public void setControlProducerProperties(String controlProducerProperties)
+  {
+    this.controlProducerProperties = controlProducerProperties;
+  }
+
+  //log under this class; the original used AbstractExactlyOnceKafkaOutputOperator (copy-paste)
+  private static final Logger logger = LoggerFactory.getLogger(AbstractTupleUniqueExactlyOnceKafkaOutputOperator.class);
+
+  /**
+   * Recovery information of one operator partition as written to the control topic:
+   * the operator partition id, the generation time, the window id and the offset of
+   * every Kafka partition. Serialized by toString() as
+   * "operatorPartitionId#generateTime#windowId#{kafkaPartitionId=offset, ...}" and
+   * parsed back by fromString(String).
+   */
+  protected static class RecoveryControlInfo
+  {
+    protected static final String SEPERATOR = "#";
+    protected int partitionIdOfOperator;
+    protected long generateTime;
+    protected long windowId;
+    protected Map<Integer, Long> kafkaPartitionIdToOffset;
+    //( operatorPartitionId => ( lastWindowId, (KafkaPartitionId => offset) ) )
+
+    @Override
+    public String toString()
+    {
+      StringBuilder sb = new StringBuilder();
+      sb.append(partitionIdOfOperator).append(SEPERATOR).append(generateTime).append(SEPERATOR).append(windowId);
+      sb.append(SEPERATOR).append(kafkaPartitionIdToOffset);
+      return sb.toString();
+    }
+
+    /**
+     * Parses the text produced by toString().
+     *
+     * @param str the serialized control info
+     * @return the parsed control info; its offset map is empty if the text has no entries
+     * @throws IllegalArgumentException if the text is null, empty or malformed
+     */
+    public static RecoveryControlInfo fromString(String str)
+    {
+      if (str == null || str.isEmpty()) {
+        throw new IllegalArgumentException("Input parameter is null or empty.");
+      }
+      String[] fields = str.split(SEPERATOR);
+      if (fields == null || fields.length != 4) {
+        throw new IllegalArgumentException(
+            "Invalid input String: \"" + str + "\", " + "expected fields seperated by '" + SEPERATOR + "'");
+      }
+
+      RecoveryControlInfo rci = new RecoveryControlInfo();
+      rci.partitionIdOfOperator = Integer.valueOf(fields[0]);
+      rci.generateTime = Long.valueOf(fields[1]);
+      rci.windowId = Long.valueOf(fields[2]);
+
+      String mapString = fields[3].trim();
+      if (mapString.startsWith("{") && mapString.endsWith("}")) {
+        mapString = mapString.substring(1, mapString.length() - 1);
+      }
+      //Map.toString() separates entries by ", " and prints an empty map as "{}": trim every
+      //entry and drop empty ones, otherwise keys keep a leading blank and "{}" is rejected
+      Map<String, String> idToOffsetAsString = Splitter.on(",").trimResults().omitEmptyStrings()
+          .withKeyValueSeparator("=").split(mapString);
+      rci.kafkaPartitionIdToOffset = Maps.newHashMap();
+      for (Map.Entry<String, String> entry : idToOffsetAsString.entrySet()) {
+        rci.kafkaPartitionIdToOffset.put(Integer.valueOf(entry.getKey().trim()),
+            Long.valueOf(entry.getValue().trim()));
+      }
+      return rci;
+    }
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
index f6057cd..5f4d4c4 100644
--- a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaMetadataUtil.java
@@ -28,11 +28,10 @@ import java.util.Map;
 import java.util.Set;
 
 import org.I0Itec.zkclient.ZkClient;
-import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import scala.collection.JavaConversions;
+import org.apache.commons.lang3.StringUtils;
 
 import com.google.common.collect.Maps;
 import com.google.common.collect.Maps.EntryTransformer;
@@ -50,6 +49,8 @@ import kafka.javaapi.consumer.SimpleConsumer;
 import kafka.utils.ZKStringSerializer$;
 import kafka.utils.ZkUtils;
 
+import scala.collection.JavaConversions;
+
 /**
  * A util class used to retrieve all the metadatas for partitions/topics
  * Every method in the class creates a temporary simple kafka consumer and
@@ -69,7 +70,7 @@ public class KafkaMetadataUtil
   // A temporary client used to retrieve the metadata of topic/partition etc
   private static final String mdClientId = "Kafka_Metadata_Lookup_Client";
 
-  private static final int timeout=10000;
+  private static final int timeout = 10000;
 
   //buffer size for MD lookup client is 128k should be enough for most cases
   private static final int bufferSize = 128 * 1024;
@@ -95,20 +96,23 @@ public class KafkaMetadataUtil
    * @return Get the partition metadata list for the specific topic via the brokers
    * null if topic is not found
    */
-  public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers, final String topic)
+  public static Map<String, List<PartitionMetadata>> getPartitionsForTopic(SetMultimap<String, String> brokers,
+      final String topic)
   {
-    return Maps.transformEntries(brokers.asMap(), new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>(){
-      @Override
-      public List<PartitionMetadata> transformEntry(String key, Collection<String> bs)
-      {
-        return getPartitionsForTopic(new HashSet<String>(bs), topic);
-      }});
+    return Maps.transformEntries(brokers.asMap(),
+        new EntryTransformer<String, Collection<String>, List<PartitionMetadata>>()
+        {
+          @Override
+          public List<PartitionMetadata> transformEntry(String key, Collection<String> bs)
+          {
+            return getPartitionsForTopic(new HashSet<String>(bs), topic);
+          }
+        });
   }
   
-  
-  public static Set<String> getBrokers(Set<String> zkHost){
-    
-    ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ',') ,30000, 30000, ZKStringSerializer$.MODULE$);
+  public static Set<String> getBrokers(Set<String> zkHost)
+  {
+    ZkClient zkclient = new ZkClient(StringUtils.join(zkHost, ','), 30000, 30000, ZKStringSerializer$.MODULE$);
     Set<String> brokerHosts = new HashSet<String>();
     for (Broker b : JavaConversions.asJavaIterable(ZkUtils.getAllBrokersInCluster(zkclient))) {
       brokerHosts.add(b.getConnectionString());
@@ -149,7 +153,7 @@ public class KafkaMetadataUtil
   public static TopicMetadata getTopicMetadata(Set<String> brokerSet, String topic)
   {
     SimpleConsumer mdConsumer = null;
-    if (brokerSet == null || brokerSet == null || brokerSet.size() == 0) {
+    if (brokerSet == null || brokerSet.size() == 0) {
       return null;
     }
     try {
@@ -191,12 +195,12 @@ public class KafkaMetadataUtil
    * @param partition
    * @param whichTime
    * @param clientName
-   * @return 0 if consumer is null at this time
+   * @return the last offset, which should be >= 0; a value < 0 if the consumer is null or an error occurred.
    */
   public static long getLastOffset(SimpleConsumer consumer, String topic, int partition, long whichTime, String clientName)
   {
     if (consumer == null) {
-      return 0;
+      return -1;
     }
     TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
     Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
@@ -206,11 +210,92 @@ public class KafkaMetadataUtil
 
     if (response.hasError()) {
       logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partition));
-      return 0;
+      return -1;
     }
     long[] offsets = response.offsets(topic, partition);
     return offsets[0];
   }
 
+  
+  /**
+   * this method wrapper kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(OffsetRequest)
+   * @param consumer
+   * @param clientName
+   * @param topic
+   * @param partitionId
+   * @param time
+   * @param maxNumOffsets
+   * @return
+   */
+  public static long[] getOffsetsBefore(SimpleConsumer consumer, String clientName, String topic, int partitionId, long time, int maxNumOffsets)
+  {
+    if (consumer == null) {
+      throw new IllegalArgumentException("consumer is not suppose to be null.");
+    }
+    
+    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+    requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(time, maxNumOffsets));
+    OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+    OffsetResponse response = consumer.getOffsetsBefore(request);
+
+    if (response.hasError()) {
+      logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+      return null;
+    }
+    return response.offsets(topic, partitionId);
+  }
+
+  
+  /**
+   * get the last offset of each partition to the partitionToOffset map
+   * @param clientNamePrefix
+   * @param brokerSet
+   * @param topic
+   * @param time
+   * @param partitionToOffset
+   */
+  public static void getLastOffsetsTo(String clientNamePrefix, Set<String> brokerSet, String topic,
+      Map<Integer, Long> partitionToOffset)
+  {
+    // read last received kafka message
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+      SimpleConsumer consumer = null;
+      try {
+        int partitionId = pm.partitionId();
+
+        String leadBroker = pm.leader().host();
+        int port = pm.leader().port();
+        final String clientName = getClientName(clientNamePrefix, topic, partitionId);
+        consumer = new SimpleConsumer(leadBroker, port, 100000, 64 * 1024, clientName);
+
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partitionId);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
+        OffsetRequest request = new OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), clientName);
+        OffsetResponse response = consumer.getOffsetsBefore(request);
+
+        if (response.hasError()) {
+          logger.error("Error fetching data Offset Data the Broker. Reason: " + response.errorCode(topic, partitionId));
+        }
+        partitionToOffset.put(partitionId, response.offsets(topic, partitionId)[0]);
+      } finally {
+        if (consumer != null) {
+          consumer.close();
+        }
+      }
+    }
+  }
+  
+  /**
+   * Builds a client name of the form "prefix_topic_partitionId".
+   * @param clientNamePrefix
+   * @param topic
+   * @param partitionId
+   * @return the client name
+   */
+  public static String getClientName(String clientNamePrefix, String topic, int partitionId)
+  {
+    final StringBuilder clientName = new StringBuilder();
+    clientName.append(clientNamePrefix).append("_").append(topic).append("_").append(partitionId);
+    return clientName.toString();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java
----------------------------------------------------------------------
diff --git a/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java
new file mode 100644
index 0000000..d49e462
--- /dev/null
+++ b/contrib/src/main/java/com/datatorrent/contrib/kafka/KafkaUtil.java
@@ -0,0 +1,358 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import com.datatorrent.common.util.Pair;
+
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.PartitionMetadata;
+import kafka.javaapi.TopicMetadata;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+
+public class KafkaUtil
+{
+  private static final Logger logger = LoggerFactory.getLogger(KafkaUtil.class);
+  /** Timeout handed to the SimpleConsumer constructor when the caller gives none. */
+  public static final int DEFAULT_TIMEOUT = 200;
+  /** Buffer size handed to the SimpleConsumer constructor when the caller gives none. */
+  public static final int DEFAULT_BUFFER_SIZE = 64 * 10240;
+  /** Fetch size passed to FetchRequestBuilder.addFetch() by the read methods of this class. */
+  public static final int DEFAULT_FETCH_SIZE = 200;
+
+  /**
+   * Reads, for every partition of the topic, the messages from the start
+   * offset of the partition up to the latest written message and appends them
+   * to the list of the partition in partitionToMessages.
+   *
+   * @param clientNamePrefix
+   *          prefix used to build the client name of each partition
+   * @param brokerSet
+   *          the brokers used to look up the topic metadata
+   * @param topic
+   *          the topic to read from
+   * @param partitionToStartOffset
+   *          partition id to the first offset (inclusive) to read; a partition
+   *          without an entry is read from offset 0
+   * @param partitionToMessages
+   *          partition id to the list which receives the (key, value) pairs; a
+   *          list is created for each partition which does not have one yet
+   * @throws RuntimeException
+   *           if the topic metadata can't be retrieved
+   */
+  public static void readMessagesAfterOffsetTo(String clientNamePrefix, Set<String> brokerSet, String topic,
+      Map<Integer, Long> partitionToStartOffset, Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages)
+  {
+    // read last received kafka message
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+      final int partitionId = pm.partitionId();
+      SimpleConsumer consumer = null;
+      try {
+        List<Pair<byte[], byte[]>> messagesOfPartition = partitionToMessages.get(partitionId);
+        if (messagesOfPartition == null) {
+          messagesOfPartition = Lists.newArrayList();
+          partitionToMessages.put(partitionId, messagesOfPartition);
+        }
+
+        //look the start offset up only once
+        final Long configuredStartOffset = partitionToStartOffset.get(partitionId);
+        final long startOffset = configuredStartOffset == null ? 0 : configuredStartOffset;
+        final String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), partitionId);
+        consumer = createSimpleConsumer(clientName, tm.topic(), pm);
+
+        //the returned lastOffset is the offset which haven't written data to.
+        long lastOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), partitionId,
+            kafka.api.OffsetRequest.LatestTime(), clientName);
+        logger.debug("lastOffset = {}", lastOffset);
+        //nothing to read when the partition is empty or the start offset is
+        //already at the end; readMessagesBetween() rejects such an empty range
+        if (lastOffset <= 0 || startOffset >= lastOffset) {
+          continue;
+        }
+
+        readMessagesBetween(consumer, clientName, topic, partitionId, startOffset, lastOffset - 1,
+            messagesOfPartition);
+      } finally {
+        if (consumer != null) {
+          consumer.close();
+        }
+      }
+    }
+  }
+
+  /**
+   * Reads the messages of one partition whose offsets are in
+   * [startOffset, endOffset] into messages. The consumers needed for the read
+   * are created for this call only and are closed before the method returns.
+   *
+   * @param clientNamePrefix
+   *          prefix used to build the client name of the partition
+   * @param brokerSet
+   *          the brokers used to look up the topic metadata
+   * @param topic
+   *          the topic to read from
+   * @param partitionId
+   *          the partition to read from
+   * @param startOffset
+   *          inclusive
+   * @param endOffset
+   *          inclusive
+   * @param messages
+   *          receives the (key, value) pairs of the read messages
+   * @throws IllegalArgumentException
+   *           if the topic has no partition with the given id
+   */
+  public static void readMessagesBetween(String clientNamePrefix, Set<String> brokerSet, String topic, int partitionId,
+      long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages)
+  {
+    Map<Integer, SimpleConsumer> consumers = createSimpleConsumers(clientNamePrefix, brokerSet, topic);
+    if (consumers == null) {
+      throw new RuntimeException("Can't find any consumer.");
+    }
+
+    try {
+      SimpleConsumer consumer = consumers.get(partitionId);
+      if (consumer == null) {
+        throw new IllegalArgumentException("No consumer for partition: " + partitionId);
+      }
+
+      readMessagesBetween(consumer, KafkaMetadataUtil.getClientName(clientNamePrefix, topic, partitionId), topic,
+          partitionId, startOffset, endOffset, messages);
+    } finally {
+      //nobody else holds a reference to these consumers, so close all of them here
+      for (SimpleConsumer createdConsumer : consumers.values()) {
+        createdConsumer.close();
+      }
+    }
+  }
+
+  /**
+   * Creates one SimpleConsumer per partition of the topic by delegating to the
+   * four-argument overload with DEFAULT_TIMEOUT.
+   *
+   * @param clientNamePrefix
+   *          prefix used to build the client name of each partition
+   * @param brokerSet
+   *          the brokers used to look up the topic metadata
+   * @param topic
+   *          the topic the consumers are created for
+   * @return A map of partition id to SimpleConsumer
+   */
+  public static Map<Integer, SimpleConsumer> createSimpleConsumers(String clientNamePrefix, Set<String> brokerSet,
+      String topic)
+  {
+    final Map<Integer, SimpleConsumer> partitionToConsumer = createSimpleConsumers(clientNamePrefix, brokerSet, topic,
+        DEFAULT_TIMEOUT);
+    return partitionToConsumer;
+  }
+
+  /**
+   * Creates one SimpleConsumer per partition of the topic. Each consumer is
+   * created with the given timeout and DEFAULT_BUFFER_SIZE. The caller owns
+   * the returned consumers and has to close them.
+   *
+   * @param clientNamePrefix
+   *          prefix used to build the client name of each partition
+   * @param brokerSet
+   *          the brokers used to look up the topic metadata
+   * @param topic
+   *          the topic the consumers are created for
+   * @param timeOut
+   *          the timeout handed to each created SimpleConsumer
+   * @return A map of partition id to SimpleConsumer
+   * @throws IllegalArgumentException
+   *           if clientNamePrefix, brokerSet or topic is null or empty
+   * @throws RuntimeException
+   *           if the topic metadata can't be retrieved
+   */
+  public static Map<Integer, SimpleConsumer> createSimpleConsumers(String clientNamePrefix, Set<String> brokerSet,
+      String topic, int timeOut)
+  {
+    if (clientNamePrefix == null || clientNamePrefix.isEmpty() || brokerSet == null || brokerSet.isEmpty()
+        || topic == null || topic.isEmpty()) {
+      throw new IllegalArgumentException(
+          "clientNamePrefix = " + clientNamePrefix + ", brokerSet = " + brokerSet + ", topic = " + topic);
+    }
+
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    Map<Integer, SimpleConsumer> consumers = Maps.newHashMap();
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+      String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+      //hand the requested timeout on; the three-argument createSimpleConsumer() would use DEFAULT_TIMEOUT
+      consumers.put(pm.partitionId(), createSimpleConsumer(clientName, tm.topic(), pm, timeOut, DEFAULT_BUFFER_SIZE));
+    }
+    return consumers;
+  }
+
+  /**
+   * Reads the messages of one partition whose offsets are in
+   * [startOffset, endOffset] into messages, giving up after one fetch which
+   * delivers no wanted message.
+   *
+   * @param consumer
+   *          the consumer used to fetch the messages
+   * @param clientName
+   *          the client id set on the fetch requests
+   * @param topic
+   *          the topic to read from
+   * @param partitionId
+   *          the partition to read from
+   * @param startOffset
+   *          inclusive
+   * @param endOffset
+   *          inclusive
+   * @param messages
+   *          receives the (key, value) pairs of the read messages
+   */
+  public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId,
+      long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages)
+  {
+    final int tryTimesOnEmptyMessage = 1;
+    readMessagesBetween(consumer, clientName, topic, partitionId, startOffset, endOffset, messages,
+        tryTimesOnEmptyMessage);
+  }
+
+  /**
+   * Reads the messages of a certain partition whose offsets are in
+   * [startOffset, endOffset] and appends them, as (key, value) pairs, to
+   * messages. Reading stops when all wanted messages were read, when a fetch
+   * reports an error (the error is logged and the messages read so far stay in
+   * the list), or when the allowed number of consecutive fetches without a
+   * wanted message is used up.
+   *
+   * NOTE(review): every fetch asks for DEFAULT_FETCH_SIZE only; presumably a
+   * message bigger than that is never delivered, which would make a call with
+   * tryTimesOnEmptyMessage <= 0 loop forever - confirm against the broker.
+   *
+   * @param consumer
+   *          the consumer used to fetch the messages
+   * @param clientName
+   *          the client id set on the fetch requests
+   * @param topic
+   *          the topic to read from
+   * @param partitionId
+   *          the partition to read from
+   * @param startOffset
+   *          inclusive
+   * @param endOffset
+   *          inclusive
+   * @param messages
+   *          receives the (key, value) pairs of the read messages
+   * @param tryTimesOnEmptyMessage
+   *          how many times should to try when response message is empty. <=0
+   *          means try forever.
+   * @throws IllegalArgumentException
+   *           if an offset is negative or endOffset is less than startOffset
+   */
+  public static void readMessagesBetween(SimpleConsumer consumer, String clientName, String topic, int partitionId,
+      long startOffset, long endOffset, List<Pair<byte[], byte[]>> messages, int tryTimesOnEmptyMessage)
+  {
+    if (startOffset < 0 || endOffset < 0 || endOffset < startOffset) {
+      throw new IllegalArgumentException(
+          "Both offset should not less than zero and endOffset should not less than startOffset. startOffset = "
+              + startOffset + ", endoffset = " + endOffset);
+    }
+
+    //offsets are long; counting with int would overflow for a range bigger than Integer.MAX_VALUE
+    long readSize = 0;
+    final long wantedSize = endOffset - startOffset + 1;
+
+    int triedTimesOnEmptyMessage = 0;
+    while (readSize < wantedSize
+        && (tryTimesOnEmptyMessage <= 0 || triedTimesOnEmptyMessage < tryTimesOnEmptyMessage)) {
+      logger.debug("startOffset = {}", startOffset);
+      FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+          .addFetch(topic, partitionId, startOffset, DEFAULT_FETCH_SIZE).build();
+
+      FetchResponse fetchResponse = consumer.fetch(req);
+      if (fetchResponse.hasError()) {
+        logger.error(
+            "Error fetching data Offset Data the Broker. Reason: " + fetchResponse.errorCode(topic, partitionId));
+        return;
+      }
+
+      //count this fetch as empty up front; the counter is reset as soon as a wanted message shows up
+      triedTimesOnEmptyMessage++;
+      ByteBufferMessageSet messageSet = fetchResponse.messageSet(topic, partitionId);
+      for (MessageAndOffset messageAndOffset : messageSet) {
+        long offset = messageAndOffset.offset();
+        logger.debug("offset = {}", offset);
+
+        //skip the messages outside of the range which is still wanted
+        if (offset > endOffset || offset < startOffset) {
+          continue;
+        }
+        triedTimesOnEmptyMessage = 0;
+        //the next fetch continues right after this message
+        startOffset = offset + 1;
+        ++readSize;
+        messages.add(kafkaMessageToPair(messageAndOffset.message()));
+      }
+    }
+  }
+
+  /**
+   * Reads the last message of each partition into lastMessages. A partition
+   * whose latest offset is not positive has no last message and gets no entry.
+   * If a fetch reports an error, the error is logged and the method returns
+   * without reading the remaining partitions.
+   *
+   * @param clientNamePrefix
+   *          prefix used to build the client name of each partition
+   * @param brokerSet
+   *          the brokers used to look up the topic metadata
+   * @param topic
+   *          the topic to read from
+   * @param lastMessages
+   *          receives partition id to (key, value) pair of the last message
+   * @throws RuntimeException
+   *           if the topic metadata can't be retrieved
+   */
+  public static void readLastMessages(String clientNamePrefix, Set<String> brokerSet, String topic,
+      Map<Integer, Pair<byte[], byte[]>> lastMessages)
+  {
+    // read last received kafka message
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+      SimpleConsumer consumer = null;
+      try {
+        String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+        consumer = createSimpleConsumer(clientName, tm.topic(), pm);
+
+        long readOffset = KafkaMetadataUtil.getLastOffset(consumer, tm.topic(), pm.partitionId(),
+            kafka.api.OffsetRequest.LatestTime(), clientName);
+        if (readOffset <= 0) {
+          //nothing written yet: fetching offset -1 would only produce an error
+          //which stops the reading of the remaining partitions
+          continue;
+        }
+
+        FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+            .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build();
+
+        FetchResponse fetchResponse = consumer.fetch(req);
+        if (fetchResponse.hasError()) {
+          logger.error("Error fetching data Offset Data the Broker. Reason: "
+              + fetchResponse.errorCode(topic, pm.partitionId()));
+          return;
+        }
+
+        //each message overwrites the entry of the partition, so the last one of the set wins
+        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(tm.topic(), pm.partitionId())) {
+          lastMessages.put(pm.partitionId(), kafkaMessageToPair(messageAndOffset.message()));
+        }
+      } finally {
+        if (consumer != null) {
+          consumer.close();
+        }
+      }
+
+    }
+  }
+
+  /**
+   * Reads the last message of one partition of the topic.
+   *
+   * @param clientNamePrefix
+   *          prefix used to build the client name of the partition
+   * @param brokerSet
+   *          the brokers used to look up the topic metadata
+   * @param topic
+   *          the topic to read from
+   * @param partitionId
+   *          the partition to read from
+   * @return the (key, value) pair of the first message delivered by a fetch at
+   *         the offset before the latest offset, or null if the topic has no
+   *         such partition, the latest offset is not positive, the fetch
+   *         reports an error (which is logged) or no message is delivered
+   * @throws RuntimeException
+   *           if the topic metadata can't be retrieved
+   */
+  public static Pair<byte[], byte[]> readLastMessage(String clientNamePrefix, Set<String> brokerSet, String topic,
+      int partitionId)
+  {
+    // read last received kafka message
+    TopicMetadata tm = KafkaMetadataUtil.getTopicMetadata(brokerSet, topic);
+
+    if (tm == null) {
+      throw new RuntimeException("Failed to retrieve topic metadata");
+    }
+
+    for (PartitionMetadata pm : tm.partitionsMetadata()) {
+      if (pm.partitionId() != partitionId) {
+        continue;
+      }
+
+      SimpleConsumer consumer = null;
+      try {
+        String clientName = KafkaMetadataUtil.getClientName(clientNamePrefix, tm.topic(), pm.partitionId());
+        consumer = createSimpleConsumer(clientName, topic, pm);
+
+        long readOffset = KafkaMetadataUtil.getLastOffset(consumer, topic, partitionId,
+            kafka.api.OffsetRequest.LatestTime(), clientName);
+        if (readOffset <= 0) {
+          //nothing written yet: there is no last message and offset -1 can't be fetched
+          return null;
+        }
+
+        FetchRequest req = new FetchRequestBuilder().clientId(clientName)
+            .addFetch(tm.topic(), pm.partitionId(), readOffset - 1, DEFAULT_FETCH_SIZE).build();
+
+        FetchResponse fetchResponse = consumer.fetch(req);
+        if (fetchResponse.hasError()) {
+          logger.error("Error fetching data Offset Data the Broker. Reason: "
+              + fetchResponse.errorCode(topic, pm.partitionId()));
+          return null;
+        }
+
+        //only the first delivered message is of interest
+        for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) {
+          return kafkaMessageToPair(messageAndOffset.message());
+        }
+      } finally {
+        if (consumer != null) {
+          consumer.close();
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Converts a Kafka message to a (key, value) pair of byte arrays.
+   *
+   * @param m
+   *          the message to convert
+   * @return the pair; the key is null if the message has no key and the value
+   *         is null if the message has no payload
+   */
+  public static Pair<byte[], byte[]> kafkaMessageToPair(Message m)
+  {
+    return new Pair<byte[], byte[]>(toByteArray(m.key()), toByteArray(m.payload()));
+  }
+
+  /**
+   * Copies the readable bytes of the buffer into a new array; a null buffer
+   * gives null.
+   */
+  private static byte[] toByteArray(ByteBuffer buffer)
+  {
+    if (buffer == null) {
+      return null;
+    }
+    //size by remaining(), not limit(): get() reads from the current position
+    //and fails when asked for more bytes than are left
+    byte[] bytes = new byte[buffer.remaining()];
+    buffer.get(bytes);
+    return bytes;
+  }
+
+  /**
+   * Creates a SimpleConsumer for the partition by delegating to the
+   * five-argument overload with DEFAULT_TIMEOUT and DEFAULT_BUFFER_SIZE.
+   *
+   * @param clientName
+   *          the client name of the consumer
+   * @param topic
+   *          the topic of the partition
+   * @param pm
+   *          the metadata of the partition
+   * @return the created consumer
+   */
+  public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm)
+  {
+    final SimpleConsumer consumerOfPartition = createSimpleConsumer(clientName, topic, pm, DEFAULT_TIMEOUT,
+        DEFAULT_BUFFER_SIZE);
+    return consumerOfPartition;
+  }
+
+  /**
+   * Creates a SimpleConsumer which talks to the host and port of the leader of
+   * the partition. The topic argument is not used by this method.
+   *
+   * NOTE(review): pm.leader() is dereferenced without a check; presumably it is
+   * null for a partition without leader - confirm and handle if needed.
+   *
+   * @param clientName
+   *          the client name of the consumer
+   * @param topic
+   *          the topic of the partition
+   * @param pm
+   *          the metadata of the partition
+   * @param timeout
+   *          the timeout handed to the SimpleConsumer
+   * @param bufferSize
+   *          the buffer size handed to the SimpleConsumer
+   * @return the created consumer
+   */
+  public static SimpleConsumer createSimpleConsumer(String clientName, String topic, PartitionMetadata pm, int timeout,
+      int bufferSize)
+  {
+    return new SimpleConsumer(pm.leader().host(), pm.leader().port(), timeout, bufferSize, clientName);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java
new file mode 100644
index 0000000..abdcb01
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaTupleUniqueExactlyOnceOutputOperatorTest.java
@@ -0,0 +1,512 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DAG;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.api.LocalMode;
+import com.datatorrent.api.StreamingApplication;
+import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.common.util.Pair;
+import com.datatorrent.stram.api.OperatorDeployInfo;
+
+import kafka.producer.ProducerConfig;
+import kafka.serializer.StringDecoder;
+
+public class KafkaTupleUniqueExactlyOnceOutputOperatorTest extends KafkaOperatorTestBase
+{
+  public static final int TUPLE_NUM_IN_ONE_WINDOW = 10;
+  public static final String topic1 = "OperatorTest1";
+  public static final String controlTopic1 = "ControlTopic1";
+
+  public static final String topic2 = "OperatorTest2";
+  public static final String controlTopic2 = "ControlTopic2";
+
+  public static final String topic3 = "OperatorTest3";
+  public static final String controlTopic3 = "ControlTopic3";
+
+  /**
+   * Operator under test: writes Integer tuples to Kafka with String key and
+   * String value.
+   */
+  public static class TupleUniqueExactlyOnceKafkaOutputTestOperator
+      extends AbstractTupleUniqueExactlyOnceKafkaOutputOperator<Integer, String, String>
+  {
+    protected transient StringDecoder decoder = null;
+
+    @Override
+    public void setup(OperatorContext context)
+    {
+      //the decoder has to exist before the super class is set up
+      decoder = new StringDecoder(null);
+      super.setup(context);
+    }
+
+    /**
+     * The key is the tuple modulo 2, the value is the tuple itself, both as String.
+     */
+    @Override
+    protected Pair<String, String> tupleToKeyValue(Integer tuple)
+    {
+      return new Pair<>(String.valueOf(tuple % 2), String.valueOf(tuple));
+    }
+
+    /**
+     * Compares the bytes, decoded as String, with the value. Two nulls are
+     * equal; a null on one side only is not.
+     */
+    @Override
+    protected <T> boolean equals(byte[] bytes, T value)
+    {
+      if (bytes == null || value == null) {
+        //don't hand null bytes to the decoder
+        return bytes == null && value == null;
+      }
+      return value.equals(decoder.fromBytes(bytes));
+    }
+
+  }
+
+  /**
+   * Creates the topic in cluster 0 and, if hasMultiCluster is set, in cluster 1
+   * too.
+   */
+  protected void createTopic(String topicName)
+  {
+    createTopic(0, topicName);
+    if (!hasMultiCluster) {
+      return;
+    }
+    createTopic(1, topicName);
+  }
+
+  /**
+   * Creates a producer configuration from the properties of getKafkaProperties().
+   */
+  protected ProducerConfig createKafkaControlProducerConfig()
+  {
+    final Properties producerProperties = this.getKafkaProperties();
+    return new ProducerConfig(producerProperties);
+  }
+
+  /**
+   * In this test case there is only one operator partition, and the order of
+   * the data changes on recovery: the first operator handles two complete
+   * windows and half of a third one, then a second operator with the same id
+   * replays the 2nd and the 3rd window with the even tuples before the odd
+   * ones. The topic is expected to contain the tuples recorded in
+   * expectedTuple, in that order.
+   */
+  @Test
+  public void testOutOfOrder()
+  {
+    OperatorDeployInfo context = new OperatorDeployInfo();
+    context.id = 1;
+    //room for the tuples of three windows
+    int[] expectedTuple = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 3)];
+    int tupleIndex = 0;
+    long windowId = 0;
+    {
+      //create required topics
+      createTopic(topic1);
+      createTopic(controlTopic1);
+
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator = createOperator(topic1, controlTopic1, 1);
+
+      //two complete windows
+      int i = 0;
+      for (int windowCount = 0; windowCount < 2; ++windowCount) {
+        operator.beginWindow(windowId++);
+
+        for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1); ++i) {
+          operator.processTuple(i);
+          expectedTuple[tupleIndex++] = i;
+        }
+        waitMills(500);
+        operator.endWindow();
+      }
+
+      //last window, the crash window
+      //only the first half of the window is processed and endWindow() is never called
+      operator.beginWindow(windowId++);
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5; ++i) {
+        operator.processTuple(i);
+        expectedTuple[tupleIndex++] = i;
+      }
+
+      //crashed now.
+    }
+
+    //let kafka message send to server
+    waitMills(1000);
+
+    {
+      //recovery
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator = new TupleUniqueExactlyOnceKafkaOutputTestOperator();
+      operator.setTopic(topic1);
+      operator.setControlTopic(controlTopic1);
+      operator.setConfigProperties(getKafkaProperties());
+
+      operator.setup(context);
+
+      //assume replay start with 2nd window, but different order
+      int i = TUPLE_NUM_IN_ONE_WINDOW;
+
+      //the replayed tuples of the 2nd window are not recorded as expected again
+      windowId = 1;
+      operator.beginWindow(windowId++);
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) {
+        operator.processTuple(i);
+      }
+      i = TUPLE_NUM_IN_ONE_WINDOW + 1;
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2; i += 2) {
+        operator.processTuple(i);
+      }
+      waitMills(500);
+      operator.endWindow();
+
+      //3rd window, in different order
+      //only the tuples which were not processed before the crash are recorded as expected
+      operator.beginWindow(windowId++);
+      i = TUPLE_NUM_IN_ONE_WINDOW * 2;
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) {
+        operator.processTuple(i);
+        if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) {
+          expectedTuple[tupleIndex++] = i;
+        }
+      }
+
+      i = TUPLE_NUM_IN_ONE_WINDOW * 2 + 1;
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3; i += 2) {
+        operator.processTuple(i);
+        if (i >= TUPLE_NUM_IN_ONE_WINDOW * 2.5) {
+          expectedTuple[tupleIndex++] = i;
+        }
+      }
+    }
+
+    //the order matters here, the arrays are compared without sorting
+    int[] actualTuples = readTuplesFromKafka(topic1);
+    Assert.assertArrayEquals(expectedTuple, actualTuples);
+  }
+
+  /**
+   * Creates a test operator for the topic and the control topic, configures it
+   * with the properties of getKafkaProperties() and sets it up with a context
+   * which carries the given id.
+   */
+  protected TupleUniqueExactlyOnceKafkaOutputTestOperator createOperator(String topic, String controlTopic, int id)
+  {
+    final TupleUniqueExactlyOnceKafkaOutputTestOperator testOperator =
+        new TupleUniqueExactlyOnceKafkaOutputTestOperator();
+    testOperator.setTopic(topic);
+    testOperator.setControlTopic(controlTopic);
+    testOperator.setConfigProperties(getKafkaProperties());
+
+    final OperatorDeployInfo deployInfo = new OperatorDeployInfo();
+    deployInfo.id = id;
+    testOperator.setup(deployInfo);
+    return testOperator;
+  }
+
+  /**
+   * This test case tests the case that the tuples go to the other operator
+   * partition on recovery: two operators get the tuples round robin, both
+   * crash in the third window, and on replay the tuples are handed to the
+   * operators in swapped roles. The sorted content of the topic is expected to
+   * be equal to the sorted expected tuples.
+   */
+  @Test
+  public void testDifferentPartition()
+  {
+    //hasMultiPartition = true;
+
+    //room for the tuples of three windows of two operators
+    int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)];
+    int tupleIndex = 0;
+    long windowId1 = 0;
+    long windowId2 = 0;
+
+    //create required topics
+    createTopic(topic2);
+    createTopic(controlTopic2);
+
+    {
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1);
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2);
+      TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] {
+          operator1, operator2 };
+
+      //send as round robin
+      int i = 0;
+      for (int windowCount = 0; windowCount < 2; ++windowCount) {
+        operator1.beginWindow(windowId1++);
+        operator2.beginWindow(windowId2++);
+
+        for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) {
+          operators[i % 2].processTuple(i);
+          expectedTuples[tupleIndex++] = i;
+        }
+        waitMills(500);
+        operator1.endWindow();
+        operator2.endWindow();
+      }
+
+      //last window, the crash window
+      //only half of the window is processed and endWindow() is never called
+      operator1.beginWindow(windowId1++);
+      operator2.beginWindow(windowId2++);
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; ++i) {
+        operators[i % 2].processTuple(i);
+        expectedTuples[tupleIndex++] = i;
+      }
+
+      //crashed now.
+    }
+
+    //let kafka message send to server
+    waitMills(1000);
+    //the tuples are consecutive from 0, so the last recorded index is also the last processed tuple
+    int lastTuple = tupleIndex - 1;
+    {
+      //recovery
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic2, controlTopic2, 1);
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic2, controlTopic2, 2);
+      //tuple go to different partition
+      TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] {
+          operator2, operator1 };
+
+      //assume replay start with 2nd window, but different order
+      int i = TUPLE_NUM_IN_ONE_WINDOW * 2;
+
+      windowId1 = 1;
+      windowId2 = 1;
+
+      //window id: 1, 2
+      for (int windowCount = 0; windowCount < 2; ++windowCount) {
+        operator1.beginWindow(windowId1++);
+        operator2.beginWindow(windowId2++);
+
+        for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; ++i) {
+          operators[i % 2].processTuple(i);
+          //only the tuples which were not processed before the crash are recorded as expected
+          if (i > lastTuple) {
+            expectedTuples[tupleIndex++] = i;
+          }
+        }
+        waitMills(500);
+        operator1.endWindow();
+        operator2.endWindow();
+      }
+    }
+
+    //two operators write to the topic, so only the sorted content is compared
+    int[] actualTuples = readTuplesFromKafka(topic2);
+    Arrays.sort(actualTuples);
+    Arrays.sort(expectedTuples);
+
+    assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples);
+  }
+
+  /**
+   * This test case tests that only one operator partition crashes, while the
+   * other operator partition keeps on writing data to the same Kafka
+   * partition: operator1 handles the even tuples and crashes in its third
+   * window, operator2 handles the odd tuples and completes its third window,
+   * then operator1 is recreated and replays its 2nd and 3rd window. The sorted
+   * content of the topic is expected to be equal to the sorted expected tuples.
+   */
+  @Test
+  public void testOnePartitionCrash()
+  {
+
+    //room for the tuples of three windows of two operators
+    int[] expectedTuples = new int[(int)(TUPLE_NUM_IN_ONE_WINDOW * 6)];
+    int tupleIndex = 0;
+    long windowId1 = 0;
+    long windowId2 = 0;
+
+    //create required topics
+    createTopic(topic3);
+    createTopic(controlTopic3);
+
+    {
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1);
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator2 = createOperator(topic3, controlTopic3, 2);
+      TupleUniqueExactlyOnceKafkaOutputTestOperator[] operators = new TupleUniqueExactlyOnceKafkaOutputTestOperator[] {
+          operator1, operator2 };
+
+      //send as round robin
+      int i = 0;
+      for (int windowCount = 0; windowCount < 2; ++windowCount) {
+        operator1.beginWindow(windowId1++);
+        operator2.beginWindow(windowId2++);
+
+        for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 1) * 2; ++i) {
+          operators[i % 2].processTuple(i);
+          expectedTuples[tupleIndex++] = i;
+        }
+        waitMills(500);
+        operator1.endWindow();
+        operator2.endWindow();
+      }
+
+      //operator1 crash, while operator2 alive
+      operator1.beginWindow(windowId1++);
+      //operator1 handle even number;
+      //i is even here, so operators[i % 2] is operator1; only half of its window is processed
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2; i += 2) {
+        operators[i % 2].processTuple(i);
+        expectedTuples[tupleIndex++] = i;
+      }
+
+      //operator1 crashed now.
+
+      //operator2 still alive, operator2 handle odd number
+      //operator2 processes all odd tuples of its third window and ends the window
+      operator2.beginWindow(windowId2++);
+      i = TUPLE_NUM_IN_ONE_WINDOW * 4 + 1;
+      for (; i < TUPLE_NUM_IN_ONE_WINDOW * 3 * 2; i += 2) {
+        operator2.processTuple(i);
+        expectedTuples[tupleIndex++] = i;
+      }
+      operator2.endWindow();
+
+    }
+
+    //let kafka message send to server
+    waitMills(1000);
+
+    //operator1 recover from second window
+    //upper bound of the tuples operator1 processed before the crash
+    int lastTuple = (int)(TUPLE_NUM_IN_ONE_WINDOW * 2.5 * 2) - 1;
+    {
+      //recovery
+      TupleUniqueExactlyOnceKafkaOutputTestOperator operator1 = createOperator(topic3, controlTopic3, 1);
+
+      //assume replay start with 2nd window, same order
+      int i = TUPLE_NUM_IN_ONE_WINDOW * 2;
+
+      windowId1 = 1;
+
+      //window id: 1, 2
+      for (int windowCount = 0; windowCount < 2; ++windowCount) {
+        operator1.beginWindow(windowId1++);
+
+        for (; i < TUPLE_NUM_IN_ONE_WINDOW * (windowCount + 2) * 2; i += 2) {
+          operator1.processTuple(i);
+          //only the tuples which were not processed before the crash are recorded as expected
+          if (i > lastTuple) {
+            expectedTuples[tupleIndex++] = i;
+          }
+        }
+        waitMills(500);
+        operator1.endWindow();
+      }
+    }
+
+    //two operators write to the topic, so only the sorted content is compared
+    int[] actualTuples = readTuplesFromKafka(topic3);
+    Arrays.sort(actualTuples);
+    Arrays.sort(expectedTuples);
+
+    assertArrayEqualsWithDetailInfo(expectedTuples, actualTuples);
+  }
+
+  /**
+   * Tests that an application which uses
+   * TupleUniqueExactlyOnceKafkaOutputTestOperator is launchable in local mode.
+   * The test has no assertion; it passes if the application runs without an
+   * exception.
+   */
+  @Test
+  public void testLaunchApp() throws Exception
+  {
+    Configuration conf = new Configuration(false);
+    LocalMode lma = LocalMode.newInstance();
+    DAG dag = lma.getDAG();
+
+    //the DAG is wired here directly: generator -> operator under test
+    TupleGenerateOperator generateOperator = new TupleGenerateOperator();
+    dag.addOperator("GenerateOperator", generateOperator);
+    
+    TupleUniqueExactlyOnceKafkaOutputTestOperator testOperator = new TupleUniqueExactlyOnceKafkaOutputTestOperator();
+    dag.addOperator("TestOperator", testOperator);
+
+    dag.addStream("stream", generateOperator.outputPort, testOperator.inputPort);
+    
+    //the application itself adds nothing to the DAG
+    StreamingApplication app = new StreamingApplication()
+    {
+      @Override
+      public void populateDAG(DAG dag, Configuration conf)
+      {
+      }
+    };
+
+    lma.prepareDAG(app, conf);
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.run(5000);
+
+    lc.shutdown();
+  }
+
+  /**
+   * Asserts that both arrays have the same length and the same elements, with
+   * a message which names the index and the values of the first difference.
+   *
+   * @param expectedTuples the expected values
+   * @param actualTuples the values which were actually read
+   */
+  public static void assertArrayEqualsWithDetailInfo(int[] expectedTuples, int[] actualTuples)
+  {
+    Assert.assertEquals("Length incorrect. expected " + expectedTuples.length + "; actual " + actualTuples.length,
+        expectedTuples.length, actualTuples.length);
+    for (int i = 0; i < actualTuples.length; ++i) {
+      //JUnit takes the expected value before the actual one; the other order gives a misleading failure report
+      Assert.assertEquals("Not equal. index=" + i + ", expected=" + expectedTuples[i] + ", actual=" + actualTuples[i],
+          expectedTuples[i], actualTuples[i]);
+    }
+  }
+
+  /**
+   * Sleeps for the given time. If the thread is interrupted, the method
+   * returns early and the interrupt status of the thread stays set.
+   *
+   * @param millis the time to sleep in milliseconds; a negative value returns immediately
+   */
+  public void waitMills(long millis)
+  {
+    if (millis < 0) {
+      //Thread.sleep() rejects a negative time; keep the method exception free
+      return;
+    }
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      //don't lose the interrupt, the code further up may rely on it
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Waits a second, then reads all messages of partition 0 of the topic,
+   * starting at offset 0, and converts the value of each message to an int.
+   *
+   * @param topic the topic to read from
+   * @return the values of the messages in the order they were read
+   */
+  public int[] readTuplesFromKafka(String topic)
+  {
+    final StringDecoder valueDecoder = new StringDecoder(null);
+    final Map<Integer, Long> startOffsets = Maps.newHashMap();
+    startOffsets.put(0, 0L);
+
+    this.waitMills(1000);
+
+    final Map<Integer, List<Pair<byte[], byte[]>>> messagesOfPartitions = Maps.newHashMap();
+    KafkaUtil.readMessagesAfterOffsetTo("TestOperator", getBrokerSet(), topic, startOffsets, messagesOfPartitions);
+
+    final List<Pair<byte[], byte[]>> messages = messagesOfPartitions.get(0);
+    final int[] tuples = new int[messages.size()];
+    int position = 0;
+    for (Pair<byte[], byte[]> message : messages) {
+      final String text = valueDecoder.fromBytes(message.second);
+      tuples[position] = Integer.parseInt(text);
+      ++position;
+    }
+    return tuples;
+  }
+
+
+  /**
+   * Returns a set which holds the broker list entry of getKafkaProperties() as
+   * its only element.
+   */
+  protected Set<String> getBrokerSet()
+  {
+    final Properties kafkaProperties = getKafkaProperties();
+    final String brokerList = (String)kafkaProperties.get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST);
+    return Sets.newHashSet(brokerList);
+  }
+
+  /**
+   * Creates the producer properties used by the tests: String encoder, broker
+   * at localhost:9092 and an async producer with small batches.
+   */
+  public Properties getKafkaProperties()
+  {
+    final String[][] settings = {
+        {"serializer.class", "kafka.serializer.StringEncoder"},
+        {"metadata.broker.list", "localhost:9092"},
+        {"producer.type", "async"},
+        {"queue.buffering.max.ms", "100"},
+        {"queue.buffering.max.messages", "5"},
+        {"batch.num.messages", "5"}
+    };
+
+    final Properties kafkaProperties = new Properties();
+    for (String[] setting : settings) {
+      kafkaProperties.setProperty(setting[0], setting[1]);
+    }
+    return kafkaProperties;
+  }
+  
+  
+  /**
+   * Input operator which emits 100 consecutive integers on each call of
+   * emitTuples() as long as its output port is connected.
+   */
+  public static class TupleGenerateOperator extends BaseOperator implements InputOperator
+  {
+    @OutputPortFieldAnnotation(optional = true)
+    public final transient DefaultOutputPort<Integer> outputPort = new DefaultOutputPort<>();
+    protected int value = 0;
+
+    @Override
+    public void emitTuples()
+    {
+      if (outputPort.isConnected()) {
+        //value is incremented before it is emitted
+        for (int remaining = 100; remaining > 0; --remaining) {
+          outputPort.emit(++value);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java
----------------------------------------------------------------------
diff --git a/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java
new file mode 100644
index 0000000..c27803d
--- /dev/null
+++ b/contrib/src/test/java/com/datatorrent/contrib/kafka/KafkaUtilTester.java
@@ -0,0 +1,128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.datatorrent.contrib.kafka;
+
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.google.common.collect.Maps;
+import com.google.common.collect.Sets;
+
+import com.datatorrent.common.util.Pair;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+/**
+ * Test for {@link KafkaUtil} built on the fixture inherited from
+ * {@link KafkaOperatorTestBase}.
+ *
+ * {@link #beforeTest()} creates a topic and publishes {@link #DATA_SIZE} string
+ * messages to it; the test then reads them back through
+ * {@code KafkaUtil.readMessagesAfterOffsetTo} and checks how many were returned.
+ */
+public class KafkaUtilTester extends KafkaOperatorTestBase
+{
+  public static final String topic = "UtilTestTopic";
+  public static final String clientNamePrefix = "UtilTestClient";
+  public static final int DATA_SIZE = 50;
+
+  protected Producer<String, String> producer;
+  private transient Set<String> brokerSet;
+
+  /**
+   * Prepares the fixture: runs the base-class setup, creates {@link #topic},
+   * builds the producer and publishes the test messages.
+   */
+  @Override
+  public void beforeTest()
+  {
+    //Got exception when using multiple partition.
+    //java.io.FileNotFoundException: target/kafka-server-data/1/1/replication-offset-checkpoint (No such file or directory)
+    //hasMultiPartition = true;
+
+    super.beforeTest();
+    createTopic(topic);
+
+    producer = new Producer<String, String>(createKafkaProducerConfig());
+
+    sendData();
+  }
+
+  /**
+   * Closes the producer created by {@link #beforeTest()} so that tests do not
+   * leak producers. Safe to call when setup failed before the producer existed.
+   */
+  @After
+  public void closeProducer()
+  {
+    if (producer != null) {
+      producer.close();
+      producer = null;
+    }
+  }
+
+  /**
+   * Publishes {@link #DATA_SIZE} messages ("message 0" .. "message 49") with a null
+   * key to {@link #topic}, then pauses for one second.
+   */
+  public void sendData()
+  {
+    for (int i = 0; i < DATA_SIZE; ++i) {
+      producer.send(new KeyedMessage<String, String>(topic, null, "message " + i));
+    }
+
+    // The producer is configured as "async" (see getConfigProperties()), so give it
+    // time to hand the queued messages over before the test reads them.
+    waitMills(1000);
+  }
+
+  /**
+   * Reads the topic from offset 0 and expects exactly {@link #DATA_SIZE} messages
+   * in the first partition entry that was returned.
+   */
+  @Test
+  public void testReadMessagesAfterOffsetTo()
+  {
+    // NOTE(review): the start offset is registered for partition id 1 although
+    // beforeTest() leaves hasMultiPartition untouched - confirm this is the intended id.
+    Map<Integer, Long> partitionToStartOffset = Maps.newHashMap();
+    partitionToStartOffset.put(1, 0L);
+    Map<Integer, List<Pair<byte[], byte[]>>> partitionToMessages = Maps.newHashMap();
+    KafkaUtil.readMessagesAfterOffsetTo(clientNamePrefix, getBrokerSet(), topic, partitionToStartOffset,
+        partitionToMessages);
+
+    // Fail with a readable message rather than a NoSuchElementException when nothing was read.
+    Assert.assertFalse("no messages were read from topic " + topic, partitionToMessages.isEmpty());
+    final int dataSize = partitionToMessages.values().iterator().next().size();
+    // assertEquals reports expected vs. actual on failure, unlike assertTrue(a == b).
+    Assert.assertEquals("data size is: " + dataSize, DATA_SIZE, dataSize);
+  }
+
+  /**
+   * Sleeps for the given time on a best-effort basis.
+   * A negative value returns immediately. If the thread is interrupted the wait
+   * ends early and the interrupt flag is restored for the caller.
+   *
+   * @param millis time to sleep in milliseconds
+   */
+  public void waitMills(long millis)
+  {
+    if (millis < 0) {
+      // Thread.sleep rejects negative values; treat them as "nothing to wait for".
+      return;
+    }
+    try {
+      Thread.sleep(millis);
+    } catch (InterruptedException e) {
+      // Do not swallow the interruption: keep the flag set so the runner can react.
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Creates the topic through {@code createTopic(0, topicName)} and, when
+   * {@code hasMultiCluster} is set, through {@code createTopic(1, topicName)} too
+   * (presumably the first argument is the cluster index - confirm in the base class).
+   *
+   * @param topicName name of the topic to create
+   */
+  protected void createTopic(String topicName)
+  {
+    createTopic(0, topicName);
+    if (hasMultiCluster) {
+      createTopic(1, topicName);
+    }
+  }
+
+  /**
+   * Builds the producer configuration: string-encoded messages sent by an
+   * asynchronous producer to the broker at localhost:9092.
+   *
+   * @return a newly created {@link Properties} instance on every call
+   */
+  protected Properties getConfigProperties()
+  {
+    Properties props = new Properties();
+    props.setProperty("serializer.class", "kafka.serializer.StringEncoder");
+    props.put("metadata.broker.list", "localhost:9092");
+    props.setProperty("producer.type", "async");
+    props.setProperty("queue.buffering.max.ms", "10");
+    props.setProperty("queue.buffering.max.messages", "10");
+    props.setProperty("batch.num.messages", "5");
+
+    return props;
+  }
+
+  /**
+   * @return a {@link ProducerConfig} wrapping {@link #getConfigProperties()}
+   */
+  protected ProducerConfig createKafkaProducerConfig()
+  {
+    return new ProducerConfig(getConfigProperties());
+  }
+
+  /**
+   * Returns the broker set, creating and caching it on first use. The value stored
+   * under {@code KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST} in
+   * {@link #getConfigProperties()} is added as one single element.
+   *
+   * @return the cached set of brokers
+   */
+  protected Set<String> getBrokerSet()
+  {
+    if (brokerSet == null) {
+      brokerSet = Sets.newHashSet((String)getConfigProperties().get(KafkaMetadataUtil.PRODUCER_PROP_BROKERLIST));
+    }
+    return brokerSet;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/33a5c2ec/contrib/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/contrib/src/test/resources/log4j.properties b/contrib/src/test/resources/log4j.properties
index 2fcbe38..cfc50cf 100644
--- a/contrib/src/test/resources/log4j.properties
+++ b/contrib/src/test/resources/log4j.properties
@@ -39,3 +39,4 @@ log4j.logger.org=info
 #log4j.logger.org.apache.commons.beanutils=warn
 log4j.logger.com.datatorrent=debug
 log4j.logger.org.apache.apex=debug
+log4j.logger.kafka=info


Mime
View raw message