atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject incubator-atlas git commit: ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover. (yhemanth)
Date Fri, 13 May 2016 04:18:39 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 07b8b4d3c -> 98769871e


ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover.
(yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/98769871
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/98769871
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/98769871

Branch: refs/heads/master
Commit: 98769871e56d9a97792e2dba52345e876908ac63
Parents: 07b8b4d
Author: Hemanth Yamijala <hyamijala@hortonworks.com>
Authored: Fri May 13 09:48:20 2016 +0530
Committer: Hemanth Yamijala <hyamijala@hortonworks.com>
Committed: Fri May 13 09:48:20 2016 +0530

----------------------------------------------------------------------
 distro/src/conf/atlas-application.properties    |   1 +
 .../org/apache/atlas/kafka/KafkaConsumer.java   |  27 +++-
 .../apache/atlas/kafka/KafkaNotification.java   |  55 +++++---
 .../notification/AbstractNotification.java      |   9 +-
 .../AbstractNotificationConsumer.java           |   2 +
 .../notification/NotificationConsumer.java      |   9 ++
 .../apache/atlas/kafka/KafkaConsumerTest.java   |  55 +++++++-
 .../atlas/kafka/KafkaNotificationTest.java      | 119 ++++++----------
 .../AbstractNotificationConsumerTest.java       |   5 +
 release-log.txt                                 |   1 +
 .../main/resources/atlas-application.properties |   1 +
 .../notification/NotificationHookConsumer.java  |  82 ++++++-----
 .../NotificationHookConsumerKafkaTest.java      | 141 +++++++++++++++++++
 .../NotificationHookConsumerTest.java           |  36 +++++
 14 files changed, 397 insertions(+), 146 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/distro/src/conf/atlas-application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/atlas-application.properties b/distro/src/conf/atlas-application.properties
index 119865d..b2b62aa 100755
--- a/distro/src/conf/atlas-application.properties
+++ b/distro/src/conf/atlas-application.properties
@@ -59,6 +59,7 @@ atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
 atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
+atlas.kafka.auto.commit.enable=false
 
 
 #########  Hive Lineage Configs  #########

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index f1c9742..270215d 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -19,6 +19,7 @@ package org.apache.atlas.kafka;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.AbstractNotificationConsumer;
 import org.apache.atlas.notification.MessageDeserializer;
@@ -35,24 +36,29 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T>
{
 
     private final int consumerId;
     private final ConsumerIterator iterator;
+    private final ConsumerConnector consumerConnector;
+    private final boolean autoCommitEnabled;
+    private long lastSeenOffset;
 
 
     // ----- Constructors ----------------------------------------------------
 
     /**
      * Create a Kafka consumer.
-     *
-     * @param type          the notification type returned by this consumer
      * @param deserializer  the message deserializer used for this consumer
      * @param stream        the underlying Kafka stream
      * @param consumerId    an id value for this consumer
+     * @param consumerConnector the {@link ConsumerConnector} which created the underlying
Kafka stream
+     * @param autoCommitEnabled true if consumer does not need to commit offsets explicitly,
false otherwise.
      */
-    public KafkaConsumer(Class<T> type,
-                         MessageDeserializer<T> deserializer, KafkaStream<String,
String> stream, int consumerId) {
+    public KafkaConsumer(MessageDeserializer<T> deserializer, KafkaStream<String,
String> stream, int consumerId,
+                         ConsumerConnector consumerConnector, boolean autoCommitEnabled)
{
         super(deserializer);
-
+        this.consumerConnector = consumerConnector;
+        this.lastSeenOffset = 0;
         this.iterator   = stream.iterator();
         this.consumerId = consumerId;
+        this.autoCommitEnabled = autoCommitEnabled;
     }
 
 
@@ -71,6 +77,7 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T>
{
         MessageAndMetadata message = iterator.next();
         LOG.debug("Read message: conumerId: {}, topic - {}, partition - {}, offset - {},
message - {}",
                 consumerId, message.topic(), message.partition(), message.offset(), message.message());
+        lastSeenOffset = message.offset();
         return (String) message.message();
     }
 
@@ -79,4 +86,14 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T>
{
         MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
         return (String) message.message();
     }
+
+    @Override
+    public void commit() {
+        if (autoCommitEnabled) {
+            LOG.debug("Auto commit is disabled, not committing.");
+        } else {
+            consumerConnector.commitOffsets();
+            LOG.debug("Committed offset: {}", lastSeenOffset);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index cfffec4..1ee62d2 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.kafka;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Singleton;
 import kafka.consumer.Consumer;
 import kafka.consumer.KafkaStream;
@@ -112,9 +113,6 @@ public class KafkaNotification extends AbstractNotification implements
Service {
                 "org.apache.kafka.common.serialization.StringSerializer");
         properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringSerializer");
-
-        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
-
         properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                 "org.apache.kafka.common.serialization.StringDeserializer");
         properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
@@ -123,6 +121,10 @@ public class KafkaNotification extends AbstractNotification implements
Service {
         properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "smallest");
     }
 
+    @VisibleForTesting
+    protected KafkaNotification(Properties properties) {
+        this.properties = properties;
+    }
 
     // ----- Service ---------------------------------------------------------
 
@@ -159,26 +161,34 @@ public class KafkaNotification extends AbstractNotification implements
Service {
     @Override
     public <T> List<NotificationConsumer<T>> createConsumers(NotificationType
notificationType,
                                                              int numConsumers) {
+        return createConsumers(notificationType, numConsumers,
+                Boolean.valueOf(properties.getProperty("auto.commit.enable", "true")));
+    }
+
+    @VisibleForTesting
+    public <T> List<NotificationConsumer<T>> createConsumers(NotificationType
notificationType,
+                                                      int numConsumers, boolean autoCommitEnabled)
{
         String topic = TOPIC_MAP.get(notificationType);
 
         Properties consumerProperties = getConsumerProperties(notificationType);
 
-        ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
-        Map<String, Integer> topicCountMap = new HashMap<>();
-        topicCountMap.put(topic, numConsumers);
-        StringDecoder decoder = new StringDecoder(null);
-        Map<String, List<KafkaStream<String, String>>> streamsMap =
-                consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
-        List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
         List<NotificationConsumer<T>> consumers = new ArrayList<>(numConsumers);
-        int consumerId = 0;
-        for (KafkaStream stream : kafkaConsumers) {
-            KafkaConsumer<T> kafkaConsumer =
-                createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
-                    stream, consumerId++);
-            consumers.add(kafkaConsumer);
+        for (int i = 0; i < numConsumers; i++) {
+            ConsumerConnector consumerConnector = createConsumerConnector(consumerProperties);
+            Map<String, Integer> topicCountMap = new HashMap<>();
+            topicCountMap.put(topic, 1);
+            StringDecoder decoder = new StringDecoder(null);
+            Map<String, List<KafkaStream<String, String>>> streamsMap =
+                    consumerConnector.createMessageStreams(topicCountMap, decoder, decoder);
+            List<KafkaStream<String, String>> kafkaConsumers = streamsMap.get(topic);
+            for (KafkaStream stream : kafkaConsumers) {
+                KafkaConsumer<T> kafkaConsumer =
+                        createKafkaConsumer(notificationType.getClassType(), notificationType.getDeserializer(),
+                                stream, i, consumerConnector, autoCommitEnabled);
+                consumers.add(kafkaConsumer);
+            }
+            consumerConnectors.add(consumerConnector);
         }
-        consumerConnectors.add(consumerConnector);
 
         return consumers;
     }
@@ -245,12 +255,14 @@ public class KafkaNotification extends AbstractNotification implements
Service {
      * @param stream        the Kafka stream
      * @param consumerId    the id for the new consumer
      *
+     * @param consumerConnector
      * @return a new Kafka consumer
      */
-    protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T>
type,
-            MessageDeserializer<T> deserializer, KafkaStream stream,
-            int consumerId) {
-        return new org.apache.atlas.kafka.KafkaConsumer<T>(type, deserializer, stream,
consumerId);
+    protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
+    createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer, KafkaStream
stream,
+                        int consumerId, ConsumerConnector consumerConnector, boolean autoCommitEnabled)
{
+        return new org.apache.atlas.kafka.KafkaConsumer<T>(deserializer, stream,
+                consumerId, consumerConnector, autoCommitEnabled);
     }
 
     // Get properties for consumer request
@@ -266,6 +278,7 @@ public class KafkaNotification extends AbstractNotification implements
Service {
         consumerProperties.putAll(properties);
         consumerProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
 
+        LOG.info("Consumer property: auto.commit.enable: " + consumerProperties.getProperty("auto.commit.enable"));
         return consumerProperties;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
index 7d22126..cb44fc6 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotification.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.notification;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
 import com.google.gson.JsonElement;
@@ -46,7 +47,7 @@ public abstract class AbstractNotification implements NotificationInterface
{
      */
     public static final MessageVersion CURRENT_MESSAGE_VERSION = new MessageVersion("1.0.0");
 
-    private static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
+    public static final String PROPERTY_EMBEDDED = PROPERTY_PREFIX + ".embedded";
     private final boolean embedded;
     private final boolean isHAEnabled;
 
@@ -59,7 +60,6 @@ public abstract class AbstractNotification implements NotificationInterface
{
         registerTypeAdapter(JSONArray.class, new JSONArraySerializer()).
         create();
 
-
     // ----- Constructors ----------------------------------------------------
 
     public AbstractNotification(Configuration applicationProperties) throws AtlasException
{
@@ -67,6 +67,11 @@ public abstract class AbstractNotification implements NotificationInterface
{
         this.isHAEnabled = HAConfiguration.isHAEnabled(applicationProperties);
     }
 
+    @VisibleForTesting
+    protected AbstractNotification() {
+        embedded = false;
+        isHAEnabled = false;
+    }
 
     // ----- NotificationInterface -------------------------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index f00bbca..d4d78de 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -68,4 +68,6 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
     public T peek() {
         return deserializer.deserialize(peekMessage());
     }
+
+    public abstract void commit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 78e8ce7..2e861cb 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -43,4 +43,13 @@ public interface NotificationConsumer<T> {
      * @return the next notification
      */
     T peek();
+
+    /**
+     * Commit the offset of messages that have been successfully processed.
+     *
+     * This API should be called when messages read with {@link #next()} have been successfully
processed and
+     * the consumer is ready to handle the next message, which could happen even after a
normal or an abnormal
+     * restart.
+     */
+    void commit();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
index 7f607c6..ad7d93e 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaConsumerTest.java
@@ -20,6 +20,7 @@ package org.apache.atlas.kafka;
 
 import kafka.consumer.ConsumerIterator;
 import kafka.consumer.KafkaStream;
+import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
 import org.apache.atlas.notification.AbstractNotification;
 import org.apache.atlas.notification.MessageVersion;
@@ -33,6 +34,9 @@ import org.apache.atlas.typesystem.IStruct;
 import org.apache.atlas.typesystem.Referenceable;
 import org.apache.atlas.typesystem.Struct;
 import org.codehaus.jettison.json.JSONException;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.Collections;
@@ -41,6 +45,8 @@ import java.util.List;
 import java.util.NoSuchElementException;
 
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.*;
 
@@ -51,6 +57,14 @@ public class KafkaConsumerTest {
 
     private static final String TRAIT_NAME = "MyTrait";
 
+    @Mock
+    private ConsumerConnector consumerConnector;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
     @Test
     public void testNext() throws Exception {
         KafkaStream<String, String> stream = mock(KafkaStream.class);
@@ -70,8 +84,9 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
         NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-            new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
-                NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+            new KafkaConsumer<>(
+                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream,
99,
+                    consumerConnector, false);
 
         assertTrue(consumer.hasNext());
 
@@ -101,8 +116,9 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
         NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-            new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
-                NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+            new KafkaConsumer<>(
+                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream,
99,
+                    consumerConnector, false);
 
         assertTrue(consumer.hasNext());
 
@@ -135,8 +151,9 @@ public class KafkaConsumerTest {
         when(messageAndMetadata.message()).thenReturn(json);
 
         NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
-            new KafkaConsumer<>(NotificationInterface.NotificationType.HOOK.getClassType(),
-                NotificationInterface.NotificationType.HOOK.getDeserializer(), stream, 99);
+            new KafkaConsumer<>(
+                    NotificationInterface.NotificationType.HOOK.getDeserializer(), stream,
99,
+                    consumerConnector, false);
 
         assertTrue(consumer.hasNext());
 
@@ -147,6 +164,32 @@ public class KafkaConsumerTest {
         assertTrue(consumer.hasNext());
     }
 
+    @Test
+    public void testCommitIsCalledIfAutoCommitDisabled() {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                new KafkaConsumer<>(
+                        NotificationInterface.NotificationType.HOOK.getDeserializer(), stream,
99,
+                        consumerConnector, false);
+
+        consumer.commit();
+
+        verify(consumerConnector).commitOffsets();
+    }
+
+    @Test
+    public void testCommitIsNotCalledIfAutoCommitEnabled() {
+        KafkaStream<String, String> stream = mock(KafkaStream.class);
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                new KafkaConsumer<>(
+                        NotificationInterface.NotificationType.HOOK.getDeserializer(), stream,
99,
+                        consumerConnector, true);
+
+        consumer.commit();
+
+        verify(consumerConnector, never()).commitOffsets();
+    }
+
     private Referenceable getEntity(String traitName) {
         Referenceable entity = EntityNotificationImplTest.getEntity("id");
         List<IStruct> traitInfo = new LinkedList<>();

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 17fda25..219bd70 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -17,26 +17,16 @@
  */
 package org.apache.atlas.kafka;
 
-import com.google.inject.Inject;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
-import org.apache.atlas.AtlasException;
 import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.NotificationInterface;
-import org.apache.atlas.notification.NotificationModule;
-import org.apache.commons.configuration.Configuration;
-import org.apache.kafka.clients.consumer.ConsumerConfig;
-import org.testng.annotations.AfterClass;
-import org.testng.annotations.BeforeClass;
-import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
@@ -44,99 +34,80 @@ import java.util.Properties;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
 
-@Guice(modules = NotificationModule.class)
 public class KafkaNotificationTest {
 
-    @Inject
-    private KafkaNotification kafka;
-
-    @BeforeClass
-    public void setUp() throws Exception {
-        kafka.start();
-    }
-
     @Test
     @SuppressWarnings("unchecked")
     public void testCreateConsumers() throws Exception {
-        Configuration configuration = mock(Configuration.class);
-        Iterator iterator = mock(Iterator.class);
-        ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
-        KafkaStream kafkaStream1 = mock(KafkaStream.class);
-        KafkaStream kafkaStream2 = mock(KafkaStream.class);
-        String groupId = "groupId9999";
-
-        when(configuration.subset(KafkaNotification.PROPERTY_PREFIX)).thenReturn(configuration);
-        when(configuration.getKeys()).thenReturn(iterator);
-        when(iterator.hasNext()).thenReturn(true).thenReturn(false);
-        when(iterator.next()).thenReturn("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY);
-        when(configuration.getList("entities." + KafkaNotification.CONSUMER_GROUP_ID_PROPERTY))
-                .thenReturn(Collections.<Object>singletonList(groupId));
-
-        Map<String, List<KafkaStream<String, String>>> streamsMap = new
HashMap<>();
-        List<KafkaStream<String, String>> kafkaStreamList = new LinkedList<>();
-        kafkaStreamList.add(kafkaStream1);
-        kafkaStreamList.add(kafkaStream2);
-        streamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreamList);
+        Properties properties = mock(Properties.class);
+        when(properties.getProperty("entities.group.id")).thenReturn("atlas");
+        final ConsumerConnector consumerConnector = mock(ConsumerConnector.class);
         Map<String, Integer> topicCountMap = new HashMap<>();
-        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 2);
+        topicCountMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, 1);
 
-        when(consumerConnector.createMessageStreams(
-            eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(streamsMap);
-
-        TestKafkaNotification kafkaNotification = new TestKafkaNotification(configuration,
consumerConnector);
+        Map<String, List<KafkaStream<String, String>>> kafkaStreamsMap
=
+                new HashMap<>();
+        List<KafkaStream<String, String>> kafkaStreams = new ArrayList<>();
+        KafkaStream kafkaStream = mock(KafkaStream.class);
+        kafkaStreams.add(kafkaStream);
+        kafkaStreamsMap.put(KafkaNotification.ATLAS_ENTITIES_TOPIC, kafkaStreams);
 
-        List<NotificationConsumer<String>> consumers =
-            kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES,
2);
+        when(consumerConnector.createMessageStreams(
+                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class))).thenReturn(kafkaStreamsMap);
 
-        assertEquals(2, consumers.size());
+        final KafkaConsumer consumer1 = mock(KafkaConsumer.class);
+        final KafkaConsumer consumer2 = mock(KafkaConsumer.class);
 
-        // assert that all of the given kafka streams were used to create kafka consumers
-        List<KafkaStream> streams = kafkaNotification.kafkaStreams;
-        assertTrue(streams.contains(kafkaStream1));
-        assertTrue(streams.contains(kafkaStream2));
+        KafkaNotification kafkaNotification =
+                new TestKafkaNotification(properties, consumerConnector, consumer1, consumer2);
 
-        // assert that the given consumer group id was added to the properties used to create
the consumer connector
-        Properties properties = kafkaNotification.myProperties;
-        assertEquals(groupId, properties.getProperty(ConsumerConfig.GROUP_ID_CONFIG));
-    }
+        List<NotificationConsumer<String>> consumers =
+                kafkaNotification.createConsumers(NotificationInterface.NotificationType.ENTITIES,
2);
 
-    @AfterClass
-    public void teardown() throws Exception {
-        kafka.stop();
+        verify(consumerConnector, times(2)).createMessageStreams(
+                eq(topicCountMap), any(StringDecoder.class), any(StringDecoder.class));
+        assertEquals(consumers.size(), 2);
+        assertTrue(consumers.contains(consumer1));
+        assertTrue(consumers.contains(consumer2));
     }
 
-    // Extended kafka notification class for testing.
-    private static class TestKafkaNotification extends KafkaNotification {
+    class TestKafkaNotification extends KafkaNotification {
 
         private final ConsumerConnector consumerConnector;
+        private final KafkaConsumer consumer1;
+        private final KafkaConsumer consumer2;
 
-        private Properties myProperties;
-        private List<KafkaStream> kafkaStreams = new LinkedList<>();
-
-        public TestKafkaNotification(Configuration applicationProperties,
-                                     ConsumerConnector consumerConnector) throws AtlasException
{
-            super(applicationProperties);
+        TestKafkaNotification(Properties properties, ConsumerConnector consumerConnector,
+                              KafkaConsumer consumer1, KafkaConsumer consumer2) {
+            super(properties);
             this.consumerConnector = consumerConnector;
+            this.consumer1 = consumer1;
+            this.consumer2 = consumer2;
         }
 
         @Override
         protected ConsumerConnector createConsumerConnector(Properties consumerProperties)
{
-            this.myProperties = consumerProperties;
-            kafkaStreams.clear();
             return consumerConnector;
         }
 
         @Override
-        protected <T> org.apache.atlas.kafka.KafkaConsumer<T> createKafkaConsumer(Class<T>
type,
-                                                                                  MessageDeserializer<T>
deserializer,
-                                                                                  KafkaStream
stream,
-                                                                                  int consumerId)
{
-            kafkaStreams.add(stream);
-            return super.createKafkaConsumer(type, deserializer, stream, consumerId);
+        protected <T> org.apache.atlas.kafka.KafkaConsumer<T>
+        createKafkaConsumer(Class<T> type, MessageDeserializer<T> deserializer,
KafkaStream stream,
+                            int consumerId, ConsumerConnector connector, boolean autoCommitEnabled)
{
+            if (consumerId == 0) {
+                return consumer1;
+            } else if (consumerId == 1) {
+                return consumer2;
+            }
+            return null;
         }
+
+
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
index e63175d..e8b55ef 100644
--- a/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
+++ b/notification/src/test/java/org/apache/atlas/notification/AbstractNotificationConsumerTest.java
@@ -253,6 +253,11 @@ public class AbstractNotificationConsumerTest {
         public boolean hasNext() {
             return index < messageList.size();
         }
+
+        @Override
+        public void commit() {
+            // do nothing.
+        }
     }
 
     private static final class TestDeserializer<T> extends VersionedMessageDeserializer<T>
{

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index f68e86c..5d37d07 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -20,6 +20,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file
(dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via
shwethags)
 
 ALL CHANGES:
+ATLAS-629 Kafka messages in ATLAS_HOOK might be lost in HA mode at the instant of failover.
(yhemanth)
 ATLAS-758 hdfs location of hive table is pointing to old location even after rename ( sumasai
)
 ATLAS-667 Entity delete should check for required reverse references ( dkantor via sumasai
)
 ATLAS-738 Add query ability on system properties like guid, state, createdtime etc (shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/typesystem/src/main/resources/atlas-application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/atlas-application.properties b/typesystem/src/main/resources/atlas-application.properties
index a343a20..aafad0f 100644
--- a/typesystem/src/main/resources/atlas-application.properties
+++ b/typesystem/src/main/resources/atlas-application.properties
@@ -70,6 +70,7 @@ atlas.kafka.consumer.timeout.ms=100
 atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities
+atlas.kafka.auto.commit.enable=false
 
 #########  Entity Audit Configs  #########
 atlas.audit.hbase.tablename=ATLAS_ENTITY_AUDIT_EVENTS

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 8ef2f64..901b1ed 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -17,6 +17,7 @@
  */
 package org.apache.atlas.notification;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
@@ -183,50 +184,55 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             while (shouldRun.get()) {
                 try {
                     if (hasNext()) {
-                        HookNotification.HookNotificationMessage message = consumer.next();
-                        atlasClient.setUser(message.getUser());
-                        try {
-                            switch (message.getType()) {
-                            case ENTITY_CREATE:
-                                HookNotification.EntityCreateRequest createRequest =
-                                        (HookNotification.EntityCreateRequest) message;
-                                atlasClient.createEntity(createRequest.getEntities());
-                                break;
+                        handleMessage(consumer.next());
+                    }
+                } catch (Throwable t) {
+                    LOG.warn("Failure in NotificationHookConsumer", t);
+                }
+            }
+        }
 
-                            case ENTITY_PARTIAL_UPDATE:
-                                HookNotification.EntityPartialUpdateRequest partialUpdateRequest
=
-                                        (HookNotification.EntityPartialUpdateRequest) message;
-                                atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
-                                        partialUpdateRequest.getAttribute(),
-                                        partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
-                                break;
+        @VisibleForTesting
+        void handleMessage(HookNotification.HookNotificationMessage message) {
+            atlasClient.setUser(message.getUser());
+            try {
+                switch (message.getType()) {
+                case ENTITY_CREATE:
+                    HookNotification.EntityCreateRequest createRequest =
+                            (HookNotification.EntityCreateRequest) message;
+                    atlasClient.createEntity(createRequest.getEntities());
+                    break;
 
-                            case ENTITY_DELETE:
-                                HookNotification.EntityDeleteRequest deleteRequest =
-                                    (HookNotification.EntityDeleteRequest) message;
-                                atlasClient.deleteEntity(deleteRequest.getTypeName(),
-                                    deleteRequest.getAttribute(),
-                                    deleteRequest.getAttributeValue());
-                                break;
+                case ENTITY_PARTIAL_UPDATE:
+                    HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+                            (HookNotification.EntityPartialUpdateRequest) message;
+                    atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                            partialUpdateRequest.getAttribute(),
+                            partialUpdateRequest.getAttributeValue(), partialUpdateRequest.getEntity());
+                    break;
 
-                            case ENTITY_FULL_UPDATE:
-                                HookNotification.EntityUpdateRequest updateRequest =
-                                        (HookNotification.EntityUpdateRequest) message;
-                                atlasClient.updateEntities(updateRequest.getEntities());
-                                break;
+                case ENTITY_DELETE:
+                    HookNotification.EntityDeleteRequest deleteRequest =
+                        (HookNotification.EntityDeleteRequest) message;
+                    atlasClient.deleteEntity(deleteRequest.getTypeName(),
+                        deleteRequest.getAttribute(),
+                        deleteRequest.getAttributeValue());
+                    break;
 
-                            default:
-                                throw new IllegalStateException("Unhandled exception!");
-                            }
-                        } catch (Exception e) {
-                            //todo handle failures
-                            LOG.warn("Error handling message {}", message, e);
-                        }
-                    }
-                } catch (Throwable t) {
-                    LOG.warn("Failure in NotificationHookConsumer", t);
+                case ENTITY_FULL_UPDATE:
+                    HookNotification.EntityUpdateRequest updateRequest =
+                            (HookNotification.EntityUpdateRequest) message;
+                    atlasClient.updateEntities(updateRequest.getEntities());
+                    break;
+
+                default:
+                    throw new IllegalStateException("Unhandled exception!");
                 }
+            } catch (Exception e) {
+                //todo handle failures
+                LOG.warn("Error handling message {}", message, e);
             }
+            consumer.commit();
         }
 
         boolean serverAvailable(Timer timer) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
new file mode 100644
index 0000000..5b2ffeb
--- /dev/null
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerKafkaTest.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.notification;
+
+import com.google.inject.Inject;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasException;
+import org.apache.atlas.LocalAtlasClient;
+import org.apache.atlas.kafka.KafkaNotification;
+import org.apache.atlas.notification.hook.HookNotification;
+import org.apache.atlas.typesystem.Referenceable;
+import org.apache.commons.lang.RandomStringUtils;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Guice;
+import org.testng.annotations.Test;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+@Guice(modules = NotificationModule.class)
+public class NotificationHookConsumerKafkaTest {
+
+    @Inject
+    private NotificationInterface notificationInterface;
+
+    private KafkaNotification kafkaNotification;
+
+    @BeforeTest
+    public void setup() throws AtlasException {
+        kafkaNotification = startKafkaServer();
+    }
+
+    @AfterTest
+    public void shutdown() {
+        kafkaNotification.stop();
+    }
+
+    @Test
+    public void testConsumerConsumesNewMessageWithAutoCommitDisabled() throws AtlasException,
InterruptedException {
+
+        produceMessage(new HookNotification.EntityCreateRequest("test_user1", createEntity()));
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                createNewConsumer(kafkaNotification, false);
+        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(kafkaNotification, localAtlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user1");
+
+
+        // produce another message, and make sure it moves ahead. If commit succeeded, this
would work.
+        produceMessage(new HookNotification.EntityCreateRequest("test_user2", createEntity()));
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user2");
+
+        kafkaNotification.close();
+    }
+
+    @Test
+    public void testConsumerRemainsAtSameMessageWithAutoCommitEnabled()
+            throws NotificationException, InterruptedException {
+
+        produceMessage(new HookNotification.EntityCreateRequest("test_user3", createEntity()));
+
+        NotificationConsumer<HookNotification.HookNotificationMessage> consumer =
+                createNewConsumer(kafkaNotification, true);
+        LocalAtlasClient localAtlasClient = mock(LocalAtlasClient.class);
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(kafkaNotification, localAtlasClient);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user3");
+
+        // produce another message, but this will not be consumed, as commit code is not
executed in hook consumer.
+        produceMessage(new HookNotification.EntityCreateRequest("test_user4", createEntity()));
+
+        consumeOneMessage(consumer, hookConsumer);
+        verify(localAtlasClient).setUser("test_user3");
+
+        kafkaNotification.close();
+    }
+
+    NotificationConsumer<HookNotification.HookNotificationMessage> createNewConsumer(
+            KafkaNotification kafkaNotification, boolean autoCommitEnabled) {
+        return kafkaNotification.<HookNotification.HookNotificationMessage>createConsumers(
+                NotificationInterface.NotificationType.HOOK, 1, autoCommitEnabled).get(0);
+    }
+
+    void consumeOneMessage(NotificationConsumer<HookNotification.HookNotificationMessage>
consumer,
+                           NotificationHookConsumer.HookConsumer hookConsumer) throws InterruptedException
{
+        while (!consumer.hasNext()) {
+            Thread.sleep(1000);
+        }
+        hookConsumer.handleMessage(consumer.next());
+    }
+
+    Referenceable createEntity() {
+        final Referenceable entity = new Referenceable(AtlasClient.DATA_SET_SUPER_TYPE);
+        entity.set("name", "db" + randomString());
+        entity.set("description", randomString());
+        return entity;
+    }
+
+    KafkaNotification startKafkaServer() throws AtlasException {
+        KafkaNotification kafkaNotification = (KafkaNotification) notificationInterface;
+        kafkaNotification.start();
+        return kafkaNotification;
+    }
+
+    protected String randomString() {
+        return RandomStringUtils.randomAlphanumeric(10);
+    }
+
+    private void produceMessage(HookNotification.HookNotificationMessage message) throws
NotificationException {
+        notificationInterface.send(NotificationInterface.NotificationType.HOOK, message);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/98769871/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
index 8765826..7860eb6 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerTest.java
@@ -21,6 +21,7 @@ import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.LocalAtlasClient;
 import org.apache.atlas.ha.HAConfiguration;
+import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.commons.configuration.Configuration;
 import org.mockito.Mock;
 import org.mockito.MockitoAnnotations;
@@ -34,6 +35,7 @@ import java.util.concurrent.ExecutorService;
 import static org.mockito.Mockito.any;
 import static org.mockito.Mockito.doThrow;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.verifyZeroInteractions;
@@ -87,6 +89,40 @@ public class NotificationHookConsumerTest {
     }
 
     @Test
+    public void testCommitIsCalledWhenMessageIsProcessed() throws AtlasServiceException {
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(notificationInterface, atlasClient);
+        NotificationConsumer consumer = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+        HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
+        when(message.getUser()).thenReturn("user");
+        when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
+
+        hookConsumer.handleMessage(message);
+
+        verify(consumer).commit();
+    }
+
+    @Test
+    public void testCommitIsCalledEvenWhenMessageProcessingFails() throws AtlasServiceException
{
+        NotificationHookConsumer notificationHookConsumer =
+                new NotificationHookConsumer(notificationInterface, atlasClient);
+        NotificationConsumer consumer = mock(NotificationConsumer.class);
+        NotificationHookConsumer.HookConsumer hookConsumer =
+                notificationHookConsumer.new HookConsumer(consumer);
+        HookNotification.EntityCreateRequest message = mock(HookNotification.EntityCreateRequest.class);
+        when(message.getUser()).thenReturn("user");
+        when(message.getType()).thenReturn(HookNotification.HookNotificationType.ENTITY_CREATE);
+        when(atlasClient.createEntity(any(List.class))).
+                thenThrow(new RuntimeException("Simulating exception in processing message"));
+
+        hookConsumer.handleMessage(message);
+
+        verify(consumer).commit();
+    }
+
+    @Test
     public void testConsumerProceedsWithFalseIfInterrupted() throws AtlasServiceException,
InterruptedException {
         NotificationHookConsumer notificationHookConsumer = new NotificationHookConsumer(notificationInterface,
atlasClient);
         NotificationHookConsumer.HookConsumer hookConsumer =



Mime
View raw message