atlas-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From suma...@apache.org
Subject incubator-atlas git commit: ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)
Date Fri, 18 Dec 2015 10:38:12 GMT
Repository: incubator-atlas
Updated Branches:
  refs/heads/master fa502b217 -> 2cae42c08


ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)


Project: http://git-wip-us.apache.org/repos/asf/incubator-atlas/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-atlas/commit/2cae42c0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-atlas/tree/2cae42c0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-atlas/diff/2cae42c0

Branch: refs/heads/master
Commit: 2cae42c08a4122db672f3188a46b8385992ddc02
Parents: fa502b2
Author: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Authored: Fri Dec 18 16:07:57 2015 +0530
Committer: Suma Shivaprasad <sumasai.shivaprasad@gmail.com>
Committed: Fri Dec 18 16:07:57 2015 +0530

----------------------------------------------------------------------
 distro/src/conf/application.properties          |  1 +
 .../org/apache/atlas/kafka/KafkaConsumer.java   |  6 ++
 .../AbstractNotificationConsumer.java           |  5 +-
 .../notification/NotificationConsumer.java      |  9 +-
 .../notification/NotificationHookConsumer.java  | 69 +++++++++------
 release-log.txt                                 |  1 +
 .../atlas/repository/graph/GraphHelper.java     |  2 +-
 .../graph/TypedInstanceToGraphMapper.java       |  4 +
 .../src/main/resources/application.properties   |  1 +
 .../notification/EntityNotificationIT.java      | 89 ++++----------------
 .../NotificationHookConsumerIT.java             |  8 +-
 .../atlas/web/resources/BaseResourceIT.java     | 86 ++++++++++---------
 .../web/resources/EntityJerseyResourceIT.java   | 37 +++-----
 13 files changed, 144 insertions(+), 174 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/distro/src/conf/application.properties
----------------------------------------------------------------------
diff --git a/distro/src/conf/application.properties b/distro/src/conf/application.properties
index f9888bd..cb6ee31 100755
--- a/distro/src/conf/application.properties
+++ b/distro/src/conf/application.properties
@@ -57,6 +57,7 @@ atlas.kafka.bootstrap.servers=localhost:9027
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
 atlas.kafka.auto.commit.interval.ms=1000
+atlas.kafka.auto.offset.reset=smallest
 atlas.kafka.hook.group.id=atlas
 
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
index d4e07c0..1f05df4 100644
--- a/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/kafka/KafkaConsumer.java
@@ -69,4 +69,10 @@ public class KafkaConsumer<T> extends AbstractNotificationConsumer<T> {
                 consumerId, message.topic(), message.partition(), message.offset(), message.message());
         return (String) message.message();
     }
+
+    @Override
+    protected String peekMessage() {
+        MessageAndMetadata message = (MessageAndMetadata) iterator.peek();
+        return (String) message.message();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
index 42a4e7f..b6a9d7b 100644
--- a/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/AbstractNotificationConsumer.java
@@ -94,10 +94,11 @@ public abstract class AbstractNotificationConsumer<T> implements NotificationCon
     }
 
     @Override
