camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/8] camel git commit: CAMEL-11354: Optimize oldest exchange inflight per route
Date Sun, 28 May 2017 19:54:13 GMT
CAMEL-11354: Optimize oldest exchange inflight per route


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ce4e8f7d
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ce4e8f7d
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ce4e8f7d

Branch: refs/heads/master
Commit: ce4e8f7df2dbc2b8cf9b0a55c8bba01010ad3e86
Parents: 4a3debe
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun May 28 20:03:59 2017 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun May 28 20:03:59 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/Exchange.java    |   6 +
 .../org/apache/camel/impl/DefaultExchange.java  |  10 ++
 .../camel/impl/DefaultInflightRepository.java   |   2 +-
 .../camel/management/mbean/ManagedRoute.java    | 111 ++++---------------
 .../org/apache/camel/util/MessageHelper.java    |   2 +-
 .../EventNotifierExchangeCompletedTest.java     |   2 +-
 6 files changed, 43 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index eecc7e0..87fcf13 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -16,6 +16,7 @@
  */
 package org.apache.camel;
 
+import java.util.Date;
 import java.util.List;
 import java.util.Map;
 
@@ -584,4 +585,9 @@ public interface Exchange {
      */
     List<Synchronization> handoverCompletions();
 
+    /**
+     * Gets the timestamp when this exchange was created.
+     */
+    Date getCreated();
+
 }

http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
index c1a5c39..6b7a686 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
@@ -17,6 +17,7 @@
 package org.apache.camel.impl;
 
 import java.util.ArrayList;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
@@ -86,6 +87,15 @@ public final class DefaultExchange implements Exchange {
         return String.format("Exchange[%s]", exchangeId == null ? "" : exchangeId);
     }
 
