brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [2/8] git commit: Adds persister.loadMementoRawData
Date Thu, 25 Sep 2014 17:15:03 GMT
Adds persister.loadMementoRawData

- Refactors BrooklynMementoPersisterToObjectStore to remove duplication
- Adds BrooklynMementoRawData, which has the contents of the raw
  blobs/files for persistence (rather than deserialised mementos)
- Adds BrooklynMementoPersisterToObjectStore.loadMementoRawData
- Adds BrooklynMementoPersisterToObjectStore.checkpoint(BrooklynMementoRawData)


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/bba5f3e5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/bba5f3e5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/bba5f3e5

Branch: refs/heads/master
Commit: bba5f3e545f3b91fdee78b2b99d122344194187f
Parents: e8d8a72
Author: Aled Sage <aled.sage@gmail.com>
Authored: Wed Sep 10 10:28:30 2014 +0100
Committer: Aled Sage <aled.sage@gmail.com>
Committed: Thu Sep 18 10:13:04 2014 +0100

----------------------------------------------------------------------
 .../rebind/PersistenceExceptionHandler.java     |   2 +
 .../mementos/BrooklynMementoRawData.java        | 110 ++++++
 .../rebind/PersistenceExceptionHandlerImpl.java |   6 +
 .../BrooklynMementoPersisterToObjectStore.java  | 365 +++++++++++--------
 .../BrooklynMementoPersisterTestFixture.java    |  40 +-
 ...nMementoPersisterJcloudsObjectStoreTest.java |  12 +-
 6 files changed, 363 insertions(+), 172 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bba5f3e5/api/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandler.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandler.java b/api/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandler.java
