atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yhema...@apache.org
Subject incubator-atlas git commit: ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)
Date Fri, 17 Jun 2016 09:28:48 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master 86dd72aff -> a2e7738aa


ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/a2e7738a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/a2e7738a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/a2e7738a

Branch: refs/heads/master
Commit: a2e7738aa25bba20bafa5f42ee1d628807a26b52
Parents: 86dd72a
Author: Hemanth Yamijala <hyamijala@hortonworks.com>
Authored: Fri Jun 17 14:58:13 2016 +0530
Committer: Hemanth Yamijala <hyamijala@hortonworks.com>
Committed: Fri Jun 17 14:58:33 2016 +0530

----------------------------------------------------------------------
 docs/src/site/twiki/Configuration.twiki         | 10 ++-
 .../java/org/apache/atlas/hook/AtlasHook.java   | 36 +++++++-
 .../apache/atlas/hook/FailedMessagesLogger.java | 95 ++++++++++++++++++++
 .../apache/atlas/kafka/KafkaNotification.java   | 46 ++++++++--
 .../notification/NotificationException.java     | 13 +++
 .../org/apache/atlas/hook/AtlasHookTest.java    | 91 +++++++++++++++++--
 .../atlas/kafka/KafkaNotificationTest.java      | 85 ++++++++++++++++++
 release-log.txt                                 |  1 +
 8 files changed, 362 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/docs/src/site/twiki/Configuration.twiki
----------------------------------------------------------------------
diff --git a/docs/src/site/twiki/Configuration.twiki b/docs/src/site/twiki/Configuration.twiki
index 0e122fe..3ad0fbe 100644
--- a/docs/src/site/twiki/Configuration.twiki
+++ b/docs/src/site/twiki/Configuration.twiki
@@ -168,9 +168,17 @@ atlas.notification.replicas=1
 atlas.notification.kafka.service.principal=kafka/_HOST@EXAMPLE.COM
 # Set this to the location of the keytab file for Kafka
 #atlas.notification.kafka.keytab.location=/etc/security/keytabs/kafka.service.keytab
-
 </verbatim>
 
+These configuration parameters are useful for saving messages in case there are issues in reaching Kafka for
+sending messages.
+
+<verbatim>
+# Whether to save messages that failed to be sent to Kafka, default is true
+atlas.notification.log.failed.messages=true
+# If saving messages is enabled, the file name to save them to. This file will be created under the log directory of the hook's host component - like HiveServer2
+atlas.notification.failed.messages.filename=atlas_hook_failed_messages.log
+</verbatim>
 
 ---++ Client Configs
 <verbatim>

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
index 71029b0..2ca8d85 100644
--- a/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
+++ b/notification/src/main/java/org/apache/atlas/hook/AtlasHook.java
@@ -18,9 +18,11 @@
 
 package org.apache.atlas.hook;
 
+import com.google.common.annotations.VisibleForTesting;
 import com.google.inject.Guice;
 import com.google.inject.Injector;
 import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.NotificationModule;
 import org.apache.atlas.notification.hook.HookNotification;
