camel-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From davscl...@apache.org
Subject [4/6] camel git commit: Issue CAMEL-8640. Encapsulated the BacklogTracer queue. The queue implementation changed from array-based to linked-list-based. Responsibility for ensuring free space in the queue moved into BacklogTracer, away from BacklogTracer's users.
Date Sun, 03 May 2015 09:38:22 GMT
Issue CAMEL-8640. Encapsulated the BacklogTracer queue. The queue implementation changed from
array-based (ArrayBlockingQueue) to linked-list-based (LinkedBlockingQueue). Responsibility for
ensuring free space in the queue moved into BacklogTracer, away from BacklogTracer's users.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/2c19b34f
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/2c19b34f
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/2c19b34f

Branch: refs/heads/camel-2.15.x
Commit: 2c19b34f7a439d8f6119f3c500a784d63307d3c0
Parents: 2f2746e
Author: Claus Ibsen <davsclaus@apache.org>
Authored: Sun May 3 11:32:49 2015 +0200
Committer: Claus Ibsen <davsclaus@apache.org>
Committed: Sun May 3 11:41:48 2015 +0200

----------------------------------------------------------------------
 .../apache/camel/impl/DefaultCamelContext.java  |  2 +-
 .../camel/processor/CamelInternalProcessor.java | 21 +++----------
 .../processor/interceptor/BacklogTracer.java    | 32 ++++++++++++--------
 .../processor/interceptor/DefaultChannel.java   |  2 +-
 4 files changed, 26 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 99127f2..0b60f6a 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -3461,7 +3461,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
 
     public InterceptStrategy getDefaultBacklogTracer() {
         if (defaultBacklogTracer == null) {
-            defaultBacklogTracer = new BacklogTracer(this);
+            defaultBacklogTracer = BacklogTracer.createTracer(this);
         }
         return defaultBacklogTracer;
     }

http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 97f557b..f8c71f4 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -19,7 +19,6 @@ package org.apache.camel.processor;
 import java.util.ArrayList;
 import java.util.Date;
 import java.util.List;
