brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [03/12] git commit: Persist and rebind feeds
Date Thu, 25 Sep 2014 22:49:27 GMT
Persist and rebind feeds

- Adds Feed interface
- Adds abstractEntity.addFeed; if added like that then feed is persisted
- Adds EntityInternal.getFeedSupport(), which has getFeeds(), addFeed(),
  removeFeed() and getAllFeeds()
- AbstractFeed.poller: make private, with getter;
  and make transient


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/d7ab5b5e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/d7ab5b5e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/d7ab5b5e

Branch: refs/heads/master
Commit: d7ab5b5e26296bfcc1d235c4c3c72d13217e6503
Parents: c1ab0be
Author: Aled Sage <aled.sage@gmail.com>
Authored: Tue Aug 5 10:41:58 2014 +0100
Committer: Aled Sage <aled.sage@gmail.com>
Committed: Thu Sep 4 08:46:49 2014 +0100

----------------------------------------------------------------------
 api/src/main/java/brooklyn/entity/Feed.java     |  59 ++++++++++
 .../brooklyn/entity/rebind/RebindSupport.java   |   2 +
 .../java/brooklyn/mementos/EntityMemento.java   |   6 +
 .../brooklyn/entity/basic/AbstractEntity.java   |  49 ++++++++
 .../brooklyn/entity/basic/EntityInternal.java   |  25 ++++
 .../AbstractBrooklynObjectRebindSupport.java    |   6 +
 .../entity/rebind/BasicEntityRebindSupport.java |  18 +++
 .../rebind/BasicLocationRebindSupport.java      |   5 +
 .../entity/rebind/RebindManagerImpl.java        |   1 +
 .../entity/rebind/dto/BasicEntityMemento.java   |  11 ++
 .../entity/rebind/dto/MementosGenerators.java   |  14 ++-
 .../java/brooklyn/event/feed/AbstractFeed.java  |  45 ++++----
 .../main/java/brooklyn/event/feed/Poller.java   |   4 +-
 .../event/feed/function/FunctionFeed.java       |   6 -
 .../java/brooklyn/event/feed/http/HttpFeed.java |   4 +-
 .../brooklyn/event/feed/shell/ShellFeed.java    |   4 +-
 .../java/brooklyn/event/feed/ssh/SshFeed.java   |   4 +-
 .../windows/WindowsPerformanceCounterFeed.java  |   7 +-
 .../brooklyn/entity/rebind/RebindFeedTest.java  | 113 +++++++++++++++++++
 .../entity/monitoring/zabbix/ZabbixFeed.java    |   2 +-
 .../brooklyn/entity/chef/ChefAttributeFeed.java |   8 +-
 .../java/brooklyn/event/feed/jmx/JmxFeed.java   |   3 +-
 22 files changed, 353 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/api/src/main/java/brooklyn/entity/Feed.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/Feed.java b/api/src/main/java/brooklyn/entity/Feed.java
