brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [21/23] brooklyn-server git commit: fix message publish synching to guarantee in-order delivery
Date Fri, 06 Oct 2017 08:06:45 GMT
fix message publish synching to guarantee in-order delivery

This applies both to initial-value delivery on subscription and to in-life delivery. There are two changes:
synching in AttributesInternal for the in-life delivery, and in subscribe/publish for the initial delivery.


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/9213f0e2
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/9213f0e2
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/9213f0e2

Branch: refs/heads/master
Commit: 9213f0e25288114718f2a06a50ee206bf0120561
Parents: 508183b
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Wed Oct 4 13:49:42 2017 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Wed Oct 4 14:45:39 2017 +0100

----------------------------------------------------------------------
 .../brooklyn/api/mgmt/SubscriptionManager.java  |   6 +-
 .../brooklyn/core/entity/AbstractEntity.java    |   1 +
 .../mgmt/internal/LocalSubscriptionManager.java | 108 +++++++++++++------
 .../brooklyn/core/sensor/AttributeMap.java      |  15 ++-
 .../core/effector/EffectorSayHiTest.java        |   2 +-
 .../core/entity/EntitySubscriptionTest.java     |   9 +-
 .../internal/LocalSubscriptionManagerTest.java  |  91 ++++++++++++++++
 .../policy/basic/PolicySubscriptionTest.java    |  35 ++++--
 8 files changed, 224 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
index 1fa327e..8302ba8 100644
--- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
+++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionManager.java
@@ -42,8 +42,10 @@ public interface SubscriptionManager {
      * 
      * The method returns an id which can be used to {@link #unsubscribe(SubscriptionHandle)}
later.
      * <p>
-     * The listener callback is in-order single-threaded and synchronized on this object.
The flags
-     * parameters can include the following:
+     * The listener callback is in-order single-threaded and synchronized on this object.
+     * In other words message delivery from a producer to a given subscriber is in publish
order
+     * (or in the case of a late subscriber getting initial values, in subscribe order).

+     * The flags parameters can include the following:
      * <ul>
      * <li>subscriber - object to identify the subscriber (e.g. entity, or console
session uid) 
      * <li><i>in future</i> - control parameters for the subscription (period,
minimum delta for updates, etc)

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
index bd7df06..dfbbc8f 100644
--- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
+++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java
@@ -529,6 +529,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements
E
      * through this method. Internally, all attribute updates synch on this object. Code
wishing to
      * update attributes or publish while holding some other lock should acquire the monitor
on this
      * object first to prevent deadlock. */
+    @Beta
     protected Object getAttributesSynchObjectInternal() {
         return attributesInternal.getSynchObjectInternal();
     }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index a927a89..a726059 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -54,6 +54,7 @@ import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Objects;
 import com.google.common.base.Predicate;
 import com.google.common.collect.ArrayListMultimap;
 import com.google.common.collect.HashMultimap;
@@ -129,15 +130,9 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
         
         if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in
{}", new Object[] {s.id, s.subscriber, producer, sensor, this});
         allSubscriptions.put(s.id, s);
-        addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor),
s);
-        if (s.subscriber!=null) {
-            addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
-        }
-        if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null)
{
-            ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag,
SingleThreadedScheduler.class);
-        }
-
+        T lastVal;
         if (notifyOfInitialValue) {
+            notifyOfInitialValue = false;
             if (producer == null) {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer:
"+s);
             } else if (sensor == null) {
@@ -145,16 +140,58 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
             } else if (!(sensor instanceof AttributeSensor)) {
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute
sensor: "+s);
             } else {
-                if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {}
to {}", new Object[] {s.producer, s.sensor, s});
-                em.submit(
-                    MutableMap.of("tags", ImmutableList.of(BrooklynTaskTags.tagForContextEntity(s.producer),
BrooklynTaskTags.SENSOR_TAG),
-                        "displayName", "Initial publication of "+s.sensor.getName()),
-                    () -> {
-                        T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
-                        submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer,
val), true);
-                    });
+                notifyOfInitialValue = true;
             }
         }