-    public void remove() {
-        throw new UnsupportedOperationException("The remove method is not supported.");
+    public T peek() {
+        return GSON.fromJson(peekMessage(), type);
     }
 
+    protected abstract String peekMessage();
 
     // ----- inner class : ImmutableListDeserializer ---------------------------
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
index 346ec3e..d2da975 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationConsumer.java
@@ -17,8 +17,11 @@
 
 package org.apache.atlas.notification;
 
-import java.util.Iterator;
-
 // TODO : docs!
-public interface NotificationConsumer<T> extends Iterator<T>{
+public interface NotificationConsumer<T>{
+    boolean hasNext();
+
+    T next();
+
+    T peek();
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
----------------------------------------------------------------------
diff --git a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
index e02aafa..3352cd0 100644
--- a/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
+++ b/notification/src/main/java/org/apache/atlas/notification/NotificationHookConsumer.java
@@ -19,10 +19,10 @@ package org.apache.atlas.notification;
 
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
+import kafka.consumer.ConsumerTimeoutException;
 import org.apache.atlas.ApplicationProperties;
 import org.apache.atlas.AtlasClient;
 import org.apache.atlas.AtlasException;
-import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.notification.hook.HookNotification;
 import org.apache.atlas.service.Service;
 import org.apache.commons.configuration.Configuration;
@@ -98,6 +98,14 @@ public class NotificationHookConsumer implements Service {
             this.consumer = consumer;
         }
 
+        private boolean hasNext() {
+            try {
+                return consumer.hasNext();
+            } catch(ConsumerTimeoutException e) {
+                return false;
+            }
+        }
+
         @Override
         public void run() {
 
@@ -105,34 +113,39 @@ public class NotificationHookConsumer implements Service {
                 return;
             }
 
-            while(consumer.hasNext()) {
-                HookNotification.HookNotificationMessage message = consumer.next();
-
+            while(true) {
                 try {
-                    switch (message.getType()) {
-                        case ENTITY_CREATE:
-                            HookNotification.EntityCreateRequest createRequest =
-                                    (HookNotification.EntityCreateRequest) message;
-                            atlasClient.createEntity(createRequest.getEntities());
-                            break;
-
-                        case ENTITY_PARTIAL_UPDATE:
-                            HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
-                                    (HookNotification.EntityPartialUpdateRequest) message;
-                            atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
-                                    partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
-                                    partialUpdateRequest.getEntity());
-                            break;
-
-                        case ENTITY_FULL_UPDATE:
-                            HookNotification.EntityUpdateRequest updateRequest =
-                                    (HookNotification.EntityUpdateRequest) message;
-                            atlasClient.updateEntities(updateRequest.getEntities());
-                            break;
+                    if (hasNext()) {
+                        HookNotification.HookNotificationMessage message = consumer.next();
+                        try {
+                            switch (message.getType()) {
+                                case ENTITY_CREATE:
+                                    HookNotification.EntityCreateRequest createRequest =
+                                            (HookNotification.EntityCreateRequest) message;
+                                    atlasClient.createEntity(createRequest.getEntities());
+                                    break;
+
+                                case ENTITY_PARTIAL_UPDATE:
+                                    HookNotification.EntityPartialUpdateRequest partialUpdateRequest =
+                                            (HookNotification.EntityPartialUpdateRequest) message;
+                                    atlasClient.updateEntity(partialUpdateRequest.getTypeName(),
+                                            partialUpdateRequest.getAttribute(), partialUpdateRequest.getAttributeValue(),
+                                            partialUpdateRequest.getEntity());
+                                    break;
+
+                                case ENTITY_FULL_UPDATE:
+                                    HookNotification.EntityUpdateRequest updateRequest =
+                                            (HookNotification.EntityUpdateRequest) message;
+                                    atlasClient.updateEntities(updateRequest.getEntities());
+                                    break;
+                            }
+                        } catch (Exception e) {
+                            //todo handle failures
+                            LOG.warn("Error handling message {}", message, e);
+                        }
                     }
-                } catch (Exception e) {
-                    //todo handle failures
-                    LOG.debug("Error handling message {}", message, e);
+                } catch(Throwable t) {
+                    LOG.warn("Failure in NotificationHookConsumer", t);
                 }
             }
         }
@@ -150,7 +163,7 @@ public class NotificationHookConsumer implements Service {
                         return false;
                     }
                 }
-            } catch (AtlasServiceException e) {
+            } catch (Throwable e) {
                 LOG.info(
                         "Handled AtlasServiceException while waiting for Atlas Server to become ready, " +
                                 "exiting consumer thread.", e);

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/release-log.txt
----------------------------------------------------------------------
diff --git a/release-log.txt b/release-log.txt
index 7d45a7b..0588ff9 100644
--- a/release-log.txt
+++ b/release-log.txt
@@ -14,6 +14,7 @@ ATLAS-54 Rename configs in hive hook (shwethags)
 ATLAS-3 Mixed Index creation fails with Date types (sumasai via shwethags)
 
 ALL CHANGES:
+ATLAS-394 Fix BaseResourceIT.waitForNotification (shwethags via sumasai)
 ATLAS-385 Support for Lineage for entities with SuperType as DataSet (anilsg via sumasai)
 ATLAS-342 Atlas is sending an ENTITY_CREATE event to the ATLAS_ENTITIES topic even if the entity exists already (shwethags)
 ATLAS-386 Handle hive rename Table (shwethags)

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
index 6b2d5d1..9955f07 100755
--- a/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/GraphHelper.java
@@ -252,7 +252,7 @@ public final class GraphHelper {
      */
     public Vertex getVertexForInstanceByUniqueAttribute(ClassType classType, IReferenceableInstance instance)
         throws AtlasException {
-
+        LOG.debug("Checking if there is an instance with the same unique attributes for instance {}", instance);
         Vertex result = null;
         for (AttributeInfo attributeInfo : classType.fieldMapping().fields.values()) {
             if (attributeInfo.isUnique) {

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
----------------------------------------------------------------------
diff --git a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
index 996f31b..2f3eb30 100644
--- a/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
+++ b/repository/src/main/java/org/apache/atlas/repository/graph/TypedInstanceToGraphMapper.java
@@ -231,12 +231,14 @@ public final class TypedInstanceToGraphMapper {
         List<ITypedReferenceableInstance> instancesToUpdate = new ArrayList<>();
 
         for (IReferenceableInstance instance : instances) {
+            LOG.debug("Discovering instance to create/update for {}", instance);
             ITypedReferenceableInstance newInstance;
             Id id = instance.getId();
 
             if (!idToVertexMap.containsKey(id)) {
                 Vertex instanceVertex;
                 if (id.isAssigned()) {  // has a GUID
+                    LOG.debug("Instance {} has an assigned id", instance.getId()._getId());
                     instanceVertex = graphHelper.getVertexForGUID(id.id);
                     if (!(instance instanceof ReferenceableInstance)) {
                         throw new IllegalStateException(
@@ -252,6 +254,7 @@ public final class TypedInstanceToGraphMapper {
 
                     //no entity with the given unique attribute, create new
                     if (instanceVertex == null) {
+                        LOG.debug("Creating new vertex for instance {}", instance);
                         newInstance = classType.convert(instance, Multiplicity.REQUIRED);
                         instanceVertex = graphHelper.createVertexWithIdentity(newInstance, classType.getAllSuperTypeNames());
                         instancesToCreate.add(newInstance);
@@ -260,6 +263,7 @@ public final class TypedInstanceToGraphMapper {
                         mapInstanceToVertex(newInstance, instanceVertex, classType.fieldMapping().fields, true, Operation.CREATE);
 
                     } else {
+                        LOG.debug("Re-using existing vertex {} for instance {}", instanceVertex.getId(), instance);
                         if (!(instance instanceof ReferenceableInstance)) {
                             throw new IllegalStateException(
                                     String.format("%s is not of type ITypedReferenceableInstance", instance));

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/typesystem/src/main/resources/application.properties
----------------------------------------------------------------------
diff --git a/typesystem/src/main/resources/application.properties b/typesystem/src/main/resources/application.properties
index d475d7e..702c6f2 100644
--- a/typesystem/src/main/resources/application.properties
+++ b/typesystem/src/main/resources/application.properties
@@ -66,6 +66,7 @@ atlas.kafka.bootstrap.servers=localhost:19027
 atlas.kafka.data=${sys:atlas.data}/kafka
 atlas.kafka.zookeeper.session.timeout.ms=400
 atlas.kafka.zookeeper.sync.time.ms=20
+atlas.kafka.consumer.timeout.ms=100
 atlas.kafka.auto.commit.interval.ms=100
 atlas.kafka.hook.group.id=atlas
 atlas.kafka.entities.group.id=atlas_entities

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
index b2501ec..cd4e743 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/EntityNotificationIT.java
@@ -35,10 +35,8 @@ import org.apache.atlas.typesystem.types.TraitType;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.web.resources.BaseResourceIT;
 import org.apache.atlas.web.util.Servlets;
-import org.junit.AfterClass;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
 
@@ -48,7 +46,9 @@ import java.util.Collections;
 import java.util.LinkedList;
 import java.util.List;
 
-import static org.testng.Assert.*;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertTrue;
 
 /**
  * Entity Notification Integration Tests.
@@ -62,9 +62,9 @@ public class EntityNotificationIT extends BaseResourceIT {
     private final String TABLE_NAME = "table" + randomString();
     @Inject
     private NotificationInterface notificationInterface;
-    private EntityNotificationConsumer notificationConsumer;
     private Id tableId;
     private String traitName;
+    private NotificationConsumer<EntityNotification> notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -74,19 +74,7 @@ public class EntityNotificationIT extends BaseResourceIT {
         List<NotificationConsumer<EntityNotification>> consumers =
             notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
-        NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
-        notificationConsumer = new EntityNotificationConsumer(consumer);
-        notificationConsumer.start();
-    }
-
-    @AfterClass
-    public void tearDown() {
-        notificationConsumer.stop();
-    }
-
-    @BeforeMethod
-    public void setupTest() {
-        notificationConsumer.reset();
+        notificationConsumer = consumers.iterator().next();
     }
 
     @Test
@@ -97,17 +85,8 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         final String guid = tableId._getId();
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.ENTITY_CREATE, entityNotification.getOperationType());
-
-        IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.ENTITY_CREATE, HIVE_TABLE_TYPE, guid));
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
@@ -119,19 +98,8 @@ public class EntityNotificationIT extends BaseResourceIT {
 
         serviceClient.updateEntityAttribute(guid, property, newValue);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.ENTITY_UPDATE, entityNotification.getOperationType());
-
-        IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
-
-        assertEquals(newValue, entity.getValuesMap().get(property));
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.ENTITY_UPDATE, HIVE_TABLE_TYPE, guid));
     }
 
     @Test(dependsOnMethods = "testCreateEntity")
@@ -154,18 +122,10 @@ public class EntityNotificationIT extends BaseResourceIT {
         ClientResponse clientResponse = addTrait(guid, traitInstanceJSON);
         assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
+        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
 
         IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
-
         assertTrue(entity.getTraits().contains(traitName));
 
         List<IStruct> allTraits = entityNotification.getAllTraits();
@@ -178,9 +138,6 @@ public class EntityNotificationIT extends BaseResourceIT {
         assertTrue(allTraitNames.contains(superTraitName));
         assertTrue(allTraitNames.contains(superSuperTraitName));
 
-        // add another trait with the same super type to the entity
-        notificationConsumer.reset();
-
         String anotherTraitName = "Trait" + randomString();
         createTrait(anotherTraitName, superTraitName);
 
@@ -191,12 +148,8 @@ public class EntityNotificationIT extends BaseResourceIT {
         clientResponse = addTrait(guid, traitInstanceJSON);
         assertEquals(clientResponse.getStatus(), Response.Status.CREATED.getStatusCode());
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.TRAIT_ADD, entityNotification.getOperationType());
+        entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.TRAIT_ADD, HIVE_TABLE_TYPE, guid));
 
         allTraits = entityNotification.getAllTraits();
         allTraitNames = new LinkedList<>();
@@ -217,20 +170,10 @@ public class EntityNotificationIT extends BaseResourceIT {
         ClientResponse clientResponse = deleteTrait(guid, traitName);
         Assert.assertEquals(clientResponse.getStatus(), Response.Status.OK.getStatusCode());
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-
-        EntityNotification entityNotification = notificationConsumer.getLastEntityNotification();
-
-        assertNotNull(entityNotification);
-        assertEquals(EntityNotification.OperationType.TRAIT_DELETE,
-            entityNotification.getOperationType());
-
-        IReferenceableInstance entity = entityNotification.getEntity();
-
-        assertEquals(HIVE_TABLE_TYPE, entity.getTypeName());
-        assertEquals(guid, entity.getId()._getId());
+        EntityNotification entityNotification = waitForNotification(notificationConsumer, MAX_WAIT_TIME,
+                newNotificationPredicate(EntityNotification.OperationType.TRAIT_DELETE, HIVE_TABLE_TYPE, guid));
 
-        assertFalse(entity.getTraits().contains(traitName));
+        assertFalse(entityNotification.getEntity().getTraits().contains(traitName));
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
index 3a4661c..e64e949 100644
--- a/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
+++ b/webapp/src/test/java/org/apache/atlas/notification/NotificationHookConsumerIT.java
@@ -59,7 +59,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
 
         sendHookMessage(new HookNotification.EntityCreateRequest(entity));
 
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'",
DATABASE_TYPE,
@@ -80,7 +80,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         final Referenceable newEntity = new Referenceable(DATABASE_TYPE);
         newEntity.set("owner", randomString());
         sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name",
dbName, newEntity));
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 Referenceable localEntity = serviceClient.getEntity(DATABASE_TYPE, "name",
dbName);
@@ -106,7 +106,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
         newEntity.set("name", newName);
 
         sendHookMessage(new HookNotification.EntityPartialUpdateRequest(DATABASE_TYPE, "name",
dbName, newEntity));
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'",
DATABASE_TYPE,
@@ -136,7 +136,7 @@ public class NotificationHookConsumerIT extends BaseResourceIT {
 
         //updating unique attribute
         sendHookMessage(new HookNotification.EntityUpdateRequest(newEntity));
-        waitFor(1000, new Predicate() {
+        waitFor(MAX_WAIT_TIME, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
                 JSONArray results = serviceClient.searchByDSL(String.format("%s where name='%s'",
DATABASE_TYPE,

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
index aba191c..34abeab 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/BaseResourceIT.java
@@ -24,7 +24,10 @@ import com.sun.jersey.api.client.Client;
 import com.sun.jersey.api.client.ClientResponse;
 import com.sun.jersey.api.client.WebResource;
 import com.sun.jersey.api.client.config.DefaultClientConfig;
-import org.apache.atlas.*;
+import kafka.consumer.ConsumerTimeoutException;
+import org.apache.atlas.ApplicationProperties;
+import org.apache.atlas.AtlasClient;
+import org.apache.atlas.AtlasServiceException;
 import org.apache.atlas.notification.NotificationConsumer;
 import org.apache.atlas.notification.entity.EntityNotification;
 import org.apache.atlas.typesystem.Referenceable;
@@ -43,6 +46,7 @@ import org.apache.atlas.typesystem.types.IDataType;
 import org.apache.atlas.typesystem.types.Multiplicity;
 import org.apache.atlas.typesystem.types.StructTypeDefinition;
 import org.apache.atlas.typesystem.types.TraitType;
+import org.apache.atlas.typesystem.types.TypeUtils;
 import org.apache.atlas.typesystem.types.utils.TypesUtil;
 import org.apache.atlas.utils.ParamChecker;
 import org.apache.atlas.web.util.Servlets;
@@ -272,6 +276,17 @@ public abstract class BaseResourceIT {
         boolean evaluate() throws Exception;
     }
 
+    public interface NotificationPredicate {
+
+        /**
+         * Perform a predicate evaluation.
+         *
+         * @return the boolean result of the evaluation.
+         * @throws Exception thrown if the predicate evaluation could not evaluate.
+         */
+        boolean evaluate(EntityNotification notification) throws Exception;
+    }
+
     /**
      * Wait for a condition, expressed via a {@link Predicate} to become true.
      *
@@ -292,49 +307,40 @@ public abstract class BaseResourceIT {
         }
     }
 
-    // ----- inner class : EntityNotificationConsumer --------------------------
-
-    protected static class EntityNotificationConsumer implements Runnable {
-        private final NotificationConsumer<EntityNotification> consumerIterator;
-        private EntityNotification entityNotification = null;
-        private boolean run;
-
-        public EntityNotificationConsumer(NotificationConsumer<EntityNotification> consumerIterator) {
-            this.consumerIterator = consumerIterator;
-        }
-
-        @Override
-        public void run() {
-            while (run && consumerIterator.hasNext()) {
-                entityNotification = consumerIterator.next();
-            }
-        }
-
-        public void reset() {
-            entityNotification = null;
-        }
-
-        public void start() {
-            Thread thread = new Thread(this);
-            run = true;
-            thread.start();
-        }
-
-        public void stop() {
-            run = false;
-        }
-
-        public EntityNotification getLastEntityNotification() {
-            return entityNotification;
-        }
-    }
-
-    protected void waitForNotification(final EntityNotificationConsumer notificationConsumer, int maxWait) throws Exception {
+    protected EntityNotification waitForNotification(final NotificationConsumer<EntityNotification> consumer, int maxWait,
+                                                     final NotificationPredicate predicate) throws Exception {
+        final TypeUtils.Pair<EntityNotification, String> pair = TypeUtils.Pair.of(null, null);
+        final long maxCurrentTime = System.currentTimeMillis() + maxWait;
         waitFor(maxWait, new Predicate() {
             @Override
             public boolean evaluate() throws Exception {
-                return notificationConsumer.getLastEntityNotification() != null;
+                try {
+                    while (consumer.hasNext() && System.currentTimeMillis() < maxCurrentTime) {
+                        EntityNotification notification = consumer.next();
+                        if (predicate.evaluate(notification)) {
+                            pair.left = notification;
+                            return true;
+                        }
+                    }
+                } catch(ConsumerTimeoutException e) {
+                    //ignore
+                }
+                return false;
             }
         });
+        return pair.left;
+    }
+
+    protected NotificationPredicate newNotificationPredicate(final EntityNotification.OperationType operationType,
+                                                             final String typeName, final String guid) {
+        return new NotificationPredicate() {
+            @Override
+            public boolean evaluate(EntityNotification notification) throws Exception {
+                return notification != null &&
+                        notification.getOperationType() == operationType &&
+                        notification.getEntity().getTypeName().equals(typeName) &&
+                        notification.getEntity().getId()._getId().equals(guid);
+            }
+        };
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-atlas/blob/2cae42c0/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
----------------------------------------------------------------------
diff --git a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
index a268196..73d26ce 100755
--- a/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
+++ b/webapp/src/test/java/org/apache/atlas/web/resources/EntityJerseyResourceIT.java
@@ -48,12 +48,10 @@ import org.apache.atlas.web.util.Servlets;
 import org.apache.commons.lang.RandomStringUtils;
 import org.codehaus.jettison.json.JSONArray;
 import org.codehaus.jettison.json.JSONObject;
-import org.junit.AfterClass;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.Assert;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Guice;
 import org.testng.annotations.Test;
@@ -67,7 +65,6 @@ import java.util.Map;
 import java.util.UUID;
 
 import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.fail;
 
 /**
@@ -89,7 +86,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
 
     @Inject
     private NotificationInterface notificationInterface;
-    private EntityNotificationConsumer notificationConsumer;
+    private NotificationConsumer<EntityNotification> notificationConsumer;
 
     @BeforeClass
     public void setUp() throws Exception {
@@ -100,19 +97,7 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
         List<NotificationConsumer<EntityNotification>> consumers =
                 notificationInterface.createConsumers(NotificationInterface.NotificationType.ENTITIES, 1);
 
-        NotificationConsumer<EntityNotification> consumer = consumers.iterator().next();
-        notificationConsumer = new EntityNotificationConsumer(consumer);
-        notificationConsumer.start();
-    }
-
-    @AfterClass
-    public void tearDown() {
-        notificationConsumer.stop();
-    }
-
-    @BeforeMethod
-    public void setupTest() {
-        notificationConsumer.reset();
+        notificationConsumer = consumers.iterator().next();
     }
 
     @Test
@@ -158,20 +143,26 @@ public class EntityJerseyResourceIT extends BaseResourceIT {
 
         serviceClient.createEntity(db).getString(0);
 
-        waitForNotification(notificationConsumer, MAX_WAIT_TIME);
-        EntityNotification notification = notificationConsumer.getLastEntityNotification();
-        assertNotNull(notification);
-        assertEquals(notification.getEntity().get("name"), dbName);
+        waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
+            @Override
+            public boolean evaluate(EntityNotification notification) throws Exception {
+                return notification != null && notification.getEntity().get("name").equals(dbName);
+            }
+        });
 
         JSONArray results =
                 serviceClient.searchByDSL(String.format("%s where name='%s'", DATABASE_TYPE, dbName));
         assertEquals(results.length(), 1);
 
         //create entity again shouldn't create another instance with same unique attribute value
-        notificationConsumer.reset();
         serviceClient.createEntity(db);
         try {
-            waitForNotification(notificationConsumer, MAX_WAIT_TIME);
+            waitForNotification(notificationConsumer, MAX_WAIT_TIME, new NotificationPredicate() {
+                @Override
+                public boolean evaluate(EntityNotification notification) throws Exception {
+                    return notification != null && notification.getEntity().get("name").equals(dbName);
+                }
+            });
             fail("Expected time out exception");
         } catch (Exception e) {
             //expected timeout


Mime
View raw message