new file mode 100644
index 0000000..cf0430d
--- /dev/null
+++ b/api/src/main/java/brooklyn/entity/Feed.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity;
+
+import com.google.common.annotations.Beta;
+
+/** 
+ * A sensor feed.
+ * These generally poll or subscribe to get sensor values for an entity.
+ * They make it easy to poll over http, jmx, etc.
+ * 
+ * Assumes:
+ *   <ul>
+ *     <li>There will not be concurrent calls to start and stop.
+ *     <li>There will only be one call to start and that will be done immediately after
construction,
+ *         in the same thread.
+ *     <li>Once stopped, the feed will not be re-started.
+ *   </ul>
+ */
+@Beta
+public interface Feed {
+
+    /** 
+     * True if everything has been _started_ (or it is starting) but not stopped,
+     * even if it is suspended; see also {@link #isActive()}
+     */
+    boolean isActivated();
+    
+    /** 
+     * @eturn true iff the feed is running
+     */
+    boolean isActive();
+    
+    void start();
+
+    /** suspends this feed (stops the poller, or indicates that the feed should start in
a state where the poller is stopped) */
+    void suspend();
+    
+    /** resumes this feed if it has been suspended and not stopped */
+    void resume();
+    
+    void stop();
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/api/src/main/java/brooklyn/entity/rebind/RebindSupport.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/entity/rebind/RebindSupport.java b/api/src/main/java/brooklyn/entity/rebind/RebindSupport.java
index e6bbbb4..435fbc6 100644
--- a/api/src/main/java/brooklyn/entity/rebind/RebindSupport.java
+++ b/api/src/main/java/brooklyn/entity/rebind/RebindSupport.java
@@ -52,4 +52,6 @@ public interface RebindSupport<T extends Memento> {
     void addPolicies(RebindContext rebindContext, T Memento);
     
     void addEnrichers(RebindContext rebindContext, T Memento);
+    
+    void addFeeds(RebindContext rebindContext, T Memento);
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/api/src/main/java/brooklyn/mementos/EntityMemento.java
----------------------------------------------------------------------
diff --git a/api/src/main/java/brooklyn/mementos/EntityMemento.java b/api/src/main/java/brooklyn/mementos/EntityMemento.java
index c0547d4..fbe96ba 100644
--- a/api/src/main/java/brooklyn/mementos/EntityMemento.java
+++ b/api/src/main/java/brooklyn/mementos/EntityMemento.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
+import brooklyn.entity.Feed;
 import brooklyn.entity.rebind.RebindSupport;
 import brooklyn.event.AttributeSensor;
 
@@ -72,4 +73,9 @@ public interface EntityMemento extends Memento, TreeNode {
      * The ids of the enrichers of this entity.
      */
     public Collection<String> getEnrichers();
+
+    /**
+     * The sensor feeds attached to this entity.
+     */
+    public Collection<Feed> getFeeds();
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
index ec2d5e9..031ceb4 100644
--- a/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
+++ b/core/src/main/java/brooklyn/entity/basic/AbstractEntity.java
@@ -38,6 +38,7 @@ import brooklyn.entity.Application;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
 import brooklyn.entity.EntityType;
+import brooklyn.entity.Feed;
 import brooklyn.entity.Group;
 import brooklyn.entity.basic.ServiceStateLogic.ServiceNotUpLogic;
 import brooklyn.entity.proxying.EntitySpec;
@@ -173,6 +174,7 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements
E
     Map<String,Object> presentationAttributes = Maps.newLinkedHashMap();
     Collection<AbstractPolicy> policies = Lists.newCopyOnWriteArrayList();
     Collection<AbstractEnricher> enrichers = Lists.newCopyOnWriteArrayList();
+    Collection<Feed> feeds = Lists.newCopyOnWriteArrayList();
 
     // FIXME we do not currently support changing parents, but to implement a cluster that
can shrink we need to support at least
     // orphaning (i.e. removing ownership). This flag notes if the entity has previously
had a parent, and if an attempt is made to
@@ -1213,6 +1215,53 @@ public abstract class AbstractEntity extends AbstractBrooklynObject
implements E
         return changed;
     }
     
+    // -------- FEEDS --------------------
+
+    /**
+     * Convenience, which calls {@link EntityInternal#getFeedSupport()} and {@link FeedSupport#addFeed(Feed)}.
+     */
+    protected <T extends Feed> T addFeed(T feed) {
+        return getFeedSupport().addFeed(feed);
+    }
+
+    public FeedSupport getFeedSupport() {
+        return new FeedSupport() {
+            @Override
+            public Collection<Feed> getFeeds() {
+                return ImmutableList.<Feed>copyOf(feeds);
+            }
+
+            @Override
+            public <T extends Feed> T addFeed(T feed) {
+                feeds.add(feed);
+                
+                getManagementSupport().getEntityChangeListener().onChanged();
+                return feed;
+            }
+
+            @Override
+            public boolean removeFeed(Feed feed) {
+                feed.stop();destroy();
+                boolean changed = feeds.remove(feed);
+                
+                if (changed) {
+                    getManagementSupport().getEntityChangeListener().onChanged();
+                }
+                return changed;
+
+            }
+
+            @Override
+            public boolean removeAllFeeds() {
+                boolean changed = false;
+                for (Feed feed : feeds) {
+                    changed = removeFeed(feed) || changed;
+                }
+                return changed;
+            }
+        };
+    }
+    
     // -------- SENSORS --------------------
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/basic/EntityInternal.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/basic/EntityInternal.java b/core/src/main/java/brooklyn/entity/basic/EntityInternal.java
index 7fef8c5..ab1fd59 100644
--- a/core/src/main/java/brooklyn/entity/basic/EntityInternal.java
+++ b/core/src/main/java/brooklyn/entity/basic/EntityInternal.java
@@ -24,6 +24,7 @@ import java.util.Map;
 import brooklyn.basic.BrooklynObjectInternal;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
+import brooklyn.entity.Feed;
 import brooklyn.entity.rebind.RebindSupport;
 import brooklyn.entity.rebind.Rebindable;
 import brooklyn.event.AttributeSensor;
@@ -132,6 +133,8 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal,
Reb
     @Beta
     Effector<?> getEffector(String effectorName);
     
+    FeedSupport getFeedSupport();
+    
     Map<String, String> toMetadataRecord();
     
     @Override
@@ -142,4 +145,26 @@ public interface EntityInternal extends BrooklynObjectInternal, EntityLocal,
Reb
      * This persistence may happen asynchronously, or may not happen at all if persistence
is disabled.
      */
     void requestPersist();
+    
+    public interface FeedSupport {
+        Collection<Feed> getFeeds();
+        
+        /**
+         * Adds the given feed to this entity. The feed will automatically be re-added on
brooklyn restart.
+         */
+        <T extends Feed> T addFeed(T feed);
+        
+        /**
+         * Removes the given feed from this entity. 
+         * @return True if the feed existed at this entity; false otherwise
+         */
+        boolean removeFeed(Feed feed);
+        
+        /**
+         * Removes all feeds from this entity.
+         * Use with caution as some entities automatically register feeds; this will remove
those feeds as well.
+         * @return True if any feeds existed at this entity; false otherwise
+         */
+        boolean removeAllFeeds();
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
b/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
index 4549dd2..0d9dc81 100644
--- a/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
+++ b/core/src/main/java/brooklyn/entity/rebind/AbstractBrooklynObjectRebindSupport.java
@@ -23,6 +23,7 @@ import org.slf4j.LoggerFactory;
 
 import brooklyn.basic.AbstractBrooklynObject;
 import brooklyn.entity.rebind.dto.MementosGenerators;
+import brooklyn.mementos.EnricherMemento;
 import brooklyn.mementos.Memento;
 
 public abstract class AbstractBrooklynObjectRebindSupport<T extends Memento> implements
RebindSupport<T> {
@@ -76,6 +77,11 @@ public abstract class AbstractBrooklynObjectRebindSupport<T extends
Memento> imp
         throw new UnsupportedOperationException();
     }
 
+    @Override
+    public void addFeeds(RebindContext rebindContext, T Memento) {
+        throw new UnsupportedOperationException();
+    }
+
     /**
      * For overriding, to give custom reconstruct behaviour.
      * 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/rebind/BasicEntityRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/BasicEntityRebindSupport.java b/core/src/main/java/brooklyn/entity/rebind/BasicEntityRebindSupport.java
index a6e1341..36f5f8a 100644
--- a/core/src/main/java/brooklyn/entity/rebind/BasicEntityRebindSupport.java
+++ b/core/src/main/java/brooklyn/entity/rebind/BasicEntityRebindSupport.java
@@ -30,6 +30,7 @@ import brooklyn.config.ConfigKey;
 import brooklyn.enricher.basic.AbstractEnricher;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
+import brooklyn.entity.Feed;
 import brooklyn.entity.Group;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.entity.basic.EntityInternal;
@@ -145,6 +146,23 @@ public class BasicEntityRebindSupport extends AbstractBrooklynObjectRebindSuppor
         }
     }
     
+    @Override
+    public void addFeeds(RebindContext rebindContext, EntityMemento memento) {
+        for (Feed feed : memento.getFeeds()) {
+            if (feed != null) {
+                try {
+                    feed.start();
+                    ((EntityInternal)entity).getFeedSupport().addFeed(feed);
+                } catch (Exception e) {
+                    rebindContext.getExceptionHandler().onRebindFailed(BrooklynObjectType.ENTITY,
entity, e);
+                }
+            } else {
+                LOG.warn("Feed not found; discarding feed of entity {}({})",
+                        new Object[] {memento.getType(), memento.getId()});
+            }
+        }
+    }
+    
     protected void addMembers(RebindContext rebindContext, EntityMemento memento) {
         if (memento.getMembers().size() > 0) {
             if (entity instanceof Group) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/rebind/BasicLocationRebindSupport.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/BasicLocationRebindSupport.java b/core/src/main/java/brooklyn/entity/rebind/BasicLocationRebindSupport.java
index 50b4147..69463e2 100644
--- a/core/src/main/java/brooklyn/entity/rebind/BasicLocationRebindSupport.java
+++ b/core/src/main/java/brooklyn/entity/rebind/BasicLocationRebindSupport.java
@@ -109,6 +109,11 @@ public class BasicLocationRebindSupport extends AbstractBrooklynObjectRebindSupp
         location.init(); // TODO deprecated calling init; will be deleted
     }
 
+    @Override
+    public void addFeeds(RebindContext rebindContext, LocationMemento Memento) {
+        throw new UnsupportedOperationException();
+    }
+
     protected void addChildren(RebindContext rebindContext, LocationMemento memento) {
         for (String childId : memento.getChildren()) {
             Location child = rebindContext.getLocation(childId);

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
index 2b4d4f8..0056e8b 100644
--- a/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
+++ b/core/src/main/java/brooklyn/entity/rebind/RebindManagerImpl.java
@@ -532,6 +532,7 @@ public class RebindManagerImpl implements RebindManager {
                         entityMemento.injectTypeClass(entity.getClass());
                         ((EntityInternal)entity).getRebindSupport().addPolicies(rebindContext,
entityMemento);
                         ((EntityInternal)entity).getRebindSupport().addEnrichers(rebindContext,
entityMemento);
+                        ((EntityInternal)entity).getRebindSupport().addFeeds(rebindContext,
entityMemento);
                     } catch (Exception e) {
                         exceptionHandler.onRebindFailed(BrooklynObjectType.ENTITY, entity,
e);
                     }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/rebind/dto/BasicEntityMemento.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/BasicEntityMemento.java b/core/src/main/java/brooklyn/entity/rebind/dto/BasicEntityMemento.java
index 30eb653..df72ee5 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/BasicEntityMemento.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/BasicEntityMemento.java
@@ -30,6 +30,7 @@ import brooklyn.basic.BrooklynTypes;
 import brooklyn.config.ConfigKey;
 import brooklyn.entity.Effector;
 import brooklyn.entity.Entity;
+import brooklyn.entity.Feed;
 import brooklyn.entity.basic.AbstractEntity;
 import brooklyn.entity.basic.Entities;
 import brooklyn.entity.rebind.RebindSupport;
@@ -70,6 +71,7 @@ public class BasicEntityMemento extends AbstractTreeNodeMemento implements
Entit
         protected List<String> enrichers = Lists.newArrayList();
         protected List<String> members = Lists.newArrayList();
         protected List<Effector<?>> effectors = Lists.newArrayList();
+        protected List<Feed> feeds = Lists.newArrayList();
         
         public Builder from(EntityMemento other) {
             super.from((TreeNode)other);
@@ -83,6 +85,7 @@ public class BasicEntityMemento extends AbstractTreeNodeMemento implements
Entit
             enrichers.addAll(other.getEnrichers());
             members.addAll(other.getMembers());
             effectors.addAll(other.getEffectors());
+            feeds.addAll(other.getFeeds());
             tags.addAll(other.getTags());
             return this;
         }
@@ -103,6 +106,7 @@ public class BasicEntityMemento extends AbstractTreeNodeMemento implements
Entit
     private Map<String, Object> attributes;
     private List<String> policies;
     private List<String> enrichers;
+    private List<Feed> feeds;
     
     // TODO can we move some of these to entity type, or remove/re-insert those which are
final statics?
     private Map<String, ConfigKey<?>> configKeys;
@@ -130,6 +134,7 @@ public class BasicEntityMemento extends AbstractTreeNodeMemento implements
Entit
         locations = toPersistedList(builder.locations);
         policies = toPersistedList(builder.policies);
         enrichers = toPersistedList(builder.enrichers);
+        feeds = toPersistedList(builder.feeds);
         members = toPersistedList(builder.members);
         
         effectors = toPersistedList(builder.effectors);
@@ -281,6 +286,12 @@ public class BasicEntityMemento extends AbstractTreeNodeMemento implements
Entit
     }
 
     @Override
+    public List<Feed> getFeeds() {
+        if (configByKey == null) postDeserialize();
+        return fromPersistedList(feeds);
+    }
+    
+    @Override
     protected ToStringHelper newVerboseStringHelper() {
         return super.newVerboseStringHelper()
                 .add("members", getMembers())

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java b/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
index 3289665..ebcea96 100644
--- a/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
+++ b/core/src/main/java/brooklyn/entity/rebind/dto/MementosGenerators.java
@@ -30,6 +30,7 @@ import brooklyn.config.ConfigKey;
 import brooklyn.enricher.basic.AbstractEnricher;
 import brooklyn.entity.Application;
 import brooklyn.entity.Entity;
+import brooklyn.entity.Feed;
 import brooklyn.entity.Group;
 import brooklyn.entity.basic.EntityDynamicType;
 import brooklyn.entity.basic.EntityInternal;
@@ -124,7 +125,8 @@ public class MementosGenerators {
      * @deprecated since 0.7.0; use {@link #newMemento(BrooklynObject)} instead
      */
     @Deprecated
-    public static BasicEntityMemento.Builder newEntityMementoBuilder(Entity entity) {
+    public static BasicEntityMemento.Builder newEntityMementoBuilder(Entity entityRaw) {
+        EntityInternal entity = (EntityInternal) entityRaw;
         BasicEntityMemento.Builder builder = BasicEntityMemento.builder();
         populateBrooklynObjectMementoBuilder(entity, builder);
         
@@ -138,14 +140,14 @@ public class MementosGenerators {
         
         builder.isTopLevelApp = (entity instanceof Application && entity.getParent()
== null);
 
-        Map<ConfigKey<?>, Object> localConfig = ((EntityInternal)entity).getConfigMap().getLocalConfig();
+        Map<ConfigKey<?>, Object> localConfig = entity.getConfigMap().getLocalConfig();
         for (Map.Entry<ConfigKey<?>, Object> entry : localConfig.entrySet())
{
             ConfigKey<?> key = checkNotNull(entry.getKey(), localConfig);
             Object value = configValueToPersistable(entry.getValue());
             builder.config.put(key, value); 
         }
         
-        Map<String, Object> localConfigUnmatched = MutableMap.copyOf(((EntityInternal)entity).getConfigMap().getLocalConfigBag().getAllConfig());
+        Map<String, Object> localConfigUnmatched = MutableMap.copyOf(entity.getConfigMap().getLocalConfigBag().getAllConfig());
         for (ConfigKey<?> key : localConfig.keySet()) {
             localConfigUnmatched.remove(key.getName());
         }
@@ -157,7 +159,7 @@ public class MementosGenerators {
         }
         
         @SuppressWarnings("rawtypes")
-        Map<AttributeSensor, Object> allAttributes = ((EntityInternal)entity).getAllAttributes();
+        Map<AttributeSensor, Object> allAttributes = entity.getAllAttributes();
         for (@SuppressWarnings("rawtypes") Map.Entry<AttributeSensor, Object> entry
: allAttributes.entrySet()) {
             AttributeSensor<?> key = checkNotNull(entry.getKey(), allAttributes);
             Object value = entry.getValue();
@@ -180,6 +182,10 @@ public class MementosGenerators {
             builder.enrichers.add(enricher.getId()); 
         }
         
+        for (Feed feed : entity.getFeedSupport().getFeeds()) {
+            builder.feeds.add(feed); 
+        }
+        
         Entity parentEntity = entity.getParent();
         builder.parent = (parentEntity != null) ? parentEntity.getId() : null;
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/AbstractFeed.java b/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
index 1a7770f..11b5b3b 100644
--- a/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/AbstractFeed.java
@@ -23,29 +23,24 @@ import static com.google.common.base.Preconditions.checkNotNull;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import brooklyn.entity.Feed;
 import brooklyn.entity.basic.EntityLocal;
 
 /** 
  * Captures common fields and processes for sensor feeds.
  * These generally poll or subscribe to get sensor values for an entity.
  * They make it easy to poll over http, jmx, etc.
- * 
- * Assumes:
- *   <ul>
- *     <li>There will not be concurrent calls to start and stop.
- *     <li>There will only be one call to start and that will be done immediately after
construction,
- *         in the same thread.
- *     <li>Once stopped, the feed will not be re-started.
- *   </ul>
  */
-public abstract class AbstractFeed {
+public abstract class AbstractFeed implements Feed {
 
     private static final Logger log = LoggerFactory.getLogger(AbstractFeed.class);
     
     protected final EntityLocal entity;
-    protected final Poller<?> poller;
-    private volatile boolean activated, suspended;
-    private final Object pollerStateMutex = new Object(); 
+    protected final boolean onlyIfServiceUp; 
+    private final Object pollerStateMutex = new Object();
+    private transient volatile Poller<?> poller;
+    private transient volatile boolean activated;
+    private transient volatile boolean suspended;
 
     public AbstractFeed(EntityLocal entity) {
         this(entity, false);
@@ -53,16 +48,15 @@ public abstract class AbstractFeed {
     
     public AbstractFeed(EntityLocal entity, boolean onlyIfServiceUp) {
         this.entity = checkNotNull(entity, "entity");
-        this.poller = new Poller<Object>(entity, onlyIfServiceUp);
+        this.onlyIfServiceUp = onlyIfServiceUp;
     }
     
-    /** true if everything has been _started_ (or it is starting) but not stopped,
-     * even if it is suspended; see also {@link #isActive()} */
+    @Override
     public boolean isActivated() {
         return activated;
     }
     
-    /** true iff the feed is running */
+    @Override
     public boolean isActive() {
         return activated && !suspended;
     }
@@ -80,13 +74,18 @@ public abstract class AbstractFeed {
         return isActivated();
     }
 
-    protected void start() {
+    @Override
+    public void start() {
         if (log.isDebugEnabled()) log.debug("Starting feed {} for {}", this, entity);
         if (activated) { 
             throw new IllegalStateException(String.format("Attempt to start feed %s of entity
%s when already running", 
                     this, entity));
         }
+        if (poller != null) {
+            throw new IllegalStateException(String.format("Attempt to re-start feed %s of
entity %s", this, entity));
+        }
         
+        poller = new Poller<Object>(entity, onlyIfServiceUp);
         activated = true;
         preStart();
         synchronized (pollerStateMutex) {
@@ -97,7 +96,7 @@ public abstract class AbstractFeed {
         }
     }
 
-    /** suspends this feed (stops the poller, or indicates that the feed should start in
a state where the poller is stopped) */
+    @Override
     public void suspend() {
         synchronized (pollerStateMutex) {
             if (activated && !suspended) {
@@ -107,7 +106,7 @@ public abstract class AbstractFeed {
         }
     }
     
-    /** resumes this feed if it has been suspended and not stopped */
+    @Override
     public void resume() {
         synchronized (pollerStateMutex) {
             if (activated && suspended) {
@@ -117,6 +116,7 @@ public abstract class AbstractFeed {
         }
     }
     
+    @Override
     public void stop() {
         if (!activated) { 
             log.debug("Ignoring attempt to stop feed {} of entity {} when not running", this,
entity);
@@ -151,4 +151,11 @@ public abstract class AbstractFeed {
      */
     protected void postStop() {
     }
+    
+    /**
+     * For overriding, where sub-class can change return-type generics!
+     */
+    protected Poller<?> getPoller() {
+        return poller;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/Poller.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/Poller.java b/core/src/main/java/brooklyn/event/feed/Poller.java
index 5ce336f..34ec5dd 100644
--- a/core/src/main/java/brooklyn/event/feed/Poller.java
+++ b/core/src/main/java/brooklyn/event/feed/Poller.java
@@ -175,10 +175,10 @@ public class Poller<V> {
         
         running = false;
         for (Task<?> task : oneOffTasks) {
-            task.cancel(true);
+            if (task != null) task.cancel(true);
         }
         for (ScheduledTask task : tasks) {
-            task.cancel();
+            if (task != null) task.cancel();
         }
         oneOffTasks.clear();
         tasks.clear();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java b/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java
index 0690da9..94a866b 100644
--- a/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/function/FunctionFeed.java
@@ -32,7 +32,6 @@ import brooklyn.entity.basic.EntityLocal;
 import brooklyn.event.feed.AbstractFeed;
 import brooklyn.event.feed.AttributePollHandler;
 import brooklyn.event.feed.DelegatingPollHandler;
-import brooklyn.event.feed.Poller;
 import brooklyn.util.time.Duration;
 
 import com.google.common.base.Objects;
@@ -180,9 +179,4 @@ public class FunctionFeed extends AbstractFeed {
                     minPeriod);
         }
     }
-    
-    @SuppressWarnings("unchecked")
-    private Poller<Object> getPoller() {
-        return (Poller<Object>) poller;
-    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java b/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java
index bea1d80..bff833b 100644
--- a/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/http/HttpFeed.java
@@ -354,7 +354,7 @@ public class HttpFeed extends AbstractFeed {
     }
 
     @SuppressWarnings("unchecked")
-    private Poller<HttpToolResponse> getPoller() {
-        return (Poller<HttpToolResponse>) poller;
+    protected Poller<HttpToolResponse> getPoller() {
+        return (Poller<HttpToolResponse>) super.getPoller();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/shell/ShellFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/shell/ShellFeed.java b/core/src/main/java/brooklyn/event/feed/shell/ShellFeed.java
index ec14f3e..b90b2ea 100644
--- a/core/src/main/java/brooklyn/event/feed/shell/ShellFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/shell/ShellFeed.java
@@ -223,8 +223,8 @@ public class ShellFeed extends AbstractFeed {
     }
     
     @SuppressWarnings("unchecked")
-    private Poller<SshPollValue> getPoller() {
-        return (Poller<SshPollValue>) poller;
+    protected Poller<SshPollValue> getPoller() {
+        return (Poller<SshPollValue>) super.getPoller();
     }
     
     /**

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java b/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java
index e8c5148..14988ab 100644
--- a/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/ssh/SshFeed.java
@@ -235,8 +235,8 @@ public class SshFeed extends AbstractFeed {
     }
     
     @SuppressWarnings("unchecked")
-    private Poller<SshPollValue> getPoller() {
-        return (Poller<SshPollValue>) poller;
+    protected Poller<SshPollValue> getPoller() {
+        return (Poller<SshPollValue>) super.getPoller();
     }
     
     private SshPollValue exec(String command, Map<String,String> env) throws IOException
{

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/main/java/brooklyn/event/feed/windows/WindowsPerformanceCounterFeed.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/event/feed/windows/WindowsPerformanceCounterFeed.java
b/core/src/main/java/brooklyn/event/feed/windows/WindowsPerformanceCounterFeed.java
index dcb7ba6..7ccc9fc 100644
--- a/core/src/main/java/brooklyn/event/feed/windows/WindowsPerformanceCounterFeed.java
+++ b/core/src/main/java/brooklyn/event/feed/windows/WindowsPerformanceCounterFeed.java
@@ -182,12 +182,17 @@ public class WindowsPerformanceCounterFeed extends AbstractFeed {
             }
         };
 
-        ((Poller<SshPollValue>) poller).scheduleAtFixedRate(
+        getPoller().scheduleAtFixedRate(
                 new CallInEntityExecutionContext<SshPollValue>(entity, queryForCounterValues),
                 new SendPerfCountersToSensors(entity, attributeSensors),
                 periodUnits.toMillis(period));
     }
 
+    @SuppressWarnings("unchecked")
+    protected Poller<SshPollValue> getPoller() {
+        return (Poller<SshPollValue>) super.getPoller();
+    }
+
     /**
      * A {@link java.util.concurrent.Callable} that wraps another {@link java.util.concurrent.Callable},
where the
      * inner {@link java.util.concurrent.Callable} is executed in the context of a

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java b/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
new file mode 100644
index 0000000..056e440
--- /dev/null
+++ b/core/src/test/java/brooklyn/entity/rebind/RebindFeedTest.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package brooklyn.entity.rebind;
+
+import static org.testng.Assert.assertEquals;
+
+import java.net.URL;
+import java.util.Collection;
+
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import brooklyn.config.ConfigKey;
+import brooklyn.entity.Feed;
+import brooklyn.entity.basic.ConfigKeys;
+import brooklyn.entity.proxying.EntitySpec;
+import brooklyn.event.AttributeSensor;
+import brooklyn.event.basic.Sensors;
+import brooklyn.event.feed.http.HttpFeed;
+import brooklyn.event.feed.http.HttpPollConfig;
+import brooklyn.event.feed.http.HttpValueFunctions;
+import brooklyn.test.EntityTestUtils;
+import brooklyn.test.entity.TestEntity;
+import brooklyn.test.entity.TestEntityImpl;
+import brooklyn.util.http.BetterMockWebServer;
+
+import com.google.common.collect.Iterables;
+import com.google.mockwebserver.MockResponse;
+
+public class RebindFeedTest extends RebindTestFixtureWithApp {
+
+    final static AttributeSensor<String> SENSOR_STRING = Sensors.newStringSensor("aString",
"");
+    final static AttributeSensor<Integer> SENSOR_INT = Sensors.newIntegerSensor( "aLong",
"");
+
+    private BetterMockWebServer server;
+    private URL baseUrl;
+    
+    @BeforeMethod(alwaysRun=true)
+    @Override
+    public void setUp() throws Exception {
+        super.setUp();
+        server = BetterMockWebServer.newInstanceLocalhost();
+        for (int i = 0; i < 100; i++) {
+            server.enqueue(new MockResponse().setResponseCode(200).addHeader("content-type:
application/json").setBody("{\"foo\":\"myfoo\"}"));
+        }
+        server.play();
+        baseUrl = server.getUrl("/");
+    }
+
+    @AfterMethod(alwaysRun=true)
+    @Override
+    public void tearDown() throws Exception {
+        if (server != null) server.shutdown();
+        super.tearDown();
+    }
+    
+    @Test
+    public void testHttpFeedRegisteredInInitIsPersisted() throws Exception {
+        TestEntity origEntity = origApp.createAndManageChild(EntitySpec.create(TestEntity.class).impl(MyEntityImpl.class)
+                .configure(MyEntityImpl.BASE_URL, baseUrl));
+        EntityTestUtils.assertAttributeEqualsEventually(origEntity, SENSOR_INT, (Integer)200);
+        EntityTestUtils.assertAttributeEqualsEventually(origEntity, SENSOR_STRING, "{\"foo\":\"myfoo\"}");
+        assertEquals(origEntity.getFeedSupport().getFeeds().size(), 1);
+
+        newApp = rebind(false);
+        TestEntity newEntity = (TestEntity) Iterables.getOnlyElement(newApp.getChildren());
+        
+        Collection<Feed> newFeeds = newEntity.getFeedSupport().getFeeds();
+        assertEquals(newFeeds.size(), 1);
+        
+        // Expect the feed to still be polling
+        newEntity.setAttribute(SENSOR_INT, null);
+        newEntity.setAttribute(SENSOR_STRING, null);
+        EntityTestUtils.assertAttributeEqualsEventually(newEntity, SENSOR_INT, (Integer)200);
+        EntityTestUtils.assertAttributeEqualsEventually(newEntity, SENSOR_STRING, "{\"foo\":\"myfoo\"}");
+    }
+
+    public static class MyEntityImpl extends TestEntityImpl {
+        public static final ConfigKey<URL> BASE_URL = ConfigKeys.newConfigKey(URL.class,
"rebindFeedTest.baseUrl");
+        
+        @Override
+        public void init() {
+            super.init();
+            HttpFeed feed = addFeed(HttpFeed.builder()
+                    .entity(this)
+                    .baseUrl(getConfig(BASE_URL))
+                    .poll(HttpPollConfig.forSensor(SENSOR_INT)
+                            .period(100)
+                            .onSuccess(HttpValueFunctions.responseCode()))
+                    .poll(HttpPollConfig.forSensor(SENSOR_STRING)
+                            .period(100)
+                            .onSuccess(HttpValueFunctions.stringContentsFunction()))
+                    .build());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/sandbox/monitoring/src/main/java/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java
----------------------------------------------------------------------
diff --git a/sandbox/monitoring/src/main/java/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java
b/sandbox/monitoring/src/main/java/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java
index f50db7d..55645f4 100644
--- a/sandbox/monitoring/src/main/java/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java
+++ b/sandbox/monitoring/src/main/java/brooklyn/entity/monitoring/zabbix/ZabbixFeed.java
@@ -424,6 +424,6 @@ public class ZabbixFeed extends AbstractFeed {
 
     @SuppressWarnings("unchecked")
     protected Poller<HttpToolResponse> getPoller() {
-        return (Poller<HttpToolResponse>) poller;
+        return (Poller<HttpToolResponse>) super.getPoller();
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java b/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java
index 17b295b..de2edb4 100644
--- a/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java
+++ b/software/base/src/main/java/brooklyn/entity/chef/ChefAttributeFeed.java
@@ -180,7 +180,6 @@ public class ChefAttributeFeed extends AbstractFeed {
         knifeTaskFactory = new KnifeNodeAttributeQueryTaskFactory(nodeName);
     }
 
-    @SuppressWarnings("unchecked")
     @Override
     protected void preStart() {
         final Callable<SshPollValue> getAttributesFromKnife = new Callable<SshPollValue>()
{
@@ -195,12 +194,17 @@ public class ChefAttributeFeed extends AbstractFeed {
             }
         };
 
-        ((Poller<SshPollValue>) poller).scheduleAtFixedRate(
+        getPoller().scheduleAtFixedRate(
                 new CallInEntityExecutionContext<SshPollValue>(entity, getAttributesFromKnife),
                 new SendChefAttributesToSensors(entity, chefAttributeSensors),
                 periodUnits.toMillis(period));
     }
 
+    @SuppressWarnings("unchecked")
+    protected Poller<SshPollValue> getPoller() {
+        return (Poller<SshPollValue>) super.getPoller();
+    }
+
     /**
      * An implementation of {@link KnifeTaskFactory} that queries for the attributes of a
node.
      */

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/d7ab5b5e/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
----------------------------------------------------------------------
diff --git a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
index 026f000..ab05914 100644
--- a/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
+++ b/software/base/src/main/java/brooklyn/event/feed/jmx/JmxFeed.java
@@ -187,10 +187,9 @@ public class JmxFeed extends AbstractFeed {
         return jmxUri;
     }
     
-    @VisibleForTesting
     @SuppressWarnings("unchecked")
     protected Poller<Object> getPoller() {
-        return (Poller<Object>) poller;
+        return (Poller<Object>) super.getPoller();
     }
     
     @Override


Mime
View raw message