atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mad...@apache.org
Subject incubator-atlas git commit: ATLAS-1944: Implemented ShutdownableThread for HookConsumer
Date Thu, 13 Jul 2017 20:49:58 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/0.8-incubating ee3e9a8da -> 9b63509dc


ATLAS-1944: Implemented ShutdownableThread for HookConsumer

Signed-off-by: Madhan Neethiraj <madhan@apache.org>
(cherry picked from commit 18745cf4b98af9c45e853daa280342dde8da1300)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/9b63509d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/9b63509d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/9b63509d

Branch: refs/heads/0.8-incubating
Commit: 9b63509dc7d001092cae39297c3bc4f6f500ed2d
Parents: ee3e9a8
Author: ashutoshm <amestry@hortonworks.com>
Authored: Wed Jul 12 14:43:45 2017 -0700
Committer: Madhan Neethiraj <madhan@apache.org>
Committed: Thu Jul 13 13:49:22 2017 -0700

----------------------------------------------------------------------
 .../notification/NotificationHookConsumer.java  | 56 +++++++++++---------
 1 file changed, 31 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/9b63509d/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index 9e5b864..0dea0e2 100644
--- a/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/webapp/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,6 +19,7 @@ package org.apache.atlas.notification;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import kafka.utils.ShutdownableThread;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasException;
 import org.apache.atlas.AtlasServiceException;
@@ -28,7 +29,11 @@ import org.apache.atlas.ha.HAConfiguration;
 import org.apache.atlas.kafka.AtlasKafkaMessage;
 import org.apache.atlas.listener.ActiveStateChangeHandler;
 import org.apache.atlas.model.instance.AtlasEntity;
+import org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
+import org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
 import org.apache.atlas.notification.hook.HookNotification.EntityPartialUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
+import org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
 import org.apache.atlas.repository.converters.AtlasInstanceConverter;
 import org.apache.atlas.repository.store.graph.AtlasEntityStore;
 import org.apache.atlas.repository.store.graph.v1.AtlasEntityStream;
@@ -41,11 +46,12 @@ import org.apache.atlas.web.filters.AuditFilter;
 import org.apache.atlas.web.service.ServiceState;
 import org.apache.atlas.web.util.DateTimeHelper;
 import org.apache.commons.configuration.Configuration;
+import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.springframework.core.annotation.Order;
 import org.springframework.stereotype.Component;
-import org.apache.kafka.common.TopicPartition;
+
 import javax.inject.Inject;
 import java.util.ArrayList;
 import java.util.Date;
@@ -56,14 +62,7 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
-import static org.apache.atlas.AtlasClientV2.CREATE_ENTITY;
-import static org.apache.atlas.AtlasClientV2.DELETE_ENTITY_BY_ATTRIBUTE;
-import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY;
-import static org.apache.atlas.AtlasClientV2.UPDATE_ENTITY_BY_ATTRIBUTE;
-import static org.apache.atlas.notification.hook.HookNotification.EntityCreateRequest;
-import static org.apache.atlas.notification.hook.HookNotification.EntityDeleteRequest;
-import static org.apache.atlas.notification.hook.HookNotification.EntityUpdateRequest;
-import static org.apache.atlas.notification.hook.HookNotification.HookNotificationMessage;
+import static org.apache.atlas.AtlasClientV2.*;
 
 /**
  * Consumer of notifications from hooks e.g., hive hook etc.
@@ -80,7 +79,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
     public static final String CONSUMER_THREADS_PROPERTY = "atlas.notification.hook.numthreads";
     public static final String CONSUMER_RETRIES_PROPERTY = "atlas.notification.hook.maxretries";
     public static final String CONSUMER_FAILEDCACHESIZE_PROPERTY = "atlas.notification.hook.failedcachesize";
-    public static final String CONSUMER_RETRY_INTERVAL="atlas.notification.consumer.retry.interval";
+    public static final String CONSUMER_RETRY_INTERVAL = "atlas.notification.consumer.retry.interval";
 
     public static final int SERVER_READY_WAIT_TIME_MS = 1000;
     private final AtlasEntityStore atlasEntityStore;
@@ -177,7 +176,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     /**
      * Start Kafka consumer threads that read from Kafka topic when server is activated.
-     *
+     * <p>
      * Since the consumers create / update entities to the shared backend store, only the active instance
      * should perform this activity. Hence, these threads are started only on server activation.
      */
