Return-Path: X-Original-To: apmail-cxf-commits-archive@www.apache.org Delivered-To: apmail-cxf-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 0A663DA86 for ; Thu, 13 Dec 2012 14:49:49 +0000 (UTC) Received: (qmail 14243 invoked by uid 500); 13 Dec 2012 14:49:48 -0000 Delivered-To: apmail-cxf-commits-archive@cxf.apache.org Received: (qmail 14068 invoked by uid 500); 13 Dec 2012 14:49:46 -0000 Mailing-List: contact commits-help@cxf.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@cxf.apache.org Delivered-To: mailing list commits@cxf.apache.org Received: (qmail 14039 invoked by uid 99); 13 Dec 2012 14:49:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Dec 2012 14:49:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 13 Dec 2012 14:49:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id DBC762388980; Thu, 13 Dec 2012 14:49:23 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1421320 - in /cxf/trunk/rt/ws/rm/src: main/java/org/apache/cxf/ws/rm/ main/java/org/apache/cxf/ws/rm/soap/ test/java/org/apache/cxf/ws/rm/ Date: Thu, 13 Dec 2012 14:49:23 -0000 To: commits@cxf.apache.org From: ay@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121213144923.DBC762388980@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: ay Date: Thu Dec 13 14:49:22 2012 New Revision: 1421320 URL: http://svn.apache.org/viewvc?rev=1421320&view=rev Log: [CXF-4700] Add an operation for manually cancel/remove WS-RM messages and sequences Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java?rev=1421320&r1=1421319&r2=1421320&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ManagedRMEndpoint.java Thu Dec 13 14:49:22 2012 @@ -496,9 +496,11 @@ public class ManagedRMEndpoint implement if (null == ss) { throw new JMException("no source sequence"); } - //TODO use cancel instead of suspend RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue(); - rq.suspend(ss); + if (rq.countUnacknowledged(ss) > 0) { + throw new JMException("sequence not empty"); + } + rq.stop(ss); ss.getSource().removeSequence(ss); } @@ -511,12 +513,24 @@ public class ManagedRMEndpoint implement if (null == ds) { throw new JMException("no source sequence"); } - //TODO use cancel instead of suspend // RedeliveryQueue rq = endpoint.getManager().getRedeliveryQueue(); // rq.suspend(ds); ds.getDestination().removeSequence(ds); } + @ManagedOperation(description = "Purge UnAcknowledged Messages") + @ManagedOperationParameters({ + @ManagedOperationParameter(name = "sequenceId", description = "The sequence identifier") + }) + public void purgeUnAcknowledgedMessages(String sid) { + SourceSequence ss = getSourceSeq(sid); + if (null == ss) { + throw new IllegalArgumentException("no sequence"); + } + RetransmissionQueue rq = endpoint.getManager().getRetransmissionQueue(); + rq.purgeAll(ss); + } + private static String getAddressValue(EndpointReferenceType epr) { if (null != epr && null != epr.getAddress()) { return epr.getAddress().getValue(); Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java?rev=1421320&r1=1421319&r2=1421320&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RetransmissionQueue.java Thu Dec 13 14:49:22 2012 @@ -60,6 +60,13 @@ public interface RetransmissionQueue { void purgeAcknowledged(SourceSequence seq); /** + * Purge all candiates for the given sequence. + * + * @param seq the sequence object + */ + void purgeAll(SourceSequence seq); + + /** * * @param seq * @return Modified: cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java?rev=1421320&r1=1421319&r2=1421320&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java (original) +++ cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/soap/RetransmissionQueueImpl.java Thu Dec 13 14:49:22 2012 @@ -131,6 +131,21 @@ public class RetransmissionQueueImpl imp * @param seq the sequence object. */ public void purgeAcknowledged(SourceSequence seq) { + purgeCandidates(seq, false); + } + + /** + * Purge all candidates for the given sequence. This method is used to + * terminate the sequence by force and release the resource associated + * with the sequence. + * + * @param seq the sequence object. + */ + public void purgeAll(SourceSequence seq) { + purgeCandidates(seq, true); + } + + private void purgeCandidates(SourceSequence seq, boolean any) { Collection purged = new ArrayList(); synchronized (this) { LOG.fine("Start purging resend candidates."); @@ -139,7 +154,7 @@ public class RetransmissionQueueImpl imp for (int i = sequenceCandidates.size() - 1; i >= 0; i--) { ResendCandidate candidate = sequenceCandidates.get(i); long m = candidate.getNumber(); - if (seq.isAcknowledged(m)) { + if (any || seq.isAcknowledged(m)) { sequenceCandidates.remove(i); candidate.resolved(); unacknowledgedCount--; @@ -260,7 +275,7 @@ public class RetransmissionQueueImpl imp } } } - + /** * @return the exponential backoff */ Modified: cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java URL: http://svn.apache.org/viewvc/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java?rev=1421320&r1=1421319&r2=1421320&view=diff ============================================================================== --- cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java (original) +++ cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ManagedRMManagerTest.java Thu Dec 13 14:49:22 2012 @@ -20,7 +20,9 @@ package org.apache.cxf.ws.rm; import java.util.ArrayList; +import java.util.Collections; import java.util.Date; +import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -174,6 +176,72 @@ public class ManagedRMManagerTest extend } @Test + public void testRemoveSequence() throws Exception { + manager = new RMManager(); + RMEndpoint rme = control.createMock(RMEndpoint.class); + EndpointReferenceType ref = RMUtils.createReference(TEST_URI); + Source source = new Source(rme); + Destination destination = new Destination(rme); + + RetransmissionQueue rq = new TestRetransmissionQueue(); + manager.setRetransmissionQueue(rq); + manager.initialise(); + + SourceSequence ss1 = createTestSourceSequence(source, "seq1", ref, + ProtocolVariation.RM10WSA200408, new long[]{1L, 1L, 3L, 3L}); + SourceSequence ss3 = createTestSourceSequence(source, "seq3", ref, + ProtocolVariation.RM10WSA200408, new long[]{1L, 5L}); + + EasyMock.expect(rme.getManager()).andReturn(manager).anyTimes(); + EasyMock.expect(rme.getSource()).andReturn(source).anyTimes(); + EasyMock.expect(rme.getDestination()).andReturn(destination).anyTimes(); + + control.replay(); + setCurrentMessageNumber(ss1, 5L); + setCurrentMessageNumber(ss3, 5L); + source.addSequence(ss1); + source.addSequence(ss3); + source.setCurrent(ss3); + + ManagedRMEndpoint managedEndpoint = new ManagedRMEndpoint(rme); + + // for those sequences without any unacknowledged messages + CompositeData cd = managedEndpoint.getSourceSequence("seq3"); + assertNotNull(cd); + + managedEndpoint.removeSourceSequence("seq3"); + try { + cd = managedEndpoint.getSourceSequence("seq3"); + fail("sequnce not removed"); + } catch (Exception e) { + // ok + } + + // for those sequences with some unacknowledged messages + cd = managedEndpoint.getSourceSequence("seq1"); + assertNotNull(cd); + + try { + managedEndpoint.removeSourceSequence("seq1"); + fail("sequnce may not be removed"); + } catch (Exception e) { + // ok + } + cd = managedEndpoint.getSourceSequence("seq1"); + assertNotNull(cd); + + managedEndpoint.purgeUnAcknowledgedMessages("seq1"); + managedEndpoint.removeSourceSequence("seq1"); + + try { + cd = managedEndpoint.getSourceSequence("seq1"); + fail("sequnce not removed"); + } catch (Exception e) { + // ok + } + } + + @Test public void testGetSourceSequenceAcknowledgedRange() throws Exception { ManagedRMEndpoint managedEndpoint = createTestManagedRMEndpoint(); @@ -400,15 +468,19 @@ public class ManagedRMManagerTest extend private class TestRetransmissionQueue implements RetransmissionQueue { private Set suspended = new HashSet(); private RetryStatus status = new TestRetransmissionStatus(); + private Map> numlists = new HashMap>(); + + public TestRetransmissionQueue() { + numlists.put("seq1", new ArrayList()); + numlists.put("seq2", new ArrayList()); + Collections.addAll(numlists.get("seq1"), 2L, 4L); + Collections.addAll(numlists.get("seq2"), 3L); + } public int countUnacknowledged(SourceSequence seq) { - final String key = seq.getIdentifier().getValue(); - if ("seq1".equals(key)) { - return 2; - } else if ("seq2".equals(key)) { - return 1; - } - return 0; + final String key = seq.getIdentifier().getValue(); + List list = numlists.get(key); + return list != null ? list.size() : 0; } public boolean isEmpty() { @@ -423,25 +495,24 @@ public class ManagedRMManagerTest extend // TODO Auto-generated method stub } - public List getUnacknowledgedMessageNumbers(SourceSequence seq) { - List list = new ArrayList(); + public void purgeAll(SourceSequence seq) { final String key = seq.getIdentifier().getValue(); - if ("seq1".equals(key)) { - list.add(2L); - list.add(4L); - } else if ("seq2".equals(key)) { - list.add(3L); + List list = numlists.get(key); + if (list != null) { + list.clear(); } - return list; + } + + public List getUnacknowledgedMessageNumbers(SourceSequence seq) { + final String key = seq.getIdentifier().getValue(); + List list = numlists.get(key); + return list != null ? list : new ArrayList(); } public RetryStatus getRetransmissionStatus(SourceSequence seq, long num) { final String key = seq.getIdentifier().getValue(); - if (("seq1".equals(key) && (2L == num || 4L == num)) - || ("seq2".equals(key) && (2L == num))) { - return status; - } - return null; + List list = numlists.get(key); + return list.contains(num) ? status : null; } public Map getRetransmissionStatuses(SourceSequence seq) { @@ -474,7 +545,7 @@ public class ManagedRMManagerTest extend } public int countUnacknowledged() { - return 3; + return numlists.get("seq1").size() + numlists.get("seq2").size(); } }