activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r386198 - in /incubator/activemq/trunk/activemq-core/src: main/java/org/apache/activemq/transport/multicast/ main/java/org/apache/activemq/transport/reliable/ main/java/org/apache/activemq/transport/udp/ test/java/org/apache/activemq/transp...
Date Wed, 15 Mar 2006 22:43:42 GMT
Author: jstrachan
Date: Wed Mar 15 14:43:40 2006
New Revision: 386198

URL: http://svn.apache.org/viewcvs?rev=386198&view=rev
Log:
Added a replay buffer to the ReliableTransport so that nodes with missing messages can request that the missing commands be re-sent

Added:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java   (with props)
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java   (with props)
Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
    incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/multicast/MulticastTransport.java Wed Mar 15 14:43:40 2006
@@ -18,10 +18,8 @@
 
 import org.apache.activemq.openwire.OpenWireFormat;
 import org.apache.activemq.transport.udp.CommandChannel;
-import org.apache.activemq.transport.udp.CommandDatagramChannel;
 import org.apache.activemq.transport.udp.CommandDatagramSocket;
 import org.apache.activemq.transport.udp.DatagramHeaderMarshaller;
-import org.apache.activemq.transport.udp.DefaultBufferPool;
 import org.apache.activemq.transport.udp.UdpTransport;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
@@ -36,7 +34,6 @@
 import java.net.SocketException;
 import java.net.URI;
 import java.net.UnknownHostException;
