brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From aleds...@apache.org
Subject [2/3] git commit: Add EntityLocal.modifyAttribute(AttributeSensor, Function)
Date Mon, 03 Nov 2014 12:27:04 GMT
Add EntityLocal.modifyAttribute(AttributeSensor, Function)

- For atomic (sequential) updates to an attribute, where the new
  value is computed from the old value.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/2b4e8323
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/2b4e8323
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/2b4e8323

Branch: refs/heads/master
Commit: 2b4e832357de3341f60a4eaaa4d05c7c97db4271
Parents: e830251
Author: Aled Sage <aled.sage@gmail.com>
Authored: Fri Oct 31 22:57:14 2014 +0000
Committer: Aled Sage <aled.sage@gmail.com>
Committed: Mon Nov 3 10:05:23 2014 +0000

----------------------------------------------------------------------
 .../java/brooklyn/entity/basic/EntityLocal.java |  22 +++-
 .../brooklyn/entity/basic/AbstractEntity.java   |  25 ++++
 .../java/brooklyn/event/basic/AttributeMap.java |  22 +++-
 .../brooklyn/entity/basic/AttributeMapTest.java | 127 ++++++++++++++-----
 4 files changed, 159 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/basic/EntityLocal.java b/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
index 3405443..772161c 100644
--- a/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
+++ b/api/src/main/java/brooklyn/entity/basic/EntityLocal.java
@@ -33,8 +33,10 @@ import brooklyn.management.SubscriptionContext;
 import brooklyn.management.SubscriptionHandle;
 import brooklyn.management.SubscriptionManager;
 import brooklyn.management.Task;
+import brooklyn.util.guava.Maybe;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
 
 /** 
  * Extended Entity interface for use in places where the caller should have certain privileges,
@@ -66,14 +68,30 @@ public interface EntityLocal extends Entity, Configurable {
     <T> T setConfig(HasConfigKey<T> key, Task<T> val);
 
     /**
-     * Sets the {@link Sensor} data for the given attribute to the specified value.
+     * Sets the {@link AttributeSensor} data for the given attribute to the specified value.
      * 
      * This can be used to "enrich" the entity, such as adding aggregated information, 
      * rolling averages, etc.
      * 
      * @return the old value for the attribute (possibly {@code null})
      */
