cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1535093 - in /cxf/trunk: rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
Date Wed, 23 Oct 2013 18:18:03 GMT
Author: dkulp
Date: Wed Oct 23 18:18:02 2013
New Revision: 1535093

URL: http://svn.apache.org/r1535093
Log:
If something on the ws-discovery network throws a fault, ignore and continue getting the probematches
responses

Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1535093&r1=1535092&r2=1535093&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
(original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
Wed Oct 23 18:18:02 2013
@@ -30,7 +30,11 @@ import java.net.InterfaceAddress;
 import java.net.MulticastSocket;
 import java.net.NetworkInterface;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Queue;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
@@ -76,41 +80,56 @@ public class UDPConduit extends Abstract
         connector.setHandler(new IoHandlerAdapter() {
             public void messageReceived(IoSession session, Object buf) {
                 Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
-                dataReceived(message, (IoBuffer)buf, true);
+                dataReceived(message, (IoBuffer)buf, true, false);
             }
         });
     }
 
-    private void dataReceived(Message message, IoBuffer buf, boolean async) {
-        if (message.getExchange().getInMessage() == null) {
-            final Message inMessage = new MessageImpl();
-            inMessage.setExchange(message.getExchange());
-            message.getExchange().setInMessage(inMessage);
-            
-            IoSessionInputStream ins = new IoSessionInputStream(buf);
-            inMessage.setContent(InputStream.class, ins);
-            inMessage.put(IoSessionInputStream.class, ins);
-            
-            if (async) {
-                WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
-                WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
-                if (queue == null) {
-                    queue = queuem.getAutomaticWorkQueue();
-                }
-                queue.execute(new Runnable() {
-                    public void run() {
-                        incomingObserver.onMessage(inMessage);
+    private void dataReceived(Message message, IoBuffer buf, boolean async, boolean multi)
{
+        synchronized (message.getExchange()) {
+            if (message.getExchange().getInMessage() == null) {
+                final Message inMessage = new MessageImpl();
+                IoSessionInputStream ins = new IoSessionInputStream(buf);
+                inMessage.setContent(InputStream.class, ins);
+                inMessage.put(IoSessionInputStream.class, ins);
+                
+                message.getExchange().setInMessage(inMessage);
+                inMessage.setExchange(message.getExchange());
+                
+                Map<String, Object> mp = null;
+                if (multi) {
+                    mp = new HashMap<String, Object>(message.getExchange());
+                }
+                
+                if (async) {
+                    WorkQueueManager queuem = bus.getExtension(WorkQueueManager.class);
+                    WorkQueue queue = queuem.getNamedWorkQueue("udp-conduit");
+                    if (queue == null) {
+                        queue = queuem.getAutomaticWorkQueue();
+                    }
+                    queue.execute(new Runnable() {
+                        public void run() {
+                            incomingObserver.onMessage(inMessage);
+                        }
+                    });
+                } else {
+                    incomingObserver.onMessage(inMessage);
+                    if (!message.getExchange().isSynchronous() || multi) {
+                        message.getExchange().setInMessage(null);
+                        message.getExchange().setInFaultMessage(null);
                     }
-                });
-            } else {
-                incomingObserver.onMessage(inMessage);
-                if (!message.getExchange().isSynchronous()) {
-                    message.getExchange().setInMessage(null);
                 }
+                if (mp != null) {
+                    Collection<String> s = new ArrayList<String>(message.getExchange().keySet());
+                    for (String s2 : s) {
+                        message.getExchange().remove(s2);
+                    }
+                    message.getExchange().putAll(mp);
+                }
+            } else {
+                IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
+                ins.setBuffer((IoBuffer)buf);
             }
-        } else {
-            IoSessionInputStream ins = message.getExchange().getInMessage().get(IoSessionInputStream.class);
-            ins.setBuffer((IoBuffer)buf);
         }
     }
     
@@ -280,14 +299,14 @@ public class UDPConduit extends Abstract
                 if (i == null || i <= 0 || message.getExchange().isSynchronous()) {
                     socket.setSoTimeout(30000);
                     socket.receive(p);
-                    dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false);
+                    dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()), false,
false);
                 } else {
                     socket.setSoTimeout(i);
                     boolean found = false;
                     try {
                         while (true) {
                             socket.receive(p);
-                            dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()),
false);
+                            dataReceived(message, IoBuffer.wrap(bytes, 0, p.getLength()),
false, true);
                             found = true;
                         }
                     } catch (java.net.SocketTimeoutException ex) {

Modified: cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java?rev=1535093&r1=1535092&r2=1535093&view=diff
==============================================================================
--- cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
(original)
+++ cxf/trunk/services/ws-discovery/ws-discovery-api/src/test/java/org/apache/cxf/ws/discovery/WSDiscoveryClientTest.java
Wed Oct 23 18:18:02 2013
@@ -52,6 +52,15 @@ public final class WSDiscoveryClientTest
             Endpoint ep = Endpoint.publish("http://localhost:51919/Foo/Snarf", new FooImpl());
             WSDiscoveryServiceImpl service = new WSDiscoveryServiceImpl(bus);
             service.startup();
+            
+            //this service will just generate an error.  However, the probes should still
+            //work to probe the above stuff.
+            WSDiscoveryServiceImpl s2 = new WSDiscoveryServiceImpl() {
+                public ProbeMatchesType handleProbe(ProbeType pt) {
+                    throw new RuntimeException("Error!!!");
+                }
+            };
+            s2.startup();
             HelloType h = service.register(ep.getEndpointReference());
 
             
@@ -66,7 +75,7 @@ public final class WSDiscoveryClientTest
             ProbeType pt = new ProbeType();
             ScopesType scopes = new ScopesType();
             pt.setScopes(scopes);
-            ProbeMatchesType pmts = c.probe(pt);
+            ProbeMatchesType pmts = c.probe(pt, 1000);
             System.out.println("2");
             if  (pmts != null) {
                 for (ProbeMatchType pmt : pmts.getProbeMatch()) {
@@ -75,7 +84,13 @@ public final class WSDiscoveryClientTest
                     System.out.println(pmt.getXAddrs());
                 }
             }
+            if (pmts.getProbeMatch().size() == 0) {
+                System.exit(0);
+            }
             pmts = c.probe(pt);
+            
+            System.out.println("Size:" + pmts.getProbeMatch().size());
+            
             System.out.println("3");
 
             W3CEndpointReference ref = null;



Mime
View raw message