camel-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From vassilis <2bi...@gmail.com>
Subject Aggregator + (LevelDB or HawtDB for persistency) incorrect behavior
Date Thu, 09 Apr 2015 14:10:37 GMT
Hi all!

Using camel 2.12.5 problem is reproducible (same for latest 2.11, latest
2.13, latest 2.14, latest 2.15).
Using camel 2.10.7 (i.e. latest 2.10) problem is NOT reproducible.
Problem appears only if a LevelDB or HawtDB repo is used. In memory default
aggregator works fine.
Reproduced by unit test.
Route and unit tests in the end.

Repo definition:

HawtDBAggregationRepository repo = new HawtDBAggregationRepository("repo1",
"target/data/hawtdb.dat");
repo.setDeadLetterUri(dlq.getEndpointUri());
repo.setMaximumRedeliveries(3);

Problems: 
1. Wrong number of aggregated messages (configured to fire for
completionSize=2 or timeout=10000 millis)
2. Error Handling of HawtDB/ LevelDB is not working (i.e. no retries, no
message goes to DLQ)

Any ideas??

Many thanks!!!

---------------------------

Unit test output regarding (1):

The unit test sends 4 messages and expects 2 aggregated messages (i.e. 2
incoming messages per group). But, aggregator outputs 4 messages. The last
two are NOT correct and have EMPTY header
"in.header.CamelAggregatedCompletedBy".

INFO  09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer
with FILE REF=1 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Aggregation Completion reason=size]
INFO  09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#info:96] [Timer
with FILE REF=2 cancelled]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Aggregation Completion reason=size]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Ignore - Aggregation Completion Reason Not Expected=]
DEBUG 09 Apr 14:22:33 [org.slf4j.helpers.MarkerIgnoringBase#debug:72]
[Ignore - Aggregation Completion Reason Not Expected=]

----------------------------

Route:

		LastMessageAggregationStrategy aggregationStratery = new
LastMessageAggregationStrategy();
		
		from(timerRouteFrom).routeId("timerRoute")
		    .aggregate(header(TimerRoute.FILE_REF_HEADER),
aggregationStratery).completionSize(2).completionTimeout(timeout).aggregationRepository(repo)
		    .choice()
                	.when(simple("${in.header.CamelAggregatedCompletedBy}
contains 'timeout'"))
                		.log(LoggingLevel.ERROR, timeoutMessage)
						.log(LoggingLevel.ERROR, "Timeout threshold (millis): " + timeout)
						.log(LoggingLevel.ERROR, "File Ref= ${in.header." + FILE_REF_HEADER
+"}")
						.log(LoggingLevel.DEBUG, "Sending to: " + timeoutUri)
						.setHeader("contentType", constant("text/html"))
						.setHeader(TIMEOUT_THRESHOLD_HEADER, constant(timeout))
			            .to(freeMarkerTemplate).id("freemarkerId")
			            .log(LoggingLevel.DEBUG, "Emailing Exchange body: ${body}")
			            .to(timeoutUri)
			        .when(simple("${in.header.CamelAggregatedCompletedBy} contains
'size'"))    
                		.log(LoggingLevel.INFO, "Timer with FILE REF=${in.header."
+ FILE_REF_HEADER +"} cancelled")
                		.log(LoggingLevel.DEBUG, "Aggregation Completion
reason=${in.header.CamelAggregatedCompletedBy}")
                		.to(successUri)
                	.otherwise()
                		.log(LoggingLevel.DEBUG, "Ignore - Aggregation Completion
Reason Not Expected=${in.header.CamelAggregatedCompletedBy}")
                	.end()
                .end();


	class LastMessageAggregationStrategy implements AggregationStrategy {
	 
	    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {      
	    	return newExchange;
	    }
	}

----------------------

Unit Test (aggregation size=2, so expecting 2 aggregated messages):

    public void happyPath() throws InterruptedException {
    	
        successUri.expectedMessageCount(2);
        timeoutUri.expectedMessageCount(0);
        
        Map<String, Object> headers1 = new HashMap<String, Object>();
        headers1.put(TimerRoute.FILE_REF_HEADER, "1");
        
        Map<String, Object> headers2 = new HashMap<String, Object>();
        headers2.put(TimerRoute.FILE_REF_HEADER, "2");
        
        for(int i=0;i<2;i++) 
        	fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
        
        for(int i=0;i<2;i++) 
        	fromTemplate.sendBodyAndHeaders("dummy body2..", headers2);

    	Thread.sleep(5000);
    	
        assertMockEndpointsSatisfied();
    	
    }

    public void testAggregatorRetries() throws Exception {
		
    	context.getRouteDefinition("timerRoute").adviceWith(context, new
AdviceWithRouteBuilder() {
            @Override
            public void configure() throws Exception {
                
                interceptSendToEndpoint(timeoutUri.getEndpointUri())
                	.skipSendToOriginalEndpoint()
                	.throwException(new IOException("This a simulated exception
Timeout!"));
                
                interceptSendToEndpoint(successUri.getEndpointUri())
            	.skipSendToOriginalEndpoint()
            	.throwException(new IOException("This a simulated exception
Success!")); 
            }
        });
    		
        successUri.expectedMessageCount(0);
        timeoutUri.expectedMessageCount(0);
        dlq.expectedMessageCount(1);
        
        Map<String, Object> headers1 = new HashMap<String, Object>();
        headers1.put(TimerRoute.FILE_REF_HEADER, "1");
        
        for(int i=0;i<2;i++) 
        	fromTemplate.sendBodyAndHeaders("dummy body1..", headers1);
        
    	Thread.sleep(5000);
    	
        assertMockEndpointsSatisfied();	
    }

	protected RouteBuilder createRouteBuilder() throws Exception {
		
		// create the repo
		HawtDBAggregationRepository repo = new
HawtDBAggregationRepository("repo1", "target/data/hawtdb.dat");
		repo.setDeadLetterUri(dlq.getEndpointUri());
		repo.setMaximumRedeliveries(3);
		// create the route that will be tested
		TimerRoute routePutToTest = new TimerRoute("direct:from", "Expired!!!!" ,
repo, 10 * 1000, successUri.getEndpointUri(),
				timeoutUri.getEndpointUri());
		
		return routePutToTest;	
	}



	



--
View this message in context: http://camel.465427.n5.nabble.com/Aggregator-LevelDB-or-HawtDB-for-persistency-incorrect-behavior-tp5765524.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Mime
View raw message