+        if (notifyOfInitialValue) {
+            lastVal = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+        } else {
+            lastVal = null;  // won't be used
+        }
+        addToMapOfSets(subscriptionsByToken, makeEntitySensorToken(s.producer, s.sensor),
s);
+        if (s.subscriber!=null) {
+            addToMapOfSets(subscriptionsBySubscriber, s.subscriber, s);
+        }
+        if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null)
{
+            ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag,
SingleThreadedScheduler.class);
+        }
+
+        if (notifyOfInitialValue) {
+            if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to
{}", new Object[] {s.producer, s.sensor, s});
+            // this is run asynchronously to prevent deadlock when trying to get attribute
and publish;
+            // however we want it:
+            // (a) to run in the same order as subscriptions are made, so use the manager
tag scheduler
+            // (b) ideally to use the last value that was not published to this target, and
+            // (c) to deliver before any subsequent value notification
+            // but we can't guarantee either (b) or (c) without taking a lock from before
we added 
+            // the subscriber above, mutexing other sets and publications, which feels heavy
and dangerous.
+            // so the compromise is to skip this delivery in cases where the last value has
obviously changed -
+            // because a more recent notification is guaranteed to be sent.
+            // we may occasionally still send a duplicate, if delivery got sent in the tiny
+            // window between adding the subscription and taking the last value,
+            // we will think the last value hasn't changed.  but we will never send a
+            // wrong value as this backs out if there is any confusion over the last value.
+            em.submit(
+                MutableMap.of("tags", getPublishTags(s, s.producer),
+                    "displayName", "Initial value publication on subscription to "+s.sensor.getName()),
+                () -> {
+                    T val = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor);
+                    if (!Objects.equal(lastVal, val)) {
+                        // bail out - value has been changed;
+                        // this might be a duplicate if value changed in small window earlier,
+                        // but it won't be delivering an old value later than a newer value
+                        if (LOG.isDebugEnabled()) LOG.debug("skipping initial value delivery
of {} -> {} to {} as value changed from {} to {}", new Object[] {s.producer, s.sensor,
s, lastVal, val});
+                        return;
+                    }
+                    // guard against case where other thread changes the val and publish
+                    // while we are publishing, and our older val is then delivered after
theirs.
+                    // 2017-10 previously we did not do this, then looked at doing it with
the attribute lock object
+                    // synchronized (((AbstractEntity)s.producer).getAttributesSynchObjectInternal())
{
+                    // but realized a better thing is to have initial delivery _done_, not
just submitted, 
+                    // by ourselves, as we are already in the right thread now and can prevent
interleaving this way
+                    submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer,
val), true);
+                });
+        }
         
         return s;
     }
@@ -210,7 +247,6 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
         // (recommend exactly one per subscription to prevent deadlock)
         // this is done with:
         // em.setTaskSchedulerForTag(subscriberId, SingleThreadedScheduler.class);
-        
         //note, generating the notifications must be done in the calling thread to preserve
order
         //e.g. emit(A); emit(B); should cause onEvent(A); onEvent(B) in that order
         if (LOG.isTraceEnabled()) LOG.trace("{} got event {}", this, event);
@@ -228,18 +264,11 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
     }
     
     @SuppressWarnings({ "unchecked", "rawtypes" })
