camel-users mailing list archives

From Claus Ibsen <claus.ib...@gmail.com>
Subject Re: Advice needed with Camel/AMQ use case
Date Wed, 04 Apr 2012 08:07:14 GMT
Hi

Just a few notes, as I only skimmed your post.

AMQ 5.6 will support asynchronous redeliveries. This is not on by
default, so you need to enable it.
Also check out the asyncConsumer=true option on the JMS component,
available as of Camel 2.9.
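
As a rough illustration of the two options above (a sketch only, not tested against your setup): the class and method names below are made up for this example, the queue name and consumer count are taken from your route, and the broker address is a placeholder. I am assuming here that asynchronous redelivery is switched on with the nonBlockingRedelivery option on the ActiveMQ connection URI, so please check the AMQ 5.6 documentation before relying on it.

```java
// Hypothetical helper that only builds the URI strings; it has no Camel or
// ActiveMQ dependency and is meant to show where each option would go.
final class AsyncConsumerUris {

    // Camel consumer endpoint URI with asyncConsumer=true (Camel 2.9+ JMS component).
    static String consumerUri(String queue, int concurrentConsumers) {
        return "activemq:queue:" + queue
                + "?concurrentConsumers=" + concurrentConsumers
                + "&asyncConsumer=true";
    }

    // Broker connection URL; jms.nonBlockingRedelivery is an ASSUMPTION about
    // how AMQ 5.6 exposes asynchronous redelivery on the connection factory.
    static String brokerUrl(String hostAndPort) {
        return "tcp://" + hostAndPort + "?jms.nonBlockingRedelivery=true";
    }

    public static void main(String[] args) {
        // Would be used as from(consumerUri(...)) in a RouteBuilder.
        System.out.println(consumerUri("dispatchHttpQueue", 3));
        // Would be passed as the brokerURL of the ActiveMQ connection factory.
        System.out.println(brokerUrl("localhost:61616"));
    }
}
```

Keep in mind that with asyncConsumer=true the consumer may pick up the next message while the previous one is still being processed, so strict ordering is not guaranteed, and the option has no effect when the consumer is transacted.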



On Tue, Apr 3, 2012 at 5:52 PM, ctytgat <christian.tytgat@gmail.com> wrote:
> Hi All,
>
> I've got the following use case which is currently implemented using
> camel/AMQ:
>
> Queue 'dispatchHttpQueue' contains messages for different destinations. I
> need to support about 500 destinations.
> (The destination is set on the message using a camel/JMS header)
> Each message needs to be delivered to its destination via a REST interface.
> The connection to the destinations is shaky at best (3G/GPRS).
>
> Now, what I currently have is a limited number of concurrent consumers (3)
> that forward messages to an HTTP endpoint.
> Exceptions (mostly IO errors) are handled by re-inserting the message on the
> queue with an AMQ_SCHEDULED_DELAY header (custom bean) until the number of
> retries is exhausted. After all retries have failed, the message is moved to a
> DLQ.
>
> Code for this is shown below.
>
> But this approach has the following drawbacks:
> - original message order is not maintained when failures occur
> - if the queue contains many messages for a destination which is
> temporarily unreachable, it will hinder/slow down delivery to other
> destinations.
>
>
> I'm looking for a better solution where
> (1) order is preserved
> (2) unreachable destinations have no effect on delivery to reachable ones
> (3) whenever a destination is unreachable, the oldest message is retried
> until it goes through or expires.
>
> What I've considered so far:
> - using AMQ message groups to preserve ordering. Group ID is destination ID.
> But unless I configure 500 concurrentConsumers  (= 500 threads), I still
> don't solve (2). Obviously that's not an option.
> - creating 1 queue per destination and consuming from all queues using a
> wildcard. But I don't see a way of accomplishing everything without again
> having 500 threads.
>
> Any suggestions would be greatly appreciated!
>
>
>        from("activemq:queue:dispatchHttpQueue"
>                + "?concurrentConsumers={{hermes.dris.distribution.kv78.concurrentOutgoingHttpDispatchers}}"
>                + "&maxConcurrentConsumers={{hermes.dris.distribution.kv78.maxConcurrentOutgoingHttpDispatchers}}")
>            .routeId(ROUTE_ID_DISPATCH_HTTP)
>            // use multicast instead of a pipeline, or the audit log will contain the http response!
>            .multicast().stopOnException().to(ENDPOINT_TIMINGPOINT, ENDPOINT_AUDIT_OUTGOING);
>
>        from(ENDPOINT_TIMINGPOINT)
>            .routeId(ROUTE_ID_TIMINGPOINT)
>            // on IO errors: tag the retry, then re-queue or move to the DLQ
>            .onException(IOException.class)
>                .handled(true)
>                .beanRef("dispatchRetryTagger")
>                .choice()
>                    .when(header("dispatchRetryCount").isLessThan(3))
>                        .log(LoggingLevel.WARN, "Failed delivery (attempt = ${in.header.dispatchRetryCount}) "
>                                + "for exchangeId ${id}: ${in.header.CamelExceptionCaught}")
>                        .to("activemq:queue:dispatchHttpQueue")
>                    .otherwise()
>                        .removeHeader("AMQ_SCHEDULED_DELAY")
>                        .to(ENDPOINT_DLQ)
>                .end()
>            .end()
>            .removeHeaders("CamelHttp*")
>            .setHeader(Exchange.HTTP_URI, simple("http://${header." + HEADER_DESTINATION_ADDRESS + "}"))
>            .setHeader(Exchange.HTTP_PATH,
>                    simple("${properties:hermes.dris.distribution.kv78.timingpoint.urlpath}"))
>            .setHeader(Exchange.CONTENT_TYPE, constant("application/xml"))
>            .setHeader(Exchange.CONTENT_ENCODING, constant("gzip")) // use gzip compression
>            .log(LoggingLevel.DEBUG, "Sending request to ${in.header.CamelHttpUri}/${in.header.CamelHttpPath}")
>            .to(ENDPOINT_TIMINGPOINT_HTTP
>                    + "?httpClientConfigurerRef=timingPointHttpClientConfigurer"
>                    + "&headerFilterStrategy=#distributionHttpHeaderFilterStrategy");
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Advice-needed-with-Camel-AMQ-use-case-tp5615624p5615624.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
CamelOne 2012 Conference, May 15-16, 2012: http://camelone.com
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/