-import java.nio.channels.DatagramChannel;
 
 /**
  * A multicast based transport.
@@ -75,7 +72,7 @@
         super.doStop(stopper);
         if (socket != null) {
             try {
-                socket.leaveGroup(mcastAddress);
+                socket.leaveGroup(getMulticastAddress());
             }
             catch (IOException e) {
                 stopper.onException(this, e);
@@ -89,11 +86,15 @@
         socket.setLoopbackMode(loopBackMode);
         socket.setTimeToLive(timeToLive);
 
-        log.debug("Joining multicast address: " + mcastAddress);
-        socket.joinGroup(mcastAddress);
+        log.debug("Joining multicast address: " + getMulticastAddress());
+        socket.joinGroup(getMulticastAddress());
         socket.setSoTimeout((int) keepAliveInterval);
 
-        return new CommandDatagramSocket(this, socket, getWireFormat(), getDatagramSize(), mcastAddress, mcastPort, createDatagramHeaderMarshaller());
+        return new CommandDatagramSocket( this, getWireFormat(), getDatagramSize(), getTargetAddress(), createDatagramHeaderMarshaller(), socket);
+    }
+
+    protected InetAddress getMulticastAddress() {
+        return mcastAddress;
     }
 
     protected InetSocketAddress createAddress(URI remoteLocation) throws UnknownHostException, IOException {

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java?rev=386198&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java Wed Mar 15 14:43:40 2006
@@ -0,0 +1,76 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public class DefaultReplayBuffer implements ReplayBuffer {
+
+    private final int size;
+    private ReplayBufferListener listener;
+    private Map map;
+    private int lowestCommandId = 1;
+    private Object lock = new Object();
+
+    public DefaultReplayBuffer(int size) {
+        this.size = size;
+        map = createMap(size);
+    }
+
+    public void addBuffer(int commandId, Object buffer) {
+        synchronized (lock) {
+            int max = size - 1;
+            while (map.size() >= max) {
+                // lets find things to evict
+                Object evictedBuffer = map.remove(new Integer(++lowestCommandId));
+                onEvictedBuffer(lowestCommandId, evictedBuffer);
+            }
+            map.put(new Integer(commandId), buffer);
+        }
+    }
+
+    public void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter) {
+        this.listener = bufferPoolAdapter;
+    }
+
+    public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException {
+        for (int i = fromCommandId; i <= toCommandId; i++) {
+            Object buffer = null;
+            synchronized (lock) {
+                buffer = map.get(new Integer(i));
+            }
+            replayer.sendBuffer(i, buffer);
+        }
+    }
+
+    protected Map createMap(int maximumSize) {
+        return new HashMap(maximumSize);
+    }
+
+    protected void onEvictedBuffer(int commandId, Object buffer) {
+        if (listener != null) {
+            listener.onBufferDiscarded(commandId, buffer);
+        }
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayBuffer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java?rev=386198&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java Wed Mar 15 14:43:40 2006
@@ -0,0 +1,69 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+
+/**
+ * Requests a replay of the missing commands when the gap between the expected
+ * and actual command counters exceeds the configured maximum difference.
+ * 
+ * @version $Revision$
+ */
+public class DefaultReplayStrategy implements ReplayStrategy {
+
+    private int maximumDifference = 5;
+
+    public DefaultReplayStrategy() {
+    }
+
+    public DefaultReplayStrategy(int maximumDifference) {
+        this.maximumDifference = maximumDifference;
+    }
+
+    public boolean onDroppedPackets(ReliableTransport transport, int expectedCounter, int actualCounter) throws IOException {
+        int difference = actualCounter - expectedCounter;
+        long count = Math.abs(difference);
+        if (count > maximumDifference) {
+            int upperLimit = actualCounter;
+            if (upperLimit < expectedCounter) {
+                upperLimit = expectedCounter;
+            }
+            transport.requestReplay(expectedCounter, upperLimit );
+        }
+
+        // lets discard old commands
+        return difference > 0;
+    }
+
+    public void onReceivedPacket(ReliableTransport transport, long expectedCounter) {
+        // TODO we could pro-actively evict stuff from the buffer if we knew there was only one client
+    }
+
+    public int getMaximumDifference() {
+        return maximumDifference;
+    }
+
+    /**
+     * Sets the maximum allowed difference between an expected packet and an
+     * actual packet before a replay of the missing commands is requested
+     */
+    public void setMaximumDifference(int maximumDifference) {
+        this.maximumDifference = maximumDifference;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/DefaultReplayStrategy.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReliableTransport.java Wed Mar 15 14:43:40 2006
@@ -17,6 +17,7 @@
 package org.apache.activemq.transport.reliable;
 
 import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ReplayCommand;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.openwire.CommandIdComparator;
 import org.apache.activemq.transport.FutureResponse;
@@ -42,7 +43,10 @@
     private ReplayStrategy replayStrategy;
     private SortedSet commands = new TreeSet(new CommandIdComparator());
     private int expectedCounter = 1;
+    private int replayBufferCommandCount = 50;
     private int requestTimeout = 2000;
+    private ReplayBuffer replayBuffer;
+    private Replayer replayer;
 
     public ReliableTransport(Transport next, ReplayStrategy replayStrategy) {
         super(next);
@@ -54,6 +58,21 @@
         this.replayStrategy = replayStrategy;
     }
 
+    /**
+     * Requests that a range of commands be replayed
+     */
+    public void requestReplay(int fromCommandId, int toCommandId) {
+        ReplayCommand replay = new ReplayCommand();
+        replay.setFirstNakNumber(fromCommandId);
+        replay.setLastNakNumber(toCommandId);
+        try {
+            oneway(replay);
+        }
+        catch (IOException e) {
+            getTransportListener().onException(e);
+        }
+    }
+    
 
     public Response request(Command command) throws IOException {
         FutureResponse response = asyncRequest(command);
@@ -62,7 +81,7 @@
             if (result != null) {
                 return result;
             }
-            replayRequest(command, response);
+            onMissingResponse(command, response);
         }
     }
 
@@ -77,7 +96,7 @@
             if (result != null) {
                 return result;
             }
-            replayRequest(command, response);
+            onMissingResponse(command, response);
             timeout -= time;
         }
         return response.getResult(0);
@@ -89,6 +108,10 @@
             super.onCommand(command);
             return;
         }
+        else if (command.getDataStructureType() == ReplayCommand.DATA_STRUCTURE_TYPE) {
+            replayCommands((ReplayCommand) command);
+            return;
+        }
 
         int actualCounter = command.getCommandId();
         boolean valid = expectedCounter == actualCounter;
@@ -107,7 +130,7 @@
                     }
                 }
                 catch (IOException e) {
-                    getTransportListener().onException(e);
+                    onException(e);
                 }
 
                 if (!commands.isEmpty()) {
@@ -180,13 +203,61 @@
     }
 
 
