activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r387385 [1/2] - in /incubator/activemq/trunk/activemq-core: ./ src/main/java/org/apache/activemq/command/ src/main/java/org/apache/activemq/openwire/v1/ src/main/java/org/apache/activemq/transport/ src/main/java/org/apache/activemq/transpor...
Date Tue, 21 Mar 2006 02:49:05 GMT
Author: jstrachan
Date: Mon Mar 20 18:49:00 2006
New Revision: 387385

URL: http://svn.apache.org/viewcvs?rev=387385&view=rev
Log:
refactored LastPartialCommand so that it is-a PartialCommand so can contain data and should marshal just the same as the PartialCommand; this solves the double-datagram issue when closing a partial command stream
also refactored the reliable transport some more so that it is close to working; should have test cases working soon...

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ResponseRedirectInterceptor.java
Removed:
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/ReliableTransportTest.java
Modified:
    incubator/activemq/trunk/activemq-core/project.xml
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTransactionTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/KeepAliveInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LastPartialCommandTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/LocalTransactionIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageAckTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchNotificationTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageDispatchTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/MessageTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ProducerInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/RemoveSubscriptionInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ReplayCommandTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ResponseTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SessionInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ShutdownInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/SubscriptionInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionIdTestSupport.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/TransactionInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/WireFormatInfoTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/XATransactionIdTest.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/project.xml
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/project.xml?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/project.xml (original)
+++ incubator/activemq/trunk/activemq-core/project.xml Mon Mar 20 18:49:00 2006
@@ -363,6 +363,8 @@
                 
                 <!-- TODO FIX ME ASAP -->
                 <exclude>**/MulticastNetworkTest.*</exclude>
+                <exclude>**/UnreliableUdpTransportTest.*</exclude>
+                <exclude>**/MulticastTransportTest.*</exclude>
                 <exclude>**/UdpSendReceiveWithTwoConnectionsAndLargeMessagesTest.*</exclude>
             </excludes>
         </unitTest>

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/LastPartialCommand.java Mon Mar 20 18:49:00 2006
@@ -24,17 +24,13 @@
  * @openwire:marshaller code="61"
  * @version $Revision$
  */
-public class LastPartialCommand extends BaseCommand {
+public class LastPartialCommand extends PartialCommand {
 
     public static final byte DATA_STRUCTURE_TYPE = CommandTypes.PARTIAL_LAST_COMMAND;
 
     public LastPartialCommand() {
     }
 
-    public LastPartialCommand(boolean responseRequired) {
-        setResponseRequired(responseRequired);
-    }
-
     public byte getDataStructureType() {
         return DATA_STRUCTURE_TYPE;
     }
@@ -44,17 +40,13 @@
     }
 
     /**
-     * Lets copy across the required fields from this last partial command to
-     * the newly unmarshalled complete command
+     * Lets copy across any transient fields from this command 
+     * to the complete command when it is unmarshalled on the other end
      *
      * @param completeCommand the newly unmarshalled complete command
      */
     public void configure(Command completeCommand) {
-        // copy across the transient properties
+        // copy across the transient properties added by the low level transport
         completeCommand.setFrom(getFrom());
-
-        // TODO should not be required as the large command would be marshalled with this property
-        //completeCommand.setResponseRequired(isResponseRequired());
     }
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ReplayCommand.java Mon Mar 20 18:49:00 2006
@@ -115,5 +115,8 @@
         this.lastNakNumber = lastNakNumber;
     }
 
+    public String toString() {
+        return "ReplayCommand {commandId = " + getCommandId() + ", firstNakNumber = " + getFirstNakNumber() + ", lastNakNumber = " + getLastNakNumber() + "}";
+    }
     
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/openwire/v1/LastPartialCommandMarshaller.java Mon Mar 20 18:49:00 2006
@@ -37,7 +37,7 @@
  *
  * @version $Revision$
  */