@@ -189,7 +188,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
     /**
      * Stop Kafka consumer threads that read from Kafka topic when server is de-activated.
-     *
+     * <p>
      * Since the consumers create / update entities to the shared backend store, only the active instance
      * should perform this activity. Hence, these threads are stopped only on server deactivation.
      */
@@ -205,18 +204,18 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         }
     }
 
-    class HookConsumer implements Runnable {
+    class HookConsumer extends ShutdownableThread {
         private final NotificationConsumer<HookNotificationMessage> consumer;
         private final AtomicBoolean shouldRun = new AtomicBoolean(false);
         private List<HookNotificationMessage> failedMessages = new ArrayList<>();
 
         public HookConsumer(NotificationConsumer<HookNotificationMessage> consumer) {
+            super("atlas-hook-consumer-thread", false);
             this.consumer = consumer;
         }
 
-
         @Override
-        public void run() {
+        public void doWork() {
             shouldRun.set(true);
 
             if (!serverAvailable(new NotificationHookConsumer.Timer())) {
@@ -226,7 +225,7 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             while (shouldRun.get()) {
                 try {
                     List<AtlasKafkaMessage<HookNotificationMessage>> messages = consumer.receive(1000L);
-                    for (AtlasKafkaMessage<HookNotificationMessage> msg :  messages){
+                    for (AtlasKafkaMessage<HookNotificationMessage> msg : messages) {
                         handleMessage(msg);
                     }
                 } catch (Throwable t) {
@@ -267,15 +266,17 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                             if (numRetries == 0) { // audit only on the first attempt
                                 audit(messageUser, UPDATE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                      String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
+                                        String.format(UPDATE_ENTITY_BY_ATTRIBUTE.getPath(), partialUpdateRequest.getTypeName()));
                             }
 
                             Referenceable referenceable = partialUpdateRequest.getEntity();
                             entities = instanceConverter.toAtlasEntity(referenceable);
 
                             AtlasEntityType entityType = typeRegistry.getEntityTypeByName(partialUpdateRequest.getTypeName());
-                            String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>(){
-                                { put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue()); }
+                            String guid = AtlasGraphUtilsV1.getGuidByUniqueAttributes(entityType, new HashMap<String, Object>() {
+                                {
+                                    put(partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue());
+                                }
                             });
 
                             // There should only be one root entity
@@ -289,13 +290,15 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
 
                             if (numRetries == 0) { // audit only on the first attempt
                                 audit(messageUser, DELETE_ENTITY_BY_ATTRIBUTE.getMethod(),
-                                      String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
+                                        String.format(DELETE_ENTITY_BY_ATTRIBUTE.getPath(), deleteRequest.getTypeName()));
                             }
 
                             try {
                                 AtlasEntityType type = (AtlasEntityType) typeRegistry.getType(deleteRequest.getTypeName());
                                 atlasEntityStore.deleteByUniqueAttributes(type,
-                                        new HashMap<String, Object>() {{ put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue()); }});
+                                        new HashMap<String, Object>() {{
+                                            put(deleteRequest.getAttribute(), deleteRequest.getAttributeValue());
+                                        }});
                             } catch (ClassCastException cle) {
                                 LOG.error("Failed to do a partial update on Entity");
                             }
@@ -319,10 +322,10 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
                     break;
                 } catch (Throwable e) {
                     LOG.warn("Error handling message", e);
-                    try{
+                    try {
                         LOG.info("Sleeping for {} ms before retry", consumerRetryInterval);
                         Thread.sleep(consumerRetryInterval);
-                    }catch (InterruptedException ie){
+                    } catch (InterruptedException ie) {
                         LOG.error("Notification consumer thread sleep interrupted");
                     }
 
@@ -379,9 +382,12 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
             return true;
         }
 
-        public void stop() {
+        @Override
+        public void shutdown() {
+            super.initiateShutdown();
             shouldRun.set(false);
             consumer.close();
+            super.awaitShutdown();
         }
     }
 
@@ -393,4 +399,4 @@ public class NotificationHookConsumer implements Service, ActiveStateChangeHandl
         AuditFilter.audit(messageUser, THREADNAME_PREFIX, method, LOCALHOST, path, LOCALHOST,
                 DateTimeHelper.formatDateUTC(new Date()));
     }
-}
+}
\ No newline at end of file


Mime
View raw message