cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ff...@apache.org
Subject svn commit: r1038519 - in /cxf/branches/2.3.x-fixes: ./ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/ rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/ rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/ systests/ws-specs/src/test/java/org/apache/c...
Date Wed, 24 Nov 2010 08:18:32 GMT
Author: ffang
Date: Wed Nov 24 08:18:31 2010
New Revision: 1038519

URL: http://svn.apache.org/viewvc?rev=1038519&view=rev
Log:
Merged revisions 1038509 via svnmerge from 
https://svn.apache.org/repos/asf/cxf/trunk

........
  r1038509 | ffang | 2010-11-24 15:42:49 +0800 (三, 24 11 2010) | 1 line
  
  [CXF-3114]WS-RM's RMTxStore's does not recover stored sequences after restart
........

Modified:
    cxf/branches/2.3.x-fixes/   (props changed)
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
    cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
    cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java

Propchange: cxf/branches/2.3.x-fixes/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java?rev=1038519&r1=1038518&r2=1038519&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java (original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/RMManager.java Wed
Nov 24 08:18:31 2010
@@ -399,12 +399,18 @@ public class RMManager implements Server
         String id = RMUtils.getEndpointIdentifier(endpoint);
         
         Collection<SourceSequence> sss = store.getSourceSequences(id);
-        if (null == sss || 0 == sss.size()) {                        
+        Collection<DestinationSequence> dss = store.getDestinationSequences(id);
+        if ((null == sss || 0 == sss.size()) && (null == dss || 0 == dss.size())) {                        
             return;
         }
         LOG.log(Level.FINE, "Number of source sequences: {0}", sss.size());
+        LOG.log(Level.FINE, "Number of destination sequences: {0}", dss.size());
         
-        RMEndpoint rme = null;
+        LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
+                new Object[] {null == conduit ? "client" : "server", id});
+        RMEndpoint rme = createReliableEndpoint(endpoint);
+        rme.initialise(conduit, null, null);
+        reliableEndpoints.put(endpoint, rme);
         for (SourceSequence ss : sss) {            
  
             Collection<RMMessage> ms = store.getMessages(ss.getIdentifier(), true);
@@ -413,13 +419,6 @@ public class RMManager implements Server
             }
             LOG.log(Level.FINE, "Number of messages in sequence: {0}", ms.size());
             
-            if (null == rme) {
-                LOG.log(Level.FINE, "Recovering {0} endpoint with id: {1}",
-                        new Object[] {null == conduit ? "client" : "server", id});
-                rme = createReliableEndpoint(endpoint);
-                rme.initialise(conduit, null, null);
-                reliableEndpoints.put(endpoint, rme);
-            }
             rme.getSource().addSequence(ss, false);
             
             for (RMMessage m : ms) {                
@@ -462,6 +461,10 @@ public class RMManager implements Server
                 retransmissionQueue.addUnacknowledged(message);
             }            
         }
+        
+        for (DestinationSequence ds : dss) {
+            rme.getDestination().addSequence(ds, false);        
+        }
         retransmissionQueue.start();
         
     }

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java?rev=1038519&r1=1038518&r2=1038519&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
(original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/main/java/org/apache/cxf/ws/rm/persistence/jdbc/RMTxStore.java
Wed Nov 24 08:18:31 2010
@@ -605,7 +605,7 @@ public class RMTxStore implements RMStor
         }
         
         try {
-            connection.setAutoCommit(false);
+            connection.setAutoCommit(true);
             createTables();
         } catch (SQLException ex) {
             LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
@@ -615,7 +615,14 @@ public class RMTxStore implements RMStor
                 LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", se);
             }
             throw new RMStoreException(ex);