-public class LastPartialCommandMarshaller extends BaseCommandMarshaller {
+public class LastPartialCommandMarshaller extends PartialCommandMarshaller {
 
     /**
      * Return the type of Data Structure we marshal

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/CommandJoiner.java Mon Mar 20 18:49:00 2006
@@ -47,7 +47,7 @@
 
     public void onCommand(Command command) {
         byte type = command.getDataStructureType();
-        if (type == PartialCommand.DATA_STRUCTURE_TYPE) {
+        if (type == PartialCommand.DATA_STRUCTURE_TYPE || type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
             PartialCommand header = (PartialCommand) command;
             byte[] partialData = header.getData();
             try {
@@ -56,21 +56,22 @@
             catch (IOException e) {
                 getTransportListener().onException(e);
             }
-        }
-        else if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
-            try {
-                byte[] fullData = out.toByteArray();
-                out.reset();
-                Command completeCommand = (Command) wireFormat.unmarshal(new DataInputStream(new ByteArrayInputStream(fullData)));
-                
-                LastPartialCommand lastCommand = (LastPartialCommand) command;
-                lastCommand.configure(completeCommand);
-                
-                getTransportListener().onCommand(completeCommand);
-            }
-            catch (IOException e) {
-                log.warn("Failed to unmarshal partial command: " + command);
-                getTransportListener().onException(e);
+            if (type == LastPartialCommand.DATA_STRUCTURE_TYPE) {
+                try {
+                    byte[] fullData = out.toByteArray();
+                    out.reset();
+                    DataInputStream dataIn = new DataInputStream(new ByteArrayInputStream(fullData));
+                    Command completeCommand = (Command) wireFormat.unmarshal(dataIn);
+
+                    LastPartialCommand lastCommand = (LastPartialCommand) command;
+                    lastCommand.configure(completeCommand);
+
+                    getTransportListener().onCommand(completeCommand);
+                }
+                catch (IOException e) {
+                    log.warn("Failed to unmarshal partial command: " + command);
+                    getTransportListener().onException(e);
+                }
             }
         }
         else {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/TransportFactory.java Mon Mar 20 18:49:00 2006
@@ -198,7 +198,7 @@
         return "default";
     }
 
-    protected Transport configure(Transport transport, WireFormat wf, Map options) {
+    protected Transport configure(Transport transport, WireFormat wf, Map options) throws Exception {
         IntrospectionSupport.setProperties(transport, options);
         transport = new MutexTransport(transport);
         transport = new ResponseCorrelator(transport);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java Mon Mar 20 18:49:00 2006
@@ -114,11 +114,20 @@
         socket.joinGroup(getMulticastAddress());
         socket.setSoTimeout((int) keepAliveInterval);
 
-        return new CommandDatagramSocket( this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), socket);
+        return new CommandDatagramSocket(this, getWireFormat(), getDatagramSize(), getTargetAddress(),
+                createDatagramHeaderMarshaller(), getSocket());
     }
 
     protected InetAddress getMulticastAddress() {
         return mcastAddress;
+    }
+
+    protected MulticastSocket getSocket() {
+        return socket;
+    }
+
+    protected void setSocket(MulticastSocket socket) {
+        this.socket = socket;
     }
 
     protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java Mon Mar 20 18:49:00 2006
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.transport.reliable;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
 import java.io.IOException;
 import java.util.HashMap;
 import java.util.Map;
@@ -26,6 +29,8 @@
  */
 public class DefaultReplayBuffer implements ReplayBuffer {
 
+    private static final Log log = LogFactory.getLog(DefaultReplayBuffer.class);
+
     private final int size;
     private ReplayBufferListener listener;
     private Map map;
@@ -38,6 +43,9 @@
     }
 
     public void addBuffer(int commandId, Object buffer) {
+        if (log.isDebugEnabled()) {
+            log.debug("Adding command ID: " + commandId + " to replay buffer: " + this + " object: " + buffer);
+        }
         synchronized (lock) {
             int max = size - 1;
             while (map.size() >= max) {
@@ -54,6 +62,12 @@
     }
 
     public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException {
+        if (replayer == null) {
+            throw new IllegalArgumentException("No Replayer parameter specified");
+        }
+        if (log.isDebugEnabled()) {
+            log.debug("Buffer: " + this + " replaying messages from: " + fromCommandId + " to: " + toCommandId);
+        }
         for (int i = fromCommandId; i <= toCommandId; i++) {
             Object buffer = null;
             synchronized (lock) {
@@ -72,5 +86,4 @@
             listener.onBufferDiscarded(commandId, buffer);
         }
     }
-
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java Mon Mar 20 18:49:00 2006
@@ -35,15 +35,15 @@
         this.maximumDifference = maximumDifference;
     }
 
-    public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
+    public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter, int nextAvailableCounter) throws IOException {
         int difference = actualCounter - expectedCounter;
         long count = Math.abs(difference);
         if (count > maximumDifference) {
-            int upperLimit = actualCounter;
+            int upperLimit = actualCounter - 1;
             if (upperLimit < expectedCounter) {
                 upperLimit = expectedCounter;
             }
-            transport.requestReplay(expectedCounter, upperLimit );
+            transport.requestReplay(expectedCounter, upperLimit);
         }
 
         // lets discard old commands

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ExceptionIfDroppedReplayStrategy.java Mon Mar 20 18:49:00 2006
@@ -35,7 +35,7 @@
         this.maximumDifference = maximumDifference;
     }
 
-    public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
+    public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter, int nextAvailableCounter) throws IOException {
         int difference = actualCounter - expectedCounter;
         long count = Math.abs(difference);
         if (count > maximumDifference) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Mon Mar 20 18:49:00 2006
@@ -23,6 +23,7 @@
 import org.apache.activemq.transport.FutureResponse;
 import org.apache.activemq.transport.ResponseCorrelator;
 import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.udp.UdpTransport;
 import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -47,15 +48,18 @@
     private int requestTimeout = 2000;
     private ReplayBuffer replayBuffer;
     private Replayer replayer;
+    private UdpTransport udpTransport;
 
     public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
         super(next);
         this.replayStrategy = replayStrategy;
     }
 
-    public ReliableTransport(Transport next, IntSequenceGenerator sequenceGenerator, ReplayStrategy replayStrategy) {
-        super(next, sequenceGenerator);
-        this.replayStrategy = replayStrategy;
+    public ReliableTransport(Transport next, UdpTransport udpTransport)
+            throws IOException {
+        super(next, udpTransport.getSequenceGenerator());
+        this.udpTransport = udpTransport;
+        this.replayer = udpTransport.createReplayer();
     }
 
     /**
@@ -72,7 +76,6 @@
             getTransportListener().onException(e);
         }
     }
-    
 
     public Response request(Command command) throws IOException {
         FutureResponse response = asyncRequest(command);
@@ -89,7 +92,7 @@
         FutureResponse response = asyncRequest(command);
         while (timeout > 0) {
             int time = timeout;
-            if (timeout > requestTimeout) { 
+            if (timeout > requestTimeout) {
                 time = requestTimeout;
             }
             Response result = response.getResult(time);
@@ -118,8 +121,15 @@
 
         if (!valid) {
             synchronized (commands) {
+                int nextCounter = actualCounter;
+                boolean empty = commands.isEmpty();
+                if (!empty) {
+                    Command nextAvailable = (Command) commands.first();
+                    nextCounter = nextAvailable.getCommandId();
+                }
+                
                 try {
-                    boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter);
+                    boolean keep = replayStrategy.onDroppedPackets(this, expectedCounter, actualCounter, nextCounter);
 
                     if (keep) {
                         // lets add it to the list for later on
@@ -133,7 +143,7 @@
                     onException(e);
                 }
 
-                if (!commands.isEmpty()) {
+                if (!empty) {
                     // lets see if the first item in the set is the next
                     // expected
                     command = (Command) commands.first();
@@ -185,25 +195,26 @@
         this.expectedCounter = expectedCounter;
     }
 
-    
     public int getRequestTimeout() {
         return requestTimeout;
     }
 
     /**
-     * Sets the default timeout of requests before starting to request commands are replayed
+     * Sets the default timeout of requests before starting to request commands
+     * are replayed
      */
     public void setRequestTimeout(int requestTimeout) {
         this.requestTimeout = requestTimeout;
     }
 
-
     public ReplayStrategy getReplayStrategy() {
         return replayStrategy;
     }
 
-
     public ReplayBuffer getReplayBuffer() {
+        if (replayBuffer == null) {
+            replayBuffer = createReplayBuffer();
+        }
         return replayBuffer;
     }
 
@@ -222,16 +233,30 @@
         this.replayBufferCommandCount = replayBufferSize;
     }
 
+    public void setReplayStrategy(ReplayStrategy replayStrategy) {
+        this.replayStrategy = replayStrategy;
+    }
+
+    public Replayer getReplayer() {
+        return replayer;
+    }
+
+    public void setReplayer(Replayer replayer) {
+        this.replayer = replayer;
+    }
+
     public String toString() {
         return next.toString();
     }
-    
-    
+
     public void start() throws Exception {
-        super.start();
-        if (replayBuffer == null) {
-            replayBuffer = createReplayBuffer();
+        if (udpTransport != null) {
+            udpTransport.setReplayBuffer(getReplayBuffer());
+        }
+        if (replayStrategy == null) {
+            throw new IllegalArgumentException("Property replayStrategy not specified");
         }
+        super.start();
     }
 
     /**
@@ -239,20 +264,27 @@
      */
     protected void onMissingResponse(Command command, FutureResponse response) {
         log.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
-        
+
         int commandId = command.getCommandId();
         requestReplay(commandId, commandId);
     }
-    
+
     protected ReplayBuffer createReplayBuffer() {
         return new DefaultReplayBuffer(getReplayBufferCommandCount());
     }
 
     protected void replayCommands(ReplayCommand command) {
         try {
-            replayBuffer.replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
-            
-            // TODO we could proactively remove ack'd stuff from the replay buffer
+            if (replayer == null) {
+                onException(new IOException("Cannot replay commands. No replayer property configured"));
+            }
+            if (log.isDebugEnabled()) {
+                log.debug("Processing replay command: " + command);
+            }
+            getReplayBuffer().replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
+
+            // TODO we could proactively remove ack'd stuff from the replay
+            // buffer
             // if we only have a single client talking to us
         }
         catch (IOException e) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayStrategy.java Mon Mar 20 18:49:00 2006
@@ -32,9 +32,10 @@
      * @param transport the transport on which the packet was dropped
      * @param expectedCounter the expected command counter
      * @param actualCounter the actual command counter
+     * @param nextAvailableCounter TODO
      * @return true if the command should be buffered or false if it should be discarded
      */
-    boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException;
+    boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter, int nextAvailableCounter) throws IOException;
 
     void onReceivedPacket(ReliableTransport transport, long expectedCounter);
 

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Mon Mar 20 18:49:00 2006
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.reliable.ReplayBuffer;
 import org.apache.activemq.transport.reliable.Replayer;
 
 import java.io.IOException;
@@ -29,22 +30,24 @@
  */
 public interface CommandChannel extends Replayer, Service {
 
-    public abstract Command read() throws IOException;
+    public Command read() throws IOException;
 
-    public abstract void write(Command command, SocketAddress address) throws IOException;
+    public void write(Command command, SocketAddress address) throws IOException;
 
-    public abstract int getDatagramSize();
+    public int getDatagramSize();
 
     /**
      * Sets the default size of a datagram on the network.
      */
-    public abstract void setDatagramSize(int datagramSize);
+    public void setDatagramSize(int datagramSize);
 
-    public abstract DatagramHeaderMarshaller getHeaderMarshaller();
+    public DatagramHeaderMarshaller getHeaderMarshaller();
 
-    public abstract void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
+    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller);
 
-    public abstract void setTargetAddress(SocketAddress address);
+    public void setTargetAddress(SocketAddress address);
 
-    public abstract void setReplayAddress(SocketAddress address);
+    public void setReplayAddress(SocketAddress address);
+
+    public void setReplayBuffer(ReplayBuffer replayBuffer);
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java Mon Mar 20 18:49:00 2006
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.command.Command;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.reliable.ReplayBuffer;
 import org.apache.activemq.util.IntSequenceGenerator;
 
 import java.io.IOException;
@@ -36,6 +37,7 @@
     protected final String name;
     protected final IntSequenceGenerator sequenceGenerator;
     protected DatagramHeaderMarshaller headerMarshaller;
+    private ReplayBuffer replayBuffer;
 
     public CommandChannelSupport(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
             DatagramHeaderMarshaller headerMarshaller) {
@@ -96,6 +98,14 @@
 
     public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
         this.headerMarshaller = headerMarshaller;
+    }
+
+    public ReplayBuffer getReplayBuffer() {
+        return replayBuffer;
+    }
+
+    public void setReplayBuffer(ReplayBuffer replayBuffer) {
+        this.replayBuffer = replayBuffer;
     }
 
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Mon Mar 20 18:49:00 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.PartialCommand;
 import org.apache.activemq.openwire.BooleanStream;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.reliable.ReplayBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -45,6 +46,7 @@
 
     private DatagramChannel channel;
     private ByteBufferPool bufferPool;
+
     // reading
     private Object readLock = new Object();
     private ByteBuffer readBuffer;
@@ -53,7 +55,9 @@
     private Object writeLock = new Object();
     private int defaultMarshalBufferSize = 64 * 1024;
 
-    public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel, ByteBufferPool bufferPool) {
+    public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
+            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel,
+            ByteBufferPool bufferPool) {
         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
         this.channel = channel;
         this.bufferPool = bufferPool;
@@ -83,7 +87,7 @@
                     continue;
                 }
                 from = headerMarshaller.createEndpoint(readBuffer, address);
- 
+
                 int remaining = readBuffer.remaining();
                 byte[] data = new byte[remaining];
                 readBuffer.get(data);
@@ -99,7 +103,7 @@
         }
         if (answer != null) {
             answer.setFrom(from);
-            
+
             if (log.isDebugEnabled()) {
                 log.debug("Channel: " + name + " received from: " + from + " about to process: " + answer);
             }
@@ -107,7 +111,6 @@
         return answer;
     }
 
-    
     public void write(Command command, SocketAddress address) throws IOException {
         synchronized (writeLock) {
 
@@ -127,6 +130,7 @@
                 for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
                     // write the header
                     if (fragment > 0) {
+                        writeBuffer = bufferPool.borrowBuffer();
                         writeBuffer.clear();
                         headerMarshaller.writeHeader(command, writeBuffer);
                     }
@@ -170,7 +174,12 @@
                         chunkSize = length - offset;
                     }
 
-                    writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
+                    if (lastFragment) {
+                        writeBuffer.put(LastPartialCommand.DATA_STRUCTURE_TYPE);
+                    }
+                    else {
+                        writeBuffer.put(PartialCommand.DATA_STRUCTURE_TYPE);
+                    }
 
                     if (bs != null) {
                         bs.marshal(writeBuffer);
@@ -192,23 +201,13 @@
                     writeBuffer.put(data, offset, chunkSize);
 
                     offset += chunkSize;
-                    sendWriteBuffer(address, writeBuffer, commandId);
+                    sendWriteBuffer(commandId, address, writeBuffer, false);
                 }
-
-                // now lets write the last partial command
-                command = new LastPartialCommand(command.isResponseRequired());
-                command.setCommandId(sequenceGenerator.getNextSequenceId());
-                
-                largeBuffer = new ByteArrayOutputStream(defaultMarshalBufferSize);
-                wireFormat.marshal(command, new DataOutputStream(largeBuffer));
-                data = largeBuffer.toByteArray();
-
-                writeBuffer.clear();
-                headerMarshaller.writeHeader(command, writeBuffer);
             }
-
-            writeBuffer.put(data);
-            sendWriteBuffer(address, writeBuffer, command.getCommandId());
+            else {
+                writeBuffer.put(data);
+                sendWriteBuffer(command.getCommandId(), address, writeBuffer, false);
+            }
         }
     }
 
