Return-Path: X-Original-To: apmail-curator-commits-archive@minotaur.apache.org Delivered-To: apmail-curator-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 4300C18BD6 for ; Mon, 22 Jun 2015 23:58:34 +0000 (UTC) Received: (qmail 84485 invoked by uid 500); 22 Jun 2015 23:58:34 -0000 Delivered-To: apmail-curator-commits-archive@curator.apache.org Received: (qmail 84457 invoked by uid 500); 22 Jun 2015 23:58:34 -0000 Mailing-List: contact commits-help@curator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@curator.apache.org Delivered-To: mailing list commits@curator.apache.org Received: (qmail 84445 invoked by uid 99); 22 Jun 2015 23:58:34 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jun 2015 23:58:34 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id DD4B5E362D; Mon, 22 Jun 2015 23:58:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: cammckenzie@apache.org To: commits@curator.apache.org Date: Mon, 22 Jun 2015 23:58:33 -0000 Message-Id: <2a2bc94d85944da290207219d0bd2fe5@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] curator git commit: Curator-224: Fixed the requeuing problem with DistributedIdQueue. Repository: curator Updated Branches: refs/heads/CURATOR-224 [created] 19bb4d1c4 Curator-224: Fixed the requeuing problem with DistributedIdQueue. 
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/83e1a855 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/83e1a855 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/83e1a855 Branch: refs/heads/CURATOR-224 Commit: 83e1a855a15e17a37ec02d441d6c75e7f08a2617 Parents: 20e92a5 Author: Zhihong Zhang Authored: Mon Jun 22 14:59:02 2015 -0400 Committer: Zhihong Zhang Committed: Mon Jun 22 14:59:02 2015 -0400 ---------------------------------------------------------------------- .../recipes/queue/DistributedQueue.java | 2 +- .../recipes/queue/TestDistributedIdQueue.java | 47 ++++++++++++++++++++ 2 files changed, 48 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/83e1a855/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java ---------------------------------------------------------------------- diff --git a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java index 9dd2217..a183adf 100644 --- a/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java +++ b/curator-recipes/src/main/java/org/apache/curator/framework/recipes/queue/DistributedQueue.java @@ -756,7 +756,7 @@ public class DistributedQueue implements QueueBase client.inTransaction() .delete().forPath(itemPath) .and() - .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(makeItemPath(), bytes) + .create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(itemPath, bytes) .and() .commit(); } http://git-wip-us.apache.org/repos/asf/curator/blob/83e1a855/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java 
---------------------------------------------------------------------- diff --git a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java index 30e552f..858086b 100644 --- a/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java +++ b/curator-recipes/src/test/java/org/apache/curator/framework/recipes/queue/TestDistributedIdQueue.java @@ -124,4 +124,51 @@ public class TestDistributedIdQueue extends BaseClassForTests CloseableUtils.closeQuietly(client); } } + + @Test + public void testRequeuingWithLock() throws Exception + { + DistributedIdQueue<TestQueueItem> queue = null; + CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1)); + client.start(); + try + { + final CountDownLatch consumingLatch = new CountDownLatch(1); + + QueueConsumer<TestQueueItem> consumer = new QueueConsumer<TestQueueItem>() + { + @Override + public void consumeMessage(TestQueueItem message) throws Exception + { + consumingLatch.countDown(); + // Throw an exception so requeuing occurs + throw new Exception("Consumer failed"); + } + + @Override + public void stateChanged(CuratorFramework client, ConnectionState newState) + { + } + }; + + queue = QueueBuilder.builder(client, consumer, serializer, QUEUE_PATH).lockPath("/locks").buildIdQueue(); + queue.start(); + + queue.put(new TestQueueItem("test"), "id"); + + Assert.assertTrue(consumingLatch.await(10, TimeUnit.SECONDS)); // wait until consumer has it + + // Sleep one more second + + Thread.sleep(1000); + + Assert.assertEquals(queue.remove("id"), 1); + + } + finally + { + CloseableUtils.closeQuietly(queue); + CloseableUtils.closeQuietly(client); + } + } }