camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [2/4] camel git commit: CAMEL-8252: Add inflight exchanges to the JMX performance counters, consistent with the other stats, instead of having to filter all inflight exchanges to find the count per processor.
Date Sat, 17 Jan 2015 07:27:38 GMT
CAMEL-8252: Add inflight exchanges to the JMX performance counters, consistent with the other
stats, instead of having to filter all inflight exchanges to find the count per processor.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e0581024
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e0581024
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e0581024

Branch: refs/heads/master
Commit: e0581024441efc99cfaf4e1aa59e6c94f38f078c
Parents: bc2a1d0
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Fri Jan 16 17:44:44 2015 +0100
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Fri Jan 16 17:44:44 2015 +0100

----------------------------------------------------------------------
 .../api/management/PerformanceCounter.java      |  7 +++++
 .../mbean/ManagedCamelContextMBean.java         |  3 ++
 .../mbean/ManagedPerformanceCounterMBean.java   |  3 ++
 .../api/management/mbean/ManagedRouteMBean.java |  4 +++
 .../management/CompositePerformanceCounter.java | 10 ++++++
 .../management/DelegatePerformanceCounter.java  | 10 +++++-
 .../management/InstrumentationProcessor.java    |  9 ++++++
 .../management/mbean/ManagedCamelContext.java   | 32 ++------------------
 .../mbean/ManagedPerformanceCounter.java        | 11 +++++++
 .../camel/management/mbean/ManagedRoute.java    |  2 +-
 .../camel/management/mbean/Statistic.java       |  4 +++
 .../camel/processor/CamelInternalProcessor.java | 10 +++++-
 12 files changed, 73 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