@@ -228,21 +227,34 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(SocketAddress address, ByteBuffer writeBuffer, int commandId) throws IOException {
+    protected void sendWriteBuffer(int commandId, SocketAddress address, ByteBuffer writeBuffer, boolean redelivery)
+            throws IOException {
+        // lets put the datagram into the replay buffer first to prevent timing
+        // issues
+        ReplayBuffer bufferCache = getReplayBuffer();
+        if (bufferCache != null && !redelivery) {
+            bufferCache.addBuffer(commandId, writeBuffer);
+        }
+        
         writeBuffer.flip();
 
         if (log.isDebugEnabled()) {
-            log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
+            String text = (redelivery) ? "REDELIVERING" : "sending";
+            log.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
         }
         channel.send(writeBuffer, address);
-        
-        // now lets put the buffer back into the replay buffer
     }
 
     public void sendBuffer(int commandId, Object buffer) throws IOException {
-        ByteBuffer writeBuffer = (ByteBuffer) buffer;
-        sendWriteBuffer(getReplayAddress(), writeBuffer, commandId);
+        if (buffer != null) {
+            ByteBuffer writeBuffer = (ByteBuffer) buffer;
+            sendWriteBuffer(commandId, getReplayAddress(), writeBuffer, true);
+        }
+        else {
+            if (log.isWarnEnabled()) {
+                log.warn("Request for buffer: " + commandId + " is no longer present");
+            }
+        }
     }
 
-    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Mon Mar 20 18:49:00 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.command.PartialCommand;
 import org.apache.activemq.openwire.BooleanStream;
 import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.transport.reliable.ReplayBuffer;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -47,8 +48,8 @@
     private Object readLock = new Object();
     private Object writeLock = new Object();
 
-    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
-            DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
+    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize,
+            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
         super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
         this.channel = channel;
     }