index 120512d..ba0d12c 100644
--- a/api/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandler.java
+++ b/api/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandler.java
@@ -37,5 +37,7 @@ public interface PersistenceExceptionHandler {
     
     void onPersistMementoFailed(Memento memento, Exception e);
     
+    void onPersistRawMementoFailed(BrooklynObjectType type, String id, Exception e);
+
     void onDeleteMementoFailed(String id, Exception e);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bba5f3e5/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java b/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
new file mode 100644
index 0000000..b010ab8
--- /dev/null
+++ b/api/src/main/java/brooklyn/mementos/BrooklynMementoRawData.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.mementos;
+
+import java.util.Collections;
+import java.util.Map;
+
+import com.google.common.annotations.Beta;
+import com.google.common.collect.Maps;
+
+/**
+ * Represents the raw persisted data.
+ */
+@Beta
+public class BrooklynMementoRawData {
+
+    // TODO Should this be on an interface?
+    // The file-based (or object-store based) structure for storing data may well change; is this representation sufficient?
+
+    public static Builder builder() {
+        return new Builder();
+    }
+    
+    public static class Builder {
+        protected String brooklynVersion;
+        protected final Map<String, String> entities = Maps.newConcurrentMap();
+        protected final Map<String, String> locations = Maps.newConcurrentMap();
+        protected final Map<String, String> policies = Maps.newConcurrentMap();
+        protected final Map<String, String> enrichers = Maps.newConcurrentMap();
+        
+        public Builder brooklynVersion(String val) {
+            brooklynVersion = val; return this;
+        }
+        public Builder entity(String id, String val) {
+            entities.put(id, val); return this;
+        }
+        public Builder entities(Map<String, String> vals) {
+            entities.putAll(vals); return this;
+        }
+        public Builder location(String id, String val) {
+            locations.put(id, val); return this;
+        }
+        public Builder locations(Map<String, String> vals) {
+            locations.putAll(vals); return this;
+        }
+        public Builder policy(String id, String val) {
+            policies.put(id, val); return this;
+        }
+        public Builder policies(Map<String, String> vals) {
+            policies.putAll(vals); return this;
+        }
+        public Builder enricher(String id, String val) {
+            enrichers.put(id, val); return this;
+        }
+        public Builder enrichers(Map<String, String> vals) {
+            enrichers.putAll(vals); return this;
+        }
+        public BrooklynMementoRawData build() {
+            return new BrooklynMementoRawData(this);
+        }
+    }
+
+    private final Map<String, String> entities;
+    private final Map<String, String> locations;
+    private final Map<String, String> policies;
+    private final Map<String, String> enrichers;
+    
+    private BrooklynMementoRawData(Builder builder) {
+        entities = builder.entities;
+        locations = builder.locations;
+        policies = builder.policies;
+        enrichers = builder.enrichers;
+    }
+
+    public Map<String, String> getEntities() {
+        return Collections.unmodifiableMap(entities);
+    }
+
+    public Map<String, String> getLocations() {
+        return Collections.unmodifiableMap(locations);
+    }
+
+    public Map<String, String> getPolicies() {
+        return Collections.unmodifiableMap(policies);
+    }
+
+    public Map<String, String> getEnrichers() {
+        return Collections.unmodifiableMap(enrichers);
+    }
+    
+    public boolean isEmpty() {
+        return entities.isEmpty() && locations.isEmpty() && policies.isEmpty() && enrichers.isEmpty();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bba5f3e5/core/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandlerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandlerImpl.java b/core/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandlerImpl.java
index 799da62..aa16710 100644
--- a/core/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandlerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/PersistenceExceptionHandlerImpl.java
@@ -71,6 +71,12 @@ public class PersistenceExceptionHandlerImpl implements PersistenceExceptionHand
     }
     
     @Override
+    public void onPersistRawMementoFailed(BrooklynObjectType type, String id, Exception e) {
+        String errmsg = "persist for "+type+" "+"("+id+")";
+        onErrorImpl(errmsg, e, prevFailedPersisters.add(id));
+    }
+    
+    @Override
     public void onDeleteMementoFailed(String id, Exception e) {
         String errmsg = "delete for memento "+id;
         onErrorImpl(errmsg, e, prevFailedPersisters.add(id));

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bba5f3e5/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
index 0a62332..e98bfd6 100644
--- a/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
+++ b/core/src/main/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterToObjectStore.java
@@ -50,6 +50,7 @@ import brooklyn.entity.rebind.persister.PersistenceObjectStore.StoreObjectAccess
 import brooklyn.mementos.BrooklynMemento;
 import brooklyn.mementos.BrooklynMementoManifest;
 import brooklyn.mementos.BrooklynMementoPersister;
+import brooklyn.mementos.BrooklynMementoRawData;
 import brooklyn.mementos.EnricherMemento;
 import brooklyn.mementos.EntityMemento;
 import brooklyn.mementos.LocationMemento;
@@ -61,6 +62,7 @@ import brooklyn.util.time.Duration;
 import brooklyn.util.time.Time;
 import brooklyn.util.xstream.XmlUtil;
 
+import com.google.common.annotations.Beta;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Stopwatch;
 import com.google.common.collect.Lists;
@@ -163,8 +165,143 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
 
+    @Beta
+    public BrooklynMementoRawData loadMementoRawData(final RebindExceptionHandler exceptionHandler) throws IOException {
+        final BrooklynMementoRawData.Builder builder = BrooklynMementoRawData.builder();
+
+        Visitor visitor = new Visitor() {
+            @Override
+            public void visit(String contents, BrooklynObjectType type, String subPath) throws Exception {
+                switch (type) {
+                    case ENTITY:
+                        builder.entity((String) XmlUtil.xpath(contents, "/entity/id"), contents);
+                        break;
+                    case LOCATION:
+                        builder.location((String) XmlUtil.xpath(contents, "/location/id"), contents);
+                        break;
+                    case POLICY:
+                        builder.policy((String) XmlUtil.xpath(contents, "/policy/id"), contents);
+                        break;
+                    case ENRICHER:
+                        builder.enricher((String) XmlUtil.xpath(contents, "/enricher/id"), contents);
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected brooklyn type: "+type);
+                }
+            }
+        };
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        visitMemento(visitor, exceptionHandler);
+        
+        BrooklynMementoRawData result = builder.build();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento manifest; took {}; {} entities, {} locations, {} policies, {} enrichers, from {}", new Object[]{
+                     Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), result.getEntities().size(), 
+                     result.getLocations().size(), result.getPolicies().size(), result.getEnrichers().size(),
+                     objectStore.getSummaryName() });
+        }
+
+        return result;
+    }
+
     @Override
     public BrooklynMementoManifest loadMementoManifest(final RebindExceptionHandler exceptionHandler)
