flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From de...@apache.org
Subject [3/7] flume git commit: FLUME-3195. Split the KafkaChannelTest to avoid timeouts
Date Mon, 05 Feb 2018 14:48:38 GMT
FLUME-3195. Split the KafkaChannelTest to avoid timeouts

KafkaChannelTest had quite a few test methods, so it sometimes caused a timeout
during the build. Refactoring it into smaller test classes decreases the chance
of timeouts.

This closes #183

Reviewers: Miklos Csanady, Ferenc Szabo

(Viktor Somogyi via Ferenc Szabo)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5a70cd7b
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5a70cd7b
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5a70cd7b

Branch: refs/heads/trunk
Commit: 5a70cd7b3773d322d15a6adb04811c121d1f3976
Parents: 69c66ef
Author: Viktor Somogyi <viktor.somogyi@cloudera.com>
Authored: Thu Nov 16 13:01:12 2017 +0100
Committer: Denes Arvay <denes@apache.org>
Committed: Fri Jan 26 16:23:03 2018 +0100

----------------------------------------------------------------------
 .../channel/kafka/TestBasicFunctionality.java   | 213 +++++
 .../flume/channel/kafka/TestKafkaChannel.java   | 897 -------------------
 .../channel/kafka/TestKafkaChannelBase.java     | 272 ++++++
 .../channel/kafka/TestOffsetsAndMigration.java  | 194 ++++
 .../channel/kafka/TestParseAsFlumeEvent.java    | 132 +++
 .../flume/channel/kafka/TestPartitions.java     | 179 ++++
 .../flume/channel/kafka/TestRollback.java       |  84 ++
 7 files changed, 1074 insertions(+), 897 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