@@ -105,7 +106,7 @@
                 byte[] data = writeBuffer.toByteArray();
                 boolean lastFragment = false;
                 for (int fragment = 0, length = data.length; !lastFragment; fragment++) {
-                    writeBuffer.reset();
+                    writeBuffer = createByteArrayOutputStream();
                     headerMarshaller.writeHeader(command, dataOut);
 
                     int chunkSize = remaining(writeBuffer);
@@ -147,7 +148,12 @@
                         chunkSize = length - offset;
                     }
 
-                    dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
+                    if (lastFragment) {
+                        dataOut.write(LastPartialCommand.DATA_STRUCTURE_TYPE);
+                    }
+                    else {
+                        dataOut.write(PartialCommand.DATA_STRUCTURE_TYPE);
+                    }
 
                     if (bs != null) {
                         bs.marshal(dataOut);
@@ -171,16 +177,6 @@
                     offset += chunkSize;
                     sendWriteBuffer(address, writeBuffer, commandId);
                 }
-
-                // now lets write the last partial command
-                command = new LastPartialCommand(command.isResponseRequired());
-                command.setCommandId(sequenceGenerator.getNextSequenceId());
-
-                writeBuffer.reset();
-                headerMarshaller.writeHeader(command, dataOut);
-                wireFormat.marshal(command, dataOut);
-
-                sendWriteBuffer(address, writeBuffer, command.getCommandId());
             }
         }
     }