throws IOException {
+        final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
+
+        Visitor visitor = new Visitor() {
+            @Override
+            public void visit(String contents, BrooklynObjectType type, String subPath) throws
Exception {
+                switch (type) {
+                    case ENTITY:
+                        String id = (String) XmlUtil.xpath(contents, "/entity/id");
+                        String objType = (String) XmlUtil.xpath(contents, "/entity/type");
+                        builder.entity(id, objType);
+                        break;
+                    case LOCATION:
+                        id = (String) XmlUtil.xpath(contents, "/location/id");
+                        objType = (String) XmlUtil.xpath(contents, "/location/type");
+                        builder.location(id, objType);
+                        break;
+                    case POLICY:
+                        id = (String) XmlUtil.xpath(contents, "/policy/id");
+                        objType = (String) XmlUtil.xpath(contents, "/policy/type");
+                        builder.policy(id, objType);
+                        break;
+                    case ENRICHER:
+                        id = (String) XmlUtil.xpath(contents, "/enricher/id");
+                        objType = (String) XmlUtil.xpath(contents, "/enricher/type");
+                        builder.enricher(id, objType);
+                        break;
+                    default:
+                        throw new IllegalStateException("Unexpected brooklyn type: "+type);
+                }
+            }
+        };
+
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        visitMemento(visitor, exceptionHandler);
+        
+        BrooklynMementoManifest result = builder.build();
+
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento manifest; took {}; {} entities, {} locations, {} policies,
{} enrichers, from {}", new Object[]{
+                     Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)),
result.getEntityIdToType().size(), 
+                     result.getLocationIdToType().size(), result.getPolicyIdToType().size(),
result.getEnricherIdToType().size(),
+                     objectStore.getSummaryName() });
+        }
+
+        return result;
+    }
+
+    @Override
+    public BrooklynMemento loadMemento(LookupContext lookupContext, final RebindExceptionHandler
exceptionHandler) throws IOException {
+        Stopwatch stopwatch = Stopwatch.createStarted();
+
+        final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
+        
+        Visitor visitor = new Visitor() {
+            @Override
+            public void visit(String contents, BrooklynObjectType type, String subPath) throws
Exception {
+                try {
+                    Memento memento = (Memento) serializer.fromString(contents);
+                    if (memento == null) {
+                        LOG.warn("No "+type.toString().toLowerCase()+"-memento deserialized
from " + subPath + "; ignoring and continuing");
+                    } else {
+                        builder.memento(memento);
+                    }
+                } catch (Exception e) {
+                    exceptionHandler.onLoadMementoFailed(type, "Memento "+subPath, e);
+                }
+            }
+        };
+
+        serializer.setLookupContext(lookupContext);
+        try {
+            visitMemento(visitor, exceptionHandler);
+        } finally {
+            serializer.unsetLookupContext();
+        }
+
+        BrooklynMemento result = builder.build();
+        
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Loaded memento; took {}; {} entities, {} locations, {} policies, {}
enrichers, from {}", new Object[]{
+                      Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)),
result.getEntityIds().size(), 
+                      result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(),
+                      objectStore.getSummaryName() });
+        }
+        
+        return result;
+    }
+    
+    protected interface Visitor {
+        public void visit(String contents, BrooklynObjectType type, String subPath) throws
Exception;
+    }
+    protected void visitMemento(final Visitor visitor, final RebindExceptionHandler exceptionHandler)
throws IOException {
         if (!running) {
             throw new IllegalStateException("Persister not running; cannot load memento manifest
from " + objectStore.getSummaryName());
         }
@@ -184,71 +321,41 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             throw new IllegalStateException("Failed to list memento files in "+objectStore,
e);
         }
 
-        Stopwatch stopwatch = Stopwatch.createStarted();
-
         LOG.debug("Scanning persisted state: {} entities, {} locations, {} policies, {} enrichers,
from {}", new Object[]{
             entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(),
enricherSubPathList.size(),
             objectStore.getSummaryName() });
 