-    <T> T setAttribute(AttributeSensor<T> sensor, T val);
+    <T> T setAttribute(AttributeSensor<T> attribute, T val);
+
+    /**
+     * Atomically modifies the {@link AttributeSensor}, ensuring that only one modification is done
+     * at a time.
+     * 
+     * If the modifier returns {@link Maybe#absent()} then the attribute will be
+     * left unmodified, and the existing value will be returned.
+     * 
+     * For details of the synchronization model used to achieve this, refer to the underlying
+     * attribute store (e.g. AttributeMap).
+     * 
+     * @return the old value for the attribute (possibly {@code null})
+     * @since 0.7.0-M2
+     */
+    @Beta
+    <T> T modifyAttribute(AttributeSensor<T> attribute, Function<? super T,
Maybe<T>> modifier);
 
     /**
      * @deprecated in 0.5; use {@link #getConfig(ConfigKey)}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
index 00b58b7..e66bdef 100644
--- a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
+++ b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
@@ -92,6 +92,7 @@ import brooklyn.util.task.DeferredSupplier;
 import brooklyn.util.text.Strings;
 
 import com.google.common.annotations.Beta;
+import com.google.common.base.Function;
 import com.google.common.base.Objects;
 import com.google.common.base.Objects.ToStringHelper;
 import com.google.common.collect.ImmutableList;
@@ -829,6 +830,30 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements
E
         return result;
     }
 
+    @Beta
+    @Override
+    public <T> T modifyAttribute(AttributeSensor<T> attribute, Function<?
super T, Maybe<T>> modifier) {
+        if (LOG.isTraceEnabled())
+            LOG.trace(""+this+" modifyAttribute "+attribute+" "+modifier);
+        
+        if (Boolean.TRUE.equals(getManagementSupport().isReadOnlyRaw())) {
+            if (WARNED_READ_ONLY_ATTRIBUTES.add(attribute.getName())) {
+                LOG.warn(""+this+" modifying "+attribute+" = "+modifier+" in read only mode;
will have no effect (future messages for this sensor logged at trace)");
+            } else if (LOG.isTraceEnabled()) {
+                LOG.trace(""+this+" setting "+attribute+" = "+modifier+" in read only mode;
will have no effect");
+            }
+        }
+        T result = attributesInternal.modify(attribute, modifier);
+        if (result == null) {
+            // could be this is a new sensor
+            entityType.addSensorIfAbsent(attribute);
+        }
+        
+        // TODO Conditionally set onAttributeChanged, only if was modified
+        getManagementSupport().getEntityChangeListener().onAttributeChanged(attribute);
+        return result;
+    }
+
     @Override
     public void removeAttribute(AttributeSensor<?> attribute) {
         if (LOG.isTraceEnabled())

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/main/java/brooklyn/event/basic/AttributeMap.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/basic/AttributeMap.java b/core/src/main/java/brooklyn/event/basic/AttributeMap.java
index ca7322d..08f1f91 100644
--- a/core/src/main/java/brooklyn/event/basic/AttributeMap.java
+++ b/core/src/main/java/brooklyn/event/basic/AttributeMap.java
@@ -31,7 +31,9 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.event.AttributeSensor;
 import brooklyn.util.flags.TypeCoercions;
+import brooklyn.util.guava.Maybe;
 
+import com.google.common.base.Function;
 import com.google.common.base.Joiner;
 import com.google.common.base.Objects;
 import com.google.common.base.Preconditions;
@@ -53,7 +55,7 @@ public final class AttributeMap implements Serializable {
     
     private final AbstractEntity entity;
 
-    // Note that we synchronize on the top-level map, to handle concurrent updates and and gets (ENGR-2111)
+    // Assumed to be something like a ConcurrentMap passed in.
     private final Map<Collection<String>, Object> values;
 
     /**
@@ -132,6 +134,24 @@ public final class AttributeMap implements Serializable {
         return (isNull(oldValue)) ? null : oldValue;
     }
 
+    /**
+     * Where atomicity is desired, the methods in this class synchronize on the {@link #values} map.
+     */
+    public <T> T modify(AttributeSensor<T> attribute, Function<? super T,
Maybe<T>> modifier) {
+        synchronized (values) {
+            T oldValue = getValue(attribute);
+            Maybe<? extends T> newValue = modifier.apply(oldValue);
+
+            if (newValue.isPresent()) {
+                if (log.isTraceEnabled()) log.trace("modified attribute {} to {} (was {})
on {}", new Object[] {attribute.getName(), newValue, oldValue, entity});
+                return update(attribute, newValue.get());
+            } else {
+                if (log.isTraceEnabled()) log.trace("modified attribute {} unchanged; not
emitting on {}", new Object[] {attribute.getName(), newValue, this});
+                return oldValue;
+            }
+        }
+    }
+
     public void remove(AttributeSensor<?> attribute) {
         if (log.isDebugEnabled()) {
             log.debug("removing attribute {} on {}", attribute.getName(), entity);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/2b4e8323/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java b/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
index e3d992d..3224041 100644
--- a/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
+++ b/core/src/test/java/brooklyn/entity/basic/AttributeMapTest.java
@@ -19,6 +19,7 @@
 package brooklyn.entity.basic;
 
 import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertTrue;
 
 import java.util.Collection;
 import java.util.Collections;
@@ -34,75 +35,73 @@ import org.testng.annotations.Test;
 
 import brooklyn.entity.Application;
 import brooklyn.event.AttributeSensor;
+import brooklyn.event.SensorEvent;
+import brooklyn.event.SensorEventListener;
 import brooklyn.event.basic.AttributeMap;
 import brooklyn.event.basic.Sensors;
+import brooklyn.test.Asserts;
 import brooklyn.test.entity.TestApplication;
 import brooklyn.test.entity.TestEntityImpl;
 import brooklyn.util.collections.MutableMap;
+import brooklyn.util.guava.Maybe;
 
+import com.google.common.base.Function;
+import com.google.common.base.Functions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Lists;
 
 public class AttributeMapTest {
 
     Application app;
+    TestEntityImpl entity;
     AttributeMap map;
-
+    ExecutorService executor;
+    
     @BeforeMethod(alwaysRun=true)
     public void setUp() {
         app = TestApplication.Factory.newManagedInstanceForTests();
-        TestEntityImpl e = new TestEntityImpl(app);
-        map = new AttributeMap(e, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
-        Entities.startManagement(app);
+        entity = new TestEntityImpl(app);
+        map = new AttributeMap(entity, Collections.synchronizedMap(MutableMap.<Collection<String>,Object>of()));
+        Entities.manage(entity);
+        executor = Executors.newCachedThreadPool();
     }
     
     @AfterMethod(alwaysRun=true)
     public void tearDown() {
+        if (executor != null) executor.shutdownNow();
         if (app != null) Entities.destroyAll(app.getManagementContext());
     }
     
     // See ENGR-2111
     @Test
     public void testConcurrentUpdatesDoNotCauseConcurrentModificationException() throws Exception
{
-        ExecutorService executor = Executors.newCachedThreadPool();
         List<Future<?>> futures = Lists.newArrayList();
         
-        try {
-            for (int i = 0; i < 1000; i++) {
-                final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i,
"");
-                Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor,
"a"));
-                futures.add(future);
-            }
-            
-            for (Future<?> future : futures) {
-                future.get();
-            }
-            
-        } finally {
-            executor.shutdownNow();
+        for (int i = 0; i < 1000; i++) {
+            final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i,
"");
+            Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor,
"a"));
+            futures.add(future);
+        }
+        
+        for (Future<?> future : futures) {
+            future.get();
         }
     }
     
     @Test
     public void testConcurrentUpdatesAndGetsDoNotCauseConcurrentModificationException() throws
Exception {
-        ExecutorService executor = Executors.newCachedThreadPool();
         List<Future<?>> futures = Lists.newArrayList();
         
-        try {
-            for (int i = 0; i < 1000; i++) {
-                final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i,
"");
-                Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor,
"a"));
-                Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
-                futures.add(future);
-                futures.add(future2);
-            }
+        for (int i = 0; i < 1000; i++) {
+            final AttributeSensor<String> nextSensor = Sensors.newStringSensor("attributeMapTest.exampleSensor"+i,
"");
+            Future<?> future = executor.submit(newUpdateMapRunnable(map, nextSensor,
"a"));
+            Future<?> future2 = executor.submit(newGetAttributeCallable(map, nextSensor));
+            futures.add(future);
+            futures.add(future2);
+        }
 
