Return-Path: X-Original-To: apmail-camel-issues-archive@minotaur.apache.org Delivered-To: apmail-camel-issues-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CDDDE9AE4 for ; Fri, 23 Mar 2012 17:45:52 +0000 (UTC) Received: (qmail 10904 invoked by uid 500); 23 Mar 2012 17:45:52 -0000 Delivered-To: apmail-camel-issues-archive@camel.apache.org Received: (qmail 10865 invoked by uid 500); 23 Mar 2012 17:45:52 -0000 Mailing-List: contact issues-help@camel.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@camel.apache.org Delivered-To: mailing list issues@camel.apache.org Received: (qmail 10765 invoked by uid 99); 23 Mar 2012 17:45:52 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Mar 2012 17:45:52 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED,T_RP_MATCHES_RCVD X-Spam-Check-By: apache.org Received: from [140.211.11.116] (HELO hel.zones.apache.org) (140.211.11.116) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 23 Mar 2012 17:45:48 +0000 Received: from hel.zones.apache.org (hel.zones.apache.org [140.211.11.116]) by hel.zones.apache.org (Postfix) with ESMTP id 16250342A10 for ; Fri, 23 Mar 2012 17:45:28 +0000 (UTC) Date: Fri, 23 Mar 2012 17:45:28 +0000 (UTC) From: "Daniel Carleton (Updated) (JIRA)" To: issues@camel.apache.org Message-ID: <420604244.9031.1332524728110.JavaMail.tomcat@hel.zones.apache.org> In-Reply-To: <1355880044.8974.1332524008702.JavaMail.tomcat@hel.zones.apache.org> Subject: [jira] [Updated] (CAMEL-5113) Parallel and fault tolerant message processing for SQS endpoints. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 7bit X-JIRA-FingerPrint: 30527f35849b9dde25b450d4833f0394 X-Virus-Checked: Checked by ClamAV on apache.org [ https://issues.apache.org/jira/browse/CAMEL-5113?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Daniel Carleton updated CAMEL-5113: ----------------------------------- Description: I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component: # SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint. # Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with. # Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries. I propose the following solutions to these problems: # Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor(). (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.) # Replace maxMessagesPerPoll with maxInFlightMessages. Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve. # In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility. How does this sound? I'm working on a patch. This is my first work on Camel, so if you see any problems with my approach let me know! Thanks, - Dan was: I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component: # SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint. # Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with. # Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries. (I guess if deleteAfterRead is turned on this doesn't make sense, but then doesn't turning on deleteAfterRead break fault tolerance completely?) I propose the following solutions to these problems: # Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor(). (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.) # Replace maxMessagesPerPoll with maxInFlightMessages. Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve. # In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility. How does this sound? I'm working on a patch. This is my first work on Camel, so if you see any problems with my approach let me know! Thanks, - Dan > Parallel and fault tolerant message processing for SQS endpoints. > ----------------------------------------------------------------- > > Key: CAMEL-5113 > URL: https://issues.apache.org/jira/browse/CAMEL-5113 > Project: Camel > Issue Type: Improvement > Components: camel-aws > Reporter: Daniel Carleton > > I'm using Camel to implement parallel processing of jobs in an SQS queue, and ran into a few issues with the current implementation of the SQS component: > # SqsConsumer uses a blocking/synchronous processor, which prevents parallel processing of multiple messages from a single endpoint. > # Having a maxMessagesPerPoll other than one doesn't seem to make sense, because any messages not being actively processed should be left back in the queue for other consumers to have a chance with. > # Rollback processing doesn't go back to SQS and set the visibility timeout to zero, which prevents immediate retries. > I propose the following solutions to these problems: > # Use an asynchronous processor in SqsConsumer by way of getAsyncProcessor(). (Messages in SQS aren't guaranteed to be FIFO anyway, so there should be no issue with order of processing.) > # Replace maxMessagesPerPoll with maxInFlightMessages. Put a semaphore in SqsConsumer to control the maximum number of in flight messages, and when polling SQS always set the number of available permits as the maximum number of messages to retrieve. > # In the onFailure callback for an exchange set the visibility timeout in SQS to zero via ChangeMessageVisibility. > How does this sound? I'm working on a patch. This is my first work on Camel, so if you see any problems with my approach let me know! > Thanks, > - Dan -- This message is automatically generated by JIRA. If you think it was sent incorrectly, please contact your JIRA administrators: https://issues.apache.org/jira/secure/ContactAdministrators!default.jspa For more information on JIRA, see: http://www.atlassian.com/software/jira