index d1779d8..91d25e1 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/PerformanceCounter.java
@@ -26,6 +26,13 @@ import org.apache.camel.Exchange;
 public interface PerformanceCounter {
 
     /**
+     * Executed when an {@link org.apache.camel.Exchange} is about to be processed.
+     *
+     * @param exchange the exchange
+     */
+    void processExchange(Exchange exchange);
+
+    /**
      * Executed when an {@link org.apache.camel.Exchange} is complete.
      *
      * @param exchange the exchange

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
index aa4efeb..c803cdf 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedCamelContextMBean.java
@@ -81,6 +81,9 @@ public interface ManagedCamelContextMBean extends ManagedPerformanceCounterMBean
     @ManagedAttribute(description = "Tracing")
     void setTracing(Boolean tracing);
 
+    /**
+     * @deprecated use {@link #getExchangesInflight()}
+     */
     @ManagedAttribute(description = "Current number of inflight Exchanges")
     Integer getInflightExchanges();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
index 5b83614..613f3b5 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedPerformanceCounterMBean.java
@@ -29,6 +29,9 @@ public interface ManagedPerformanceCounterMBean extends ManagedCounterMBean {
     @ManagedAttribute(description = "Number of failed exchanges")
     long getExchangesFailed() throws Exception;
 
+    @ManagedAttribute(description = "Number of inflight exchanges")
+    long getExchangesInflight() throws Exception;
+
     @ManagedAttribute(description = "Number of failures handled")
     long getFailuresHandled() throws Exception;
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
index f577320..148c688 100644
--- a/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
+++ b/camel-core/src/main/java/org/apache/camel/api/management/mbean/ManagedRouteMBean.java
@@ -33,7 +33,11 @@ public interface ManagedRouteMBean extends ManagedPerformanceCounterMBean {
     @ManagedAttribute(description = "Route State")
     String getState();
 
+    /**
+     * @deprecated use {@link #getExchangesInflight()}
+     */
     @ManagedAttribute(description = "Current number of inflight Exchanges")
+    @Deprecated
     Integer getInflightExchanges();
 
     @ManagedAttribute(description = "Camel ID")

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
index 49067a2..2e93b1c 100644
--- a/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
+++ b/camel-core/src/main/java/org/apache/camel/management/CompositePerformanceCounter.java
@@ -37,6 +37,16 @@ public class CompositePerformanceCounter implements PerformanceCounter {
     }
 
     @Override
+    public void processExchange(Exchange exchange) {
+        if (counter1.isStatisticsEnabled()) {
+            counter1.processExchange(exchange);
+        }
+        if (counter2.isStatisticsEnabled()) {
+            counter2.processExchange(exchange);
+        }
+    }
+
+    @Override
     public void completedExchange(Exchange exchange, long time) {
         if (counter1.isStatisticsEnabled()) {
             counter1.completedExchange(exchange, time);

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
index ef84417..0a0dbff 100644
--- a/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
+++ b/camel-core/src/main/java/org/apache/camel/management/DelegatePerformanceCounter.java
@@ -47,8 +47,16 @@ public class DelegatePerformanceCounter implements PerformanceCounter {
         this.counter.setStatisticsEnabled(statisticsEnabled);
     }
 
+    public void processExchange(Exchange exchange) {
+        if (counter != null) {
+            counter.processExchange(exchange);
+        }
+    }
+
     public void completedExchange(Exchange exchange, long time) {
-        counter.completedExchange(exchange, time);
+        if (counter != null) {
+            counter.completedExchange(exchange, time);
+        }
     }
 
     public void failedExchange(Exchange exchange) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
index ab6859f..4c4d7dc 100644
--- a/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/management/InstrumentationProcessor.java
@@ -69,6 +69,11 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor {
         // only record time if stats is enabled
         final StopWatch watch = (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null;
 
+        // mark beginning to process the exchange
+        if (watch != null) {
+            beginTime(exchange);
+        }
+
         return processor.process(exchange, new AsyncCallback() {
             public void done(boolean doneSync) {
                 try {
@@ -89,6 +94,10 @@ public class InstrumentationProcessor extends DelegateAsyncProcessor {
         });
     }
 
+    protected void beginTime(Exchange exchange) {
+        counter.processExchange(exchange);
+    }
+
     protected void recordTime(Exchange exchange, long duration) {
         if (LOG.isTraceEnabled()) {
             LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
index be1a69f..c74d391 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedCamelContext.java
@@ -57,7 +57,6 @@ import org.apache.camel.model.RouteDefinition;
 import org.apache.camel.model.RoutesDefinition;
 import org.apache.camel.model.rest.RestDefinition;
 import org.apache.camel.model.rest.RestsDefinition;
-import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.JsonSchemaHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -140,7 +139,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
     }
 
     public Integer getInflightExchanges() {
-        return context.getInflightRepository().size();
+        return (int) super.getExchangesInflight();
     }
 
     public Integer getTotalRoutes() {
@@ -398,11 +397,6 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
             }
             Collections.sort(processors, new OrderProcessorMBeans());
 
-            Collection<InflightRepository.InflightExchange> inflights = null;
-            if (fullStats) {
-                inflights = context.getInflightRepository().browse();
-            }
-
             // loop the routes, and append the processor stats if needed
             sb.append("  <routeStats>\n");
             for (ObjectName on : routes) {
@@ -410,7 +404,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
                 sb.append("    <routeStat").append(String.format(" id=\"%s\" state=\"%s\"", route.getRouteId(), route.getState()));
                 // use substring as we only want the attributes
                 stat = route.dumpStatsAsXml(fullStats);
-                sb.append(" exchangesInflight=\"").append(context.getInflightRepository().size(route.getRouteId())).append("\"");
+                sb.append(" exchangesInflight=\"").append(route.getExchangesInflight()).append("\"");
                 sb.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");
 
                 // add processor details if needed
@@ -422,10 +416,7 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
                             sb.append("        <processorStat").append(String.format(" id=\"%s\" index=\"%s\" state=\"%s\"", processor.getProcessorId(), processor.getIndex(), processor.getState()));
                             // use substring as we only want the attributes
                             stat = processor.dumpStatsAsXml(fullStats);
-                            if (fullStats) {
-                                // only include this in full stats as it may be more expensive to compute
-                                sb.append(" exchangesInflight=\"").append(inflightSizeAtProcessor(inflights, processor.getProcessorId())).append("\"");
-                            }
+                            sb.append(" exchangesInflight=\"").append(processor.getExchangesInflight()).append("\"");
                             sb.append(" ").append(stat.substring(7)).append("\n");
                         }
                     }
@@ -440,23 +431,6 @@ public class ManagedCamelContext extends ManagedPerformanceCounter implements Ti
         return sb.toString();
     }
 
-    /**
-     * Number of inflight exchanges at the given processor
-     *
-     * @param inflights   all inflight exchanges
-     * @param processorId the processor id
-     * @return the number
-     */
-    private String inflightSizeAtProcessor(Collection<InflightRepository.InflightExchange> inflights, String processorId) {
-        int count = 0;
-        for (InflightRepository.InflightExchange inflight : inflights) {
-            if (processorId.equals(inflight.getNodeId())) {
-                count++;
-            }
-        }
-        return String.valueOf(count);
-    }
-
     public boolean createEndpoint(String uri) throws Exception {
         if (context.hasEndpoint(uri) != null) {
             // endpoint already exists

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
index 6aedc57..07c7f95 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedPerformanceCounter.java
@@ -33,6 +33,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement
 
     private Statistic exchangesCompleted;
     private Statistic exchangesFailed;
+    private Statistic exchangesInflight;
     private Statistic failuresHandled;
     private Statistic redeliveries;
     private Statistic externalRedeliveries;
@@ -56,6 +57,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement
         super.init(strategy);
         this.exchangesCompleted = new Statistic("org.apache.camel.exchangesCompleted", this, Statistic.UpdateMode.COUNTER);
         this.exchangesFailed = new Statistic("org.apache.camel.exchangesFailed", this, Statistic.UpdateMode.COUNTER);
+        this.exchangesInflight = new Statistic("org.apache.camel.exchangesInflight", this, Statistic.UpdateMode.COUNTER);
 
         this.failuresHandled = new Statistic("org.apache.camel.failuresHandled", this, Statistic.UpdateMode.COUNTER);
         this.redeliveries = new Statistic("org.apache.camel.redeliveries", this, Statistic.UpdateMode.COUNTER);
@@ -79,6 +81,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement
         super.reset();
         exchangesCompleted.reset();
         exchangesFailed.reset();
+        exchangesInflight.reset();
         failuresHandled.reset();
         redeliveries.reset();
         externalRedeliveries.reset();
@@ -106,6 +109,8 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement
         return exchangesFailed.getValue();
     }
 
+    public long getExchangesInflight() { return exchangesInflight.getValue(); }
+
     public long getFailuresHandled() throws Exception {
         return failuresHandled.getValue();
     }
@@ -186,9 +191,14 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement
         this.statisticsEnabled = statisticsEnabled;
     }
 
+    public synchronized void processExchange(Exchange exchange) {
+        exchangesInflight.increment();
+    }
+
     public synchronized void completedExchange(Exchange exchange, long time) {
         increment();
         exchangesCompleted.increment();
+        exchangesInflight.decrement();
 
         if (ExchangeHelper.isFailureHandled(exchange)) {
             failuresHandled.increment();
@@ -224,6 +234,7 @@ public abstract class ManagedPerformanceCounter extends ManagedCounter implement
     public synchronized void failedExchange(Exchange exchange) {
         increment();
         exchangesFailed.increment();
+        exchangesInflight.decrement();
 
         if (ExchangeHelper.isRedelivered(exchange)) {
             redeliveries.increment();

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index a0bdfb1..3b57332 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -101,7 +101,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
     }
 
     public Integer getInflightExchanges() {
-        return context.getInflightRepository().size(route.getId());
+        return (int) super.getExchangesInflight();
     }
 
     public String getCamelId() {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
index 1b4675f..2f00bf3 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/Statistic.java
@@ -101,6 +101,10 @@ public class Statistic {
         updateValue(1);
     }
 
+    public synchronized void decrement() {
+        updateValue(-1);
+    }
+
     public synchronized long getValue() {
         if (updateMode == UpdateMode.DELTA) {
             if (updateCount == 0) {

http://git-wip-us.apache.org/repos/asf/camel/blob/e0581024/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 099af1b..97f557b 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -344,6 +344,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             }
         }
 
+        protected void beginTime(Exchange exchange) {
+            counter.processExchange(exchange);
+        }
+
         protected void recordTime(Exchange exchange, long duration) {
             if (LOG.isTraceEnabled()) {
                 LOG.trace("{}Recording duration: {} millis for exchange: {}", new Object[]{type != null ? type + ": " : "", duration, exchange});
@@ -367,7 +371,11 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         @Override
         public StopWatch before(Exchange exchange) throws Exception {
             // only record time if stats is enabled
-            return (counter != null && counter.isStatisticsEnabled()) ? new StopWatch() : null;
+            StopWatch answer = counter != null && counter.isStatisticsEnabled() ? new StopWatch() : null;
+            if (answer != null) {
+                beginTime(exchange);
+            }
+            return answer;
         }
 
         @Override


Mime
View raw message