-            for (Future<?> future : futures) {
-                future.get();
-            }
-            
-        } finally {
-            executor.shutdownNow();
+        for (Future<?> future : futures) {
+            future.get();
         }
     }
     
@@ -147,7 +146,7 @@ public class AttributeMapTest {
         assertEquals(map.getValue(childSensor), "childValue");
         assertEquals(map.getValue(sensor), "parentValue");
     }
-        
+    
     @Test
     public void testCanStoreChildThenParentSensor() throws Exception {
         AttributeSensor<String> sensor = Sensors.newStringSensor("a", "");
@@ -160,6 +159,46 @@ public class AttributeMapTest {
         assertEquals(map.getValue(sensor), "parentValue");
     }
     
+    @Test
+    public void testConcurrentModifyAttributeCalls() throws Exception {
+        AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+        
+        Function<Integer, Maybe<Integer>> modifier = new Function<Integer,
Maybe<Integer>>() {
+            @Override public Maybe<Integer> apply(Integer input) {
+                return Maybe.of((input == null) ? 1 : input + 1);
+            }
+        };
+        
+        List<Future<?>> futures = Lists.newArrayList();
+        
+        for (int i = 0; i < 1000; i++) {
+            Future<?> future = executor.submit(newModifyAttributeCallable(map, sensor,
modifier));
+            futures.add(future);
+        }
+
+        for (Future<?> future : futures) {
+            future.get();
+        }
+
+        assertEquals(map.getValue(sensor), Integer.valueOf(1000));
+    }
+    
+    @Test
+    public void testModifyAttributeReturningAbsentDoesNotEmit() throws Exception {
+        AttributeSensor<Integer> sensor = Sensors.newIntegerSensor("a", "");
+        AttributeSensor<Integer> childSensor = Sensors.newIntegerSensor("a.b", "");
+        
+        final RecordingSensorEventListener listener = new RecordingSensorEventListener();
+        entity.subscribe(entity, sensor, listener);
+        
+        map.modify(childSensor, Functions.constant(Maybe.<Integer>absent()));
+        
+        Asserts.succeedsContinually(new Runnable() {
+            @Override public void run() {
+                assertTrue(listener.getEvents().isEmpty(), "events="+listener.getEvents());
+            }});
+    }
+    
     protected <T> Runnable newUpdateMapRunnable(final AttributeMap map, final AttributeSensor<T>
attribute, final T val) {
         return new Runnable() {
             @Override public void run() {
@@ -175,4 +214,24 @@ public class AttributeMapTest {
             }
         };
     }
+    
+    protected <T> Callable<T> newModifyAttributeCallable(final AttributeMap map,
final AttributeSensor<T> attribute, final Function<? super T, Maybe<T>>
modifier) {
+        return new Callable<T>() {
+            @Override public T call() {
+                return map.modify(attribute, modifier);
+            }
+        };
+    }
+    
+    public static class RecordingSensorEventListener implements SensorEventListener<Object>
{
+        private List<SensorEvent<Object>> events = Collections.synchronizedList(Lists.<SensorEvent<Object>>newArrayList());
+
+        @Override public void onEvent(SensorEvent<Object> event) {
+            events.add(event);
+        }
+        
+        public List<SensorEvent<Object>> getEvents() {
+            return ImmutableList.copyOf(events);
+        }
+    }
 }


Mime
View raw message