new file mode 100644
index 0000000..4ff0ee6
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestBasicFunctionality.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.conf.Configurables;
+import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+
+public class TestBasicFunctionality extends TestKafkaChannelBase {
+
+  @Test
+  public void testProps() throws Exception {
+    Context context = new Context();
+    context.put("kafka.producer.some-parameter", "1");
+    context.put("kafka.consumer.another-parameter", "1");
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(TOPIC_CONFIG, topic);
+
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+
+    Properties consumerProps = channel.getConsumerProps();
+    Properties producerProps = channel.getProducerProps();
+
+    Assert.assertEquals(producerProps.getProperty("some-parameter"), "1");
+    Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
+  }
+
+  @Test
+  public void testOldConfig() throws Exception {
+    Context context = new Context();
+    context.put(BROKER_LIST_FLUME_KEY, testUtil.getKafkaServerUrl());
+    context.put(GROUP_ID_FLUME, "flume-something");
+    context.put(READ_SMALLEST_OFFSET, "true");
+    context.put("topic", topic);
+
+    final KafkaChannel channel = new KafkaChannel();
+    Configurables.configure(channel, context);
+
+    Properties consumerProps = channel.getConsumerProps();
+    Properties producerProps = channel.getProducerProps();
+
+    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
+        testUtil.getKafkaServerUrl());
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
+        "flume-something");
+    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
+        "earliest");
+  }
+
+
+  @Test
+  public void testStopAndStart() throws Exception {
+    doTestStopAndStart(false, false);
+  }
+
+  @Test
+  public void testStopAndStartWithRollback() throws Exception {
+    doTestStopAndStart(true, true);
+  }
+
+  @Test
+  public void testStopAndStartWithRollbackAndNoRetry() throws Exception {
+    doTestStopAndStart(true, false);
+  }
+
+  @Test
+  public void testNullKeyNoHeader() throws Exception {
+    doTestNullKeyNoHeader();
+  }
+
+  /**
+   * Tests that sub-properties get set correctly if you run the configure() method twice
+   * (fix for FLUME-2857)
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testDefaultSettingsOnReConfigure() throws Exception {
+    String sampleProducerProp = "compression.type";
+    String sampleProducerVal = "snappy";
+
+    String sampleConsumerProp = "fetch.min.bytes";
+    String sampleConsumerVal = "99";
+
+    Context context = prepareDefaultContext(false);
+    context.put(KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX + sampleProducerProp,
+        sampleProducerVal);
+    context.put(KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX + sampleConsumerProp,
+        sampleConsumerVal);
+
+    final KafkaChannel channel = createChannel(context);
+
+    Assert.assertEquals(sampleProducerVal,
+        channel.getProducerProps().getProperty(sampleProducerProp));
+    Assert.assertEquals(sampleConsumerVal,
+        channel.getConsumerProps().getProperty(sampleConsumerProp));
+
+    context = prepareDefaultContext(false);
+    channel.configure(context);
+
+    Assert.assertNull(channel.getProducerProps().getProperty(sampleProducerProp));
+    Assert.assertNull(channel.getConsumerProps().getProperty(sampleConsumerProp));
+
+  }
+
+
+  private void doTestNullKeyNoHeader() throws Exception {
+    final KafkaChannel channel = startChannel(false);
+    Properties props = channel.getProducerProps();
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+
+    for (int i = 0; i < 50; i++) {
+      ProducerRecord<String, byte[]> data =
+          new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
+      producer.send(data).get();
+    }
+    ExecutorCompletionService<Void> submitterSvc = new
+        ExecutorCompletionService<>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc,
+        50, false, false);
+    wait(submitterSvc, 5);
+    List<String> finals = new ArrayList<>(50);
+    for (int i = 0; i < 50; i++) {
+      finals.add(i, events.get(i).getHeaders().get(KEY_HEADER));
+    }
+    for (int i = 0; i < 50; i++) {
+      Assert.assertTrue(finals.get(i) == null);
+    }
+    channel.stop();
+  }
+
+  /**
+   * This method starts a channel, puts events into it. The channel is then
+   * stopped and restarted. Then we check to make sure if all events we put
+   * come out. Optionally, 10 events are rolled back,
+   * and optionally we restart the agent immediately after and we try to pull it
+   * out.
+   *
+   * @param rollback
+   * @param retryAfterRollback
+   * @throws Exception
+   */
+  private void doTestStopAndStart(boolean rollback,
+                                  boolean retryAfterRollback) throws Exception {
+    final KafkaChannel channel = startChannel(true);
+    ExecutorService underlying = Executors
+        .newCachedThreadPool();
+    ExecutorCompletionService<Void> submitterSvc =
+        new ExecutorCompletionService<>(underlying);
+    final List<List<Event>> events = createBaseList();
+    putEvents(channel, events, submitterSvc);
+    wait(submitterSvc, 5);
+    channel.stop();
+    final KafkaChannel channel2 = startChannel(true);
+    int total = 50;
+    if (rollback && !retryAfterRollback) {
+      total = 40;
+    }
+    final List<Event> eventsPulled =
+        pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
+    wait(submitterSvc, 5);
+    channel2.stop();
+    if (!retryAfterRollback && rollback) {
+      final KafkaChannel channel3 = startChannel(true);
+      int expectedRemaining = 50 - eventsPulled.size();
+      final List<Event> eventsPulled2 =
+          pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
+      wait(submitterSvc, 5);
+      Assert.assertEquals(expectedRemaining, eventsPulled2.size());
+      eventsPulled.addAll(eventsPulled2);
+      channel3.stop();
+    }
+    underlying.shutdownNow();
+    verify(eventsPulled);
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
deleted file mode 100644
index 5e5f2d0..0000000
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ /dev/null
@@ -1,897 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.flume.channel.kafka;
-
-import com.google.common.collect.Lists;
-import kafka.admin.AdminUtils;
-import kafka.utils.ZKGroupTopicDirs;
-import kafka.utils.ZkUtils;
-import org.apache.commons.lang.RandomStringUtils;
-import org.apache.flume.Context;
-import org.apache.flume.Event;
-import org.apache.flume.Transaction;
-import org.apache.flume.conf.Configurables;
-import org.apache.flume.event.EventBuilder;
-import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
-import org.apache.flume.shared.kafka.test.PartitionOption;
-import org.apache.flume.shared.kafka.test.PartitionTestScenario;
-import org.apache.flume.sink.kafka.util.TestUtil;
-import org.apache.kafka.clients.CommonClientConfigs;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.kafka.common.security.JaasUtils;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
-import java.util.Set;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CyclicBarrier;
-import java.util.concurrent.ExecutorCompletionService;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
-
-public class TestKafkaChannel {
-
-  private static TestUtil testUtil = TestUtil.getInstance();
-  private String topic = null;
-  private final Set<String> usedTopics = new HashSet<String>();
-
-  private static final int DEFAULT_TOPIC_PARTITIONS = 5;
-
-  @BeforeClass
-  public static void setupClass() throws Exception {
-    testUtil.prepare();
-    Thread.sleep(2500);
-  }
-
-  @Before
-  public void setup() throws Exception {
-    topic = findUnusedTopic();
-    try {
-      createTopic(topic, DEFAULT_TOPIC_PARTITIONS);
-    } catch (Exception e) {
-    }
-    Thread.sleep(2500);
-  }
-
-  @AfterClass
-  public static void tearDown() {
-    testUtil.tearDown();
-  }
-
-  //Make sure the props are picked up correctly.
-  @Test
-  public void testProps() throws Exception {
-    Context context = new Context();
-    context.put("kafka.producer.some-parameter", "1");
-    context.put("kafka.consumer.another-parameter", "1");
-    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
-    context.put(TOPIC_CONFIG, topic);
-
-    final KafkaChannel channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-
-    Properties consumerProps = channel.getConsumerProps();
-    Properties producerProps = channel.getProducerProps();
-
-    Assert.assertEquals(producerProps.getProperty("some-parameter"), "1");
-    Assert.assertEquals(consumerProps.getProperty("another-parameter"), "1");
-  }
-
-  @Test
-  public void testOldConfig() throws Exception {
-    Context context = new Context();
-    context.put(BROKER_LIST_FLUME_KEY,testUtil.getKafkaServerUrl());
-    context.put(GROUP_ID_FLUME,"flume-something");
-    context.put(READ_SMALLEST_OFFSET,"true");
-    context.put("topic",topic);
-
-    final KafkaChannel channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-
-    Properties consumerProps = channel.getConsumerProps();
-    Properties producerProps = channel.getProducerProps();
-
-    Assert.assertEquals(producerProps.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG),
-                        testUtil.getKafkaServerUrl());
-    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.GROUP_ID_CONFIG),
-                        "flume-something");
-    Assert.assertEquals(consumerProps.getProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG),
-                        "earliest");
-  }
-
-  @Test
-  public void testSuccess() throws Exception {
-    doTestSuccessRollback(false, false);
-  }
-
-  @Test
-  public void testSuccessInterleave() throws Exception {
-    doTestSuccessRollback(false, true);
-  }
-
-  @Test
-  public void testRollbacks() throws Exception {
-    doTestSuccessRollback(true, false);
-  }
-
-  @Test
-  public void testRollbacksInterleave() throws Exception {
-    doTestSuccessRollback(true, true);
-  }
-
-  private void doTestSuccessRollback(final boolean rollback,
-                                     final boolean interleave) throws Exception {
-    final KafkaChannel channel = startChannel(true);
-    writeAndVerify(rollback, channel, interleave);
-    channel.stop();
-  }
-
-
-  @Test
-  public void testStopAndStart() throws Exception {
-    doTestStopAndStart(false, false);
-  }
-
-  @Test
-  public void testStopAndStartWithRollback() throws Exception {
-    doTestStopAndStart(true, true);
-  }
-
-  @Test
-  public void testStopAndStartWithRollbackAndNoRetry() throws Exception {
-    doTestStopAndStart(true, false);
-  }
-
-  @Test
-  public void testParseAsFlumeEventFalse() throws Exception {
-    doParseAsFlumeEventFalse(false);
-  }
-
-  @Test
-  public void testParseAsFlumeEventFalseCheckHeader() throws Exception {
-    doParseAsFlumeEventFalse(true);
-  }
-
-  @Test
-  public void testParseAsFlumeEventFalseAsSource() throws Exception {
-    doParseAsFlumeEventFalseAsSource(false);
-  }
-
-  @Test
-  public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception {
-    doParseAsFlumeEventFalseAsSource(true);
-  }
-
-  @Test
-  public void testNullKeyNoHeader() throws Exception {
-    doTestNullKeyNoHeader();
-  }
-
-  @Test
-  public void testOffsetsNotCommittedOnStop() throws Exception {
-    String message = "testOffsetsNotCommittedOnStop-" + System.nanoTime();
-
-    KafkaChannel channel = startChannel(false);
-
-    KafkaProducer<String, byte[]> producer =
-        new KafkaProducer<String, byte[]>(channel.getProducerProps());
-    ProducerRecord<String, byte[]> data =
-        new ProducerRecord<String, byte[]>(topic, "header-" + message, message.getBytes());
-    producer.send(data).get();
-    producer.flush();
-    producer.close();
-
-    Event event = takeEventWithoutCommittingTxn(channel);
-    Assert.assertNotNull(event);
-    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
-
-    // Stop the channel without committing the transaction
-    channel.stop();
-
-    channel = startChannel(false);
-
-    // Message should still be available
-    event = takeEventWithoutCommittingTxn(channel);
-    Assert.assertNotNull(event);
-    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
-  }
-
-  @Test
-  public void testMigrateOffsetsNone() throws Exception {
-    doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none");
-  }
-
-  @Test
-  public void testMigrateOffsetsZookeeper() throws Exception {
-    doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper");
-  }
-
-  @Test
-  public void testMigrateOffsetsKafka() throws Exception {
-    doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka");
-  }
-
-  @Test
-  public void testMigrateOffsetsBoth() throws Exception {
-    doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both");
-  }
-
-  @Test
-  public void testPartitionHeaderSet() throws Exception {
-    doPartitionHeader(PartitionTestScenario.PARTITION_ID_HEADER_ONLY);
-  }
-
-  @Test
-  public void testPartitionHeaderNotSet() throws Exception {
-    doPartitionHeader(PartitionTestScenario.NO_PARTITION_HEADERS);
-  }
-
-  @Test
-  public void testStaticPartitionAndHeaderSet() throws Exception {
-    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID);
-  }
-
-  @Test
-  public void testStaticPartitionHeaderNotSet() throws Exception {
-    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_ONLY);
-  }
-
-  @Test
-  public void testPartitionHeaderMissing() throws Exception {
-    doPartitionErrors(PartitionOption.NOTSET);
-  }
-
-  @Test(expected = org.apache.flume.ChannelException.class)
-  public void testPartitionHeaderOutOfRange() throws Exception {
-    doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE);
-  }
-
-  @Test(expected = org.apache.flume.ChannelException.class)
-  public void testPartitionHeaderInvalid() throws Exception {
-    doPartitionErrors(PartitionOption.NOTANUMBER);
-  }
-
-  /**
-   * Tests that sub-properties get set correctly if you run the configure() method twice
-   * (fix for FLUME-2857)
-   * @throws Exception
-   */
-  @Test
-  public void testDefaultSettingsOnReConfigure() throws Exception {
-    String sampleProducerProp = "compression.type";
-    String sampleProducerVal = "snappy";
-
-    String sampleConsumerProp = "fetch.min.bytes";
-    String sampleConsumerVal = "99";
-
-    Context context = prepareDefaultContext(false);
-    context.put(KafkaChannelConfiguration.KAFKA_PRODUCER_PREFIX + sampleProducerProp,
-        sampleProducerVal);
-    context.put(KafkaChannelConfiguration.KAFKA_CONSUMER_PREFIX + sampleConsumerProp,
-        sampleConsumerVal);
-
-    final KafkaChannel channel = createChannel(context);
-
-    Assert.assertEquals(sampleProducerVal,
-        channel.getProducerProps().getProperty(sampleProducerProp));
-    Assert.assertEquals(sampleConsumerVal,
-        channel.getConsumerProps().getProperty(sampleConsumerProp));
-
-    context = prepareDefaultContext(false);
-    channel.configure(context);
-
-    Assert.assertNull(channel.getProducerProps().getProperty(sampleProducerProp));
-    Assert.assertNull(channel.getConsumerProps().getProperty(sampleConsumerProp));
-
-  }
-
-  public void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
-                                            String group) throws Exception {
-    // create a topic with 1 partition for simplicity
-    topic = findUnusedTopic();
-    createTopic(topic, 1);
-
-    Context context = prepareDefaultContext(false);
-    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
-    context.put(GROUP_ID_FLUME, group);
-    final KafkaChannel channel = createChannel(context);
-
-    // Produce some data and save an offset
-    Long fifthOffset = 0L;
-    Long tenthOffset = 0L;
-    Properties props = channel.getProducerProps();
-    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
-    for (int i = 1; i <= 50; i++) {
-      ProducerRecord<String, byte[]> data =
-          new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
-      RecordMetadata recordMetadata = producer.send(data).get();
-      if (i == 5) {
-        fifthOffset = recordMetadata.offset();
-      }
-      if (i == 10) {
-        tenthOffset = recordMetadata.offset();
-      }
-    }
-
-    // Commit 10th offset to zookeeper
-    if (hasZookeeperOffsets) {
-      ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), 30000, 30000,
-          JaasUtils.isZkSecurityEnabled());
-      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic);
-      // we commit the tenth offset to ensure some data is missed.
-      Long offset = tenthOffset + 1;
-      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0", offset.toString(),
-          zkUtils.updatePersistentPath$default$3());
-      zkUtils.close();
-    }
-
-    // Commit 5th offset to kafka
-    if (hasKafkaOffsets) {
-      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
-      offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
-      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(channel.getConsumerProps());
-      consumer.commitSync(offsets);
-      consumer.close();
-    }
-
-    // Start the channel and read some data
-    channel.start();
-    ExecutorCompletionService<Void> submitterSvc = new
-        ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc,
-        20, false, false);
-    wait(submitterSvc, 5);
-    List<Integer> finals = new ArrayList<Integer>(40);
-    for (Event event: events) {
-      finals.add(Integer.parseInt(new String(event.getBody())));
-    }
-    channel.stop();
-
-    if (!hasKafkaOffsets && !hasZookeeperOffsets) {
-      // The default behavior is to read the entire log
-      Assert.assertTrue("Channel should read the the first message", finals.contains(1));
-    } else if (hasKafkaOffsets && hasZookeeperOffsets) {
-      // Respect Kafka offsets if they exist
-      Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
-      Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
-    } else if (hasKafkaOffsets) {
-      // Respect Kafka offsets if they exist (don't fail if zookeeper offsets are missing)
-      Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
-      Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
-    } else {
-      // Otherwise migrate the ZooKeeper offsets if they exist
-      Assert.assertFalse("Channel should not read the 10th message", finals.contains(10));
-      Assert.assertTrue("Channel should read the 11th message", finals.contains(11));
-    }
-  }
-
-  /**
-   * This function tests three scenarios:
-   * 1. PartitionOption.VALIDBUTOUTOFRANGE: An integer partition is provided,
-   *    however it exceeds the number of partitions available on the topic.
-   *    Expected behaviour: ChannelException thrown.
-   *
-   * 2. PartitionOption.NOTSET: The partition header is not actually set.
-   *    Expected behaviour: Exception is not thrown because the code avoids an NPE.
-   *
-   * 3. PartitionOption.NOTANUMBER: The partition header is set, but is not an Integer.
-   *    Expected behaviour: ChannelExeption thrown.
-   *
-   * @param option
-   * @throws Exception
-   */
-  private void doPartitionErrors(PartitionOption option) throws Exception {
-    Context context = prepareDefaultContext(false);
-    context.put(PARTITION_HEADER_NAME, KafkaPartitionTestUtil.PARTITION_HEADER);
-    String tempTopic = findUnusedTopic();
-    createTopic(tempTopic, 5);
-    final KafkaChannel channel = createChannel(context);
-    channel.start();
-
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    Map<String, String> headers = new HashMap<String, String>();
-    switch (option) {
-      case VALIDBUTOUTOFRANGE:
-        headers.put(KafkaPartitionTestUtil.PARTITION_HEADER,
-            String.valueOf(DEFAULT_TOPIC_PARTITIONS + 2));
-        break;
-      case NOTSET:
-        headers.put("wrong-header", "2");
-        break;
-      case NOTANUMBER:
-        headers.put(KafkaPartitionTestUtil.PARTITION_HEADER, "not-a-number");
-        break;
-      default:
-        break;
-    }
-
-    Event event = EventBuilder.withBody(String.valueOf(9).getBytes(), headers);
-
-    channel.put(event);
-
-    tx.commit();
-
-    deleteTopic(tempTopic);
-  }
-
-  /**
-   * This method tests both the default behavior (usePartitionHeader=false)
-   * and the behaviour when the partitionId setting is used.
-   * Under the default behaviour, one would expect an even distribution of
-   * messages to partitions, however when partitionId is used we manually create
-   * a large skew to some partitions and then verify that this actually happened
-   * by reading messages directly using a Kafka Consumer.
-   *
-   * @param usePartitionHeader
-   * @param staticPtn
-   * @throws Exception
-   */
-  private void doPartitionHeader(PartitionTestScenario scenario) throws Exception {
-    final int numPtns = DEFAULT_TOPIC_PARTITIONS;
-    final int numMsgs = numPtns * 10;
-    final Integer staticPtn = DEFAULT_TOPIC_PARTITIONS - 2 ;
-    Context context = prepareDefaultContext(false);
-    if (scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY ||
-        scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID) {
-      context.put(PARTITION_HEADER_NAME, "partition-header");
-    }
-    if (scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID ||
-        scenario == PartitionTestScenario.STATIC_HEADER_ONLY) {
-      context.put(STATIC_PARTITION_CONF, staticPtn.toString());
-    }
-    final KafkaChannel channel = createChannel(context);
-    channel.start();
-
-    // Create a map of PartitionId:List<Messages> according to the desired distribution
-    // Initialise with empty ArrayLists
-    Map<Integer, List<Event>> partitionMap = new HashMap<Integer, List<Event>>(numPtns);
-    for (int i = 0; i < numPtns; i++) {
-      partitionMap.put(i, new ArrayList<Event>());
-    }
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-
-    List<Event> orderedEvents = KafkaPartitionTestUtil.generateSkewedMessageList(scenario, numMsgs,
-                                                                 partitionMap, numPtns, staticPtn);
-
-    for (Event event : orderedEvents) {
-      channel.put(event);
-    }
-
-    tx.commit();
-
-    Map<Integer, List<byte[]>> resultsMap = KafkaPartitionTestUtil.retrieveRecordsFromPartitions(
-                                                       topic, numPtns, channel.getConsumerProps());
-
-    KafkaPartitionTestUtil.checkResultsAgainstSkew(scenario, partitionMap, resultsMap, staticPtn,
-                                                   numMsgs);
-
-    channel.stop();
-  }
-
-  private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
-    for (int i = 0; i < 5; i++) {
-      Transaction txn = channel.getTransaction();
-      txn.begin();
-
-      Event event = channel.take();
-      if (event != null) {
-        return event;
-      } else {
-        txn.commit();
-        txn.close();
-      }
-    }
-    return null;
-  }
-
-  private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
-    final KafkaChannel channel = startChannel(false);
-    Properties props = channel.getProducerProps();
-    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
-
-    for (int i = 0; i < 50; i++) {
-      ProducerRecord<String, byte[]> data =
-          new ProducerRecord<String, byte[]>(topic, String.valueOf(i) + "-header",
-                                             String.valueOf(i).getBytes());
-      producer.send(data).get();
-    }
-    ExecutorCompletionService<Void> submitterSvc = new
-        ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
-    wait(submitterSvc, 5);
-    Map<Integer, String> finals = new HashMap<Integer, String>();
-    for (int i = 0; i < 50; i++) {
-      finals.put(Integer.parseInt(new String(events.get(i).getBody())),
-                 events.get(i).getHeaders().get(KEY_HEADER));
-    }
-    for (int i = 0; i < 50; i++) {
-      Assert.assertTrue(finals.keySet().contains(i));
-      if (checkHeaders) {
-        Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
-      }
-      finals.remove(i);
-    }
-    Assert.assertTrue(finals.isEmpty());
-    channel.stop();
-  }
-
-  private void doTestNullKeyNoHeader() throws Exception {
-    final KafkaChannel channel = startChannel(false);
-    Properties props = channel.getProducerProps();
-    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(props);
-
-    for (int i = 0; i < 50; i++) {
-      ProducerRecord<String, byte[]> data =
-          new ProducerRecord<String, byte[]>(topic, null, String.valueOf(i).getBytes());
-      producer.send(data).get();
-    }
-    ExecutorCompletionService<Void> submitterSvc = new
-            ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc,
-            50, false, false);
-    wait(submitterSvc, 5);
-    List<String> finals = new ArrayList<String>(50);
-    for (int i = 0; i < 50; i++) {
-      finals.add(i, events.get(i).getHeaders().get(KEY_HEADER));
-    }
-    for (int i = 0; i < 50; i++) {
-      Assert.assertTrue( finals.get(i) == null);
-    }
-    channel.stop();
-  }
-
-  /**
-   * Like the previous test but here we write to the channel like a Flume source would do
-   * to verify that the events are written as text and not as an Avro object
-   *
-   * @throws Exception
-   */
-  public void doParseAsFlumeEventFalseAsSource(Boolean checkHeaders) throws Exception {
-    final KafkaChannel channel = startChannel(false);
-
-    List<String> msgs = new ArrayList<String>();
-    Map<String, String> headers = new HashMap<String, String>();
-    for (int i = 0; i < 50; i++) {
-      msgs.add(String.valueOf(i));
-    }
-    Transaction tx = channel.getTransaction();
-    tx.begin();
-    for (int i = 0; i < msgs.size(); i++) {
-      headers.put(KEY_HEADER, String.valueOf(i) + "-header");
-      channel.put(EventBuilder.withBody(msgs.get(i).getBytes(), headers));
-    }
-    tx.commit();
-    ExecutorCompletionService<Void> submitterSvc =
-        new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-    List<Event> events = pullEvents(channel, submitterSvc, 50, false, false);
-    wait(submitterSvc, 5);
-    Map<Integer, String> finals = new HashMap<Integer, String>();
-    for (int i = 0; i < 50; i++) {
-      finals.put(Integer.parseInt(new String(events.get(i).getBody())),
-                 events.get(i).getHeaders().get(KEY_HEADER));
-    }
-    for (int i = 0; i < 50; i++) {
-      Assert.assertTrue(finals.keySet().contains(i));
-      if (checkHeaders) {
-        Assert.assertTrue(finals.containsValue(String.valueOf(i) + "-header"));
-      }
-      finals.remove(i);
-    }
-    Assert.assertTrue(finals.isEmpty());
-    channel.stop();
-  }
-
-  /**
-   * This method starts a channel, puts events into it. The channel is then
-   * stopped and restarted. Then we check to make sure if all events we put
-   * come out. Optionally, 10 events are rolled back,
-   * and optionally we restart the agent immediately after and we try to pull it
-   * out.
-   *
-   * @param rollback
-   * @param retryAfterRollback
-   * @throws Exception
-   */
-  private void doTestStopAndStart(boolean rollback,
-                                  boolean retryAfterRollback) throws Exception {
-    final KafkaChannel channel = startChannel(true);
-    ExecutorService underlying = Executors
-            .newCachedThreadPool();
-    ExecutorCompletionService<Void> submitterSvc =
-            new ExecutorCompletionService<Void>(underlying);
-    final List<List<Event>> events = createBaseList();
-    putEvents(channel, events, submitterSvc);
-    int completed = 0;
-    wait(submitterSvc, 5);
-    channel.stop();
-    final KafkaChannel channel2 = startChannel(true);
-    int total = 50;
-    if (rollback && !retryAfterRollback) {
-      total = 40;
-    }
-    final List<Event> eventsPulled =
-            pullEvents(channel2, submitterSvc, total, rollback, retryAfterRollback);
-    wait(submitterSvc, 5);
-    channel2.stop();
-    if (!retryAfterRollback && rollback) {
-      final KafkaChannel channel3 = startChannel(true);
-      int expectedRemaining = 50 - eventsPulled.size();
-      final List<Event> eventsPulled2 =
-              pullEvents(channel3, submitterSvc, expectedRemaining, false, false);
-      wait(submitterSvc, 5);
-      Assert.assertEquals(expectedRemaining, eventsPulled2.size());
-      eventsPulled.addAll(eventsPulled2);
-      channel3.stop();
-    }
-    underlying.shutdownNow();
-    verify(eventsPulled);
-  }
-
-  private KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
-    Context context = prepareDefaultContext(parseAsFlume);
-    KafkaChannel channel = createChannel(context);
-    channel.start();
-    return channel;
-  }
-
-  private KafkaChannel createChannel(Context context) throws Exception {
-    final KafkaChannel channel = new KafkaChannel();
-    Configurables.configure(channel, context);
-    return channel;
-  }
-
-  private void writeAndVerify(final boolean testRollbacks,
-                              final KafkaChannel channel) throws Exception {
-    writeAndVerify(testRollbacks, channel, false);
-  }
-
-  private void writeAndVerify(final boolean testRollbacks,
-                              final KafkaChannel channel, final boolean interleave)
-      throws Exception {
-
-    final List<List<Event>> events = createBaseList();
-
-    ExecutorCompletionService<Void> submitterSvc =
-        new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-
-    putEvents(channel, events, submitterSvc);
-
-    if (interleave) {
-      wait(submitterSvc, 5);
-    }
-
-    ExecutorCompletionService<Void> submitterSvc2 =
-        new ExecutorCompletionService<Void>(Executors.newCachedThreadPool());
-
-    final List<Event> eventsPulled = pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
-
-    if (!interleave) {
-      wait(submitterSvc, 5);
-    }
-    wait(submitterSvc2, 5);
-
-    verify(eventsPulled);
-  }
-
-  private List<List<Event>> createBaseList() {
-    final List<List<Event>> events = new ArrayList<List<Event>>();
-    for (int i = 0; i < 5; i++) {
-      List<Event> eventList = new ArrayList<Event>(10);
-      events.add(eventList);
-      for (int j = 0; j < 10; j++) {
-        Map<String, String> hdrs = new HashMap<String, String>();
-        String v = (String.valueOf(i) + " - " + String
-                .valueOf(j));
-        hdrs.put("header", v);
-        eventList.add(EventBuilder.withBody(v.getBytes(), hdrs));
-      }
-    }
-    return events;
-  }
-
-  private void putEvents(final KafkaChannel channel, final List<List<Event>>
-          events, ExecutorCompletionService<Void> submitterSvc) {
-    for (int i = 0; i < 5; i++) {
-      final int index = i;
-      submitterSvc.submit(new Callable<Void>() {
-        @Override
-        public Void call() {
-          Transaction tx = channel.getTransaction();
-          tx.begin();
-          List<Event> eventsToPut = events.get(index);
-          for (int j = 0; j < 10; j++) {
-            channel.put(eventsToPut.get(j));
-          }
-          try {
-            tx.commit();
-          } finally {
-            tx.close();
-          }
-          return null;
-        }
-      });
-    }
-  }
-
-  private List<Event> pullEvents(final KafkaChannel channel,
-                                 ExecutorCompletionService<Void> submitterSvc, final int total,
-                                 final boolean testRollbacks, final boolean retryAfterRollback) {
-    final List<Event> eventsPulled = Collections.synchronizedList(new
-            ArrayList<Event>(50));
-    final CyclicBarrier barrier = new CyclicBarrier(5);
-    final AtomicInteger counter = new AtomicInteger(0);
-    final AtomicInteger rolledBackCount = new AtomicInteger(0);
-    final AtomicBoolean startedGettingEvents = new AtomicBoolean(false);
-    final AtomicBoolean rolledBack = new AtomicBoolean(false);
-    for (int k = 0; k < 5; k++) {
-      final int index = k;
-      submitterSvc.submit(new Callable<Void>() {
-        @Override
-        public Void call() throws Exception {
-          Transaction tx = null;
-          final List<Event> eventsLocal = Lists.newLinkedList();
-          int takenByThisThread = 0;
-          channel.registerThread();
-          Thread.sleep(1000);
-          barrier.await();
-          while (counter.get() < (total - rolledBackCount.get())) {
-            if (tx == null) {
-              tx = channel.getTransaction();
-              tx.begin();
-            }
-            try {
-              Event e = channel.take();
-              if (e != null) {
-                startedGettingEvents.set(true);
-                eventsLocal.add(e);
-              } else {
-                if (testRollbacks &&
-                        index == 4 &&
-                        (!rolledBack.get()) &&
-                        startedGettingEvents.get()) {
-                  tx.rollback();
-                  tx.close();
-                  tx = null;
-                  rolledBack.set(true);
-                  final int eventsLocalSize = eventsLocal.size();
-                  eventsLocal.clear();
-                  if (!retryAfterRollback) {
-                    rolledBackCount.set(eventsLocalSize);
-                    return null;
-                  }
-                } else {
-                  tx.commit();
-                  tx.close();
-                  tx = null;
-                  eventsPulled.addAll(eventsLocal);
-                  counter.getAndAdd(eventsLocal.size());
-                  eventsLocal.clear();
-                }
-              }
-            } catch (Exception ex) {
-              eventsLocal.clear();
-              if (tx != null) {
-                tx.rollback();
-                tx.close();
-              }
-              tx = null;
-              ex.printStackTrace();
-            }
-          }
-          // Close txn.
-          return null;
-        }
-      });
-    }
-    return eventsPulled;
-  }
-
-  private void wait(ExecutorCompletionService<Void> submitterSvc, int max)
-          throws Exception {
-    int completed = 0;
-    while (completed < max) {
-      submitterSvc.take();
-      completed++;
-    }
-  }
-
-  private void verify(List<Event> eventsPulled) {
-    Assert.assertFalse(eventsPulled.isEmpty());
-    Assert.assertEquals(50, eventsPulled.size());
-    Set<String> eventStrings = new HashSet<String>();
-    for (Event e : eventsPulled) {
-      Assert.assertEquals(e.getHeaders().get("header"), new String(e.getBody()));
-      eventStrings.add(e.getHeaders().get("header"));
-    }
-    for (int i = 0; i < 5; i++) {
-      for (int j = 0; j < 10; j++) {
-        String v = String.valueOf(i) + " - " + String.valueOf(j);
-        Assert.assertTrue(eventStrings.contains(v));
-        eventStrings.remove(v);
-      }
-    }
-    Assert.assertTrue(eventStrings.isEmpty());
-  }
-
-  private Context prepareDefaultContext(boolean parseAsFlume) {
-    // Prepares a default context with Kafka Server Properties
-    Context context = new Context();
-    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
-    context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
-    context.put(TOPIC_CONFIG, topic);
-
-    return context;
-  }
-
-  public String findUnusedTopic() {
-    String newTopic = null;
-    boolean topicFound = false;
-    while (!topicFound) {
-      newTopic = RandomStringUtils.randomAlphabetic(8);
-      if (!usedTopics.contains(newTopic)) {
-        usedTopics.add(newTopic);
-        topicFound = true;
-      }
-    }
-    return newTopic;
-  }
-
-  public static void createTopic(String topicName, int numPartitions) {
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils =
-        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-    int replicationFactor = 1;
-    Properties topicConfig = new Properties();
-    AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor, topicConfig);
-  }
-
-  public static void deleteTopic(String topicName) {
-    int sessionTimeoutMs = 10000;
-    int connectionTimeoutMs = 10000;
-    ZkUtils zkUtils =
-        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
-    AdminUtils.deleteTopic(zkUtils, topicName);
-  }
-
-}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
new file mode 100644
index 0000000..9f139d6
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannelBase.java
@@ -0,0 +1,272 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import com.google.common.collect.Lists;
+import kafka.admin.AdminUtils;
+import kafka.utils.ZkUtils;
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.conf.Configurables;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.sink.kafka.util.TestUtil;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
+
+public class TestKafkaChannelBase {
+
+  static TestUtil testUtil = TestUtil.getInstance();
+  String topic = null;
+  private final Set<String> usedTopics = new HashSet<>();
+
+  static final int DEFAULT_TOPIC_PARTITIONS = 5;
+
+  @BeforeClass
+  public static void setupClass() throws Exception {
+    testUtil.prepare(); // one-time preparation of the shared TestUtil for the whole class
+    Thread.sleep(2500); // fixed 2.5 s pause; presumably lets the services come up
+  }
+
+  @Before
+  public void setup() throws Exception {
+    topic = findUnusedTopic(); // every test method gets its own random topic name
+    createTopic(topic, DEFAULT_TOPIC_PARTITIONS);
+    Thread.sleep(2500); // fixed 2.5 s pause; presumably lets topic creation settle
+  }
+
+  @AfterClass
+  public static void tearDown() {
+    testUtil.tearDown(); // runs once, after the last test of the class
+  }
+
+  /**
+   * Draws random 8-letter alphabetic names until one turns up that this instance has not
+   * handed out before, records it in {@code usedTopics} and returns it.
+   *
+   * @return a topic name not previously returned by this instance
+   */
+  String findUnusedTopic() {
+    while (true) {
+      String candidate = RandomStringUtils.randomAlphabetic(8);
+      // Set.add reports false when the name was already recorded, so the loop retries.
+      if (usedTopics.add(candidate)) {
+        return candidate;
+      }
+    }
+  }
+
+  /**
+   * Creates a topic with replication factor 1 and an empty topic configuration.
+   *
+   * <p>A ZooKeeper client is opened for the call (10 second session and connection
+   * timeouts) and closed again afterwards, even when topic creation fails.
+   *
+   * @param topicName name of the topic to create
+   * @param numPartitions number of partitions for the new topic
+   */
+  static void createTopic(String topicName, int numPartitions) {
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    int replicationFactor = 1;
+    ZkUtils zkUtils =
+        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    try {
+      AdminUtils.createTopic(zkUtils, topicName, numPartitions, replicationFactor,
+          new Properties());
+    } finally {
+      // The client used to be left open, leaking one ZooKeeper connection per topic.
+      zkUtils.close();
+    }
+  }
+
+  /**
+   * Requests deletion of a topic through {@code AdminUtils.deleteTopic}.
+   *
+   * <p>A ZooKeeper client is opened for the call (10 second session and connection
+   * timeouts) and closed again afterwards, even when the request fails.
+   *
+   * @param topicName name of the topic to delete
+   */
+  static void deleteTopic(String topicName) {
+    int sessionTimeoutMs = 10000;
+    int connectionTimeoutMs = 10000;
+    ZkUtils zkUtils =
+        ZkUtils.apply(testUtil.getZkUrl(), sessionTimeoutMs, connectionTimeoutMs, false);
+    try {
+      AdminUtils.deleteTopic(zkUtils, topicName);
+    } finally {
+      // The client used to be left open, leaking one ZooKeeper connection per call.
+      zkUtils.close();
+    }
+  }
+
+  /**
+   * Creates a channel from the default context and starts it.
+   *
+   * @param parseAsFlume flag forwarded to {@code prepareDefaultContext}
+   * @return the started channel
+   * @throws Exception if the channel cannot be created
+   */
+  KafkaChannel startChannel(boolean parseAsFlume) throws Exception {
+    KafkaChannel started = createChannel(prepareDefaultContext(parseAsFlume));
+    started.start();
+    return started;
+  }
+
+  Context prepareDefaultContext(boolean parseAsFlume) {
+    // Prepares a default context with Kafka Server Properties
+    Context context = new Context();
+    context.put(BOOTSTRAP_SERVERS_CONFIG, testUtil.getKafkaServerUrl());
+    context.put(PARSE_AS_FLUME_EVENT, String.valueOf(parseAsFlume));
+    context.put(TOPIC_CONFIG, topic);
+
+    return context;
+  }
+
+  /**
+   * Instantiates a {@code KafkaChannel} and applies {@code context} to it without starting it.
+   *
+   * @param context configuration to apply
+   * @return the configured, not yet started channel
+   */
+  KafkaChannel createChannel(Context context) throws Exception {
+    KafkaChannel configured = new KafkaChannel();
+    Configurables.configure(configured, context);
+    return configured;
+  }
+
+  List<Event> pullEvents(final KafkaChannel channel,
+                         ExecutorCompletionService<Void> submitterSvc, final int total,
+                         final boolean testRollbacks, final boolean retryAfterRollback) {
+    final List<Event> eventsPulled = Collections.synchronizedList(new
+        ArrayList<Event>(50)); // returned immediately; the tasks below fill it asynchronously
+    final CyclicBarrier barrier = new CyclicBarrier(5); // one party per task submitted below
+    final AtomicInteger counter = new AtomicInteger(0); // events committed by all tasks
+    final AtomicInteger rolledBackCount = new AtomicInteger(0); // subtracted from the target
+    final AtomicBoolean startedGettingEvents = new AtomicBoolean(false); // any task got an event
+    final AtomicBoolean rolledBack = new AtomicBoolean(false); // set once task 4 rolls back
+    for (int k = 0; k < 5; k++) {
+      final int index = k;
+      submitterSvc.submit(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          Transaction tx = null;
+          final List<Event> eventsLocal = Lists.newLinkedList(); // events of the open txn
+          channel.registerThread();
+          Thread.sleep(1000);
+          barrier.await(); // all five tasks begin taking together
+          while (counter.get() < (total - rolledBackCount.get())) {
+            if (tx == null) { // previous transaction was finished; open a new one
+              tx = channel.getTransaction();
+              tx.begin();
+            }
+            try {
+              Event e = channel.take();
+              if (e != null) {
+                startedGettingEvents.set(true);
+                eventsLocal.add(e); // buffered until this transaction commits
+              } else { // nothing available right now: finish the current transaction
+                if (testRollbacks &&
+                    index == 4 && // only the last task performs the forced rollback
+                    (!rolledBack.get()) &&
+                    startedGettingEvents.get()) {
+                  tx.rollback();
+                  tx.close();
+                  tx = null;
+                  rolledBack.set(true);
+                  final int eventsLocalSize = eventsLocal.size();
+                  eventsLocal.clear();
+                  if (!retryAfterRollback) {
+                    rolledBackCount.set(eventsLocalSize); // other tasks stop waiting for these
+                    return null; // this task ends here, right after its rollback
+                  }
+                } else {
+                  tx.commit();
+                  tx.close();
+                  tx = null;
+                  eventsPulled.addAll(eventsLocal); // publish only after a successful commit
+                  counter.getAndAdd(eventsLocal.size());
+                  eventsLocal.clear();
+                }
+              }
+            } catch (Exception ex) { // drop buffered events, roll back, then keep looping
+              eventsLocal.clear();
+              if (tx != null) {
+                tx.rollback();
+                tx.close();
+              }
+              tx = null;
+              ex.printStackTrace();
+            }
+          }
+          // NOTE(review): no close happens here; tx can still be open when the loop exits.
+          return null;
+        }
+      });
+    }
+    return eventsPulled;
+  }
+
+  /**
+   * Blocks until {@code max} tasks submitted to {@code submitterSvc} have completed.
+   *
+   * <p>Each completed future is unwrapped with {@code get()} so that an exception thrown
+   * inside a task fails the calling test instead of being silently discarded.
+   *
+   * @param submitterSvc completion service the tasks were submitted to
+   * @param max number of task completions to wait for
+   * @throws Exception if the wait is interrupted or a completed task had failed
+   */
+  void wait(ExecutorCompletionService<Void> submitterSvc, int max)
+      throws Exception {
+    for (int completed = 0; completed < max; completed++) {
+      submitterSvc.take().get();
+    }
+  }
+
+  /**
+   * Builds the standard test payload: five batches of ten events each.
+   *
+   * <p>Event {@code j} of batch {@code i} carries the text {@code "i - j"} both as its body
+   * and as the value of its {@code "header"} header.
+   *
+   * @return the five batches, in order
+   */
+  List<List<Event>> createBaseList() {
+    final List<List<Event>> batches = new ArrayList<>();
+    for (int batch = 0; batch < 5; batch++) {
+      List<Event> current = new ArrayList<>(10);
+      for (int seq = 0; seq < 10; seq++) {
+        String label = batch + " - " + seq;
+        Map<String, String> headers = new HashMap<>();
+        headers.put("header", label);
+        current.add(EventBuilder.withBody(label.getBytes(), headers));
+      }
+      batches.add(current);
+    }
+    return batches;
+  }
+
+  /**
+   * Submits five producer tasks; task {@code i} puts every event of {@code events.get(i)}
+   * into the channel inside a single transaction.
+   *
+   * <p>The method returns as soon as the tasks are submitted, so callers have to wait on
+   * {@code submitterSvc} for the five completions. If a put or the commit fails, the
+   * transaction is rolled back before it is closed and the failure is rethrown from the task.
+   *
+   * @param channel channel to write to
+   * @param events batches to write; entries 0 to 4 are used
+   * @param submitterSvc completion service the tasks are submitted to
+   */
+  void putEvents(final KafkaChannel channel, final List<List<Event>>
+      events, ExecutorCompletionService<Void> submitterSvc) {
+    for (int i = 0; i < 5; i++) {
+      final int index = i;
+      submitterSvc.submit(new Callable<Void>() {
+        @Override
+        public Void call() {
+          Transaction tx = channel.getTransaction();
+          tx.begin();
+          try {
+            for (Event event : events.get(index)) {
+              channel.put(event);
+            }
+            tx.commit();
+          } catch (RuntimeException ex) {
+            // Same failure handling as pullEvents: roll back first, then close.
+            tx.rollback();
+            throw ex;
+          } finally {
+            tx.close();
+          }
+          return null;
+        }
+      });
+    }
+  }
+
+  /**
+   * Asserts that {@code eventsPulled} is the payload built by {@code createBaseList()}:
+   * 50 events, each with a body equal to its {@code "header"} header, and with the header
+   * values covering exactly {@code "i - j"} for i in 0..4 and j in 0..9.
+   *
+   * @param eventsPulled events taken from the channel
+   */
+  void verify(List<Event> eventsPulled) {
+    Assert.assertFalse(eventsPulled.isEmpty());
+    Assert.assertEquals(50, eventsPulled.size());
+    Set<String> remaining = new HashSet<>();
+    for (Event pulled : eventsPulled) {
+      String header = pulled.getHeaders().get("header");
+      Assert.assertEquals(header, new String(pulled.getBody()));
+      remaining.add(header);
+    }
+    for (int batch = 0; batch < 5; batch++) {
+      for (int seq = 0; seq < 10; seq++) {
+        // remove() returns true only when the expected value was present.
+        Assert.assertTrue(remaining.remove(batch + " - " + seq));
+      }
+    }
+    Assert.assertTrue(remaining.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
new file mode 100644
index 0000000..47c583a
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestOffsetsAndMigration.java
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import kafka.utils.ZKGroupTopicDirs;
+import kafka.utils.ZkUtils;
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.security.JaasUtils;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.ZOOKEEPER_CONNECT_FLUME_KEY;
+
+public class TestOffsetsAndMigration extends TestKafkaChannelBase {
+
+  /**
+   * Produces one message, takes it without committing the transaction, stops the channel and
+   * checks that a newly started channel delivers the same message again.
+   *
+   * <p>The restarted channel is stopped in a {@code finally} block so that it does not keep
+   * running after the test.
+   */
+  @Test
+  public void testOffsetsNotCommittedOnStop() throws Exception {
+    String message = "testOffsetsNotCommittedOnStop-" + System.nanoTime();
+
+    KafkaChannel channel = startChannel(false);
+
+    KafkaProducer<String, byte[]> producer =
+        new KafkaProducer<>(channel.getProducerProps());
+    ProducerRecord<String, byte[]> data =
+        new ProducerRecord<>(topic, "header-" + message, message.getBytes());
+    producer.send(data).get();
+    producer.flush();
+    producer.close();
+
+    Event event = takeEventWithoutCommittingTxn(channel);
+    Assert.assertNotNull(event);
+    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
+
+    // Stop the channel without committing the transaction
+    channel.stop();
+
+    channel = startChannel(false);
+    try {
+      // Message should still be available
+      event = takeEventWithoutCommittingTxn(channel);
+      Assert.assertNotNull(event);
+      Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
+    } finally {
+      // The restarted channel used to be left running once the test returned.
+      channel.stop();
+    }
+  }
+
+  @Test
+  public void testMigrateOffsetsNone() throws Exception {
+    doTestMigrateZookeeperOffsets(false, false, "testMigrateOffsets-none"); // no stored offsets
+  }
+
+  @Test
+  public void testMigrateOffsetsZookeeper() throws Exception {
+    doTestMigrateZookeeperOffsets(true, false, "testMigrateOffsets-zookeeper"); // ZK only
+  }
+
+  @Test
+  public void testMigrateOffsetsKafka() throws Exception {
+    doTestMigrateZookeeperOffsets(false, true, "testMigrateOffsets-kafka"); // Kafka only
+  }
+
+  @Test
+  public void testMigrateOffsetsBoth() throws Exception {
+    doTestMigrateZookeeperOffsets(true, true, "testMigrateOffsets-both"); // ZK and Kafka
+  }
+
+  /**
+   * Makes up to five attempts to take an event, each in its own transaction.
+   *
+   * <p>An attempt that yields nothing commits and closes its transaction. The attempt that
+   * yields an event returns it at once and deliberately leaves that transaction open.
+   *
+   * @param channel channel to take from
+   * @return the first event taken, or {@code null} if all five attempts came back empty
+   */
+  private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
+    int attempt = 0;
+    while (attempt < 5) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+
+      Event taken = channel.take();
+      if (taken != null) {
+        return taken;
+      }
+      txn.commit();
+      txn.close();
+      attempt++;
+    }
+    return null;
+  }
+
+  /**
+   * Runs one offset-migration scenario against a new single-partition topic.
+   *
+   * <p>Fifty messages with bodies "1" to "50" are produced. Depending on the flags, the offset
+   * following the 10th message is written to ZooKeeper and/or the offset following the 5th
+   * message is committed to Kafka for {@code group}. The channel is then started, events are
+   * pulled until at least 20 have been committed, and the bodies seen are checked against the
+   * offset store that is expected to take precedence.
+   *
+   * @param hasZookeeperOffsets whether an offset is stored in ZooKeeper beforehand
+   * @param hasKafkaOffsets whether an offset is committed to Kafka beforehand
+   * @param group consumer group id used by the channel and by both offset stores
+   * @throws Exception if producing, committing or pulling fails
+   */
+  private void doTestMigrateZookeeperOffsets(boolean hasZookeeperOffsets, boolean hasKafkaOffsets,
+                                             String group) throws Exception {
+    // create a topic with 1 partition for simplicity
+    topic = findUnusedTopic();
+    createTopic(topic, 1);
+
+    Context context = prepareDefaultContext(false);
+    context.put(ZOOKEEPER_CONNECT_FLUME_KEY, testUtil.getZkUrl());
+    context.put(GROUP_ID_FLUME, group);
+    final KafkaChannel channel = createChannel(context);
+
+    // Produce some data and save an offset
+    long fifthOffset = 0L;
+    long tenthOffset = 0L;
+    Properties props = channel.getProducerProps();
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+    try {
+      for (int i = 1; i <= 50; i++) {
+        ProducerRecord<String, byte[]> data =
+            new ProducerRecord<>(topic, null, String.valueOf(i).getBytes());
+        RecordMetadata recordMetadata = producer.send(data).get();
+        if (i == 5) {
+          fifthOffset = recordMetadata.offset();
+        }
+        if (i == 10) {
+          tenthOffset = recordMetadata.offset();
+        }
+      }
+    } finally {
+      // The producer used to be left open after the loop.
+      producer.close();
+    }
+
+    // Commit 10th offset to zookeeper
+    if (hasZookeeperOffsets) {
+      ZkUtils zkUtils = ZkUtils.apply(testUtil.getZkUrl(), 30000, 30000,
+          JaasUtils.isZkSecurityEnabled());
+      ZKGroupTopicDirs topicDirs = new ZKGroupTopicDirs(group, topic);
+      // we commit the tenth offset to ensure some data is missed.
+      zkUtils.updatePersistentPath(topicDirs.consumerOffsetDir() + "/0",
+          String.valueOf(tenthOffset + 1), zkUtils.updatePersistentPath$default$3());
+      zkUtils.close();
+    }
+
+    // Commit 5th offset to kafka
+    if (hasKafkaOffsets) {
+      Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
+      offsets.put(new TopicPartition(topic, 0), new OffsetAndMetadata(fifthOffset + 1));
+      KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(channel.getConsumerProps());
+      consumer.commitSync(offsets);
+      consumer.close();
+    }
+
+    // Start the channel and read some data
+    List<Integer> finals = new ArrayList<>();
+    channel.start();
+    try {
+      ExecutorCompletionService<Void> submitterSvc = new
+          ExecutorCompletionService<>(Executors.newCachedThreadPool());
+      List<Event> events = pullEvents(channel, submitterSvc,
+          20, false, false);
+      wait(submitterSvc, 5);
+      for (Event event : events) {
+        finals.add(Integer.parseInt(new String(event.getBody())));
+      }
+    } finally {
+      // Stop the channel even when pulling or parsing fails.
+      channel.stop();
+    }
+
+    if (hasKafkaOffsets) {
+      // Respect Kafka offsets if they exist, whether or not ZooKeeper offsets exist as well
+      Assert.assertFalse("Channel should not read the 5th message", finals.contains(5));
+      Assert.assertTrue("Channel should read the 6th message", finals.contains(6));
+    } else if (hasZookeeperOffsets) {
+      // Otherwise migrate the ZooKeeper offsets if they exist
+      Assert.assertFalse("Channel should not read the 10th message", finals.contains(10));
+      Assert.assertTrue("Channel should read the 11th message", finals.contains(11));
+    } else {
+      // The default behavior is to read the entire log
+      Assert.assertTrue("Channel should read the first message", finals.contains(1));
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java
new file mode 100644
index 0000000..6baad77
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestParseAsFlumeEvent.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
+
+/**
+ * Tests for a Kafka channel started via {@code startChannel(false)}: raw text messages are
+ * written to the topic (directly with a Kafka producer, or through the channel itself) and
+ * then read back through the channel, optionally checking the {@code KEY_HEADER} header.
+ */
+public class TestParseAsFlumeEvent extends TestKafkaChannelBase {
+
+  /** Number of events every scenario writes and expects to read back. */
+  private static final int EVENT_COUNT = 50;
+
+  /** Suffix appended to the event number to build the message key / key header value. */
+  private static final String HEADER_SUFFIX = "-header";
+
+  /** Explicit charset so the tests do not depend on the platform default encoding. */
+  private static final java.nio.charset.Charset UTF_8 =
+      java.nio.charset.StandardCharsets.UTF_8;
+
+  @Test
+  public void testParseAsFlumeEventFalse() throws Exception {
+    doParseAsFlumeEventFalse(false);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseCheckHeader() throws Exception {
+    doParseAsFlumeEventFalse(true);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseAsSource() throws Exception {
+    doParseAsFlumeEventFalseAsSource(false);
+  }
+
+  @Test
+  public void testParseAsFlumeEventFalseAsSourceCheckHeader() throws Exception {
+    doParseAsFlumeEventFalseAsSource(true);
+  }
+
+  /**
+   * Writes {@code EVENT_COUNT} plain text records straight to the topic with a Kafka
+   * producer (key {@code "<i>-header"}, value {@code "<i>"}) and verifies that the channel
+   * hands all of them back.
+   *
+   * @param checkHeaders whether the key header of every pulled event is verified as well
+   * @throws Exception if producing, pulling or verification fails
+   */
+  private void doParseAsFlumeEventFalse(boolean checkHeaders) throws Exception {
+    final KafkaChannel channel = startChannel(false);
+    try {
+      Properties props = channel.getProducerProps();
+      KafkaProducer<String, byte[]> producer = new KafkaProducer<>(props);
+      try {
+        for (int i = 0; i < EVENT_COUNT; i++) {
+          ProducerRecord<String, byte[]> data =
+              new ProducerRecord<>(topic, i + HEADER_SUFFIX,
+                  String.valueOf(i).getBytes(UTF_8));
+          // Block on every send so all records are acknowledged before pulling starts.
+          producer.send(data).get();
+        }
+      } finally {
+        // The producer was previously never closed.
+        producer.close();
+      }
+      pullAndVerify(channel, checkHeaders);
+    } finally {
+      // Stop the channel even when an assertion fails so it does not outlive the test.
+      channel.stop();
+    }
+  }
+
+  /**
+   * Like {@link #doParseAsFlumeEventFalse(boolean)} but here we write to the channel like a
+   * Flume source would do to verify that the events are written as text and not as an Avro
+   * object.
+   *
+   * @param checkHeaders whether the key header of every pulled event is verified as well
+   * @throws Exception if putting, pulling or verification fails
+   */
+  private void doParseAsFlumeEventFalseAsSource(boolean checkHeaders) throws Exception {
+    final KafkaChannel channel = startChannel(false);
+    try {
+      List<String> msgs = new ArrayList<>(EVENT_COUNT);
+      for (int i = 0; i < EVENT_COUNT; i++) {
+        msgs.add(String.valueOf(i));
+      }
+      Map<String, String> headers = new HashMap<>();
+      Transaction tx = channel.getTransaction();
+      tx.begin();
+      for (int i = 0; i < msgs.size(); i++) {
+        // The same map is reused; only the key header changes from event to event.
+        headers.put(KEY_HEADER, i + HEADER_SUFFIX);
+        channel.put(EventBuilder.withBody(msgs.get(i).getBytes(UTF_8), headers));
+      }
+      tx.commit();
+      pullAndVerify(channel, checkHeaders);
+    } finally {
+      // Stop the channel even when an assertion fails so it does not outlive the test.
+      channel.stop();
+    }
+  }
+
+  /**
+   * Pulls {@code EVENT_COUNT} events from the channel and asserts that the bodies are exactly
+   * the numbers {@code 0 .. EVENT_COUNT - 1}, each seen once.
+   *
+   * @param channel      the started channel to read from
+   * @param checkHeaders when {@code true}, additionally asserts that the event whose body is
+   *                     {@code i} carries the key header {@code "<i>-header"}
+   * @throws Exception if pulling the events fails
+   */
+  private void pullAndVerify(KafkaChannel channel, boolean checkHeaders) throws Exception {
+    ExecutorCompletionService<Void> submitterSvc =
+        new ExecutorCompletionService<>(Executors.newCachedThreadPool());
+    List<Event> events = pullEvents(channel, submitterSvc, EVENT_COUNT, false, false);
+    wait(submitterSvc, 5);
+
+    // Body (parsed as a number) -> key header of the event that carried it.
+    Map<Integer, String> finals = new HashMap<>();
+    for (int i = 0; i < EVENT_COUNT; i++) {
+      Event event = events.get(i);
+      finals.put(Integer.parseInt(new String(event.getBody(), UTF_8)),
+          event.getHeaders().get(KEY_HEADER));
+    }
+    for (int i = 0; i < EVENT_COUNT; i++) {
+      Assert.assertTrue(finals.containsKey(i));
+      if (checkHeaders) {
+        // Compare against the header of this very event rather than "some remaining event".
+        Assert.assertEquals(i + HEADER_SUFFIX, finals.get(i));
+      }
+      finals.remove(i);
+    }
+    // Nothing but the expected bodies may have been read.
+    Assert.assertTrue(finals.isEmpty());
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestPartitions.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestPartitions.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestPartitions.java
new file mode 100644
index 0000000..9652fb1
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestPartitions.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import org.apache.flume.Context;
+import org.apache.flume.Event;
+import org.apache.flume.Transaction;
+import org.apache.flume.event.EventBuilder;
+import org.apache.flume.shared.kafka.test.KafkaPartitionTestUtil;
+import org.apache.flume.shared.kafka.test.PartitionOption;
+import org.apache.flume.shared.kafka.test.PartitionTestScenario;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARTITION_HEADER_NAME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.STATIC_PARTITION_CONF;
+
+/**
+ * Tests how the Kafka channel distributes events over topic partitions when a partition
+ * header and/or a static partition id is configured, including invalid header values.
+ */
+public class TestPartitions extends TestKafkaChannelBase {
+
+  @Test
+  public void testPartitionHeaderSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.PARTITION_ID_HEADER_ONLY);
+  }
+
+  @Test
+  public void testPartitionHeaderNotSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.NO_PARTITION_HEADERS);
+  }
+
+  @Test
+  public void testStaticPartitionAndHeaderSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID);
+  }
+
+  @Test
+  public void testStaticPartitionHeaderNotSet() throws Exception {
+    doPartitionHeader(PartitionTestScenario.STATIC_HEADER_ONLY);
+  }
+
+  @Test
+  public void testPartitionHeaderMissing() throws Exception {
+    doPartitionErrors(PartitionOption.NOTSET);
+  }
+
+  @Test(expected = org.apache.flume.ChannelException.class)
+  public void testPartitionHeaderOutOfRange() throws Exception {
+    doPartitionErrors(PartitionOption.VALIDBUTOUTOFRANGE);
+  }
+
+  @Test(expected = org.apache.flume.ChannelException.class)
+  public void testPartitionHeaderInvalid() throws Exception {
+    doPartitionErrors(PartitionOption.NOTANUMBER);
+  }
+
+  /**
+   * This method tests both the default behavior (usePartitionHeader=false)
+   * and the behaviour when the partitionId setting is used.
+   * Under the default behaviour, one would expect an even distribution of
+   * messages to partitions, however when partitionId is used we manually create
+   * a large skew to some partitions and then verify that this actually happened
+   * by reading messages directly using a Kafka Consumer.
+   *
+   * @param scenario which combination of partition header and static partition to configure
+   * @throws Exception if writing, reading back or verification fails
+   */
+  private void doPartitionHeader(PartitionTestScenario scenario) throws Exception {
+    final int numPtns = DEFAULT_TOPIC_PARTITIONS;
+    final int numMsgs = numPtns * 10;
+    final Integer staticPtn = DEFAULT_TOPIC_PARTITIONS - 2;
+
+    final boolean usesPartitionHeader =
+        scenario == PartitionTestScenario.PARTITION_ID_HEADER_ONLY
+            || scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID;
+    final boolean usesStaticPartition =
+        scenario == PartitionTestScenario.STATIC_HEADER_AND_PARTITION_ID
+            || scenario == PartitionTestScenario.STATIC_HEADER_ONLY;
+
+    Context context = prepareDefaultContext(false);
+    if (usesPartitionHeader) {
+      context.put(PARTITION_HEADER_NAME, "partition-header");
+    }
+    if (usesStaticPartition) {
+      context.put(STATIC_PARTITION_CONF, staticPtn.toString());
+    }
+    final KafkaChannel channel = createChannel(context);
+    channel.start();
+    try {
+      // Create a map of PartitionId:List<Messages> according to the desired distribution
+      // Initialise with empty ArrayLists
+      Map<Integer, List<Event>> partitionMap = new HashMap<>(numPtns);
+      for (int i = 0; i < numPtns; i++) {
+        partitionMap.put(i, new ArrayList<Event>());
+      }
+      Transaction tx = channel.getTransaction();
+      tx.begin();
+
+      List<Event> orderedEvents = KafkaPartitionTestUtil.generateSkewedMessageList(scenario,
+          numMsgs, partitionMap, numPtns, staticPtn);
+
+      for (Event event : orderedEvents) {
+        channel.put(event);
+      }
+
+      tx.commit();
+
+      Map<Integer, List<byte[]>> resultsMap =
+          KafkaPartitionTestUtil.retrieveRecordsFromPartitions(topic, numPtns,
+              channel.getConsumerProps());
+
+      KafkaPartitionTestUtil.checkResultsAgainstSkew(scenario, partitionMap, resultsMap,
+          staticPtn, numMsgs);
+    } finally {
+      // Stop the channel even when verification fails so it does not outlive the test.
+      channel.stop();
+    }
+  }
+
+  /**
+   * This function tests three scenarios:
+   * 1. PartitionOption.VALIDBUTOUTOFRANGE: An integer partition is provided,
+   * however it exceeds the number of partitions available on the topic.
+   * Expected behaviour: ChannelException thrown.
+   * <p>
+   * 2. PartitionOption.NOTSET: The partition header is not actually set.
+   * Expected behaviour: Exception is not thrown because the code avoids an NPE.
+   * <p>
+   * 3. PartitionOption.NOTANUMBER: The partition header is set, but is not an Integer.
+   * Expected behaviour: ChannelException thrown.
+   * <p>
+   * The channel is stopped and the temporary topic deleted in every case, including the
+   * ones where the expected exception propagates to the caller.
+   *
+   * @param option which kind of partition header value to put on the test event
+   * @throws Exception the ChannelException expected by two of the scenarios, or any other
+   *                   failure while setting up or writing
+   */
+  private void doPartitionErrors(PartitionOption option) throws Exception {
+    Context context = prepareDefaultContext(false);
+    context.put(PARTITION_HEADER_NAME, KafkaPartitionTestUtil.PARTITION_HEADER);
+    String tempTopic = findUnusedTopic();
+    createTopic(tempTopic, 5);
+    try {
+      final KafkaChannel channel = createChannel(context);
+      channel.start();
+      try {
+        Transaction tx = channel.getTransaction();
+        tx.begin();
+
+        Map<String, String> headers = new HashMap<>();
+        switch (option) {
+          case VALIDBUTOUTOFRANGE:
+            headers.put(KafkaPartitionTestUtil.PARTITION_HEADER,
+                String.valueOf(DEFAULT_TOPIC_PARTITIONS + 2));
+            break;
+          case NOTSET:
+            headers.put("wrong-header", "2");
+            break;
+          case NOTANUMBER:
+            headers.put(KafkaPartitionTestUtil.PARTITION_HEADER, "not-a-number");
+            break;
+          default:
+            break;
+        }
+
+        Event event = EventBuilder.withBody(String.valueOf(9).getBytes(), headers);
+
+        channel.put(event);
+
+        tx.commit();
+      } finally {
+        // Previously the channel was never stopped by this method.
+        channel.stop();
+      }
+    } finally {
+      // Previously skipped whenever the expected ChannelException was thrown.
+      deleteTopic(tempTopic);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/flume/blob/5a70cd7b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
new file mode 100644
index 0000000..6e9e1e2
--- /dev/null
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestRollback.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.flume.channel.kafka;
+
+import org.apache.flume.Event;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.Executors;
+
+/**
+ * Writes events to a Kafka channel started via {@code startChannel(true)} and reads them
+ * back, with and without rollbacks on the reading side, and with the writes either
+ * finished before reading starts or still in flight.
+ */
+public class TestRollback extends TestKafkaChannelBase {
+
+  @Test
+  public void testSuccess() throws Exception {
+    doTestSuccessRollback(false, false);
+  }
+
+  @Test
+  public void testSuccessInterleave() throws Exception {
+    doTestSuccessRollback(false, true);
+  }
+
+  @Test
+  public void testRollbacks() throws Exception {
+    doTestSuccessRollback(true, false);
+  }
+
+  @Test
+  public void testRollbacksInterleave() throws Exception {
+    doTestSuccessRollback(true, true);
+  }
+
+  /**
+   * Starts a channel, runs one write-and-verify cycle against it and stops it again.
+   *
+   * @param rollback   passed on to {@code pullEvents} as its rollback flag
+   * @param interleave if {@code true} the writers are awaited before pulling starts,
+   *                   otherwise only after the pull has been started
+   * @throws Exception if writing, pulling or verification fails
+   */
+  private void doTestSuccessRollback(final boolean rollback,
+                                     final boolean interleave) throws Exception {
+    final KafkaChannel channel = startChannel(true);
+    try {
+      writeAndVerify(rollback, channel, interleave);
+    } finally {
+      // Stop the channel even when verification fails so it does not outlive the test.
+      channel.stop();
+    }
+  }
+
+  /**
+   * Puts the base event list into the channel, pulls 50 events back and verifies them.
+   *
+   * @param testRollbacks passed on to {@code pullEvents} as its rollback flag
+   * @param channel       the started channel under test
+   * @param interleave    if {@code true} the writers are awaited before pulling starts,
+   *                      otherwise only after the pull has been started
+   * @throws Exception if writing, pulling or verification fails
+   */
+  private void writeAndVerify(final boolean testRollbacks,
+                              final KafkaChannel channel, final boolean interleave)
+      throws Exception {
+
+    final List<List<Event>> events = createBaseList();
+
+    ExecutorCompletionService<Void> submitterSvc =
+        new ExecutorCompletionService<>(Executors.newCachedThreadPool());
+
+    putEvents(channel, events, submitterSvc);
+
+    if (interleave) {
+      wait(submitterSvc, 5);
+    }
+
+    ExecutorCompletionService<Void> submitterSvc2 =
+        new ExecutorCompletionService<>(Executors.newCachedThreadPool());
+
+    final List<Event> eventsPulled = pullEvents(channel, submitterSvc2, 50, testRollbacks, true);
+
+    if (!interleave) {
+      // Writers were left running while the pull started; collect them now.
+      wait(submitterSvc, 5);
+    }
+    wait(submitterSvc2, 5);
+
+    verify(eventsPulled);
+  }
+}


Mime
View raw message