-        }   
+        } finally {
+            try {
+                connection.setAutoCommit(false);                
+            } catch (SQLException ex) {
+                LogUtils.log(LOG, Level.SEVERE, "CONNECT_EXC", ex);
+                throw new RMStoreException(ex);
+            }
+        }
     }   
     
     Connection getConnection() {

Modified: cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java?rev=1038519&r1=1038518&r2=1038519&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
(original)
+++ cxf/branches/2.3.x-fixes/rt/ws/rm/src/test/java/org/apache/cxf/ws/rm/RMManagerTest.java
Wed Nov 24 08:18:31 2010
@@ -490,7 +490,7 @@ public class RMManagerTest extends Asser
         InterfaceInfo ii = control.createMock(InterfaceInfo.class);
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);          
         Conduit conduit = control.createMock(Conduit.class);        
-        setUpRecoverReliableEndpoint(endpoint, conduit, null, null);
+        setUpRecoverReliableEndpoint(endpoint, conduit, null, null, null);
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
@@ -498,7 +498,8 @@ public class RMManagerTest extends Asser
         control.reset();
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);
         SourceSequence ss = control.createMock(SourceSequence.class);
-        setUpRecoverReliableEndpoint(endpoint, conduit, ss, null);
+        DestinationSequence ds = control.createMock(DestinationSequence.class);
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, null);
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();
@@ -506,7 +507,7 @@ public class RMManagerTest extends Asser
         control.reset();
         setUpEndpointForRecovery(endpoint, ei, si, bi, ii);  
         RMMessage m = control.createMock(RMMessage.class);
-        setUpRecoverReliableEndpoint(endpoint, conduit, ss, m);        
+        setUpRecoverReliableEndpoint(endpoint, conduit, ss, ds, m);        
         control.replay();
         manager.recoverReliableEndpoint(endpoint, conduit);
         control.verify();        
@@ -528,7 +529,7 @@ public class RMManagerTest extends Asser
     void setUpRecoverReliableEndpoint(Endpoint endpoint,
                                       Conduit conduit, 
                                       SourceSequence ss, 
-                                      RMMessage m)  {                
+                                      DestinationSequence ds, RMMessage m)  {               
         RMStore store = control.createMock(RMStore.class);
         RetransmissionQueue queue = control.createMock(RetransmissionQueue.class);
         manager.setStore(store);
@@ -543,6 +544,14 @@ public class RMManagerTest extends Asser
             return;
         }         
         
