cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From andreasm...@apache.org
Subject svn commit: r541656 - in /incubator/cxf/trunk: rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/src/test/java/org/apache/cxf/systest/ws/rm/
Date Fri, 25 May 2007 13:56:24 GMT
Author: andreasmyth
Date: Fri May 25 06:56:23 2007
New Revision: 541656

URL: http://svn.apache.org/viewvc?view=rev&rev=541656
Log:
* Defer creation of the RMEndpoint (and insertion into a strong hashmap) until a) first message
is processed on an interceptor chain including RM interceptors or b there actually are messages
to recover/resend from previous sessions.
* Delete unused sequences that were offered as part of CreateSequence requests thereby eliminating
the need to terminate them explicitly by sending of an out-of-band LastMessage and a subsequent
TerminateSequence. Should address the failure of the testTerminateOnShutdown test I disabled
recently.

Modified:
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
    incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Fri May
25 06:56:23 2007
@@ -361,29 +361,36 @@
     void recoverReliableEndpoint(Endpoint endpoint, Conduit conduit) {
         if (null == store || null == retransmissionQueue) {
             return;
-        }
-        
-        RMEndpoint rme = createReliableEndpoint(endpoint);        
-        rme.initialise(conduit, null);
-        reliableEndpoints.put(endpoint, rme);
+        }        
         
         String id = RMUtils.getEndpointIdentifier(endpoint);
-        LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
-                new Object[] {null == conduit ? "client" : "server", id});
+        
         Collection<SourceSequence> sss = store.getSourceSequences(id);
         if (null == sss || 0 == sss.size()) {                        
             return;
         }
         LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
-        for (SourceSequence ss : sss) {
-            rme.getSource().addSequence(ss, false);
+        
+        RMEndpoint rme = null;
+        
+        for (SourceSequence ss : sss) {            
  
             Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
-            if (null == ms) {
+            if (null == ms || 0 == ms.size()) {
                 continue;
             }
             LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
-            for (RMMessage m : ms) {
+            
+            if (null == rme) {
+                LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+                        new Object[] {null == conduit ? "client" : "server", id});
+                rme = createReliableEndpoint(endpoint);
+                rme.initialise(conduit, null);
+                reliableEndpoints.put(endpoint, rme);
+            }
+            rme.getSource().addSequence(ss, false);
+            
+            for (RMMessage m : ms) {                
                 
                 Message message = new MessageImpl();
                 Exchange exchange = new ExchangeImpl();
@@ -415,7 +422,7 @@
                 message.setContent(byte[].class, m.getContent());
                           
                 retransmissionQueue.addUnacknowledged(message);
-            }
+            }            
         }
         retransmissionQueue.start();
         

Modified: incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/Servant.java Fri May 25
06:56:23 2007
@@ -19,6 +19,7 @@
 
 package org.apache.cxf.ws.rm;
 
+import java.math.BigInteger;
 import java.util.Collections;
 import java.util.List;
 import java.util.logging.Level;
@@ -215,18 +216,28 @@
         // the following may be necessary if the last message for this sequence was a oneway
         // request and hence there was no response to which a last message could have been
added
         
-        for (SourceSequence outboundSeq : reliableEndpoint.getSource().getAllSequences())
{
+        // REVISIT: A last message for the correlated sequence should have been sent by the
time
+        // the last message for the underlying sequence was received.
+        
+        Source source = reliableEndpoint.getSource();
+        
+        for (SourceSequence outboundSeq : source.getAllSequences()) {
             if (outboundSeq.offeredBy(sid) && !outboundSeq.isLastMessage()) {
                 
+                if (BigInteger.ZERO.equals(outboundSeq.getCurrentMessageNr())) {
+                    source.removeSequence(outboundSeq);
+                }
                 // send an out of band message with an empty body and a 
                 // sequence header containing a lastMessage element.
                
+                /*
                 Proxy proxy = new Proxy(reliableEndpoint);
                 try {
                     proxy.lastMessage(outboundSeq);
                 } catch (RMException ex) {
                     LogUtils.log(LOG, Level.SEVERE, "CORRELATED_SEQ_TERMINATION_EXC", ex);
                 }
+                */
                 
                 break;
             }

Modified: incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
--- incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java (original)
+++ incubator/cxf/trunk/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java Fri
May 25 06:56:23 2007
@@ -530,9 +530,7 @@
         RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
         manager.setStore(store);
         manager.setRetransmissionQueue(queue);
-        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
-        RMEndpoint rme = control.createMock(RMEndpoint.class);
-        EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
+        
         Collection<SourceSequence> sss = new ArrayList<SourceSequence>();
         if (null != ss) {
             sss.add(ss);            
@@ -540,11 +538,7 @@
         EasyMock.expect(store.getSourceSequences("{S}s.{P}p")).andReturn(sss);
         if (null == ss) {
             return;
-        } 
-        Source source = control.createMock(Source.class);
-        EasyMock.expect(rme.getSource()).andReturn(source);
-        source.addSequence(ss, false);
-        EasyMock.expectLastCall();
+        }         
         
         Collection<RMMessage> ms = new ArrayList<RMMessage>();
         if (null != m) {
@@ -557,6 +551,15 @@
         if (null == m) {
             return;
         }
+        
+        manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
+        RMEndpoint rme = control.createMock(RMEndpoint.class);
+        EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
+        Source source = control.createMock(Source.class);
+        EasyMock.expect(rme.getSource()).andReturn(source);
+        source.addSequence(ss, false);
+        EasyMock.expectLastCall();
+        
         Service service = control.createMock(Service.class);
         EasyMock.expect(endpoint.getService()).andReturn(service);
         Binding binding = control.createMock(Binding.class);

Modified: incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
URL: http://svn.apache.org/viewvc/incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java?view=diff&rev=541656&r1=541655&r2=541656
==============================================================================
--- incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
(original)
+++ incubator/cxf/trunk/systests/src/test/java/org/apache/cxf/systest/ws/rm/SequenceTest.java
Fri May 25 06:56:23 2007
@@ -1219,8 +1219,7 @@
         mf.verifyLastMessage(new boolean[3], false);
         mf.verifyAcknowledgements(new boolean[] {false, true, true}, false);
     }
-    
-    @Ignore 
+     
     @Test
     public void testTerminateOnShutdown() throws Exception {
         if (!doTestTerminateOnShutdown) {



Mime
View raw message