Date: Thu, 16 Jan 2014 11:20:20 -0800
Subject: Limiting the number of concurrent consumers across multiple queues
From: Helen Kwong
To: users@qpid.apache.org

Hi Qpid users / experts,

I need to limit the number of consumers concurrently processing messages considered to be in the same group, across multiple queues, and was wondering if anyone has ideas about how to do it. We're using the Java broker and client, and have multiple queues, each with multiple listeners, each listener's session listening to multiple queues. Some messages are associated with groups, and for a given group we want at most K listeners processing messages from the group at any given time. The messages are enqueued to multiple queues, and it's possible for messages from the same group to be in different queues.

If messages in the same group can go into only one queue, then the message groups feature will give us what we need (it'd work directly with K = 1, and with K > 1 we can tweak the grouping value, e.g., hash it to one of 1 to K and append the number to the grouping value). But since messages considered to be in the same group can be in different queues, the feature is not enough for our case.

Since it looks like the broker side doesn't have what we need exactly, we're thinking about how to do this from the client side. We're thinking along the lines of having some semaphore object per group, shared between the different listeners, and whenever a listener receives a message, it will try to acquire a permit from the semaphore for that group.
If it's able to acquire a permit, it processes the message and releases the permit upon completion. If it's not able to acquire a permit, it reenqueues the message in some way. For example:

1) Reenqueue the message back to the same queue so it can be retried right away. But this would lead to a lot of churning when permits are not available for a while, so we've ruled this out.

2) Same as #1, but sleep for a short while first to avoid the high churning. But since each listener's session is responsible for multiple queues, this can decrease the throughput of other queues.

3) Enqueue the message to a special waiting queue that holds messages waiting for a permit and that no one listens to. A periodic sweeper job wakes up once in a while, say every minute, pulls all the messages off the waiting queue, and reenqueues them to their respective original queues. But throughput would be limited by the sweeper interval.

4) Like #3, but without a periodic sweeper. Instead, when a listener that was able to acquire a permit is done with a message, it looks up the next waiting message of the same group in the waiting queue using a JMS selector and reenqueues it to its original queue. But lookup performance might be bad if the queue depth is high.

Each of these has some drawbacks. Does anyone have ideas about other possible approaches (maybe entirely different from the above), or has anyone done something similar?

Thanks,
Helen
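
To make the per-group semaphore idea concrete, here is a minimal sketch of what the shared permit object might look like. It is only an illustration under some assumptions: all listeners run in the same JVM (otherwise the permits would have to live somewhere shared), Java 8 or later is available, and the class and method names (GroupPermits, tryAcquire, release, runIfPermitted) are hypothetical, not part of the Qpid client or JMS. It covers only the permit bookkeeping; what to do with a message when no permit is available (options 1 to 4) is left to the caller.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.Semaphore;

/**
 * Hypothetical helper: one Semaphore with K permits per message group,
 * shared by every listener in the same JVM.
 */
final class GroupPermits {
    private final int permitsPerGroup; // K in the description above
    private final ConcurrentMap<String, Semaphore> semaphores = new ConcurrentHashMap<>();

    GroupPermits(int permitsPerGroup) {
        if (permitsPerGroup < 1) {
            throw new IllegalArgumentException("permitsPerGroup must be >= 1");
        }
        this.permitsPerGroup = permitsPerGroup;
    }

    /** Non-blocking: returns true if a permit for the group was taken. */
    boolean tryAcquire(String groupId) {
        // Lazily create the group's semaphore the first time the group is seen.
        return semaphores
                .computeIfAbsent(groupId, g -> new Semaphore(permitsPerGroup))
                .tryAcquire();
    }

    /** Returns a permit; call only after a successful tryAcquire for the same group. */
    void release(String groupId) {
        Semaphore s = semaphores.get(groupId);
        if (s != null) {
            s.release();
        }
    }

    /**
     * Runs the work if a permit is free and always releases the permit afterwards.
     * Returns false when no permit was available, so the caller can reenqueue.
     */
    boolean runIfPermitted(String groupId, Runnable work) {
        if (!tryAcquire(groupId)) {
            return false;
        }
        try {
            work.run();
            return true;
        } finally {
            release(groupId);
        }
    }
}
```

A listener's onMessage could call runIfPermitted(groupId, ...) with the processing logic and, on a false return, fall back to whichever reenqueue strategy is chosen. Note that this sketch never removes semaphores for groups that are no longer active, so the map would grow with the number of distinct groups.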