activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r384837 - in /incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp: CommandChannel.java UdpTransport.java UdpTransportFactory.java UdpTransportServer.java
Date Fri, 10 Mar 2006 16:12:47 GMT
Author: jstrachan
Date: Fri Mar 10 08:12:47 2006
New Revision: 384837

URL: http://svn.apache.org/viewcvs?rev=384837&view=rev
Log:
a little bit of spring cleaning to remove some of the cruft

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
Fri Mar 10 08:12:47 2006
@@ -16,15 +16,12 @@
  */
 package org.apache.activemq.transport.udp;
 
-import org.activeio.ByteSequence;
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
 import org.apache.activemq.command.Endpoint;
 import org.apache.activemq.command.LastPartialCommand;
 import org.apache.activemq.command.PartialCommand;
-import org.apache.activemq.openwire.BooleanStream;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.transport.TransportListener;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -57,7 +54,6 @@
     // reading
     private Object readLock = new Object();
     private ByteBuffer readBuffer;
-    private SocketAddress lastReadDatagramAddress;
 
     // writing
     private Object writeLock = new Object();
@@ -80,10 +76,6 @@
     }
 
     public void start() throws Exception {
-        // wireFormat.setPrefixPacketSize(false);
-        wireFormat.setCacheEnabled(false);
-        wireFormat.setTightEncodingEnabled(true);
-
         bufferPool.setDefaultSize(datagramSize);
         bufferPool.start();
         readBuffer = bufferPool.borrowBuffer();
@@ -96,22 +88,19 @@
 
     public Command read() throws IOException {
         Command answer = null;
-        lastReadDatagramAddress = null;
         synchronized (readLock) {
             readBuffer.clear();
-            lastReadDatagramAddress = channel.receive(readBuffer);
+            SocketAddress address = channel.receive(readBuffer);
             readBuffer.flip();
 
-            Endpoint from = headerMarshaller.createEndpoint(readBuffer, lastReadDatagramAddress);
+            Endpoint from = headerMarshaller.createEndpoint(readBuffer, address);
 
             int remaining = readBuffer.remaining();
-            
             byte[] data = new byte[remaining];
             readBuffer.get(data);
 
             // TODO could use a DataInput implementation that talks direct to
-            // the
-            // ByteBuffer
+            // the ByteBuffer to avoid object allocation and unnecessary buffering?
             DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(data));
             answer = (Command) wireFormat.unmarshal(dataIn);
             answer.setFrom(from);
@@ -124,15 +113,6 @@
         return answer;
     }
 
-    /**
-     * Called if a packet is received on a different channel from a remote
-     * client
-     * 
-     * @throws IOException
-     */
-    public void setWireFormatInfoEndpoint(DatagramEndpoint endpoint) throws IOException {
-    }
-
     public void write(Command command) throws IOException {
         write(command, targetAddress);
     }
@@ -236,11 +216,6 @@
         this.headerMarshaller = headerMarshaller;
     }
 
-    public SocketAddress getLastReadDatagramAddress() {
-        synchronized (readLock) {
-            return lastReadDatagramAddress;
-        }
-    }
 
     // Implementation methods
     // -------------------------------------------------------------------------

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
Fri Mar 10 08:12:47 2006
@@ -18,11 +18,12 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportThreadSupport;
-import org.apache.activemq.transport.replay.ReplayStrategy;
 import org.apache.activemq.transport.replay.ExceptionIfDroppedReplayStrategy;
+import org.apache.activemq.transport.replay.ReplayStrategy;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -60,7 +61,6 @@
     private int port;
     private int minmumWireFormatVersion;
     private String description = null;
-    private DatagramEndpoint wireFormatHeader;
 
     protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
         this.wireFormat = wireFormat;
@@ -105,11 +105,7 @@
         checkStarted(command);
         commandChannel.write(command, address);
     }
-
-    public void receivedHeader(DatagramEndpoint endpoint) {
-        wireFormatHeader = endpoint;
-    }
-
+    
     /**
      * @return pretty print of 'this'
      */
@@ -132,10 +128,6 @@
                 Command command = commandChannel.read();
                 doConsume(command);
             }
-            /*
-             * catch (SocketTimeoutException e) { } catch
-             * (InterruptedIOException e) { }
-             */
             catch (AsynchronousCloseException e) {
                 try {
                     stop();
@@ -168,13 +160,16 @@
      * the target to be the actual channel that the server has chosen for us to
      * talk on.
      */
-    public void useLastInboundDatagramAsNewTarget() {
-        if (originalTargetAddress == null) {
-            originalTargetAddress = targetAddress;
-        }
-        SocketAddress lastAddress = commandChannel.getLastReadDatagramAddress();
-        if (lastAddress != null) {
-            targetAddress = lastAddress;
+    public void setTargetEndpoint(Endpoint newTarget) {
+        if (newTarget instanceof DatagramEndpoint) {
+            DatagramEndpoint endpoint = (DatagramEndpoint) newTarget;
+            SocketAddress address = endpoint.getAddress();
+            if (address != null) {
+                if (originalTargetAddress == null) {
+                    originalTargetAddress = targetAddress;
+                }
+                targetAddress = address;
+            }
         }
     }
 
@@ -317,12 +312,6 @@
         }
         commandChannel = new CommandChannel(toString(), channel, wireFormat, bufferPool,
datagramSize, targetAddress, createDatagramHeaderMarshaller());
         commandChannel.start();
-
-        // lets pass the header & address into the channel so it avoids a
-        // re-request
-        if (wireFormatHeader != null) {
-            commandChannel.setWireFormatInfoEndpoint(wireFormatHeader);
-        }
 
         super.doStart();
     }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
Fri Mar 10 08:12:47 2006
@@ -120,9 +120,10 @@
     protected Transport configureClientSideNegotiator(Transport transport, WireFormat format,
final UdpTransport udpTransport) {
         transport = new WireFormatNegotiator(transport, asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion())
{
             protected void onWireFormatNegotiated(WireFormatInfo info) {
-                // lets switch to the targetAddress that the last packet was
-                // received as so that all future requests go to the newly created UDP channel
-                udpTransport.useLastInboundDatagramAsNewTarget();
+                // lets switch to the target endpoint
+                // based on the last packet that was received
+                // so that all future requests go to the newly created UDP channel
+                udpTransport.setTargetEndpoint(info.getFrom());
             }
         };
         return transport;

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=384837&r1=384836&r2=384837&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
(original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
Fri Mar 10 08:12:47 2006
@@ -18,7 +18,6 @@
 
 import org.apache.activemq.command.BrokerInfo;
 import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Endpoint;
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.CommandJoiner;
@@ -144,9 +143,6 @@
         final SocketAddress address = endpoint.getAddress();
         final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
         final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
-
-        // TODO - is this still required?
-        transport.receivedHeader(endpoint);
 
         Transport configuredTransport = new CommandJoiner(transport, connectionWireFormat);
         



Mime
View raw message