-        final BrooklynMementoManifestImpl.Builder builder = BrooklynMementoManifestImpl.builder();
-
         List<ListenableFuture<?>> futures = Lists.newArrayList();
         
+        class VisitorWrapper implements Runnable {
+            private final String subPath;
+            private final BrooklynObjectType type;
+            public VisitorWrapper(String subPath, BrooklynObjectType type) {
+                this.subPath = subPath;
+                this.type = type;
+            }
+            public void run() {
+                try {
+                    String contents = read(subPath);
+                    visitor.visit(contents, type, subPath);
+                } catch (Exception e) {
+                    Exceptions.propagateIfFatal(e);
+                    exceptionHandler.onLoadMementoFailed(type, "Memento "+subPath, e);
+                }
+            }
+        }
+        
         for (final String subPath : entitySubPathList) {
-            futures.add(executor.submit(new Runnable() {
-                public void run() {
-                    try {
-                        String contents = read(subPath);
-                        String id = (String) XmlUtil.xpath(contents, "/entity/id");
-                        String type = (String) XmlUtil.xpath(contents, "/entity/type");
-                        builder.entity(id, type);
-                    } catch (Exception e) {
-                        Exceptions.propagateIfFatal(e);
-                        exceptionHandler.onLoadMementoFailed(BrooklynObjectType.ENTITY, "Memento
"+subPath, e);
-                    }
-                }}));
+            futures.add(executor.submit(new VisitorWrapper(subPath, BrooklynObjectType.ENTITY)));
         }
         for (final String subPath : locationSubPathList) {
-            futures.add(executor.submit(new Runnable() {
-                public void run() {
-                    try {
-                        String contents = read(subPath);
-                        String id = (String) XmlUtil.xpath(contents, "/location/id");
-                        String type = (String) XmlUtil.xpath(contents, "/location/type");
-                        builder.location(id, type);
-                    } catch (Exception e) {
-                        Exceptions.propagateIfFatal(e);
-                        exceptionHandler.onLoadMementoFailed(BrooklynObjectType.LOCATION,
"Memento "+subPath, e);
-                    }
-                }}));
+            futures.add(executor.submit(new VisitorWrapper(subPath, BrooklynObjectType.LOCATION)));
         }
         for (final String subPath : policySubPathList) {
-            futures.add(executor.submit(new Runnable() {
-                public void run() {
-                    try {
-                        String contents = read(subPath);
-                        String id = (String) XmlUtil.xpath(contents, "/policy/id");
-                        String type = (String) XmlUtil.xpath(contents, "/policy/type");
-                        builder.policy(id, type);
-                    } catch (Exception e) {
-                        Exceptions.propagateIfFatal(e);
-                        exceptionHandler.onLoadMementoFailed(BrooklynObjectType.POLICY, "Memento
"+subPath, e);
-                    }
-                }}));
+            futures.add(executor.submit(new VisitorWrapper(subPath, BrooklynObjectType.POLICY)));
         }
         for (final String subPath : enricherSubPathList) {
-            futures.add(executor.submit(new Runnable() {
-                public void run() {
-                    try {
-                        String contents = read(subPath);
-                        String id = (String) XmlUtil.xpath(contents, "/enricher/id");
-                        String type = (String) XmlUtil.xpath(contents, "/enricher/type");
-                        builder.enricher(id, type);
-                    } catch (Exception e) {
-                        Exceptions.propagateIfFatal(e);
-                        exceptionHandler.onLoadMementoFailed(BrooklynObjectType.ENRICHER,
"Memento "+subPath, e);
-                    }
-                }}));
+            futures.add(executor.submit(new VisitorWrapper(subPath, BrooklynObjectType.ENRICHER)));
         }
 
         try {
@@ -279,134 +386,56 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
                 throw new CompoundRuntimeException("Problem loading mementos", exceptions);
             }
         }
-
-        BrooklynMementoManifest result = builder.build();
-
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Loaded memento manifest; took {}; {} entities, {} locations, {} policies,
{} enrichers, from {}", new Object[]{
-                     Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)),
result.getEntityIdToType().size(), 
-                     result.getLocationIdToType().size(), result.getPolicyIdToType().size(),
result.getEnricherIdToType().size(),
-                     objectStore.getSummaryName() });
-        }
-
-        if (result.getEntityIdToType().size() != entitySubPathList.size()) {
-            LOG.error("Lost an entity?!");
-        }
-        
-        return result;
     }
 
