camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [6/8] camel git commit: CAMEL-11354: Optimize oldest exchange inflight per route
Date Sun, 28 May 2017 19:54:15 GMT
CAMEL-11354: Optimize oldest exchange inflight per route


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/e6bc3975
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/e6bc3975
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/e6bc3975

Branch: refs/heads/master
Commit: e6bc3975fd31e83a01c3ad94200586f5173e6b9e
Parents: ffd97f6
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun May 28 20:43:13 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun May 28 20:43:13 2017 +0200

----------------------------------------------------------------------
 .../camel/impl/DefaultInflightRepository.java   | 30 ++++++++++++++++++++
 .../camel/management/mbean/ManagedRoute.java    | 19 ++-----------
 .../apache/camel/spi/InflightRepository.java    |  8 ++++++
 3 files changed, 41 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/e6bc3975/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index f176861..cae7ecb 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
 
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
@@ -152,6 +154,34 @@ public class DefaultInflightRepository extends ServiceSupport implements
Infligh
     }
 
     @Override
+    public InflightExchange oldest(String fromRouteId) {
+        Stream<Exchange> values;
+
+        if (fromRouteId == null) {
+            // all values
+            values = inflight.values().stream();
+        } else {
+            // only if route match
+            values = inflight.values().stream()
+                .filter(e -> fromRouteId.equals(e.getFromRouteId()));
+        }
+
+        // sort by duration and grab the first
+        Exchange first = values.sorted((e1, e2) -> {
+            long d1 = getExchangeDuration(e1);
+            long d2 = getExchangeDuration(e2);
+            // need the biggest number first
+            return -1 * Long.compare(d1, d2);
+        }).findFirst().orElse(null);
+
+        if (first != null) {
+            return new InflightExchangeEntry(first);
+        } else {
+            return null;
+        }
+    }
+
+    @Override
     protected void doStart() throws Exception {
     }
 

http://git-wip-us.apache.org/repos/asf/camel/blob/e6bc3975/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index 81f4794..93248b5 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -19,15 +19,12 @@ package org.apache.camel.management.mbean;
 import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.AttributeValueExp;
@@ -37,11 +34,7 @@ import javax.management.Query;
 import javax.management.QueryExp;
 import javax.management.StringValueExp;
 
-import org.apache.camel.spi.InflightRepository;
-import org.w3c.dom.Document;
-
 import org.apache.camel.CamelContext;
-import org.apache.camel.Exchange;
 import org.apache.camel.ManagementStatisticsLevel;
 import org.apache.camel.Route;
 import org.apache.camel.ServiceStatus;
@@ -52,12 +45,14 @@ import org.apache.camel.api.management.mbean.ManagedRouteMBean;
 import org.apache.camel.model.ModelCamelContext;
 import org.apache.camel.model.ModelHelper;
 import org.apache.camel.model.RouteDefinition;
+import org.apache.camel.spi.InflightRepository;
 import org.apache.camel.spi.ManagementStrategy;
 import org.apache.camel.spi.RoutePolicy;
 import org.apache.camel.util.ObjectHelper;
 import org.apache.camel.util.XmlLineNumberParser;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
+import org.w3c.dom.Document;
 
 @ManagedResource(description = "Managed Route")
 public class ManagedRoute extends ManagedPerformanceCounter implements TimerListener, ManagedRouteMBean
{
@@ -70,7 +65,6 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
     protected final String description;
     protected final ModelCamelContext context;
     private final LoadTriplet load = new LoadTriplet();
-    private final ConcurrentHashMap<String, Exchange> exchangesInFlight = new ConcurrentHashMap<String,
Exchange>();
     private final String jmxDomain;
 
     public ManagedRoute(ModelCamelContext context, Route route) {
@@ -85,8 +79,6 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         super.init(strategy);
         boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel()
!= ManagementStatisticsLevel.Off;
         setStatisticsEnabled(enabled);
-
-        exchangesInFlight.clear();
     }
 
     public Route getRoute() {
@@ -467,12 +459,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements
TimerList
     }
 
     private InflightRepository.InflightExchange getOldestInflightEntry() {
-        Collection<InflightRepository.InflightExchange> list = getContext().getInflightRepository().browse(getRouteId(),
1, true);
-        if (list.size() == 1) {
-            return list.iterator().next();
-        } else {
-            return null;
-        }
+        return getContext().getInflightRepository().oldest(getRouteId());
     }
 
     public Long getOldestInflightDuration() {

http://git-wip-us.apache.org/repos/asf/camel/blob/e6bc3975/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
index 3fbd710..53c02e2 100644
--- a/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/spi/InflightRepository.java
@@ -175,4 +175,12 @@ public interface InflightRepository extends StaticService {
      */
     Collection<InflightExchange> browse(String fromRouteId, int limit, boolean sortByLongestDuration);
 
+    /**
+     * Gets the oldest {@link InflightExchange} that are currently inflight that started
from the given route.
+     *
+     * @param fromRouteId  the route id, or <tt>null</tt> for all routes.
+     * @return the oldest, or <tt>null</tt> if none inflight
+     */
+    InflightExchange oldest(String fromRouteId);
+
 }


Mime
View raw message