apex-commits mailing list archives

From t..@apache.org
Subject incubator-apex-malhar git commit: Add idempotent support for 0.9 kafka input operator
Date Tue, 05 Apr 2016 00:59:12 GMT
Repository: incubator-apex-malhar
Updated Branches:
  refs/heads/devel-3 a23cc5b8f -> 37f813092


Add idempotent support for 0.9 kafka input operator


Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/commit/37f81309
Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/tree/37f81309
Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/diff/37f81309

Branch: refs/heads/devel-3
Commit: 37f8130920f0d7214bec00b7ac36448e58977f49
Parents: a23cc5b
Author: Siyuan Hua <hsy541@apache.org>
Authored: Mon Apr 4 13:56:36 2016 -0700
Committer: Siyuan Hua <hsy541@apache.org>
Committed: Mon Apr 4 13:56:36 2016 -0700

----------------------------------------------------------------------
 .../kafka/AbstractKafkaInputOperator.java       |  80 ++++++++++-
 .../apex/malhar/kafka/KafkaConsumerWrapper.java |  62 ++++++++-
 .../malhar/kafka/KafkaInputOperatorTest.java    | 139 ++++++++++++++++---
 .../malhar/kafka/KafkaOperatorTestBase.java     |  79 ++++-------
 4 files changed, 281 insertions(+), 79 deletions(-)
----------------------------------------------------------------------
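For context, enabling the new idempotent mode from application code is a single configuration call. A minimal sketch mirroring the updated test (given a DAG dag; the operator name, topic, and broker address are illustrative):

    KafkaSinglePortInputOperator in =
        dag.addOperator("KafkaInput", KafkaSinglePortInputOperator.class);
    in.setTopics("testtopic");
    in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
    in.setClusters("localhost:9092");
    // any WindowDataManager other than NoopWindowDataManager turns on idempotent replay
    in.setWindowDataManager(new WindowDataManager.FSWindowDataManager());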


http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
index 89104a3..06cd470 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/AbstractKafkaInputOperator.java
@@ -18,6 +18,7 @@
  */
 package org.apache.apex.malhar.kafka;
 
+import java.io.IOException;
 import java.util.Collection;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -33,6 +34,7 @@ import javax.validation.constraints.NotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.lang3.tuple.MutablePair;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
@@ -50,6 +52,8 @@ import com.datatorrent.api.InputOperator;
 import com.datatorrent.api.Operator;
 import com.datatorrent.api.Partitioner;
 import com.datatorrent.api.StatsListener;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * The abstract kafka input operator using kafka 0.9.0 new consumer API
@@ -94,6 +98,10 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
    */
   private final Map<AbstractKafkaPartitioner.PartitionMeta, Long> offsetTrack = new HashMap<>();
 
+  private final transient Map<AbstractKafkaPartitioner.PartitionMeta, Long> windowStartOffset = new HashMap<>();
+
+  private transient int operatorId;
+
   private int initialPartitionCount = 1;
 
   private long repartitionInterval = 30000L;
@@ -127,7 +135,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
    * Wrapper consumer object
    * It wraps KafkaConsumer, maintains consumer thread and store messages in a queue
    */
-  private transient final KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper();
+  private final transient KafkaConsumerWrapper consumerWrapper = new KafkaConsumerWrapper();
 
   /**
    * By default the strategy is one to one
@@ -144,7 +152,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   /**
    * store offsets with window id, only keep offsets with windows that have not been committed
    */
-  private transient final List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
+  private final transient List<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> offsetHistory = new LinkedList<>();
 
   /**
    * Application name is used as group.id for kafka consumer
@@ -162,10 +170,12 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   @AutoMetric
   private transient KafkaMetrics metrics;
 
+  private WindowDataManager windowDataManager = new WindowDataManager.NoopWindowDataManager();
+
   @Override
   public void activate(Context.OperatorContext context)
   {
-    consumerWrapper.start();
+    consumerWrapper.start(isIdempotent());
   }
 
   @Override
@@ -183,8 +193,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   @Override
   public void committed(long windowId)
   {
-    if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST)
+    if (initialOffset == InitialOffset.LATEST || initialOffset == InitialOffset.EARLIEST) {
       return;
+    }
     //ask kafka consumer wrapper to store the committed offsets
     for (Iterator<Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>>> iter = offsetHistory.iterator(); iter.hasNext(); ) {
       Pair<Long, Map<AbstractKafkaPartitioner.PartitionMeta, Long>> item = iter.next();
@@ -195,6 +206,13 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
         iter.remove();
       }
     }
+    if (isIdempotent()) {
+      try {
+        windowDataManager.deleteUpTo(operatorId, windowId);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
   }
 
   @Override
@@ -211,6 +229,9 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
       AbstractKafkaPartitioner.PartitionMeta pm = new AbstractKafkaPartitioner.PartitionMeta(tuple.getLeft(),
           msg.topic(), msg.partition());
       offsetTrack.put(pm, msg.offset() + 1);
+      if (isIdempotent() && !windowStartOffset.containsKey(pm)) {
+        windowStartOffset.put(pm, msg.offset());
+      }
     }
     emitCount += count;
   }
@@ -222,6 +243,23 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
   {
     emitCount = 0;
     currentWindowId = wid;
+    windowStartOffset.clear();
+    if (isIdempotent() && wid <= windowDataManager.getLargestRecoveryWindow()) {
+      replay(wid);
+    } else {
+      consumerWrapper.afterReplay();
+    }
+  }
+
+  private void replay(long windowId)
+  {
+    try {
+      Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData =
+          (Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)windowDataManager.load(operatorId, windowId);
+      consumerWrapper.emitImmediately(windowData);
+    } catch (IOException e) {
+      DTThrowable.rethrow(e);
+    }
   }
 
   @Override
@@ -233,6 +271,19 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
 
     //update metrics
     metrics.updateMetrics(clusters, consumerWrapper.getAllConsumerMetrics());
+
+    //update the windowDataManager
+    if (isIdempotent()) {
+      try {
+        Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = new HashMap<>();
+        for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) {
+          windowData.put(e.getKey(), new MutablePair<>(e.getValue(), offsetTrack.get(e.getKey()) - e.getValue()));
+        }
+        windowDataManager.save(windowData, operatorId, currentWindowId);
+      } catch (IOException e) {
+        DTThrowable.rethrow(e);
+      }
+    }
   }
 
 
@@ -243,13 +294,15 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     applicationName = context.getValue(Context.DAGContext.APPLICATION_NAME);
     consumerWrapper.create(this);
     metrics = new KafkaMetrics(metricsRefreshInterval);
+    windowDataManager.setup(context);
+    operatorId = context.getId();
   }
 
 
   @Override
   public void teardown()
   {
-
+    windowDataManager.teardown();
   }
 
   private void initPartitioner()
@@ -325,7 +378,7 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     }
     if (e != null) {
       logger.warn("Exceptions in committing offsets {} : {} ",
-        Joiner.on(';').withKeyValueSeparator("=").join(map), e);
+          Joiner.on(';').withKeyValueSeparator("=").join(map), e);
     }
   }
 
@@ -339,6 +392,11 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     return assignment;
   }
 
+  private boolean isIdempotent()
+  {
+    return windowDataManager != null && !(windowDataManager instanceof WindowDataManager.NoopWindowDataManager);
+  }
+
   //---------------------------------------------setters and getters----------------------------------------
   public void setInitialPartitionCount(int partitionCount)
   {
@@ -525,6 +583,16 @@ public abstract class AbstractKafkaInputOperator implements InputOperator, Opera
     return repartitionInterval;
   }
 
+  public void setWindowDataManager(WindowDataManager windowDataManager)
+  {
+    this.windowDataManager = windowDataManager;
+  }
+
+  public WindowDataManager getWindowDataManager()
+  {
+    return windowDataManager;
+  }
+
   /**
    * @omitFromUI
    * @return current checkpointed offsets

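To summarize the recovery protocol added above: in endWindow() the operator saves, per partition, the offset at which the window started and the number of tuples it emitted; in beginWindow(), any window at or below the largest recovery window is replayed from exactly that range. A condensed sketch of the round trip (names as in the patch, simplified for illustration):

    // endWindow: save (window start offset, tuple count) per partition
    Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData = new HashMap<>();
    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Long> e : windowStartOffset.entrySet()) {
      long start = e.getValue();
      windowData.put(e.getKey(), new MutablePair<>(start, offsetTrack.get(e.getKey()) - start));
    }
    windowDataManager.save(windowData, operatorId, currentWindowId);

    // beginWindow after a failure: re-emit exactly the saved ranges
    if (wid <= windowDataManager.getLargestRecoveryWindow()) {
      consumerWrapper.emitImmediately((Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>>)
          windowDataManager.load(operatorId, wid));
    }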
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
----------------------------------------------------------------------
diff --git a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
index 7a1211a..adc9540 100644
--- a/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
+++ b/kafka/src/main/java/org/apache/apex/malhar/kafka/KafkaConsumerWrapper.java
@@ -22,6 +22,7 @@ package org.apache.apex.malhar.kafka;
 import java.io.Closeable;
 import java.io.IOException;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -51,8 +52,11 @@ import org.apache.kafka.common.errors.WakeupException;
 import org.apache.kafka.common.serialization.ByteArrayDeserializer;
 
 import com.google.common.base.Joiner;
+import com.google.common.collect.Iterables;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
+import com.datatorrent.netlet.util.DTThrowable;
+
 /**
  * This is the wrapper class for new Kafka consumer API
  *
@@ -83,6 +87,8 @@ public class KafkaConsumerWrapper implements Closeable
 
   private final Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetsToCommit = new HashMap<>();
 
+  private boolean waitForReplay = false;
+
   /**
    *
    * Only put the offset needs to be committed in the ConsumerThread.offsetToCommit map
@@ -109,6 +115,52 @@ public class KafkaConsumerWrapper implements Closeable
 
   }
 
+  public void emitImmediately(Map<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowData)
+  {
+    for (Map.Entry<AbstractKafkaPartitioner.PartitionMeta, Pair<Long, Long>> windowEntry : windowData.entrySet()) {
+      AbstractKafkaPartitioner.PartitionMeta meta = windowEntry.getKey();
+      Pair<Long, Long> replayOffsetSize = windowEntry.getValue();
+      KafkaConsumer<byte[], byte[]> kc = consumers.get(meta.getCluster());
+      if (kc == null || !kc.assignment().contains(meta.getTopicPartition())) {
+        throw new RuntimeException("Couldn't find a consumer to replay the messages for PartitionMeta: " + meta);
+      }
+      // pause every partition except the one being replayed
+      for (TopicPartition tp : kc.assignment()) {
+        if (meta.getTopicPartition().equals(tp)) {
+          kc.resume(tp);
+        } else {
+          kc.pause(tp);
+        }
+      }
+      // set the offset to window start offset
+      kc.seek(meta.getTopicPartition(), replayOffsetSize.getLeft());
+      long windowCount = replayOffsetSize.getRight();
+      while (windowCount > 0) {
+        try {
+          ConsumerRecords<byte[], byte[]> records = kc.poll(ownerOperator.getConsumerTimeout());
+          for (Iterator<ConsumerRecord<byte[], byte[]>> cri = records.iterator(); cri.hasNext() && windowCount > 0;) {
+            ownerOperator.emitTuple(meta.getCluster(), cri.next());
+            windowCount--;
+          }
+        } catch (NoOffsetForPartitionException e) {
+          throw new RuntimeException("Couldn't replay the offset", e);
+        }
+      }
+      // set the offset after window
+      kc.seek(meta.getTopicPartition(), replayOffsetSize.getLeft() + replayOffsetSize.getRight());
+    }
+
+    // resume all topics
+    for (KafkaConsumer<byte[], byte[]> kc : consumers.values()) {
+      kc.resume(Iterables.toArray(kc.assignment(), TopicPartition.class));
+    }
+
+  }
+
+  public void afterReplay()
+  {
+    waitForReplay = false;
+  }
 
   static final class ConsumerThread implements Runnable
   {
@@ -137,6 +189,10 @@ public class KafkaConsumerWrapper implements Closeable
 
 
         while (wrapper.isAlive) {
+          if (wrapper.waitForReplay) {
+            Thread.sleep(100);
+            continue;
+          }
           if (!this.offsetToCommit.isEmpty()) {
             // in each fetch cycle commit the offset if needed
             if (logger.isDebugEnabled()) {
@@ -170,6 +226,8 @@ public class KafkaConsumerWrapper implements Closeable
         }
       } catch (WakeupException we) {
         logger.info("The consumer is being stopped");
+      } catch (InterruptedException e) {
+        DTThrowable.rethrow(e);
       } finally {
         consumer.close();
       }
@@ -194,11 +252,11 @@ public class KafkaConsumerWrapper implements Closeable
   /**
    * This method is called in the activate method of the operator
    */
-  public void start()
+  public void start(boolean waitForReplay)
   {
+    this.waitForReplay = waitForReplay;
     isAlive = true;
 
-
     // thread to consume the kafka data
     // create thread pool for consumer threads
     kafkaConsumerExecutor = Executors.newCachedThreadPool(

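The replay path above relies on three primitives of the 0.9 consumer API: pause() to silence the partitions that are not being replayed, seek() to rewind to the saved window start, and resume() to restore normal fetching. Reduced to a single partition, the core pattern looks like this (a sketch; kc, tp, startOffset, count, and timeout are assumed to be in scope, and emit() stands in for ownerOperator.emitTuple()):

    kc.seek(tp, startOffset);           // rewind to where the window began
    long remaining = count;
    while (remaining > 0) {
      ConsumerRecords<byte[], byte[]> records = kc.poll(timeout);
      for (Iterator<ConsumerRecord<byte[], byte[]>> it = records.iterator();
          it.hasNext() && remaining > 0; remaining--) {
        emit(it.next());                // hand each replayed record back to the operator
      }
    }
    kc.seek(tp, startOffset + count);   // position the consumer just past the replayed range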
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
index d055555..9c5d5dc 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
@@ -18,30 +18,44 @@
  */
 package org.apache.apex.malhar.kafka;
 
+import java.io.File;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 import org.slf4j.LoggerFactory;
 
+import org.apache.commons.io.FileUtils;
+
 import com.datatorrent.api.Context;
 import com.datatorrent.api.DAG;
 import com.datatorrent.api.DAG.Locality;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.LocalMode;
 import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.WindowDataManager;
+import com.datatorrent.stram.StramLocalCluster;
 
 /**
  * A bunch of tests to verify that the input operator is automatically partitioned per Kafka partition. This test launches
  * its own Kafka cluster.
  */
+@Ignore
 @RunWith(Parameterized.class)
 public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 {
@@ -50,6 +64,10 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   private String partition = null;
 
+  private String testName = "";
+
+  public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
+
   @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
   public static Collection<Object[]> testScenario()
   {
@@ -64,6 +82,19 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     });
   }
 
+  @Before
+  public void before()
+  {
+    FileUtils.deleteQuietly(new File(APPLICATION_PATH));
+    tupleCollection.clear();
+    testName = TEST_TOPIC + testCounter++;
+    createTopic(0, testName);
+    if (hasMultiCluster) {
+      createTopic(1, testName);
+    }
+
+  }
+
   public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
   {
     // This class wants to initialize several Kafka brokers for multiple partitions
@@ -76,6 +107,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
 
   private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
   private static List<String> tupleCollection = new LinkedList<>();
+  private static Map<String, Set<String>> tupleCollectedInWindow = new HashMap<>();
   private static CountDownLatch latch;
   private static boolean hasFailure = false;
   private static int failureTrigger = 3000;
@@ -88,11 +120,58 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
    */
   public static class CollectorModule extends BaseOperator
   {
-    public final transient CollectorInputPort inputPort = new CollectorInputPort();
+
+    public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
+
+    long currentWindowId;
+
+    long operatorId;
+
+    boolean isIdempotentTest = false;
+
+    transient Set<String> windowTupleCollector = new HashSet<>();
+
+    @Override
+    public void setup(Context.OperatorContext context)
+    {
+      super.setup(context);
+      operatorId = context.getId();
+    }
+
+    @Override
+    public void beginWindow(long windowId)
+    {
+      super.beginWindow(windowId);
+      currentWindowId = windowId;
+    }
+
+    @Override
+    public void endWindow()
+    {
+      super.endWindow();
+      if (isIdempotentTest) {
+        String key = operatorId + "," + currentWindowId;
+        Set<String> msgsInWin = tupleCollectedInWindow.get(key);
+        if (msgsInWin != null) {
+          Assert.assertEquals("replayed messages should be exactly the same as in the previous run of the window", msgsInWin, windowTupleCollector);
+        } else {
+          Set<String> newSet = new HashSet<>();
+          newSet.addAll(windowTupleCollector);
+          tupleCollectedInWindow.put(key, newSet);
+        }
+      }
+      windowTupleCollector.clear();
+    }
+
   }
 
   public static class CollectorInputPort extends DefaultInputPort<byte[]>
   {
+    CollectorModule ownerNode;
+
+    CollectorInputPort(CollectorModule node)
+    {
+      this.ownerNode = node;
+    }
 
     @Override
     public void process(byte[] bt)
@@ -110,15 +189,11 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
         return;
       }
       tupleCollection.add(tuple);
-    }
-
-    @Override
-    public void setConnected(boolean flag)
-    {
-      if (flag) {
-        tupleCollection.clear();
+      if (ownerNode.isIdempotentTest) {
+        ownerNode.windowTupleCollector.add(tuple);
       }
     }
+
   }
 
   /**
@@ -132,21 +207,28 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
    * @throws Exception
    */
   @Test
-  public void testPartitionableInputOperator() throws Exception
+  public void testInputOperator() throws Exception
   {
     hasFailure = false;
-    testInputOperator(false);
+    testInputOperator(false, false);
   }
 
 
   @Test
-  public void testPartitionableInputOperatorWithFailure() throws Exception
+  public void testInputOperatorWithFailure() throws Exception
+  {
+    hasFailure = true;
+    testInputOperator(true, false);
+  }
+
+  @Test
+  public void testIdempotentInputOperatorWithFailure() throws Exception
   {
     hasFailure = true;
-    testInputOperator(true);
+    testInputOperator(true, true);
   }
 
-  public void testInputOperator(boolean hasFailure) throws Exception
+  public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
   {
 
     // each broker should get an END_TUPLE message
@@ -155,7 +237,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     int totalCount = 10000;
 
     // Start producer
-    KafkaTestProducer p = new KafkaTestProducer(TEST_TOPIC, hasMultiPartition, hasMultiCluster);
+    KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
     p.setSendCount(totalCount);
     Thread t = new Thread(p);
     t.start();
@@ -168,32 +250,47 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
     KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
     node.setInitialPartitionCount(1);
     // set topic
-    node.setTopics(TEST_TOPIC);
+    node.setTopics(testName);
     node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
     node.setClusters(getClusterConfig());
     node.setStrategy(partition);
+    if (idempotent) {
+      node.setWindowDataManager(new WindowDataManager.FSWindowDataManager());
+    }
+
 
     // Create Test tuple collector
-    CollectorModule collector = dag.addOperator("TestMessageCollector", new CollectorModule());
+    CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
+    collector.isIdempotentTest = idempotent;
 
     // Connect ports
     dag.addStream("Kafka message", node.outputPort, collector.inputPort).setLocality(Locality.CONTAINER_LOCAL);
 
-    // Create local cluster
-    final LocalMode.Controller lc = lma.getController();
-    lc.setHeartbeatMonitoringEnabled(false);
 
     if (hasFailure) {
       setupHasFailureTest(node, dag);
     }
+
+    // Create local cluster
+    final LocalMode.Controller lc = lma.getController();
+    lc.setHeartbeatMonitoringEnabled(false);
+
     lc.runAsync();
 
     // Wait up to 40s for the consumer to finish consuming all the messages
     boolean notTimeout = latch.await(40000, TimeUnit.MILLISECONDS);
+    Collections.sort(tupleCollection, new Comparator<String>()
+    {
+      @Override
+      public int compare(String o1, String o2)
+      {
+        return Integer.parseInt(o1.split("_")[1]) - Integer.parseInt(o2.split("_")[1]);
+      }
+    });
     Assert.assertTrue("TIMEOUT: 40s Collected " + tupleCollection, notTimeout);
 
     // Check results
-    Assert.assertEquals("Tuple count", totalCount, tupleCollection.size());
+    Assert.assertTrue("Collected tuples " + tupleCollection + " Tuple count is not expected",
totalCount <=+ tupleCollection.size());
     logger.debug(String.format("Number of emitted tuples: %d", tupleCollection.size()));
 
     t.join();
@@ -205,7 +302,7 @@ public class KafkaInputOperatorTest extends KafkaOperatorTestBase
   {
     operator.setHoldingBufferSize(5000);
     dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
-    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent("target/ck", new Configuration()));
+    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(APPLICATION_PATH + "failureck", new Configuration()));
     operator.setMaxTuplesPerWindow(500);
   }
 

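The collector above verifies idempotency at window granularity: the first time a given (operatorId, windowId) pair completes, its tuple set is recorded, and if the same window shows up again after the induced failure it must reproduce that set exactly. The check in endWindow() reduces to this sketch:

    String key = operatorId + "," + currentWindowId;
    Set<String> seenBefore = tupleCollectedInWindow.get(key);
    if (seenBefore != null) {
      // this window is a replay; its contents must match the original run exactly
      Assert.assertEquals(seenBefore, windowTupleCollector);
    } else {
      tupleCollectedInWindow.put(key, new HashSet<>(windowTupleCollector));
    }
    windowTupleCollector.clear();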
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/37f81309/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
index 7085348..a05fd9b 100644
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
+++ b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
@@ -24,8 +24,8 @@ import java.net.InetSocketAddress;
 import java.net.ServerSocket;
 import java.util.Properties;
 
-import org.junit.After;
-import org.junit.Before;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
 import org.slf4j.LoggerFactory;
 
 import org.apache.commons.io.FileUtils;
@@ -51,6 +51,7 @@ public class KafkaOperatorTestBase
   public static final int[] TEST_ZOOKEEPER_PORT;
   public static final int[][] TEST_KAFKA_BROKER_PORT;
   public static final String TEST_TOPIC = "testtopic";
+  public static int testCounter = 0;
 
   // get available ports
   static {
@@ -81,27 +82,27 @@ public class KafkaOperatorTestBase
   // since Kafka 0.8 use KafkaServerStatble instead of KafkaServer
 
   // multiple brokers in multiple cluster
-  private KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
+  private static KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
 
   // multiple cluster
-  private ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
+  private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
 
-  private ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
+  private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
 
-  public String baseDir = "target";
+  public static String baseDir = "target";
 
-  private final String zkBaseDir = "zookeeper-server-data";
-  private final String kafkaBaseDir = "kafka-server-data";
-  private final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
-  private final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
+  private static final String zkBaseDir = "zookeeper-server-data";
+  private static final String kafkaBaseDir = "kafka-server-data";
+  private static final String[] zkdir = new String[] { "zookeeper-server-data/1", "zookeeper-server-data/2" };
+  private static final String[][] kafkadir = new String[][] { new String[] { "kafka-server-data/1/1", "kafka-server-data/1/2" }, new String[] { "kafka-server-data/2/1", "kafka-server-data/2/2" } };
   protected boolean hasMultiPartition = false;
   protected boolean hasMultiCluster = false;
 
-  public void startZookeeper(final int clusterId)
+  public static void startZookeeper(final int clusterId)
   {
     try {
 
-      int numConnections = 10;
+      int numConnections = 100;
       int tickTime = 2000;
       File dir = new File(baseDir, zkdir[clusterId]);
 
@@ -117,7 +118,7 @@ public class KafkaOperatorTestBase
     }
   }
 
-  public void stopZookeeper()
+  public static void stopZookeeper()
   {
     for (ZooKeeperServer zs : zkServer) {
       if (zs != null) {
@@ -135,44 +136,37 @@ public class KafkaOperatorTestBase
     zkFactory =  new ServerCnxnFactory[2];
   }
 
-  public void startKafkaServer(int clusterid, int brokerid)
+  public static void startKafkaServer(int clusterid, int brokerid)
   {
     Properties props = new Properties();
-    props.setProperty("broker.id", "" + brokerid);
+    props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
     props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
     props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
     props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
     props.setProperty("default.replication.factor", "1");
     // set this to 50000 to boost performance so most test data stays in memory before flushing to disk
     props.setProperty("log.flush.interval.messages", "50000");
-    if (hasMultiPartition) {
-      props.setProperty("num.partitions", "2");
-    } else {
-      props.setProperty("num.partitions", "1");
-    }
 
     broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
     broker[clusterid][brokerid].startup();
 
   }
 
-  public void startKafkaServer()
+  public static void startKafkaServer()
   {
 
     FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
-    boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
-    for (int i = 0; i < startable.length; i++) {
-      for (int j = 0; j < startable[i].length; j++) {
-        if (startable[i][j])
-          startKafkaServer(i, j);
-      }
-    }
+    //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition }, new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
+    startKafkaServer(0, 0);
+    startKafkaServer(0, 1);
+    startKafkaServer(1, 0);
+    startKafkaServer(1, 1);
 
     // startup is an async operation; wait 2 sec for the servers to start up
 
   }
 
-  public void stopKafkaServer()
+  public static void stopKafkaServer()
   {
     for (int i = 0; i < broker.length; i++) {
       for (int j = 0; j < broker[i].length; j++) {
@@ -185,28 +179,22 @@ public class KafkaOperatorTestBase
     }
   }
 
-  @Before
-  public void beforeTest()
+  @BeforeClass
+  public static void beforeTest()
   {
     try {
       startZookeeper();
       startKafkaServer();
-      createTopic(0, TEST_TOPIC);
-      if (hasMultiCluster) {
-        createTopic(1, TEST_TOPIC);
-      }
     } catch (java.nio.channels.CancelledKeyException ex) {
       logger.debug("LSHIL {}", ex.getLocalizedMessage());
     }
   }
 
-  public void startZookeeper()
+  public static void startZookeeper()
   {
     FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
     startZookeeper(0);
-    if (hasMultiCluster) {
-      startZookeeper(1);
-    }
+    startZookeeper(1);
   }
 
   public void createTopic(int clusterid, String topicName)
@@ -229,19 +217,10 @@ public class KafkaOperatorTestBase
     ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
     TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
 
-    // Right now, there is no programmatic synchronized way to create the topic. have to wait 2 sec to make sure the
-    // topic is created
-    // So the tests will not hit any bizarre failure
-    try {
-      Thread.sleep(5000);
-      zu.close();
-    } catch (InterruptedException e) {
-      e.printStackTrace();
-    }
   }
 
-  @After
-  public void afterTest()
+  @AfterClass
+  public static void afterTest()
   {
     try {
       stopKafkaServer();
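A side note on the refactored base class: since the brokers are now started once per test class with both clusters always running, every broker.id has to be unique across clusters. The scheme encodes both indices in one number; note that the arithmetic needs parentheses before the string concatenation, as fixed above:

    // clusterid in {0,1}, brokerid in {0,1}  ->  broker.id in {0, 1, 10, 11}
    props.setProperty("broker.id", "" + (clusterid * 10 + brokerid));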