-    @Override
-    public BrooklynMemento loadMemento(LookupContext lookupContext, final RebindExceptionHandler
exceptionHandler) throws IOException {
+    @Beta
+    public void checkpoint(BrooklynMementoRawData newMemento, PersistenceExceptionHandler exceptionHandler) {
         if (!running) {
-            throw new IllegalStateException("Persister not running; cannot load memento from
" + objectStore.getSummaryName());
-        }
-        Stopwatch stopwatch = Stopwatch.createStarted();
-
-        List<String> entitySubPathList;
-        List<String> locationSubPathList;
-        List<String> policySubPathList;
-        List<String> enricherSubPathList;
-        try {
-            entitySubPathList = objectStore.listContentsWithSubPath("entities");
-            locationSubPathList = objectStore.listContentsWithSubPath("locations");
-            policySubPathList = objectStore.listContentsWithSubPath("policies");
-            enricherSubPathList = objectStore.listContentsWithSubPath("enrichers");
-        } catch (Exception e) {
-            Exceptions.propagateIfFatal(e);
-            exceptionHandler.onLoadMementoFailed(BrooklynObjectType.UNKNOWN, "Failed to list
files", e);
-            throw new IllegalStateException("Failed to list memento files in "+objectStore+":
"+e, e);
+            if (LOG.isDebugEnabled()) LOG.debug("Ignoring checkpointing entire memento, because
not running");
+            return;
         }
         
-        LOG.debug("Loading persisted state: {} entities, {} locations, {} policies, {} enrichers,
from {}", new Object[]{
-            entitySubPathList.size(), locationSubPathList.size(), policySubPathList.size(),
enricherSubPathList.size(),
-            objectStore.getSummaryName() });
-
-        final BrooklynMementoImpl.Builder builder = BrooklynMementoImpl.builder();
-        serializer.setLookupContext(lookupContext);
-        
-        List<ListenableFuture<?>> futures = Lists.newArrayList();
-
-        class MementoLoader implements Runnable {
-            private final String subPath;
-            private final BrooklynObjectType type;
-            public MementoLoader(String subPath, BrooklynObjectType type) {
-                this.subPath = subPath;
-                this.type = type;
-            }
-            public void run() {
-                try {
-                    Memento memento = (Memento) serializer.fromString(read(subPath));
-                    if (memento == null) {
-                        LOG.warn("No "+type.toString().toLowerCase()+"-memento deserialized
from " + subPath + "; ignoring and continuing");
-                    } else {
-                        builder.memento(memento);
-                    }
-                } catch (Exception e) {
-                    exceptionHandler.onLoadMementoFailed(type, "Memento "+subPath, e);
-                }
-            }
+        try {
+            lock.writeLock().lockInterruptibly();
+        } catch (InterruptedException e) {
+            throw Exceptions.propagate(e);
         }
-
         try {
-            for (final String subPath : entitySubPathList) {
-                futures.add(executor.submit(new MementoLoader(subPath, BrooklynObjectType.ENTITY)));
+            objectStore.prepareForMasterUse();
+            
+            Stopwatch stopwatch = Stopwatch.createStarted();
+            List<ListenableFuture<?>> futures = Lists.newArrayList();
+            
+            for (Map.Entry<String, String> entry : newMemento.getEntities().entrySet())
{
+                futures.add(asyncPersist("entities", BrooklynObjectType.ENTITY, entry.getKey(),
entry.getValue(), exceptionHandler));
             }
-            for (final String subPath : locationSubPathList) {
-                futures.add(executor.submit(new MementoLoader(subPath, BrooklynObjectType.LOCATION)));
+            for (Map.Entry<String, String> entry : newMemento.getLocations().entrySet())
{
+                futures.add(asyncPersist("locations", BrooklynObjectType.LOCATION, entry.getKey(),
entry.getValue(), exceptionHandler));
             }
-            for (final String subPath : policySubPathList) {
-                futures.add(executor.submit(new MementoLoader(subPath, BrooklynObjectType.POLICY)));
+            for (Map.Entry<String, String> entry : newMemento.getPolicies().entrySet())
{
+                futures.add(asyncPersist("policies", BrooklynObjectType.POLICY, entry.getKey(),
entry.getValue(), exceptionHandler));
             }
-            for (final String subPath : enricherSubPathList) {
-                futures.add(executor.submit(new MementoLoader(subPath, BrooklynObjectType.ENRICHER)));
+            for (Map.Entry<String, String> entry : newMemento.getEnrichers().entrySet())
{
+                futures.add(asyncPersist("enrichers", BrooklynObjectType.ENRICHER, entry.getKey(),
entry.getValue(), exceptionHandler));
             }
             
             try {
-                // Wait for all, failing fast if any exceptions.
+                // Wait for all the tasks to complete or fail, rather than aborting on the first failure.
+                // But then propagate failure if any fail. (hence the two calls).
+                Futures.successfulAsList(futures).get();
                 Futures.allAsList(futures).get();
             } catch (Exception e) {
-                Exceptions.propagateIfFatal(e);
-                
-                List<Exception> exceptions = Lists.newArrayList();
-                
-                for (ListenableFuture<?> future : futures) {
-                    if (future.isDone()) {
-                        try {
-                            future.get();
-                        } catch (InterruptedException e2) {
-                            throw Exceptions.propagate(e2);
-                        } catch (ExecutionException e2) {
-                            LOG.warn("Problem loading memento", e2);
-                            exceptions.add(e2);
-                        }
-                        future.cancel(true);
-                    }
-                }
-                if (exceptions.isEmpty()) {
-                    throw Exceptions.propagate(e);
-                } else {
-                    // Normally there should be at lesat one failure; otherwise all.get()
would not have failed.
-                    throw new CompoundRuntimeException("Problem loading mementos", exceptions);
-                }
+                throw Exceptions.propagate(e);
             }
-
+            
+            if (LOG.isDebugEnabled()) LOG.debug("Checkpointed entire memento in {}", Time.makeTimeStringRounded(stopwatch));
         } finally {
-            serializer.unsetLookupContext();
+            lock.writeLock().unlock();
         }
 
-        BrooklynMemento result = builder.build();
-        
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("Loaded memento; took {}; {} entities, {} locations, {} policies, {}
enrichers, from {}", new Object[]{
-                      Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)),
result.getEntityIds().size(), 
-                      result.getLocationIds().size(), result.getPolicyIds().size(), result.getEnricherIds().size(),
-                      objectStore.getSummaryName() });
-        }
-        
-        return result;
     }
-    
+
+
     @Override
     public void checkpoint(BrooklynMemento newMemento, PersistenceExceptionHandler exceptionHandler)
{
         if (!running) {
@@ -533,6 +562,14 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
         }
     }
     
+    private void persist(String subPath, BrooklynObjectType type, String id, String content, PersistenceExceptionHandler exceptionHandler) {
+        try {
+            getWriter(getPath(subPath, id)).put(content);
+        } catch (Exception e) {
+            exceptionHandler.onPersistRawMementoFailed(type, id, e);
+        }
+    }
+    
     private void delete(String subPath, String id, PersistenceExceptionHandler exceptionHandler)
{
         try {
             StoreObjectAccessorWithLock w = getWriter(getPath(subPath, id));
@@ -552,6 +589,13 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
             }});
     }
 
+    private ListenableFuture<?> asyncPersist(final String subPath, final BrooklynObjectType type, final String id, final String content, final PersistenceExceptionHandler exceptionHandler) {
+        return executor.submit(new Runnable() {
+            public void run() {
+                persist(subPath, type, id, content, exceptionHandler);
+            }});
+    }
+
     private ListenableFuture<?> asyncDelete(final String subPath, final String id,
final PersistenceExceptionHandler exceptionHandler) {
         return executor.submit(new Runnable() {
             public void run() {
@@ -567,5 +611,4 @@ public class BrooklynMementoPersisterToObjectStore implements BrooklynMementoPer
     public String getBackingStoreDescription() {
         return getObjectStore().getSummaryName();
     }
-    
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bba5f3e5/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
index c71eaa8..0ae2a9e 100644
--- a/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
+++ b/core/src/test/java/brooklyn/entity/rebind/persister/BrooklynMementoPersisterTestFixture.java
@@ -23,9 +23,7 @@ import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertNotNull;
 import static org.testng.Assert.assertTrue;
 
-import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-
+import org.testng.SkipException;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
@@ -35,6 +33,8 @@ import brooklyn.entity.Entity;
 import brooklyn.entity.basic.ApplicationBuilder;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.entity.rebind.PersistenceExceptionHandler;
+import brooklyn.entity.rebind.PersistenceExceptionHandlerImpl;
 import brooklyn.entity.rebind.RebindContextImpl;
 import brooklyn.entity.rebind.RebindContextLookupContext;
 import brooklyn.entity.rebind.RebindManager.RebindFailureMode;
@@ -46,6 +46,7 @@ import brooklyn.location.basic.SshMachineLocation;
 import brooklyn.management.ManagementContext;
 import brooklyn.mementos.BrooklynMemento;
 import brooklyn.mementos.BrooklynMementoPersister;
+import brooklyn.mementos.BrooklynMementoRawData;
 import brooklyn.policy.Enricher;
 import brooklyn.policy.Policy;
 import brooklyn.test.entity.TestApplication;
@@ -99,7 +100,7 @@ public abstract class BrooklynMementoPersisterTestFixture {
         persister = null;
     }
 
-    protected BrooklynMemento loadMemento() throws IOException, InterruptedException, TimeoutException {
+    protected BrooklynMemento loadMemento() throws Exception {
         RebindTestUtils.waitForPersisted(localManagementContext);
         
         RecordingRebindExceptionHandler failFast = new RecordingRebindExceptionHandler(RebindFailureMode.FAIL_FAST, RebindFailureMode.FAIL_FAST);
@@ -114,8 +115,16 @@ public abstract class BrooklynMementoPersisterTestFixture {
         return reloadedMemento;
     }
     
+    protected BrooklynMementoRawData loadRawMemento(BrooklynMementoPersisterToObjectStore persister) throws Exception {
+        RebindTestUtils.waitForPersisted(localManagementContext);
+        
+        RecordingRebindExceptionHandler failFast = new RecordingRebindExceptionHandler(RebindFailureMode.FAIL_FAST, RebindFailureMode.FAIL_FAST);
+        BrooklynMementoRawData rawMemento = persister.loadMementoRawData(failFast);
+        return rawMemento;
+    }
+    
     @Test
-    public void testCheckPointAndLoadMemento() throws IOException, TimeoutException, InterruptedException {
+    public void testCheckPointAndLoadMemento() throws Exception {
         BrooklynMemento reloadedMemento = loadMemento();
         
         assertNotNull(reloadedMemento);
@@ -126,7 +135,7 @@ public abstract class BrooklynMementoPersisterTestFixture {
     }
 
     @Test
-    public void testDeleteAndLoadMemento() throws TimeoutException, InterruptedException, IOException {
+    public void testDeleteAndLoadMemento() throws Exception {
         Entities.destroy(entity);
 
         BrooklynMemento reloadedMemento = loadMemento();
@@ -135,4 +144,23 @@ public abstract class BrooklynMementoPersisterTestFixture {
         assertFalse(Iterables.contains(reloadedMemento.getEntityIds(), entity.getId()));
         assertEquals(Iterables.getOnlyElement(reloadedMemento.getLocationIds()), location.getId());
     }
+    
+    @Test
+    public void testLoadAndCheckpointRawMemento() throws Exception {
+        if (persister instanceof BrooklynMementoPersisterToObjectStore) {
+            // Test loading
+            BrooklynMementoRawData rawMemento = loadRawMemento((BrooklynMementoPersisterToObjectStore)persister);
+            assertNotNull(rawMemento);
+            assertTrue(Iterables.contains(rawMemento.getEntities().keySet(), entity.getId()));
+            assertEquals(Iterables.getOnlyElement(rawMemento.getLocations().keySet()), location.getId());
+            assertEquals(Iterables.getOnlyElement(rawMemento.getPolicies().keySet()), policy.getId());
+            assertTrue(rawMemento.getEnrichers().keySet().contains(enricher.getId()));
+            
+            // And test persisting
+            PersistenceExceptionHandler exceptionHandler = PersistenceExceptionHandlerImpl.builder().build();
+            ((BrooklynMementoPersisterToObjectStore) persister).checkpoint(rawMemento, exceptionHandler);
+        } else {
+            throw new SkipException("Persister "+persister+" not a "+BrooklynMementoPersisterToObjectStore.class.getSimpleName());
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/bba5f3e5/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BrooklynMementoPersisterJcloudsObjectStoreTest.java
----------------------------------------------------------------------
diff --git a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BrooklynMementoPersisterJcloudsObjectStoreTest.java b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BrooklynMementoPersisterJcloudsObjectStoreTest.java
index 2a5495b..c720b41 100644
--- a/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BrooklynMementoPersisterJcloudsObjectStoreTest.java
+++ b/locations/jclouds/src/test/java/brooklyn/entity/rebind/persister/jclouds/BrooklynMementoPersisterJcloudsObjectStoreTest.java
@@ -19,9 +19,6 @@
 package brooklyn.entity.rebind.persister.jclouds;
 
 
-import java.io.IOException;
-import java.util.concurrent.TimeoutException;
-
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
@@ -52,14 +49,19 @@ public class BrooklynMementoPersisterJcloudsObjectStoreTest extends BrooklynMeme
     
     @Test(groups={"Live", "Live-sanity"})
     @Override
-    public void testCheckPointAndLoadMemento() throws IOException, TimeoutException, InterruptedException {
+    public void testCheckPointAndLoadMemento() throws Exception {
         super.testCheckPointAndLoadMemento();
     }
     
     @Test(groups={"Live", "Live-sanity"})
     @Override
-    public void testDeleteAndLoadMemento() throws TimeoutException, InterruptedException, IOException {
+    public void testDeleteAndLoadMemento() throws Exception {
         super.testDeleteAndLoadMemento();
     }
     
+    @Test(groups={"Live", "Live-sanity"})
+    @Override
+    public void testLoadAndCheckpointRawMemento() throws Exception {
+        super.testLoadAndCheckpointRawMemento();
+    }
 }


Mime
View raw message