brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rich...@apache.org
Subject [08/10] git commit: addresses review comments, cleans up quite a few things. most importantly:
Date Tue, 10 Jun 2014 00:15:29 GMT
    addresses review comments, cleans up quite a few things.  most importantly:

    * change start sequence so that:
      * HA is known when the web console comes up (initPersistence before startWebApps)
      * launcher exits if persistence is misconfigured or if it fails to rebind

    * big tidy up of startup logging around persistence/HA
    * fix how jclouds integration tests inherit (couldn't find a nice way with any combo of @BeforeGroups / @BeforeMethod, so have just repeated all the test methods from supers)
    * support brooklyn.persistence.location.spec being set in brooklyn.properties
    * support brooklyn.persistence.backups.required (note, this defaults to false in obj store, but we seem to be pretty good about not clobbering things; even so, some prod usage may want versioning on the obj store)
    * clean up how persistence store initialization (prepare) is done
    * refactor of how object store items are written, with a wrapper which handles locking, cutting code and fixing append for the file and jclouds object stores


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/de3fb4be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/de3fb4be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/de3fb4be

Branch: refs/heads/master
Commit: de3fb4bed7f75bc19bc4f993a1417993a8f1b2ba
Parents: 94495f1
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Thu Jun 5 17:08:14 2014 +0100
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Fri Jun 6 21:46:11 2014 +0100

----------------------------------------------------------------------
 .../brooklyn/config/BrooklynProperties.java     |   6 +-
 .../brooklyn/config/BrooklynServerConfig.java   |  67 +++--
 .../entity/basic/BrooklynShutdownHooks.java     |   7 +-
 .../entity/rebind/RebindManagerImpl.java        |  30 ++-
 .../BrooklynMementoPersisterToMultiFile.java    |   6 +-
 .../BrooklynMementoPersisterToObjectStore.java  | 174 ++++--------
 .../rebind/persister/FileBasedObjectStore.java  | 181 ++++++++-----
 .../persister/FileBasedStoreObjectAccessor.java | 265 ++-----------------
 .../rebind/persister/InMemoryObjectStore.java   | 120 ---------
 .../rebind/persister/MementoFileWriter.java     |   4 +-
 .../persister/PersistenceObjectStore.java       |  55 ++--
 .../persister/StoreObjectAccessorLocking.java   | 153 +++++++++++
 .../ha/HighAvailabilityManagerImpl.java         |  49 ++--
 ...ntPlaneSyncRecordPersisterToObjectStore.java |  71 ++---
 .../main/resources/brooklyn-catalog-empty.xml   |   1 +
 .../entity/group/DynamicMultiGroupTest.java     |   3 +-
 .../brooklyn/entity/rebind/RebindTestUtils.java |   3 +-
 .../BrooklynMementoPersisterTestFixture.java    |   6 +-
 .../FileBasedStoreObjectAccessorWriterTest.java |   6 +-
 .../rebind/persister/InMemoryObjectStore.java   | 119 +++++++++
 .../InMemoryStoreObjectAccessorWriterTest.java  |   6 +-
 .../rebind/persister/ListeningObjectStore.java  |  24 +-
 ...nceStoreObjectAccessorWriterTestFixture.java |  24 +-
 .../ha/HighAvailabilityManagerTestFixture.java  |   5 +-
 .../entity/LocalManagementContextForTests.java  |  12 +-
 .../brooklyn/util/file/ArchiveBuilderTest.java  |   6 +-
 .../JcloudsBlobStoreBasedObjectStore.java       |  39 ++-
 .../jclouds/JcloudsStoreObjectAccessor.java     |  24 +-
 .../rebind/persister/jclouds/BlobStoreTest.java |  20 +-
 ...nMementoPersisterJcloudsObjectStoreTest.java |  19 ++
 ...ailabilityManagerJcloudsObjectStoreTest.java |  24 +-
 .../JcloudsObjectStoreAccessorWriterTest.java   |  23 +-
 usage/cli/src/main/java/brooklyn/cli/Main.java  |   8 +-
 usage/launcher/pom.xml                          |   7 +
 .../brooklyn/launcher/BrooklynLauncher.java     |  75 +++---
 .../BrooklynLauncherHighAvailabilityTest.java   |  11 +-
 .../BrooklynLauncherRebindTestToFiles.java      |   3 +-
 ...lynLauncherRebindToCloudObjectStoreTest.java |  63 ++++-
 .../FatalConfigurationRuntimeException.java     |   3 +-
 .../main/java/brooklyn/util/text/Strings.java   |  49 +++-
 40 files changed, 975 insertions(+), 796 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/config/BrooklynProperties.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/config/BrooklynProperties.java b/core/src/main/java/brooklyn/config/BrooklynProperties.java
index 849bf74..ab56bd0 100644
--- a/core/src/main/java/brooklyn/config/BrooklynProperties.java
+++ b/core/src/main/java/brooklyn/config/BrooklynProperties.java
@@ -331,6 +331,10 @@ public class BrooklynProperties extends LinkedHashMap implements StringConfigMap
         return super.put(key.getName(), value);
     }
     
