apex-commits mailing list archives

From t..@apache.org
Subject [1/5] apex-malhar git commit: APEXMALHAR-2459 1) Refactor the existing Kafka Input Operator. 2) Add support for KafkaInputOperator using the 0.10 consumer API
Date Sun, 02 Jul 2017 19:04:54 GMT
Repository: apex-malhar
Updated Branches:
  refs/heads/master 4df68580e -> b42d8e741
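
For context, a minimal sketch of how the refactored 0.10-based input operator is wired into a DAG, inferred from the setters exercised in the deleted tests below; the operator name, broker address, and topic are placeholders:

  LocalMode lma = LocalMode.newInstance();
  DAG dag = lma.getDAG();
  KafkaSinglePortInputOperator in = dag.addOperator("KafkaIn", KafkaSinglePortInputOperator.class);
  in.setClusters("localhost:9092");   // placeholder broker address
  in.setTopics("myTopic");            // placeholder topic
  in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
  in.setInitialPartitionCount(1);
  in.setStrategy("one_to_one");       // or "one_to_many", as in the tests below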


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
deleted file mode 100644
index e9fcc36..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/EmbeddedKafka.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.ServerSocket;
-import java.nio.charset.StandardCharsets;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Properties;
-
-import org.I0Itec.zkclient.ZkClient;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-
-import com.google.common.base.Throwables;
-
-import kafka.admin.AdminUtils;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServer;
-import kafka.utils.MockTime;
-import kafka.utils.TestUtils;
-import kafka.utils.Time;
-import kafka.utils.ZKStringSerializer$;
-import kafka.utils.ZkUtils;
-import kafka.zk.EmbeddedZookeeper;
-
-public class EmbeddedKafka
-{
-  private static final String KAFKA_PATH = "/tmp/kafka-test";
-
-  private ZkClient zkClient;
-  private ZkUtils zkUtils;
-  private String BROKERHOST = "127.0.0.1";
-  private String BROKERPORT = "9092";
-  private EmbeddedZookeeper zkServer;
-  private KafkaServer kafkaServer;
-
-  public String getBroker()
-  {
-    return BROKERHOST + ":" + BROKERPORT;
-  }
-
-  public void start() throws IOException
-  {
-    // Find a free port for the broker
-    try {
-      ServerSocket serverSocket = new ServerSocket(0);
-      BROKERPORT = Integer.toString(serverSocket.getLocalPort());
-      serverSocket.close();
-    } catch (IOException e) {
-      throw Throwables.propagate(e);
-    }
-
-    // Setup Zookeeper
-    zkServer = new EmbeddedZookeeper();
-    String zkConnect = BROKERHOST + ":" + zkServer.port();
-    zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
-    zkUtils = ZkUtils.apply(zkClient, false);
-
-    // Setup brokers
-    cleanupDir();
-    Properties props = new Properties();
-    props.setProperty("zookeeper.connect", zkConnect);
-    props.setProperty("broker.id", "0");
-    props.setProperty("log.dirs", KAFKA_PATH);
-    props.setProperty("listeners", "PLAINTEXT://" + BROKERHOST + ":" + BROKERPORT);
-    KafkaConfig config = new KafkaConfig(props);
-    Time mock = new MockTime();
-    kafkaServer = TestUtils.createServer(config, mock);
-  }
-
-  public void stop() throws IOException
-  {
-    kafkaServer.shutdown();
-    zkClient.close();
-    zkServer.shutdown();
-    cleanupDir();
-  }
-
-  private void cleanupDir() throws IOException
-  {
-    FileUtils.deleteDirectory(new File(KAFKA_PATH));
-  }
-
-  public void createTopic(String topic)
-  {
-    AdminUtils.createTopic(zkUtils, topic, 1, 1, new Properties());
-    List<KafkaServer> servers = new ArrayList<KafkaServer>();
-    servers.add(kafkaServer);
-    TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
-  }
-
-  public void publish(String topic, List<String> messages)
-  {
-    Properties producerProps = new Properties();
-    producerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-    producerProps.setProperty("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
-    producerProps.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
-
-    try (KafkaProducer<Integer, byte[]> producer = new KafkaProducer<>(producerProps)) {
-      for (String message : messages) {
-        ProducerRecord<Integer, byte[]> data = new ProducerRecord<>(topic, message.getBytes(StandardCharsets.UTF_8));
-        producer.send(data);
-      }
-    }
-
-    List<KafkaServer> servers = new ArrayList<KafkaServer>();
-    servers.add(kafkaServer);
-    TestUtils.waitUntilMetadataIsPropagated(scala.collection.JavaConversions.asScalaBuffer(servers), topic, 0, 30000);
-  }
-
-  public List<String> consume(String topic, int timeout)
-  {
-    return consume(topic, timeout, true);
-  }
-
-  public List<String> consume(String topic, int timeout, boolean earliest)
-  {
-    Properties consumerProps = new Properties();
-    consumerProps.setProperty("bootstrap.servers", BROKERHOST + ":" + BROKERPORT);
-    consumerProps.setProperty("group.id", "group0");
-    consumerProps.setProperty("client.id", "consumer0");
-    consumerProps.setProperty("key.deserializer", "org.apache.kafka.common.serialization.IntegerDeserializer");
-    consumerProps.setProperty("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
-    // control where the consumer starts: the beginning of the topic (earliest) or only new messages (latest)
-    consumerProps.put("auto.offset.reset", earliest ? "earliest" : "latest");
-    KafkaConsumer<Integer, byte[]> consumer = new KafkaConsumer<>(consumerProps);
-    consumer.subscribe(Arrays.asList(topic));
-
-    List<String> messages = new ArrayList<>();
-
-    ConsumerRecords<Integer, byte[]> records = consumer.poll(timeout);
-    for (ConsumerRecord<Integer, byte[]> record : records) {
-      messages.add(new String(record.value()));
-    }
-
-    consumer.close();
-
-    return messages;
-  }
-
-}
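
A hedged usage sketch of the EmbeddedKafka helper above, assembled from its public methods; the topic name, messages, and timeout are placeholders, and exception handling is elided:

  EmbeddedKafka kafka = new EmbeddedKafka();
  kafka.start();                                         // boots Zookeeper and one broker on a free port
  kafka.createTopic("test");
  kafka.publish("test", Arrays.asList("m1", "m2"));
  List<String> received = kafka.consume("test", 10000);  // polls up to 10s, starting from the earliest offset
  kafka.stop();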

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
deleted file mode 100644
index 81d7ade..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaConsumerPropertiesTest.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.text.ParseException;
-import java.util.Properties;
-
-import org.junit.Assert;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.Description;
-
-import org.apache.kafka.common.KafkaException;
-
-import com.datatorrent.api.Context;
-
-public class KafkaConsumerPropertiesTest
-{
-
-  KafkaSinglePortInputOperator kafkaInput = new KafkaSinglePortInputOperator();
-  @Rule
-  public Watcher watcher = new Watcher();
-
-  public class Watcher extends TestWatcher
-  {
-    Context.OperatorContext context;
-
-    @Override
-    protected void starting(Description description)
-    {
-      super.starting(description);
-      kafkaInput.setClusters("localhost:8087");
-      kafkaInput.setInitialPartitionCount(1);
-      kafkaInput.setTopics("apexTest");
-      kafkaInput.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
-      Properties prop = new Properties();
-      prop.setProperty("security.protocol", "SASL_PLAINTEXT");
-      prop.setProperty("sasl.kerberos.service.name", "kafka");
-      kafkaInput.setConsumerProps(prop);
-    }
-
-    @Override
-    protected void finished(Description description)
-    {
-      super.finished(description);
-    }
-  }
-
-  @Test
-  public void TestConsumerProperties() throws ParseException
-  {
-    //This check ensures that consumer properties are set and not reset in between.
-    if (null != kafkaInput.getConsumerProps().get("security.protocol")) {
-      try {
-        kafkaInput.definePartitions(null, null);
-      } catch (KafkaException e) {
-        //Ensures the properties of the consumer are set and not reset.
-        Assert.assertEquals(
-            "java.lang.IllegalArgumentException: You must pass java.security.auth.login.config in "
-            + "secure mode.", e.getCause().getMessage());
-      }
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
deleted file mode 100644
index abf3a5b..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaHelper.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.apex.malhar.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.kafka.common.serialization.Serializer;
-
-public class KafkaHelper implements Serializer<KafkaOutputOperatorTest.Person>,
-    Deserializer<KafkaOutputOperatorTest.Person>
-{
-  @Override
-  public KafkaOutputOperatorTest.Person deserialize(String s, byte[] bytes)
-  {
-    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
-    int nameLength = byteBuffer.getInt();
-    byte[] name = new byte[nameLength];
-
-    byteBuffer.get(name, 0, nameLength);
-
-    return new KafkaOutputOperatorTest.Person(new String(name), byteBuffer.getInt());
-  }
-
-  @Override
-  public byte[] serialize(String s, KafkaOutputOperatorTest.Person person)
-  {
-    byte[] name = person.name.getBytes();
-
-    ByteBuffer byteBuffer = ByteBuffer.allocate(name.length + 4 + 4);
-
-    byteBuffer.putInt(name.length);
-    byteBuffer.put(name);
-    byteBuffer.putInt(person.age);
-
-    return byteBuffer.array();
-  }
-
-  @Override
-  public void configure(Map<String, ?> map, boolean b)
-  {
-  }
-
-  @Override
-  public void close()
-  {
-  }
-}
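
The serializer above encodes a Person as a 4-byte name length, the name bytes, then a 4-byte age. A round-trip sketch (the topic argument is unused by this serializer; the name and age are placeholders):

  KafkaHelper helper = new KafkaHelper();
  byte[] bytes = helper.serialize("anyTopic", new KafkaOutputOperatorTest.Person("alice", 30));
  // bytes.length == 4 + 5 + 4 == 13
  KafkaOutputOperatorTest.Person p = helper.deserialize("anyTopic", bytes);  // name "alice", age 30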

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
deleted file mode 100644
index f16c8f4..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaInputOperatorTest.java
+++ /dev/null
@@ -1,399 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.io.File;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TestWatcher;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.slf4j.LoggerFactory;
-
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-
-import com.google.common.collect.Lists;
-
-import com.datatorrent.api.Context;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DAG.Locality;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.stram.StramLocalCluster;
-
-/**
- * A set of tests to verify that the input operator is automatically partitioned
- * per Kafka partition. These tests launch their own Kafka cluster.
- */
-@RunWith(Parameterized.class)
-public class KafkaInputOperatorTest extends KafkaOperatorTestBase
-{
-
-  private int totalBrokers = 0;
-
-  private String partition = null;
-
-  private String testName = "";
-
-  public static String APPLICATION_PATH = baseDir + File.separator + StramLocalCluster.class.getName() + File.separator;
-
-  public class KafkaTestInfo extends TestWatcher
-  {
-    public org.junit.runner.Description desc;
-
-    public String getDir()
-    {
-      String methodName = desc.getMethodName();
-      String className = desc.getClassName();
-      return "target/" + className + "/" + methodName + "/" + testName;
-    }
-
-    @Override
-    protected void starting(org.junit.runner.Description description)
-    {
-      this.desc = description;
-    }
-  }
-
-  @Rule
-  public final KafkaTestInfo testInfo = new KafkaTestInfo();
-
-  @Parameterized.Parameters(name = "multi-cluster: {0}, multi-partition: {1}, partition: {2}")
-  public static Collection<Object[]> testScenario()
-  {
-    return Arrays.asList(new Object[][]{
-      {true, false, "one_to_one"},// multi cluster with single partition
-      {true, false, "one_to_many"},
-      {true, true, "one_to_one"},// multi cluster with multi partitions
-      {true, true, "one_to_many"},
-      {false, true, "one_to_one"}, // single cluster with multi partitions
-      {false, true, "one_to_many"},
-      {false, false, "one_to_one"}, // single cluster with single partition
-      {false, false, "one_to_many"}
-    });
-  }
-
-  @Before
-  public void before()
-  {
-    testName = TEST_TOPIC + testCounter++;
-    logger.info("before() test case: {}", testName);
-    tupleCollection.clear();
-    //reset count for next new test case
-    k = 0;
-
-    createTopic(0, testName);
-    if (hasMultiCluster) {
-      createTopic(1, testName);
-    }
-
-  }
-
-  public KafkaInputOperatorTest(boolean hasMultiCluster, boolean hasMultiPartition, String partition)
-  {
-    // This class initializes several Kafka brokers for multiple partitions
-    this.hasMultiCluster = hasMultiCluster;
-    this.hasMultiPartition = hasMultiPartition;
-    int cluster = 1 + (hasMultiCluster ? 1 : 0);
-    totalBrokers = (1 + (hasMultiPartition ? 1 : 0)) * cluster;
-    this.partition = partition;
-  }
-
-  private static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaInputOperatorTest.class);
-  private static List<String> tupleCollection = new LinkedList<>();
-
-  /**
-   * Whether the countdown latch counts all tuples or just END_TUPLE markers
-   */
-  private static final boolean countDownAll = false;
-  private static final int scale = 2;
-  private static final int totalCount = 10 * scale;
-  private static final int failureTrigger = 3 * scale;
-  private static final int tuplesPerWindow = 5 * scale;
-  private static final int waitTime = 60000 + 300 * scale;
-
-  //This latch was used to count END_TUPLE markers, but tuple order can't be guaranteed,
-  //so count valid tuples instead.
-  private static CountDownLatch latch;
-  private static boolean hasFailure = false;
-  private static int k = 0;
-  private static Thread monitorThread;
-
-  /**
-   * Test operator to collect tuples from KafkaSinglePortInputOperator.
-   */
-  public static class CollectorModule extends BaseOperator
-  {
-    public final transient DefaultInputPort<byte[]> inputPort = new DefaultInputPort<byte[]>()
-    {
-      @Override
-      public void process(byte[] bt)
-      {
-        processTuple(bt);
-      }
-    };
-
-    long currentWindowId;
-
-    long operatorId;
-
-    boolean isIdempotentTest = false;
-
-    transient List<String> windowTupleCollector = Lists.newArrayList();
-    private transient Map<String, List<String>> tupleCollectedInWindow = new HashMap<>();
-    private int endTuples = 0;
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      operatorId = context.getId();
-    }
-
-    @Override
-    public void beginWindow(long windowId)
-    {
-      super.beginWindow(windowId);
-      currentWindowId = windowId;
-      windowTupleCollector.clear();
-      endTuples = 0;
-    }
-
-    public void processTuple(byte[] bt)
-    {
-      String tuple = new String(bt);
-      if (hasFailure && k++ == failureTrigger) {
-        //you can only kill yourself once
-        hasFailure = false;
-        throw new RuntimeException();
-      }
-      if (tuple.startsWith(KafkaOperatorTestBase.END_TUPLE)) {
-        endTuples++;
-      }
-
-      windowTupleCollector.add(tuple);
-    }
-
-    @Override
-    public void endWindow()
-    {
-      super.endWindow();
-      if (isIdempotentTest) {
-        String key = operatorId + "," + currentWindowId;
-        List<String> msgsInWin = tupleCollectedInWindow.get(key);
-        if (msgsInWin != null) {
-          Assert.assertEquals(
-              "replay messages should be exactly same as previous window", msgsInWin, windowTupleCollector);
-        } else {
-          List<String> newList = Lists.newArrayList();
-          newList.addAll(windowTupleCollector);
-          tupleCollectedInWindow.put(key, newList);
-        }
-      }
-
-      //discard the tuples of this window if an exception happened
-      int tupleSize = windowTupleCollector.size();
-      tupleCollection.addAll(windowTupleCollector);
-
-      int countDownTupleSize = countDownAll ? tupleSize : endTuples;
-
-      if (latch != null) {
-        Assert.assertTrue(
-            "received END_TUPLES more than expected.", latch.getCount() >= countDownTupleSize);
-        while (countDownTupleSize > 0) {
-          latch.countDown();
-          --countDownTupleSize;
-        }
-        if (latch.getCount() == 0) {
-          /**
-           * The time gap between countDown() and the shutdown() of the application
-           * can cause the fatal error:
-           * "Catastrophic Error: Invalid State - the operator blocked forever!"
-           * as the activeQueues could be cleared before alive has changed.
-           * Throw a ShutdownException to let the engine shut down.
-           */
-          try {
-            throw new ShutdownException();
-            //lc.shutdown();
-          } finally {
-            /**
-             * Interrupt the engine thread so it wakes from sleep and handles
-             * the shutdown. At this point all payload should have been handled,
-             * so it is safe to interrupt.
-             */
-            monitorThread.interrupt();
-          }
-        }
-      }
-    }
-
-  }
-
-  /**
-   * Test AbstractKafkaSinglePortInputOperator (i.e. an input adapter for Kafka, aka consumer). This module receives
-   * data from an outside test generator through the Kafka message bus and feeds that data into the Malhar streaming
-   * platform.
-   *
-   * [Generate a message and send it to the Kafka message bus] ==> [Receive that message through the Kafka input
-   * adapter (i.e. consumer) and send it using the emitTuples() interface on the output port]
-   *
-   * @throws Exception
-   */
-  @Test
-  public void testInputOperator() throws Exception
-  {
-    hasFailure = false;
-    testInputOperator(false, false);
-  }
-
-  @Test
-  public void testInputOperatorWithFailure() throws Exception
-  {
-    hasFailure = true;
-    testInputOperator(true, false);
-  }
-
-  @Test
-  public void testIdempotentInputOperatorWithFailure() throws Exception
-  {
-    hasFailure = true;
-    testInputOperator(true, true);
-  }
-
-  public void testInputOperator(boolean hasFailure, boolean idempotent) throws Exception
-  {
-    // each broker should get an END_TUPLE message
-    latch = new CountDownLatch(countDownAll ? totalCount + totalBrokers : totalBrokers);
-
-    logger.info(
-        "Test Case: name: {}; totalBrokers: {}; hasFailure: {}; hasMultiCluster: {};" +
-        " hasMultiPartition: {}, partition: {}",
-        testName, totalBrokers, hasFailure, hasMultiCluster, hasMultiPartition, partition);
-
-    // Start producer
-    KafkaTestProducer p = new KafkaTestProducer(testName, hasMultiPartition, hasMultiCluster);
-    p.setSendCount(totalCount);
-    Thread t = new Thread(p);
-    t.start();
-
-    int expectedReceiveCount = totalCount + totalBrokers;
-
-    // Create DAG for testing.
-    LocalMode lma = LocalMode.newInstance();
-    DAG dag = lma.getDAG();
-
-    // Create KafkaSinglePortInputOperator
-    KafkaSinglePortInputOperator node = dag.addOperator(
-        "Kafka input" + testName, KafkaSinglePortInputOperator.class);
-    node.setInitialPartitionCount(1);
-    // set topic
-    node.setTopics(testName);
-    node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
-    node.setClusters(getClusterConfig());
-    node.setStrategy(partition);
-    if (idempotent) {
-      node.setWindowDataManager(new FSWindowDataManager());
-    }
-
-    // Create Test tuple collector
-    CollectorModule collector = dag.addOperator("TestMessageCollector", CollectorModule.class);
-    collector.isIdempotentTest = idempotent;
-
-    // Connect ports
-    dag.addStream("Kafka message" + testName, node.outputPort, collector.inputPort)
-        .setLocality(Locality.CONTAINER_LOCAL);
-
-    if (hasFailure) {
-      setupHasFailureTest(node, dag);
-    }
-
-    // Create local cluster
-    LocalMode.Controller lc = lma.getController();
-    lc.setHeartbeatMonitoringEnabled(false);
-
-    //Let the Controller run inside another thread. This is almost the same as calling Controller.runAsync(),
-    //but Controller.runAsync() doesn't expose the thread that runs it,
-    //so we wouldn't know when that thread terminates.
-    //Create the thread ourselves and call join() to make sure the Controller shuts down completely.
-    monitorThread = new Thread((StramLocalCluster)lc, "master");
-    monitorThread.start();
-
-    boolean notTimeout = true;
-    try {
-      // Wait up to 60s for the consumer to finish consuming all the messages
-      notTimeout = latch.await(waitTime, TimeUnit.MILLISECONDS);
-      lc.shutdown();
-
-      //wait until the control thread has finished
-      monitorThread.join();
-    } catch (Exception e) {
-      logger.warn(e.getMessage());
-    }
-
-    t.join();
-
-    if (!notTimeout || expectedReceiveCount != tupleCollection.size()) {
-      logger.info("Number of received/expected tuples: {}/{}, testName: {}, tuples: \n{}", tupleCollection.size(),
-          expectedReceiveCount, testName, tupleCollection);
-    }
-    Assert.assertTrue("TIMEOUT. testName: " + this.testName + "; Collected data: "
-        + tupleCollection, notTimeout);
-
-    // Check results
-    Assert.assertTrue("testName: " + testName + "; Collected tuple size: " + tupleCollection.size()
-        + "; Expected tuple size: " + expectedReceiveCount + "; data: \n" + tupleCollection,
-        expectedReceiveCount == tupleCollection.size());
-
-    logger.info("End of test case: {}", testName);
-  }
-
-  private void setupHasFailureTest(KafkaSinglePortInputOperator operator, DAG dag)
-  {
-    operator.setHoldingBufferSize(5000);
-    dag.setAttribute(Context.DAGContext.CHECKPOINT_WINDOW_COUNT, 1);
-    //dag.setAttribute(Context.OperatorContext.STORAGE_AGENT, new FSStorageAgent(
-    //  APPLICATION_PATH + "failureck", new Configuration()));
-    operator.setMaxTuplesPerWindow(tuplesPerWindow);
-  }
-
-  private String getClusterConfig()
-  {
-    String l = "localhost:";
-    return l + TEST_KAFKA_BROKER_PORT[0][0] +
-      (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
-      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
-      (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
-  }
-
-}
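
For reference, getClusterConfig() above joins brokers of the same cluster with ',' and separate clusters with ';', so with two clusters of two brokers each it yields a string of the form (ports are placeholders):

  localhost:9092,localhost:9093;localhost:9094,localhost:9095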

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
deleted file mode 100644
index 3910546..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOperatorTestBase.java
+++ /dev/null
@@ -1,292 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.InetSocketAddress;
-import java.net.ServerSocket;
-import java.util.Properties;
-
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.slf4j.LoggerFactory;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.zookeeper.server.NIOServerCnxnFactory;
-import org.apache.zookeeper.server.ServerCnxnFactory;
-import org.apache.zookeeper.server.ZKDatabase;
-import org.apache.zookeeper.server.ZooKeeperServer;
-import org.apache.zookeeper.server.persistence.FileTxnSnapLog;
-
-import kafka.admin.TopicCommand;
-import kafka.server.KafkaConfig;
-import kafka.server.KafkaServerStartable;
-import kafka.utils.ZkUtils;
-
-/**
- * Base class that sets up and cleans the Kafka testing environment for all input/output tests. For a multi-partition
- * test, this class creates 2 Kafka partitions.
- */
-public class KafkaOperatorTestBase
-{
-
-  public static final String END_TUPLE = "END_TUPLE";
-  public static final int[] TEST_ZOOKEEPER_PORT;
-  public static final int[][] TEST_KAFKA_BROKER_PORT;
-  public static final String TEST_TOPIC = "testtopic";
-  public static int testCounter = 0;
-
-  // get available ports
-  static {
-    ServerSocket[] listeners = new ServerSocket[6];
-    int[] p = new int[6];
-
-    try {
-      for (int i = 0; i < 6; i++) {
-        listeners[i] = new ServerSocket(0);
-        p[i] = listeners[i].getLocalPort();
-      }
-
-      for (int i = 0; i < 6; i++) {
-        listeners[i].close();
-      }
-    } catch (IOException e) {
-      throw new RuntimeException(e);
-    }
-
-    TEST_ZOOKEEPER_PORT = new int[]{p[0], p[1]};
-    TEST_KAFKA_BROKER_PORT = new int[][]{
-      new int[]{p[2], p[3]},
-      new int[]{p[4], p[5]}
-    };
-  }
-
-  static final org.slf4j.Logger logger = LoggerFactory.getLogger(KafkaOperatorTestBase.class);
-  // since Kafka 0.8, use KafkaServerStartable instead of KafkaServer
-
-  // multiple brokers in multiple cluster
-  private static KafkaServerStartable[][] broker = new KafkaServerStartable[2][2];
-
-  // multiple cluster
-  private static ServerCnxnFactory[] zkFactory = new ServerCnxnFactory[2];
-
-  private static ZooKeeperServer[] zkServer = new ZooKeeperServer[2];
-
-  public static String baseDir = "target";
-
-  private static final String zkBaseDir = "zookeeper-server-data";
-  private static final String kafkaBaseDir = "kafka-server-data";
-  private static final String[] zkdir = new String[]{"zookeeper-server-data/1", "zookeeper-server-data/2"};
-  private static final String[][] kafkadir = new String[][]{
-      new String[]{"kafka-server-data/1/1", "kafka-server-data/1/2"},
-      new String[]{"kafka-server-data/2/1", "kafka-server-data/2/2"}};
-  protected boolean hasMultiPartition = false;
-  protected boolean hasMultiCluster = false;
-
-  public static void startZookeeper(final int clusterId)
-  {
-    try {
-
-      int numConnections = 100;
-      int tickTime = 2000;
-      File dir = new File(baseDir, zkdir[clusterId]);
-
-      zkServer[clusterId] = new TestZookeeperServer(dir, dir, tickTime);
-      zkFactory[clusterId] = new NIOServerCnxnFactory();
-      zkFactory[clusterId].configure(new InetSocketAddress(TEST_ZOOKEEPER_PORT[clusterId]), numConnections);
-
-      zkFactory[clusterId].startup(zkServer[clusterId]); // start the zookeeper server.
-      Thread.sleep(2000);
-      //kserver.startup();
-    } catch (Exception ex) {
-      logger.error(ex.getLocalizedMessage());
-    }
-  }
-
-  public static void stopZookeeper()
-  {
-    for (ZooKeeperServer zs : zkServer) {
-      if (zs != null) {
-        zs.shutdown();
-      }
-    }
-
-    for (ServerCnxnFactory zkf : zkFactory) {
-      if (zkf != null) {
-        zkf.closeAll();
-        zkf.shutdown();
-      }
-    }
-    zkServer = new ZooKeeperServer[2];
-    zkFactory = new ServerCnxnFactory[2];
-  }
-
-  public static void startKafkaServer(int clusterid, int brokerid)
-  {
-    Properties props = new Properties();
-    props.setProperty("broker.id", "" + clusterid * 10 + brokerid);
-    props.setProperty("log.dirs", new File(baseDir, kafkadir[clusterid][brokerid]).toString());
-    props.setProperty("zookeeper.connect", "localhost:" + TEST_ZOOKEEPER_PORT[clusterid]);
-    props.setProperty("port", "" + TEST_KAFKA_BROKER_PORT[clusterid][brokerid]);
-    props.setProperty("default.replication.factor", "1");
-    // set this to 50000 to boost performance so most test data stays in memory before being flushed to disk
-    props.setProperty("log.flush.interval.messages", "50000");
-
-    broker[clusterid][brokerid] = new KafkaServerStartable(new KafkaConfig(props));
-    broker[clusterid][brokerid].startup();
-
-  }
-
-  public static void startKafkaServer()
-  {
-
-    FileUtils.deleteQuietly(new File(baseDir, kafkaBaseDir));
-    //boolean[][] startable = new boolean[][] { new boolean[] { true, hasMultiPartition },
-    //  new boolean[] { hasMultiCluster, hasMultiCluster && hasMultiPartition } };
-    startKafkaServer(0, 0);
-    startKafkaServer(0, 1);
-    startKafkaServer(1, 0);
-    startKafkaServer(1, 1);
-
-    // startup is an asynchronous operation; the servers may take a moment to come up
-
-  }
-
-  public static void stopKafkaServer()
-  {
-    for (int i = 0; i < broker.length; i++) {
-      for (int j = 0; j < broker[i].length; j++) {
-        if (broker[i][j] != null) {
-          broker[i][j].shutdown();
-          broker[i][j].awaitShutdown();
-          broker[i][j] = null;
-        }
-      }
-    }
-  }
-
-  @BeforeClass
-  public static void beforeTest()
-  {
-    try {
-      startZookeeper();
-      startKafkaServer();
-    } catch (java.nio.channels.CancelledKeyException ex) {
-      logger.debug("LSHIL {}", ex.getLocalizedMessage());
-    }
-  }
-
-  public static void startZookeeper()
-  {
-    FileUtils.deleteQuietly(new File(baseDir, zkBaseDir));
-    startZookeeper(0);
-    startZookeeper(1);
-  }
-
-  public void createTopic(int clusterid, String topicName)
-  {
-    String[] args = new String[9];
-    args[0] = "--zookeeper";
-    args[1] = "localhost:" + TEST_ZOOKEEPER_PORT[clusterid];
-    args[2] = "--replication-factor";
-    args[3] = "1";
-    args[4] = "--partitions";
-    if (hasMultiPartition) {
-      args[5] = "2";
-    } else {
-      args[5] = "1";
-    }
-    args[6] = "--topic";
-    args[7] = topicName;
-    args[8] = "--create";
-
-    ZkUtils zu = ZkUtils.apply("localhost:" + TEST_ZOOKEEPER_PORT[clusterid], 30000, 30000, false);
-    TopicCommand.createTopic(zu, new TopicCommand.TopicCommandOptions(args));
-
-  }
-
-  @AfterClass
-  public static void afterTest()
-  {
-    try {
-      stopKafkaServer();
-      stopZookeeper();
-    } catch (Exception ex) {
-      logger.debug("LSHIL {}", ex.getLocalizedMessage());
-    }
-  }
-
-  public void setHasMultiPartition(boolean hasMultiPartition)
-  {
-    this.hasMultiPartition = hasMultiPartition;
-  }
-
-  public void setHasMultiCluster(boolean hasMultiCluster)
-  {
-    this.hasMultiCluster = hasMultiCluster;
-  }
-
-  public static class TestZookeeperServer extends ZooKeeperServer
-  {
-
-    public TestZookeeperServer()
-    {
-      super();
-      // TODO Auto-generated constructor stub
-    }
-
-    public TestZookeeperServer(File snapDir, File logDir, int tickTime) throws IOException
-    {
-      super(snapDir, logDir, tickTime);
-      // TODO Auto-generated constructor stub
-    }
-
-    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, DataTreeBuilder treeBuilder) throws IOException
-    {
-      super(txnLogFactory, treeBuilder);
-      // TODO Auto-generated constructor stub
-    }
-
-    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, DataTreeBuilder treeBuilder)
-        throws IOException
-    {
-      super(txnLogFactory, tickTime, treeBuilder);
-      // TODO Auto-generated constructor stub
-    }
-
-    public TestZookeeperServer(FileTxnSnapLog txnLogFactory, int tickTime, int minSessionTimeout,
-        int maxSessionTimeout, DataTreeBuilder treeBuilder, ZKDatabase zkDb)
-    {
-      super(txnLogFactory, tickTime, minSessionTimeout, maxSessionTimeout, treeBuilder, zkDb);
-      // TODO Auto-generated constructor stub
-    }
-
-    @Override
-    protected void registerJMX()
-    {
-    }
-
-    @Override
-    protected void unregisterJMX()
-    {
-    }
-
-  }
-}
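
A sketch of how a test can build on this base class; the subclass name is a placeholder, and the setter below drives the --partitions argument used by createTopic():

  public class MyKafkaTest extends KafkaOperatorTestBase
  {
    public MyKafkaTest()
    {
      setHasMultiPartition(true);  // createTopic() will then create 2 partitions
    }
  }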

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
deleted file mode 100644
index 9a67024..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaOutputOperatorTest.java
+++ /dev/null
@@ -1,429 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Properties;
-
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import org.apache.apex.malhar.lib.wal.FSWindowDataManager;
-import org.apache.commons.io.FileUtils;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.producer.ProducerConfig;
-
-import com.datatorrent.api.Attribute;
-import com.datatorrent.api.Context;
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.api.DAG;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.LocalMode;
-import com.datatorrent.api.Operator;
-import com.datatorrent.common.util.BaseOperator;
-
-import static com.datatorrent.lib.helper.OperatorContextTestHelper.mockOperatorContext;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.GROUP_ID_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
-import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;
-
-public class KafkaOutputOperatorTest extends KafkaOperatorTestBase
-{
-  String testName;
-  private static List<Person> tupleCollection = new LinkedList<>();
-  private final String VALUE_DESERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
-  private final String VALUE_SERIALIZER = "org.apache.apex.malhar.kafka.KafkaHelper";
-
-  public static String APPLICATION_PATH = baseDir + File.separator + "MyKafkaApp" + File.separator;
-
-  @Before
-  public void before()
-  {
-    FileUtils.deleteQuietly(new File(APPLICATION_PATH));
-    testName = TEST_TOPIC + testCounter++;
-    createTopic(0, testName);
-    if (hasMultiCluster) {
-      createTopic(1, testName);
-    }
-  }
-
-  @After
-  public void after()
-  {
-    FileUtils.deleteQuietly(new File(APPLICATION_PATH));
-  }
-
-  @Test
-  public void testExactlyOnceWithFailure() throws Exception
-  {
-    List<Person> toKafka = GenerateList();
-
-    sendDataToKafka(true, toKafka, true, false);
-
-    List<Person> fromKafka = ReadFromKafka();
-
-    Assert.assertTrue("With Failure", compare(fromKafka, toKafka));
-  }
-
-  @Test
-  public void testExactlyOnceWithNoFailure() throws Exception
-  {
-    List<Person> toKafka = GenerateList();
-
-    sendDataToKafka(true, toKafka, false, false);
-
-    List<Person> fromKafka = ReadFromKafka();
-
-    Assert.assertTrue("With No Failure", compare(fromKafka, toKafka));
-  }
-
-  @Test
-  public void testExactlyOnceWithDifferentTuplesAfterRecovery() throws Exception
-  {
-    List<Person> toKafka = GenerateList();
-
-    try {
-      sendDataToKafka(true, toKafka, true, true);
-    } catch (RuntimeException ex) {
-
-      boolean expectedException = false;
-      if (ex.getMessage().contains("Violates")) {
-        expectedException = true;
-      }
-
-      Assert.assertTrue("Different tuples after recovery", expectedException);
-      return;
-    }
-
-    Assert.assertTrue("Wrong tuples during replay, should throw exception", false);
-  }
-
-  @Test
-  public void testKafkaOutput() throws Exception
-  {
-    List<Person> toKafka = GenerateList();
-
-    sendDataToKafka(false, toKafka, false, false);
-
-    List<Person> fromKafka = ReadFromKafka();
-
-    Assert.assertTrue("No failure", compare(fromKafka, toKafka));
-  }
-
-  @Test
-  public void testKafkaOutputWithFailure() throws Exception
-  {
-    List<Person> toKafka = GenerateList();
-
-    sendDataToKafka(false, toKafka, true, true);
-
-    List<Person> fromKafka = ReadFromKafka();
-
-    Assert.assertTrue("No failure", fromKafka.size() > toKafka.size());
-  }
-
-  private void sendDataToKafka(boolean exactlyOnce, List<Person> toKafka, boolean hasFailure,
-      boolean differentTuplesAfterRecovery) throws InterruptedException
-  {
-    Properties props = new Properties();
-    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, VALUE_SERIALIZER);
-    if (!exactlyOnce) {
-      props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_SERIALIZER);
-    }
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
-    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
-
-    Attribute.AttributeMap attributeMap = new Attribute.AttributeMap.DefaultAttributeMap();
-    attributeMap.put(Context.DAGContext.APPLICATION_NAME, "MyKafkaApp");
-    attributeMap.put(DAG.APPLICATION_PATH, APPLICATION_PATH);
-
-    OperatorContext operatorContext = mockOperatorContext(2, attributeMap);
-
-    cleanUp(operatorContext);
-
-    Operator kafkaOutput;
-    DefaultInputPort<Person> inputPort;
-
-    if (exactlyOnce) {
-      KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
-          ResetKafkaOutput(testName, props, operatorContext);
-      inputPort = kafkaOutputTemp.inputPort;
-      kafkaOutput = kafkaOutputTemp;
-    } else {
-      KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
-          ResetKafkaSimpleOutput(testName, props, operatorContext);
-      inputPort = kafkaOutputTemp.inputPort;
-      kafkaOutput = kafkaOutputTemp;
-    }
-
-    kafkaOutput.beginWindow(1);
-    inputPort.getSink().put(toKafka.get(0));
-    inputPort.getSink().put(toKafka.get(1));
-    inputPort.getSink().put(toKafka.get(2));
-    kafkaOutput.endWindow();
-    kafkaOutput.beginWindow(2);
-    inputPort.getSink().put(toKafka.get(3));
-    inputPort.getSink().put(toKafka.get(4));
-    inputPort.getSink().put(toKafka.get(5));
-    kafkaOutput.endWindow();
-    kafkaOutput.beginWindow(3);
-    inputPort.getSink().put(toKafka.get(6));
-    inputPort.getSink().put(toKafka.get(7));
-
-    if (hasFailure) {
-      if (exactlyOnce) {
-        KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutputTemp =
-            ResetKafkaOutput(testName, props, operatorContext);
-        inputPort = kafkaOutputTemp.inputPort;
-        kafkaOutput = kafkaOutputTemp;
-      } else {
-        KafkaSinglePortOutputOperator<String, Person> kafkaOutputTemp =
-            ResetKafkaSimpleOutput(testName, props, operatorContext);
-        inputPort = kafkaOutputTemp.inputPort;
-        kafkaOutput = kafkaOutputTemp;
-      }
-
-      kafkaOutput.beginWindow(2);
-      inputPort.getSink().put(toKafka.get(3));
-      inputPort.getSink().put(toKafka.get(4));
-      inputPort.getSink().put(toKafka.get(5));
-      kafkaOutput.endWindow();
-      kafkaOutput.beginWindow(3);
-      inputPort.getSink().put(toKafka.get(6));
-
-      if (!differentTuplesAfterRecovery) {
-        inputPort.getSink().put(toKafka.get(7));
-      }
-    }
-
-    inputPort.getSink().put(toKafka.get(8));
-    inputPort.getSink().put(toKafka.get(9));
-    kafkaOutput.endWindow();
-    kafkaOutput.beginWindow(4);
-    inputPort.getSink().put(toKafka.get(10));
-    inputPort.getSink().put(toKafka.get(11));
-    kafkaOutput.endWindow();
-
-    cleanUp(operatorContext);
-  }
-
-  private KafkaSinglePortExactlyOnceOutputOperator<Person> ResetKafkaOutput(
-      String testName, Properties props, Context.OperatorContext operatorContext)
-  {
-    KafkaSinglePortExactlyOnceOutputOperator<Person> kafkaOutput = new KafkaSinglePortExactlyOnceOutputOperator<>();
-    kafkaOutput.setTopic(testName);
-    kafkaOutput.setProperties(props);
-    kafkaOutput.setup(operatorContext);
-
-    return kafkaOutput;
-  }
-
-  private KafkaSinglePortOutputOperator<String, Person> ResetKafkaSimpleOutput(
-      String testName, Properties props, Context.OperatorContext operatorContext)
-  {
-    KafkaSinglePortOutputOperator<String, Person> kafkaOutput = new KafkaSinglePortOutputOperator<>();
-    kafkaOutput.setTopic(testName);
-    kafkaOutput.setProperties(props);
-    kafkaOutput.setup(operatorContext);
-
-    return kafkaOutput;
-  }
-
-  private void cleanUp(Context.OperatorContext operatorContext)
-  {
-    FSWindowDataManager windowDataManager = new FSWindowDataManager();
-    windowDataManager.setup(operatorContext);
-    try {
-      windowDataManager.committed(windowDataManager.getLargestCompletedWindow());
-    } catch (IOException e) {
-      e.printStackTrace();
-    }
-  }
-
-  private boolean compare(List<Person> fromKafka, List<Person> toKafka)
-  {
-    if (fromKafka.size() != toKafka.size()) {
-      return false;
-    }
-
-    for (int i = 0; i < fromKafka.size(); ++i) {
-      if (!fromKafka.get(i).equals(toKafka.get(i))) {
-        return false;
-      }
-    }
-
-    return true;
-  }
-
-  private String getClusterConfig()
-  {
-    String l = "localhost:";
-    return l + TEST_KAFKA_BROKER_PORT[0][0] +
-      (hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[0][1] : "") +
-      (hasMultiCluster ? ";" + l + TEST_KAFKA_BROKER_PORT[1][0] : "") +
-      (hasMultiCluster && hasMultiPartition ? "," + l + TEST_KAFKA_BROKER_PORT[1][1] : "");
-  }
-
-  private List<Person> GenerateList()
-  {
-    List<Person> people = new ArrayList<>();
-
-    for (Integer i = 0; i < 12; ++i) {
-      people.add(new Person(i.toString(), i));
-    }
-
-    return people;
-  }
-
-  private List<Person> ReadFromKafka()
-  {
-    tupleCollection.clear();
-
-    // Consumer properties for the input operator
-    Properties props = new Properties();
-    props.put(BOOTSTRAP_SERVERS_CONFIG, getClusterConfig());
-    props.put(KEY_DESERIALIZER_CLASS_CONFIG, KafkaSinglePortExactlyOnceOutputOperator.KEY_DESERIALIZER);
-    props.put(VALUE_DESERIALIZER_CLASS_CONFIG, VALUE_DESERIALIZER);
-    props.put(GROUP_ID_CONFIG, "KafkaTest");
-
-    LocalMode lma = LocalMode.newInstance();
-    DAG dag = lma.getDAG();
-
-    // Create KafkaSinglePortInputOperator
-    KafkaSinglePortInputOperator node = dag.addOperator("Kafka input", KafkaSinglePortInputOperator.class);
-    node.setConsumerProps(props);
-    node.setInitialPartitionCount(1);
-    // set topic
-    node.setTopics(testName);
-    node.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name());
-    node.setClusters(getClusterConfig());
-    node.setStrategy("one_to_one");
-
-    // Create Test tuple collector
-    CollectorModule collector1 = dag.addOperator("collector", new CollectorModule());
-
-    // Connect ports
-    dag.addStream("Kafka message", node.outputPort, collector1.inputPort);
-
-    // Create local cluster
-    final LocalMode.Controller lc = lma.getController();
-    lc.setHeartbeatMonitoringEnabled(false);
-
-    lc.run(30000);
-
-    return tupleCollection;
-  }
-
-  public static class CollectorModule extends BaseOperator
-  {
-    public final transient CollectorInputPort inputPort = new CollectorInputPort(this);
-
-    long currentWindowId;
-    long operatorId;
-
-    @Override
-    public void setup(Context.OperatorContext context)
-    {
-      super.setup(context);
-      operatorId = context.getId();
-    }
-
-    @Override
-    public void beginWindow(long windowId)
-    {
-      super.beginWindow(windowId);
-      currentWindowId = windowId;
-    }
-
-    @Override
-    public void endWindow()
-    {
-      super.endWindow();
-    }
-  }
-
-  public static class CollectorInputPort extends DefaultInputPort<byte[]>
-  {
-    CollectorModule ownerNode;
-
-    CollectorInputPort(CollectorModule node)
-    {
-      this.ownerNode = node;
-    }
-
-    @Override
-    public void process(byte[] bt)
-    {
-      tupleCollection.add(new KafkaHelper().deserialize("r", bt));
-    }
-  }
-
-  public static class Person
-  {
-    public String name;
-    public Integer age;
-
-    public Person(String name, Integer age)
-    {
-      this.name = name;
-      this.age = age;
-    }
-
-    @Override
-    public boolean equals(Object o)
-    {
-      if (this == o) {
-        return true;
-      }
-      if (o == null || getClass() != o.getClass()) {
-        return false;
-      }
-
-      Person person = (Person)o;
-
-      if (name != null ? !name.equals(person.name) : person.name != null) {
-        return false;
-      }
-
-      return age != null ? age.equals(person.age) : person.age == null;
-    }
-
-    @Override
-    public int hashCode()
-    {
-      int result = name != null ? name.hashCode() : 0;
-      result = 31 * result + (age != null ? age.hashCode() : 0);
-      return result;
-    }
-
-    @Override
-    public String toString()
-    {
-      return name + age.toString();
-    }
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
deleted file mode 100644
index 6098bde..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestPartitioner.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.util.Map;
-
-import org.apache.kafka.clients.producer.Partitioner;
-import org.apache.kafka.common.Cluster;
-
-import kafka.utils.VerifiableProperties;
-
-/**
- * A simple partitioner class for test purposes.
- * The key is an integer string.
- * Messages are distributed across all partitions by key modulo the partition count;
- * with two partitions, even keys go to one and odd keys to the other.
- */
-public class KafkaTestPartitioner implements Partitioner
-{
-  public KafkaTestPartitioner(VerifiableProperties props)
-  {
-
-  }
-
-  public KafkaTestPartitioner()
-  {
-
-  }
-
-  @Override
-  public int partition(String topic, Object key, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster)
-  {
-    int num_partitions = cluster.partitionsForTopic(topic).size();
-    return Integer.parseInt((String)key) % num_partitions;
-  }
-
-  @Override
-  public void close()
-  {
-
-  }
-
-  @Override
-  public void configure(Map<String, ?> map)
-  {
-
-  }
-}
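
The partition() method above reduces to key modulo partition count; a quick check of the arithmetic, assuming a two-partition topic:

  int numPartitions = 2;                             // assumed partition count
  int even = Integer.parseInt("6") % numPartitions;  // 0
  int odd = Integer.parseInt("7") % numPartitions;   // 1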

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
----------------------------------------------------------------------
diff --git a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java b/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
deleted file mode 100644
index 0f18666..0000000
--- a/kafka/src/test/java/org/apache/apex/malhar/kafka/KafkaTestProducer.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.kafka;
-
-import java.util.List;
-import java.util.Properties;
-import java.util.Random;
-import java.util.concurrent.Future;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.Producer;
-import org.apache.kafka.clients.producer.ProducerConfig;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.serialization.StringSerializer;
-
-import com.google.common.collect.Lists;
-
-/**
- * A Kafka producer for testing.
- */
-public class KafkaTestProducer implements Runnable
-{
-  //  private static final Logger logger = LoggerFactory.getLogger(KafkaTestProducer.class);
-  private final Producer<String, String> producer;
-  private final Producer<String, String> producer1;
-  private final String topic;
-  private int sendCount = 20;
-  // to generate a random int as a key for partitioning
-  private final Random rand = new Random();
-  private boolean hasPartition = false;
-  private boolean hasMultiCluster = false;
-  private List<String> messages;
-
-  // http://kafka.apache.org/documentation.html#producerconfigs
-  private String ackType = "1";
-
-  public int getSendCount()
-  {
-    return sendCount;
-  }
-
-  public void setSendCount(int sendCount)
-  {
-    this.sendCount = sendCount;
-  }
-
-  public void setMessages(List<String> messages)
-  {
-    this.messages = messages;
-  }
-
-  private Properties createProducerConfig(int cid)
-  {
-    Properties props = new Properties();
-    props.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-    props.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
-    props.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG, KafkaTestPartitioner.class.getName());
-    String brokerList = "localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][0];
-    brokerList += hasPartition ? (",localhost:" + KafkaOperatorTestBase.TEST_KAFKA_BROKER_PORT[cid][1]) : "";
-    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
-    props.setProperty(ProducerConfig.METADATA_MAX_AGE_CONFIG, "20000");
-    props.setProperty(ProducerConfig.ACKS_CONFIG, getAckType());
-
-    return props;
-  }
-
-  public KafkaTestProducer(String topic)
-  {
-    this(topic, false);
-  }
-
-  public KafkaTestProducer(String topic, boolean hasPartition, boolean hasMultiCluster)
-  {
-    // Use KafkaTestPartitioner to spread messages across partitions.
-    // Both key and message are of type String.
-    this.topic = topic;
-    this.hasPartition = hasPartition;
-    this.hasMultiCluster = hasMultiCluster;
-    producer = new KafkaProducer<>(createProducerConfig(0));
-    if (hasMultiCluster) {
-      producer1 = new KafkaProducer<>(createProducerConfig(1));
-    } else {
-      producer1 = null;
-    }
-  }
-
-  public KafkaTestProducer(String topic, boolean hasPartition)
-  {
-    this(topic, hasPartition, false);
-  }
-
-  private transient List<Future<RecordMetadata>> sendTasks = Lists.newArrayList();
-
-  private void generateMessages()
-  {
-    // Create dummy messages
-    int messageNo = 1;
-    while (messageNo <= sendCount) {
-      String messageStr = "_" + messageNo++;
-      int k = rand.nextInt(100);
-      sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + k, "c1" + messageStr)));
-      if (hasMultiCluster && messageNo <= sendCount) {
-        messageStr = "_" + messageNo++;
-        sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + k, "c2" + messageStr)));
-      }
-      // logger.debug(String.format("Producing %s", messageStr));
-    }
-    // produce the end tuples to let the test input operator know the producer is done producing messages
-    for (int i = 0; i < (hasPartition ? 2 : 1); ++i) {
-      sendTasks.add(producer.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
-      if (hasMultiCluster) {
-        sendTasks.add(producer1.send(new ProducerRecord<>(topic, "" + i, KafkaOperatorTestBase.END_TUPLE)));
-      }
-    }
-  }
-
-  @Override
-  public void run()
-  {
-    if (messages == null) {
-      generateMessages();
-    } else {
-      for (String msg : messages) {
-        sendTasks.add(producer.send(new ProducerRecord<>(topic, "", msg)));
-      }
-    }
-
-    producer.flush();
-    if (producer1 != null) {
-      producer1.flush();
-    }
-
-    try {
-      for (Future<RecordMetadata> task : sendTasks) {
-        task.get(30, TimeUnit.SECONDS);
-      }
-    } catch (Exception e) {
-      throw new RuntimeException(e);
-    }
-
-    close();
-  }
-
-  public void close()
-  {
-    producer.close();
-    if (producer1 != null) {
-      producer1.close();
-    }
-  }
-
-  public String getAckType()
-  {
-    return ackType;
-  }
-
-  public void setAckType(String ackType)
-  {
-    this.ackType = ackType;
-  }
-} // End of KafkaTestProducer
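
A usage sketch of the producer above, matching how KafkaInputOperatorTest drives it; the topic name is a placeholder and exception handling (join() throws InterruptedException) is elided:

  KafkaTestProducer p = new KafkaTestProducer("testtopic0", false, false);
  p.setSendCount(20);
  Thread t = new Thread(p);  // run() sends the messages plus END_TUPLE markers, then closes the producers
  t.start();
  t.join();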

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/kafka/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/kafka/src/test/resources/log4j.properties b/kafka/src/test/resources/log4j.properties
deleted file mode 100644
index 910e44a..0000000
--- a/kafka/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,50 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-
-log4j.rootLogger=DEBUG,CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.CONSOLE.threshold=INFO
-#log4j.appender.CONSOLE.threshold=${test.log.console.threshold}
-test.log.console.threshold=WARN
-
-log4j.appender.RFA=org.apache.log4j.RollingFileAppender
-log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
-log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} [%t] %-5p %c{2} %M - %m%n
-log4j.appender.RFA.File=/tmp/app.log
-
-# to enable, add SYSLOG to rootLogger
-log4j.appender.SYSLOG=org.apache.log4j.net.SyslogAppender
-log4j.appender.SYSLOG.syslogHost=127.0.0.1
-log4j.appender.SYSLOG.layout=org.apache.log4j.PatternLayout
-log4j.appender.SYSLOG.layout.conversionPattern=${dt.cid} %-5p [%t] %c{2} %x - %m%n
-log4j.appender.SYSLOG.Facility=LOCAL1
-
-#log4j.logger.org=INFO
-
-#log4j.logger.org.apache.commons.beanutils=warn
-log4j.logger.com.datatorrent=INFO
-log4j.logger.org.apache.apex=INFO
-
-log4j.logger.org.apache.kafka=WARN
-log4j.logger.kafka.consumer=WARN
-log4j.logger.kafka=WARN
-log4j.logger.org.apache.zookeeper=WARN

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/b42d8e74/sql/pom.xml
----------------------------------------------------------------------
diff --git a/sql/pom.xml b/sql/pom.xml
index 113e563..96ac178 100644
--- a/sql/pom.xml
+++ b/sql/pom.xml
@@ -84,12 +84,6 @@
       <groupId>org.apache.apex</groupId>
       <artifactId>malhar-kafka</artifactId>
       <version>${project.parent.version}</version>
-      <exclusions>
-        <exclusion>
-          <groupId>*</groupId>
-          <artifactId>*</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.calcite</groupId>