-    private void submitPublishEvent(final Subscription s, final SensorEvent<?> event,
final boolean isInitial) {
+    private void submitPublishEvent(final Subscription s, final SensorEvent<?> event,
final boolean isInitialPublicationOfOldValueInCorrectScheduledThread) {
         if (s.eventFilter!=null && !s.eventFilter.apply(event))
             return;
         
-        List<Object> tags = MutableList.builder()
-            .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
-            .add(s.subscriberExecutionManagerTag)
-            .add(BrooklynTaskTags.SENSOR_TAG)
-            // associate the publish event with the publisher (though on init it might be
triggered by subscriber)
-            .addIfNotNull(event.getSource()!=null ? BrooklynTaskTags.tagForTargetEntity(event.getSource())
: null)
-            .build()
-            .asUnmodifiable();
+        List<Object> tags = getPublishTags(s, event.getSource()).asUnmodifiable();
         
         StringBuilder name = new StringBuilder("sensor ");
         StringBuilder description = new StringBuilder("Sensor ");
@@ -271,12 +300,12 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
             "displayName", name.toString(),
             "description", description.toString());
         
-        boolean isEntityStarting = s.subscriber instanceof Entity && isInitial;
+        boolean isEntityStarting = s.subscriber instanceof Entity && isInitialPublicationOfOldValueInCorrectScheduledThread;
         // will have entity (and adjunct) execution context from tags, so can skip getting
exec context
-        em.submit(execFlags, new Runnable() {
+        Runnable deliverer = new Runnable() {
             @Override
             public String toString() {
-                if (isInitial) {
+                if (isInitialPublicationOfOldValueInCorrectScheduledThread) {
                     return "LSM.publishInitial("+event+")";
                 } else {
                     return "LSM.publish("+event+")";
@@ -312,7 +341,26 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
                         LOG.warn("Error processing subscriptions to "+this+": "+t, t);
                     }
                 }
-            }});
+            }};
+        if (!isInitialPublicationOfOldValueInCorrectScheduledThread) {
+            em.submit(execFlags, deliverer);
+        } else {
+            // for initial, caller guarantees he is running in the right thread/context
+            // where the above submission would take place, typically the
+            // subscriber single threaded executor with the entity context;
+            // this allows caller to do extra assertions and bailout steps at the right time
+            deliverer.run();
+        }
+    }
+
+    private MutableList<Object> getPublishTags(final Subscription<?> s, final
Entity source) {
+        return MutableList.builder()
+            .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+            .add(s.subscriberExecutionManagerTag)
+            .add(BrooklynTaskTags.SENSOR_TAG)
+            // associate the publish event with the publisher (though on init it might be
triggered by subscriber)
+            .addIfNotNull(source!=null ? BrooklynTaskTags.tagForTargetEntity(source) : null)
+            .build();
     }
     
     protected boolean includeDescriptionForSensorTask(SensorEvent<?> event) {

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
index dee0700..d9da6ae 100644
--- a/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
+++ b/core/src/main/java/org/apache/brooklyn/core/sensor/AttributeMap.java
@@ -139,9 +139,18 @@ public final class AttributeMap {
     }
 
     public <T> T update(AttributeSensor<T> attribute, T newValue) {
-        T oldValue = updateWithoutPublishing(attribute, newValue);
-        entity.emitInternal(attribute, newValue);
-        return oldValue;
+        // 2017-10 this was unsynched which meant if two threads updated
+        // the last publication would not correspond to the last value.
+        // could introduce deadlock but emit internal and publish should
+        // not seek any locks. _subscribe_ and _delivery_ might, but they
+        // won't be in this block. an issue with _subscribe-and-get-initial_
+        // should be resolved by initial subscription queueing the publication
+        // to a context where locks are not held.
+        synchronized (values) {
+            T oldValue = updateWithoutPublishing(attribute, newValue);
+            entity.emitInternal(attribute, newValue);
+            return oldValue;
+        }
     }
     
     public <T> T updateWithoutPublishing(AttributeSensor<T> attribute, T newValue)
{

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
index e7dc626..4b991ad 100644
--- a/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/effector/EffectorSayHiTest.java
@@ -110,7 +110,7 @@ public class EffectorSayHiTest extends BrooklynAppUnitTestSupport {
             .get( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name", "Bob",
"greeting", "hi")) ), "hi Bob");
     }
     
