Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 534DB200B43 for ; Tue, 28 Jun 2016 09:37:19 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 52074160A56; Tue, 28 Jun 2016 07:37:19 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7B8BF160A6C for ; Tue, 28 Jun 2016 09:37:18 +0200 (CEST) Received: (qmail 95773 invoked by uid 500); 28 Jun 2016 07:37:17 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 95667 invoked by uid 99); 28 Jun 2016 07:37:17 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 28 Jun 2016 07:37:17 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 5962AE08FE; Tue, 28 Jun 2016 07:37:17 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Tue, 28 Jun 2016 07:37:19 -0000 Message-Id: <7ec097d35de14dd9b881b01b94ee3bf5@git.apache.org> In-Reply-To: <842401d930e94cb98c276f5380b38f5c@git.apache.org> References: <842401d930e94cb98c276f5380b38f5c@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [3/3] camel git commit: CAMEL-10096: Camel tracer with stream caching should tracer after stream cache has been setup. archived-at: Tue, 28 Jun 2016 07:37:19 -0000 CAMEL-10096: Camel tracer with stream caching should tracer after stream cache has been setup. Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/ca53a592 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/ca53a592 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/ca53a592 Branch: refs/heads/camel-2.17.x Commit: ca53a592fcb4d0b410d1a4066bf618e02ec7ffd4 Parents: 58bb801 Author: Claus Ibsen Authored: Tue Jun 28 09:32:29 2016 +0200 Committer: Claus Ibsen Committed: Tue Jun 28 09:34:57 2016 +0200 ---------------------------------------------------------------------- .../camel/processor/CamelInternalProcessor.java | 33 +++++- .../BacklogTracerStreamCachingTest.java | 101 +++++++++++++++++++ 2 files changed, 130 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/ca53a592/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java index 7ce5a4d..a7f31e4 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java @@ -17,6 +17,7 @@ package org.apache.camel.processor; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; import java.util.List; import java.util.concurrent.RejectedExecutionException; @@ -25,6 +26,7 @@ import org.apache.camel.AsyncCallback; import org.apache.camel.CamelContext; import org.apache.camel.Exchange; import org.apache.camel.MessageHistory; +import org.apache.camel.Ordered; import org.apache.camel.Processor; import org.apache.camel.Route; import org.apache.camel.StatefulService; @@ -44,6 +46,7 @@ import org.apache.camel.spi.RoutePolicy; import org.apache.camel.spi.StreamCachingStrategy; import org.apache.camel.spi.UnitOfWork; import org.apache.camel.util.MessageHelper; +import org.apache.camel.util.OrderedComparator; import org.apache.camel.util.StopWatch; import org.apache.camel.util.UnitOfWorkHelper; import org.slf4j.Logger; @@ -72,6 +75,8 @@ import org.slf4j.LoggerFactory; * Debugging tips: Camel end users whom want to debug their Camel applications with the Camel source code, then make sure to * read the source code of this class about the debugging tips, which you can find in the * {@link #process(org.apache.camel.Exchange, org.apache.camel.AsyncCallback)} method. + *

+ * The added advices can implement {@link Ordered} to control in which order the advices are executed. */ public class CamelInternalProcessor extends DelegateAsyncProcessor { @@ -92,6 +97,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { */ public void addAdvice(CamelInternalProcessorAdvice advice) { advices.add(advice); + // ensure advices are sorted so they are in the order we want + Collections.sort(advices, new OrderedComparator()); } /** @@ -125,7 +132,6 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { // you can see in the code below. // ---------------------------------------------------------- - if (processor == null || !continueProcessing(exchange)) { // no processor or we should not continue then we are done callback.done(true); @@ -522,7 +528,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { /** * Advice to execute the {@link BacklogTracer} if enabled. */ - public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice { + public static final class BacklogTracerAdvice implements CamelInternalProcessorAdvice, Ordered { private final BacklogTracer backlogTracer; private final ProcessorDefinition processorDefinition; @@ -564,12 +570,19 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { public void after(Exchange exchange, Object data) throws Exception { // noop } + + @Override + public int getOrder() { + // we want tracer just before calling the processor + return Ordered.LOWEST - 1; + } + } /** * Advice to execute the {@link org.apache.camel.processor.interceptor.BacklogDebugger} if enabled. */ - public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice { + public static final class BacklogDebuggerAdvice implements CamelInternalProcessorAdvice, Ordered { private final BacklogDebugger backlogDebugger; private final Processor target; @@ -600,6 +613,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { backlogDebugger.afterProcess(exchange, target, definition, stopWatch.stop()); } } + + @Override + public int getOrder() { + // we want debugger just before calling the processor + return Ordered.LOWEST; + } } /** @@ -744,7 +763,7 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { /** * Advice for {@link org.apache.camel.spi.StreamCachingStrategy} */ - public static class StreamCachingAdvice implements CamelInternalProcessorAdvice { + public static class StreamCachingAdvice implements CamelInternalProcessorAdvice, Ordered { private final StreamCachingStrategy strategy; @@ -785,6 +804,12 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor { ((StreamCache) body).reset(); } } + + @Override + public int getOrder() { + // we want stream caching first + return Ordered.HIGHEST; + } } /** http://git-wip-us.apache.org/repos/asf/camel/blob/ca53a592/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java new file mode 100644 index 0000000..14eeecb --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/management/BacklogTracerStreamCachingTest.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.management; + +import java.io.ByteArrayInputStream; +import java.util.List; +import javax.management.Attribute; +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import org.apache.camel.Exchange; +import org.apache.camel.Processor; +import org.apache.camel.api.management.mbean.BacklogTracerEventMessage; +import org.apache.camel.builder.RouteBuilder; + +public class BacklogTracerStreamCachingTest extends ManagementTestSupport { + + @SuppressWarnings("unchecked") + public void testBacklogTracerEventMessageStreamCaching() throws Exception { + // JMX tests dont work well on AIX CI servers (hangs them) + if (isPlatform("aix")) { + return; + } + + MBeanServer mbeanServer = getMBeanServer(); + ObjectName on = new ObjectName("org.apache.camel:context=camel-1,type=tracer,name=BacklogTracer"); + assertNotNull(on); + assertTrue(mbeanServer.isRegistered(on)); + + Boolean enabled = (Boolean) mbeanServer.getAttribute(on, "Enabled"); + assertEquals("Should not be enabled", Boolean.FALSE, enabled); + + Integer size = (Integer) mbeanServer.getAttribute(on, "BacklogSize"); + assertEquals("Should be 1000", 1000, size.intValue()); + + Boolean removeOnDump = (Boolean) mbeanServer.getAttribute(on, "RemoveOnDump"); + assertEquals(Boolean.TRUE, removeOnDump); + + // enable streams + mbeanServer.setAttribute(on, new Attribute("BodyIncludeStreams", Boolean.TRUE)); + + // enable it + mbeanServer.setAttribute(on, new Attribute("Enabled", Boolean.TRUE)); + + getMockEndpoint("mock:bar").expectedMessageCount(1); + + template.sendBody("direct:start", "Hello World"); + + assertMockEndpointsSatisfied(); + + List exchanges = getMockEndpoint("mock:bar").getReceivedExchanges(); + + List events = (List) mbeanServer.invoke(on, "dumpTracedMessages", + new Object[]{"bar"}, new String[]{"java.lang.String"}); + + assertNotNull(events); + assertEquals(1, events.size()); + + BacklogTracerEventMessage event1 = events.get(0); + assertEquals("bar", event1.getToNode()); + assertEquals(" \n" + + " Bye World\n" + + " ", event1.getMessageAsXml()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + context.setUseBreadcrumb(false); + + from("direct:start").streamCaching() + .process(new Processor() { + @Override + public void process(Exchange exchange) throws Exception { + ByteArrayInputStream is = new ByteArrayInputStream("Bye World".getBytes()); + exchange.getIn().setBody(is); + } + }) + .log("Got ${body}") + .to("mock:bar").id("bar"); + } + }; + } + +}