cxf-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dk...@apache.org
Subject svn commit: r1369074 - in /cxf/trunk/rt/transports/udp/src: main/java/org/apache/cxf/transport/udp/UDPConduit.java test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
Date Fri, 03 Aug 2012 16:14:30 GMT
Author: dkulp
Date: Fri Aug  3 16:14:29 2012
New Revision: 1369074

URL: http://svn.apache.org/viewvc?rev=1369074&view=rev
Log:
Update to add some connection management to the UDP stuff to keep it
from using a ton of threads and a ton of ports

Modified:
    cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
    cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java

Modified: cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java?rev=1369074&r1=1369073&r2=1369074&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java (original)
+++ cxf/trunk/rt/transports/udp/src/main/java/org/apache/cxf/transport/udp/UDPConduit.java Fri Aug  3 16:14:29 2012
@@ -24,6 +24,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.net.InetSocketAddress;
 import java.net.URI;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.logging.Logger;
 
 import org.apache.cxf.Bus;
@@ -45,19 +48,22 @@ import org.apache.mina.transport.socket.
  * 
  */
 public class UDPConduit extends AbstractConduit {
-
+    private static final String CXF_MESSAGE_ATTR = "CXFMessage";
+    private static final String HOST_PORT = UDPConduit.class + ".host:port";
     private static final Logger LOG = LogUtils.getL7dLogger(UDPDestination.class); 
 
     Bus bus;
-    public UDPConduit(EndpointReferenceType t, Bus bus) {
+    NioDatagramConnector connector = new NioDatagramConnector();
+    ConcurrentHashMap<String, Queue<ConnectFuture>> connections 
+        = new ConcurrentHashMap<String, Queue<ConnectFuture>>();
+    
+    public UDPConduit(EndpointReferenceType t, 
+                      final Bus bus) {
         super(t);
         this.bus = bus;
-    }
-
-    public void prepare(final Message message) throws IOException {
-        NioDatagramConnector connector = new NioDatagramConnector();
         connector.setHandler(new IoHandlerAdapter() {
             public void messageReceived(IoSession session, Object buf) {
+                Message message = (Message)session.getAttribute(CXF_MESSAGE_ATTR);
                 if (message.getExchange().getInMessage() == null) {
                     final Message inMessage = new MessageImpl();
                     inMessage.setExchange(message.getExchange());
@@ -85,17 +91,73 @@ public class UDPConduit extends Abstract
                 }
             }
         });
+    }
+
+    
+    public void close(Message msg) throws IOException {
+        super.close(msg);
+        if (msg.getExchange().isOneWay() 
+            || msg.getExchange().getInMessage() == msg
+            || msg.getExchange().getInFaultMessage() == msg) {
+            String s = (String)msg.getExchange().get(HOST_PORT);
+            ConnectFuture c = msg.getExchange().get(ConnectFuture.class);
+            if (s != null && c != null) {
+                c.getSession().removeAttribute(CXF_MESSAGE_ATTR);
+    
+                Queue<ConnectFuture> q = connections.get(s);
+                if (q == null) {
+                    connections.putIfAbsent(s, new ArrayBlockingQueue<ConnectFuture>(10));
+                    q = connections.get(s);
+                }
+                if (!q.offer(c)) {
+                    c.getSession().close(false);
+                }
+            }
+        }
+    }
+    public void close() {
+        super.close();
+        for (Queue<ConnectFuture> f : connections.values()) {
+            for (ConnectFuture cf : f) {
+                cf.getSession().close(false);
+            }
+        }
+        connections.clear();
+        connector.dispose();
+        connector = null;
+    }
+
+
+    public void prepare(final Message message) throws IOException {
         try {
-            URI uri = new URI(this.getTarget().getAddress().getValue());
+            String address = (String)message.get(Message.ENDPOINT_ADDRESS);
+            if (StringUtils.isEmpty(address)) {
+                address = this.getTarget().getAddress().getValue();
+            }
+            URI uri = new URI(address);
             InetSocketAddress isa = null;
+            String hp = ""; 
             if (StringUtils.isEmpty(uri.getHost())) {
                 isa = new InetSocketAddress(uri.getPort());
+                hp = ":" + uri.getPort();
             } else {
                 isa = new InetSocketAddress(uri.getHost(), uri.getPort());
+                hp = uri.getHost() + ":" + uri.getPort();
             }
     
-            ConnectFuture connFuture = connector.connect(isa);
-            message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture));
+            Queue<ConnectFuture> q = connections.get(hp);
+            ConnectFuture connFuture = null;
+            if (q != null) {
+                connFuture = q.poll();
+            }
+            if (connFuture == null) {
+                connFuture = connector.connect(isa);
+                connFuture.await();
+            }
+            connFuture.getSession().setAttribute(CXF_MESSAGE_ATTR, message);
+            message.setContent(OutputStream.class, new UDPConduitOutputStream(connector, connFuture, message));
+            message.getExchange().put(ConnectFuture.class, connFuture);
+            message.getExchange().put(HOST_PORT, uri.getHost() + ":" + uri.getPort());
         } catch (Exception ex) {
             throw new IOException(ex);
         }
@@ -105,11 +167,15 @@ public class UDPConduit extends Abstract
         final ConnectFuture future;
         final NioDatagramConnector connector;
         final IoBuffer buffer = IoBuffer.allocate(64 * 1024); //max size
+        final Message message;
         boolean closed;
         
-        public UDPConduitOutputStream(NioDatagramConnector connector, ConnectFuture connFuture) {
+        public UDPConduitOutputStream(NioDatagramConnector connector,
+                                      ConnectFuture connFuture,
+                                      Message m) {
             this.connector = connector;
             this.future = connFuture;
+            this.message = m;
         }
 
         public void write(int b) throws IOException {
@@ -136,6 +202,9 @@ public class UDPConduit extends Abstract
             }
             buffer.flip();
             future.getSession().write(buffer);
+            if (message.getExchange().isOneWay()) {
+                future.getSession().close(true);
+            }
         }
     }
     

Modified: cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java
URL: http://svn.apache.org/viewvc/cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java?rev=1369074&r1=1369073&r2=1369074&view=diff
==============================================================================
--- cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java (original)
+++ cxf/trunk/rt/transports/udp/src/test/java/org/apache/cxf/transport/udp/UDPTransportTest.java Fri Aug  3 16:14:29 2012
@@ -58,7 +58,9 @@ public class UDPTransportTest extends Ab
         JaxWsProxyFactoryBean fact = new JaxWsProxyFactoryBean(); 
         fact.setAddress("udp://localhost:" + PORT);
         Greeter g = fact.create(Greeter.class);
-        assertEquals("Hello World", g.greetMe("World"));
+        for (int x = 0; x < 500; x++) {
+            assertEquals("Hello World", g.greetMe("World"));
+        }
                
         ((java.io.Closeable)g).close();
     }



Mime
View raw message