Return-Path: X-Original-To: apmail-camel-commits-archive@www.apache.org Delivered-To: apmail-camel-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 603BA102B8 for ; Wed, 5 Feb 2014 14:46:59 +0000 (UTC) Received: (qmail 52294 invoked by uid 500); 5 Feb 2014 14:46:57 -0000 Delivered-To: apmail-camel-commits-archive@camel.apache.org Received: (qmail 52161 invoked by uid 500); 5 Feb 2014 14:46:48 -0000 Mailing-List: contact commits-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list commits@camel.apache.org Received: (qmail 51420 invoked by uid 99); 5 Feb 2014 14:46:46 -0000 Received: from tyr.zones.apache.org (HELO tyr.zones.apache.org) (140.211.11.114) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 05 Feb 2014 14:46:45 +0000 Received: by tyr.zones.apache.org (Postfix, from userid 65534) id D588691C030; Wed, 5 Feb 2014 14:46:44 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: davsclaus@apache.org To: commits@camel.apache.org Date: Wed, 05 Feb 2014 14:46:45 -0000 Message-Id: <3ef04b4328244b72ad4cbc4060a61557@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [2/2] git commit: CAMEL-7146: Fixed aggregator when completion size is 1, eg when completed asap. Should not call remove as we did not add. CAMEL-7146: Fixed aggregator when completion size is 1, eg when completed asap. Should not call remove as we did not add. 
Project: http://git-wip-us.apache.org/repos/asf/camel/repo Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/090cd028 Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/090cd028 Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/090cd028 Branch: refs/heads/camel-2.12.x Commit: 090cd028f7189738c6aa4dc82b781496e66b7026 Parents: eed81aa Author: Claus Ibsen Authored: Wed Feb 5 15:39:11 2014 +0100 Committer: Claus Ibsen Committed: Wed Feb 5 15:47:35 2014 +0100 ---------------------------------------------------------------------- .../processor/aggregate/AggregateProcessor.java | 8 +- .../apache/camel/spi/AggregationRepository.java | 10 ++ .../AggregateCompletionOnlyOneTest.java | 117 ++++++++++++++++++ .../AggregateCompletionOnlyTwoTest.java | 118 +++++++++++++++++++ 4 files changed, 251 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java index ca39061..16950e0 100644 --- a/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java +++ b/camel-core/src/main/java/org/apache/camel/processor/aggregate/AggregateProcessor.java @@ -455,8 +455,12 @@ public class AggregateProcessor extends ServiceSupport implements AsyncProcessor } aggregated.setProperty(Exchange.AGGREGATED_CORRELATION_KEY, key); - // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's - aggregationRepository.remove(aggregated.getContext(), key, original); + // only remove if we have previous added (as we could potentially complete with only 1 exchange) + // (if we have 
previous added then we have that as the original exchange) + if (original != null) { + // remove from repository as its completed, we do this first as to trigger any OptimisticLockingException's + aggregationRepository.remove(aggregated.getContext(), key, original); + } if (!fromTimeout && timeoutMap != null) { // cleanup timeout map if it was a incoming exchange which triggered the timeout (and not the timeout checker) http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java ---------------------------------------------------------------------- diff --git a/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java b/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java index 669a956..ba1de6a 100644 --- a/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java +++ b/camel-core/src/main/java/org/apache/camel/spi/AggregationRepository.java @@ -32,6 +32,9 @@ public interface AggregationRepository { * Add the given {@link Exchange} under the correlation key. *

* Will replace any existing exchange. + *

+ * Important: This method is not invoked if only one exchange was completed, and therefore + * the exchange does not need to be added to a repository, as it is completed immediately. * * @param camelContext the current CamelContext * @param key the correlation key @@ -42,6 +45,8 @@ public interface AggregationRepository { /** * Gets the given exchange with the correlation key + *

+ * This method is always invoked for any incoming exchange in the aggregator. * * @param camelContext the current CamelContext * @param key the correlation key @@ -52,6 +57,9 @@ public interface AggregationRepository { /** * Removes the exchange with the given correlation key, which should happen * when an {@link Exchange} is completed + *

+ * Important: This method is not invoked if only one exchange was completed, and therefore + * the exchange does not need to be added to a repository, as it is completed immediately. * * @param camelContext the current CamelContext * @param key the correlation key @@ -61,6 +69,8 @@ public interface AggregationRepository { /** * Confirms the completion of the {@link Exchange}. + *

+ * This method is always invoked. * * @param camelContext the current CamelContext * @param exchangeId exchange id to confirm http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java new file mode 100644 index 0000000..76e0cff --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyOneTest.java @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import java.util.Set; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.spi.AggregationRepository; + +/** + * @version + */ +public class AggregateCompletionOnlyOneTest extends ContextTestSupport { + + private MyRepo repo = new MyRepo(); + + public void testOnlyOne() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A", "B", "C", "END"); + + template.sendBodyAndHeader("direct:start", "A", "id", "foo"); + template.sendBodyAndHeader("direct:start", "B", "id", "foo"); + template.sendBodyAndHeader("direct:start", "C", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + assertMockEndpointsSatisfied(); + + assertEquals(4, repo.getGet()); + // add and remove is not in use as we are completed immediately + assertEquals(0, repo.getAdd()); + assertEquals(0, repo.getRemove()); + assertEquals(4, repo.getConfirm()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new BodyInAggregatingStrategy()).aggregationRepository(repo) + .completionSize(1) + .to("mock:aggregated"); + } + }; + } + + private class MyRepo implements AggregationRepository { + + private int add; + private int get; + private int remove; + private int confirm; + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange exchange) { + add++; + return null; + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + get++; + return null; + } + + @Override + public void remove(CamelContext camelContext, 
String key, Exchange exchange) { + remove++; + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + confirm++; + } + + @Override + public Set getKeys() { + return null; + } + + public int getAdd() { + return add; + } + + public int getGet() { + return get; + } + + public int getRemove() { + return remove; + } + + public int getConfirm() { + return confirm; + } + } +} http://git-wip-us.apache.org/repos/asf/camel/blob/090cd028/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java ---------------------------------------------------------------------- diff --git a/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java new file mode 100644 index 0000000..27871c1 --- /dev/null +++ b/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregateCompletionOnlyTwoTest.java @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.camel.processor.aggregator; + +import java.util.Set; + +import org.apache.camel.CamelContext; +import org.apache.camel.ContextTestSupport; +import org.apache.camel.Exchange; +import org.apache.camel.builder.RouteBuilder; +import org.apache.camel.component.mock.MockEndpoint; +import org.apache.camel.processor.BodyInAggregatingStrategy; +import org.apache.camel.processor.aggregate.MemoryAggregationRepository; + +/** + * @version + */ +public class AggregateCompletionOnlyTwoTest extends ContextTestSupport { + + private MyRepo repo = new MyRepo(); + + public void testOnlyTwo() throws Exception { + MockEndpoint mock = getMockEndpoint("mock:aggregated"); + mock.expectedBodiesReceived("A+B", "C+END"); + + template.sendBodyAndHeader("direct:start", "A", "id", "foo"); + template.sendBodyAndHeader("direct:start", "B", "id", "foo"); + template.sendBodyAndHeader("direct:start", "C", "id", "foo"); + template.sendBodyAndHeader("direct:start", "END", "id", "foo"); + + assertMockEndpointsSatisfied(); + + assertEquals(4, repo.getGet()); + assertEquals(2, repo.getAdd()); + assertEquals(2, repo.getRemove()); + assertEquals(2, repo.getConfirm()); + } + + @Override + protected RouteBuilder createRouteBuilder() throws Exception { + return new RouteBuilder() { + @Override + public void configure() throws Exception { + from("direct:start") + .aggregate(header("id"), new BodyInAggregatingStrategy()).aggregationRepository(repo) + .completionSize(2) + .to("mock:aggregated"); + } + }; + } + + private class MyRepo extends MemoryAggregationRepository { + + private int add; + private int get; + private int remove; + private int confirm; + + @Override + public Exchange add(CamelContext camelContext, String key, Exchange exchange) { + add++; + return super.add(camelContext, key, exchange); + } + + @Override + public Exchange get(CamelContext camelContext, String key) { + get++; + return super.get(camelContext, key); + } + + @Override + public void remove(CamelContext 
camelContext, String key, Exchange exchange) { + remove++; + super.remove(camelContext, key, exchange); + } + + @Override + public void confirm(CamelContext camelContext, String exchangeId) { + confirm++; + super.confirm(camelContext, exchangeId); + } + + @Override + public Set getKeys() { + return super.getKeys(); + } + + public int getAdd() { + return add; + } + + public int getGet() { + return get; + } + + public int getRemove() { + return remove; + } + + public int getConfirm() { + return confirm; + } + } +}