+    public ReplayBuffer getReplayBuffer() {
+        return replayBuffer;
+    }
+
+    public void setReplayBuffer(ReplayBuffer replayBuffer) {
+        this.replayBuffer = replayBuffer;
+    }
+
+    public int getReplayBufferCommandCount() {
+        return replayBufferCommandCount;
+    }
+
+    /**
+     * Sets the default number of commands which are buffered
+     */
+    public void setReplayBufferCommandCount(int replayBufferSize) {
+        this.replayBufferCommandCount = replayBufferSize;
+    }
+
     public String toString() {
         return next.toString();
     }
+    
+    
+    public void start() throws Exception {
+        super.start();
+        if (replayBuffer == null) {
+            replayBuffer = createReplayBuffer();
+        }
+    }
+
     /**
      * Lets attempt to replay the request as a command may have disappeared
      */
-    protected void replayRequest(Command command, FutureResponse response) {
-        log.debug("Still waiting for response on: " + this + " to command: " + command);
+    protected void onMissingResponse(Command command, FutureResponse response) {
+        log.debug("Still waiting for response on: " + this + " to command: " + command + " sending replay message");
+        
+        int commandId = command.getCommandId();
+        requestReplay(commandId, commandId);
     }
+    
+    protected ReplayBuffer createReplayBuffer() {
+        return new DefaultReplayBuffer(getReplayBufferCommandCount());
+    }
+
+    protected void replayCommands(ReplayCommand command) {
+        try {
+            replayBuffer.replayMessages(command.getFirstNakNumber(), command.getLastNakNumber(), replayer);
+            
+            // TODO we could proactively remove ack'd stuff from the replay buffer
+            // if we only have a single client talking to us
+        }
+        catch (IOException e) {
+            onException(e);
+        }
+    }
+
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java?rev=386198&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java Wed Mar 15 14:43:40 2006
@@ -0,0 +1,39 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+
+/**
+ * Implementations keep around a buffer of old commands which have been sent on
+ * an unreliable transport. The buffers are of type Object as they could be datagrams
+ * or byte[] or ByteBuffer - depending on the underlying transport implementation.
+ * 
+ * @version $Revision$
+ */
+public interface ReplayBuffer {
+
+    /**
+     * Submits a buffer to be cached for a period of time, during which it can be replayed
+     * to users interested in it.
+     */
+    public void addBuffer(int commandId, Object buffer);
+    
+    public void setReplayBufferListener(ReplayBufferListener bufferPoolAdapter);
+    
+    public void replayMessages(int fromCommandId, int toCommandId, Replayer replayer) throws IOException;
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBuffer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java?rev=386198&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java Wed Mar 15 14:43:40 2006
@@ -0,0 +1,31 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+/**
+ * Listens to events on a {@link ReplayBuffer}
+ * 
+ * @version $Revision$
+ */
+public interface ReplayBufferListener {
+
+    /**
+     * Indicates that the buffer has been discarded and so could be
+     * re-introduced into some pool
+     */
+    public void onBufferDiscarded(int commandId, Object buffer);
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/ReplayBufferListener.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java?rev=386198&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java Wed Mar 15 14:43:40 2006
@@ -0,0 +1,37 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.reliable;
+
+import java.io.IOException;
+
+/**
+ * Used by a {@link ReplayBuffer} to replay buffers back over an unreliable transport
+ * 
+ * @version $Revision$
+ */
+public interface Replayer {
+
+    /**
+     * Sends the given buffer back to the transport
+     * if the buffer could be found - otherwise maybe send some kind
+     * of exception
+     * 
+     * @param commandId the command ID
+     * @param buffer the buffer to be sent - or null if it no longer exists in the replay buffer
+     */
+    void sendBuffer(int commandId, Object buffer) throws IOException;
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/reliable/Replayer.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannel.java Wed Mar 15 14:43:40 2006
@@ -18,6 +18,7 @@
 
 import org.apache.activemq.Service;
 import org.apache.activemq.command.Command;
+import org.apache.activemq.transport.reliable.Replayer;
 
 import java.io.IOException;
 import java.net.SocketAddress;
@@ -26,7 +27,7 @@
  *
  * @version $Revision$
  */
-public interface CommandChannel extends Service {
+public interface CommandChannel extends Replayer, Service {
 
     public abstract Command read() throws IOException;
 
@@ -45,4 +46,5 @@
 
     public abstract void setTargetAddress(SocketAddress address);
 
+    public abstract void setReplayAddress(SocketAddress address);
 }

Added: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java?rev=386198&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java (added)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java Wed Mar 15 14:43:40 2006
@@ -0,0 +1,101 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.udp;
+
+import org.apache.activemq.command.Command;
+import org.apache.activemq.openwire.OpenWireFormat;
+import org.apache.activemq.util.IntSequenceGenerator;
+
+import java.io.IOException;
+import java.net.SocketAddress;
+
+/**
+ * 
+ * @version $Revision$
+ */
+public abstract class CommandChannelSupport implements CommandChannel {
+
+    protected OpenWireFormat wireFormat;
+    protected int datagramSize = 4 * 1024;
+    protected SocketAddress targetAddress;
+    protected SocketAddress replayAddress;
+    protected final String name;
+    protected final IntSequenceGenerator sequenceGenerator;
+    protected DatagramHeaderMarshaller headerMarshaller;
+
+    public CommandChannelSupport(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
+            DatagramHeaderMarshaller headerMarshaller) {
+        this.wireFormat = wireFormat;
+        this.datagramSize = datagramSize;
+        this.targetAddress = targetAddress;
+        this.headerMarshaller = headerMarshaller;
+        this.name = transport.toString();
+        this.sequenceGenerator = transport.getSequenceGenerator();
+        this.replayAddress = targetAddress;
+        if (sequenceGenerator == null) {
+            throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
+        }
+    }
+    
+    public void write(Command command) throws IOException {
+        write(command, targetAddress);
+    }
+
+
+    // Properties
+    // -------------------------------------------------------------------------
+
+    public int getDatagramSize() {
+        return datagramSize;
+    }
+
+    /**
+     * Sets the default size of a datagram on the network.
+     */
+    public void setDatagramSize(int datagramSize) {
+        this.datagramSize = datagramSize;
+    }
+
+    public SocketAddress getTargetAddress() {
+        return targetAddress;
+    }
+
+    public void setTargetAddress(SocketAddress targetAddress) {
+        this.targetAddress = targetAddress;
+    }
+
+    public SocketAddress getReplayAddress() {
+        return replayAddress;
+    }
+
+    public void setReplayAddress(SocketAddress replayAddress) {
+        this.replayAddress = replayAddress;
+    }
+
+    public String toString() {
+        return "CommandChannel#" + name;
+    }
+
+    public DatagramHeaderMarshaller getHeaderMarshaller() {
+        return headerMarshaller;
+    }
+
+    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
+        this.headerMarshaller = headerMarshaller;
+    }
+
+}

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Propchange: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandChannelSupport.java
------------------------------------------------------------------------------
    svn:mime-type = text/plain

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramChannel.java Wed Mar 15 14:43:40 2006
@@ -24,7 +24,6 @@
 import org.apache.activemq.command.PartialCommand;
 import org.apache.activemq.openwire.BooleanStream;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -40,54 +39,30 @@
  * 
  * @version $Revision$
  */
-public class CommandDatagramChannel implements CommandChannel {
+public class CommandDatagramChannel extends CommandChannelSupport {
 
     private static final Log log = LogFactory.getLog(CommandDatagramChannel.class);
 
-    private final String name;
-    private final IntSequenceGenerator sequenceGenerator;
     private DatagramChannel channel;
-    private OpenWireFormat wireFormat;
     private ByteBufferPool bufferPool;
-    private int datagramSize = 4 * 1024;
-    private SocketAddress targetAddress;
-    private DatagramHeaderMarshaller headerMarshaller;
-
     // reading
     private Object readLock = new Object();
     private ByteBuffer readBuffer;
 
     // writing
     private Object writeLock = new Object();
-    private ByteBuffer writeBuffer;
     private int defaultMarshalBufferSize = 64 * 1024;
 
-
-
-    public CommandDatagramChannel(UdpTransport transport, DatagramChannel channel, OpenWireFormat wireFormat, ByteBufferPool bufferPool, int datagramSize,
-            SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller) {
+    public CommandDatagramChannel(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress, DatagramHeaderMarshaller headerMarshaller, DatagramChannel channel, ByteBufferPool bufferPool) {
+        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
         this.channel = channel;
-        this.wireFormat = wireFormat;
         this.bufferPool = bufferPool;
-        this.datagramSize = datagramSize;
-        this.targetAddress = targetAddress;
-        this.headerMarshaller = headerMarshaller;
-        this.name = transport.toString();
-        this.sequenceGenerator = transport.getSequenceGenerator();
-        if (sequenceGenerator == null) {
-            throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
-        }
-    }
-
-    public String toString() {
-        return "CommandChannel#" + name;
     }
 
     public void start() throws Exception {
         bufferPool.setDefaultSize(datagramSize);
         bufferPool.start();
         readBuffer = bufferPool.borrowBuffer();
-        writeBuffer = bufferPool.borrowBuffer();
     }
 
     public void stop() throws Exception {
@@ -132,6 +107,7 @@
         return answer;
     }
 
+    
     public void write(Command command, SocketAddress address) throws IOException {
         synchronized (writeLock) {
 
@@ -140,6 +116,7 @@
             byte[] data = largeBuffer.toByteArray();
             int size = data.length;
 
+            ByteBuffer writeBuffer = bufferPool.borrowBuffer();
             writeBuffer.clear();
             headerMarshaller.writeHeader(command, writeBuffer);
 
@@ -215,7 +192,7 @@
                     writeBuffer.put(data, offset, chunkSize);
 
                     offset += chunkSize;
-                    sendWriteBuffer(address, commandId);
+                    sendWriteBuffer(address, writeBuffer, commandId);
                 }
 
                 // now lets write the last partial command
@@ -231,21 +208,13 @@
             }
 
             writeBuffer.put(data);
-            sendWriteBuffer(address, command.getCommandId());
+            sendWriteBuffer(address, writeBuffer, command.getCommandId());
         }
     }
 
     // Properties
     // -------------------------------------------------------------------------
 
-    public int getDatagramSize() {
-        return datagramSize;
-    }
-
-    public void setDatagramSize(int datagramSize) {
-        this.datagramSize = datagramSize;
-    }
-
     public ByteBufferPool getBufferPool() {
         return bufferPool;
     }
@@ -257,32 +226,23 @@
         this.bufferPool = bufferPool;
     }
 
-    public DatagramHeaderMarshaller getHeaderMarshaller() {
-        return headerMarshaller;
-    }
-
-    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
-        this.headerMarshaller = headerMarshaller;
-    }
-
-    
-    public SocketAddress getTargetAddress() {
-        return targetAddress;
-    }
-
-    public void setTargetAddress(SocketAddress targetAddress) {
-        this.targetAddress = targetAddress;
-    }
-
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(SocketAddress address, int commandId) throws IOException {
+    protected void sendWriteBuffer(SocketAddress address, ByteBuffer writeBuffer, int commandId) throws IOException {
         writeBuffer.flip();
 
         if (log.isDebugEnabled()) {
             log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
         }
         channel.send(writeBuffer, address);
+        
+        // TODO: now let's put the buffer back into the replay buffer (nothing is done here yet)
     }
 
+    public void sendBuffer(int commandId, Object buffer) throws IOException {
+        ByteBuffer writeBuffer = (ByteBuffer) buffer;
+        sendWriteBuffer(getReplayAddress(), writeBuffer, commandId);
+    }
+
+    
 }

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/CommandDatagramSocket.java Wed Mar 15 14:43:40 2006
@@ -24,7 +24,6 @@
 import org.apache.activemq.command.PartialCommand;
 import org.apache.activemq.openwire.BooleanStream;
 import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 
@@ -33,8 +32,6 @@
 import java.io.IOException;
 import java.net.DatagramPacket;
 import java.net.DatagramSocket;
-import java.net.InetAddress;
-import java.net.InetSocketAddress;
 import java.net.SocketAddress;
 
 /**
@@ -42,38 +39,18 @@
  * 
  * @version $Revision$
  */
-public class CommandDatagramSocket implements CommandChannel {
+public class CommandDatagramSocket extends CommandChannelSupport {
 
     private static final Log log = LogFactory.getLog(CommandDatagramSocket.class);
 
-    private final String name;
     private DatagramSocket channel;
-    private InetAddress targetAddress;
-    private int targetPort;
-    private OpenWireFormat wireFormat;
-    private int datagramSize = 4 * 1024;
-    private DatagramHeaderMarshaller headerMarshaller;
-    private IntSequenceGenerator sequenceGenerator;
     private Object readLock = new Object();
     private Object writeLock = new Object();
 
-    public CommandDatagramSocket(UdpTransport transport, DatagramSocket channel, OpenWireFormat wireFormat, int datagramSize, InetAddress targetAddress,
-            int targetPort, DatagramHeaderMarshaller headerMarshaller) {
+    public CommandDatagramSocket(UdpTransport transport, OpenWireFormat wireFormat, int datagramSize, SocketAddress targetAddress,
+            DatagramHeaderMarshaller headerMarshaller, DatagramSocket channel) {
+        super(transport, wireFormat, datagramSize, targetAddress, headerMarshaller);
         this.channel = channel;
-        this.wireFormat = wireFormat;
-        this.datagramSize = datagramSize;
-        this.targetAddress = targetAddress;
-        this.targetPort = targetPort;
-        this.headerMarshaller = headerMarshaller;
-        this.name = transport.toString();
-        this.sequenceGenerator = transport.getSequenceGenerator();
-        if (sequenceGenerator == null) {
-            throw new IllegalArgumentException("No sequenceGenerator on the given transport: " + transport);
-        }
-    }
-
-    public String toString() {
-        return "CommandChannel#" + name;
     }
 
     public void start() throws Exception {
@@ -110,11 +87,6 @@
     }
 
     public void write(Command command, SocketAddress address) throws IOException {
-        InetSocketAddress ia = (InetSocketAddress) address;
-        write(command, ia.getAddress(), ia.getPort());
-    }
-
-    public void write(Command command, InetAddress address, int port) throws IOException {
         synchronized (writeLock) {
 
             ByteArrayOutputStream writeBuffer = createByteArrayOutputStream();
@@ -126,7 +98,7 @@
             wireFormat.marshal(command, dataOut);
 
             if (remaining(writeBuffer) >= 0) {
-                sendWriteBuffer(address, port, writeBuffer, command.getCommandId());
+                sendWriteBuffer(address, writeBuffer, command.getCommandId());
             }
             else {
                 // lets split the command up into chunks
@@ -197,7 +169,7 @@
                     dataOut.write(data, offset, chunkSize);
 
                     offset += chunkSize;
-                    sendWriteBuffer(address, port, writeBuffer, commandId);
+                    sendWriteBuffer(address, writeBuffer, commandId);
                 }
 
                 // now lets write the last partial command
@@ -208,58 +180,37 @@
                 headerMarshaller.writeHeader(command, dataOut);
                 wireFormat.marshal(command, dataOut);
 
-                sendWriteBuffer(address, port, writeBuffer, command.getCommandId());
+                sendWriteBuffer(address, writeBuffer, command.getCommandId());
             }
         }
     }
 
-    // Properties
-    // -------------------------------------------------------------------------
-
     public int getDatagramSize() {
         return datagramSize;
     }
 
-    /**
-     * Sets the default size of a datagram on the network.
-     */
     public void setDatagramSize(int datagramSize) {
         this.datagramSize = datagramSize;
     }
 
-    public DatagramHeaderMarshaller getHeaderMarshaller() {
-        return headerMarshaller;
-    }
-
-    public void setHeaderMarshaller(DatagramHeaderMarshaller headerMarshaller) {
-        this.headerMarshaller = headerMarshaller;
-    }
-
-    
-    public SocketAddress getTargetAddress() {
-        return new InetSocketAddress(targetAddress, targetPort);
-    }
-
-    public void setTargetAddress(SocketAddress address) {
-        if (address instanceof InetSocketAddress) {
-            InetSocketAddress ia = (InetSocketAddress) address;
-            targetAddress = ia.getAddress();
-            targetPort = ia.getPort();
-        }
-        else {
-            throw new IllegalArgumentException("Address must be instance of InetSocketAddress");
-        }
-    }
-
     // Implementation methods
     // -------------------------------------------------------------------------
-    protected void sendWriteBuffer(InetAddress address, int port, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
+    protected void sendWriteBuffer(SocketAddress address, ByteArrayOutputStream writeBuffer, int commandId) throws IOException {
+        byte[] data = writeBuffer.toByteArray();
+        sendWriteBuffer(address, commandId, data);
+    }
+
+    protected void sendWriteBuffer(SocketAddress address, int commandId, byte[] data) throws IOException {
         if (log.isDebugEnabled()) {
             log.debug("Channel: " + name + " sending datagram: " + commandId + " to: " + address);
         }
-        byte[] data = writeBuffer.toByteArray();
-        DatagramPacket packet = new DatagramPacket(data, 0, data.length, address, port);
+        DatagramPacket packet = new DatagramPacket(data, 0, data.length, address);
         channel.send(packet);
+    }
+
+    public void sendBuffer(int commandId, Object buffer) throws IOException {
+        byte[] data = (byte[]) buffer;
+        sendWriteBuffer(replayAddress, commandId, data);
     }
 
     protected DatagramPacket createDatagramPacket() {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransport.java Wed Mar 15 14:43:40 2006
@@ -24,6 +24,7 @@
 import org.apache.activemq.transport.TransportThreadSupport;
 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
 import org.apache.activemq.transport.reliable.ReplayStrategy;
+import org.apache.activemq.transport.reliable.Replayer;
 import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.activemq.util.ServiceStopper;
 import org.apache.commons.logging.Log;
@@ -66,6 +67,7 @@
     private String description = null;
     private Runnable runnable;
     private IntSequenceGenerator sequenceGenerator;
+    private boolean replayEnabled = true;
 
     protected UdpTransport(OpenWireFormat wireFormat) throws IOException {
         this.wireFormat = wireFormat;
@@ -93,6 +95,18 @@
         this.description = getProtocolName() + "Server@";
     }
 
+
+    /**
+     * Creates a replayer for working with the reliable transport.
+     * @return the command channel acting as the replayer, or null if replay is disabled
+     */
+    public Replayer createReplayer() {
+        if (replayEnabled ) {
+            return commandChannel;
+        }
+        return null;
+    }
+
     /**
      * A one way asynchronous send
      */
@@ -311,6 +325,19 @@
         this.sequenceGenerator = sequenceGenerator;
     }
     
+    public boolean isReplayEnabled() {
+        return replayEnabled;
+    }
+
+    /**
+     * Sets whether or not replay should be enabled when using the reliable transport.
+     * i.e. should we maintain a buffer of messages that can be replayed?
+     */
+    public void setReplayEnabled(boolean replayEnabled) {
+        this.replayEnabled = replayEnabled;
+    }
+
+    
     // Implementation methods
     // -------------------------------------------------------------------------
 
@@ -354,7 +381,7 @@
         if (bufferPool == null) {
             bufferPool = new DefaultBufferPool();
         }
-        return new CommandDatagramChannel(this, channel, wireFormat, bufferPool, datagramSize, targetAddress, createDatagramHeaderMarshaller());
+        return new CommandDatagramChannel(this, wireFormat, datagramSize, targetAddress, createDatagramHeaderMarshaller(), channel, bufferPool);
     }
 
     protected void bind(DatagramSocket socket, SocketAddress localAddress) throws IOException {

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/udp/UdpTransportFactory.java Wed Mar 15 14:43:40 2006
@@ -27,9 +27,11 @@
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportLogger;
 import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.reliable.DefaultReplayStrategy;
 import org.apache.activemq.transport.reliable.ExceptionIfDroppedReplayStrategy;
 import org.apache.activemq.transport.reliable.ReliableTransport;
 import org.apache.activemq.transport.reliable.ReplayStrategy;
+import org.apache.activemq.transport.reliable.Replayer;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IntSequenceGenerator;
 import org.apache.activemq.util.IntrospectionSupport;
@@ -56,15 +58,7 @@
             UdpTransport transport = new UdpTransport(openWireFormat, port);
 
             Transport configuredTransport = configure(transport, wf, options, true);
-            ReplayStrategy replayStrategy = null;
-            if (configuredTransport instanceof ReliableTransport) {
-                ReliableTransport rt = (ReliableTransport) configuredTransport;
-                replayStrategy = rt.getReplayStrategy();
-            }
-            if (replayStrategy == null) {
-                replayStrategy = createReplayStrategy();
-            }
-            UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, replayStrategy);
+            UdpTransportServer server = new UdpTransportServer(location, transport, configuredTransport, createReplayStrategy());
             return server;
         }
         catch (URISyntaxException e) {
@@ -106,16 +100,25 @@
         return new UdpTransport(wireFormat, location);
     }
 
-    protected Transport configure(Transport transport, WireFormat format, Map options, boolean server) {
+    /**
+     * Configures the transport
+     * 
+     * @param acceptServer
+     *            true if this transport is used purely as an 'accept' transport
+     *            for new connections which work like TCP SocketServers where
+     *            new connections spin up a new separate UDP transport
+     */
+    protected Transport configure(Transport transport, WireFormat format, Map options, boolean acceptServer) {
         IntrospectionSupport.setProperties(transport, options);
         UdpTransport udpTransport = (UdpTransport) transport;
+
         OpenWireFormat openWireFormat = asOpenWireFormat(format);
 
         if (udpTransport.isTrace()) {
             transport = new TransportLogger(transport);
         }
 
-        if (!server && format instanceof OpenWireFormat) {
+        if (!acceptServer && format instanceof OpenWireFormat) {
             transport = configureClientSideNegotiator(transport, format, udpTransport);
         }
 
@@ -125,7 +128,11 @@
 
         // deal with fragmentation
 
-        if (server) {
+        if (acceptServer) {
+            // let's not support a buffer of messages to enable reliable messaging on the
+            // 'accept server' transport (NOTE(review): the call below passes true - confirm)
+            udpTransport.setReplayEnabled(true);
+
             // we don't want to do reliable checks on this transport as we
             // delegate to one that does
             transport = new CommandJoiner(transport, openWireFormat);
@@ -133,7 +140,8 @@
             return transport;
         }
         else {
-            ReliableTransport reliableTransport = new ReliableTransport(transport, createReplayStrategy());
+            Replayer replayer = udpTransport.createReplayer();
+            ReliableTransport reliableTransport = new ReliableTransport(transport, createReplayStrategy(replayer));
             udpTransport.setSequenceGenerator(reliableTransport.getSequenceGenerator());
 
             // Joiner must be on outside as the inbound messages must be
@@ -142,8 +150,15 @@
         }
     }
 
-    protected ReplayStrategy createReplayStrategy() {
+    protected ReplayStrategy createReplayStrategy(Replayer replayer) {
+        if (replayer != null) {
+            return new DefaultReplayStrategy(5);
+        }
         return new ExceptionIfDroppedReplayStrategy(1);
+    }
+
+    protected ReplayStrategy createReplayStrategy() {
+        return new DefaultReplayStrategy(5);
     }
 
     protected Transport configureClientSideNegotiator(Transport transport, WireFormat format, final UdpTransport udpTransport) {

Modified: incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java?rev=386198&r1=386197&r2=386198&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/udp/UdpTestSupport.java Wed Mar 15 14:43:40 2006
@@ -25,14 +25,12 @@
 import org.apache.activemq.command.WireFormatInfo;
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportAcceptListener;
-import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.transport.TransportListener;
 import org.apache.activemq.transport.TransportServer;
 
 import javax.jms.MessageNotWriteableException;
 
 import java.io.IOException;
-import java.net.URI;
 
 import junit.framework.TestCase;
 



Mime
View raw message