+    public <T> boolean putIfAbsent(ConfigKey<T> key, T value) {
+        return putIfAbsent(key.getName(), value);
+    }
+    
     @Override
     public <T> T getConfig(ConfigKey<T> key) {
         return getConfig(key, null);
@@ -393,5 +397,5 @@ public class BrooklynProperties extends LinkedHashMap implements StringConfigMap
     public Map<String, Object> asMapWithStringKeys() {
         return this;
     }
-    
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/config/BrooklynServerConfig.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/config/BrooklynServerConfig.java b/core/src/main/java/brooklyn/config/BrooklynServerConfig.java
index cb0bf9e..0eaf998 100644
--- a/core/src/main/java/brooklyn/config/BrooklynServerConfig.java
+++ b/core/src/main/java/brooklyn/config/BrooklynServerConfig.java
@@ -9,10 +9,10 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.entity.basic.ConfigKeys;
 import brooklyn.management.ManagementContext;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.os.Os;
-import brooklyn.util.text.Strings;
 
 /** config keys for the brooklyn server */
 public class BrooklynServerConfig {
@@ -30,14 +30,28 @@ public class BrooklynServerConfig {
     public static final ConfigKey<String> BROOKLYN_DATA_DIR = newStringConfigKey(
             "brooklyn.datadir", "Directory for writing all brooklyn data");
 
-    /** also used for containers */
-    public static final String PERSISTENCE_PATH_SEGMENT = "brooklyn-persisted-state";
+    public static final String DEFAULT_PERSISTENCE_CONTAINER_NAME = "brooklyn-persisted-state";
+    /** on file system, the 'data' subdir is used so that there is an obvious place to put backup dirs */ 
+    public static final String DEFAULT_PERSISTENCE_DIR_FOR_FILESYSTEM = Os.mergePaths(DEFAULT_PERSISTENCE_CONTAINER_NAME, "data");
     
-    /** provided for setting; consumers should use {@link #getPersistenceDir(ManagementContext)},
-     * but note for object stores this may be treated specially */ 
+    /** provided for setting; consumers should query the management context persistence subsystem for the actual target,
+     * or use {@link #resolvePersistencePath(String, StringConfigMap, String)} if trying to resolve the value */
     public static final ConfigKey<String> PERSISTENCE_DIR = newStringConfigKey(
-        "brooklyn.persistence.dir", "Directory for writing brooklyn persisted state; if not absolute, taken relative to mgmt base", 
-        Os.mergePaths(PERSISTENCE_PATH_SEGMENT, "data"));
+        "brooklyn.persistence.dir", 
+        "Directory or container name for writing brooklyn persisted state");
+
+    public static final ConfigKey<String> PERSISTENCE_LOCATION_SPEC = newStringConfigKey(
+        "brooklyn.persistence.location.spec", 
+        "Optional location spec string for an object store (e.g. jclouds:swift:URL) where persisted state should be kept;"
+        + "if blank or not supplied, the file system is used"); 
+
+    public static final ConfigKey<Boolean> PERSISTENCE_BACKUPS_REQUIRED =
+        ConfigKeys.newBooleanConfigKey("brooklyn.persistence.backups.required",
+            "Whether a backup should always be made of the persistence directory; "
+            + "if true, it will fail if this operation is not permitted (e.g. jclouds-based cloud object stores); "
+            + "if false, the persistence store will be overwritten with changes (but files not removed if they are unreadable); "
+            + "if null or not set, the legacy beahviour of creating backups where possible (e.g. file system) is currently used, "
+            + "but this may be changed in future versions");
 
     public static String getMgmtBaseDir(ManagementContext mgmt) {
         return getMgmtBaseDir(mgmt.getConfig());
@@ -60,22 +74,41 @@ public class BrooklynServerConfig {
         return Os.tidyPath(base)+File.separator;
     }
     
-    protected static String relativeToBase(StringConfigMap brooklynProperties, ConfigKey<String> key) {
-        String d = brooklynProperties.getConfig(key);
-        if (Strings.isBlank(d))
-            throw new IllegalArgumentException(""+key+" must not be blank");
-        if (!Os.isAbsolute(d)) d = Os.mergePaths(getMgmtBaseDir(brooklynProperties), d);
-        return Os.tidyPath(d);
+    protected static String resolveAgainstBaseDir(StringConfigMap brooklynProperties, String path) {
+        if (!Os.isAbsolute(path)) path = Os.mergePaths(getMgmtBaseDir(brooklynProperties), path);
+        return Os.tidyPath(path);
     }
     
-    /** dir where persistence should be put according to configuration,
-     * but note for object stores this may be treated specially */
+    /** @deprecated since 0.7.0 use {@link #resolvePersistencePath(String, StringConfigMap, String)} */
     public static String getPersistenceDir(ManagementContext mgmt) {
         return getPersistenceDir(mgmt.getConfig());
     }
-    /** see {@link #getPersistenceDir(ManagementContext)} */ 
+    /** @deprecated since 0.7.0 use {@link #resolvePersistencePath(String, StringConfigMap, String)} */ 
     public static String getPersistenceDir(StringConfigMap brooklynProperties) {
-        return relativeToBase(brooklynProperties, PERSISTENCE_DIR);
+        return resolvePersistencePath(null, brooklynProperties, null);
+    }
+    
+    /** container name or full path for where persist state should be kept
+     * @param optionalSuppliedValue  a value which has been supplied explicitly, optionally
+     * @param brooklynProperties  the properties map where the persistence path should be looked up
+     *   if not supplied, along with finding the brooklyn.base.dir if needed (using file system persistence with a relative path)
+     * @param optionalObjectStoreLocationSpec  if a location spec is supplied, this will return a container name
+     *    suitable for use with the given object store based on brooklyn.persistence.dir;
+     *    if null this method will return a full file system path, relative 
+     *    to the brooklyn.base.dir if the configured brooklyn.persistence.dir is not absolute
+     */
+    public static String resolvePersistencePath(String optionalSuppliedValue, StringConfigMap brooklynProperties, String optionalObjectStoreLocationSpec) {
+        String path = optionalSuppliedValue;
+        if (path==null) path = brooklynProperties.getConfig(PERSISTENCE_DIR);
+        if (optionalObjectStoreLocationSpec==null) {
+            // file system
+            if (path==null) path=DEFAULT_PERSISTENCE_DIR_FOR_FILESYSTEM;
+            return resolveAgainstBaseDir(brooklynProperties, path);
+        } else {
+            // obj store
+            if (path==null) path=DEFAULT_PERSISTENCE_CONTAINER_NAME;
+            return path;
+        }
     }
 
     public static File getBrooklynWebTmpDir(ManagementContext mgmt) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/basic/BrooklynShutdownHooks.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/BrooklynShutdownHooks.java b/core/src/main/java/brooklyn/entity/basic/BrooklynShutdownHooks.java
index 8f44a7e..d6fe3b3 100644
--- a/core/src/main/java/brooklyn/entity/basic/BrooklynShutdownHooks.java
+++ b/core/src/main/java/brooklyn/entity/basic/BrooklynShutdownHooks.java
@@ -74,7 +74,10 @@ public class BrooklynShutdownHooks {
         public void run() {
             // First stop entities; on interrupt, abort waiting for tasks - but let shutdown hook continue
             synchronized (mutex) {
-                log.info("Brooklyn stopOnShutdown shutdown-hook invoked: stopping entities: "+entitiesToStopOnShutdown);
+                if (entitiesToStopOnShutdown.isEmpty())
+                    log.debug("Brooklyn stopOnShutdown shutdown-hook invoked: no entities to stop");
+                else
+                    log.info("Brooklyn stopOnShutdown shutdown-hook invoked: stopping entities: "+entitiesToStopOnShutdown);
                 List<Task> stops = new ArrayList<Task>();
                 for (Entity entity: entitiesToStopOnShutdown) {
                     try {
@@ -100,7 +103,7 @@ public class BrooklynShutdownHooks {
                 }
             
                 // Then terminate management contexts
-                log.info("Brooklyn terminateOnShutdown shutdown-hook invoked: terminating management contexts: "+managementContextsToTerminateOnShutdown);
+                log.debug("Brooklyn terminateOnShutdown shutdown-hook invoked: terminating management contexts: "+managementContextsToTerminateOnShutdown);
                 for (ManagementContextInternal managementContext: managementContextsToTerminateOnShutdown) {
                     try {
                         managementContext.terminate();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index e4e78c3..658b1c1 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -45,6 +45,7 @@ import brooklyn.util.collections.MutableMap;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.flags.FlagUtils;
 import brooklyn.util.javalang.Reflections;
+import brooklyn.util.text.Strings;
 import brooklyn.util.time.Duration;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -203,7 +204,7 @@ public class RebindManagerImpl implements RebindManager {
             BrooklynMementoManifest mementoManifest = persister.loadMementoManifest(exceptionHandler);
             
             // Instantiate locations
-            LOG.info("RebindManager instantiating locations: {}", mementoManifest.getLocationIdToType().keySet());
+            LOG.debug("RebindManager instantiating locations: {}", mementoManifest.getLocationIdToType().keySet());
             for (Map.Entry<String, String> entry : mementoManifest.getLocationIdToType().entrySet()) {
                 String locId = entry.getKey();
                 String locType = entry.getValue();
@@ -219,7 +220,7 @@ public class RebindManagerImpl implements RebindManager {
             }
             
             // Instantiate entities
-            LOG.info("RebindManager instantiating entities: {}", mementoManifest.getEntityIdToType().keySet());
+            LOG.debug("RebindManager instantiating entities: {}", mementoManifest.getEntityIdToType().keySet());
             for (Map.Entry<String, String> entry : mementoManifest.getEntityIdToType().entrySet()) {
                 String entityId = entry.getKey();
                 String entityType = entry.getValue();
@@ -238,7 +239,7 @@ public class RebindManagerImpl implements RebindManager {
             
             // Instantiate policies
             if (persistPoliciesEnabled) {
-                LOG.info("RebindManager instantiating policies: {}", memento.getPolicyIds());
+                LOG.debug("RebindManager instantiating policies: {}", memento.getPolicyIds());
                 for (PolicyMemento policyMemento : memento.getPolicyMementos().values()) {
                     if (LOG.isDebugEnabled()) LOG.debug("RebindManager instantiating policy {}", policyMemento);
                     
@@ -256,7 +257,7 @@ public class RebindManagerImpl implements RebindManager {
             
             // Instantiate enrichers
             if (persistEnrichersEnabled) {
-                LOG.info("RebindManager instantiating enrichers: {}", memento.getEnricherIds());
+                LOG.debug("RebindManager instantiating enrichers: {}", memento.getEnricherIds());
                 for (EnricherMemento enricherMemento : memento.getEnricherMementos().values()) {
                     if (LOG.isDebugEnabled()) LOG.debug("RebindManager instantiating enricher {}", enricherMemento);
                     
@@ -269,7 +270,7 @@ public class RebindManagerImpl implements RebindManager {
             } 
             
             // Reconstruct locations
-            LOG.info("RebindManager reconstructing locations");
+            LOG.debug("RebindManager reconstructing locations");
             for (LocationMemento locMemento : sortParentFirst(memento.getLocationMementos()).values()) {
                 Location location = rebindContext.getLocation(locMemento.getId());
                 if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing location {}", locMemento);
@@ -287,7 +288,7 @@ public class RebindManagerImpl implements RebindManager {
 
             // Reconstruct policies
             if (persistPoliciesEnabled) {
-                LOG.info("RebindManager reconstructing policies");
+                LOG.debug("RebindManager reconstructing policies");
                 for (PolicyMemento policyMemento : memento.getPolicyMementos().values()) {
                     Policy policy = rebindContext.getPolicy(policyMemento.getId());
                     if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing policy {}", policyMemento);
@@ -307,7 +308,7 @@ public class RebindManagerImpl implements RebindManager {
 
             // Reconstruct enrichers
             if (persistEnrichersEnabled) {
-                LOG.info("RebindManager reconstructing enrichers");
+                LOG.debug("RebindManager reconstructing enrichers");
                 for (EnricherMemento enricherMemento : memento.getEnricherMementos().values()) {
                     Enricher enricher = rebindContext.getEnricher(enricherMemento.getId());
                     if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing enricher {}", enricherMemento);
@@ -317,7 +318,7 @@ public class RebindManagerImpl implements RebindManager {
             }
     
             // Reconstruct entities
-            LOG.info("RebindManager reconstructing entities");
+            LOG.debug("RebindManager reconstructing entities");
             for (EntityMemento entityMemento : sortParentFirst(memento.getEntityMementos()).values()) {
                 Entity entity = rebindContext.getEntity(entityMemento.getId());
                 if (LOG.isDebugEnabled()) LOG.debug("RebindManager reconstructing entity {}", entityMemento);
@@ -335,7 +336,7 @@ public class RebindManagerImpl implements RebindManager {
                 }
             }
             
-            LOG.info("RebindManager managing locations");
+            LOG.debug("RebindManager managing locations");
             for (Location location: locations.values()) {
                 if (location.getParent()==null) {
                     // manage all root locations
@@ -349,7 +350,7 @@ public class RebindManagerImpl implements RebindManager {
             }
             
             // Manage the top-level apps (causing everything under them to become managed)
-            LOG.info("RebindManager managing entities");
+            LOG.debug("RebindManager managing entities");
             List<Application> apps = Lists.newArrayList();
             for (String appId : memento.getApplicationIds()) {
                 Entity entity = rebindContext.getEntity(appId);
@@ -368,8 +369,15 @@ public class RebindManagerImpl implements RebindManager {
             
             exceptionHandler.onDone();
 
+            LOG.info("Rebind complete: {} app{}, {} entit{}, {} location{}, {} polic{}, {} enricher{}", new Object[]{
+                apps.size(), Strings.s(apps),
+                entities.size(), Strings.ies(entities),
+                locations.size(), Strings.s(locations),
+                policies.size(), Strings.ies(policies),
+                enrichers.size(), Strings.s(enrichers) });
+
             // Return the top-level applications
-            LOG.info("RebindManager complete; return apps: {}", memento.getApplicationIds());
+            LOG.debug("RebindManager complete; return apps: {}", memento.getApplicationIds());
             return apps;
             
         } catch (RuntimeException e) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
index bcd5c72..c2a1a85 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToMultiFile.java
@@ -135,8 +135,7 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi
             enricherFiles = enrichersDir.listFiles(fileFilter);
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
-            if (exceptionHandler!=null)
-                exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
+            exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
             throw new IllegalStateException("Failed to list memento files in "+dir, e);
         }
         
@@ -219,8 +218,7 @@ public class BrooklynMementoPersisterToMultiFile implements BrooklynMementoPersi
             enricherFiles = enrichersDir.listFiles(fileFilter);
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
-            if (exceptionHandler!=null)
-                exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
+            exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
             throw new IllegalStateException("Failed to list memento files in "+dir, e);
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 92270db..a884419 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -2,10 +2,10 @@ package brooklyn.entity.rebind.persister;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import java.io.File;
 import java.io.IOException;
+import java.util.LinkedHashMap;
 import java.util.List;
-import java.util.concurrent.ConcurrentMap;
+import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -16,12 +16,14 @@ import brooklyn.entity.rebind.RebindExceptionHandler;
 import brooklyn.entity.rebind.dto.BrooklynMementoImpl;
 import brooklyn.entity.rebind.dto.BrooklynMementoManifestImpl;
 import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccessor;
+import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccessorWithLock;
 import brooklyn.mementos.BrooklynMemento;
 import brooklyn.mementos.BrooklynMementoManifest;
 import brooklyn.mementos.BrooklynMementoPersister;
 import brooklyn.mementos.EnricherMemento;
 import brooklyn.mementos.EntityMemento;
 import brooklyn.mementos.LocationMemento;
+import brooklyn.mementos.Memento;
 import brooklyn.mementos.PolicyMemento;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.time.Duration;
@@ -30,7 +32,6 @@ import brooklyn.util.xstream.XmlUtil;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
-import com.google.common.collect.Maps;
 
 /** Implementation of the {@link BrooklynMementoPersister} backed by a pluggable
  * {@link PersistenceObjectStore} such as a file system or a jclouds object store */
@@ -43,11 +44,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     private final PersistenceObjectStore objectStore;
     private final MementoSerializer<Object> serializer;
 
-    // TODO it's 95% the same code for each of these, so refactor to avoid repetition
-    private final ConcurrentMap<String, StoreObjectAccessor> entityWriters = Maps.newConcurrentMap();
-    private final ConcurrentMap<String, StoreObjectAccessor> locationWriters = Maps.newConcurrentMap();
-    private final ConcurrentMap<String, StoreObjectAccessor> policyWriters = Maps.newConcurrentMap();
-    private final ConcurrentMap<String, StoreObjectAccessor> enricherWriters = Maps.newConcurrentMap();
+    private final Map<String, StoreObjectAccessorWithLock> writers = new LinkedHashMap<String, PersistenceObjectStore.StoreObjectAccessorWithLock>();
 
     private volatile boolean running = true;
 
@@ -56,6 +53,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         MementoSerializer<Object> rawSerializer = new XmlMementoSerializer<Object>(classLoader);
         this.serializer = new RetryingMementoSerializer<Object>(rawSerializer, MAX_SERIALIZATION_ATTEMPTS);
 
+        // TODO it's 95% the same code for each of these, throughout, so refactor to avoid repetition
         objectStore.createSubPath("entities");
         objectStore.createSubPath("locations");
         objectStore.createSubPath("policies");
@@ -73,6 +71,18 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     public void stop() {
         running = false;
     }
+    
+    protected StoreObjectAccessorWithLock getWriter(String path) {
+        String id = path.substring(path.lastIndexOf('/')+1);
+        synchronized (writers) {
+            StoreObjectAccessorWithLock writer = writers.get(id);
+            if (writer == null) {
+                writer = new StoreObjectAccessorLocking( objectStore.newAccessor(path) );
+                writers.put(id, writer);
+            }
+            return writer;
+        }
+    }
 
     @Override
     public BrooklynMementoManifest loadMementoManifest(RebindExceptionHandler exceptionHandler) throws IOException {
@@ -91,22 +101,22 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             enricherSubPathList = objectStore.listContentsWithSubPath("enrichers");
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
-            if (exceptionHandler!=null)
-                exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
+            exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
             throw new IllegalStateException("Failed to list memento files in "+objectStore, e);
         }
 
         Stopwatch stopwatch = Stopwatch.createStarted();
 
-        LOG.info("Loading memento from {}; {} entities, {} locations, {} policies, {} enrichers",
-            new Object[]{objectStore.getSummaryName(), entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size()});
+        LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+            objectStore.getSummaryName() });
 
         BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
 
         for (String subPath : entitySubPathList) {
             try {
                 StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.read();
+                String contents = objectAccessor.get();
                 String id = (String) XmlUtil.xpath(contents, "/entity/id");
                 String type = (String) XmlUtil.xpath(contents, "/entity/type");
                 builder.entity(id, type);
@@ -118,7 +128,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         for (String subPath : locationSubPathList) {
             try {
                 StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.read();
+                String contents = objectAccessor.get();
                 String id = (String) XmlUtil.xpath(contents, "/location/id");
                 String type = (String) XmlUtil.xpath(contents, "/location/type");
                 builder.location(id, type);
@@ -130,7 +140,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         for (String subPath : policySubPathList) {
             try {
                 StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.read();
+                String contents = objectAccessor.get();
                 String id = (String) XmlUtil.xpath(contents, "/policy/id");
                 String type = (String) XmlUtil.xpath(contents, "/policy/type");
                 builder.policy(id, type);
@@ -142,7 +152,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         for (String subPath : enricherSubPathList) {
             try {
                 StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                String contents = objectAccessor.read();
+                String contents = objectAccessor.get();
                 String id = (String) XmlUtil.xpath(contents, "/enricher/id");
                 String type = (String) XmlUtil.xpath(contents, "/enricher/type");
                 builder.enricher(id, type);
@@ -175,13 +185,13 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             enricherSubPathList = objectStore.listContentsWithSubPath("enrichers");
         } catch (Exception e) {
             Exceptions.propagateIfFatal(e);
-            if (exceptionHandler!=null)
-                exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
+            exceptionHandler.onLoadBrooklynMementoFailed("Failed to list files", e);
             throw new IllegalStateException("Failed to list memento files in "+objectStore+": "+e, e);
         }
         
-        LOG.info("Loading memento from {}; {} entities, {} locations, {} policies, {} enrichers",
-                new Object[]{objectStore.getSummaryName(), entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size()});
+        LOG.debug("Loading persisted state: {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(), enricherSubPathList.size(),
+            objectStore.getSummaryName() });
 
         BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
         serializer.setLookupContext(lookupContext);
@@ -189,7 +199,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             for (String subPath : entitySubPathList) {
                 try {
                     StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    EntityMemento memento = (EntityMemento) serializer.fromString(objectAccessor.read());
+                    EntityMemento memento = (EntityMemento) serializer.fromString(objectAccessor.get());
                     if (memento == null) {
                         LOG.warn("No entity-memento deserialized from " + subPath + "; ignoring and continuing");
                     } else {
@@ -205,7 +215,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             for (String subPath : locationSubPathList) {
                 try {
                     StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    LocationMemento memento = (LocationMemento) serializer.fromString(objectAccessor.read());
+                    LocationMemento memento = (LocationMemento) serializer.fromString(objectAccessor.get());
                     if (memento == null) {
                         LOG.warn("No location-memento deserialized from " + subPath + "; ignoring and continuing");
                     } else {
@@ -218,7 +228,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             for (String subPath : policySubPathList) {
                 try {
                     StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.read());
+                    PolicyMemento memento = (PolicyMemento) serializer.fromString(objectAccessor.get());
                     if (memento == null) {
                         LOG.warn("No policy-memento deserialized from " + subPath + "; ignoring and continuing");
                     } else {
@@ -231,7 +241,7 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             for (String subPath : enricherSubPathList) {
                 try {
                     StoreObjectAccessor objectAccessor = objectStore.newAccessor(subPath);
-                    EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.read());
+                    EnricherMemento memento = (EnricherMemento) serializer.fromString(objectAccessor.get());
                     if (memento == null) {
                         LOG.warn("No enricher-memento deserialized from " + subPath + "; ignoring and continuing");
                     } else {
@@ -259,16 +269,16 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         if (LOG.isDebugEnabled()) LOG.debug("Checkpointing entire memento");
         
         for (EntityMemento entity : newMemento.getEntityMementos().values()) {
-            persist(entity);
+            persist("entities", entity);
         }
         for (LocationMemento location : newMemento.getLocationMementos().values()) {
-            persist(location);
+            persist("locations", location);
         }
         for (PolicyMemento policy : newMemento.getPolicyMementos().values()) {
-            persist(policy);
+            persist("policies", policy);
         }
         for (EnricherMemento enricher : newMemento.getEnricherMementos().values()) {
-            persist(enricher);
+            persist("enrichers", enricher);
         }
     }
     
@@ -284,28 +294,29 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
                 delta.removedEntityIds(), delta.removedLocationIds(), delta.removedPolicyIds()});
         
         for (EntityMemento entity : delta.entities()) {
-            persist(entity);
+            persist("entities", entity);
         }
         for (LocationMemento location : delta.locations()) {
-            persist(location);
+            persist("locations", location);
         }
         for (PolicyMemento policy : delta.policies()) {
-            persist(policy);
+            persist("policies", policy);
         }
         for (EnricherMemento enricher : delta.enrichers()) {
-            persist(enricher);
+            persist("enrichers", enricher);
         }
+        
         for (String id : delta.removedEntityIds()) {
-            deleteEntity(id);
+            delete("entities", id);
         }
         for (String id : delta.removedLocationIds()) {
-            deleteLocation(id);
+            delete("locations", id);
         }
         for (String id : delta.removedPolicyIds()) {
-            deletePolicy(id);
+            delete("policies", id);
         }
         for (String id : delta.removedEnricherIds()) {
-            deleteEnricher(id);
+            delete("enrichers", id);
         }
     }
 
@@ -316,93 +327,24 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     }
     
     public void waitForWritesCompleted(Duration timeout) throws InterruptedException, TimeoutException {
-        for (StoreObjectAccessor writer : entityWriters.values()) {
-            writer.waitForWriteCompleted(timeout);
-        }
-        for (StoreObjectAccessor writer : locationWriters.values()) {
-            writer.waitForWriteCompleted(timeout);
-        }
-        for (StoreObjectAccessor writer : policyWriters.values()) {
-            writer.waitForWriteCompleted(timeout);
-        }
-        for (StoreObjectAccessor writer : enricherWriters.values()) {
-            writer.waitForWriteCompleted(timeout);
-        }
+        for (StoreObjectAccessorWithLock writer : writers.values())
+            writer.waitForCurrentWrites(timeout);
     }
 
-    // TODO Promote somewhere sensible; share code with BrooklynLauncher.checkPersistenceDirAccessible
-    public static void checkDirIsAccessible(File dir) {
-        if (!(dir.exists() && dir.isDirectory() && dir.canRead() && dir.canWrite())) {
-            throw new IllegalStateException("Invalid directory "+dir+" because "+
-                    (!dir.exists() ? "does not exist" :
-                        (!dir.isDirectory() ? "not a directory" :
-                            (!dir.canRead() ? "not readable" :
-                                (!dir.canWrite() ? "not writable" : "unknown reason")))));
-        }
-    }
-    
-    private void persist(EntityMemento entity) {
-        StoreObjectAccessor writer = entityWriters.get(entity.getId());
-        if (writer == null) {
-            entityWriters.putIfAbsent(entity.getId(), objectStore.newAccessor("entities/"+entity.getId()));
-            writer = entityWriters.get(entity.getId());
-        }
-        writer.writeAsync(serializer.toString(entity));
-    }
-    
-    private void persist(LocationMemento location) {
-        StoreObjectAccessor writer = locationWriters.get(location.getId());
-        if (writer == null) {
-            locationWriters.putIfAbsent(location.getId(), objectStore.newAccessor("locations/"+location.getId()));
-            writer = locationWriters.get(location.getId());
-        }
-        writer.writeAsync(serializer.toString(location));
-    }
-    
-    private void persist(PolicyMemento policy) {
-        StoreObjectAccessor writer = policyWriters.get(policy.getId());
-        if (writer == null) {
-            policyWriters.putIfAbsent(policy.getId(), objectStore.newAccessor("policies/"+policy.getId()));
-            writer = policyWriters.get(policy.getId());
-        }
-        writer.writeAsync(serializer.toString(policy));
+    private void persist(String subPath, Memento entity) {
+        getWriter(getPath(subPath, entity.getId())).put(serializer.toString(entity));
     }
 
-    private void persist(EnricherMemento enricher) {
-        StoreObjectAccessor writer = enricherWriters.get(enricher.getId());
-        if (writer == null) {
-            enricherWriters.putIfAbsent(enricher.getId(), objectStore.newAccessor("enrichers/"+enricher.getId()));
-            writer = enricherWriters.get(enricher.getId());
+    private void delete(String subPath, String id) {
+        StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id));
+        w.delete();
+        synchronized (writers) {
+            writers.remove(id);
         }
-        writer.writeAsync(serializer.toString(enricher));
     }
 
-    private void deleteEntity(String id) {
-        StoreObjectAccessor writer = entityWriters.get(id);
-        if (writer != null) {
-            writer.deleteAsync();
-        }
-    }
-    
-    private void deleteLocation(String id) {
-        StoreObjectAccessor writer = locationWriters.get(id);
-        if (writer != null) {
-            writer.deleteAsync();
-        }
+    private String getPath(String subPath, String id) {
+        return subPath+"/"+id;
     }
     
-    private void deletePolicy(String id) {
-        StoreObjectAccessor writer = policyWriters.get(id);
-        if (writer != null) {
-            writer.deleteAsync();
-        }
-    }
-
-    private void deleteEnricher(String id) {
-        StoreObjectAccessor writer = enricherWriters.get(id);
-        if (writer != null) {
-            writer.deleteAsync();
-        }
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedObjectStore.java
index 3c5d326..b468082 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedObjectStore.java
@@ -18,6 +18,7 @@ import org.apache.commons.io.FileUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.config.BrooklynServerConfig;
 import brooklyn.management.ManagementContext;
 import brooklyn.util.exceptions.Exceptions;
 import brooklyn.util.exceptions.FatalConfigurationRuntimeException;
@@ -50,7 +51,7 @@ public class FileBasedObjectStore implements PersistenceObjectStore {
     public FileBasedObjectStore(File basedir) {
         this.basedir = basedir;
         this.executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
-        log.info("File-based objectStore will use directory {}", basedir);
+        log.debug("File-based objectStore will use directory {}", basedir);
         // don't check accessible yet, we do that when we prepare
     }
 
@@ -74,7 +75,7 @@ public class FileBasedObjectStore implements PersistenceObjectStore {
     public StoreObjectAccessor newAccessor(String path) {
         String tmpExt = ".tmp";
         if (mgmt!=null && mgmt.getManagementNodeId()!=null) tmpExt = "."+mgmt.getManagementNodeId()+tmpExt;
-        return new FileBasedStoreObjectAccessor(new File(Os.mergePaths(getBaseDir().getAbsolutePath(), path)), executor, tmpExt);
+        return new FileBasedStoreObjectAccessor(new File(Os.mergePaths(getBaseDir().getAbsolutePath(), path)), tmpExt);
     }
 
     @Override
@@ -98,10 +99,6 @@ public class FileBasedObjectStore implements PersistenceObjectStore {
     }
 
     @Override
-    public void backupContents(String parentSubPath, String backupSubPath) {
-    }
-
-    @Override
     public void close() {
         executor.shutdown();
         try {
@@ -122,68 +119,82 @@ public class FileBasedObjectStore implements PersistenceObjectStore {
             throw new IllegalStateException("Cannot change mgmt context of "+this);
         this.mgmt = mgmt;
 
-        if (persistMode!=null) {
-            File dir = getBaseDir();
-            try {
-                String persistencePath = dir.getAbsolutePath();
-
-                switch (persistMode) {
-                case CLEAN:
-                    if (dir.exists()) {
-                        checkPersistenceDirAccessible(dir);
-                        try {
-                            File old = moveDirectory(dir);
-                            log.info("Persist-clean using "+persistencePath+"; moved old directory to "+old.getAbsolutePath());
-                        } catch (IOException e) {
-                            throw new FatalConfigurationRuntimeException("Error moving old persistence directory "+dir.getAbsolutePath(), e);
-                        }
-                    } else {
-                        log.info("Persist-clean using "+persistencePath+"; no pre-existing persisted data");
-                    }
-                    break;
-                case REBIND:
+        if (persistMode==null || persistMode==PersistMode.DISABLED)
+            // is this check needed? shouldn't come here now without persistence on.
+            return;
+        
+        Boolean backups = mgmt.getConfig().getConfig(BrooklynServerConfig.PERSISTENCE_BACKUPS_REQUIRED);
+        if (backups==null) backups = true; // for file system
+
+        File dir = getBaseDir();
+        try {
+            String persistencePath = dir.getAbsolutePath();
+
+            switch (persistMode) {
+            case CLEAN:
+                if (dir.exists()) {
                     checkPersistenceDirAccessible(dir);
-                    checkPersistenceDirNonEmpty(dir);
                     try {
-                        File backup = backupDirectory(dir);
-                        log.info("Persist-rebind using "+persistencePath+"; backed up directory to "+backup.getAbsolutePath());
+                        if (backups) {
+                            File old = backupDirByMoving(dir);
+                            log.info("Persistence mode CLEAN, directory "+persistencePath+" backed up to "+old.getAbsolutePath());
+                        } else {
+                            deleteCompletely();
+                            log.info("Persistence mode CLEAN, directory "+persistencePath+" deleted");
+                        }
                     } catch (IOException e) {
-                        throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
+                        throw new FatalConfigurationRuntimeException("Error using existing persistence directory "+dir.getAbsolutePath(), e);
                     }
-                    break;
-                case AUTO:
-                    if (dir.exists()) {
-                        checkPersistenceDirAccessible(dir);
+                } else {
+                    log.debug("Persistence mode CLEAN, directory "+persistencePath+", no previous state");
+                }
+                break;
+            case REBIND:
+                checkPersistenceDirAccessible(dir);
+                checkPersistenceDirNonEmpty(dir);
+                try {
+                    if (backups) {
+                        File backup = backupDirByCopying(dir);
+                        log.info("Persistence mode REBIND, directory "+persistencePath+" backed up to "+backup.getAbsolutePath());
                     }
-                    if (dir.exists() && !isMementoDirExistButEmpty(dir)) {
-                        try {
-                            File backup = backupDirectory(dir);
-                            log.info("Persist-auto will rebind using "+persistencePath+"; backed up directory to "+backup.getAbsolutePath());
-                        } catch (IOException e) {
-                            throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
+                } catch (IOException e) {
+                    throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
+                }
+                break;
+            case AUTO:
+                if (dir.exists()) {
+                    checkPersistenceDirAccessible(dir);
+                }
+                if (dir.exists() && !isMementoDirExistButEmpty(dir)) {
+                    try {
+                        if (backups) {
+                            File backup = backupDirByCopying(dir);
+                            log.info("Persistence mode REBIND, directory "+persistencePath+" backed up to "+backup.getAbsolutePath());
                         }
-                    } else {
-                        log.info("Persist-auto using fresh "+persistencePath+"; no pre-existing persisted data");
-                    }
-                    break;
-                default:
-                    throw new FatalConfigurationRuntimeException("Unexpected persist mode "+persistMode+"; modified during initialization?!");
-                };
-
-                if (!dir.exists()) {
-                    boolean success = dir.mkdirs();
-                    if (!success) {
-                        throw new FatalConfigurationRuntimeException("Failed to create persistence directory "+dir);
+                    } catch (IOException e) {
+                        throw new FatalConfigurationRuntimeException("Error backing up persistence directory "+dir.getAbsolutePath(), e);
                     }
+                } else {
+                    log.debug("Persistence mode AUTO, directory "+persistencePath+", no previous state");
+                }
+                break;
+            default:
+                throw new FatalConfigurationRuntimeException("Unexpected persist mode "+persistMode+"; modified during initialization?!");
+            };
+
+            if (!dir.exists()) {
+                boolean success = dir.mkdirs();
+                if (!success) {
+                    throw new FatalConfigurationRuntimeException("Failed to create persistence directory "+dir);
                 }
-
-            } catch (Exception e) {
-                throw Exceptions.propagate(e);
             }
+
+        } catch (Exception e) {
+            throw Exceptions.propagate(e);
         }
     }
 
-    static void checkPersistenceDirAccessible(File dir) {
+    protected void checkPersistenceDirAccessible(File dir) {
         if (!(dir.exists() && dir.isDirectory() && dir.canRead() && dir.canWrite())) {
             throw new FatalConfigurationRuntimeException("Invalid persistence directory " + dir + ": " +
                     (!dir.exists() ? "does not exist" :
@@ -191,7 +202,7 @@ public class FileBasedObjectStore implements PersistenceObjectStore {
                                     (!dir.canRead() ? "not readable" :
                                             (!dir.canWrite() ? "not writable" : "unknown reason")))));
         } else {
-            log.info("Directory {} has been created.", dir);
+            log.debug("Created dir {} for {}", dir, this);
         }
     }
 
@@ -203,34 +214,62 @@ public class FileBasedObjectStore implements PersistenceObjectStore {
         }
     }
 
-    static File backupDirectory(File dir) throws IOException, InterruptedException {
+    protected File backupDirByCopying(File dir) throws IOException, InterruptedException {
         File parentDir = dir.getParentFile();
         String simpleName = dir.getName();
         String timestamp = new SimpleDateFormat("yyyy-MM-dd-hhmm-ss").format(new Date());
         File backupDir = new File(parentDir, simpleName+"-"+timestamp+".bak");
         
-        String cmd = "cp -R "+dir.getAbsolutePath()+" "+backupDir.getAbsolutePath();
-        Process proc = Runtime.getRuntime().exec(cmd);
-        proc.waitFor();
-        if (proc.exitValue() != 0) {
-            throw new IOException("Error backing up directory, with command `"+cmd+"` (exit value "+proc.exitValue()+")");
-        }
+        copyDir(dir, backupDir);
         return backupDir;
     }
 
-    static File moveDirectory(File dir) throws InterruptedException, IOException {
+    protected File backupDirByMoving(File dir) throws InterruptedException, IOException {
         File parentDir = dir.getParentFile();
         String simpleName = dir.getName();
         String timestamp = new SimpleDateFormat("yyyy-MM-dd-hhmm-ss").format(new Date());
         File newDir = new File(parentDir, simpleName+"-"+timestamp+".old");
+
+        moveDir(dir, newDir);
+        return newDir;
+    }
+
+    /** 
+     * Attempts an fs-level atomic move, then falls back to a pure Java rename.
+     * Assumes files are on same mount point.
+     * <p>
+     * TODO Java 7 gives an atomic Files.move() which would be preferred.
+     */
+    static void moveFile(File srcFile, File destFile) throws IOException, InterruptedException {
+        if (!Os.isMicrosoftWindows()) {
+            String cmd = "mv '"+srcFile.getAbsolutePath()+"' '"+destFile.getAbsolutePath()+"'";
+            Process proc = Runtime.getRuntime().exec(cmd);
+            proc.waitFor();
+            if (proc.exitValue() == 0) return;
+        }
         
-        String cmd = "mv  "+dir.getAbsolutePath()+" "+newDir.getAbsolutePath();
-        Process proc = Runtime.getRuntime().exec(cmd);
-        proc.waitFor();
-        if (proc.exitValue() != 0) {
-            throw new IOException("Error moving directory, with command "+cmd);
+        destFile.delete();
+        srcFile.renameTo(destFile);
+    }
+    static void moveDir(File srcDir, File destDir) throws IOException, InterruptedException {
+        if (!Os.isMicrosoftWindows()) {
+            String cmd = "mv '"+srcDir.getAbsolutePath()+"' '"+destDir.getAbsolutePath()+"'";
+            Process proc = Runtime.getRuntime().exec(cmd);
+            proc.waitFor();
+            if (proc.exitValue() == 0) return;
         }
-        return newDir;
+        
+        FileUtils.moveDirectory(srcDir, destDir);
+    }
+    static void copyDir(File srcDir, File destDir) throws IOException, InterruptedException {
+        if (!Os.isMicrosoftWindows()) {
+            String cmd = "cp -R '"+srcDir.getAbsolutePath()+"' '"+destDir.getAbsolutePath()+"'";
+            Process proc = Runtime.getRuntime().exec(cmd);
+            proc.waitFor();
+            if (proc.exitValue() == 0) return;
+        }
+        
+        FileUtils.copyDirectory(srcDir, destDir);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedStoreObjectAccessor.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedStoreObjectAccessor.java b/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedStoreObjectAccessor.java
index a913031..60977a5 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedStoreObjectAccessor.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/FileBasedStoreObjectAccessor.java
@@ -2,291 +2,84 @@ package brooklyn.entity.rebind.persister;
 
 import java.io.File;
 import java.io.IOException;
-import java.util.concurrent.Callable;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-import java.util.concurrent.atomic.AtomicReference;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.text.Strings;
 
-import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Charsets;
 import com.google.common.base.Objects;
-import com.google.common.base.Stopwatch;
 import com.google.common.base.Throwables;
 import com.google.common.io.Files;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-
-import brooklyn.util.exceptions.Exceptions;
-import brooklyn.util.text.Strings;
-import brooklyn.util.time.CountdownTimer;
-import brooklyn.util.time.Duration;
-import brooklyn.util.time.Time;
 
 /**
- * For asynchronously writing to a file.
- *
- * This class is thread-safe. If a write is in progress, one will be scheduled. If a write is already 
- * scheduled, we will just rely on the existing one; otherwise we will write now.
+ * Reads/writes to a file. This impl does it immediately, with no synchronisation.
+ * Callers should wrap in {@link StoreObjectAccessorLocking} if multiple threads may be accessing this.
  *
  * @author aled
  */
 public class FileBasedStoreObjectAccessor implements PersistenceObjectStore.StoreObjectAccessor {
 
-    private static final Logger LOG = LoggerFactory.getLogger(FileBasedStoreObjectAccessor.class);
+    /**
+     * @param file
+     * @param tmpExtension Extension appended to the file name to form the temporary file; ".tmp" is used if blank
+     */
+    public FileBasedStoreObjectAccessor(File file, String tmpExtension) {
+        this.file = file;
+        this.tmpFile = new File(file.getParentFile(), file.getName()+(Strings.isBlank(tmpExtension) ? ".tmp" : tmpExtension));
+    }
 
     private final File file;
     private final File tmpFile;
-    private final ListeningExecutorService executor;
-    private final AtomicBoolean executing = new AtomicBoolean();
-    private final AtomicReference<String> requireWrite = new AtomicReference<String>();
-    private final AtomicBoolean requireDelete = new AtomicBoolean();
-    private final AtomicBoolean deleted = new AtomicBoolean();
-    private final AtomicLong modCount = new AtomicLong();
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
-
+    
     @Override
-    public String read() {
-        // FIXME do we need to synchronize with writer?
-        return readFile(file);
-    }
-
-    private String readFile(File file) {
+    public String get() {
         try {
+            if (!exists()) return null;
             return Files.asCharSource(file, Charsets.UTF_8).read();
         } catch (IOException e) {
             throw Throwables.propagate(e);
         }
     }
 
-    /**
-     * @param file
-     * @param executor A sequential executor (e.g. SingleThreadedExecutor, or equivalent)
-     */
-    public FileBasedStoreObjectAccessor(File file, ListeningExecutorService executor, String tmpExtension) {
-        this.file = file;
-        this.executor = executor;
-        this.tmpFile = new File(file.getParentFile(), file.getName()+(Strings.isBlank(tmpExtension) ? ".tmp" : tmpExtension));
-    }
-
     @Override
     public boolean exists() {
         return file.exists();
     }
 
     @Override
-    public void writeAsync(String val) {
-        requireWrite.set(val);
-        if (requireDelete.get() || deleted.get()) {
-            LOG.warn("Not writing {}, because already deleted", file);
-        } else if (executing.compareAndSet(false, true)) {
-            if (LOG.isTraceEnabled()) LOG.trace("Submitting write task for {}", file);
-            writeAsyncImpl();
-        } else {
-            if (LOG.isTraceEnabled()) LOG.trace("Execution already in-progress for {}; recorded write-requirement; returning", file);
+    public void put(String val) {
+        try {
+            if (val==null) val = "";
+            Files.write(val, tmpFile, Charsets.UTF_8);
+            FileBasedObjectStore.moveFile(tmpFile, file);
+            
+        } catch (IOException e) {
+            throw Exceptions.propagate(e);
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
         }
     }
 
     @Override
     public void append(String val) {
         try {
-            lock.writeLock().lockInterruptibly();
-        } catch (InterruptedException e) {
-            throw Exceptions.propagate(e);
-        }
-        try {
-            Stopwatch stopwatch = Stopwatch.createStarted();
-
-            // Write to the temp file, then atomically move it to the permanent file location
+            if (val==null) val = "";
             Files.append(val, file, Charsets.UTF_8);
-            modCount.incrementAndGet();
-
-            if (LOG.isTraceEnabled()) LOG.trace("Wrote {}, took {}; modified file {} times",
-                    new Object[] {file, Time.makeTimeStringRounded(stopwatch), modCount});
+            
         } catch (IOException e) {
             throw Exceptions.propagate(e);
-        } finally {
-            lock.writeLock().unlock();
         }
     }
 
     @Override
-    public void deleteAsync() {
-        if (deleted.get() || requireDelete.get()) {
-            if (LOG.isDebugEnabled()) LOG.debug("Duplicate call to delete {}; ignoring", file);
-            return;
-        }
-
-        requireWrite.set(null);
-        requireDelete.set(true);
-        if (executing.compareAndSet(false, true)) {
-            if (LOG.isTraceEnabled()) LOG.trace("Submitting delete task for {}", file);
-            deleteAsyncImpl();
-        } else {
-            if (LOG.isTraceEnabled()) LOG.trace("Execution already in-progress for {}; recorded delete-requirement; returning", file);
-        }
-    }
-
-    /**
-     * This method must only be used for testing. If required in production, then revisit implementation!
-     */
-    @VisibleForTesting
-    public void waitForWriteCompleted(long timeout, TimeUnit unit) throws InterruptedException, TimeoutException {
-        waitForWriteCompleted(Duration.of(timeout, unit));
-    }
-
-    @VisibleForTesting
-    public void waitForWriteCompleted(Duration timeout) throws InterruptedException, TimeoutException {
-        // Every time we finish writing, we increment a counter. We note the current val, and then
-        // wait until we can guarantee that a complete additional write has been done. Not sufficient
-        // to wait for `writeCount > origWriteCount` because we might have read the value when it was 
-        // almost finished a write.
-        CountdownTimer timer = CountdownTimer.newInstanceStarted(timeout);
-
-        long origModCount = modCount.get();
-        while (true) {
-            if (modCount.get() > (origModCount+1)) {
-                return;
-            } else if (requireWrite.get() != null) {
-                // must continue waiting for mods+1
-            } else if (executing.get()) {
-                // must wait for either this invocation to complete, or mods+1 (because might have already updated)
-            } else {
-                return;
-            }
-
-            if (timer.isExpired()) {
-                throw new TimeoutException("Timeout waiting for pending complete of rebind-periodic-delta, after "+Time.makeTimeStringRounded(timeout));
-            }
-            Thread.sleep(10);
-        }
-    }
-
-    protected void deleteAsyncImpl() {
-        ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
-            @Override public Void call() throws IOException {
-                try {
-                    deleteNow();
-                    return null;
-                } catch (Throwable t) {
-                    if (executor.isShutdown()) {
-                        LOG.debug("Error deleting "+file+" (but executor shutdown)", t);
-                        return null; // just return without throwing; no more work to do
-                    } else {
-                        LOG.error("Error deleting "+file, t);
-                        throw Exceptions.propagate(t);
-                    }
-                }
-            }});
-        addPostExecListener(future);
-    }
-
-    protected void writeAsyncImpl() {
-        ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
-            @Override public Void call() throws IOException {
-                try {
-                    writeNow();
-                    return null;
-                } catch (Throwable t) {
-                    if (executor.isShutdown()) {
-                        LOG.debug("Error writing to "+file+" (but executor shutdown)", t);
-                        return null; // just return without throwing; no more work to do
-                    } else {
-                        LOG.error("Error writing to "+file, t);
-                        throw Exceptions.propagate(t);
-                    }
-                }
-            }});
-        addPostExecListener(future);
-    }
-
-    private void addPostExecListener(ListenableFuture<?> future) {
-        future.addListener(
-                new Runnable() {
-                    @Override public void run() {
-                        if (LOG.isTraceEnabled()) LOG.trace("Write complete for {}", file);
-                        try {
-                            executing.set(false);
-                            if (requireDelete.get()) {
-                                if (executing.compareAndSet(false, true)) {
-                                    if (LOG.isTraceEnabled()) LOG.trace("Submitting delete-task for {} (in post-exec) due to recorded delete-requirement", file);
-                                    deleteAsyncImpl();
-                                } else {
-                                    if (LOG.isTraceEnabled()) LOG.trace("Delete-requirement for {} (in post-exec) handled by other thread; returning", file);
-                                }
-
-                            } else if (requireWrite.get() != null) {
-                                if (executing.compareAndSet(false, true)) {
-                                    if (LOG.isTraceEnabled()) LOG.trace("Submitting write task for {} (in post-exec) due to recorded write-requirement", file);
-                                    writeAsyncImpl();
-                                } else {
-                                    if (LOG.isTraceEnabled()) LOG.trace("Write-requirement for {} (in post-exec) handled by other thread; returning", file);
-                                }
-                            } else {
-                                if (LOG.isTraceEnabled()) LOG.trace("No pending exec-requirements for {}", file);
-                            }
-                        } catch (Throwable t) {
-                            if (executor.isShutdown()) {
-                                LOG.debug("Error in post-exec for "+file+" (but executor shutdown)", t);
-                                return; // just return without throwing; no more work to do
-                            } else {
-                                LOG.error("Error in post-exec for "+file, t);
-                                throw Exceptions.propagate(t);
-                            }
-                        }
-                    }
-                },
-                MoreExecutors.sameThreadExecutor());
-    }
-
-    private void writeNow() throws IOException {
-        String val = requireWrite.getAndSet(null);
-        
-        /*
-         * Need to guarantee "happens before", with any thread that has written 
-         * fields of these mementos. In particular, saw failures where SshMachineLocation
-         * had null address field. Our hypothesis is that another thread wrote the memento,
-         * but that no synchronization subsequently happened so we did not see all the values
-         * in that memento from this thread.
-         * 
-         * See PeriodicDeltaChangeListener.persistNow for the corresponding synchronization,
-         * that guarantees its thread made the writes visible.
-         */
-        synchronized (new Object()) {}
-
-        Stopwatch stopwatch = Stopwatch.createStarted();
-
-        // Write to the temp file, then atomically move it to the permanent file location
-        Files.write(val, tmpFile, Charsets.UTF_8);
-        Files.move(tmpFile, file);
-
-        modCount.incrementAndGet();
-
-        if (LOG.isTraceEnabled()) LOG.trace("Wrote {}, took {}; modified file {} times",
-                new Object[] {file, Time.makeTimeStringRounded(stopwatch), modCount});
-    }
-
-    private void deleteNow() throws IOException {
-        if (LOG.isTraceEnabled()) LOG.trace("Deleting {} and {}", file, tmpFile);
-        deleted.set(true);
-        requireDelete.set(false);
-
+    public void delete() {
         file.delete();
         tmpFile.delete();
-
-        modCount.incrementAndGet();
     }
 
     @Override
     public String toString() {
         return Objects.toStringHelper(this).add("file", file).toString();
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
deleted file mode 100644
index b89d611..0000000
--- a/core/src/main/java/brooklyn/entity/rebind/persister/InMemoryObjectStore.java
+++ /dev/null
@@ -1,120 +0,0 @@
-package brooklyn.entity.rebind.persister;
-
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeoutException;
-
-import javax.annotation.Nullable;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import brooklyn.management.ManagementContext;
-import brooklyn.util.collections.MutableList;
-import brooklyn.util.collections.MutableMap;
-import brooklyn.util.time.Duration;
-
-import com.google.common.base.Objects;
-
-public class InMemoryObjectStore implements PersistenceObjectStore {
-
-    private static final Logger log = LoggerFactory.getLogger(InMemoryObjectStore.class);
-
-    Map<String,String> filesByName = MutableMap.<String,String>of();
-    boolean prepared = false;
-    
-    public InMemoryObjectStore() {
-        log.info("Using memory-based objectStore");
-    }
-    
-    @Override
-    public String getSummaryName() {
-        return "in-memory (test) persistence store";
-    }
-    
-    @Override
-    public void createSubPath(String subPath) {
-        if (!prepared) throw new IllegalStateException("prepare method not yet invoked: "+this);
-    }
-
-    @Override
-    public StoreObjectAccessor newAccessor(final String path) {
-        return new StoreObjectAccessor() {
-            @Override
-            public void writeAsync(String val) {
-                synchronized (filesByName) {
-                    filesByName.put(path, val);
-                }
-            }
-            
-            @Override
-            public void waitForWriteCompleted(Duration timeout) throws InterruptedException, TimeoutException {
-            }
-            
-            @Override
-            public String read() {
-                synchronized (filesByName) {
-                    return filesByName.get(path);
-                }
-            }
-            
-            @Override
-            public boolean exists() {
-                synchronized (filesByName) {
-                    return filesByName.containsKey(path);
-                }
-            }
-            
-            @Override
-            public void deleteAsync() {
-                synchronized (filesByName) {
-                    filesByName.remove(path);
-                }
-            }
-            
-            @Override
-            public void append(String s) {
-                synchronized (filesByName) {
-                    writeAsync(read()+s);
-                }
-            }
-        };
-    }
-
-    @Override
-    public List<String> listContentsWithSubPath(final String parentSubPath) {
-        synchronized (filesByName) {
-            List<String> result = MutableList.of();
-            for (String file: filesByName.keySet())
-                if (file.startsWith(parentSubPath))
-                    result.add(file);
-            return result;
-        }
-    }
-
-    @Override
-    public void backupContents(String parentSubPath, String backupSubPath) {
-    }
-
-    @Override
-    public void close() {
-    }
-
-    @Override
-    public String toString() {
-        return Objects.toStringHelper(this).add("size", filesByName.size()).toString();
-    }
-
-    @Override
-    public void prepareForUse(ManagementContext mgmt, @Nullable PersistMode persistMode) {
-        prepared = true;
-    }
-
-    @Override
-    public void deleteCompletely() {
-        synchronized (filesByName) {
-            filesByName.clear();
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/MementoFileWriter.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/MementoFileWriter.java b/core/src/main/java/brooklyn/entity/rebind/persister/MementoFileWriter.java
index 19988f9..83af866 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/MementoFileWriter.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/MementoFileWriter.java
@@ -127,7 +127,7 @@ public class MementoFileWriter<T> {
         }
     }
 
-    public void deleteAsync() {
+    private void deleteAsync() {
         ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
             @Override public Void call() throws IOException {
                 try {
@@ -146,7 +146,7 @@ public class MementoFileWriter<T> {
         addPostExecListener(future);
     }
 
-    public void writeAsync() {
+    private void writeAsync() {
         ListenableFuture<Void> future = executor.submit(new Callable<Void>() {
             @Override public Void call() throws IOException {
                 try {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/PersistenceObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/PersistenceObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/PersistenceObjectStore.java
index 30cacdb..c75c6a1 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/PersistenceObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/PersistenceObjectStore.java
@@ -2,10 +2,13 @@ package brooklyn.entity.rebind.persister;
 
 import java.util.List;
 import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
 
 import brooklyn.management.ManagementContext;
 import brooklyn.util.time.Duration;
 
+import com.google.common.annotations.Beta;
+
 /**
  * Interface for working with persistence targets, including file system and jclouds object stores.
  * @author Andrea Turli
@@ -14,20 +17,39 @@ public interface PersistenceObjectStore {
 
     /** accessor to an object/item in a {@link PersistenceObjectStore} */
     public interface StoreObjectAccessor {
+        /** gets the object, or null if not found */
+        String get();
         boolean exists();
-        void writeAsync(String val);
-        void append(String s);
-        void deleteAsync();
-        String read();
-        public void waitForWriteCompleted(Duration timeout) throws InterruptedException, TimeoutException;
+        void put(String contentsToReplaceOrCreate);
+        void append(String contentsToAppendOrCreate);
+        void delete();
+    }
+    public interface StoreObjectAccessorWithLock extends StoreObjectAccessor {
+        /** waits for all currently scheduled write lock operations (puts, appends, and deletes) to complete;
+         * but does not wait on or prevent subsequent modifications.
+         * this is suitable for a model where the caller is managing synchronization.
+         * <p> 
+         * for more complex uses, readers should <code>getLockObject().readLock().lockInterruptibly()</code> 
+         * and ensure they subsequently <code>unlock()</code> it of course. see {@link #getLockObject()}. */
+        void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException;
+        
+        /** returns the underlying lock in case callers need more complex synchronization control */ 
+        ReadWriteLock getLockObject();
     }
 
     /** human-readable name of this object store */
     public String getSummaryName();
     
     /**
-     * For reading/writing data to the item at the given path
+     * For reading/writing data to the item at the given path.
+     * Note that the accessor is not generally thread safe, usually does not support blocking,
+     * and multiple instances may conflict with each other.
+     * <p>
+     * Clients should wrap in a dedicated {@link StoreObjectAccessorLocking} and share
+     * if multiple threads may be accessing the store.
      */
+    // TODO this is not a nice API, better would be to do caching here probably,
+    // but we've already been doing it this way above for now (Jun 2014)
     StoreObjectAccessor newAccessor(String path);
 
     /** create the directory at the given subPath relative to the base of this store */
@@ -42,15 +64,9 @@ public interface PersistenceObjectStore {
      */
     List<String> listContentsWithSubPath(String subPath);
 
-    /**
-     * Makes a copy of all objects under parentSubPath.
-     * For example, if a file-based ObjectStore is configured to write to file://path/to/root/
-     * then parentSubPath="abc", backupSubPath="abc.bak" would be the same as:
-     * `cp -R /path/to/root/abc/* /path/to/root/abc.bak/`
-     * <p>
-     * May be no-op if using a substrate which can provide this by alternate means (e.g. an object store).
-     */
-    void backupContents(String sourceSubPath, String targetSubPathForBackups);
+    /** Entirely delete the contents of this persistence location.
+     * Use with care, primarily in tests. This will recursively wipe the indicated location. */ 
+    public void deleteCompletely();
     
     /**
      * Closes all resources used by this ObjectStore. No subsequent calls should be made to the ObjectStore;
@@ -68,10 +84,9 @@ public interface PersistenceObjectStore {
      * <p>
      * currently subsequent changes are not permitted.
      */
-   public void prepareForUse(ManagementContext managementContext, PersistMode persistMode);
-
-   /** Entirely delete the contents of this persistence location.
-    * Use with care, primarily in tests. This will recursively wipe the indicated location. */ 
-   public void deleteCompletely();
+    // although there is some commonality between the different stores it is mostly different,
+    // so this method currently sits here; it may move if advanced backup strategies have commonalities
+    @Beta
+    public void prepareForUse(ManagementContext managementContext, PersistMode persistMode);
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/de3fb4be/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
new file mode 100644
index 0000000..1e94703
--- /dev/null
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/StoreObjectAccessorLocking.java
@@ -0,0 +1,153 @@
+package brooklyn.entity.rebind.persister;
+
+import java.util.Comparator;
+import java.util.Set;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.time.Duration;
+
+public class StoreObjectAccessorLocking implements PersistenceObjectStore.StoreObjectAccessorWithLock {
+
+    protected static class ThreadComparator implements Comparator<Thread> {
+        @Override
+        public int compare(Thread o1, Thread o2) {
+            if (o1.getId()<o2.getId()) return -1;
+            if (o1.getId()>o2.getId()) return 1;
+            return 0;
+        }
+    }
+    
+    ReadWriteLock lock = new ReentrantReadWriteLock(true);
+    Set<Thread> queuedReaders = new ConcurrentSkipListSet<Thread>(new ThreadComparator());
+    Set<Thread> queuedWriters = new ConcurrentSkipListSet<Thread>(new ThreadComparator());
+    
+    final PersistenceObjectStore.StoreObjectAccessor delegate;
+    
+    public StoreObjectAccessorLocking(PersistenceObjectStore.StoreObjectAccessor delegate) {
+        this.delegate = delegate;
+    }
+    
+    @Override
+    public String get() {
+        try {
+            queuedReaders.add(Thread.currentThread());
+            lock.readLock().lockInterruptibly();
+            try {
+                return delegate.get();
+                
+            } finally {
+                lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            queuedReaders.remove(Thread.currentThread());
+        }
+    }
+
+    @Override
+    public boolean exists() {
+        try {
+            queuedReaders.add(Thread.currentThread());
+            lock.readLock().lockInterruptibly();
+            try {
+                return delegate.exists();
+                
+            } finally {
+                lock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            queuedReaders.remove(Thread.currentThread());
+        }
+    }
+
+    protected boolean hasScheduledPutOrDeleteWithNoRead() {
+        // skip write if there is another write queued and no reader waiting
+        return (!queuedWriters.isEmpty() && queuedReaders.isEmpty());
+    }
+    
+    @Override
+    public void put(String val) {
+        try {
+            queuedWriters.add(Thread.currentThread());
+            lock.writeLock().lockInterruptibly();
+            try {
+                queuedWriters.remove(Thread.currentThread());
+                if (hasScheduledPutOrDeleteWithNoRead()) 
+                    // don't bother writing if someone will write after us and no one is reading
+                    return;
+                delegate.put(val);
+                
+            } finally {
+                lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            queuedWriters.remove(Thread.currentThread());
+        }
+    }
+    
+    @Override
+    public void append(String val) {
+        try {
+            lock.writeLock().lockInterruptibly();
+            try {
+                if (hasScheduledPutOrDeleteWithNoRead())
+                    // don't bother appending if someone will write after us and no one is reading
+                    return;
+                
+                delegate.append(val);
+                
+            } finally {
+                lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+    
+    @Override
+    public void delete() {
+        try {
+            queuedWriters.add(Thread.currentThread());
+            lock.writeLock().lockInterruptibly();
+            try {
+                queuedWriters.remove(Thread.currentThread());
+                if (hasScheduledPutOrDeleteWithNoRead()) 
+                    // don't bother deleting if someone will write after us and no one is reading
+                    return;
+                delegate.delete();
+                
+            } finally {
+                lock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        } finally {
+            queuedWriters.remove(Thread.currentThread());
+        }
+    }
+    
+    @Override
+    public void waitForCurrentWrites(Duration timeout) throws InterruptedException, TimeoutException {
+        try {
+            lock.readLock().lockInterruptibly();
+            lock.readLock().unlock();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
+        }
+    }
+
+    @Override
+    public ReadWriteLock getLockObject() {
+        return lock;
+    }
+    
+}


Mime
View raw message