brooklyn-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From henev...@apache.org
Subject [1/3] brooklyn-server git commit: better presentation for sensor publishing tasks
Date Sat, 18 Feb 2017 01:00:11 GMT
Repository: brooklyn-server
Updated Branches:
  refs/heads/master 110482861 -> ebcea30f9


better presentation for sensor publishing tasks

Previously they didn't even have a name; now they have a nice name, description, and a tag.
There is some attempt to optimize the use of toString so it isn't hugely computationally expensive,
although this will increase expense a bit; I tend to think it's worth it for increased visibility.
(If we are publishing vast numbers of sensor events maybe we are doing something wrong, and if this
is identified as the bottleneck we can parameterise it, and in any case when we come to be multi-host
we'll need to revisit how we do this as currently we're thinking a good datastore is good enough for
the intended volumes, as opposed to a dedicated message bus.)


Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/2c2f3fcd
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/2c2f3fcd
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/2c2f3fcd

Branch: refs/heads/master
Commit: 2c2f3fcdc456358dff108a62e0cd5ce09783048c
Parents: 1104828
Author: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Authored: Thu Feb 16 10:31:50 2017 +0000
Committer: Alex Heneveld <alex.heneveld@cloudsoftcorp.com>
Committed: Thu Feb 16 10:31:50 2017 +0000

----------------------------------------------------------------------
 .../brooklyn/core/mgmt/BrooklynTaskTags.java    |   2 +
 .../mgmt/internal/LocalSubscriptionManager.java | 144 +++++++++++--------
 2 files changed, 83 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2c2f3fcd/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
index 39b2f70..2280be7 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/BrooklynTaskTags.java
@@ -71,6 +71,8 @@ public class BrooklynTaskTags extends TaskTags {
     public static final String BROOKLYN_SERVER_TASK_TAG = "BROOKLYN-SERVER";
     /** Tag for a task which represents an effector */
     public static final String EFFECTOR_TAG = "EFFECTOR";
+    /** Tag for a task which represents a sensor being published */
+    public static final String SENSOR_TAG = "SENSOR";
     /** Tag for a task which *is* interesting, in contrast to {@link #TRANSIENT_TASK_TAG}
*/
     public static final String NON_TRANSIENT_TASK_TAG = "NON-TRANSIENT";
     /** indicates a task is transient, roughly that is to say it is uninteresting -- 

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/2c2f3fcd/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 9de2d3b..7039ee2 100644
--- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -42,6 +42,7 @@ import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
 import org.apache.brooklyn.api.sensor.SensorEventListener;
 import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
@@ -86,10 +87,13 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
         return allSubscriptions.size();
     }
 
+    /** The total number of sensor change events generated (irrespective of number subscribers,
see {@link #getTotalEventsDelivered()}) */
     public long getTotalEventsPublished() {
         return totalEventsPublishedCount.get();
     }
     
+    /** The total number of sensor change events submitted for delivery, counting multiple deliveries for multiple subscribers (see {@link #getTotalEventsPublished()}),
+     * but excluding initial notifications, and incremented when submitted ie prior to delivery */
     public long getTotalEventsDelivered() {
         return totalEventsDeliveredCount.get();
     }
@@ -111,7 +115,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
             s.subscriberExecutionManagerTagSupplied = true;
         } else {
             s.subscriberExecutionManagerTag = 
-                s.subscriber instanceof Entity ? "subscription-delivery-entity-"+((Entity)s.subscriber).getId()+"["+s.subscriber+"]"
: 
+                s.subscriber instanceof Entity ? "subscription-delivery-entity-"+((Entity)s.subscriber).getId()
: 
                 s.subscriber instanceof String ? "subscription-delivery-string["+s.subscriber+"]"
: 
                 "subscription-delivery-object["+s.subscriber+"]";
             s.subscriberExecutionManagerTagSupplied = false;
@@ -139,37 +143,8 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
                 LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute
sensor: "+s);
             } else {
                 if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {}
to {}", new Object[] {s.producer, s.sensor, s});
-                List<Object> tags = MutableList.builder()
-                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() :
s.subscriberExtraExecTags)
-                        .add(s.subscriberExecutionManagerTag)
-                        .build()
-                        .asUnmodifiable();
-                Map<String, ?> execFlags = MutableMap.of("tags", tags);
-                em.submit(execFlags, new Runnable() {
-                    @Override
-                    public String toString() {
-                        return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")";
-                    }
-                    @Override
-                    public void run() {
-                        Object val = s.producer.getAttribute((AttributeSensor<?>) s.sensor);
-                        @SuppressWarnings("rawtypes") // TODO s.listener.onEvent gives compilation
error if try to use <T>
-                        SensorEvent event = new BasicSensorEvent(s.sensor, s.producer, val);
-                        if (s.eventFilter!=null && !s.eventFilter.apply(event))
-                            return;
-                        try {
-                            int count = s.eventCount.incrementAndGet();
-                            if (count > 0 && count % 1000 == 0) LOG.debug("{}
events for subscriber {}", count, s);
-                            
-                            s.listener.onEvent(event);
-                        } catch (Throwable t) {
-                            if (event!=null && event.getSource()!=null &&
Entities.isNoLongerManaged(event.getSource())) {
-                                LOG.debug("Error processing initial-value subscription to
"+LocalSubscriptionManager.this+", after entity unmanaged: "+t, t);
-                            } else {
-                                LOG.warn("Error processing initial-value subscription to
"+LocalSubscriptionManager.this+": "+t, t);
-                            }
-                        }
-                    }});
+                T val = (T) s.producer.getAttribute((AttributeSensor<?>) s.sensor);
+                submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer,
val), true);
             }
         }
         
