brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [2/4] git commit: Use thread pool for Object Store persistence
Date Fri, 04 Jul 2014 14:02:40 GMT
Use thread pool for Object Store persistence

- passes BrooklynProperties to BrooklynMementoPersisterToObjectStore
  so that it can read config such as the max thread pool size.


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d04761e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d04761e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d04761e3

Branch: refs/heads/master
Commit: d04761e350e86f828200d82cbe4901e4a1db9e9c
Parents: 9350857
Author: Aled Sage <aled.sage@gmail.com>
Authored: Wed Jul 2 15:16:29 2014 +0100
Committer: Aled Sage <aled.sage@gmail.com>
Committed: Fri Jul 4 14:37:09 2014 +0100

----------------------------------------------------------------------
 .../rebind/PeriodicDeltaChangeListener.java     |   5 +
 .../entity/rebind/PersisterDeltaImpl.java       |  47 +-
 .../rebind/RebindExceptionHandlerImpl.java      |  23 +-
 .../entity/rebind/RebindManagerImpl.java        |   2 +-
 .../entity/rebind/dto/BrooklynMementoImpl.java  |  12 +-
 .../rebind/dto/BrooklynMementoManifestImpl.java |  16 +-
 .../BrooklynMementoPersisterToObjectStore.java  | 502 +++++++++++++------
 .../entity/rebind/RebindEntityTest.java         |   7 +
 .../entity/rebind/RebindTestFixture.java        |   6 +-
 .../brooklyn/entity/rebind/RebindTestUtils.java |  11 +-
 .../HighAvailabilityManagerSplitBrainTest.java  |   2 +-
 .../ha/HighAvailabilityManagerTestFixture.java  |   5 +-
 .../brooklyn/launcher/BrooklynLauncher.java     |   4 +-
 .../brooklyn/util/exceptions/Exceptions.java    |   2 +
 14 files changed, 458 insertions(+), 186 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
index 9230d45..91ac9db 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PeriodicDeltaChangeListener.java
@@ -21,6 +21,7 @@ import brooklyn.mementos.BrooklynMementoPersister;
 import brooklyn.policy.Enricher;
 import brooklyn.policy.Policy;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeInterruptedException;
 import brooklyn.util.task.BasicTask;
 import brooklyn.util.task.ScheduledTask;
 import brooklyn.util.time.Duration;
