Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 5A60418CB9 for ; Sun, 3 May 2015 09:38:19 +0000 (UTC) Received: (qmail 28455 invoked by uid 500); 3 May 2015 09:38:19 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 28384 invoked by uid 500); 3 May 2015 09:38:19 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 28365 invoked by uid 99); 3 May 2015 09:38:19 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 03 May 2015 09:38:19 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 17ACCE0833; Sun, 3 May 2015 09:38:19 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Sun, 03 May 2015 09:38:19 -0000 Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: [1/6] camel git commit: Issue CAMEL-8640. Encapsulated BacklogTracer queue. Implementation of queue changed to list based instead of array based. Ensuring free space in queue responsibility moved into BacklogTracer instead from BacklogTracer's user. Repository: camel Updated Branches: refs/heads/camel-2.15.x 2f2746e35 -> 6f8fdc4bb refs/heads/master a30617333 -> e669002a9 Issue CAMEL-8640. Encapsulated BacklogTracer queue. Implementation of queue changed to list based instead of array based. Ensuring free space in queue responsibility moved into BacklogTracer instead from BacklogTracer's user. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d0bf15f5 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d0bf15f5 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d0bf15f5 Branch: refs/heads/master Commit: d0bf15f5e3daf02ca2113a9936335ae8323efcd0 Parents: a306173 Author: Claus Ibsen Authored: Sun May 3 11:32:49 2015 +0200 Committer: Claus Ibsen Committed: Sun May 3 11:32:49 2015 +0200 ---------------------------------------------------------------------- .../apache/camel/impl/DefaultCamelContext.java | 2 +- .../camel/processor/CamelInternalProcessor.java | 21 +++---------- .../processor/interceptor/BacklogTracer.java | 32 ++++++++++++-------- .../processor/interceptor/DefaultChannel.java | 2 +- 4 files changed, 26 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/d0bf15f5/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java index 17de821..7056932 100644 --- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java +++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java @@ -3589,7 +3589,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon public InterceptStrategy getDefaultBacklogTracer() { if (defaultBacklogTracer == null) { - defaultBacklogTracer = new BacklogTracer(this); + defaultBacklogTracer = BacklogTracer.createTracer(this); } return defaultBacklogTracer; } http://git-wip-us.apache.org/repos/asf/camel/blob/d0bf15f5/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 97f557b..f8c71f4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -19,7 +19,6 @@ package org.apache.camel.processor; import java.util.ArrayList; import java.util.Date; import java.util.List; -import java.util.Queue; import java.util.concurrent.RejectedExecutionException; import org.apache.camel.AsyncCallback; @@ -521,15 +520,13 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { */ public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice { - private final Queue queue; private final BacklogTracer backlogTracer; private final ProcessorDefinition processorDefinition; private final ProcessorDefinition routeDefinition; private final boolean first; - public BacklogTracerAdvice(Queue queue, BacklogTracer backlogTracer, - ProcessorDefinition processorDefinition, ProcessorDefinition routeDefinition, boolean first) { - this.queue = queue; + public BacklogTracerAdvice(BacklogTracer backlogTracer, ProcessorDefinition processorDefinition, + ProcessorDefinition routeDefinition, boolean first) { this.backlogTracer = backlogTracer; this.processorDefinition = processorDefinition; this.routeDefinition = routeDefinition; @@ -539,16 +536,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { @Override public Object before(Exchange exchange) throws Exception { if (backlogTracer.shouldTrace(processorDefinition, exchange)) { - // ensure there is space on the queue - int drain = queue.size() - backlogTracer.getBacklogSize(); - // and we need room for ourselves and possible also a first pseudo message as well - drain += first ? 2 : 1; - if (drain > 0) { - for (int i = 0; i < drain; i++) { - queue.poll(); - } - } - Date timestamp = new Date(); String toNode = processorDefinition.getId(); String exchangeId = exchange.getExchangeId(); @@ -560,10 +547,10 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { if (first) { Date created = exchange.getProperty(Exchange.CREATED_TIMESTAMP, timestamp, Date.class); DefaultBacklogTracerEventMessage pseudo = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), created, routeId, null, exchangeId, messageAsXml); - queue.add(pseudo); + backlogTracer.traceEvent(pseudo); } DefaultBacklogTracerEventMessage event = new DefaultBacklogTracerEventMessage(backlogTracer.incrementTraceCounter(), timestamp, routeId, toNode, exchangeId, messageAsXml); - queue.add(event); + backlogTracer.traceEvent(event); } return null; http://git-wip-us.apache.org/repos/asf/camel/blob/d0bf15f5/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java index 6dde932..e2c51bd 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/BacklogTracer.java @@ -19,7 +19,7 @@ package org.apache.camel.processor.interceptor; import java.util.ArrayList; import java.util.List; import java.util.Queue; -import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.atomic.AtomicLong; import org.apache.camel.CamelContext; @@ -52,7 +52,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { private boolean enabled; private final AtomicLong traceCounter = new AtomicLong(0); // use a queue with a upper limit to avoid storing too many messages - private final Queue queue = new ArrayBlockingQueue(MAX_BACKLOG_SIZE); + private final Queue queue = new LinkedBlockingQueue<>(MAX_BACKLOG_SIZE); // how many of the last messages to keep in the backlog at total private int backlogSize = 1000; private boolean removeOnDump = true; @@ -65,14 +65,10 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { private String traceFilter; private Predicate predicate; - public BacklogTracer(CamelContext camelContext) { + private BacklogTracer(CamelContext camelContext) { this.camelContext = camelContext; } - public Queue getQueue() { - return queue; - } - @Override @Deprecated public Processor wrapProcessorInInterceptors(CamelContext context, ProcessorDefinition definition, Processor target, Processor nextTarget) throws Exception { @@ -86,8 +82,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { * @return a new backlog tracer */ public static BacklogTracer createTracer(CamelContext context) { - BacklogTracer tracer = new BacklogTracer(context); - return tracer; + return new BacklogTracer(context); } /** @@ -153,6 +148,19 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { return false; } + public void traceEvent(DefaultBacklogTracerEventMessage event) { + if (!enabled) { + return; + } + + // ensure there is space on the queue and we need room for ourselves and possible also a first pseudo message as well + if (queue.size() >= backlogSize) { + queue.poll(); + } + + queue.add(event); + } + private boolean shouldTraceFilter(Exchange exchange) { return predicate.matches(exchange); } @@ -251,9 +259,9 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { } public List dumpTracedMessages(String nodeId) { - List answer = new ArrayList(); + List answer = new ArrayList<>(); if (nodeId != null) { - for (DefaultBacklogTracerEventMessage message : queue) { + for (BacklogTracerEventMessage message : queue) { if (nodeId.equals(message.getToNode()) || nodeId.equals(message.getRouteId())) { answer.add(message); } @@ -280,7 +288,7 @@ public class BacklogTracer extends ServiceSupport implements InterceptStrategy { } public List dumpAllTracedMessages() { - List answer = new ArrayList(); + List answer = new ArrayList<>(); answer.addAll(queue); if (isRemoveOnDump()) { queue.clear(); http://git-wip-us.apache.org/repos/asf/camel/blob/d0bf15f5/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java index c9ae2f3..dc719e3 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java +++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/DefaultChannel.java @@ -229,7 +229,7 @@ public class DefaultChannel extends CamelInternalProcessor implements ModelChann first = route.getOutputs().get(0) == definition; } - addAdvice(new BacklogTracerAdvice(backlogTracer.getQueue(), backlogTracer, targetOutputDef, route, first)); + addAdvice(new BacklogTracerAdvice(backlogTracer, targetOutputDef, route, first)); // add debugger as well so we have both tracing and debugging out of the box InterceptStrategy debugger = getOrCreateBacklogDebugger();