@@ -237,42 +212,85 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager
{
         if (groovyTruth(subs)) {
             if (LOG.isTraceEnabled()) LOG.trace("sending {}, {} to {}", new Object[] {event.getSensor().getName(),
event, join(subs, ",")});
             for (Subscription s : subs) {
-                if (s.eventFilter!=null && !s.eventFilter.apply(event))
-                    continue;
-                final Subscription sAtClosureCreation = s;
-                
-                List<Object> tags = MutableList.builder()
-                        .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() :
s.subscriberExtraExecTags)
-                        .add(s.subscriberExecutionManagerTag)
-                        .build()
-                        .asUnmodifiable();
-                Map<String, ?> execFlags = MutableMap.of("tags", tags);
-                
-                em.submit(execFlags, new Runnable() {
-                    @Override
-                    public String toString() {
-                        return "LSM.publish("+event+")";
-                    }
-                    @Override
-                    public void run() {
-                        try {
-                            int count = sAtClosureCreation.eventCount.incrementAndGet();
-                            if (count > 0 && count % 1000 == 0) LOG.debug("{}
events for subscriber {}", count, sAtClosureCreation);
-                            
-                            sAtClosureCreation.listener.onEvent(event);
-                        } catch (Throwable t) {
-                            if (event!=null && event.getSource()!=null &&
Entities.isNoLongerManaged(event.getSource())) {
-                                LOG.debug("Error processing subscriptions to "+this+", after
entity unmanaged: "+t, t);
-                            } else {
-                                LOG.warn("Error processing subscriptions to "+this+": "+t,
t);
-                            }
-                        }
-                    }});
+                submitPublishEvent(s, event, false);
+                // excludes initial so only do it here
                 totalEventsDeliveredCount.incrementAndGet();
             }
         }
     }
     
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    private void submitPublishEvent(final Subscription s, final SensorEvent<?> event,
final boolean isInitial) {
+        if (s.eventFilter!=null && !s.eventFilter.apply(event))
+            return;
+        
+        List<Object> tags = MutableList.builder()
+            .addAll(s.subscriberExtraExecTags == null ? ImmutableList.of() : s.subscriberExtraExecTags)
+            .add(s.subscriberExecutionManagerTag)
+            .add(BrooklynTaskTags.SENSOR_TAG)
+            .build()
+            .asUnmodifiable();
+        
+        StringBuilder name = new StringBuilder("sensor ");
+        StringBuilder description = new StringBuilder("Sensor ");
+        String sensorName = s.sensor==null ? null : s.sensor.getName();
+        String sourceName = event.getSource()==null ? null : event.getSource().getId();
+        name.append(sourceName);
+        name.append(":");
+        name.append(sensorName);
+        
+        description.append(sensorName);
+        description.append(" on ");
+        description.append(sourceName);
+        description.append(" publishing to ");
+        description.append(s.subscriber instanceof Entity ? ((Entity)s.subscriber).getId()
: s.subscriber);
+        
+        if (includeDescriptionForSensorTask(event)) {
+            name.append(" ");
+            name.append(event.getValue());
+            description.append(", value: ");
+            description.append(event.getValue());
+        }
+        Map<String, Object> execFlags = MutableMap.of("tags", tags, 
+            "displayName", name.toString(),
+            "description", description.toString());
+        
+        em.submit(execFlags, new Runnable() {
+            @Override
+            public String toString() {
+                if (isInitial) {
+                    return "LSM.publishInitial("+event+")";
+                } else {
+                    return "LSM.publish("+event+")";
+                }
+            }
+            @Override
+            public void run() {
+                try {
+                    int count = s.eventCount.incrementAndGet();
+                    if (count > 0 && count % 1000 == 0) LOG.debug("{} events for
subscriber {}", count, s);
+                    
+                    s.listener.onEvent(event);
+                } catch (Throwable t) {
+                    if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource()))
{
+                        LOG.debug("Error processing subscriptions to "+this+", after entity
unmanaged: "+t, t);
+                    } else {
+                        LOG.warn("Error processing subscriptions to "+this+": "+t, t);
+                    }
+                }
+            }});
+    }
+    
+    protected boolean includeDescriptionForSensorTask(SensorEvent<?> event) {
+        // just do it for simple/quick things to avoid expensive toStrings
+        // (info is rarely useful, but occasionally it will be)
+        if (event.getValue()==null) return true;
+        Class<?> clazz = event.getValue().getClass();
+        if (clazz.isEnum() || clazz.isPrimitive() || Number.class.isAssignableFrom(clazz)
|| 
+            clazz.equals(String.class)) return true;
+        return false;
+    }
+
     @Override
     public String toString() {
         return tostring;


Mime
View raw message