camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject svn commit: r830073 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/impl/ main/java/org/apache/camel/management/mbean/ main/java/org/apache/camel/spi/ test/java/org/apache/camel/impl/ test/java/org/apache/camel/management/
Date Tue, 27 Oct 2009 07:05:15 GMT
Author: davsclaus
Date: Tue Oct 27 07:05:04 2009
New Revision: 830073

URL: http://svn.apache.org/viewvc?rev=830073&view=rev
Log:
CAMEL-2110: InflightRepository is now more fine grained by also tracking inflight exchanges per from endpoint,
i.e. the endpoint where the exchange was consumed from.

Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterRouteTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePerformanceCounterTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java Tue
Oct 27 07:05:04 2009
@@ -1017,6 +1017,7 @@
 
         forceLazyInitialization();
         startServices(components.values());
+        addService(inflightRepository);
 
         // To avoid initiating the routeDefinitions after stopping the camel context
         if (!routeDefinitionInitiated) {
@@ -1049,6 +1050,8 @@
         stopServices(servicesToClose);
         servicesToClose.clear();
 
+        stopServices(inflightRepository);
+
         try {
             for (LifecycleStrategy strategy : lifecycleStrategies) {
                 strategy.onContextStop(this);

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
Tue Oct 27 07:05:04 2009
@@ -16,29 +16,76 @@
  */
 package org.apache.camel.impl;
 
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
 import org.apache.camel.spi.InflightRepository;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Default implement which just uses a counter
  *
  * @version $Revision$
  */
-public class DefaultInflightRepository implements InflightRepository {
+public class DefaultInflightRepository extends ServiceSupport implements InflightRepository
 {
 
-    private final AtomicInteger count = new AtomicInteger();
+    private static final transient Log LOG = LogFactory.getLog(DefaultInflightRepository.class);
+    private final AtomicInteger totalCount = new AtomicInteger();
+    // use endpoint key as key so endpoints with lenient properties are registered using the
same key (e.g. dynamic http endpoints)
+    private final ConcurrentHashMap<String, AtomicInteger> endpointCount = new ConcurrentHashMap<String,
AtomicInteger>();
 
     public void add(Exchange exchange) {
-        count.incrementAndGet();
+        totalCount.incrementAndGet();
+
+        if (exchange.getFromEndpoint() == null) {
+            return;
+        }
+
+        String key = exchange.getFromEndpoint().getEndpointKey();
+        AtomicInteger existing = endpointCount.putIfAbsent(key, new AtomicInteger(1));
+        if (existing != null) {
+            existing.addAndGet(1);
+        }
     }
 
     public void remove(Exchange exchange) {
-        count.decrementAndGet();
+        totalCount.decrementAndGet();
+
+        if (exchange.getFromEndpoint() == null) {
+            return;
+        }
+
+        String key = exchange.getFromEndpoint().getEndpointKey();
+        AtomicInteger existing = endpointCount.get(key);
+        if (existing != null) {
+            existing.addAndGet(-1);
+        }
     }
 
     public int size() {
-        return count.get();
+        return totalCount.get();
+    }
+
+    public int size(Endpoint endpoint) {
+        AtomicInteger answer = endpointCount.get(endpoint.getEndpointKey());
+        return answer != null ? answer.get() : 0;
+    }
+
+    @Override
+    protected void doStart() throws Exception {
+    }
+
+    @Override
+    protected void doStop() throws Exception {
+        int count = size();
+        if (count > 0) {
+            LOG.warn("Shutting down while there are still " + count + " in flight exchanges.");
+        } else {
+            LOG.info("Shutting down with no inflight exchanges.");
+        }
+        endpointCount.clear();
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedConsumer.java
Tue Oct 27 07:05:04 2009
@@ -43,4 +43,9 @@
         return consumer.getEndpoint().getEndpointUri();
     }
 
+    @ManagedAttribute(description = "Current number of inflight Exchanges")
+    public Integer getInflightExchanges() {
+        return getContext().getInflightRepository().size(consumer.getEndpoint());
+    }
+
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
(original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
Tue Oct 27 07:05:04 2009
@@ -21,7 +21,6 @@
 import org.apache.camel.ManagementStatisticsLevel;
 import org.apache.camel.Route;
 import org.apache.camel.ServiceStatus;
-import org.apache.camel.spi.ManagementStrategy;
 import org.springframework.jmx.export.annotation.ManagedAttribute;
 import org.springframework.jmx.export.annotation.ManagedOperation;
 import org.springframework.jmx.export.annotation.ManagedResource;
@@ -81,6 +80,15 @@
         return status.name();
     }
 
+    @ManagedAttribute(description = "Current number of inflight Exchanges")
+    public Integer getInflightExchanges() {
+        if (route.getEndpoint() != null) {
+            return context.getInflightRepository().size(route.getEndpoint());
+        } else {
+            return null;
+        }
+    }
+
     @ManagedAttribute(description = "Camel id")
     public String getCamelId() {
         return context.getName();

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java Tue
Oct 27 07:05:04 2009
@@ -16,12 +16,14 @@
  */
 package org.apache.camel.spi;
 
+import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
+import org.apache.camel.Service;
 
 /**
  * @version $Revision$
  */
-public interface InflightRepository {
+public interface InflightRepository extends Service {
 
     /**
      * Adds the exchange to the inflight registry
@@ -46,4 +48,14 @@
      */
     int size();
 
+    /**
+     * Current size of inflight exchanges which are from the given endpoint.
+     * <p/>
+     * Will return 0 if there are no inflight exchanges.
+     *
+     * @param endpoint the endpoint where the {@link Exchange} are from.
+     * @return number of exchanges currently in flight.
+     */
+    int size(Endpoint endpoint);
+
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/InflightRepositoryRouteTest.java
Tue Oct 27 07:05:04 2009
@@ -39,10 +39,17 @@
 
         assertEquals(1, context.getInflightRepository().size());
 
+        // must be 1 in flight from this endpoint
+        assertEquals(1, context.getInflightRepository().size(context.getEndpoint("direct:start")));
+
+        // but 0 from this endpoint
+        assertEquals(0, context.getInflightRepository().size(context.getEndpoint("mock:result")));
+
         // wait to be sure its done
         Thread.sleep(2000);
 
         assertEquals(0, context.getInflightRepository().size());
+        assertEquals(0, context.getInflightRepository().size(context.getEndpoint("direct:start")));
     }
 
     @Override

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/impl/MultipleLifecycleStrategyTest.java
Tue Oct 27 07:05:04 2009
@@ -50,19 +50,21 @@
 
         context.stop();
 
-        assertEquals(5, dummy1.getEvents().size());
-        assertEquals(5, dummy2.getEvents().size());
+        assertEquals(6, dummy1.getEvents().size());
+        assertEquals(6, dummy2.getEvents().size());
 
         assertEquals("onContextStart", dummy1.getEvents().get(0));
         assertEquals("onContextStart", dummy2.getEvents().get(0));
-        assertEquals("onComponentAdd", dummy1.getEvents().get(1));
-        assertEquals("onComponentAdd", dummy2.getEvents().get(1));
-        assertEquals("onEndpointAdd", dummy1.getEvents().get(2));
-        assertEquals("onEndpointAdd", dummy2.getEvents().get(2));
-        assertEquals("onComponentRemove", dummy1.getEvents().get(3));
-        assertEquals("onComponentRemove", dummy2.getEvents().get(3));
-        assertEquals("onContextStop", dummy1.getEvents().get(4));
-        assertEquals("onContextStop", dummy2.getEvents().get(4));
+        assertEquals("onServiceAdd", dummy1.getEvents().get(1));
+        assertEquals("onServiceAdd", dummy2.getEvents().get(1));
+        assertEquals("onComponentAdd", dummy1.getEvents().get(2));
+        assertEquals("onComponentAdd", dummy2.getEvents().get(2));
+        assertEquals("onEndpointAdd", dummy1.getEvents().get(3));
+        assertEquals("onEndpointAdd", dummy2.getEvents().get(3));
+        assertEquals("onComponentRemove", dummy1.getEvents().get(4));
+        assertEquals("onComponentRemove", dummy2.getEvents().get(4));
+        assertEquals("onContextStop", dummy1.getEvents().get(5));
+        assertEquals("onContextStop", dummy2.getEvents().get(5));
     }
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterRouteTest.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterRouteTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRegisterRouteTest.java
Tue Oct 27 07:05:04 2009
@@ -44,6 +44,10 @@
         // the route has this starting endpoint uri
         assertEquals("direct://start", uri);
 
+        Integer val = (Integer) mbeanServer.getAttribute(on, "InflightExchanges");
+        // the route has no inflight exchanges
+        assertEquals(0, val.intValue());
+
         // should be started
         String state = (String) mbeanServer.getAttribute(on, "State");
         assertEquals("Should be started", ServiceStatus.Started.name(), state);

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePerformanceCounterTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePerformanceCounterTest.java?rev=830073&r1=830072&r2=830073&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePerformanceCounterTest.java
(original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/management/ManagedRoutePerformanceCounterTest.java
Tue Oct 27 07:05:04 2009
@@ -39,15 +39,22 @@
     }
 
     public void testPerformanceCounterStats() throws Exception {
+        // get the stats for the route
+        MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
+        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route1\"");
+
         getMockEndpoint("mock:result").expectedMessageCount(1);
 
-        template.sendBody("direct:start", "Hello World");
+        template.asyncSendBody("direct:start", "Hello World");
+
+        Thread.sleep(500);
+
+        Integer inFlight = (Integer) mbeanServer.getAttribute(on, "InflightExchanges");
+        assertEquals(1, inFlight.longValue());
 
         assertMockEndpointsSatisfied();
 
-        // get the stats for the route
-        MBeanServer mbeanServer = context.getManagementStrategy().getManagementAgent().getMBeanServer();
-        ObjectName on = ObjectName.getInstance("org.apache.camel:context=localhost/camel-1,type=routes,name=\"route1\"");
+        Thread.sleep(500);
 
         Long completed = (Long) mbeanServer.getAttribute(on, "ExchangesCompleted");
         assertEquals(1, completed.longValue());
@@ -73,6 +80,9 @@
         Date firstFailed = (Date) mbeanServer.getAttribute(on, "FirstExchangeFailureTimestamp");
         assertNull(lastFailed);
         assertNull(firstFailed);
+
+        inFlight = (Integer) mbeanServer.getAttribute(on, "InflightExchanges");
+        assertEquals(0, inFlight.longValue());
     }
 
     @Override



Mime
View raw message