@@ -106,6 +107,10 @@ public class PeriodicDeltaChangeListener implements ChangeListener {
                         try {
                             persistNow();
                             return null;
+                        } catch (RuntimeInterruptedException e) {
+                            LOG.debug("Interrupted persisting change-delta (rethrowing)", e);
+                            Thread.currentThread().interrupt();
+                            return null;
                         } catch (Exception e) {
                             // Don't rethrow: the behaviour of executionManager is different from a scheduledExecutorService,
                             // if we throw an exception, then our task will never get executed again

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java b/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
index 0950f81..1fdafec 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PersisterDeltaImpl.java
@@ -10,7 +10,52 @@ import brooklyn.mementos.PolicyMemento;
 
 import com.google.common.collect.Sets;
 
-class PersisterDeltaImpl implements Delta {
+public class PersisterDeltaImpl implements Delta {
+    
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    public static class Builder {
+        private final PersisterDeltaImpl delta = new PersisterDeltaImpl();
+
+        public Builder locations(Collection<? extends LocationMemento> vals) {
+            delta.locations.addAll(vals);
+            return this;
+        }
+        public Builder entities(Collection<? extends EntityMemento> vals) {
+            delta.entities.addAll(vals);
+            return this;
+        }
+        public Builder policies(Collection<? extends PolicyMemento> vals) {
+            delta.policies.addAll(vals);
+            return this;
+        }
+        public Builder enrichers(Collection<? extends EnricherMemento> vals) {
+            delta.enrichers.addAll(vals);
+            return this;
+        }
+        public Builder removedLocationIds(Collection<String> vals) {
+            delta.removedLocationIds.addAll(vals);
+            return this;
+        }
+        public Builder removedEntityIds(Collection<String> vals) {
+            delta.removedEntityIds.addAll(vals);
+            return this;
+        }
+        public Builder removedPolicyIds(Collection<String> vals) {
+            delta.removedPolicyIds.addAll(vals);
+            return this;
+        }
+        public Builder removedEnricherIds(Collection<String> vals) {
+            delta.removedEnricherIds.addAll(vals);
+            return this;
+        }
+        public Delta build() {
+            return delta;
+        }
+    }
+    
     Collection<LocationMemento> locations = Sets.newLinkedHashSet();
     Collection<EntityMemento> entities = Sets.newLinkedHashSet();
     Collection<PolicyMemento> policies = Sets.newLinkedHashSet();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
index 937d58d..eb4f8a3 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindExceptionHandlerImpl.java
@@ -2,6 +2,7 @@ package brooklyn.entity.rebind;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
@@ -29,17 +30,17 @@ public class RebindExceptionHandlerImpl implements RebindExceptionHandler {
     protected final RebindFailureMode addPolicyFailureMode;
     protected final RebindFailureMode loadPolicyFailureMode;
 
-    protected final Set<String> missingEntities = Sets.newLinkedHashSet();
-    protected final Set<String> missingLocations = Sets.newLinkedHashSet();
-    protected final Set<String> missingPolicies = Sets.newLinkedHashSet();
-    protected final Set<String> missingEnrichers = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedEntities = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedLocations = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedPolicies = Sets.newLinkedHashSet();
-    protected final Set<String> creationFailedEnrichers = Sets.newLinkedHashSet();
-    protected final Set<Exception> addPolicyFailures = Sets.newLinkedHashSet();
-    protected final Set<Exception> loadPolicyFailures = Sets.newLinkedHashSet();
-    protected final List<Exception> exceptions = Lists.newArrayList();
+    protected final Set<String> missingEntities = Sets.newConcurrentHashSet();
+    protected final Set<String> missingLocations = Sets.newConcurrentHashSet();
+    protected final Set<String> missingPolicies = Sets.newConcurrentHashSet();
+    protected final Set<String> missingEnrichers = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedEntities = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedLocations = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedPolicies = Sets.newConcurrentHashSet();
+    protected final Set<String> creationFailedEnrichers = Sets.newConcurrentHashSet();
+    protected final Set<Exception> addPolicyFailures = Sets.newConcurrentHashSet();
+    protected final Set<Exception> loadPolicyFailures = Sets.newConcurrentHashSet();
+    protected final List<Exception> exceptions = Collections.synchronizedList(Lists.<Exception>newArrayList());
     
     public static Builder builder() {
         return new Builder();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 5775dc4..1f2ae81 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -536,7 +536,7 @@ public class RebindManagerImpl implements RebindManager {
             T nodeinchain = node;
             while (nodeinchain != null) {
                 tempchain.add(0, nodeinchain);
-                nodeinchain = nodes.get(nodeinchain.getParent());
+                nodeinchain = (nodeinchain.getParent() == null) ? null : nodes.get(nodeinchain.getParent());
             }
             for (T n : tempchain) {
                 result.put(n.getId(), n);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
index b75a112..a04a75e 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoImpl.java
@@ -27,12 +27,12 @@ public class BrooklynMementoImpl implements BrooklynMemento, Serializable {
     
     public static class Builder {
         protected String brooklynVersion = BrooklynVersion.get();
-        protected final List<String> applicationIds = Lists.newArrayList();
-        protected final List<String> topLevelLocationIds = Lists.newArrayList();
-        protected final Map<String, EntityMemento> entities = Maps.newLinkedHashMap();
-        protected final Map<String, LocationMemento> locations = Maps.newLinkedHashMap();
-        protected final Map<String, PolicyMemento> policies = Maps.newLinkedHashMap();
-        protected final Map<String, EnricherMemento> enrichers = Maps.newLinkedHashMap();
+        protected final List<String> applicationIds = Collections.synchronizedList(Lists.<String>newArrayList());
+        protected final List<String> topLevelLocationIds = Collections.synchronizedList(Lists.<String>newArrayList());
+        protected final Map<String, EntityMemento> entities = Maps.newConcurrentMap();
+        protected final Map<String, LocationMemento> locations = Maps.newConcurrentMap();
+        protected final Map<String, PolicyMemento> policies = Maps.newConcurrentMap();
+        protected final Map<String, EnricherMemento> enrichers = Maps.newConcurrentMap();
         
         public Builder brooklynVersion(String val) {
             brooklynVersion = val; return this;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
index e517f15..1ec617e 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BrooklynMementoManifestImpl.java
@@ -18,10 +18,10 @@ public class BrooklynMementoManifestImpl implements BrooklynMementoManifest, Ser
     
     public static class Builder {
         protected String brooklynVersion;
-        protected final Map<String, String> entityIdToType = Maps.newLinkedHashMap();
-        protected final Map<String, String> locationIdToType = Maps.newLinkedHashMap();
-        protected final Map<String, String> policyIdToType = Maps.newLinkedHashMap();
-        protected final Map<String, String> enricherIdToType = Maps.newLinkedHashMap();
+        protected final Map<String, String> entityIdToType = Maps.newConcurrentMap();
+        protected final Map<String, String> locationIdToType = Maps.newConcurrentMap();
+        protected final Map<String, String> policyIdToType = Maps.newConcurrentMap();
+        protected final Map<String, String> enricherIdToType = Maps.newConcurrentMap();
         
         public Builder brooklynVersion(String val) {
             brooklynVersion = val; return this;
@@ -55,10 +55,10 @@ public class BrooklynMementoManifestImpl implements BrooklynMementoManifest, Ser
         }
     }
 
-    private Map<String, String> entityIdToType;
-    private Map<String, String> locationIdToType;
-    private Map<String, String> policyIdToType;
-    private Map<String, String> enricherIdToType;
+    private final Map<String, String> entityIdToType;
+    private final Map<String, String> locationIdToType;
+    private final Map<String, String> policyIdToType;
+    private final Map<String, String> enricherIdToType;
     
     private BrooklynMementoManifestImpl(Builder builder) {
         entityIdToType = builder.entityIdToType;

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 8746cc0..8fcb3b3 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -6,13 +6,23 @@ import java.io.IOException;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.config.BrooklynProperties;
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.entity.rebind.PersistenceExceptionHandler;
+import brooklyn.entity.rebind.PersisterDeltaImpl;
 import brooklyn.entity.rebind.RebindExceptionHandler;
 import brooklyn.entity.rebind.dto.BrooklynMementoImpl;
 import brooklyn.entity.rebind.dto.BrooklynMementoManifestImpl;
@@ -26,6 +36,7 @@ import brooklyn.mementos.EntityMemento;
 import brooklyn.mementos.LocationMemento;
 import brooklyn.mementos.Memento;
 import brooklyn.mementos.PolicyMemento;
+import brooklyn.util.exceptions.CompoundRuntimeException;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
@@ -33,29 +44,57 @@ import brooklyn.util.xstream.XmlUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 /** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable
  * {@link PersistenceObjectStore} such as a file system or a jclouds object store */
 public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPersister {
 
+    // TODO Crazy amount of duplication between handling entity, location, policy + enricher;
+    // Need to remove that duplication.
+
+    // TODO Should stop() take a timeout, and shutdown the executor gracefully?
+    
     private static final Logger LOG = LoggerFactory.getLogger(BrooklynMementoPersisterToObjectStore.class);
 
-    private static final int MAX_SERIALIZATION_ATTEMPTS = 5;
+    public static final ConfigKey<Integer> PERSISTER_MAX_THREAD_POOL_SIZE = ConfigKeys.newIntegerConfigKey(
+            "persister.threadpool.maxSize",
+            "Maximum number of concurrent operations for persistence (reads/writes/deletes of *different* objects)", 
+            10);
+
+    public static final ConfigKey<Integer> PERSISTER_MAX_SERIALIZATION_ATTEMPTS = ConfigKeys.newIntegerConfigKey(
+            "persister.maxSerializationAttempts",
+            "Maximum number of attempts to serialize a memento (e.g. if first attempts fail because of concurrent modifications of an entity)", 
+            5);
 
     private final PersistenceObjectStore objectStore;
     private final MementoSerializer<Object> serializer;
 
     private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
 
+    private final ListeningExecutorService executor;
+
     private volatile boolean running = true;
 
+    /**
+     * Lock used on writes (checkpoint + delta) so that {@link #waitForWritesCompleted(Duration)} can block
+     * for any concurrent call to complete.
+     */
+    private final ReadWriteLock lock = new ReentrantReadWriteLock();
 
-    public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, ClassLoader classLoader) {
+    public BrooklynMementoPersisterToObjectStore(PersistenceObjectStore objectStore, BrooklynProperties brooklynProperties, ClassLoader classLoader) {
         this.objectStore = checkNotNull(objectStore, "objectStore");
-        MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
-        this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, MAX_SERIALIZATION_ATTEMPTS);
         
-        // TODO it's 95% the same code for each of these, throughout, so refactor to avoid repetition
+        int maxSerializationAttempts = brooklynProperties.getConfig(PERSISTER_MAX_SERIALIZATION_ATTEMPTS);
+        int maxThreadPoolSize = brooklynProperties.getConfig(PERSISTER_MAX_THREAD_POOL_SIZE);
+                
+        MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
+        this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, maxSerializationAttempts);
+
         objectStore.createSubPath("entities");
         objectStore.createSubPath("locations");
         objectStore.createSubPath("policies");
@@ -63,6 +102,11 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
 
         // FIXME does it belong here or to ManagementPlaneSyncRecordPersisterToObjectStore ?
         objectStore.createSubPath("plane");
+        
+        executor = MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(maxThreadPoolSize, new ThreadFactory() {
+            @Override public Thread newThread(Runnable r) {
+                return new Thread(r, "brooklyn-persister");
+            }}));
     }
 
     public PersistenceObjectStore getObjectStore() {
@@ -72,6 +116,9 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     @Override
     public void stop() {
         running = false;
+        if (executor != null) {
+            executor.shutdownNow();
+        }
     }
     
     protected StoreObjectAccessorWithLock getWriter(String path) {
@@ -87,7 +134,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     }
 
     @Override
-    public BrooklynMementoManifest loadMementoManifest(RebindExceptionHandler exceptionHandler) throws IOException {
+    public BrooklynMementoManifest loadMementoManifest(final RebindExceptionHandler exceptionHandler) throws IOException {
         if (!running) {
             throw new IllegalStateException("Persister not running; cannot load memento manifest from " + objectStore.getSummaryName());
         }
@@ -110,67 +157,119 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         Stopwatch stopwatch = Stopwatch.createStarted();
 
         LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
-            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+            entitySubPathList/*.size()*/, locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
-        BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
+        final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
 
-        for (String subPath : entitySubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/entity/id");
-                String type = (String) XmlUtil.xpath(contents, "/entity/type");
-                builder.entity(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
-            }
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        
+        for (final String subPath : entitySubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/entity/id");
+                        String type = (String) XmlUtil.xpath(contents, "/entity/type");
+                        builder.entity(id, type);
+                        LOG.debug("Loaded manifest for entity "+subPath+"; id "+id+"; type "+type); // FIXME
+                    } catch (Exception e) {
+                        LOG.debug("Problem loading manifest for entity "+subPath); // FIXME
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
         }
-        for (String subPath : locationSubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/location/id");
-                String type = (String) XmlUtil.xpath(contents, "/location/type");
-                builder.location(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
-            }
+        for (final String subPath : locationSubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/location/id");
+                        String type = (String) XmlUtil.xpath(contents, "/location/type");
+                        builder.location(id, type);
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
         }
-        for (String subPath : policySubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/policy/id");
-                String type = (String) XmlUtil.xpath(contents, "/policy/type");
-                builder.policy(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
-            }
+        for (final String subPath : policySubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/policy/id");
+                        String type = (String) XmlUtil.xpath(contents, "/policy/type");
+                        builder.policy(id, type);
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
         }
-        for (String subPath : enricherSubPathList) {
-            try {
-                StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.get();
-                String id = (String) XmlUtil.xpath(contents, "/enricher/id");
-                String type = (String) XmlUtil.xpath(contents, "/enricher/type");
-                builder.enricher(id, type);
-            } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+        for (final String subPath : enricherSubPathList) {
+            futures.add(executor.submit(new Runnable() {
+                public void run() {
+                    try {
+                        String contents = read(subPath);
+                        String id = (String) XmlUtil.xpath(contents, "/enricher/id");
+                        String type = (String) XmlUtil.xpath(contents, "/enricher/type");
+                        builder.enricher(id, type);
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+                    }
+                }}));
+        }
+
+        try {
+            // Wait for all, failing fast if any exceptions.
+            Futures.allAsList(futures).get();
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            
+            List<Exception> exceptions = Lists.newArrayList();
+            
+            for (ListenableFuture<?> future : futures) {
+                if (future.isDone()) {
+                    try {
+                        future.get();
+                    } catch (InterruptedException e2) {
+                        throw Exceptions.propagate(e2);
+                    } catch (ExecutionException e2) {
+                        LOG.warn("Problem loading memento manifest", e2);
+                        exceptions.add(e2);
+                    }
+                    future.cancel(true);
+                }
             }
+            if (exceptions.isEmpty()) {
+                throw Exceptions.propagate(e);
+            } else {
+                // Normally there should be at least one failure; otherwise all.get() would not have failed.
+                throw new CompoundRuntimeException("Problem loading mementos", exceptions);
+            }
+        }
+
+        BrooklynMementoManifest result = builder.build();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento manifest; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+                     Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIdToType().size(), 
+                     result.getLocationIdToType().size(), result.getPolicyIdToType().size(), result.getEnricherIdToType().size(),
+                     objectStore.getSummaryName() });
+        }
+
+        if (result.getEntityIdToType().size() != entitySubPathList.size()) {
+            LOG.error("Lost an entity?!");
         }
         
-        if (LOG.isDebugEnabled())
-            LOG.debug("Loaded memento manifest; took {}", Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
-        return builder.build();
+        return result;
     }
 
     @Override
-    public BrooklynMemento loadMemento(LookupContext lookupContext, RebindExceptionHandler exceptionHandler) throws IOException {
+    public BrooklynMemento loadMemento(LookupContext lookupContext, final RebindExceptionHandler exceptionHandler) throws IOException {
         if (!running) {
             throw new IllegalStateException("Persister not running; cannot load memento from " + objectStore.getSummaryName());
         }
@@ -195,71 +294,121 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
-        BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
+        final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
         serializer.setLookupContext(lookupContext);
+        
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        
         try {
-            for (String subPath : entitySubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    EntityMemento memento = (EntityMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.entity(memento);
-                        if (memento.isTopLevelApp()) {
-                            builder.applicationId(memento.getId());
+            for (final String subPath : entitySubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            EntityMemento memento = (EntityMemento) serializer.fromString(read(subPath));
+                            if (memento == null) {
+                                LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.entity(memento);
+                                if (memento.isTopLevelApp()) {
+                                    builder.applicationId(memento.getId());
+                                }
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
                         }
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadEntityMementoFailed("Memento "+subPath, e);
-                }
+                    }}));
             }
-            for (String subPath : locationSubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    LocationMemento memento = (LocationMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.location(memento);
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
-                }
+            for (final String subPath : locationSubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            LocationMemento memento = (LocationMemento) serializer.fromString(read(subPath));
+                            if (memento == null) {
+                                LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.location(memento);
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadLocationMementoFailed("Memento "+subPath, e);
+                        }
+                    }}));
             }
-            for (String subPath : policySubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.policy(memento);
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
-                }
+            for (final String subPath : policySubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+                            PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
+                            if (memento == null) {
+                                LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.policy(memento);
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadPolicyMementoFailed("Memento "+subPath, e);
+                        }
+                    }}));
             }
-            for (String subPath : enricherSubPathList) {
-                try {
-                    StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
-                    if (memento == null) {
-                        LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.enricher(memento);
+            for (final String subPath : enricherSubPathList) {
+                futures.add(executor.submit(new Runnable() {
+                    public void run() {
+                        try {
+                            StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+                            EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
+                            if (memento == null) {
+                                LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
+                            } else {
+                                builder.enricher(memento);
+                            }
+                        } catch (Exception e) {
+                            exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+                        }
+                    }}));
+            }
+            
+            try {
+                // Wait for all, failing fast if any exceptions.
+                Futures.allAsList(futures).get();
+            } catch (Exception e) {
+                Exceptions.propagateIfFatal(e);
+                
+                List<Exception> exceptions = Lists.newArrayList();
+                
+                for (ListenableFuture<?> future : futures) {
+                    if (future.isDone()) {
+                        try {
+                            future.get();
+                        } catch (InterruptedException e2) {
+                            throw Exceptions.propagate(e2);
+                        } catch (ExecutionException e2) {
+                            LOG.warn("Problem loading memento", e2);
+                            exceptions.add(e2);
+                        }
+                        future.cancel(true);
                     }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadEnricherMementoFailed("Memento "+subPath, e);
+                }
+                if (exceptions.isEmpty()) {
+                    throw Exceptions.propagate(e);
+                } else {
+                    // Normally there should be at least one failure; otherwise all.get() would not have failed.
+                    throw new CompoundRuntimeException("Problem loading mementos", exceptions);
                 }
             }
-            
+
         } finally {
             serializer.unsetLookupContext();
         }
 
-        if (LOG.isDebugEnabled()) LOG.debug("Loaded memento; took {}", Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
-        return builder.build();
+        BrooklynMemento result = builder.build();
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+                      Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntityIds().size(), 
+                      result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(),
+                      objectStore.getSummaryName() });
+        }
+        
+        return result;
     }
     
     @Override
@@ -268,67 +417,84 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointing entire memento, because not running");
             return;
         }
-        objectStore.prepareForMasterUse();
         
-        Stopwatch stopwatch = Stopwatch.createStarted();
-
-        for (EntityMemento entity : newMemento.getEntityMementos().values()) {
-            persist("entities", entity, exceptionHandler);
-        }
-        for (LocationMemento location : newMemento.getLocationMementos().values()) {
-            persist("locations", location, exceptionHandler);
-        }
-        for (PolicyMemento policy : newMemento.getPolicyMementos().values()) {
-            persist("policies", policy, exceptionHandler);
-        }
-        for (EnricherMemento enricher : newMemento.getEnricherMementos().values()) {
-            persist("enrichers", enricher, exceptionHandler);
-        }
+        Delta delta = PersisterDeltaImpl.builder()
+                .entities(newMemento.getEntityMementos().values())
+                .locations(newMemento.getLocationMementos().values())
+                .policies(newMemento.getPolicyMementos().values())
+                .enrichers(newMemento.getEnricherMementos().values())
+                .build();
+        Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
         
         if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch));
     }
-    
+
     @Override
     public void delta(Delta delta, PersistenceExceptionHandler exceptionHandler) {
         if (!running) {
             if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointed delta of memento, because not running");
             return;
         }
-        objectStore.prepareForMasterUse();
-        
-        Stopwatch stopwatch = Stopwatch.createStarted();
-        
-        for (EntityMemento entity : delta.entities()) {
-            persist("entities", entity, exceptionHandler);
-        }
-        for (LocationMemento location : delta.locations()) {
-            persist("locations", location, exceptionHandler);
-        }
-        for (PolicyMemento policy : delta.policies()) {
-            persist("policies", policy, exceptionHandler);
-        }
-        for (EnricherMemento enricher : delta.enrichers()) {
-            persist("enrichers", enricher, exceptionHandler);
-        }
-        
-        for (String id : delta.removedEntityIds()) {
-            delete("entities", id, exceptionHandler);
-        }
-        for (String id : delta.removedLocationIds()) {
-            delete("locations", id, exceptionHandler);
-        }
-        for (String id : delta.removedPolicyIds()) {
-            delete("policies", id, exceptionHandler);
-        }
-        for (String id : delta.removedEnricherIds()) {
-            delete("enrichers", id, exceptionHandler);
-        }
+        Stopwatch stopwatch = deltaImpl(delta, exceptionHandler);
         
         if (LOG.isDebugEnabled()) LOG.debug("Checkpointed delta of memento in {}; updated {} entities, {} locations and {} policies; " +
                 "removing {} entities, {} locations and {} policies", 
                 new Object[] {Time.makeTimeStringRounded(stopwatch), delta.entities(), delta.locations(), delta.policies(),
                 delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
     }
+    
+    private Stopwatch deltaImpl(Delta delta, PersistenceExceptionHandler exceptionHandler) {
+        try {
+            lock.writeLock().lockInterruptibly();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+        try {
+            objectStore.prepareForMasterUse();
+            
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            List<ListenableFuture<?>> futures = Lists.newArrayList();
+            
+            for (EntityMemento entity : delta.entities()) {
+                futures.add(asyncPersist("entities", entity, exceptionHandler));
+            }
+            for (LocationMemento location : delta.locations()) {
+                futures.add(asyncPersist("locations", location, exceptionHandler));
+            }
+            for (PolicyMemento policy : delta.policies()) {
+                futures.add(asyncPersist("policies", policy, exceptionHandler));
+            }
+            for (EnricherMemento enricher : delta.enrichers()) {
+                futures.add(asyncPersist("enrichers", enricher, exceptionHandler));
+            }
+            
+            for (String id : delta.removedEntityIds()) {
+                futures.add(asyncDelete("entities", id, exceptionHandler));
+            }
+            for (String id : delta.removedLocationIds()) {
+                futures.add(asyncDelete("locations", id, exceptionHandler));
+            }
+            for (String id : delta.removedPolicyIds()) {
+                futures.add(asyncDelete("policies", id, exceptionHandler));
+            }
+            for (String id : delta.removedEnricherIds()) {
+                futures.add(asyncDelete("enrichers", id, exceptionHandler));
+            }
+            
+            try {
+                // Wait for all the tasks to complete or fail, rather than aborting on the first failure.
+                // But then propagate failure if any fail. (hence the two calls).
+                Futures.successfulAsList(futures).get();
+                Futures.allAsList(futures).get();
+            } catch (Exception e) {
+                throw Exceptions.propagate(e);
+            }
+            
+            return stopwatch;
+        } finally {
+            lock.writeLock().unlock();
+        }
+    }
 
     @Override
     @VisibleForTesting
@@ -336,11 +502,20 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         waitForWritesCompleted(Duration.of(timeout, unit));
     }
     
+    @Override
     public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
+        lock.writeLock().lockInterruptibly();
+        lock.writeLock().unlock();
+        
         for (StoreObjectAccessorWithLock writer : writers.values())
             writer.waitForCurrentWrites(timeout);
     }
 
+    private String read(String subPath) {
+        StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
+        return objectAccessor.get();
+    }
+
     private void persist(String subPath, Memento memento, PersistenceExceptionHandler exceptionHandler) {
         try {
             getWriter(getPath(subPath, memento.getId())).put(serializer.toString(memento));
@@ -348,7 +523,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             exceptionHandler.onPersistMementoFailed(memento, e);
         }
     }
-
+    
     private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler) {
         try {
             StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id));
@@ -361,6 +536,27 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
 
+    private ListenableFuture<String> asyncRead(final String subPath) {
+        return executor.submit(new Callable<String>() {
+            public String call() {
+                return read(subPath);
+            }});
+    }
+
+    private ListenableFuture<?> asyncPersist(final String subPath, final Memento memento, final PersistenceExceptionHandler exceptionHandler) {
+        return executor.submit(new Runnable() {
+            public void run() {
+                persist(subPath, memento, exceptionHandler);
+            }});
+    }
+
+    private ListenableFuture<?> asyncDelete(final String subPath, final String id, final PersistenceExceptionHandler exceptionHandler) {
+        return executor.submit(new Runnable() {
+            public void run() {
+                delete(subPath, id, exceptionHandler);
+            }});
+    }
+    
     private String getPath(String subPath, String id) {
         return subPath+"/"+id;
     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
index 290cc3b..ea9f9f6 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindEntityTest.java
@@ -175,6 +175,13 @@ public class RebindEntityTest extends RebindTestFixtureWithApp {
         assertEquals(newE.getDisplayName(), "mydisplayname");
     }
     
+    // Saw this fail during development (fixed now); but want at least one of these tests to be run 
+    // many times for stress testing purposes
+    @Test(invocationCount=100, groups="Integeration")
+    public void testRestoresEntityIdAndDisplayNameManyTimes() throws Exception {
+        testRestoresEntityIdAndDisplayName();
+    }
+    
     @Test
     public void testCanCustomizeRebind() throws Exception {
         MyEntity2 origE = origApp.createAndManageChild(EntitySpec.create(MyEntity2.class).configure("myfield", "myval"));

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
index 4c8e8f7..2fd873e 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestFixture.java
@@ -16,6 +16,7 @@ import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.management.ManagementContext;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.mementos.BrooklynMementoManifest;
 import brooklyn.test.entity.LocalManagementContextForTests;
 import brooklyn.util.os.Os;
@@ -111,7 +112,10 @@ public abstract class RebindTestFixture<T extends StartableApplication> {
         FileBasedObjectStore objectStore = new FileBasedObjectStore(mementoDir);
         objectStore.injectManagementContext(newManagementContext);
         objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
-        BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+        BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
+                objectStore,
+                ((ManagementContextInternal)newManagementContext).getBrooklynProperties(),
+                classLoader);
         RebindExceptionHandler exceptionHandler = new RecordingRebindExceptionHandler(RebindManager.RebindFailureMode.FAIL_AT_END, RebindManager.RebindFailureMode.FAIL_AT_END);
         BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
         persister.stop();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
index 4b4e54c..d402bb9 100644
--- a/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindTestUtils.java
@@ -27,6 +27,7 @@ import brooklyn.location.Location;
 import brooklyn.management.ManagementContext;
 import brooklyn.management.ha.HighAvailabilityMode;
 import brooklyn.management.internal.LocalManagementContext;
+import brooklyn.management.internal.ManagementContextInternal;
 import brooklyn.mementos.BrooklynMemento;
 import brooklyn.test.entity.LocalManagementContextForTests;
 import brooklyn.util.javalang.Serializers;
@@ -170,7 +171,10 @@ public class RebindTestUtils {
             
             objectStore.injectManagementContext(unstarted);
             objectStore.prepareForSharedUse(PersistMode.AUTO, HighAvailabilityMode.DISABLED);
-            BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+            BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(
+                    objectStore, 
+                    unstarted.getBrooklynProperties(), 
+                    classLoader);
             ((RebindManagerImpl) unstarted.getRebindManager()).setPeriodicPersistPeriod(persistPeriod);
             unstarted.getRebindManager().setPersister(newPersister, PersistenceExceptionHandlerImpl.builder().build());
             return unstarted;
@@ -235,7 +239,10 @@ public class RebindTestUtils {
     public static Collection<Application> rebindAll(ManagementContext newManagementContext, File mementoDir, ClassLoader classLoader, RebindExceptionHandler exceptionHandler, PersistenceObjectStore objectStore) throws Exception {
         LOG.info("Rebinding app, using directory "+mementoDir);
 
-        BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+        BrooklynMementoPersisterToObjectStore newPersister = new BrooklynMementoPersisterToObjectStore(
+                objectStore,
+                ((ManagementContextInternal)newManagementContext).getBrooklynProperties(),
+                classLoader);
         newManagementContext.getRebindManager().setPersister(newPersister, PersistenceExceptionHandlerImpl.builder().build());
         List<Application> newApps;
         if (exceptionHandler == null) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
index e33ac4f..0183985 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerSplitBrainTest.java
@@ -80,7 +80,7 @@ public class HighAvailabilityManagerSplitBrainTest {
             objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
             persister = new ManagementPlaneSyncRecordPersisterToObjectStore(mgmt, objectStore, classLoader);
             ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
-            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+            BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, mgmt.getBrooklynProperties(), classLoader);
             mgmt.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
             ha = new HighAvailabilityManagerImpl(mgmt)
                 .setPollPeriod(Duration.PRACTICALLY_FOREVER)

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
index b572a80..6e89c37 100644
--- a/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
+++ b/core/src/test/java/brooklyn/management/ha/HighAvailabilityManagerTestFixture.java
@@ -68,7 +68,10 @@ public abstract class HighAvailabilityManagerTestFixture {
         objectStore.prepareForSharedUse(PersistMode.CLEAN, HighAvailabilityMode.DISABLED);
         persister = new ManagementPlaneSyncRecordPersisterToObjectStore(managementContext, objectStore, classLoader);
         ((ManagementPlaneSyncRecordPersisterToObjectStore)persister).allowRemoteTimestampInMemento();
-        BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(objectStore, classLoader);
+        BrooklynMementoPersisterToObjectStore persisterObj = new BrooklynMementoPersisterToObjectStore(
+                objectStore, 
+                managementContext.getBrooklynProperties(), 
+                classLoader);
         managementContext.getRebindManager().setPersister(persisterObj, PersistenceExceptionHandlerImpl.builder().build());
         manager = new HighAvailabilityManagerImpl(managementContext)
                 .setPollPeriod(getPollPeriod())

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
----------------------------------------------------------------------
diff --git a/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java b/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
index 64ef089..638c029 100644
--- a/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
+++ b/usage/launcher/src/main/java/brooklyn/launcher/BrooklynLauncher.java
@@ -541,7 +541,9 @@ public class BrooklynLauncher {
 
             RebindManager rebindManager = managementContext.getRebindManager();
 
-            BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(objectStore,
+            BrooklynMementoPersisterToObjectStore persister = new BrooklynMementoPersisterToObjectStore(
+                    objectStore,
+                    ((ManagementContextInternal)managementContext).getBrooklynProperties(),
                     managementContext.getCatalog().getRootClassLoader());
             PersistenceExceptionHandler persistenceExceptionHandler = PersistenceExceptionHandlerImpl.builder().build();
             ((RebindManagerImpl) rebindManager).setPeriodicPersistPeriod(persistPeriod);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d04761e3/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java b/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
index 5f0fc8a..42fddb2 100644
--- a/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
+++ b/utils/common/src/main/java/brooklyn/util/exceptions/Exceptions.java
@@ -80,6 +80,8 @@ public class Exceptions {
     public static void propagateIfFatal(Throwable throwable) {
         if (throwable instanceof InterruptedException)
             throw new RuntimeInterruptedException((InterruptedException) throwable);
+        if (throwable instanceof RuntimeInterruptedException)
+            throw (RuntimeInterruptedException) throwable;
         if (throwable instanceof Error)
             throw (Error) throwable;
     }


Mime
View raw message