@@ -195,22 +191,39 @@
 
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
+    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId)
+            throws IOException {
         byte[] data = writeBuffer.toByteArray();
-        sendWriteBuffer(address, commandId, data);
+        sendWriteBuffer(commandId, address, data, false);
     }
 
-    protected void sendWriteBuffer(SocketAddress address, int commandId, byte[] data) throws IOException {
+    protected void sendWriteBuffer(int commandId, SocketAddress address, byte[] data, boolean redelivery)
+            throws IOException {
+        // lets put the datagram into the replay buffer first to prevent timing
+        // issues
+        ReplayBuffer bufferCache = getReplayBuffer();
+        if (bufferCache != null && !redelivery) {
+            bufferCache.addBuffer(commandId, data);
+        }
+
         if (log.isDebugEnabled()) {
-            log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
+            String text = (redelivery) ? "REDELIVERING" : "sending";
+            log.debug("Channel: " + name + " " + text + " datagram: " + commandId + " to: " + address);
         }
         DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
         channel.send(packet);
     }
 
     public void sendBuffer(int commandId, Object buffer) throws IOException {
-        byte[] data = (byte[]) buffer;
-        sendWriteBuffer(replayAddress, commandId, data);
+        if (buffer != null) {
+            byte[] data = (byte[]) buffer;
+            sendWriteBuffer(commandId, replayAddress, data, true);
+        }
+        else {
+            if (log.isWarnEnabled()) {
+                log.warn("Request for buffer: " + commandId + " is no longer present");
+            }
+        }
     }
 
     protected DatagramPacket createDatagramPacket() {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ResponseRedirectInterceptor.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ResponseRedirectInterceptor.java?rev=387385&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ResponseRedirectInterceptor.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/ResponseRedirectInterceptor.java Mon Mar 20 18:49:00 2006
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * 
+ * Copyright 2005 LogicBlaze, Inc. http://www.logicblaze.com
+ * 
+ * Licensed under the Apache License, Version 2.0 (the "License"); 
+ * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at 
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, 
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and 
+ * limitations under the License. 
+ * 
+ **/
+
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.Endpoint;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFilter;
+
+/**
+ *
+ * @version $Revision: $
+ */
+public class ResponseRedirectInterceptor extends TransportFilter {
+    private final UdpTransport transport;
+
+    public ResponseRedirectInterceptor(Transport next, UdpTransport transport) {
+        super(next);
+        this.transport = transport;
+    }
+
+    public void onCommand(Command command) {
+        // redirect to the endpoint that the last response came from
+        Endpoint from = command.getFrom();
+        transport.setTargetEndpoint(from);
+    
+        super.onCommand(command);
+    }
+}
\ No newline at end of file

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Mon Mar 20 18:49:00 2006
@@ -23,6 +23,7 @@
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportThreadSupport;
 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
+import org.apache.activemq.transport.reliable.ReplayBuffer;
 import org.apache.activemq.transport.reliable.ReplayStrategy;
 import org.apache.activemq.transport.reliable.Replayer;
 import org.apache.activemq.util.IntSequenceGenerator;
@@ -54,6 +55,7 @@
     private OpenWireFormat wireFormat;
     private ByteBufferPool bufferPool;
     private ReplayStrategy replayStrategy = new ExceptionIfDroppedReplayStrategy();
+    private ReplayBuffer replayBuffer;
     private int datagramSize = 4 * 1024;
     private long maxInactivityDuration = 0; // 30000;
     private SocketAddress targetAddress;
@@ -98,11 +100,10 @@
 
     /**
      * Creates a replayer for working with the reliable transport
-     * @return
      */
-    public Replayer createReplayer() {
+    public Replayer createReplayer() throws IOException {
         if (replayEnabled ) {
-            return commandChannel;
+            return getCommandChannel();
         }
         return null;
     }
@@ -263,7 +264,10 @@
         this.useLocalHost = useLocalHost;
     }
 
-    public CommandChannel getCommandChannel() {
+    public CommandChannel getCommandChannel() throws IOException {
+        if (commandChannel == null) {
+            commandChannel = createCommandChannel();
+        }
         return commandChannel;
     }
 
@@ -318,6 +322,9 @@
 
 
     public IntSequenceGenerator getSequenceGenerator() {
+        if (sequenceGenerator == null) {
+            sequenceGenerator = new IntSequenceGenerator();
+        }
         return sequenceGenerator;
     }
 
@@ -337,6 +344,26 @@
         this.replayEnabled = replayEnabled;
     }
 
+    public ByteBufferPool getBufferPool() {
+        if (bufferPool == null) {
+            bufferPool = new DefaultBufferPool();
+        }
+        return bufferPool;
+    }
+
+    public void setBufferPool(ByteBufferPool bufferPool) {
+        this.bufferPool = bufferPool;
+    }
+    
+    public ReplayBuffer getReplayBuffer() {
+        return replayBuffer;
+    }
+
+    public void setReplayBuffer(ReplayBuffer replayBuffer) throws IOException {
+        this.replayBuffer = replayBuffer;
+        getCommandChannel().setReplayBuffer(replayBuffer);
+    }
+
     
     // Implementation methods
     // -------------------------------------------------------------------------
@@ -360,8 +387,7 @@
     }
 
     protected void doStart() throws Exception {
-        commandChannel = createCommandChannel();
-        commandChannel.start();
+        getCommandChannel().start();
 
         super.doStart();
     }
@@ -378,10 +404,11 @@
             port = socket.getLocalPort();
         }
 