-import java.util.Queue;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
@@ -521,15 +520,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
      */
     public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice
{
 
-        private final Queue<DefaultBacklogTracerEventMessage> queue;
         private final BacklogTracer backlogTracer;
         private final ProcessorDefinition<?> processorDefinition;
         private final ProcessorDefinition<?> routeDefinition;
         private final boolean first;
 
-        public BacklogTracerAdvice(Queue<DefaultBacklogTracerEventMessage> queue, BacklogTracer
backlogTracer,
-                                   ProcessorDefinition<?> processorDefinition, ProcessorDefinition<?>
routeDefinition, boolean first) {
-            this.queue = queue;
+        public BacklogTracerAdvice(BacklogTracer backlogTracer, ProcessorDefinition<?>
processorDefinition,
+                                   ProcessorDefinition<?> routeDefinition, boolean
first) {
             this.backlogTracer = backlogTracer;
             this.processorDefinition = processorDefinition;
             this.routeDefinition = routeDefinition;
@@ -539,16 +536,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
         @Override
         public Object before(Exchange exchange) throws Exception {
             if (backlogTracer.shouldTrace(processorDefinition, exchange)) {
-                // ensure there is space on the queue
-                int drain = queue.size() - backlogTracer.getBacklogSize();
-                // and we need room for ourselves and possible also a first pseudo message
as well
-                drain += first ? 2 : 1;
-                if (drain > 0) {
-                    for (int i = 0; i < drain; i++) {
-                        queue.poll();
-                    }
-                }
-
                 Date timestamp = new Date();
                 String toNode = processorDefinition.getId();
                 String exchangeId = exchange.getExchangeId();
@@ -560,10 +547,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
                 if (first) {
                     Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp,
Date.class);
                     DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(),
created, routeId, null, exchangeId, messageAsXml);
-                    queue.add(pseudo);
+                    backlogTracer.traceEvent(pseudo);
                 }
                 DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(),
timestamp, routeId, toNode, exchangeId, messageAsXml);
-                queue.add(event);
+                backlogTracer.traceEvent(event);
             }
 
             return null;

http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
index 6dde932..e2c51bd 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java
@@ -19,7 +19,7 @@ package org.apache.camel.processor.interceptor;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Queue;
-import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.camel.CamelContext;
@@ -52,7 +52,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy
{
     private boolean enabled;
     private final AtomicLong traceCounter = new AtomicLong(0);
     // use a queue with a upper limit to avoid storing too many messages
-    private final Queue<DefaultBacklogTracerEventMessage> queue = new ArrayBlockingQueue<DefaultBacklogTracerEventMessage>(MAX_BACKLOG_SIZE);
+    private final Queue<BacklogTracerEventMessage> queue = new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE);
     // how many of the last messages to keep in the backlog at total
     private int backlogSize = 1000;
     private boolean removeOnDump = true;
@@ -65,14 +65,10 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy
{
     private String traceFilter;
     private Predicate predicate;
 
-    public BacklogTracer(CamelContext camelContext) {
+    private BacklogTracer(CamelContext camelContext) {
         this.camelContext = camelContext;
     }
 
-    public Queue<DefaultBacklogTracerEventMessage> getQueue() {
-        return queue;
-    }
-
     @Override
     @Deprecated
     public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition<?>
definition, Processor target, Processor nextTarget) throws Exception {
@@ -86,8 +82,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy
{
      * @return a new backlog tracer
      */
     public static BacklogTracer createTracer(CamelContext context) {
-        BacklogTracer tracer = new BacklogTracer(context);
-        return tracer;
+        return new BacklogTracer(context);
     }
 
     /**
@@ -153,6 +148,19 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy
{
         return false;
     }
 
+    public void traceEvent(DefaultBacklogTracerEventMessage event) {
+        if (!enabled) {
+            return;
+        }
+
+        // ensure there is space on the queue and we need room for ourselves and possible
also a first pseudo message as well
+        if (queue.size() >= backlogSize) {
+            queue.poll();
+        }
+
+        queue.add(event);
+    }
+
     private boolean shouldTraceFilter(Exchange exchange) {
         return predicate.matches(exchange);
     }
@@ -251,9 +259,9 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy
{
     }
 
     public List<BacklogTracerEventMessage> dumpTracedMessages(String nodeId) {
-        List<BacklogTracerEventMessage> answer = new ArrayList<BacklogTracerEventMessage>();
+        List<BacklogTracerEventMessage> answer = new ArrayList<>();
         if (nodeId != null) {
-            for (DefaultBacklogTracerEventMessage message : queue) {
+            for (BacklogTracerEventMessage message : queue) {
                 if (nodeId.equals(message.getToNode()) || nodeId.equals(message.getRouteId()))
{
                     answer.add(message);
                 }
@@ -280,7 +288,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy
{
     }
 
     public List<BacklogTracerEventMessage> dumpAllTracedMessages() {
-        List<BacklogTracerEventMessage> answer = new ArrayList<BacklogTracerEventMessage>();
+        List<BacklogTracerEventMessage> answer = new ArrayList<>();
         answer.addAll(queue);
         if (isRemoveOnDump()) {
             queue.clear();

http://git-wip-us.apache.org/repos/asf/camel/blob/2c19b34f/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
index c9ae2f3..dc719e3 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java
@@ -229,7 +229,7 @@ public class DefaultChannel extends CamelInternalProcessor implements
ModelChann
                 first = route.getOutputs().get(0) == definition;
             }
 
-            addAdvice(new BacklogTracerAdvice(backlogTracer.getQueue(), backlogTracer, targetOutputDef,
route, first));
+            addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, route, first));
 
             // add debugger as well so we have both tracing and debugging out of the box
             InterceptStrategy debugger = getOrCreateBacklogDebugger();


Mime
View raw message