-    @Test(invocationCount=100)
+    @Test
     public void testInvocationGetImmediately() throws Exception {
         assertEquals(((EntityInternal)e).getExecutionContext()
             .getImmediately( Effectors.invocation(e, MyEntity.SAY_HI_1, ImmutableMap.of("name",
"Bob", "greeting", "hi")) ).get(), "hi Bob");

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
index ea91f36..8ec2794 100644
--- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java
@@ -34,6 +34,8 @@ import org.apache.brooklyn.core.test.policy.TestEnricher;
 import org.apache.brooklyn.core.test.policy.TestPolicy;
 import org.apache.brooklyn.entity.group.BasicGroup;
 import org.apache.brooklyn.test.Asserts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -44,7 +46,9 @@ import com.google.common.collect.Iterables;
 
 public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport {
 
-    // TODO Duplication between this and PolicySubscriptionTest
+    // TODO Duplication between this and PolicySubscriptionTest and LocalSubscriptionManagerTest
+
+    private static final Logger log = LoggerFactory.getLogger(EntitySubscriptionTest.class);
     
     private static final long SHORT_WAIT_MS = 100;
 
@@ -221,6 +225,7 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport
{
     }
     
     @Test
+    @SuppressWarnings("unused")
     public void testUnsubscribeUsingHandleStopsEvents() {
         SubscriptionHandle handle1 = entity.subscriptions().subscribe(observedEntity, TestEntity.SEQUENCE,
listener);
         SubscriptionHandle handle2 = entity.subscriptions().subscribe(observedEntity, TestEntity.NAME,
listener);
@@ -300,6 +305,8 @@ public class EntitySubscriptionTest extends BrooklynAppUnitTestSupport
{
     
     @Test
     public void testContextEntityOnSubscriptionCallbackTask() {
+        log.info("Observing "+observedEntity+" from "+entity);
+        
         observedEntity.sensors().set(TestEntity.NAME, "myval");
         entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity,
TestEntity.NAME, listener);
         

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
index 4e47090..1373545 100644
--- a/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java
@@ -25,6 +25,7 @@ import java.util.List;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.brooklyn.api.entity.EntitySpec;
@@ -32,12 +33,22 @@ import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.mgmt.SubscriptionManager;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
+import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.entity.group.BasicGroup;
+import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.testng.Assert;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
 /**
  * testing the {@link SubscriptionManager} and associated classes.
  */
@@ -170,4 +181,84 @@ public class LocalSubscriptionManagerTest extends BrooklynAppUnitTestSupport
{
         if (threadException.get() != null) throw threadException.get();
     }
 
+    @Test
+    // same test as in PolicySubscriptionTest, but for entities / simpler
+    public void testSubscriptionReceivesInitialValueEventsInOrder() {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        
+        entity.sensors().set(TestEntity.NAME, "myname");
+        entity.sensors().set(TestEntity.SEQUENCE, 123);
+        entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+        // delivery should be in subscription order, so 123 then 456
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.SEQUENCE, listener);
+        // wait for the above delivery - otherwise it might get dropped
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () ->
{ 
+            Asserts.assertSize(listener.getEvents(), 1); });
+        entity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        // notifications don't have "initial value" so don't get -1
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.MY_NOTIF, listener);
+        // but do get 1, after 456
+        entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+        
+        // STOPPING and myname received, in subscription order, after everything else
+        entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.SERVICE_STATE_ACTUAL, listener);
+        entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.NAME, listener);
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable()
{
+            @Override public void run() {
+                Asserts.assertEquals(listener.getEvents(), ImmutableList.of(
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity,
123),
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity,
456),
+                        new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity,
1),
+                        new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL,
entity, Lifecycle.STOPPING),
+                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+                    "actually got: "+listener.getEvents());
+            }});
+    }
+    
+    @Test
+    public void testNotificationOrderMatchesSetValueOrderWhenSynched() {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        
+        AtomicInteger count = new AtomicInteger();
+        Runnable set = () -> { 
+            synchronized (count) { 
+                entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); 
+            } 
+        };
+        entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE,
listener);
+        for (int i=0; i<10; i++) {
+            new Thread(set).start();
+        }
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () ->
{ 
+            Asserts.assertSize(listener.getEvents(), 10); });
+        for (int i=0; i<10; i++) {
+            Assert.assertEquals(listener.getEvents().get(i).getValue(), i+1);
+        }
+    }
+
+    @Test
+    public void testNotificationOrderMatchesSetValueOrderWhenNotSynched() {
+        RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>();
+        
+        AtomicInteger count = new AtomicInteger();
+        Runnable set = () -> { 
+            // as this is not synched, the sets may interleave
+            entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); 
+        };
+        entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE,
listener);
+        for (int i=0; i<10; i++) {
+            new Thread(set).start();
+        }
+        
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () ->
{ 
+            Asserts.assertSize(listener.getEvents(), 10); });
+        // all we expect for sure is that the last value is whatever the sensor is at the
end - internal update and publish is mutexed
+        Assert.assertEquals(listener.getEvents().get(9).getValue(), entity.sensors().get(TestEntity.SEQUENCE));
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/9213f0e2/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
index 0f3310e..4d733e6 100644
--- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
+++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java
@@ -24,12 +24,15 @@ import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
 import org.apache.brooklyn.api.policy.PolicySpec;
 import org.apache.brooklyn.core.entity.RecordingSensorEventListener;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
 import org.apache.brooklyn.core.location.SimulatedLocation;
 import org.apache.brooklyn.core.policy.AbstractPolicy;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -102,6 +105,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport
{
     }
     
     @Test
