camel-dev mailing list archives

From Roman Kalukiewicz <roman.kalukiew...@gmail.com>
Subject Re: svn commit: r801545 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/processor/BatchProcessor.java test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
Date Thu, 06 Aug 2009 09:19:31 GMT
Hmm, the test doesn't pass if I do that and expect the error handler
to be invoked.

That is because exchange e is effectively discarded: it is never added
to the aggregated exchange (since add() failed), and it is a new
exchange anyway (a copy) that was pulled asynchronously from
BatchProcessor's internal blocking queue.
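
To make the hand-off concrete, here is a minimal, self-contained Java sketch (plain JDK, not Camel code). Message, copy() and sendAndFailInBatchThread are hypothetical names standing in for the exchange and for the batch thread; the real BatchProcessor is more involved. It only illustrates why a failure on the dequeued copy never reaches the exchange the caller holds:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueHandoffSketch {

    /** Hypothetical stand-in for an exchange: a body plus an optional exception. */
    static final class Message {
        final String body;
        Throwable exception;

        Message(String body) {
            this.body = body;
        }

        Message copy() {
            return new Message(body);
        }
    }

    /**
     * Enqueues a copy of the message, lets a separate thread fail while
     * processing that copy, and returns the caller's original message.
     */
    static Message sendAndFailInBatchThread(String body, List<Throwable> handled)
            throws InterruptedException {
        BlockingQueue<Message> queue = new LinkedBlockingQueue<>();
        Message original = new Message(body);

        // The caller only hands over a copy; its own work is done after this.
        queue.put(original.copy());

        Thread batchThread = new Thread(() -> {
            Message e = queue.poll();
            if (e != null) {
                try {
                    // Simulates collection.add(e) throwing.
                    throw new IllegalArgumentException("add() failed for " + e.body);
                } catch (Throwable t) {
                    e.exception = t; // set on the copy only
                    handled.add(t);  // what a local exception handler would see
                }
            }
        });
        batchThread.start();
        batchThread.join(); // join makes the batch thread's writes visible here

        return original;
    }

    public static void main(String[] args) throws Exception {
        List<Throwable> handled = new ArrayList<>();
        Message original = sendAndFailInBatchThread("Damn", handled);
        System.out.println("exception on original: " + original.exception);
        System.out.println("handled in batch thread: " + handled.size());
    }
}
```

In this sketch the original message comes back with no exception set, while the failure is recorded only on the batch thread's side, which mirrors why setting the exception on e does not trigger the route's error handler.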

BTW I don't really get why this internal queue exists there. Isn't it
enough to add things to the collection directly? My feeling is that
making Aggregator a subclass of BatchProcessor complicates things
more than it should...
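
A rough sketch of what adding directly could look like, in plain Java with hypothetical names (Message, FailingCollection, process). This is an assumption about the alternative, not how Aggregator is implemented. Because add() runs in the caller's thread, a failure can be recorded on the very message the caller still holds:

```java
import java.util.ArrayList;
import java.util.List;

public class DirectAddSketch {

    /** Hypothetical stand-in for an exchange. */
    static final class Message {
        final String body;
        Throwable exception;

        Message(String body) {
            this.body = body;
        }
    }

    /** Hypothetical collection whose add() fails for one specific body. */
    static final class FailingCollection {
        final List<Message> items = new ArrayList<>();

        void add(Message m) {
            if ("Damn".equals(m.body)) {
                throw new IllegalArgumentException("cannot aggregate " + m.body);
            }
            items.add(m);
        }
    }

    /** Adds synchronously and records any failure on the caller's own message. */
    static void process(FailingCollection collection, Message m) {
        try {
            collection.add(m);
        } catch (RuntimeException ex) {
            m.exception = ex; // visible to whoever routed m
        }
    }

    public static void main(String[] args) {
        FailingCollection collection = new FailingCollection();
        Message bad = new Message("Damn");
        Message good = new Message("Hello World");
        process(collection, bad);
        process(collection, good);
        System.out.println("bad.exception = " + bad.exception);
        System.out.println("items collected = " + collection.items.size());
    }
}
```

The trade-off in such a design is that the caller blocks while the collection does its work, which may be part of the reason the queue was introduced in the first place.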

Roman

2009/8/6 Claus Ibsen <claus.ibsen@gmail.com>:
> Hi
>
> I wonder if just setting the exception on the exchange would do the
> trick, because when it's routed later Camel will "see" the exception
> and be able to deal with it in its own error handler.
>
>
>  +                    try {
>  +                        collection.add(e);
>  +                    } catch (Exception ex) {
>  +                        e.setException(ex);
>  +                    }
>
>
> On Thu, Aug 6, 2009 at 11:03 AM, <romkal@apache.org> wrote:
>> Author: romkal
>> Date: Thu Aug  6 09:03:04 2009
>> New Revision: 801545
>>
>> URL: http://svn.apache.org/viewvc?rev=801545&view=rev
>> Log:
>> CAMEL-1882: Temporary fix applied that catches exception thrown from Collection.add() in aggregator. Proper error handling to be done
>>
>> Added:
>>    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java   (with props)
>> Modified:
>>    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
>>
>> Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java
>> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java?rev=801545&r1=801544&r2=801545&view=diff
>> ==============================================================================
>> --- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java (original)
>> +++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/BatchProcessor.java Thu Aug  6 09:03:04 2009
>> @@ -316,7 +316,11 @@
>>             for (int i = 0; i < batchSize; ++i) {
>>                 Exchange e = queue.poll();
>>                 if (e != null) {
>> -                    collection.add(e);
>> +                    try {
>> +                        collection.add(e);
>> +                    } catch (Throwable t) {
>> +                        getExceptionHandler().handleException(t);
>> +                    }
>>                 } else {
>>                     break;
>>                 }
>>
>> Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
>> URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java?rev=801545&view=auto
>> ==============================================================================
>> --- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java (added)
>> +++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java Thu Aug  6 09:03:04 2009
>> @@ -0,0 +1,74 @@
>> +/**
>> + * Licensed to the Apache Software Foundation (ASF) under one or more
>> + * contributor license agreements.  See the NOTICE file distributed with
>> + * this work for additional information regarding copyright ownership.
>> + * The ASF licenses this file to You under the Apache License, Version 2.0
>> + * (the "License"); you may not use this file except in compliance with
>> + * the License.  You may obtain a copy of the License at
>> + *
>> + *      http://www.apache.org/licenses/LICENSE-2.0
>> + *
>> + * Unless required by applicable law or agreed to in writing, software
>> + * distributed under the License is distributed on an "AS IS" BASIS,
>> + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>> + * See the License for the specific language governing permissions and
>> + * limitations under the License.
>> + */
>> +package org.apache.camel.processor.aggregator;
>> +
>> +import org.apache.camel.ContextTestSupport;
>> +import org.apache.camel.Exchange;
>> +import org.apache.camel.builder.RouteBuilder;
>> +import org.apache.camel.processor.aggregate.AggregationStrategy;
>> +
>> +/**
>> + * Based on CAMEL-1546
>> + *
>> + * @version $Revision$
>> + */
>> +public class AggregatorExceptionInPredicateTest extends ContextTestSupport {
>> +
>> +    public void testExceptionInAggregationStrategy() throws Exception {
>> +        // failed first aggregation
>> +
>> +        // TODO Following assertion should be true while it is not. Instead
>> +        // exception handler set in BatchProcessor is used and it logs
>> +        // by default.
>> +
>> +        // getMockEndpoint("mock:handled").expectedMessageCount(1);
>> +
>> +        // second aggregated
>> +        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
>> +
>> +        template.sendBodyAndHeader("direct:start", "Damn", "id", 1);
>> +        template.sendBodyAndHeader("direct:start", "Hello World", "id", 1);
>> +
>> +        assertMockEndpointsSatisfied();
>> +    }
>> +
>> +    @Override
>> +    protected RouteBuilder createRouteBuilder() throws Exception {
>> +        return new RouteBuilder() {
>> +            @Override
>> +            public void configure() throws Exception {
>> +                onException(IllegalArgumentException.class).handled(true).to("mock:handled");
>> +
>> +                from("direct:start")
>> +                    .aggregate(header("id"))
>> +                    .batchTimeout(500)
>> +                    .aggregationStrategy(new AggregationStrategy() {
>> +
>> +                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
>> +                            Object body = newExchange.getIn().getBody();
>> +                            if ("Damn".equals(body)) {
>> +                                throw new IllegalArgumentException();
>> +                            }
>> +                            return newExchange;
>> +                        }
>> +                    })
>> +                    .to("mock:result");
>> +
>> +            }
>> +        };
>> +    }
>> +}
>>
>> Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
>> ------------------------------------------------------------------------------
>>    svn:eol-style = native
>>
>> Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorExceptionInPredicateTest.java
>> ------------------------------------------------------------------------------
>>    svn:keywords = Rev Date
>>
>>
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>
