flume-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jar...@apache.org
Subject flume git commit: FLUME-2920: Kafka Channel Should Not Commit Offsets When Stopping
Date Fri, 10 Jun 2016 13:53:38 GMT
Repository: flume
Updated Branches:
  refs/heads/flume-1.7 f2bdd57fc -> 68911af88


FLUME-2920: Kafka Channel Should Not Commit Offsets When Stopping

(Kevin Conaway via Jarek Jarcec Cecho)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/68911af8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/68911af8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/68911af8

Branch: refs/heads/flume-1.7
Commit: 68911af88792aeb065193a57e8ec79edde90d7fb
Parents: f2bdd57
Author: Jarek Jarcec Cecho <jarcec@apache.org>
Authored: Fri Jun 10 15:52:20 2016 +0200
Committer: Jarek Jarcec Cecho <jarcec@apache.org>
Committed: Fri Jun 10 15:53:30 2016 +0200

----------------------------------------------------------------------
 .../flume/channel/kafka/KafkaChannel.java       |  4 -
 .../flume/channel/kafka/TestKafkaChannel.java   | 82 +++++++++++++++++---
 2 files changed, 70 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/68911af8/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
index 09d3f9d..dfc95bc 100644
--- a/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/main/java/org/apache/flume/channel/kafka/KafkaChannel.java
@@ -267,10 +267,6 @@ public class KafkaChannel extends BasicChannelSemantics {
   }
 
   private void decommissionConsumerAndRecords(ConsumerAndRecords c) {
-    if (c.failedEvents.isEmpty()) {
-      c.commitOffsets();
-    }
-    c.failedEvents.clear();
     c.consumer.close();
   }
 

http://git-wip-us.apache.org/repos/asf/flume/blob/68911af8/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
----------------------------------------------------------------------
diff --git a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
index 13e073b..d01346a 100644
--- a/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
+++ b/flume-ng-channels/flume-kafka-channel/src/test/java/org/apache/flume/channel/kafka/TestKafkaChannel.java
@@ -32,26 +32,42 @@ import org.apache.kafka.clients.CommonClientConfigs;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
 import org.apache.kafka.clients.producer.ProducerRecord;
-import org.junit.*;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.*;
-import java.util.concurrent.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
-import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.*;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.BROKER_LIST_FLUME_KEY;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.GROUP_ID_FLUME;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.KEY_HEADER;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.PARSE_AS_FLUME_EVENT;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.READ_SMALLEST_OFFSET;
+import static org.apache.flume.channel.kafka.KafkaChannelConfiguration.TOPIC_CONFIG;
 
 public class TestKafkaChannel {
 
-  private final static Logger LOGGER =
-          LoggerFactory.getLogger(TestKafkaChannel.class);
-
   private static TestUtil testUtil = TestUtil.getInstance();
   private String topic = null;
   private final Set<String> usedTopics = new HashSet<String>();
-  private CountDownLatch latch = null;
 
   @BeforeClass
   public static void setupClass() throws Exception {
@@ -74,7 +90,6 @@ public class TestKafkaChannel {
     } catch (Exception e) {
     }
     Thread.sleep(2500);
-    latch = new CountDownLatch(5);
   }
 
   @AfterClass
@@ -191,6 +206,49 @@ public class TestKafkaChannel {
     doTestNullKeyNoHeader();
   }
 
+  @Test
+  public void testOffsetsNotCommittedOnStop() throws Exception {
+    String message = "testOffsetsNotCommittedOnStop-" + System.nanoTime();
+
+    KafkaChannel channel = startChannel(false);
+
+    KafkaProducer<String, byte[]> producer = new KafkaProducer<String, byte[]>(channel.getProducerProps());
+    ProducerRecord<String, byte[]> data = new ProducerRecord<String, byte[]>(topic,
+        "header-" + message, message.getBytes());
+    producer.send(data).get();
+    producer.flush();
+    producer.close();
+
+    Event event = takeEventWithoutCommittingTxn(channel);
+    Assert.assertNotNull(event);
+    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
+
+    // Stop the channel without committing the transaction
+    channel.stop();
+
+    channel = startChannel(false);
+
+    // Message should still be available
+    event = takeEventWithoutCommittingTxn(channel);
+    Assert.assertNotNull(event);
+    Assert.assertTrue(Arrays.equals(message.getBytes(), event.getBody()));
+  }
+
+  private Event takeEventWithoutCommittingTxn(KafkaChannel channel) {
+    for (int i=0; i < 5; i++) {
+      Transaction txn = channel.getTransaction();
+      txn.begin();
+
+      Event event = channel.take();
+      if (event != null) {
+        return event;
+      } else {
+        txn.commit();
+        txn.close();
+      }
+    }
+    return null;
+  }
+
   private void doParseAsFlumeEventFalse(Boolean checkHeaders) throws Exception {
     final KafkaChannel channel = startChannel(false);
     Properties props = channel.getProducerProps();


Mime
View raw message