activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r384569 - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/transport/ src/main/java/org/apache/activemq/transport/udp/ src/test/java/org/apache/activemq/transport/udp/
Date Thu, 09 Mar 2006 18:06:34 GMT
Author: jstrachan
Date: Thu Mar  9 10:06:32 2006
New Revision: 384569

URL: http://svn.apache.org/viewcvs?rev=384569&view=rev
Log:
initial spike of UDP server transport with some test cases (some of which are commented out
as they are not quite working yet)

Added:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
  (with props)
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
  (with props)
Removed:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServerClient.java
Modified:
    incubator/activemq/trunk/activemq-core/project.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Thu Mar  9 10:06:32 2006
@@ -361,6 +361,8 @@
                 <exclude>**/MultipleTestsWithXBeanFactoryBeanTest.*</exclude>
                 <exclude>**/MultipleTestsWithSpringXBeanFactoryBeanTest.*</exclude>
                 
+                <!-- TODO FIXME -->
+                <exclude>**/UdpTransportUsingServerTest.*</exclude>
                 <exclude>**/UdpSendReceiveWithTwoConnectionsTest.*</exclude>
             </excludes>
         </unitTest>

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/WireFormatNegotiator.java
Thu Mar  9 10:06:32 2006
@@ -58,7 +58,10 @@
         if( firstStart.compareAndSet(true, false) ) {
         	try {
         		WireFormatInfo info = wireFormat.getPreferedWireFormatInfo();
-	            next.oneway(info);
+                if (log.isDebugEnabled()) {
+                    log.debug("Sending: " + info);
+                }
+	            sendWireFormat(info);
         	} finally {
         		wireInfoSentDownLatch.countDown();
         	}
@@ -99,11 +102,12 @@
                 onException((IOException) new InterruptedIOException().initCause(e));
 			}
             readyCountDownLatch.countDown();
-            
+            onWireFormatNegotiated(info);
         }
         getTransportListener().onCommand(command);
     }