+    @SuppressWarnings("unused")
     public void testUnsubscribeUsingHandleStopsEvents() throws Exception {
         SubscriptionHandle handle1 = policy.subscriptions().subscribe(entity, TestEntity.SEQUENCE,
listener);
         SubscriptionHandle handle2 = policy.subscriptions().subscribe(entity, TestEntity.NAME,
listener);
@@ -122,18 +126,37 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport
{
     }
 
     @Test
-    public void testSubscriptionReceivesInitialValueEvents() {
-        entity.sensors().set(TestEntity.SEQUENCE, 123);
+    public void testSubscriptionReceivesInitialValueEventsInOrder() {
         entity.sensors().set(TestEntity.NAME, "myname");
-        
+        entity.sensors().set(TestEntity.SEQUENCE, 123);
+        entity.sensors().emit(TestEntity.MY_NOTIF, -1);
+
+        // delivery should be in subscription order, so 123 then 456
         policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.SEQUENCE, listener);
+        // wait for the above delivery - otherwise it might get dropped
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () ->
{ 
+            Asserts.assertSize(listener.getEvents(), 1); });
+        entity.sensors().set(TestEntity.SEQUENCE, 456);
+        
+        // notifications don't have "initial value" so don't get -1
+        policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.MY_NOTIF, listener);
+        // but do get 1, after 456
+        entity.sensors().emit(TestEntity.MY_NOTIF, 1);
+        
+        // STOPPING and myname received, in subscription order, after everything else
+        entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING);
+        policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.SERVICE_STATE_ACTUAL, listener);
         policy.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity,
TestEntity.NAME, listener);
         
-        Asserts.succeedsEventually(new Runnable() {
+        Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable()
{
             @Override public void run() {
                 assertEquals(listener.getEvents(), ImmutableList.of(
                         new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity,
123),
-                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")));
+                        new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity,
456),
+                        new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity,
1),
+                        new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL,
entity, Lifecycle.STOPPING),
+                        new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")),
+                    "actually got: "+listener.getEvents());
             }});
     }
     
@@ -147,7 +170,7 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport
{
         
         Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable()
{
             @Override public void run() {
-                assertEquals(listener.getEvents(), ImmutableList.of());
+                Asserts.assertSize(listener.getEvents(), 0);
             }});
     }
     


Mime
View raw message