+        Collection<DestinationSequence> dss = new ArrayList<DestinationSequence>();
+        if (null != ds) {
+            dss.add(ds);            
+        }
+        EasyMock.expect(store.getDestinationSequences("{S}s.{P}p")).andReturn(dss);
+        if (null == ds) {
+            return;
+        }
         Collection<RMMessage> ms = new ArrayList<RMMessage>();
         if (null != m) {
             ms.add(m);
@@ -551,25 +560,29 @@ public class RMManagerTest extends Asser
         id.setValue("S1");
         EasyMock.expect(ss.getIdentifier()).andReturn(id).times(null == m ? 1 : 2);
         EasyMock.expect(store.getMessages(id, true)).andReturn(ms);
-        if (null == m) {
-            return;
-        }
+        
         
         manager.setReliableEndpointsMap(new HashMap<Endpoint, RMEndpoint>());
         RMEndpoint rme = control.createMock(RMEndpoint.class);
         EasyMock.expect(manager.createReliableEndpoint(endpoint)).andReturn(rme);
         Source source = control.createMock(Source.class);
-        EasyMock.expect(rme.getSource()).andReturn(source);
-        source.addSequence(ss, false);
+        EasyMock.expect(rme.getSource()).andReturn(source).anyTimes();
+                
+        Destination destination = control.createMock(Destination.class);
+        EasyMock.expect(rme.getDestination()).andReturn(destination);
+        destination.addSequence(ds, false);
         EasyMock.expectLastCall();
         
         Service service = control.createMock(Service.class);
-        EasyMock.expect(endpoint.getService()).andReturn(service);
+        EasyMock.expect(endpoint.getService()).andReturn(service).anyTimes();
         Binding binding = control.createMock(Binding.class);
-        EasyMock.expect(endpoint.getBinding()).andReturn(binding);
+        EasyMock.expect(endpoint.getBinding()).andReturn(binding).anyTimes();
        
-        EasyMock.expect(ss.isLastMessage()).andReturn(true);
-        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN);
+        EasyMock.expect(ss.isLastMessage()).andReturn(true).anyTimes();
+        EasyMock.expect(ss.getCurrentMessageNr()).andReturn(BigInteger.TEN).anyTimes();
+        if (null == m) {
+            return;
+        }
         EasyMock.expect(m.getMessageNumber()).andReturn(BigInteger.TEN).times(2);
         if (null == conduit) {
             EasyMock.expect(m.getTo()).andReturn("toAddress");

Modified: cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
URL: http://svn.apache.org/viewvc/cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java?rev=1038519&r1=1038518&r2=1038519&view=diff
==============================================================================
--- cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
(original)
+++ cxf/branches/2.3.x-fixes/systests/ws-specs/src/test/java/org/apache/cxf/systest/ws/rm/ServerPersistenceTest.java
Wed Nov 24 08:18:31 2010
@@ -71,7 +71,8 @@ public class ServerPersistenceTest exten
 
     private OutMessageRecorder out;
     private InMessageRecorder in;
-
+    private Bus greeterBus;
+    
     @BeforeClass
     public static void startServers() throws Exception {
         String derbyHome = System.getProperty("derby.system.home");
@@ -106,7 +107,7 @@ public class ServerPersistenceTest exten
         assertTrue("Failed to start greeter", control.startGreeter(SERVER_LOSS_CFG)); 
         LOG.fine("Started greeter server.");
         
-        Bus greeterBus = new SpringBusFactory().createBus(CFG);
+        greeterBus = new SpringBusFactory().createBus(CFG);
         LOG.fine("Created bus " + greeterBus + " with cfg : " + CFG);        
         BusFactory.setDefaultBus(greeterBus);
         
@@ -134,7 +135,7 @@ public class ServerPersistenceTest exten
         
         LOG.fine("Configured greeter client.");
 
-        Response<GreetMeResponse> responses[] = cast(new Response[3]);
+        Response<GreetMeResponse> responses[] = cast(new Response[4]);
         
         responses[0] = greeter.greetMeAsync("one");
         responses[1] = greeter.greetMeAsync("two");
@@ -153,6 +154,12 @@ public class ServerPersistenceTest exten
         
         verifyServerRecovery(responses);
         
+        out.getOutboundMessages().clear();
+        in.getInboundMessages().clear();
+        
+        responses[3] = greeter.greetMeAsync("four");
+        verifyRetransmissionQueue();
+        
         greeterBus.shutdown(true);
         
         control.stopGreeter(CFG);
@@ -165,7 +172,7 @@ public class ServerPersistenceTest exten
         // wait another while to prove that response to second request is indeed lost
         Thread.sleep(4000);
         int nDone = 0;
-        for (int i = 0; i < responses.length; i++) {
+        for (int i = 0; i < 3; i++) {
             if (responses[i].isDone()) {
                 nDone++;
             }
@@ -200,7 +207,7 @@ public class ServerPersistenceTest exten
         long waited = 0;
         while (waited < 20) {
             nDone = 0;
-            for (int i = 0; i < responses.length; i++) {
+            for (int i = 0; i < responses.length - 1; i++) {
                 if (responses[i].isDone()) {
                     nDone++;
                 }
@@ -231,6 +238,13 @@ public class ServerPersistenceTest exten
     }
   
     
+    void verifyRetransmissionQueue() throws Exception {
+        awaitMessages(1, 3, 40000);
+        
+        boolean empty = greeterBus.getExtension(RMManager.class).getRetransmissionQueue().isEmpty();
+        assertTrue("Retransmission Queue is not empty", empty);
+    }
+
     protected void awaitMessages(int nExpectedOut, int nExpectedIn) {
         awaitMessages(nExpectedOut, nExpectedIn, 10000);
     }



Mime
View raw message