-    
+
+
     public void onException(IOException error) {
         readyCountDownLatch.countDown();
     	super.onException(error);
@@ -111,5 +115,12 @@
     
     public String toString() {
         return next.toString();
+    }
+
+    protected void sendWireFormat(WireFormatInfo info) throws IOException {
+        next.oneway(info);
+    }
+    
+    protected void onWireFormatNegotiated(WireFormatInfo info) {
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
Thu Mar  9 10:06:32 2006
@@ -55,6 +55,7 @@
     private Object readLock = new Object();
     private ByteBuffer readBuffer;
     private CommandReadBuffer readStack;
+    private SocketAddress lastReadDatagramAddress;
 
     // writing
     private Object writeLock = new Object();
@@ -63,7 +64,8 @@
     private int largeMessageBufferSize = 128 * 1024;
     private DatagramHeader header = new DatagramHeader();
 
-    public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool
bufferPool, int datagramSize, DatagramReplayStrategy replayStrategy, SocketAddress targetAddress)
{
+    public CommandChannel(DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool
bufferPool, int datagramSize,
+            DatagramReplayStrategy replayStrategy, SocketAddress targetAddress) {
         this.channel = channel;
         this.wireFormat = wireFormat;
         this.bufferPool = bufferPool;
@@ -73,7 +75,7 @@
     }
 
     public void start() throws Exception {
-        //wireFormat.setPrefixPacketSize(false);
+        // wireFormat.setPrefixPacketSize(false);
         wireFormat.setCacheEnabled(false);
         wireFormat.setTightEncodingEnabled(true);
 
@@ -89,33 +91,43 @@
     }
 
     public void read(CommandProcessor processor) throws IOException {
+        DatagramHeader header = null;
         Command answer = null;
-        SocketAddress address = null;
+        lastReadDatagramAddress = null;
         synchronized (readLock) {
             readBuffer.clear();
-            address = channel.receive(readBuffer);
+            lastReadDatagramAddress = channel.receive(readBuffer);
             readBuffer.flip();
-
+            
             if (log.isDebugEnabled()) {
-                log.debug("Read a datagram from: " + address);
+                log.debug("Read a datagram from: " + lastReadDatagramAddress);
             }
-            DatagramHeader header = headerMarshaller.readHeader(readBuffer);
+            header = headerMarshaller.readHeader(readBuffer);
+            header.setFromAddress(lastReadDatagramAddress);
 
+            if (log.isDebugEnabled()) {
+                log.debug("Received datagram from: " + lastReadDatagramAddress + " header:
" + header);
+            }
             int remaining = readBuffer.remaining();
             int size = header.getDataSize();
+            /*
             if (size > remaining) {
                 throw new IOException("Invalid command size: " + size + " when there are
only: " + remaining + " byte(s) remaining");
             }
             else if (size < remaining) {
                 log.warn("Extra bytes in buffer. Expecting: " + size + " but has: " + remaining);
             }
+            */
+            if (size != remaining) {
+                log.warn("Expecting: " + size + " but has: " + remaining);
+            }
             if (header.isPartial()) {
                 byte[] data = new byte[size];
                 readBuffer.get(data);
                 header.setPartialData(data);
             }
             else {
-                byte[] data = new byte[size];
+                byte[] data = new byte[remaining];
                 readBuffer.get(data);
 
                 // TODO use a DataInput implementation that talks direct to the
@@ -128,17 +140,28 @@
             answer = readStack.read(header);
         }
         if (answer != null) {
-            processor.process(answer, address);
+            processor.process(answer, header);
         }
     }
 
+    /**
+     * Called if a packet is received on a different channel from a remote client
+     * @throws IOException 
+     */
+    public Command onDatagramReceived(DatagramHeader header) throws IOException {
+        return readStack.read(header);
+    }
+
     public void write(Command command) throws IOException {
         write(command, targetAddress);
     }
-        
+
     public void write(Command command, SocketAddress address) throws IOException {
         synchronized (writeLock) {
             header.incrementCounter();
+            bs = new BooleanStream();
+            // TODO
+            //bs.clear();
             int size = wireFormat.tightMarshal1(command, bs);
             if (size < datagramSize) {
                 header.setPartial(false);
@@ -187,11 +210,6 @@
         }
     }
 
-    protected void sendWriteBuffer(SocketAddress address) throws IOException {
-        writeBuffer.flip();
-        channel.send(writeBuffer, address);
-    }
-
     // Properties
     // -------------------------------------------------------------------------
 
@@ -225,5 +243,22 @@
         this.headerMarshaller = headerMarshaller;
     }
 
+    public SocketAddress getLastReadDatagramAddress() {
+        synchronized (readLock) {
+            return lastReadDatagramAddress;
+        }
+    }
+    
+
+    // Implementation methods
+    // -------------------------------------------------------------------------
+    protected void sendWriteBuffer(SocketAddress address) throws IOException {
+        writeBuffer.flip();
+        
+        if (log.isDebugEnabled()) {
+            log.debug("Sending datagram to: " + address + " header: " + header);
+        }
+        channel.send(writeBuffer, address);
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandProcessor.java
Thu Mar  9 10:06:32 2006
@@ -18,7 +18,7 @@
 
 import org.apache.activemq.command.Command;
 
-import java.net.SocketAddress;
+import java.io.IOException;
 
 /**
  * A callback used to process inbound commands
@@ -27,6 +27,6 @@
  */
 public interface CommandProcessor {
 
-    void process(Command command, SocketAddress address);
+    void process(Command command, DatagramHeader header) throws IOException;
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandReadBuffer.java
Thu Mar  9 10:06:32 2006
@@ -93,4 +93,5 @@
         return answer;
     }
 
+
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/DatagramHeader.java
Thu Mar  9 10:06:32 2006
@@ -18,6 +18,8 @@
 
 import org.apache.activemq.command.Command;
 
+import java.net.SocketAddress;
+
 /**
  * Represents a header used when sending data grams
  * 
@@ -32,6 +34,7 @@
     private int dataSize;
 
     // transient caches
+    private transient SocketAddress fromAddress;
     private transient byte[] partialData;
     private transient Command command;
 
@@ -66,6 +69,11 @@
         return getClass().getName().compareTo(that.getClass().getName());
     }
 
+    
+    public String toString() {
+        return "DatagramHeader[producer: " + producerId + " counter: " + counter + " flags:
" + getFlags();
+    }
+
     public boolean isComplete() {
         return complete;
     }
@@ -126,6 +134,8 @@
         complete = (flags & 0x2) != 0;
     }
 
+    // Transient cached properties
+    
     public Command getCommand() {
         return command;
     }
@@ -142,6 +152,12 @@
         this.partialData = partialData;
     }
 
-    // Transient cached properties
+    public SocketAddress getFromAddress() {
+        return fromAddress;
+    }
+
+    public void setFromAddress(SocketAddress fromAddress) {
+        this.fromAddress = fromAddress;
+    }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Thu Mar  9 10:06:32 2006
@@ -51,15 +51,22 @@
     private DatagramReplayStrategy replayStrategy = new ExceptionIfDroppedPacketStrategy();
     private int datagramSize = 4 * 1024;
     private long maxInactivityDuration = 0; // 30000;
-    private InetSocketAddress targetAddress;
+    private SocketAddress targetAddress;
+    private SocketAddress originalTargetAddress;
     private DatagramChannel channel;
     private boolean trace = false;
     private boolean useLocalHost = true;
     private int port;
+    private int minmumWireFormatVersion;
+    private String description = null;
+
     private CommandProcessor commandProcessor = new CommandProcessor() {
-        public void process(Command command, SocketAddress address) {
+        public void process(Command command, DatagramHeader header) {
             doConsume(command);
-        }};
+        }
+    };
+
+    private DatagramHeader wireFormatHeader;
 
     protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
         this.wireFormat = wireFormat;
@@ -68,13 +75,25 @@
     public UdpTransport(OpenWireFormat wireFormat, URI remoteLocation) throws UnknownHostException,
IOException {
         this(wireFormat);
         this.targetAddress = createAddress(remoteLocation);
+        description = remoteLocation.toString() + "@";
     }
 
-    public UdpTransport(OpenWireFormat wireFormat, InetSocketAddress socketAddress) throws
IOException {
+    public UdpTransport(OpenWireFormat wireFormat, SocketAddress socketAddress) throws IOException
{
         this(wireFormat);
         this.targetAddress = socketAddress;
+        this.description = "UdpServerConnection@";
     }
-    
+
+    /**
+     * Used by the server transport
+     */
+    public UdpTransport(OpenWireFormat wireFormat, int port) throws UnknownHostException,
IOException {
+        this(wireFormat);
+        this.port = port;
+        this.targetAddress = null;
+        this.description = "UdpServer@";
+    }
+
     /**
      * A one way asynchronous send
      */
@@ -85,19 +104,28 @@
     /**
      * A one way asynchronous send to a given address
      */
-    public void oneway(Command command, InetSocketAddress address) throws IOException {
+    public void oneway(Command command, SocketAddress address) throws IOException {
         if (log.isDebugEnabled()) {
-            log.debug("Sending oneway from port: " + port + " to target: " + targetAddress);
+            log.debug("Sending oneway from: " + this + " to target: " + targetAddress);
         }
         checkStarted(command);
         commandChannel.write(command, address);
     }
 
+    public void doConsume(Command command, DatagramHeader header) throws IOException {
+        wireFormatHeader = header;
+    }
+
     /**
      * @return pretty print of 'this'
      */
     public String toString() {
-        return "udp://" + targetAddress + "?port=" + port;
+        if (description != null) {
+            return description + port;
+        }
+        else {
+            return "udp://" + targetAddress + "@" + port;
+        }
     }
 
     /**
@@ -214,7 +242,18 @@
         this.port = port;
     }
 
-    
+    public int getMinmumWireFormatVersion() {
+        return minmumWireFormatVersion;
+    }
+
+    public void setMinmumWireFormatVersion(int minmumWireFormatVersion) {
+        this.minmumWireFormatVersion = minmumWireFormatVersion;
+    }
+
+    public OpenWireFormat getWireFormat() {
+        return wireFormat;
+    }
+
     // Implementation methods
     // -------------------------------------------------------------------------
     protected CommandProcessor getCommandProcessor() {
@@ -224,7 +263,7 @@
     protected void setCommandProcessor(CommandProcessor commandProcessor) {
         this.commandProcessor = commandProcessor;
     }
-    
+
     /**
      * Creates an address from the given URI
      */
@@ -251,25 +290,50 @@
         // TODO
         // connect to default target address to avoid security checks each time
         // channel = channel.connect(targetAddress);
-        
+
         DatagramSocket socket = channel.socket();
+        if (log.isDebugEnabled()) {
+            log.debug("Binding to address: " + localAddress);
+        }
         socket.bind(localAddress);
         if (port == 0) {
             port = socket.getLocalPort();
         }
-        
+
         if (bufferPool == null) {
             bufferPool = new DefaultBufferPool();
         }
         commandChannel = new CommandChannel(channel, wireFormat, bufferPool, datagramSize,
replayStrategy, targetAddress);
         commandChannel.start();
 
+        // lets pass the header & address into the channel so it avoids a
+        // re-request
+        if (wireFormatHeader != null) {
+            commandChannel.onDatagramReceived(wireFormatHeader);
+        }
+
         super.doStart();
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
         if (channel != null) {
             channel.close();
+        }
+    }
+
+    /**
+     * We have received the WireFormatInfo from the server on the actual channel
+     * we should use for all future communication with the server, so lets set
+     * the target to be the actual channel that the server has chosen for us to
+     * talk on.
+     */
+    public void useLastInboundDatagramAsNewTarget() {
+        if (originalTargetAddress == null) {
+            originalTargetAddress = targetAddress;
+        }
+        SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
+        if (lastAddress != null) {
+            targetAddress = lastAddress;
         }
     }
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Thu Mar  9 10:06:32 2006
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.udp;
 
 import org.activeio.command.WireFormat;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.ResponseCorrelator;
@@ -24,24 +25,33 @@
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntrospectionSupport;
-
-import javax.net.ServerSocketFactory;
-import javax.net.SocketFactory;
+import org.apache.activemq.util.URISupport;
 
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.net.UnknownHostException;
+import java.util.HashMap;
 import java.util.Map;
 
 public class UdpTransportFactory extends TransportFactory {
 
     public TransportServer doBind(String brokerId, final URI location) throws IOException
{
         try {
-            UdpTransport transport = (UdpTransport) doConnect(location);
-            UdpTransportServer server = new UdpTransportServer(transport);
+            Map options = new HashMap(URISupport.parseParamters(location));
+            if (options.containsKey("port")) {
+                throw new IllegalArgumentException("The port property cannot be specified
on a UDP server transport - please use the port in the URI syntax");
+            }
+            WireFormat wf = createWireFormat(options);
+            int port = location.getPort();
+            UdpTransport transport = new UdpTransport(asOpenWireFormat(wf), port);
+
+            Transport configuredTransport = configure(transport, wf, options, true);
+            UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport);
+            transport.setCommandProcessor(server);
             return server;
         }
         catch (URISyntaxException e) {
@@ -53,45 +63,67 @@
     }
 
     public Transport configure(Transport transport, WireFormat format, Map options) {
-        IntrospectionSupport.setProperties(transport, options);
-        UdpTransport tcpTransport = (UdpTransport) transport;
+        return configure(transport, format, options, false);
+    }
 
-        if (tcpTransport.isTrace()) {
+    public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
+        IntrospectionSupport.setProperties(transport, options);
+        final UdpTransport udpTransport = (UdpTransport) transport;
+        if (udpTransport.isTrace()) {
             transport = new TransportLogger(transport);
         }
 
-        if (tcpTransport.getMaxInactivityDuration() > 0) {
-            transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
+        if (format instanceof OpenWireFormat) {
+            transport = configureClientSideNegotiator(transport, format, udpTransport);
         }
 
-        transport = new ResponseCorrelator(transport);
+        if (udpTransport.getMaxInactivityDuration() > 0) {
+            transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
+        }
         return transport;
     }
 
-    public Transport compositeConfigure(Transport transport, WireFormat format, Map options)
{
+    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException,
IOException {
+        OpenWireFormat wireFormat = asOpenWireFormat(wf);
+        wireFormat.setSizePrefixDisabled(true);
+        return new UdpTransport(wireFormat, location);
+    }
+
+    protected Transport configure(Transport transport, WireFormat format, Map options, boolean
server) {
         IntrospectionSupport.setProperties(transport, options);
-        UdpTransport tcpTransport = (UdpTransport) transport;
-        if (tcpTransport.isTrace()) {
+        UdpTransport udpTransport = (UdpTransport) transport;
+
+        if (udpTransport.isTrace()) {
             transport = new TransportLogger(transport);
         }
 
-        if (tcpTransport.getMaxInactivityDuration() > 0) {
-            transport = new InactivityMonitor(transport, tcpTransport.getMaxInactivityDuration());
+        if (!server && format instanceof OpenWireFormat) {
+            transport = configureClientSideNegotiator(transport, format, udpTransport);
         }
-        return transport;
-    }
 
-    protected Transport createTransport(URI location, WireFormat wf) throws UnknownHostException,
IOException {
-        OpenWireFormat wireFormat = (OpenWireFormat) wf;
-        wireFormat.setSizePrefixDisabled(true);
-        return new UdpTransport(wireFormat, location);
+        if (udpTransport.getMaxInactivityDuration() > 0) {
+            transport = new InactivityMonitor(transport, udpTransport.getMaxInactivityDuration());
+        }
+
+        transport = new ResponseCorrelator(transport);
+        return transport;
     }
 
-    protected ServerSocketFactory createServerSocketFactory() {
-        return ServerSocketFactory.getDefault();
+    protected Transport configureClientSideNegotiator(Transport transport, WireFormat format,
final UdpTransport udpTransport) {
+        transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion())
{
+            protected void onWireFormatNegotiated(WireFormatInfo info) {
+                // lets switch to the targetAddress that the last packet was
+                // received as
+                udpTransport.useLastInboundDatagramAsNewTarget();
+            }
+        };
+        return transport;
     }
 
-    protected SocketFactory createSocketFactory() {
-        return SocketFactory.getDefault();
+    protected OpenWireFormat asOpenWireFormat(WireFormat wf) {
+        OpenWireFormat answer = (OpenWireFormat) wf;
+        answer.setSizePrefixDisabled(true);
+        answer.setCacheEnabled(false);
+        return answer;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
Thu Mar  9 10:06:32 2006
@@ -18,18 +18,23 @@
 
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.WireFormatInfo;
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.TransportServerSupport;
-import org.apache.activemq.transport.TransportSupport;
+import org.apache.activemq.transport.WireFormatNegotiator;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
+import java.io.IOException;
 import java.net.SocketAddress;
+import java.net.URI;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -39,14 +44,17 @@
  * @version $Revision$
  */
 
-public class UdpTransportServer extends TransportServerSupport {
+public class UdpTransportServer extends TransportServerSupport implements CommandProcessor
{
     private static final Log log = LogFactory.getLog(UdpTransportServer.class);
 
     private UdpTransport serverTransport;
+    private Transport configuredTransport;
     private Map transports = new HashMap();
 
-    public UdpTransportServer(UdpTransport serverTransport) {
+    public UdpTransportServer(URI connectURI, UdpTransport serverTransport, Transport configuredTransport)
{
+        super(connectURI);
         this.serverTransport = serverTransport;
+        this.configuredTransport = configuredTransport;
     }
 
     public String toString() {
@@ -64,56 +72,71 @@
     }
 
     protected void doStart() throws Exception {
-        serverTransport.start();
-        serverTransport.setCommandProcessor(new CommandProcessor() {
-            public void process(Command command, SocketAddress address) {
-                onInboundCommand(command, address);
+        log.info("Starting " + this);
+
+        configuredTransport.setTransportListener(new TransportListener() {
+            public void onCommand(Command command) {
+            }
+
+            public void onException(IOException error) {
+            }
+
+            public void transportInterupted() {
+            }
+
+            public void transportResumed() {
             }
         });
+        configuredTransport.start();
     }
 
     protected void doStop(ServiceStopper stopper) throws Exception {
-        serverTransport.stop();
+        configuredTransport.stop();
     }
 
-    protected void onInboundCommand(Command command, SocketAddress address) {
+    public void process(Command command, DatagramHeader header) throws IOException {
+        SocketAddress address = header.getFromAddress();
+        System.out.println(toString() + " received command: " + command + " from address:
" + address);
         Transport transport = null;
         synchronized (transports) {
             transport = (Transport) transports.get(address);
             if (transport == null) {
-                transport = createTransport(address);
+                System.out.println("###Êcreating new server connector");
+                transport = createTransport(command, header);
                 transport = configureTransport(transport);
                 transports.put(address, transport);
             }
-        }
-        processInboundCommand(command, transport);
-    }
-
-    public void sendOutboundCommand(Command command, SocketAddress address) {
-        // TODO we should use an inbound buffer to make this async
-        
-    }
-
-    protected void processInboundCommand(Command command, Transport transport) {
-        // TODO - consider making this asynchronous
-        TransportListener listener = transport.getTransportListener();
-        if (listener != null) {
-            listener.onCommand(command);
-        }
-        else {
-            log.error("No transportListener available for transport: " + transport + " to
process inbound command: " + command);
+            else {
+                log.warn("Discarding duplicate command to server: " + command + " from: "
+ address);
+            }
         }
     }
 
     protected Transport configureTransport(Transport transport) {
         transport = new ResponseCorrelator(transport);
-        transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
+        
+        // TODO
+        //transport = new InactivityMonitor(transport, serverTransport.getMaxInactivityDuration());
+
         getAcceptListener().onAccept(transport);
         return transport;
     }
 
-    protected TransportSupport createTransport(SocketAddress address) {
-        return new UdpTransportServerClient(this, address);
+    protected Transport createTransport(Command command, DatagramHeader header) throws IOException
{
+        final SocketAddress address = header.getFromAddress();
+        // TODO lets copy the wireformat...
+        final UdpTransport transport = new UdpTransport(serverTransport.getWireFormat(),
address);
+        
+        // lets send the packet into the transport so it can track packets
+        transport.doConsume(command, header);
+
+        return new WireFormatNegotiator(transport, serverTransport.getWireFormat(), serverTransport.getMinmumWireFormatVersion())
{
+
+            // lets use the specific addressing of wire format
+            protected void sendWireFormat(WireFormatInfo info) throws IOException {
+                transport.oneway(info, address);
+            }
+        };
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java?rev=384569&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
Thu Mar  9 10:06:32 2006
@@ -0,0 +1,55 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsTopicSendReceiveWithTwoConnectionsTest;
+
+/**
+ * @version
+ */
+public class UdpSendReceiveWithTwoConnectionsTest extends JmsTopicSendReceiveWithTwoConnectionsTest
{
+
+    protected String brokerURI = "udp://localhost:8891";
+    protected BrokerService broker;
+
+    protected void setUp() throws Exception {
+        broker = createBroker();
+        broker.start();
+
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        answer.setPersistent(false);
+        answer.addConnector(brokerURI);
+        return answer;
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory() throws Exception {
+        return new ActiveMQConnectionFactory(brokerURI);
+    }
+}
\ No newline at end of file

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
------------------------------------------------------------------------------
    svn:executable = *

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpSendReceiveWithTwoConnectionsTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
Thu Mar  9 10:06:32 2006
@@ -18,39 +18,50 @@
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportAcceptListener;
+import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
 
 import java.io.IOException;
+import java.net.URI;
 
 import junit.framework.TestCase;
 
 /**
- *
+ * 
  * @version $Revision$
  */
-public abstract class UdpTestSupport extends TestCase implements TransportListener  {
-
-    protected abstract Transport createConsumer() throws Exception;
-
-    protected abstract Transport createProducer() throws Exception;
+public abstract class UdpTestSupport extends TestCase implements TransportListener {
 
     protected Transport producer;
     protected Transport consumer;
 
     protected Object lock = new Object();
     protected Command receivedCommand;
-    
+    private TransportServer server;
+
     public void testSendingSmallMessage() throws Exception {
         ConsumerInfo expected = new ConsumerInfo();
         expected.setSelector("Cheese");
+        expected.setExclusive(true);
+        expected.setCommandId((short) 12);
+        expected.setExclusive(true);
+        expected.setPrefetchSize(3456);
+        
         try {
+            System.out.println("About to send: " + expected);
             producer.oneway(expected);
-            
+
             Command received = assertCommandReceived();
             assertTrue("Should have received a ConsumerInfo but was: " + received, received
instanceof ConsumerInfo);
             ConsumerInfo actual = (ConsumerInfo) received;
             assertEquals("Selector", expected.getSelector(), actual.getSelector());
+            assertEquals("isExclusive", expected.isExclusive(), actual.isExclusive());
+            assertEquals("getCommandId", expected.getCommandId(), actual.getCommandId());
+            assertEquals("getPrefetchSize", expected.getPrefetchSize(), actual.getPrefetchSize());
         }
         catch (Exception e) {
             System.out.println("Caught: " + e);
@@ -60,27 +71,49 @@
     }
 
     protected void setUp() throws Exception {
+        server = createServer();
+        if (server != null) {
+            server.setAcceptListener(new TransportAcceptListener() {
+
+                public void onAccept(Transport transport) {
+                    consumer = transport;
+                    consumer.setTransportListener(UdpTestSupport.this);
+                    try {
+                        consumer.start();
+                    }
+                    catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+                }
+
+                public void onAcceptError(Exception error) {
+                }
+            });
+            server.start();
+        }
+
         consumer = createConsumer();
+        if (consumer != null) {
+            consumer.setTransportListener(this);
+            consumer.start();
+        }
+
         producer = createProducer();
-    
-        consumer.setTransportListener(this);
         producer.setTransportListener(new TransportListener() {
             public void onCommand(Command command) {
             }
-    
+
             public void onException(IOException error) {
             }
-    
+
             public void transportInterupted() {
             }
-    
+
             public void transportResumed() {
             }
         });
-    
-        consumer.start();
+
         producer.start();
-    
     }
 
     protected void tearDown() throws Exception {
@@ -90,14 +123,22 @@
         if (consumer != null) {
             consumer.stop();
         }
+        if (server != null) {
+            server.stop();
+        }
     }
 
     public void onCommand(Command command) {
-        System.out.println("### Received command: " + command);
-        
-        synchronized (lock) {
-            receivedCommand = command;
-            lock.notifyAll();
+        if (command instanceof WireFormatInfo) {
+            System.out.println("Got WireFormatInfo: " + command);
+        }
+        else {
+            System.out.println("### Received command: " + command);
+
+            synchronized (lock) {
+                receivedCommand = command;
+                lock.notifyAll();
+            }
         }
     }
 
@@ -113,16 +154,23 @@
         System.out.println("### Transport resumed");
     }
 
-
     protected Command assertCommandReceived() throws InterruptedException {
         Command answer = null;
         synchronized (lock) {
             lock.wait(5000);
             answer = receivedCommand;
         }
-        
+
         assertNotNull("Should have received a Command by now!", answer);
         return answer;
+    }
+
+    protected abstract Transport createConsumer() throws Exception;
+
+    protected abstract Transport createProducer() throws Exception;
+
+    protected TransportServer createServer() throws Exception {
+        return null;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java?rev=384569&r1=384568&r2=384569&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportTest.java
Thu Mar  9 10:06:32 2006
@@ -16,6 +16,7 @@
  */
 package org.apache.activemq.transport.udp;
 
+import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 
@@ -27,17 +28,28 @@
  */
 public class UdpTransportTest extends UdpTestSupport {
 
-    protected String producerURI = "udp://localhost:8830";
-    protected String consumerURI = "udp://localhost:8831?port=8830";
+    protected int consumerPort = 8830;
+    protected String producerURI = "udp://localhost:" + consumerPort;
+    //protected String producerURI = "udp://localhost:8830";
+    //protected String consumerURI = "udp://localhost:8831?port=8830";
 
     protected Transport createProducer() throws Exception {
         System.out.println("Producer using URI: " + producerURI);
-        return TransportFactory.connect(new URI(producerURI));
+        
+        // The WireFormatNegotiator means we can only connect to servers
+        return new UdpTransport(createWireFormat(), new URI(producerURI));
+        
+        //return TransportFactory.connect(new URI(producerURI));
     }
 
     protected Transport createConsumer() throws Exception {
-        System.out.println("Consumer using URI: " + consumerURI);
-        return TransportFactory.connect(new URI(consumerURI));
+        System.out.println("Consumer on port: " + consumerPort);
+        return new UdpTransport(createWireFormat(), consumerPort);
+        //return TransportFactory.connect(new URI(consumerURI));
+    }
+
+    protected OpenWireFormat createWireFormat() {
+        return new OpenWireFormat();
     }
 
 }

Added: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java?rev=384569&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
(added)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
Thu Mar  9 10:06:32 2006
@@ -0,0 +1,56 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportServer;
+
+import java.net.URI;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class UdpTransportUsingServerTest extends UdpTestSupport {
+
+    protected int consumerPort = 8830;
+    protected String producerURI = "udp://localhost:" + consumerPort;
+    protected String serverURI = producerURI;
+
+    protected Transport createProducer() throws Exception {
+        System.out.println("Producer using URI: " + producerURI);
+        return TransportFactory.connect(new URI(producerURI));
+    }
+
+    protected TransportServer createServer() throws Exception {
+        return TransportFactory.bind("byBroker", new URI(serverURI));
+    }
+    
+    protected Transport createConsumer() throws Exception {
+        return null;
+    }
+
+    protected OpenWireFormat createWireFormat() {
+        OpenWireFormat answer = new OpenWireFormat();
+        answer.setCacheEnabled(false);
+        answer.setSizePrefixDisabled(true);
+        return answer;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTransportUsingServerTest.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain



Mime
View raw message