@@ -50,6 +52,15 @@ public abstract class AtlasHook {
 
     protected static NotificationInterface notifInterface;
 
+    private static boolean logFailedMessages;
+    private static FailedMessagesLogger failedMessagesLogger;
+
+    public static final String ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY =
+            "atlas.notification.failed.messages.filename";
+    public static final String ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME = "atlas_hook_failed_messages.log";
+    public static final String ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY =
+            "atlas.notification.log.failed.messages";
+
     static {
         try {
             atlasProperties = ApplicationProperties.get();
@@ -57,6 +68,14 @@ public abstract class AtlasHook {
             LOG.info("Failed to load application properties", e);
         }
 
+        String failedMessageFile = atlasProperties.getString(ATLAS_NOTIFICATION_FAILED_MESSAGES_FILENAME_KEY,
+                ATLAS_HOOK_FAILED_MESSAGES_LOG_DEFAULT_NAME);
+        logFailedMessages = atlasProperties.getBoolean(ATLAS_NOTIFICATION_LOG_FAILED_MESSAGES_ENABLED_KEY, true);
+        if (logFailedMessages) {
+            failedMessagesLogger = new FailedMessagesLogger(failedMessageFile);
+            failedMessagesLogger.init();
+        }
+
         Injector injector = Guice.createInjector(new NotificationModule());
         notifInterface = injector.getInstance(NotificationInterface.class);
 
@@ -89,18 +108,31 @@ public abstract class AtlasHook {
      * @param maxRetries maximum number of retries while sending message to messaging system
      */
     public static void notifyEntities(List<HookNotification.HookNotificationMessage> messages, int maxRetries) {
+        notifyEntitiesInternal(messages, maxRetries, notifInterface, logFailedMessages, failedMessagesLogger);
+    }
+
+    @VisibleForTesting
+    static void notifyEntitiesInternal(List<HookNotification.HookNotificationMessage> messages, int maxRetries,
+                                       NotificationInterface notificationInterface,
+                                       boolean shouldLogFailedMessages, FailedMessagesLogger logger) {
         final String message = messages.toString();
 
         int numRetries = 0;
         while (true) {
             try {
-                notifInterface.send(NotificationInterface.NotificationType.HOOK, messages);
+                notificationInterface.send(NotificationInterface.NotificationType.HOOK, messages);
                 return;
-            } catch(Exception e) {
+            } catch (Exception e) {
                 numRetries++;
                 if (numRetries < maxRetries) {
                     LOG.debug("Failed to notify atlas for entity {}. Retrying", message, e);
                 } else {
+                    if (shouldLogFailedMessages && e instanceof NotificationException) {
+                        List<String> failedMessages = ((NotificationException) e).getFailedMessages();
+                        for (String msg : failedMessages) {
+                            logger.log(msg);
+                        }
+                    }
                     LOG.error("Failed to notify atlas for entity {} after {} retries. Quitting",
                             message, maxRetries, e);
                     return;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
new file mode 100644
index 0000000..0b552d3
--- /dev/null
+++ b/notification/src/main/java/org/apache/atlas/hook/FailedMessagesLogger.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.atlas.hook;
+
+
+import org.apache.log4j.Appender;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.FileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Enumeration;
+
+/**
+ * A logger wrapper that can be used to write messages that failed to be sent to a log file.
+ */
+public class FailedMessagesLogger {
+
+    public static final String PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE = "%d{ISO8601} %m%n";
+    public static final String DATE_PATTERN = ".yyyy-MM-dd";
+
+    private final Logger logger = Logger.getLogger("org.apache.atlas.hook.FailedMessagesLogger");
+    private String failedMessageFile;
+
+    public FailedMessagesLogger(String failedMessageFile) {
+        this.failedMessageFile = failedMessageFile;
+    }
+
+    void init() {
+        String rootLoggerDirectory = getRootLoggerDirectory();
+        if (rootLoggerDirectory == null) {
+            return;
+        }
+        String absolutePath = new File(rootLoggerDirectory, failedMessageFile).getAbsolutePath();
+        try {
+            DailyRollingFileAppender failedLogFilesAppender = new DailyRollingFileAppender(
+                    new PatternLayout(PATTERN_SPEC_TIMESTAMP_MESSAGE_NEWLINE), absolutePath, DATE_PATTERN);
+            logger.addAppender(failedLogFilesAppender);
+            logger.setLevel(Level.ERROR);
+            logger.setAdditivity(false);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Get the root logger file location under which the failed log messages will be written.
+     *
+     * Since this class is used in Hooks which run within JVMs of other components like Hive,
+     * we want to write the failed messages file under the same location as where logs from
+     * the host component are saved. This method attempts to get such a location from the
+     * root logger's appenders. It will work only if at least one of the appenders is a {@link FileAppender}
+     *
+     * @return directory under which host component's logs are stored.
+     */
+    private String getRootLoggerDirectory() {
+        String rootLoggerDirectory = null;
+        org.apache.log4j.Logger rootLogger = org.apache.log4j.Logger.getRootLogger();
+
+        Enumeration allAppenders = rootLogger.getAllAppenders();
+        while (allAppenders.hasMoreElements()) {
+            Appender appender = (Appender) allAppenders.nextElement();
+            if (appender instanceof FileAppender) {
+                FileAppender fileAppender = (FileAppender) appender;
+                String rootLoggerFile = fileAppender.getFile();
+                rootLoggerDirectory = new File(rootLoggerFile).getParent();
+                break;
+            }
+        }
+        return rootLoggerDirectory;
+    }
+
+    void log(String message) {
+        logger.error(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
index 1ee62d2..806f2b4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaNotification.java
@@ -37,6 +37,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationConverter;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
 import org.apache.kafka.clients.producer.ProducerConfig;
 import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.kafka.clients.producer.RecordMetadata;
@@ -90,6 +91,10 @@ public class KafkaNotification extends AbstractNotification implements Service {
         }
     };
 
+    @VisibleForTesting
+    String getTopicName(NotificationType notificationType) {
+        return TOPIC_MAP.get(notificationType);
+    }
 
     // ----- Constructors ----------------------------------------------------
 
@@ -214,24 +219,36 @@ public class KafkaNotification extends AbstractNotification implements Service {
         if (producer == null) {
             createProducer();
         }
+        sendInternalToProducer(producer, type, messages);
+    }
 
+    @VisibleForTesting
+    void sendInternalToProducer(Producer p, NotificationType type, String[] messages) throws NotificationException {
         String topic = TOPIC_MAP.get(type);
-        List<Future<RecordMetadata>> futures = new ArrayList<>();
+        List<MessageContext> messageContexts = new ArrayList<>();
         for (String message : messages) {
             ProducerRecord record = new ProducerRecord(topic, message);
             LOG.debug("Sending message for topic {}: {}", topic, message);
-            futures.add(producer.send(record));
+            Future future = p.send(record);
+            messageContexts.add(new MessageContext(future, message));
         }
 
-        for (Future<RecordMetadata> future : futures) {
+        List<String> failedMessages = new ArrayList<>();
+        Exception lastFailureException = null;
+        for (MessageContext context : messageContexts) {
             try {
-                RecordMetadata response = future.get();
+                RecordMetadata response = context.getFuture().get();
                 LOG.debug("Sent message for topic - {}, partition - {}, offset - {}", response.topic(),
                     response.partition(), response.offset());
             } catch (Exception e) {
-                throw new NotificationException(e);
+                LOG.warn("Could not send message - {}", context.getMessage(), e);
+                lastFailureException = e;
+                failedMessages.add(context.getMessage());
             }
         }
+        if (lastFailureException != null) {
+            throw new NotificationException(lastFailureException, failedMessages);
+        }
     }
 
     // ----- helper methods --------------------------------------------------
@@ -359,4 +376,23 @@ public class KafkaNotification extends AbstractNotification implements Service {
             }
         }
     }
+
+    private class MessageContext {
+
+        private final Future<RecordMetadata> future;
+        private final String message;
+
+        public MessageContext(Future<RecordMetadata> future, String message) {
+            this.future = future;
+            this.message = message;
+        }
+
+        public Future<RecordMetadata> getFuture() {
+            return future;
+        }
+
+        public String getMessage() {
+            return message;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
index d9d89df..2dd9c9f 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationException.java
@@ -19,11 +19,24 @@ package org.apache.atlas.notification;
 
 import org.apache.atlas.AtlasException;
 
+import java.util.List;
+
 /**
  * Exception from notification.
  */
 public class NotificationException extends AtlasException {
+    private List<String> failedMessages;
+
     public NotificationException(Exception e) {
         super(e);
     }
+
+    public NotificationException(Exception e, List<String> failedMessages) {
+        super(e);
+        this.failedMessages = failedMessages;
+    }
+
+    public List<String> getFailedMessages() {
+        return failedMessages;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
index 16cb0f0..9854bcc 100644
--- a/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
+++ b/notification/src/test/java/org/apache/atlas/hook/AtlasHookTest.java
@@ -21,24 +21,101 @@ package org.apache.atlas.hook;
 import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
 import org.apache.atlas.notification.hook.HookNotification;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.List;
 
 import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyZeroInteractions;
 
 
 public class AtlasHookTest {
+
+    @Mock
+    private NotificationInterface notificationInterface;
+
+    @Mock
+    private FailedMessagesLogger failedMessagesLogger;
+
+    @BeforeMethod
+    public void setup() {
+        MockitoAnnotations.initMocks(this);
+    }
+
+    @Test (timeOut = 10000)
+    public void testNotifyEntitiesDoesNotHangOnException() throws Exception {
+        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new Exception())).when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 0, notificationInterface, false,
+                failedMessagesLogger);
+        // if we've reached here, the method finished OK.
+    }
+
+    @Test
+    public void testNotifyEntitiesRetriesOnException() throws NotificationException {
+        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new Exception())).when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
+                failedMessagesLogger);
+
+        verify(notificationInterface, times(2)).
+                send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+    }
+
+    @Test
+    public void testFailedMessageIsLoggedIfRequired() throws NotificationException {
+        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
+                .when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
+                failedMessagesLogger);
+
+        verify(failedMessagesLogger, times(1)).log("test message");
+    }
+
     @Test
-    public void testnotifyEntities() throws Exception{
+    public void testFailedMessageIsNotLoggedIfNotRequired() throws NotificationException {
         List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
-        NotificationInterface notifInterface = mock(NotificationInterface.class);
-        doThrow(new NotificationException(new Exception())).when(notifInterface)
+        doThrow(new NotificationException(new Exception(), Arrays.asList("test message")))
+                .when(notificationInterface)
                 .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
-        AtlasHook.notifInterface = notifInterface;
-        AtlasHook.notifyEntities(hookNotificationMessages, 2);
-        System.out.println("AtlasHook.notifyEntities() returns successfully");
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, false,
+                failedMessagesLogger);
+
+        verifyZeroInteractions(failedMessagesLogger);
+    }
+
+    @Test
+    public void testAllFailedMessagesAreLogged() throws NotificationException {
+        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        doThrow(new NotificationException(new Exception(), Arrays.asList("test message1", "test message2")))
+                .when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
+                failedMessagesLogger);
+
+        verify(failedMessagesLogger, times(1)).log("test message1");
+        verify(failedMessagesLogger, times(1)).log("test message2");
+    }
+
+    @Test
+    public void testFailedMessageIsNotLoggedIfNotANotificationException() throws Exception {
+        List<HookNotification.HookNotificationMessage> hookNotificationMessages = new ArrayList<>();
+        doThrow(new RuntimeException("test message")).when(notificationInterface)
+                .send(NotificationInterface.NotificationType.HOOK, hookNotificationMessages);
+        AtlasHook.notifyEntitiesInternal(hookNotificationMessages, 2, notificationInterface, true,
+                failedMessagesLogger);
+
+        verifyZeroInteractions(failedMessagesLogger);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
----------------------------------------------------------------------
diff --git a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
index 219bd70..2a49634 100644
--- a/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
+++ b/notification/src/test/java/org/apache/atlas/kafka/KafkaNotificationTest.java
@@ -22,7 +22,12 @@ import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.serializer.StringDecoder;
 import org.apache.atlas.notification.MessageDeserializer;
 import org.apache.atlas.notification.NotificationConsumer;
+import org.apache.atlas.notification.NotificationException;
 import org.apache.atlas.notification.NotificationInterface;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.TopicPartition;
 import org.testng.annotations.Test;
 
 import java.util.ArrayList;
@@ -30,6 +35,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
 
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.eq;
@@ -39,6 +46,7 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
 
 public class KafkaNotificationTest {
 
@@ -77,6 +85,83 @@ public class KafkaNotificationTest {
         assertTrue(consumers.contains(consumer2));
     }
 
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldSendMessagesSuccessfully() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message = "This is a test message";
+        Future returnValue = mock(Future.class);
+        when(returnValue.get()).thenReturn(new RecordMetadata(new TopicPartition(topicName, 0), 0, 0));
+        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+        when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+        kafkaNotification.sendInternalToProducer(producer,
+                NotificationInterface.NotificationType.HOOK, new String[]{message});
+
+        verify(producer).send(expectedRecord);
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldThrowExceptionIfProducerFails() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message = "This is a test message";
+        Future returnValue = mock(Future.class);
+        when(returnValue.get()).thenThrow(new RuntimeException("Simulating exception"));
+        ProducerRecord expectedRecord = new ProducerRecord(topicName, message);
+        when(producer.send(expectedRecord)).thenReturn(returnValue);
+
+        try {
+            kafkaNotification.sendInternalToProducer(producer,
+                NotificationInterface.NotificationType.HOOK, new String[]{message});
+            fail("Should have thrown NotificationException");
+        } catch (NotificationException e) {
+            assertEquals(e.getFailedMessages().size(), 1);
+            assertEquals(e.getFailedMessages().get(0), "This is a test message");
+        }
+    }
+
+    @Test
+    @SuppressWarnings("unchecked")
+    public void shouldCollectAllFailedMessagesIfProducerFails() throws NotificationException,
+            ExecutionException, InterruptedException {
+        Properties configProperties = mock(Properties.class);
+        KafkaNotification kafkaNotification = new KafkaNotification(configProperties);
+
+        Producer producer = mock(Producer.class);
+        String topicName = kafkaNotification.getTopicName(NotificationInterface.NotificationType.HOOK);
+        String message1 = "This is a test message1";
+        String message2 = "This is a test message2";
+        Future returnValue1 = mock(Future.class);
+        when(returnValue1.get()).thenThrow(new RuntimeException("Simulating exception"));
+        Future returnValue2 = mock(Future.class);
+        when(returnValue2.get()).thenThrow(new RuntimeException("Simulating exception"));
+        ProducerRecord expectedRecord1 = new ProducerRecord(topicName, message1);
+        when(producer.send(expectedRecord1)).thenReturn(returnValue1);
+        ProducerRecord expectedRecord2 = new ProducerRecord(topicName, message2);
+        when(producer.send(expectedRecord2)).thenReturn(returnValue1);
+
+        try {
+            kafkaNotification.sendInternalToProducer(producer,
+                    NotificationInterface.NotificationType.HOOK, new String[]{message1, message2});
+            fail("Should have thrown NotificationException");
+        } catch (NotificationException e) {
+            assertEquals(e.getFailedMessages().size(), 2);
+            assertEquals(e.getFailedMessages().get(0), "This is a test message1");
+            assertEquals(e.getFailedMessages().get(1), "This is a test message2");
+        }
+    }
+
     class TestKafkaNotification extends KafkaNotification {
 
         private final ConsumerConnector consumerConnector;

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/a2e7738a/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index b243037..413493f 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -23,6 +23,7 @@ ATLAS-409 Atlas will not import avro tables with schema read from a file (dosset
 ATLAS-379 Create sqoop and falcon metadata addons (venkatnrangan,bvellanki,sowmyaramesh via shwethags)
 
 ALL CHANGES:
+ATLAS-901 Log messages that cannot be sent to Kafka to a specific log configuration (yhemanth)
 ATLAS-911 Get entity by unique attribute doesn't enforce type (shwethags)
 ATLAS-899 Fix Hive Hook documentation (sumasai via yhemanth)
 ATLAS-890 Log received messages in case of error (sumasai via yhemanth)


Mime
View raw message