-        if (bufferPool == null) {
-            bufferPool = new DefaultBufferPool();
-        }
-        return new CommandDatagramChannel(this, wireFormat, datagramSize, targetAddress, createDatagramHeaderMarshaller(), channel, bufferPool);
+        return createCommandDatagramChannel();
+    }
+
+    protected CommandChannel createCommandDatagramChannel() {
+        return new CommandDatagramChannel(this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), getChannel(), getBufferPool());
     }
 
     protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {
@@ -425,5 +452,13 @@
 
     protected SocketAddress getTargetAddress() {
         return targetAddress;
+    }
+
+    protected DatagramChannel getChannel() {
+        return channel;
+    }
+
+    protected void setChannel(DatagramChannel channel) {
+        this.channel = channel;
     }
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Mon Mar 20 18:49:00 2006
@@ -17,14 +17,11 @@
 package org.apache.activemq.transport.udp;
 
 import org.activeio.command.WireFormat;
-import org.apache.activemq.command.Command;
-import org.apache.activemq.command.Endpoint;
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.CommandJoiner;
 import org.apache.activemq.transport.InactivityMonitor;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
-import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
 import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
@@ -69,7 +66,7 @@
         }
     }
 
-    public Transport configure(Transport transport, WireFormat format, Map options) {
+    public Transport configure(Transport transport, WireFormat format, Map options) throws Exception {
         return configure(transport, format, options, false);
     }
 
@@ -108,7 +105,7 @@
      *            for new connections which work like TCP SocketServers where
      *            new connections spin up a new separate UDP transport
      */
-    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) {
+    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) throws Exception {
         IntrospectionSupport.setProperties(transport, options);
         UdpTransport udpTransport = (UdpTransport) transport;
 
@@ -131,18 +128,17 @@
         if (acceptServer) {
             // lets not support a buffer of messages to enable reliable
             // messaging on the 'accept server' transport
-            udpTransport.setReplayEnabled(true);
+            udpTransport.setReplayEnabled(false);
 
             // we don't want to do reliable checks on this transport as we
             // delegate to one that does
             transport = new CommandJoiner(transport, openWireFormat);
-            udpTransport.setSequenceGenerator(new IntSequenceGenerator());
             return transport;
         }
         else {
-            Replayer replayer = udpTransport.createReplayer();
-            ReliableTransport reliableTransport = new ReliableTransport(transport, createReplayStrategy(replayer));
-            udpTransport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
+            ReliableTransport reliableTransport = new ReliableTransport(transport, udpTransport);
+            Replayer replayer = reliableTransport.getReplayer();
+            reliableTransport.setReplayStrategy(createReplayStrategy(replayer));
 
             // Joiner must be on outside as the inbound messages must be
             // processed by the reliable transport first
@@ -162,17 +158,7 @@
     }
 
     protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {
-        return new TransportFilter(transport) {
-
-            public void onCommand(Command command) {
-                // redirect to the endpoint that the last response came from
-                Endpoint from = command.getFrom();
-                udpTransport.setTargetEndpoint(from);
-
-                super.onCommand(command);
-            }
-
-        };
+        return new ResponseRedirectInterceptor(transport, udpTransport);
         /*
          * transport = new WireFormatNegotiator(transport,
          * asOpenWireFormat(format), udpTransport.getMinmumWireFormatVersion()) {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportServer.java Mon Mar 20 18:49:00 2006
@@ -27,6 +27,7 @@
 import org.apache.activemq.transport.TransportServerSupport;
 import org.apache.activemq.transport.reliable.ReliableTransport;
 import org.apache.activemq.transport.reliable.ReplayStrategy;
+import org.apache.activemq.transport.reliable.Replayer;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -153,8 +154,9 @@
         final OpenWireFormat connectionWireFormat = serverTransport.getWireFormat().copy();
         final UdpTransport transport = new UdpTransport(connectionWireFormat, address);
 
-        final ReliableTransport reliableTransport = new ReliableTransport(transport, replayStrategy);
-        transport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
+        final ReliableTransport reliableTransport = new ReliableTransport(transport, transport);
+        Replayer replayer = reliableTransport.getReplayer();
+        reliableTransport.setReplayStrategy(replayStrategy);
         
         // Joiner must be on outside as the inbound messages must be processed by the reliable transport first
         return new CommandJoiner(reliableTransport, connectionWireFormat) {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQBytesMessageTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQBytesMessageTest extends ActiveMQMessageTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQDestinationTestSupport.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public abstract class ActiveMQDestinationTestSupport extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMapMessageTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQMapMessageTest extends ActiveMQMessageTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQMessageTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQMessageTest extends MessageTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQObjectMessageTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQObjectMessageTest extends ActiveMQMessageTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQQueueTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQQueueTest extends ActiveMQDestinationTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQStreamMessageTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQStreamMessageTest extends ActiveMQMessageTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempDestinationTestSupport.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public abstract class ActiveMQTempDestinationTestSupport extends ActiveMQDestinationTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempQueueTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQTempQueueTest extends ActiveMQTempDestinationTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTempTopicTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQTempTopicTest extends ActiveMQTempDestinationTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTextMessageTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQTextMessageTest extends ActiveMQMessageTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ActiveMQTopicTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ActiveMQTopicTest extends ActiveMQDestinationTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BaseCommandTestSupport.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public abstract class BaseCommandTestSupport extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerIdTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class BrokerIdTest extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/BrokerInfoTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class BrokerInfoTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionErrorTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ConnectionErrorTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionIdTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ConnectionIdTest extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConnectionInfoTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ConnectionInfoTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerIdTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ConsumerIdTest extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ConsumerInfoTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ConsumerInfoTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ControlCommandTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ControlCommandTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataArrayResponseTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class DataArrayResponseTest extends ResponseTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DataResponseTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class DataResponseTest extends ResponseTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DestinationInfoTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class DestinationInfoTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/DiscoveryEventTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class DiscoveryEventTest extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/ExceptionResponseTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class ExceptionResponseTest extends ResponseTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/FlushCommandTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class FlushCommandTest extends BaseCommandTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/IntegerResponseTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class IntegerResponseTest extends ResponseTest {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalQueueAckTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class JournalQueueAckTest extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTopicAckTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class JournalTopicAckTest extends DataFileGeneratorTestSupport {
 

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java?rev=387385&r1=387384&r2=387385&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/openwire/v1/JournalTraceTest.java Mon Mar 20 18:49:00 2006
@@ -33,7 +33,7 @@
  *        under src/gram/script and then use maven openwire:generate to regenerate 
  *        this file.
  *
- * @version $Revision: $
+ * @version $Revision$
  */
 public class JournalTraceTest extends DataFileGeneratorTestSupport {
 



Mime
View raw message