+    @Override
+    public Date getCreated() {
+        if (hasProperties()) {
+            return getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
+        } else {
+            return null;
+        }
+    }
+
     public Exchange copy() {
         // to be backwards compatible as today
         return copy(false);

http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
index 01a2826..2c6c78b 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultInflightRepository.java
@@ -167,7 +167,7 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
 
     private static long getExchangeDuration(Exchange exchange) {
         long duration = 0;
-        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
+        Date created = exchange.getCreated();
         if (created != null) {
             duration = System.currentTimeMillis() - created.getTime();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
index 166f3a2..1b031b1 100644
--- a/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
+++ b/camel-core/src/main/java/org/apache/camel/management/mbean/ManagedRoute.java
@@ -21,12 +21,12 @@ import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
+import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import javax.management.AttributeValueExp;
@@ -68,8 +68,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
     protected final String description;
     protected final ModelCamelContext context;
     private final LoadTriplet load = new LoadTriplet();
-    private final ConcurrentSkipListMap<InFlightKey, Long> exchangesInFlightStartTimestamps = new ConcurrentSkipListMap<InFlightKey, Long>();
-    private final ConcurrentHashMap<String, InFlightKey> exchangesInFlightKeys = new ConcurrentHashMap<String, InFlightKey>();
+    private final ConcurrentHashMap<String, Exchange> exchangesInFlight = new ConcurrentHashMap<String, Exchange>();
     private final String jmxDomain;
 
     public ManagedRoute(ModelCamelContext context, Route route) {
@@ -85,8 +84,7 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         boolean enabled = context.getManagementStrategy().getManagementAgent().getStatisticsLevel() != ManagementStatisticsLevel.Off;
         setStatisticsEnabled(enabled);
 
-        exchangesInFlightKeys.clear();
-        exchangesInFlightStartTimestamps.clear();
+        exchangesInFlight.clear();
     }
 
     public Route getRoute() {
@@ -410,13 +408,14 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         String stat = dumpStatsAsXml(fullStats);
         answer.append(" exchangesInflight=\"").append(getInflightExchanges()).append("\"");
         answer.append(" selfProcessingTime=\"").append(routeSelfTime).append("\"");
-        InFlightKey oldestInflightEntry = getOldestInflightEntry();
-        if (oldestInflightEntry == null) {
+        Exchange oldest = getOldestInflightEntry();
+        if (oldest == null) {
             answer.append(" oldestInflightExchangeId=\"\"");
             answer.append(" oldestInflightDuration=\"\"");
         } else {
-            answer.append(" oldestInflightExchangeId=\"").append(oldestInflightEntry.exchangeId).append("\"");
-            answer.append(" oldestInflightDuration=\"").append(System.currentTimeMillis() - oldestInflightEntry.timeStamp).append("\"");
+            long duration = System.currentTimeMillis() - oldest.getCreated().getTime();
+            answer.append(" oldestInflightExchangeId=\"").append(oldest.getExchangeId()).append("\"");
+            answer.append(" oldestInflightDuration=\"").append(duration).append("\"");
         }
         answer.append(" ").append(stat.substring(7, stat.length() - 2)).append(">\n");
 
@@ -466,111 +465,49 @@ public class ManagedRoute extends ManagedPerformanceCounter implements TimerList
         return route.hashCode();
     }
 
-    private InFlightKey getOldestInflightEntry() {
-        Map.Entry<InFlightKey, Long> entry = exchangesInFlightStartTimestamps.firstEntry();
-        if (entry != null) {
-            return entry.getKey();
-        }
-        return null;
+    private Exchange getOldestInflightEntry() {
+        return exchangesInFlight.values().stream().max(Comparator.comparing(Exchange::getCreated)).orElse(null);
     }
 
     public Long getOldestInflightDuration() {
-        InFlightKey oldest = getOldestInflightEntry();
-        if (oldest == null) {
+        Exchange exchange = getOldestInflightEntry();
+        if (exchange == null) {
+            return null;
+        }
+        Date created = exchange.getCreated();
+        if (created != null) {
+            return System.currentTimeMillis() - created.getTime();
+        } else {
             return null;
         }
-        return System.currentTimeMillis() - oldest.timeStamp;
     }
 
     public String getOldestInflightExchangeId() {
-        InFlightKey oldest = getOldestInflightEntry();
-        if (oldest == null) {
+        Exchange exchange = getOldestInflightEntry();
+        if (exchange == null) {
             return null;
         }
-        return oldest.exchangeId;
+        return exchange.getExchangeId();
     }
 
     @Override
     public void processExchange(Exchange exchange) {
-        exchangesInFlightKeys.computeIfAbsent(exchange.getExchangeId(), id -> {
-            InFlightKey key = new InFlightKey(System.currentTimeMillis(), exchange.getExchangeId());
-            exchangesInFlightStartTimestamps.put(key, key.timeStamp);
-            return key;
-        });
+        exchangesInFlight.put(exchange.getExchangeId(), exchange);
         super.processExchange(exchange);
     }
 
     @Override
     public void completedExchange(Exchange exchange, long time) {
-        exchangesInFlightKeys.computeIfPresent(exchange.getExchangeId(), (id, key) -> {
-            exchangesInFlightStartTimestamps.remove(key);
-            return null;
-        });
+        exchangesInFlight.remove(exchange.getExchangeId());
         super.completedExchange(exchange, time);
     }
 
     @Override
     public void failedExchange(Exchange exchange) {
-        exchangesInFlightKeys.computeIfPresent(exchange.getExchangeId(), (id, key) -> {
-            exchangesInFlightStartTimestamps.remove(key);
-            return null;
-        });
+        exchangesInFlight.remove(exchange.getExchangeId());
         super.failedExchange(exchange);
     }
 
-    private static class InFlightKey implements Comparable<InFlightKey> {
-
-        private final Long timeStamp;
-        private final String exchangeId;
-
-        InFlightKey(Long timeStamp, String exchangeId) {
-            this.timeStamp = timeStamp;
-            this.exchangeId = exchangeId;
-        }
-
-        @Override
-        public int compareTo(InFlightKey o) {
-            int compare = Long.compare(timeStamp, o.timeStamp);
-            if (compare == 0) {
-                return exchangeId.compareTo(o.exchangeId);
-            }
-            return compare;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-
-            InFlightKey that = (InFlightKey) o;
-
-            if (!exchangeId.equals(that.exchangeId)) {
-                return false;
-            }
-            if (!timeStamp.equals(that.timeStamp)) {
-                return false;
-            }
-
-            return true;
-        }
-
-        @Override
-        public int hashCode() {
-            int result = timeStamp.hashCode();
-            result = 31 * result + exchangeId.hashCode();
-            return result;
-        }
-
-        @Override
-        public String toString() {
-            return exchangeId;
-        }
-    }
-
     /**
      * Used for sorting the processor mbeans accordingly to their index.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
index 311e987..050fecc 100644
--- a/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/MessageHelper.java
@@ -560,7 +560,7 @@ public final class MessageHelper {
             label = URISupport.sanitizeUri(exchange.getFromEndpoint().getEndpointUri());
         }
         long elapsed = 0;
-        Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
+        Date created = exchange.getCreated();
         if (created != null) {
             elapsed = new StopWatch(created).taken();
         }

http://git-wip-us.apache.org/repos/asf/camel/blob/ce4e8f7d/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java
index 45e10a0..f37e7e0 100644
--- a/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java
+++ b/camel-core/src/test/java/org/apache/camel/management/EventNotifierExchangeCompletedTest.java
@@ -85,7 +85,7 @@ public class EventNotifierExchangeCompletedTest extends ContextTestSupport {
         assertEquals("direct://start", event.getExchange().getFromEndpoint().getEndpointUri());
 
         // grab the created timestamp
-        Date created = event.getExchange().getProperty(Exchange.CREATED_TIMESTAMP, Date.class);
+        Date created = event.getExchange().getCreated();
         assertNotNull(created);
 
         // calculate elapsed time


Mime
View raw message